Skip to content

Commit

Permalink
feat: make consistency checks on API optional (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
n00m4d authored Sep 26, 2024
1 parent bbb3711 commit a26b229
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 36 deletions.
2 changes: 2 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ API_RUN_PROFILING=false
API_PROFILING_FILE_PATH_CONTAINER="/usr/src/profiling"
API_JSON_MIDDLEWARE_CONFIG='{is_enabled=true, max_urls_to_parse=10}'
API_ARCHIVES_DIR="/rocksdb/_rocksdb_backup_archives"
API_CONSISTENCE_SYNCHRONIZATION_API_THRESHOLD=1_000_000
API_CONSISTENCE_BACKFILLING_SLOTS_THRESHOLD=500

# Synchronizer instance config
SYNCHRONIZER_DATABASE_CONFIG='{max_postgres_connections=100, url="postgres://solana:solana@compressed-nft-indexer_db_1:5432/solana"}'
Expand Down
61 changes: 35 additions & 26 deletions nft_ingester/src/api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,48 @@ pub async fn start_api(
json_middleware_config: Option<JsonMiddlewareConfig>,
tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
archives_dir: &str,
consistence_synchronization_api_threshold: u64,
consistence_backfilling_slots_threshold: u64,
consistence_synchronization_api_threshold: Option<u64>,
consistence_backfilling_slots_threshold: Option<u64>,
batch_mint_service_port: Option<u16>,
file_storage_path: &str,
account_balance_getter: Arc<AccountBalanceGetterImpl>,
storage_service_base_url: Option<String>,
) -> Result<(), DasApiError> {
let response_middleware = RpcResponseMiddleware {};
let request_middleware = RpcRequestMiddleware::new(archives_dir);
let synchronization_state_consistency_checker =
Arc::new(SynchronizationStateConsistencyChecker::new());
synchronization_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
pg_client.clone(),
rocks_db.clone(),
consistence_synchronization_api_threshold,
)
.await;

let backfilling_state_consistency_checker = Arc::new(BackfillingStateConsistencyChecker::new());
backfilling_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
rocks_db.clone(),
consistence_backfilling_slots_threshold,
)
.await;
let mut consistency_checkers: Vec<Arc<dyn ConsistencyChecker>> = vec![];

if let Some(consistence_synchronization_api_threshold) =
consistence_synchronization_api_threshold
{
let synchronization_state_consistency_checker =
Arc::new(SynchronizationStateConsistencyChecker::new());
synchronization_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
pg_client.clone(),
rocks_db.clone(),
consistence_synchronization_api_threshold,
)
.await;
consistency_checkers.push(synchronization_state_consistency_checker);
}

if let Some(consistence_backfilling_slots_threshold) = consistence_backfilling_slots_threshold {
let backfilling_state_consistency_checker =
Arc::new(BackfillingStateConsistencyChecker::new());
backfilling_state_consistency_checker
.run(
tasks.clone(),
rx.resubscribe(),
rocks_db.clone(),
consistence_backfilling_slots_threshold,
)
.await;
consistency_checkers.push(backfilling_state_consistency_checker);
}

let addr = SocketAddr::from(([0, 0, 0, 0], port));
let api = DasApi::new(
Expand All @@ -110,10 +122,7 @@ pub async fn start_api(
Some(MiddlewaresData {
response_middleware,
request_middleware,
consistency_checkers: vec![
synchronization_state_consistency_checker,
backfilling_state_consistency_checker,
],
consistency_checkers,
}),
addr,
tasks,
Expand Down
12 changes: 2 additions & 10 deletions nft_ingester/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,21 +302,13 @@ pub struct ApiConfig {
pub max_page_limit: usize,
pub json_middleware_config: Option<JsonMiddlewareConfig>,
pub archives_dir: String,
#[serde(default = "default_synchronization_api_threshold")]
pub consistence_synchronization_api_threshold: u64,
pub consistence_synchronization_api_threshold: Option<u64>,
#[serde(default = "default_heap_path")]
pub heap_path: String,
#[serde(default = "default_consistence_backfilling_slots_threshold")]
pub consistence_backfilling_slots_threshold: u64,
pub consistence_backfilling_slots_threshold: Option<u64>,
pub storage_service_base_url: Option<String>,
}

const fn default_synchronization_api_threshold() -> u64 {
1_000_000
}
const fn default_consistence_backfilling_slots_threshold() -> u64 {
500
}
fn default_heap_path() -> String {
"/usr/src/app/heaps".to_string()
}
Expand Down

0 comments on commit a26b229

Please sign in to comment.