Merge branch 'main' into doc/renaming
n00m4d committed Sep 27, 2024
2 parents ce9735f + a26b229 commit 4af7eea
Showing 21 changed files with 1,601 additions and 765 deletions.
2 changes: 2 additions & 0 deletions .env.example
@@ -61,6 +61,8 @@ API_RUN_PROFILING=false
API_PROFILING_FILE_PATH_CONTAINER="/usr/src/profiling"
API_JSON_MIDDLEWARE_CONFIG='{is_enabled=true, max_urls_to_parse=10}'
API_ARCHIVES_DIR="/rocksdb/_rocksdb_backup_archives"
API_CONSISTENCE_SYNCHRONIZATION_API_THRESHOLD=1_000_000
API_CONSISTENCE_BACKFILLING_SLOTS_THRESHOLD=500

# Synchronizer instance config
SYNCHRONIZER_DATABASE_CONFIG='{max_postgres_connections=100, url="postgres://solana:solana@compressed-nft-indexer_db_1:5432/solana"}'
661 changes: 661 additions & 0 deletions LICENSE

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
@@ -61,4 +61,4 @@ The project's structure is a reflection of the following clean architecture prin
The API specification is compatible with the standard DAS specification here https://github.com/metaplex-foundation/api-specifications

### Developing and running
Full documentation and contribution guidelines coming soon…
Full documentation and contribution guidelines coming soon…
36 changes: 35 additions & 1 deletion nft_ingester/src/accounts_processor.rs
@@ -18,7 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::Mutex;
use tokio::task::JoinSet;
use tokio::task::{JoinError, JoinSet};
use tokio::time::Instant;
use tracing::error;

@@ -27,6 +27,40 @@ const WORKER_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
// interval after which buffer is flushed
const FLUSH_INTERVAL: Duration = Duration::from_millis(500);

#[allow(clippy::too_many_arguments)]
pub async fn run_accounts_processor<AG: UnprocessedAccountsGetter + Sync + Send + 'static>(
rx: Receiver<()>,
mutexed_tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
unprocessed_transactions_getter: Arc<AG>,
rocks_storage: Arc<Storage>,
account_buffer_size: usize,
fees_buffer_size: usize,
metrics: Arc<IngesterMetricsConfig>,
postgre_client: Arc<PgClient>,
rpc_client: Arc<RpcClient>,
join_set: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
) {
mutexed_tasks.lock().await.spawn(async move {
let account_processor = AccountsProcessor::build(
rx.resubscribe(),
fees_buffer_size,
unprocessed_transactions_getter,
metrics,
postgre_client,
rpc_client,
join_set,
)
.await
.expect("Failed to build 'AccountsProcessor'!");

account_processor
.process_accounts(rx, rocks_storage, account_buffer_size)
.await;

Ok(())
});
}

pub struct AccountsProcessor<T: UnprocessedAccountsGetter> {
fees_batch_size: usize,
unprocessed_account_getter: Arc<T>,
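
For readers skimming the diff: `run_accounts_processor` spawns the processor onto a `JoinSet` shared behind `Arc<Mutex<...>>`. A minimal self-contained sketch of that pattern, with the error type simplified to `String` (the real code uses `Result<(), JoinError>`):

```rust
use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinSet};

#[tokio::main]
async fn main() {
    // Several components hold the Arc and register tasks on one JoinSet;
    // the Mutex serializes access to it across those components.
    let tasks: Arc<Mutex<JoinSet<Result<(), String>>>> =
        Arc::new(Mutex::new(JoinSet::new()));

    tasks.lock().await.spawn(async {
        // Stand-in for AccountsProcessor::process_accounts(..).
        Ok(())
    });

    // Drain the set; each join surfaces the task's Result plus any panic.
    while let Some(joined) = tasks.lock().await.join_next().await {
        println!("task finished: {joined:?}");
    }
}
```
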
8 changes: 8 additions & 0 deletions nft_ingester/src/api/builder.rs
@@ -161,6 +161,8 @@ impl RpcApiBuilder {
}
});
module.add_alias("getAssetBatch", "get_asset_batch");
module.add_alias("getAssets", "get_asset_batch");
module.add_alias("get_assets", "get_asset_batch");

let cloned_api = api.clone();
module.add_method("get_asset_proof_batch", move |rpc_params: Params| {
@@ -172,6 +174,8 @@
}
});
module.add_alias("getAssetProofBatch", "get_asset_proof_batch");
module.add_alias("getAssetProofs", "get_asset_proof_batch");
module.add_alias("get_asset_proofs", "get_asset_proof_batch");

let cloned_api = api.clone();
module.add_method("get_grouping", move |rpc_params: Params| {
@@ -213,6 +217,8 @@
}
});
module.add_alias("getSignaturesForAsset", "get_signatures_for_asset");
module.add_alias("getAssetSignatures", "get_signatures_for_asset");
module.add_alias("get_asset_signatures", "get_signatures_for_asset");

let cloned_api = api.clone();
module.add_method("get_signatures_for_asset_v2", move |rpc_params: Params| {
@@ -224,6 +230,8 @@
}
});
module.add_alias("getSignaturesForAssetV2", "get_signatures_for_asset_v2");
module.add_alias("getAssetSignaturesV2", "get_signatures_for_asset_v2");
module.add_alias("get_asset_signatures_v2", "get_signatures_for_asset_v2");

let cloned_api = api.clone();
module.add_method("get_token_accounts", move |rpc_params: Params| {
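
The new aliases only remap method names; parameters are unchanged. A hedged usage sketch of one of them, assuming a placeholder endpoint URL and asset IDs, and that `getAssets` accepts the same `ids` array as `get_asset_batch`:

```rust
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Placeholder endpoint and asset IDs; adjust to your deployment.
    let payload = json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getAssets", // resolved via the alias to get_asset_batch
        "params": { "ids": ["<asset-pubkey-1>", "<asset-pubkey-2>"] }
    });

    let response: serde_json::Value = reqwest::Client::new()
        .post("http://localhost:8080") // placeholder port
        .json(&payload)
        .send()
        .await?
        .json()
        .await?;
    println!("{}", serde_json::to_string_pretty(&response).unwrap());
    Ok(())
}
```
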
61 changes: 35 additions & 26 deletions nft_ingester/src/api/service.rs
@@ -60,36 +60,48 @@ pub async fn start_api(
json_middleware_config: Option<JsonMiddlewareConfig>,
tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
archives_dir: &str,
consistence_synchronization_api_threshold: u64,
consistence_backfilling_slots_threshold: u64,
consistence_synchronization_api_threshold: Option<u64>,
consistence_backfilling_slots_threshold: Option<u64>,
batch_mint_service_port: Option<u16>,
file_storage_path: &str,
account_balance_getter: Arc<AccountBalanceGetterImpl>,
storage_service_base_url: Option<String>,
) -> Result<(), DasApiError> {
let response_middleware = RpcResponseMiddleware {};
let request_middleware = RpcRequestMiddleware::new(archives_dir);
let synchronization_state_consistency_checker =
Arc::new(SynchronizationStateConsistencyChecker::new());
synchronization_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
pg_client.clone(),
rocks_db.clone(),
consistence_synchronization_api_threshold,
)
.await;

let backfilling_state_consistency_checker = Arc::new(BackfillingStateConsistencyChecker::new());
backfilling_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
rocks_db.clone(),
consistence_backfilling_slots_threshold,
)
.await;
let mut consistency_checkers: Vec<Arc<dyn ConsistencyChecker>> = vec![];

if let Some(consistence_synchronization_api_threshold) =
consistence_synchronization_api_threshold
{
let synchronization_state_consistency_checker =
Arc::new(SynchronizationStateConsistencyChecker::new());
synchronization_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
pg_client.clone(),
rocks_db.clone(),
consistence_synchronization_api_threshold,
)
.await;
consistency_checkers.push(synchronization_state_consistency_checker);
}

if let Some(consistence_backfilling_slots_threshold) = consistence_backfilling_slots_threshold {
let backfilling_state_consistency_checker =
Arc::new(BackfillingStateConsistencyChecker::new());
backfilling_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
rocks_db.clone(),
consistence_backfilling_slots_threshold,
)
.await;
consistency_checkers.push(backfilling_state_consistency_checker);
}

let addr = SocketAddr::from(([0, 0, 0, 0], port));
let api = DasApi::new(
@@ -110,10 +122,7 @@ pub async fn start_api(
Some(MiddlewaresData {
response_middleware,
request_middleware,
consistency_checkers: vec![
synchronization_state_consistency_checker,
backfilling_state_consistency_checker,
],
consistency_checkers,
}),
addr,
tasks,
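
With both thresholds now `Option<u64>`, each checker is built only when its threshold is present, so `consistency_checkers` may end up empty. A minimal sketch of how the optional values could be read from the environment variables added to `.env.example` (the helper and the underscore handling for `1_000_000` are assumptions, not necessarily the crate's actual config parser):

```rust
use std::env;

// Hypothetical helper: a missing or unparsable variable yields None,
// which in start_api means the corresponding checker is never started.
fn optional_u64_env(key: &str) -> Option<u64> {
    env::var(key)
        .ok()
        // Tolerate the 1_000_000 digit-grouping style from .env.example.
        .and_then(|v| v.replace('_', "").parse::<u64>().ok())
}

fn main() {
    let sync = optional_u64_env("API_CONSISTENCE_SYNCHRONIZATION_API_THRESHOLD");
    let backfill = optional_u64_env("API_CONSISTENCE_BACKFILLING_SLOTS_THRESHOLD");
    println!("sync: {sync:?}, backfill: {backfill:?}");
}
```
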
147 changes: 144 additions & 3 deletions nft_ingester/src/backfiller.rs
@@ -1,10 +1,11 @@
use crate::config::BackfillerConfig;
use crate::config::{BackfillerConfig, BackfillerSourceMode, IngesterConfig};
use crate::error::IngesterError;
use async_trait::async_trait;
use backfill_rpc::rpc::BackfillRPC;
use entities::models::{BufferedTransaction, RawBlock};
use flatbuffers::FlatBufferBuilder;
use futures::future::join_all;
use interface::error::BlockConsumeError;
use interface::error::{BlockConsumeError, StorageError, UsecaseError};
use interface::signature_persistence::{BlockConsumer, BlockProducer};
use interface::slot_getter::FinalizedSlotGetter;
use interface::slots_dumper::{SlotGetter, SlotsDumper};
@@ -14,8 +15,9 @@ use rocks_db::bubblegum_slots::{BubblegumSlotGetter, ForceReingestableSlots};
use rocks_db::column::TypedColumn;
use rocks_db::transaction::{TransactionProcessor, TransactionResultPersister};
use rocks_db::Storage;
use solana_program::pubkey::Pubkey;
use solana_transaction_status::{
EncodedConfirmedTransactionWithStatusMeta, EncodedTransactionWithStatusMeta,
EncodedConfirmedTransactionWithStatusMeta, EncodedTransactionWithStatusMeta, UiConfirmedBlock,
};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -34,6 +36,145 @@ pub const GET_DATA_FROM_BG_RETRIES: u32 = 5;
pub const SECONDS_TO_RETRY_ROCKSDB_OPERATION: u64 = 5;
pub const DELETE_SLOT_RETRIES: u32 = 5;

pub async fn run_slot_force_persister<C, P, S>(
force_reingestable_transactions_parser: Arc<TransactionsParser<C, P, S>>,
rx: Receiver<()>,
) -> Result<(), JoinError>
where
C: BlockConsumer,
P: BlockProducer,
S: SlotGetter,
{
info!("Running slot force persister...");

force_reingestable_transactions_parser
.parse_transactions(rx)
.await;

info!("Force slot persister finished working.");

Ok(())
}

pub enum BackfillSource {
Bigtable(Arc<BigTableClient>),
Rpc(Arc<BackfillRPC>),
}

impl BackfillSource {
pub async fn new(
ingester_config: &IngesterConfig,
backfiller_config: &BackfillerConfig,
) -> Self {
match ingester_config.backfiller_source_mode {
BackfillerSourceMode::Bigtable => Self::Bigtable(Arc::new(
connect_new_bigtable_from_config(backfiller_config.clone())
.await
.unwrap(),
)),
BackfillerSourceMode::RPC => Self::Rpc(Arc::new(BackfillRPC::connect(
ingester_config.backfill_rpc_address.clone(),
))),
}
}
}

#[async_trait]
impl SlotsGetter for BackfillSource {
async fn get_slots(
&self,
collected_key: &Pubkey,
start_at: u64,
rows_limit: i64,
) -> Result<Vec<u64>, UsecaseError> {
match self {
BackfillSource::Bigtable(bigtable) => {
bigtable
.big_table_inner_client
.get_slots(collected_key, start_at, rows_limit)
.await
}
BackfillSource::Rpc(rpc) => rpc.get_slots(collected_key, start_at, rows_limit).await,
}
}
}

#[async_trait]
impl BlockProducer for BackfillSource {
async fn get_block(
&self,
slot: u64,
backup_provider: Option<Arc<impl BlockProducer>>,
) -> Result<UiConfirmedBlock, StorageError> {
match self {
BackfillSource::Bigtable(bigtable) => bigtable.get_block(slot, backup_provider).await,
BackfillSource::Rpc(rpc) => rpc.get_block(slot, backup_provider).await,
}
}
}

pub async fn run_perpetual_slot_collection(
backfiller_clone: Arc<Backfiller<BackfillSource>>,
rpc_backfiller_clone: Arc<BackfillRPC>,
metrics: Arc<BackfillerMetricsConfig>,
backfiller_wait_period_sec: u64,
rx: Receiver<()>,
) -> Result<(), JoinError> {
info!("Running slot fetcher...");

if let Err(e) = backfiller_clone
.run_perpetual_slot_collection(
metrics,
Duration::from_secs(backfiller_wait_period_sec),
rpc_backfiller_clone,
rx,
)
.await
{
error!("Error while running perpetual slot fetcher: {}", e);
}

info!("Slot fetcher finished working");

Ok(())
}

#[allow(clippy::too_many_arguments)]
pub async fn run_perpetual_slot_processing<SG, BC, BP>(
backfiller_clone: Arc<Backfiller<BackfillSource>>,
metrics: Arc<BackfillerMetricsConfig>,
slot_getter: Arc<SG>,
consumer: Arc<BC>,
producer: Arc<BP>,
backfiller_wait_period_sec: u64,
rx: Receiver<()>,
backup: Option<Arc<BackfillSource>>,
) -> Result<(), JoinError>
where
BC: BlockConsumer,
SG: SlotGetter,
BP: BlockProducer,
{
info!("Running slot persister...");
if let Err(e) = backfiller_clone
.run_perpetual_slot_processing(
metrics,
slot_getter,
consumer,
producer,
Duration::from_secs(backfiller_wait_period_sec),
rx,
backup,
)
.await
{
error!("Error while running perpetual slot persister: {}", e);
}
info!("Slot persister finished working");

Ok(())
}

pub struct Backfiller<T: SlotsGetter + Send + Sync + 'static> {
rocks_client: Arc<rocks_db::Storage>,
slots_getter: Arc<T>,
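
`BackfillSource` is plain enum dispatch: the enum implements the same traits as its two backends and forwards each call, so callers stay generic without trait objects. A stripped-down, synchronous sketch of the idea (trait and types simplified from the real `SlotsGetter`/`BlockProducer`):

```rust
struct BigtableBackend;
struct RpcBackend;

trait SlotSource {
    fn get_slots(&self, start_at: u64, limit: u64) -> Vec<u64>;
}

impl SlotSource for BigtableBackend {
    fn get_slots(&self, start_at: u64, limit: u64) -> Vec<u64> {
        (start_at..start_at + limit).collect() // stand-in for a Bigtable scan
    }
}

impl SlotSource for RpcBackend {
    fn get_slots(&self, start_at: u64, limit: u64) -> Vec<u64> {
        (start_at..start_at + limit).collect() // stand-in for RPC queries
    }
}

// Mirrors BackfillSource: the enum forwards to whichever backend
// was selected at startup.
enum Source {
    Bigtable(BigtableBackend),
    Rpc(RpcBackend),
}

impl SlotSource for Source {
    fn get_slots(&self, start_at: u64, limit: u64) -> Vec<u64> {
        match self {
            Source::Bigtable(b) => b.get_slots(start_at, limit),
            Source::Rpc(r) => r.get_slots(start_at, limit),
        }
    }
}

fn main() {
    let source = Source::Rpc(RpcBackend);
    println!("{:?}", source.get_slots(100, 3));
}
```
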
2 changes: 1 addition & 1 deletion nft_ingester/src/batch_mint/batch_mint_persister.rs
@@ -21,7 +21,7 @@ pub struct BatchMintPersister<D: BatchMintDownloader> {
metrics: Arc<BatchMintPersisterMetricsConfig>,
}

pub struct BatchMintDownloaderForPersister {}
pub struct BatchMintDownloaderForPersister;

#[async_trait]
impl BatchMintDownloader for BatchMintDownloaderForPersister {
12 changes: 12 additions & 0 deletions nft_ingester/src/batch_mint/batch_mint_processor.rs
@@ -18,6 +18,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::task::JoinError;
use tokio::time::Instant;
use tracing::{error, info};

@@ -30,6 +31,17 @@ const TRANSACTION_FAIL_METRICS_LABEL: &str = "transaction_fail";
const ARWEAVE_UPLOAD_FAIL_METRICS_LABEL: &str = "arweave_upload_fail";
const FILE_PROCESSING_METRICS_LABEL: &str = "batch_mint_file_processing";

pub async fn process_batch_mints<R: BatchMintTxSender, P: PermanentStorageClient>(
processor_clone: Arc<BatchMintProcessor<R, P>>,
rx: Receiver<()>,
) -> Result<(), JoinError> {
info!("Start processing batch_mints...");
processor_clone.process_batch_mints(rx).await;
info!("Finish processing batch_mints...");

Ok(())
}

pub struct BatchMintDownloaderImpl {
pg_client: Arc<PgClient>,
file_storage_path: String,