From ec088cd6c553c388c234a7b51f0cdc9318405db7 Mon Sep 17 00:00:00 2001 From: Vadim Date: Thu, 12 Sep 2024 15:14:37 +0200 Subject: [PATCH] feat: add skip batch mint indexing feature --- .../src/dao/generated/batch_mint_to_verify.rs | 3 + integration_tests/Cargo.toml | 3 +- .../integration_tests/batch_mint_tests.rs | 324 +++++++++++++++++- .../tests/integration_tests/cnft_tests.rs | 8 + .../tests/integration_tests/common.rs | 17 +- .../tests/integration_tests/mpl_core_tests.rs | 14 + ...alize_tree_with_root_instruction_handle.rs | 24 ++ nft_ingester/src/account_updates.rs | 1 + nft_ingester/src/config.rs | 2 +- nft_ingester/src/main.rs | 3 +- nft_ingester/src/transaction_notifications.rs | 15 +- .../src/batch_minting/batch_mint_persister.rs | 17 +- .../src/bubblegum/finalize_tree_with_root.rs | 14 +- program_transformers/src/bubblegum/mod.rs | 12 +- program_transformers/src/lib.rs | 61 +++- 15 files changed, 489 insertions(+), 29 deletions(-) diff --git a/digital_asset_types/src/dao/generated/batch_mint_to_verify.rs b/digital_asset_types/src/dao/generated/batch_mint_to_verify.rs index 5be802e9..d563a310 100644 --- a/digital_asset_types/src/dao/generated/batch_mint_to_verify.rs +++ b/digital_asset_types/src/dao/generated/batch_mint_to_verify.rs @@ -20,6 +20,7 @@ pub struct Model { pub url: String, pub created_at_slot: i64, pub signature: String, + pub merkle_tree: Vec, pub staker: Vec, pub collection: Option>, pub download_attempts: i32, @@ -33,6 +34,7 @@ pub enum Column { Url, CreatedAtSlot, Signature, + MerkleTree, Staker, Collection, DownloadAttempts, @@ -63,6 +65,7 @@ impl ColumnTrait for Column { Self::Url => ColumnType::String(None).def(), Self::CreatedAtSlot => ColumnType::BigInteger.def(), Self::Signature => ColumnType::String(None).def(), + Self::MerkleTree => ColumnType::Binary.def(), Self::Staker => ColumnType::Binary.def(), Self::Collection => ColumnType::Binary.def().null(), Self::DownloadAttempts => ColumnType::Integer.def(), diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index c389b61a..cbbc9c25 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -39,4 +39,5 @@ tempfile = { workspace = true } async-channel = { workspace = true } cadence = { workspace = true } cadence-macros = { workspace = true } -bubblegum-batch-sdk = { workspace = true } \ No newline at end of file +bubblegum-batch-sdk = { workspace = true } +spl-account-compression = { workspace = true } \ No newline at end of file diff --git a/integration_tests/tests/integration_tests/batch_mint_tests.rs b/integration_tests/tests/integration_tests/batch_mint_tests.rs index 2a4853b2..0ef24f33 100644 --- a/integration_tests/tests/integration_tests/batch_mint_tests.rs +++ b/integration_tests/tests/integration_tests/batch_mint_tests.rs @@ -1,4 +1,6 @@ +use crate::common::Network; use crate::common::TestSetup; +use crate::common::TestSetupOptions; use borsh::BorshSerialize; use bubblegum_batch_sdk::batch_mint_client::BatchMintClient; use bubblegum_batch_sdk::batch_mint_validations::generate_batch_mint; @@ -8,6 +10,7 @@ use cadence::{NopMetricSink, StatsdClient}; use cadence_macros::set_global_default; use das_api::api::ApiContract; use das_api::api::GetAssetProof; +use digital_asset_types::dao::asset; use digital_asset_types::dao::sea_orm_active_enums::{ BatchMintFailStatus, BatchMintPersistingState, }; @@ -15,7 +18,11 @@ use digital_asset_types::dao::{batch_mint, batch_mint_to_verify}; use flatbuffers::FlatBufferBuilder; use mpl_bubblegum::types::Collection; use mpl_bubblegum::types::Creator; +use mpl_bubblegum::types::TokenProgramVersion; +use mpl_bubblegum::types::Version; use mpl_bubblegum::types::{LeafSchema, MetadataArgs}; +use mpl_bubblegum::utils::get_asset_id; +use mpl_bubblegum::LeafSchemaEvent; use nft_ingester::batch_mint_updates::create_batch_mint_notification_channel; use nft_ingester::plerkle::PlerkleTransactionInfo; use plerkle_serialization::root_as_transaction_info; @@ -24,6 +31,7 @@ use program_transformers::batch_minting::batch_mint_persister::{ BatchMintPersister, MockBatchMintDownloader, }; use program_transformers::error::BatchMintValidationError; +use program_transformers::ProgramTransformer; use sea_orm::sea_query::OnConflict; use sea_orm::{ColumnTrait, ConnectionTrait, DbBackend, IntoActiveModel, QueryTrait, Set}; use sea_orm::{EntityTrait, QueryFilter}; @@ -37,6 +45,12 @@ use solana_sdk::signature::Signature; use solana_sdk::signer::Signer; use solana_sdk::transaction::{SanitizedTransaction, Transaction}; use solana_transaction_status::{InnerInstruction, InnerInstructions, TransactionStatusMeta}; +use spl_account_compression::events::ApplicationDataEvent; +use spl_account_compression::events::ApplicationDataEventV1; +use spl_account_compression::events::ChangeLogEventV1; +use spl_account_compression::state::PathNode; +use spl_account_compression::AccountCompressionEvent; +use spl_account_compression::ChangeLogEvent; use spl_concurrent_merkle_tree::concurrent_merkle_tree::ConcurrentMerkleTree; use std::collections::HashMap; use std::fs::File; @@ -69,7 +83,7 @@ async fn save_batch_mint_to_queue_test() { // took it from Bubblegum client // this value is generated by Anchor library, it's instruction identifier - let mut instruction_data = vec![101, 214, 253, 135, 176, 170, 11, 235]; + let mut instruction_data = vec![77, 73, 220, 153, 126, 225, 64, 204]; instruction_data.extend(batch_mint_instruction_data.try_to_vec().unwrap().iter()); let transaction = SanitizedTransaction::from_transaction_for_tests(Transaction { @@ -85,11 +99,13 @@ async fn save_batch_mint_to_queue_test() { Pubkey::from_str("BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY").unwrap(), Pubkey::new_unique(), Pubkey::new_unique(), + Pubkey::new_unique(), + Pubkey::new_unique(), ], recent_blockhash: [1; 32].into(), instructions: vec![CompiledInstruction { program_id_index: 1, - accounts: vec![2, 3], + accounts: vec![0, 4, 2, 3, 5], data: instruction_data, }], }, @@ -144,6 +160,303 @@ async fn save_batch_mint_to_queue_test() { assert_eq!(r.url, metadata_url); } +#[tokio::test] +async fn skip_batched_minted_trees_test() { + let client = StatsdClient::builder("batch_mint.test", NopMetricSink) + .with_error_handler(|e| eprintln!("metric error: {}", e)) + .build(); + + set_global_default(client); + + let setup = TestSetup::new_with_options( + "skip_batched_minted_trees_test".to_string(), + TestSetupOptions { + network: Some(Network::Devnet), + skip_batch_minted_trees: false, + }, + ) + .await; + let metadata_url = "url".to_string(); + let metadata_hash = "hash".to_string(); + + // arbitrary data + let batch_mint_instruction_data = + mpl_bubblegum::instructions::FinalizeTreeWithRootInstructionArgs { + root: [1; 32], + rightmost_leaf: [1; 32], + rightmost_index: 99, + metadata_url: metadata_url.clone(), + metadata_hash: metadata_hash.clone(), + }; + + // took it from Bubblegum client + // this value is generated by Anchor library, it's instruction identifier + let mut instruction_data = vec![77, 73, 220, 153, 126, 225, 64, 204]; + instruction_data.extend(batch_mint_instruction_data.try_to_vec().unwrap().iter()); + + let merkle_tree_id = Pubkey::new_unique(); + + let transaction = SanitizedTransaction::from_transaction_for_tests(Transaction { + signatures: vec![Signature::new_unique()], + message: Message { + header: MessageHeader { + num_required_signatures: 1, + num_readonly_signed_accounts: 0, + num_readonly_unsigned_accounts: 0, + }, + account_keys: vec![ + Pubkey::new_unique(), + Pubkey::from_str("BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY").unwrap(), + Pubkey::new_unique(), + Pubkey::new_unique(), + merkle_tree_id, + Pubkey::new_unique(), + ], + recent_blockhash: [1; 32].into(), + instructions: vec![CompiledInstruction { + program_id_index: 1, + // here important only 1th index - 4 + accounts: vec![0, 4, 2, 3, 5], + data: instruction_data, + }], + }, + }); + + // inner instruction is useless here but required by transaction parser + let transaction_status_meta = TransactionStatusMeta { + inner_instructions: Some(vec![InnerInstructions { + index: 0, + instructions: vec![InnerInstruction { + instruction: CompiledInstruction { + program_id_index: 2, + accounts: vec![], + data: vec![], + }, + stack_height: None, + }], + }]), + ..Default::default() + }; + + let transaction_info = + plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaTransactionInfoV2 { + signature: &Signature::new_unique(), + is_vote: false, + transaction: &transaction, + transaction_status_meta: &transaction_status_meta, + index: 0, + }; + let builder = FlatBufferBuilder::new(); + let builder = serialize_transaction(builder, &transaction_info, 10); + let transaction_info = PlerkleTransactionInfo( + root_as_transaction_info(builder.finished_data().to_vec().as_slice()).unwrap(), + ) + .try_into() + .unwrap(); + + setup + .transformer + .handle_transaction(&transaction_info) + .await + .unwrap(); + + // mint asset and make sure we save it because program transformer is configured + // to save batched minted updates + mint_asset_to_tree(merkle_tree_id, &setup.transformer, 1, 1).await; + + let assets = asset::Entity::find().all(setup.db.as_ref()).await.unwrap(); + + assert_eq!(assets.len(), 1); + + // now we change program transformer and tell it not to save updates related to batched minted trees + // at this moment there is only 1 such tree + // also we make sure that during initialization program transformer selects all the batched tree ids from the DB + let setup = TestSetup::new_with_options( + "skip_batched_minted_trees_test".to_string(), + TestSetupOptions { + network: Some(Network::Devnet), + skip_batch_minted_trees: true, + }, + ) + .await; + + mint_asset_to_tree(merkle_tree_id, &setup.transformer, 2, 2).await; + + let assets = asset::Entity::find().all(setup.db.as_ref()).await.unwrap(); + + assert_eq!(assets.len(), 1); + + // now we switch back to saving batched mint tree updates and check if it works + let setup = TestSetup::new_with_options( + "skip_batched_minted_trees_test".to_string(), + TestSetupOptions { + network: Some(Network::Devnet), + skip_batch_minted_trees: false, + }, + ) + .await; + + mint_asset_to_tree(merkle_tree_id, &setup.transformer, 3, 3).await; + + let assets = asset::Entity::find().all(setup.db.as_ref()).await.unwrap(); + + assert_eq!(assets.len(), 2); +} + +async fn mint_asset_to_tree( + merkle_tree: Pubkey, + transformer: &ProgramTransformer, + index: u64, + sequence: u64, +) { + let mint_instruction_data = mpl_bubblegum::instructions::MintV1InstructionArgs { + metadata: MetadataArgs { + name: "name".to_string(), + symbol: "symbol".to_string(), + uri: "uri".to_string(), + seller_fee_basis_points: 1, + primary_sale_happened: false, + is_mutable: false, + edition_nonce: None, + token_standard: None, + collection: None, + uses: None, + token_program_version: TokenProgramVersion::Original, + creators: vec![], + }, + }; + + // took it from Bubblegum client + // this value is generated by Anchor library, it's instruction identifier + let mut instruction_data = vec![145, 98, 192, 118, 184, 147, 118, 104]; + instruction_data.extend(mint_instruction_data.try_to_vec().unwrap().iter()); + + let transaction = SanitizedTransaction::from_transaction_for_tests(Transaction { + signatures: vec![Signature::new_unique()], + message: Message { + header: MessageHeader { + num_required_signatures: 1, + num_readonly_signed_accounts: 0, + num_readonly_unsigned_accounts: 0, + }, + account_keys: vec![ + Pubkey::new_unique(), + Pubkey::from_str("BGUMAp9Gq7iTEuizy4pqaxsTyUCBK68MDfK752saRPUY").unwrap(), + Pubkey::from_str("noopb9bkMVfRPU8AsbpTUg8AQkHtKwMYZiFUjNRtMmV").unwrap(), + Pubkey::new_unique(), + merkle_tree, + Pubkey::new_unique(), + ], + recent_blockhash: [1; 32].into(), + instructions: vec![CompiledInstruction { + program_id_index: 1, + // here important only 3th index - 4 + accounts: vec![0, 3, 2, 4, 5], + data: instruction_data, + }], + }, + }); + + let change_log = ChangeLogEventV1 { + id: merkle_tree, + path: vec![ + PathNode { + node: Pubkey::new_unique().to_bytes(), + index: 32, + }, + PathNode { + node: Pubkey::new_unique().to_bytes(), + index: 16, + }, + PathNode { + node: Pubkey::new_unique().to_bytes(), + index: 8, + }, + PathNode { + node: Pubkey::new_unique().to_bytes(), + index: 4, + }, + PathNode { + node: Pubkey::new_unique().to_bytes(), + index: 2, + }, + PathNode { + node: Pubkey::new_unique().to_bytes(), + index: 1, + }, + ], + seq: sequence, + index: index as u32, + }; + + let change_log_event = AccountCompressionEvent::ChangeLog(ChangeLogEvent::V1(change_log)); + + let leaf_schema = LeafSchemaEvent::new( + Version::V1, + LeafSchema::V1 { + id: get_asset_id(&merkle_tree, index), + owner: Pubkey::new_unique(), + delegate: Pubkey::new_unique(), + nonce: index, + data_hash: Pubkey::new_unique().to_bytes(), + creator_hash: Pubkey::new_unique().to_bytes(), + }, + Pubkey::new_unique().to_bytes(), + ); + let leaf_schema_event = AccountCompressionEvent::ApplicationData(ApplicationDataEvent::V1( + ApplicationDataEventV1 { + application_data: leaf_schema.try_to_vec().unwrap(), + }, + )); + + // inner instruction is useless here but required by transaction parser + let transaction_status_meta = TransactionStatusMeta { + inner_instructions: Some(vec![InnerInstructions { + index: 0, + instructions: vec![ + InnerInstruction { + instruction: CompiledInstruction { + program_id_index: 2, + accounts: vec![], + data: change_log_event.try_to_vec().unwrap(), + }, + stack_height: None, + }, + InnerInstruction { + instruction: CompiledInstruction { + program_id_index: 2, + accounts: vec![], + data: leaf_schema_event.try_to_vec().unwrap(), + }, + stack_height: None, + }, + ], + }]), + ..Default::default() + }; + + let transaction_info = + plerkle_serialization::solana_geyser_plugin_interface_shims::ReplicaTransactionInfoV2 { + signature: &Signature::new_unique(), + is_vote: false, + transaction: &transaction, + transaction_status_meta: &transaction_status_meta, + index: 0, + }; + let builder = FlatBufferBuilder::new(); + let builder = serialize_transaction(builder, &transaction_info, 10); + let transaction_info = PlerkleTransactionInfo( + root_as_transaction_info(builder.finished_data().to_vec().as_slice()).unwrap(), + ) + .try_into() + .unwrap(); + + transformer + .handle_transaction(&transaction_info) + .await + .unwrap(); +} + fn generate_merkle_tree_from_batch_mint(batch_mint: &BatchMint) -> ConcurrentMerkleTree<10, 32> { let mut merkle_tree = ConcurrentMerkleTree::<10, 32>::new(); merkle_tree.initialize().unwrap(); @@ -207,6 +520,7 @@ async fn batch_mint_persister_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(0), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), @@ -335,6 +649,7 @@ async fn batch_mint_persister_download_fail_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), @@ -481,6 +796,7 @@ async fn batch_mint_with_verified_creators_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), @@ -581,6 +897,7 @@ async fn batch_mint_with_unverified_creators_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), @@ -718,6 +1035,7 @@ async fn batch_mint_with_verified_collection_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), @@ -867,6 +1185,7 @@ async fn batch_mint_with_wrong_collection_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), @@ -1004,6 +1323,7 @@ async fn batch_mint_with_unverified_collection_test() { url: Set(metadata_url.clone()), created_at_slot: Set(10), signature: Set(Signature::new_unique().to_string()), + merkle_tree: Set(Pubkey::default().to_bytes().to_vec()), staker: Set(Pubkey::default().to_bytes().to_vec()), download_attempts: Set(download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), diff --git a/integration_tests/tests/integration_tests/cnft_tests.rs b/integration_tests/tests/integration_tests/cnft_tests.rs index 5c772d30..7868ed19 100644 --- a/integration_tests/tests/integration_tests/cnft_tests.rs +++ b/integration_tests/tests/integration_tests/cnft_tests.rs @@ -70,6 +70,7 @@ async fn test_cnft_scenario_mint_update_metadata() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -95,6 +96,7 @@ async fn test_cnft_scenario_mint_update_metadata_remove_creators() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -180,6 +182,7 @@ async fn test_mint_delegate_transfer() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -204,6 +207,7 @@ async fn test_mint_redeem_cancel_redeem() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -246,6 +250,7 @@ async fn test_mint_transfer_burn() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -288,6 +293,7 @@ async fn test_mint_transfer_transfer() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -312,6 +318,7 @@ async fn test_mint_verify_creator() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -335,6 +342,7 @@ async fn test_mint_verify_collection() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; diff --git a/integration_tests/tests/integration_tests/common.rs b/integration_tests/tests/integration_tests/common.rs index 3e328c73..19ce409e 100644 --- a/integration_tests/tests/integration_tests/common.rs +++ b/integration_tests/tests/integration_tests/common.rs @@ -85,7 +85,8 @@ impl TestSetup { let pool = setup_pg_pool(database_test_url.clone()).await; let db = SqlxPostgresConnector::from_sqlx_postgres_pool(pool.clone()); - let transformer = load_ingest_program_transformer(pool.clone()).await; + let transformer = + load_ingest_program_transformer(pool.clone(), opts.skip_batch_minted_trees).await; let rpc_url = match opts.network.unwrap_or_default() { Network::Mainnet => std::env::var("MAINNET_RPC_URL").unwrap(), @@ -115,6 +116,7 @@ impl TestSetup { #[derive(Clone, Copy, Default)] pub struct TestSetupOptions { pub network: Option, + pub skip_batch_minted_trees: bool, } pub async fn setup_pg_pool(database_url: String) -> PgPool { @@ -163,8 +165,17 @@ pub async fn apply_migrations_and_delete_data(db: Arc) { .unwrap(); } -async fn load_ingest_program_transformer(pool: sqlx::Pool) -> ProgramTransformer { - ProgramTransformer::new(pool, Box::new(|_info| ready(Ok(())).boxed()), false) +async fn load_ingest_program_transformer( + pool: sqlx::Pool, + skip_batch_minted_trees: bool, +) -> ProgramTransformer { + ProgramTransformer::new( + pool, + Box::new(|_info| ready(Ok(())).boxed()), + false, + skip_batch_minted_trees, + ) + .await } pub async fn get_transaction( diff --git a/integration_tests/tests/integration_tests/mpl_core_tests.rs b/integration_tests/tests/integration_tests/mpl_core_tests.rs index a8630e84..b6699018 100644 --- a/integration_tests/tests/integration_tests/mpl_core_tests.rs +++ b/integration_tests/tests/integration_tests/mpl_core_tests.rs @@ -17,6 +17,7 @@ async fn test_mpl_core_get_asset() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -46,6 +47,7 @@ async fn test_mpl_core_get_collection() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -75,6 +77,7 @@ async fn test_mpl_core_get_assets_by_authority() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -117,6 +120,7 @@ async fn test_mpl_core_get_assets_by_group() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -158,6 +162,7 @@ async fn test_mpl_core_get_assets_by_owner() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -196,6 +201,7 @@ async fn test_mpl_core_get_asset_with_edition() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -225,6 +231,7 @@ async fn test_mpl_core_get_asset_with_pubkey_in_rule_set() { name.clone(), TestSetupOptions { network: Some(Network::Mainnet), + skip_batch_minted_trees: false, }, ) .await; @@ -254,6 +261,7 @@ async fn test_mpl_core_get_asset_with_two_oracle_external_plugins() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -283,6 +291,7 @@ async fn test_mpl_core_get_asset_with_oracle_external_plugin_on_collection() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -312,6 +321,7 @@ async fn test_mpl_core_get_asset_with_oracle_multiple_lifecycle_events() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -341,6 +351,7 @@ async fn test_mpl_core_get_asset_with_oracle_custom_offset_and_base_address_conf name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -370,6 +381,7 @@ async fn test_mpl_core_get_asset_with_oracle_no_offset() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -399,6 +411,7 @@ async fn test_mpl_core_get_assets_by_group_with_oracle_and_custom_pda_all_seeds( name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; @@ -438,6 +451,7 @@ async fn test_mpl_core_get_asset_with_multiple_internal_and_external_plugins() { name.clone(), TestSetupOptions { network: Some(Network::Devnet), + skip_batch_minted_trees: false, }, ) .await; diff --git a/migration/src/m20240720_120101_add_finalize_tree_with_root_instruction_handle.rs b/migration/src/m20240720_120101_add_finalize_tree_with_root_instruction_handle.rs index 8e491675..f45bb86c 100644 --- a/migration/src/m20240720_120101_add_finalize_tree_with_root_instruction_handle.rs +++ b/migration/src/m20240720_120101_add_finalize_tree_with_root_instruction_handle.rs @@ -63,6 +63,11 @@ impl MigrationTrait for Migration { .string() .not_null(), ) + .col( + ColumnDef::new(BatchMintToVerify::MerkleTree) + .binary() + .not_null(), + ) .col( ColumnDef::new(BatchMintToVerify::Staker) .binary() @@ -109,6 +114,24 @@ impl MigrationTrait for Migration { ) .await?; + manager + .create_index( + Index::create() + .name("idx_batch_mint_tree") + .table(BatchMintToVerify::Table) + .col(BatchMintToVerify::MerkleTree) + .to_owned(), + ) + .await?; + + manager + .get_connection() + .execute(Statement::from_string( + DatabaseBackend::Postgres, + "ALTER TABLE batch_mint_to_verify ADD CONSTRAINT unique_filehash_staker UNIQUE (file_hash, staker);".to_string(), + )) + .await?; + manager .create_table( Table::create() @@ -175,6 +198,7 @@ enum BatchMintToVerify { FileHash, CreatedAtSlot, Signature, + MerkleTree, DownloadAttempts, BatchMintPersistingState, BatchMintFailStatus, diff --git a/nft_ingester/src/account_updates.rs b/nft_ingester/src/account_updates.rs index 6d537255..b807d045 100644 --- a/nft_ingester/src/account_updates.rs +++ b/nft_ingester/src/account_updates.rs @@ -35,6 +35,7 @@ pub fn account_worker( pool, create_download_metadata_notifier(bg_task_sender), false, + false, )); loop { let e = msg.recv(stream_key, consumption_type.clone()).await; diff --git a/nft_ingester/src/config.rs b/nft_ingester/src/config.rs index 5fd5cacd..b7e8c6d6 100644 --- a/nft_ingester/src/config.rs +++ b/nft_ingester/src/config.rs @@ -31,7 +31,7 @@ pub struct IngesterConfig { pub code_version: Option<&'static str>, pub background_task_runner_config: Option, pub cl_audits: Option, // save transaction logs for compressed nfts - pub skip_rollup_indexing: bool, + pub skip_batch_mint_indexing: bool, } #[derive(Deserialize, PartialEq, Debug, Clone)] diff --git a/nft_ingester/src/main.rs b/nft_ingester/src/main.rs index 2b2e3182..3cf830d2 100644 --- a/nft_ingester/src/main.rs +++ b/nft_ingester/src/main.rs @@ -104,7 +104,7 @@ pub async fn main() -> Result<(), IngesterError> { let mut rollup_persister: Option< Arc>, > = None; - if !config.skip_rollup_indexing { + if !config.skip_batch_mint_indexing { let r = batch_mint_updates::create_batch_mint_notification_channel( &config.get_database_url(), &mut tasks, @@ -170,6 +170,7 @@ pub async fn main() -> Result<(), IngesterError> { }, config.cl_audits.unwrap_or(false), stream_name, + config.skip_batch_mint_indexing, ); } WorkerType::Rollup => { diff --git a/nft_ingester/src/transaction_notifications.rs b/nft_ingester/src/transaction_notifications.rs index df005bc5..3351ee6f 100644 --- a/nft_ingester/src/transaction_notifications.rs +++ b/nft_ingester/src/transaction_notifications.rs @@ -28,15 +28,20 @@ pub fn transaction_worker( consumption_type: ConsumptionType, cl_audits: bool, stream_key: &'static str, + skip_batch_minted_trees: bool, ) -> JoinHandle<()> { tokio::spawn(async move { let source = T::new(config).await; if let Ok(mut msg) = source { - let manager = Arc::new(ProgramTransformer::new( - pool, - create_download_metadata_notifier(bg_task_sender), - cl_audits, - )); + let manager = Arc::new( + ProgramTransformer::new( + pool, + create_download_metadata_notifier(bg_task_sender), + cl_audits, + skip_batch_minted_trees, + ) + .await, + ); loop { let e = msg.recv(stream_key, consumption_type.clone()).await; let mut tasks = JoinSet::new(); diff --git a/program_transformers/src/batch_minting/batch_mint_persister.rs b/program_transformers/src/batch_minting/batch_mint_persister.rs index 4668f8d6..eb3ad665 100644 --- a/program_transformers/src/batch_minting/batch_mint_persister.rs +++ b/program_transformers/src/batch_minting/batch_mint_persister.rs @@ -218,6 +218,7 @@ impl BatchMintPer url: Set(r.url.clone()), created_at_slot: Set(r.created_at_slot), signature: Set(r.signature.clone()), + merkle_tree: Set(r.merkle_tree.clone()), staker: Set(r.staker.clone()), download_attempts: Set(r.download_attempts), batch_mint_persisting_state: Set(BatchMintPersistingState::StartProcessing), @@ -253,12 +254,8 @@ impl BatchMintPer Ok(r) => { let query = batch_mint::Entity::insert(batch_mint::ActiveModel { file_hash: Set(batch_mint_to_verify.file_hash.clone()), - batch_mint_binary_bincode: Set( - bincode::serialize(&r) - .map_err(|e| { - ProgramTransformerError::SerializatonError(e.to_string()) - })?, - ), + batch_mint_binary_bincode: Set(bincode::serialize(&r) + .map_err(|e| ProgramTransformerError::SerializatonError(e.to_string()))?), }) .on_conflict( OnConflict::columns([batch_mint::Column::FileHash]) @@ -320,6 +317,7 @@ impl BatchMintPer url: Set(batch_mint_to_verify.url.clone()), created_at_slot: Set(batch_mint_to_verify.created_at_slot), signature: Set(batch_mint_to_verify.signature.clone()), + merkle_tree: Set(batch_mint_to_verify.merkle_tree.clone()), staker: Set(batch_mint_to_verify.staker.clone()), download_attempts: Set(batch_mint_to_verify.download_attempts + 1), batch_mint_persisting_state: Set(batch_mint_to_verify @@ -424,6 +422,7 @@ impl BatchMintPer url: Set(batch_mint.url.clone()), created_at_slot: Set(batch_mint.created_at_slot), signature: Set(batch_mint.signature.clone()), + merkle_tree: Set(batch_mint.merkle_tree.clone()), staker: Set(batch_mint.staker.clone()), download_attempts: Set(batch_mint.download_attempts), batch_mint_persisting_state: Set(batch_mint.batch_mint_persisting_state.clone()), @@ -468,13 +467,11 @@ where for batched_mint in batch_mint.batch_mints.iter() { bubblegum::mint_v1::mint_v1( &batched_mint.into(), + // only signature and slot will be used &InstructionBundle { txn_id: &signature, - program: Default::default(), - instruction: None, - inner_ix: None, - keys: &[], slot, + ..Default::default() }, txn, "FinalizeTreeWithRoot", diff --git a/program_transformers/src/bubblegum/finalize_tree_with_root.rs b/program_transformers/src/bubblegum/finalize_tree_with_root.rs index 5b2cbab4..f6badaee 100644 --- a/program_transformers/src/bubblegum/finalize_tree_with_root.rs +++ b/program_transformers/src/bubblegum/finalize_tree_with_root.rs @@ -1,7 +1,11 @@ +use std::collections::HashSet; + use blockbuster::programs::bubblegum::Payload; use digital_asset_types::dao::sea_orm_active_enums::BatchMintPersistingState; use sea_orm::sea_query::OnConflict; use sea_orm::{DbBackend, EntityTrait, QueryTrait, Set}; +use solana_sdk::pubkey::Pubkey; +use tokio::sync::RwLock; use { crate::error::{ProgramTransformerError, ProgramTransformerResult}, blockbuster::{instruction::InstructionBundle, programs::bubblegum::BubblegumInstruction}, @@ -12,17 +16,25 @@ pub async fn finalize_tree_with_root<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, + batched_trees: &Option>>, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, { - if let Some(Payload::FinalizeTreeWithRoot { args, .. }) = &parsing_result.payload { + if let Some(Payload::FinalizeTreeWithRoot { args, tree_id }) = &parsing_result.payload { + if let Some(batched_trees) = batched_trees { + let mut b_trees = batched_trees.write().await; + b_trees.insert(tree_id.clone()); + drop(b_trees); + } + let query = digital_asset_types::dao::batch_mint_to_verify::Entity::insert( digital_asset_types::dao::batch_mint_to_verify::ActiveModel { file_hash: Set(args.metadata_hash.clone()), url: Set(args.metadata_url.clone()), created_at_slot: Set(bundle.slot as i64), signature: Set(bundle.txn_id.to_string()), + merkle_tree: Set(tree_id.to_bytes().to_vec()), staker: Set(args.staker.to_bytes().to_vec()), download_attempts: Set(0), batch_mint_persisting_state: Set(BatchMintPersistingState::ReceivedTransaction), diff --git a/program_transformers/src/bubblegum/mod.rs b/program_transformers/src/bubblegum/mod.rs index aadf692a..d3402bbb 100644 --- a/program_transformers/src/bubblegum/mod.rs +++ b/program_transformers/src/bubblegum/mod.rs @@ -11,6 +11,9 @@ use { token_metadata::types::UseMethod as TokenMetadataUseMethod, }, sea_orm::{ConnectionTrait, TransactionTrait}, + solana_sdk::pubkey::Pubkey, + std::collections::HashSet, + tokio::sync::RwLock, tracing::{debug, info}, }; @@ -32,6 +35,7 @@ pub async fn handle_bubblegum_instruction<'c, T>( txn: &T, download_metadata_notifier: &DownloadMetadataNotifier, cl_audits: bool, + batched_trees: &Option>>, ) -> ProgramTransformerResult<()> where T: ConnectionTrait + TransactionTrait, @@ -116,7 +120,13 @@ where } InstructionName::FinalizeTreeWithRoot | InstructionName::FinalizeTreeWithRootAndCollection => { - finalize_tree_with_root::finalize_tree_with_root(parsing_result, bundle, txn).await? + finalize_tree_with_root::finalize_tree_with_root( + parsing_result, + bundle, + txn, + batched_trees, + ) + .await? } _ => debug!("Bubblegum: Not Implemented Instruction"), } diff --git a/program_transformers/src/lib.rs b/program_transformers/src/lib.rs index c43c1352..db649ec4 100644 --- a/program_transformers/src/lib.rs +++ b/program_transformers/src/lib.rs @@ -15,16 +15,20 @@ use { ProgramParseResult, }, }, + digital_asset_types::dao::batch_mint_to_verify, futures::future::BoxFuture, sea_orm::{ entity::EntityTrait, query::Select, ConnectionTrait, DatabaseConnection, DbErr, - SqlxPostgresConnector, TransactionTrait, + QuerySelect, SqlxPostgresConnector, TransactionTrait, }, solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature}, solana_transaction_status::InnerInstructions, sqlx::PgPool, std::collections::{HashMap, HashSet, VecDeque}, - tokio::time::{sleep, Duration}, + tokio::{ + sync::RwLock, + time::{sleep, Duration}, + }, tracing::{debug, error, info}, }; @@ -86,13 +90,15 @@ pub struct ProgramTransformer { parsers: HashMap>, key_set: HashSet, cl_audits: bool, + batched_trees: Option>>, } impl ProgramTransformer { - pub fn new( + pub async fn new( pool: PgPool, download_metadata_notifier: DownloadMetadataNotifier, cl_audits: bool, + skip_batch_mined_trees: bool, ) -> Self { let mut parsers: HashMap> = HashMap::with_capacity(3); let bgum = BubblegumParser {}; @@ -108,12 +114,47 @@ impl ProgramTransformer { acc }); let pool: PgPool = pool; + + let storage = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); + + let batched_trees = { + if !skip_batch_mined_trees { + None + } else { + match batch_mint_to_verify::Entity::find() + .column(batch_mint_to_verify::Column::MerkleTree) + .all(&storage) + .await + { + Ok(models) => { + let trees_result: Result, _> = models + .iter() + .map(|m| Pubkey::try_from(m.merkle_tree.clone())) + .collect(); + + match trees_result { + Ok(trees) => Some(RwLock::new(trees)), + Err(e) => { + error!("Failed to convert merkle_tree to Pubkey: {:?}", e); + None + } + } + } + Err(e) => { + error!("Failed to fetch batch_mint_to_verify models: {:?}", e); + None + } + } + } + }; + ProgramTransformer { - storage: SqlxPostgresConnector::from_sqlx_postgres_pool(pool), + storage, download_metadata_notifier, parsers, key_set: hs, cl_audits, + batched_trees, } } @@ -182,12 +223,24 @@ impl ProgramTransformer { let concrete = result.result_type(); match concrete { ProgramParseResult::Bubblegum(parsing_result) => { + if let Some(batched_trees) = &self.batched_trees { + if let Some(change_log) = &parsing_result.tree_update { + let batched_trees = batched_trees.read().await; + + if let Some(_tree) = batched_trees.get(&change_log.id) { + drop(batched_trees); + continue; + } + } + } + handle_bubblegum_instruction( parsing_result, &ix, &self.storage, &self.download_metadata_notifier, self.cl_audits, + &self.batched_trees, ) .await .map_err(|err| {