From 110f71c5abb148394ccb823354a85d053d923a0e Mon Sep 17 00:00:00 2001 From: Kyle Espinola Date: Wed, 9 Oct 2024 14:59:28 +0200 Subject: [PATCH] Support Account Snapshot Flushes (#154) * feat: mock snapshots code * refactor: remove dead code * feat: getting snapshot stream in the works * feat: adds snapshot stream * feat: adds config-ingester settings * Remove dead config * Add snapshot config in ingester --------- Co-authored-by: Kevin Rodriguez <_@kevinrodriguez.io> --- grpc-ingest/config-download-metadata.yml | 12 - grpc-ingest/config-ingester.yml | 30 +- grpc-ingest/src/config.rs | 268 +----------- grpc-ingest/src/download_metadata.rs | 236 ----------- grpc-ingest/src/grpc.rs | 205 +--------- grpc-ingest/src/ingester.rs | 334 +++------------ grpc-ingest/src/main.rs | 19 +- grpc-ingest/src/postgres.rs | 8 - grpc-ingest/src/prom.rs | 8 - grpc-ingest/src/redis.rs | 497 +---------------------- 10 files changed, 76 insertions(+), 1541 deletions(-) delete mode 100644 grpc-ingest/config-download-metadata.yml delete mode 100644 grpc-ingest/src/download_metadata.rs diff --git a/grpc-ingest/config-download-metadata.yml b/grpc-ingest/config-download-metadata.yml deleted file mode 100644 index c42307be..00000000 --- a/grpc-ingest/config-download-metadata.yml +++ /dev/null @@ -1,12 +0,0 @@ -# Important: only ONE `download-metadata` instance is supported right now! -prometheus: 127.0.0.1:8875 -postgres: - url: postgres://solana:solana@localhost/solana - min_connections: 10 - max_connections: 50 -download_metadata: - max_in_process: 50 # maximum tasks in process (downloading metadata) - prefetch_queue_size: 100 - limit_to_fetch: 200 # maximum number of tasks fetched from database - wait_tasks_max_idle_ms: 100 # if we do not have pending tasks, wait max ms - download_timeout_ms: 5_000 diff --git a/grpc-ingest/config-ingester.yml b/grpc-ingest/config-ingester.yml index 38404071..02c6de87 100644 --- a/grpc-ingest/config-ingester.yml +++ b/grpc-ingest/config-ingester.yml @@ -1,29 +1,15 @@ prometheus: 127.0.0.1:8875 -redis: - url: redis://localhost:6379 - group: ingester - consumer: consumer # every new ingester instance should have uniq name - - streams: - - type: account - stream: ACCOUNTS - - type: transaction - stream: TRANSACTIONS - xack_batch_max_size: 100 - xack_batch_max_idle_ms: 10 - xack_max_in_process: 100 - - type: metadatajson - stream: METADATA_JSON - prefetch_queue_size: 5_000 # max number of messages available in the read queue for processing - xpending_max: 1_000 # used for reading pending messages - xpending_only: false # exit once all pending messages consumed (should be applied if you want downscale number of ingesters) - xreadgroup_max: 1_000 # applied per each stream in one request +redis: redis://localhost:6379 postgres: url: postgres://solana:solana@localhost/solana min_connections: 10 max_connections: 50 # `max_connection` should be bigger than `program_transformer.max_tasks_in_process` otherwise unresolved lock is possible -program_transformer: - max_tasks_in_process: 100 +snapshots: + name: SNAPSHOTS + max_concurrency: 10 + batch_size: 100 + xack_batch_max_idle_ms: 1_000 + xack_buffer_size: 10_000 accounts: name: ACCOUNTS max_concurrency: 10 @@ -34,5 +20,5 @@ transactions: name: TRANSACTIONS download_metadata: max_attempts: 3 - stream_config: + stream: name: METADATA_JSON diff --git a/grpc-ingest/src/config.rs b/grpc-ingest/src/config.rs index ebd47ea5..5d998bb4 100644 --- a/grpc-ingest/src/config.rs +++ b/grpc-ingest/src/config.rs @@ -9,9 +9,6 @@ use { }, }; -pub const REDIS_STREAM_ACCOUNTS: &str = "ACCOUNTS"; -pub const REDIS_STREAM_TRANSACTIONS: &str = "TRANSACTIONS"; -pub const REDIS_STREAM_METADATA_JSON: &str = "METADATA_JSON"; pub const REDIS_STREAM_DATA_KEY: &str = "data"; pub async fn load(path: impl AsRef + Copy) -> anyhow::Result @@ -124,21 +121,6 @@ pub struct ConfigGrpc { pub geyser_endpoint: String, - #[serde(default = "ConfigGrpc::default_request_snapshot")] - pub request_snapshot: bool, - - #[serde( - default = "ConfigGrpc::default_geyser_update_message_buffer_size", - deserialize_with = "deserialize_usize_str" - )] - pub geyser_update_message_buffer_size: usize, - - #[serde( - default = "ConfigGrpc::solana_seen_event_cache_max_size", - deserialize_with = "deserialize_usize_str" - )] - pub solana_seen_event_cache_max_size: usize, - pub redis: ConfigGrpcRedis, #[serde( @@ -149,26 +131,13 @@ pub struct ConfigGrpc { } impl ConfigGrpc { - pub const fn default_request_snapshot() -> bool { - false - } - pub const fn default_max_concurrency() -> usize { 10 } - - pub const fn default_geyser_update_message_buffer_size() -> usize { - 100_000 - } - - pub const fn solana_seen_event_cache_max_size() -> usize { - 10_000 - } } #[derive(Debug, Clone, Deserialize)] pub struct ConfigGrpcAccounts { - #[serde(default = "ConfigGrpcAccounts::default_stream")] pub stream: String, #[serde( default = "ConfigGrpcAccounts::default_stream_maxlen", @@ -182,10 +151,6 @@ pub struct ConfigGrpcAccounts { } impl ConfigGrpcAccounts { - pub fn default_stream() -> String { - REDIS_STREAM_ACCOUNTS.to_owned() - } - pub const fn default_stream_maxlen() -> usize { 100_000_000 } @@ -258,137 +223,21 @@ where #[derive(Debug, Clone, Deserialize)] pub struct ConfigIngester { - pub redis: ConfigIngesterRedis, + pub redis: String, pub postgres: ConfigIngesterPostgres, pub download_metadata: ConfigIngesterDownloadMetadata, - pub program_transformer: ConfigIngesterProgramTransformer, + pub snapshots: ConfigIngestStream, pub accounts: ConfigIngestStream, pub transactions: ConfigIngestStream, } -#[derive(Debug, Clone, Deserialize)] -pub struct ConfigIngesterRedis { - pub url: String, - #[serde(default = "ConfigIngesterRedis::default_group")] - pub group: String, - #[serde(default = "ConfigIngesterRedis::default_consumer")] - pub consumer: String, - pub streams: Vec, - #[serde( - default = "ConfigIngesterRedis::default_prefetch_queue_size", - deserialize_with = "deserialize_usize_str" - )] - pub prefetch_queue_size: usize, - #[serde( - default = "ConfigIngesterRedis::default_xpending_max", - deserialize_with = "deserialize_usize_str" - )] - pub xpending_max: usize, - #[serde(default = "ConfigIngesterRedis::default_xpending_only")] - pub xpending_only: bool, - #[serde( - default = "ConfigIngesterRedis::default_xreadgroup_max", - deserialize_with = "deserialize_usize_str" - )] - pub xreadgroup_max: usize, -} - -impl ConfigIngesterRedis { - pub fn default_group() -> String { - "ingester".to_owned() - } - - pub fn default_consumer() -> String { - "consumer".to_owned() - } - - pub const fn default_prefetch_queue_size() -> usize { - 1_000 - } - - pub const fn default_xpending_max() -> usize { - 100 - } - - pub const fn default_xpending_only() -> bool { - false - } - - pub const fn default_xreadgroup_max() -> usize { - 1_000 - } -} - -#[derive(Debug, Clone, Copy)] -pub struct ConfigIngesterRedisStream { - pub stream_type: ConfigIngesterRedisStreamType, - pub stream: &'static str, - pub xack_batch_max_size: usize, - pub xack_batch_max_idle: Duration, - pub xack_max_in_process: usize, -} - -impl<'de> Deserialize<'de> for ConfigIngesterRedisStream { - fn deserialize(deserializer: D) -> Result - where - D: de::Deserializer<'de>, - { - #[derive(Debug, Clone, Copy, Deserialize)] - struct Raw { - #[serde(rename = "type")] - pub stream_type: ConfigIngesterRedisStreamType, - #[serde( - default = "default_xack_batch_max_size", - deserialize_with = "deserialize_usize_str" - )] - pub xack_batch_max_size: usize, - #[serde( - default = "default_xack_batch_max_idle", - deserialize_with = "deserialize_duration_str", - rename = "xack_batch_max_idle_ms" - )] - pub xack_batch_max_idle: Duration, - #[serde( - default = "default_xack_max_in_process", - deserialize_with = "deserialize_usize_str" - )] - pub xack_max_in_process: usize, - } - - const fn default_xack_batch_max_size() -> usize { - 100 - } - - const fn default_xack_batch_max_idle() -> Duration { - Duration::from_millis(10) - } - - const fn default_xack_max_in_process() -> usize { - 100 - } - - let raw = Raw::deserialize(deserializer)?; - - Ok(Self { - stream_type: raw.stream_type, - stream: match raw.stream_type { - ConfigIngesterRedisStreamType::Account => REDIS_STREAM_ACCOUNTS, - ConfigIngesterRedisStreamType::Transaction => REDIS_STREAM_TRANSACTIONS, - ConfigIngesterRedisStreamType::MetadataJson => REDIS_STREAM_METADATA_JSON, - }, - xack_batch_max_size: raw.xack_batch_max_size, - xack_batch_max_idle: raw.xack_batch_max_idle, - xack_max_in_process: raw.xack_max_in_process, - }) - } -} - #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum ConfigIngesterRedisStreamType { Account, Transaction, MetadataJson, + Snapshot, } #[derive(Debug, Clone, Deserialize)] @@ -416,51 +265,9 @@ impl ConfigIngesterPostgres { } } -#[derive(Debug, Clone, Deserialize)] -pub struct ConfigIngesterProgramTransformer { - #[serde( - default = "ConfigIngesterProgramTransformer::default_max_tasks_in_process", - deserialize_with = "deserialize_usize_str" - )] - pub max_tasks_in_process: usize, - #[serde( - default = "ConfigIngesterProgramTransformer::default_account_num_threads", - deserialize_with = "deserialize_usize_str" - )] - pub account_num_threads: usize, - #[serde( - default = "ConfigIngesterProgramTransformer::default_transaction_num_threads", - deserialize_with = "deserialize_usize_str" - )] - pub transaction_num_threads: usize, - #[serde( - default = "ConfigIngesterProgramTransformer::default_metadata_json_num_threads", - deserialize_with = "deserialize_usize_str" - )] - pub metadata_json_num_threads: usize, -} - -impl ConfigIngesterProgramTransformer { - pub const fn default_account_num_threads() -> usize { - 5 - } - - pub const fn default_transaction_num_threads() -> usize { - 1 - } - - pub const fn default_metadata_json_num_threads() -> usize { - 1 - } - - pub const fn default_max_tasks_in_process() -> usize { - 40 - } -} - #[derive(Debug, Clone, Default, Deserialize)] pub struct ConfigIngesterDownloadMetadata { - pub stream_config: ConfigIngestStream, + pub stream: ConfigIngestStream, #[serde( default = "ConfigIngesterDownloadMetadata::default_num_threads", deserialize_with = "deserialize_usize_str" @@ -482,8 +289,6 @@ pub struct ConfigIngesterDownloadMetadata { deserialize_with = "deserialize_usize_str" )] pub stream_maxlen: usize, - #[serde(default = "ConfigIngesterDownloadMetadata::default_stream")] - pub stream: String, #[serde( default = "ConfigIngesterDownloadMetadata::default_stream_max_size", deserialize_with = "deserialize_usize_str" @@ -510,10 +315,6 @@ impl ConfigIngesterDownloadMetadata { 10 } - pub fn default_stream() -> String { - REDIS_STREAM_METADATA_JSON.to_owned() - } - pub const fn default_stream_maxlen() -> usize { 10_000_000 } @@ -526,64 +327,3 @@ impl ConfigIngesterDownloadMetadata { Duration::from_millis(3_000) } } - -#[derive(Debug, Clone, Deserialize)] -pub struct ConfigDownloadMetadata { - pub postgres: ConfigIngesterPostgres, - pub download_metadata: ConfigDownloadMetadataOpts, -} - -#[derive(Debug, Clone, Copy, Deserialize)] -pub struct ConfigDownloadMetadataOpts { - #[serde( - default = "ConfigDownloadMetadataOpts::default_max_in_process", - deserialize_with = "deserialize_usize_str" - )] - pub max_in_process: usize, - #[serde( - default = "ConfigDownloadMetadataOpts::default_prefetch_queue_size", - deserialize_with = "deserialize_usize_str" - )] - pub prefetch_queue_size: usize, - #[serde( - default = "ConfigDownloadMetadataOpts::default_limit_to_fetch", - deserialize_with = "deserialize_usize_str" - )] - pub limit_to_fetch: usize, - #[serde( - default = "ConfigDownloadMetadataOpts::default_wait_tasks_max_idle", - deserialize_with = "deserialize_duration_str", - rename = "wait_tasks_max_idle_ms" - )] - pub wait_tasks_max_idle: Duration, - #[serde( - default = "ConfigDownloadMetadataOpts::default_download_timeout", - deserialize_with = "deserialize_duration_str", - rename = "download_timeout_ms" - )] - pub download_timeout: Duration, - - pub stream: ConfigIngesterRedisStream, -} - -impl ConfigDownloadMetadataOpts { - pub const fn default_max_in_process() -> usize { - 50 - } - - pub const fn default_prefetch_queue_size() -> usize { - 100 - } - - pub const fn default_limit_to_fetch() -> usize { - 200 - } - - pub const fn default_wait_tasks_max_idle() -> Duration { - Duration::from_millis(100) - } - - pub const fn default_download_timeout() -> Duration { - Duration::from_millis(5_000) - } -} diff --git a/grpc-ingest/src/download_metadata.rs b/grpc-ingest/src/download_metadata.rs deleted file mode 100644 index 1e8e8a35..00000000 --- a/grpc-ingest/src/download_metadata.rs +++ /dev/null @@ -1,236 +0,0 @@ -use { - crate::{ - config::{ConfigDownloadMetadata, ConfigDownloadMetadataOpts}, - postgres::{create_pool as pg_create_pool, metrics_pgpool}, - util::create_shutdown, - }, - das_core::DownloadMetadataInfo, - digital_asset_types::dao::{asset_data, sea_orm_active_enums::TaskStatus, tasks}, - futures::{ - future::{pending, FutureExt}, - stream::StreamExt, - }, - reqwest::{ClientBuilder, StatusCode}, - sea_orm::{ - entity::{ActiveValue, ColumnTrait, EntityTrait}, - query::{Condition, Order, QueryFilter, QueryOrder, QuerySelect}, - sea_query::expr::Expr, - SqlxPostgresConnector, TransactionTrait, - }, - sqlx::PgPool, - std::{sync::Arc, time::Duration}, - tokio::{ - sync::{mpsc, Notify}, - task::JoinSet, - time::sleep, - }, - tracing::{info, warn}, -}; - -pub const TASK_TYPE: &str = "DownloadMetadata"; - -pub async fn run(config: ConfigDownloadMetadata) -> anyhow::Result<()> { - let mut shutdown = create_shutdown()?; - - // open connection to postgres - let pool = pg_create_pool(config.postgres).await?; - tokio::spawn({ - let pool = pool.clone(); - async move { metrics_pgpool(pool).await } - }); - - // reset previously runned tasks - tokio::select! { - result = reset_pending_tasks(pool.clone()) => { - let updated = result?; - info!("Reset {updated} tasks to Pending status"); - }, - Some(signal) = shutdown.next() => { - warn!("{signal} received, waiting spawned tasks..."); - return Ok(()) - }, - } - - // prefetch queue - let (tasks_tx, mut tasks_rx) = mpsc::channel(config.download_metadata.prefetch_queue_size); - let prefetch_shutdown = Arc::new(Notify::new()); - let prefetch_jh = { - let pool = pool.clone(); - let download_metadata = config.download_metadata; - let shutdown = Arc::clone(&prefetch_shutdown); - async move { - tokio::select! { - result = get_pending_tasks(pool, tasks_tx, download_metadata) => result, - _ = shutdown.notified() => Ok(()) - } - } - }; - tokio::pin!(prefetch_jh); - - // process tasks - let mut tasks = JoinSet::new(); - loop { - let pending_task_fut = if tasks.len() >= config.download_metadata.max_in_process { - pending().boxed() - } else { - tasks_rx.recv().boxed() - }; - - let tasks_fut = if tasks.is_empty() { - pending().boxed() - } else { - tasks.join_next().boxed() - }; - - tokio::select! { - Some(signal) = shutdown.next() => { - warn!("{signal} received, waiting spawned tasks..."); - break Ok(()); - }, - result = &mut prefetch_jh => break result, - Some(result) = tasks_fut => { - result??; - }, - Some(pending_task) = pending_task_fut => { - tasks.spawn(execute_task(pool.clone(), pending_task, config.download_metadata.download_timeout)); - } - }; - }?; - - tokio::select! { - Some(signal) = shutdown.next() => { - anyhow::bail!("{signal} received, force shutdown..."); - } - result = async move { - // shutdown `prefetch` channel - prefetch_shutdown.notify_one(); - // wait all spawned tasks - while let Some(result) = tasks.join_next().await { - result??; - } - // shutdown database connection - pool.close().await; - Ok::<(), anyhow::Error>(()) - } => result, - } -} - -// On startup reset tasks status -async fn reset_pending_tasks(pool: PgPool) -> anyhow::Result { - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - tasks::Entity::update_many() - .set(tasks::ActiveModel { - status: ActiveValue::Set(TaskStatus::Pending), - ..Default::default() - }) - .filter( - Condition::all() - .add(tasks::Column::Status.eq(TaskStatus::Running)) - .add(tasks::Column::TaskType.eq(TASK_TYPE)), - ) - .exec(&conn) - .await - .map(|result| result.rows_affected) - .map_err(Into::into) -} - -// Select Pending tasks, update status to Running and send to prefetch queue -async fn get_pending_tasks( - pool: PgPool, - tasks_tx: mpsc::Sender, - config: ConfigDownloadMetadataOpts, -) -> anyhow::Result<()> { - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - loop { - let pending_tasks = tasks::Entity::find() - .filter( - Condition::all() - .add(tasks::Column::Status.eq(TaskStatus::Pending)) - .add( - Expr::col(tasks::Column::Attempts) - .less_or_equal(Expr::col(tasks::Column::MaxAttempts)), - ), - ) - .order_by(tasks::Column::Attempts, Order::Asc) - .order_by(tasks::Column::CreatedAt, Order::Desc) - .limit(config.limit_to_fetch as u64) - .all(&conn) - .await?; - - if pending_tasks.is_empty() { - sleep(config.wait_tasks_max_idle).await; - } else { - tasks::Entity::update_many() - .set(tasks::ActiveModel { - status: ActiveValue::Set(TaskStatus::Running), - ..Default::default() - }) - .filter(tasks::Column::Id.is_in(pending_tasks.iter().map(|v| v.id.clone()))) - .exec(&conn) - .await?; - - for task in pending_tasks { - tasks_tx - .send(task) - .await - .map_err(|_error| anyhow::anyhow!("failed to send task to prefetch queue"))?; - } - } - } -} - -// Try to download metadata and remove task with asset_data update or update tasks to Pending/Failed -async fn execute_task(pool: PgPool, task: tasks::Model, timeout: Duration) -> anyhow::Result<()> { - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); - match download_metadata(task.data, timeout).await { - Ok((asset_data_id, metadata)) => { - // Remove task and set metadata in transacstion - let txn = conn.begin().await?; - tasks::Entity::delete_by_id(task.id).exec(&txn).await?; - asset_data::Entity::update(asset_data::ActiveModel { - id: ActiveValue::Unchanged(asset_data_id), - metadata: ActiveValue::Set(metadata), - reindex: ActiveValue::Set(Some(false)), - ..Default::default() - }) - .exec(&txn) - .await?; - txn.commit().await?; - } - Err(error) => { - let status = if task.attempts + 1 == task.max_attempts { - TaskStatus::Failed - } else { - TaskStatus::Pending - }; - tasks::Entity::update(tasks::ActiveModel { - id: ActiveValue::Unchanged(task.id), - status: ActiveValue::Set(status), - attempts: ActiveValue::Set(task.attempts + 1), - errors: ActiveValue::Set(Some(error.to_string())), - ..Default::default() - }) - .exec(&conn) - .await?; - } - } - Ok(()) -} - -async fn download_metadata( - data: serde_json::Value, - timeout: Duration, -) -> anyhow::Result<(Vec, serde_json::Value)> { - let (id, uri, _slot) = serde_json::from_value::(data)?.into_inner(); - - // Need to check for malicious sites ? - let client = ClientBuilder::new().timeout(timeout).build()?; - let response = client.get(uri).send().await?; - - anyhow::ensure!( - response.status() == StatusCode::OK, - "HttpError status_code: {}", - response.status().as_str() - ); - Ok((id, response.json().await?)) -} diff --git a/grpc-ingest/src/grpc.rs b/grpc-ingest/src/grpc.rs index 5212a589..82f9237d 100644 --- a/grpc-ingest/src/grpc.rs +++ b/grpc-ingest/src/grpc.rs @@ -1,26 +1,19 @@ use { crate::{ - config::{ConfigGrpc, REDIS_STREAM_DATA_KEY}, - prom::redis_xadd_status_inc, - redis::{metrics_xlen, TrackedPipeline}, + config::ConfigGrpc, prom::redis_xadd_status_inc, redis::TrackedPipeline, util::create_shutdown, }, anyhow::Context, - futures::{channel::mpsc, stream::StreamExt, SinkExt}, - redis::{streams::StreamMaxlen, RedisResult, Value as RedisValue}, + futures::stream::StreamExt, + redis::streams::StreamMaxlen, std::{collections::HashMap, sync::Arc, time::Duration}, - tokio::{ - spawn, - sync::Mutex, - task::JoinSet, - time::{sleep, Instant}, - }, + tokio::{sync::Mutex, time::sleep}, topograph::{ executor::{Executor, Nonblock, Tokio}, prelude::*, AsyncHandler, }, - tracing::{debug, info, warn}, + tracing::{debug, warn}, yellowstone_grpc_client::GeyserGrpcClient, yellowstone_grpc_proto::{ geyser::{SubscribeRequest, SubscribeUpdate}, @@ -118,7 +111,7 @@ impl<'a> AsyncHandler anyhow::Result<()> { +pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { let redis_client = redis::Client::open(config.redis.url.clone())?; let config = Arc::new(config); let connection = redis_client.get_multiplexed_tokio_connection().await?; @@ -142,17 +135,11 @@ pub async fn run_v2(config: ConfigGrpc) -> anyhow::Result<()> { ..Default::default() }; - info!( - "cmd=grpc2redis request_snapshot={:?}", - config.request_snapshot - ); - let mut dragon_mouth_client = GeyserGrpcClient::build_from_shared(config.geyser_endpoint.clone())? .x_token(config.x_token.clone())? .connect_timeout(Duration::from_secs(10)) .timeout(Duration::from_secs(10)) - .set_x_request_snapshot(config.request_snapshot)? .connect() .await .context("failed to connect to gRPC")?; @@ -192,183 +179,3 @@ pub async fn run_v2(config: ConfigGrpc) -> anyhow::Result<()> { Ok(()) } - -#[allow(dead_code)] -pub async fn run(config: ConfigGrpc) -> anyhow::Result<()> { - let config = Arc::new(config); - let (tx, mut rx) = mpsc::channel::(config.geyser_update_message_buffer_size); // Adjust buffer size as needed - - // Connect to Redis - let client = redis::Client::open(config.redis.url.clone())?; - let connection = client.get_multiplexed_tokio_connection().await?; - - // Check stream length for the metrics - let jh_metrics_xlen = spawn({ - let connection = connection.clone(); - let streams = vec![ - config.accounts.stream.clone(), - config.transactions.stream.clone(), - ]; - async move { metrics_xlen(connection, &streams).await } - }); - tokio::pin!(jh_metrics_xlen); - - // Spawn gRPC client connections - let config = Arc::clone(&config); - let mut tx = tx.clone(); - - let mut client = GeyserGrpcClient::build_from_shared(config.geyser_endpoint.clone())? - .x_token(config.x_token.clone())? - .connect_timeout(Duration::from_secs(10)) - .timeout(Duration::from_secs(10)) - .connect() - .await - .context("failed to connect to gRPC")?; - - let grc_config = Arc::clone(&config); - spawn(async move { - let mut accounts = HashMap::with_capacity(1); - let mut transactions = HashMap::with_capacity(1); - - accounts.insert( - "das".to_string(), - grc_config.accounts.filter.clone().to_proto(), - ); - transactions.insert( - "das".to_string(), - grc_config.transactions.filter.clone().to_proto(), - ); - - let request = SubscribeRequest { - accounts, - transactions, - ..Default::default() - }; - - let (_subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; - - while let Some(Ok(msg)) = stream.next().await { - if let Some(update) = msg.update_oneof { - tx.send(update) - .await - .expect("Failed to send update to management thread"); - } - } - Ok::<(), anyhow::Error>(()) - }); - - // Management thread - let mut shutdown = create_shutdown()?; - let mut tasks = JoinSet::new(); - let mut pipe = redis::pipe(); - let mut pipe_accounts = 0; - let mut pipe_transactions = 0; - - let pipeline_max_idle = config.redis.pipeline_max_idle; - let deadline = sleep(pipeline_max_idle); - tokio::pin!(deadline); - - let result = loop { - tokio::select! { - result = &mut jh_metrics_xlen => match result { - Ok(Ok(_)) => unreachable!(), - Ok(Err(error)) => break Err(error), - Err(error) => break Err(error.into()), - }, - Some(signal) = shutdown.next() => { - warn!("{signal} received, waiting spawned tasks..."); - break Ok(()); - }, - Some(update) = rx.next() => { - match update { - UpdateOneof::Account(account) => { - - pipe.xadd_maxlen( - &config.accounts.stream, - StreamMaxlen::Approx(config.accounts.stream_maxlen), - "*", - &[(REDIS_STREAM_DATA_KEY, account.encode_to_vec())], - ); - - pipe_accounts += 1; - } - UpdateOneof::Transaction(transaction) => { - pipe.xadd_maxlen( - &config.transactions.stream, - StreamMaxlen::Approx(config.transactions.stream_maxlen), - "*", - &[(REDIS_STREAM_DATA_KEY, transaction.encode_to_vec())] - ); - - pipe_transactions += 1; - } - _ => continue, - } - if pipe_accounts + pipe_transactions >= config.redis.pipeline_max_size { - let mut pipe = std::mem::replace(&mut pipe, redis::pipe()); - let pipe_accounts = std::mem::replace(&mut pipe_accounts, 0); - let pipe_transactions = std::mem::replace(&mut pipe_transactions, 0); - deadline.as_mut().reset(Instant::now() + config.redis.pipeline_max_idle); - - tasks.spawn({ - let mut connection = connection.clone(); - let config = Arc::clone(&config); - async move { - let result: RedisResult = - pipe.atomic().query_async(&mut connection).await; - - let status = result.map(|_| ()).map_err(|_| ()); - redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts); - redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions); - - Ok::<(), anyhow::Error>(()) - } - }); - } - }, - _ = &mut deadline => { - if pipe_accounts + pipe_transactions > 0 { - let mut pipe = std::mem::replace(&mut pipe, redis::pipe()); - let pipe_accounts = std::mem::replace(&mut pipe_accounts, 0); - let pipe_transactions = std::mem::replace(&mut pipe_transactions, 0); - deadline.as_mut().reset(Instant::now() + config.redis.pipeline_max_idle); - - tasks.spawn({ - let mut connection = connection.clone(); - let config = Arc::clone(&config); - async move { - let result: RedisResult = - pipe.atomic().query_async(&mut connection).await; - - let status = result.map(|_| ()).map_err(|_| ()); - redis_xadd_status_inc(&config.accounts.stream, status, pipe_accounts); - redis_xadd_status_inc(&config.transactions.stream, status, pipe_transactions); - - Ok::<(), anyhow::Error>(()) - } - }); - } - }, - }; - - while tasks.len() >= config.redis.max_xadd_in_process { - if let Some(result) = tasks.join_next().await { - result??; - } - } - }; - - tokio::select! { - Some(signal) = shutdown.next() => { - anyhow::bail!("{signal} received, force shutdown..."); - } - result = async move { - while let Some(result) = tasks.join_next().await { - result??; - } - Ok::<(), anyhow::Error>(()) - } => result?, - }; - - result -} diff --git a/grpc-ingest/src/ingester.rs b/grpc-ingest/src/ingester.rs index 30a7924f..000f747b 100644 --- a/grpc-ingest/src/ingester.rs +++ b/grpc-ingest/src/ingester.rs @@ -1,46 +1,17 @@ use { crate::{ - config::{ConfigIngester, ConfigIngesterDownloadMetadata, REDIS_STREAM_DATA_KEY}, - download_metadata::TASK_TYPE, - postgres::{create_pool as pg_create_pool, metrics_pgpool, report_pgpool}, - prom::{ - download_metadata_inserted_total_inc, program_transformer_task_status_inc, - program_transformer_tasks_total_set, redis_xack_inc, ProgramTransformerTaskStatusKind, - }, - redis::{ - metrics_xlen, IngestStream, ProgramTransformerInfo, RedisStream, RedisStreamMessage, - }, + config::{ConfigIngester, REDIS_STREAM_DATA_KEY}, + postgres::{create_pool as pg_create_pool, report_pgpool}, + prom::redis_xack_inc, + redis::{IngestStream, RedisStreamMessage}, util::create_shutdown, }, - chrono::Utc, - crypto::{digest::Digest, sha2::Sha256}, das_core::{DownloadMetadata, DownloadMetadataInfo, DownloadMetadataNotifier}, - digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks}, - futures::{ - future::{pending, BoxFuture, FusedFuture, FutureExt}, - stream::StreamExt, - }, - program_transformers::{ - error::ProgramTransformerError, AccountInfo, ProgramTransformer, TransactionInfo, - }, + futures::{future::BoxFuture, stream::StreamExt}, + program_transformers::{AccountInfo, ProgramTransformer, TransactionInfo}, redis::aio::MultiplexedConnection, - sea_orm::{ - entity::{ActiveModelTrait, ActiveValue}, - error::{DbErr, RuntimeErr}, - SqlxPostgresConnector, - }, - sqlx::{Error as SqlxError, PgPool}, - std::{ - borrow::Cow, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - }, - tokio::{ - task::JoinSet, - time::{sleep, Duration}, - }, + std::sync::Arc, + tokio::time::{sleep, Duration}, tracing::warn, }; @@ -81,8 +52,8 @@ fn download_metadata_notifier_v2( ) } -pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> { - let redis_client = redis::Client::open(config.redis.url.clone())?; +pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { + let redis_client = redis::Client::open(config.redis)?; let connection = redis_client.get_multiplexed_tokio_connection().await?; let pool = pg_create_pool(config.postgres).await?; @@ -91,12 +62,17 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> { let accounts_download_metadata_notifier = download_metadata_notifier_v2( connection.clone(), - download_metadata_stream.clone(), + download_metadata_stream.name.clone(), + download_metadata_stream_maxlen, + )?; + let snapshots_download_metadata_notifier = download_metadata_notifier_v2( + connection.clone(), + download_metadata_stream.name.clone(), download_metadata_stream_maxlen, )?; let transactions_download_metadata_notifier = download_metadata_notifier_v2( connection.clone(), - download_metadata_stream.clone(), + download_metadata_stream.name.clone(), download_metadata_stream_maxlen, )?; @@ -104,6 +80,10 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> { pool.clone(), accounts_download_metadata_notifier, )); + let pt_snapshots = Arc::new(ProgramTransformer::new( + pool.clone(), + snapshots_download_metadata_notifier, + )); let pt_transactions = Arc::new(ProgramTransformer::new( pool.clone(), transactions_download_metadata_notifier, @@ -114,7 +94,7 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> { let download_metadata = Arc::new(DownloadMetadata::new(http_client, pool.clone())); let download_metadata_stream = IngestStream::build() - .config(config.download_metadata.stream_config.clone()) + .config(config.download_metadata.stream.clone()) .connection(connection.clone()) .handler(move |info| { let download_metadata = Arc::clone(&download_metadata); @@ -161,258 +141,46 @@ pub async fn run_v2(config: ConfigIngester) -> anyhow::Result<()> { }) }) .start()?; + let snapshot_stream = IngestStream::build() + .config(config.snapshots.clone()) + .connection(connection.clone()) + .handler(move |info| { + let pt_snapshots = Arc::clone(&pt_snapshots); + + Box::pin(async move { + let info = AccountInfo::try_parse_msg(info)?; + + pt_snapshots + .handle_account_update(&info) + .await + .map_err(Into::into) + }) + }) + .start()?; let mut shutdown = create_shutdown()?; - loop { - tokio::select! { - _ = sleep(Duration::from_millis(100)) => { - report_pgpool(pool.clone()); - } - Some(signal) = shutdown.next() => { - warn!("{signal} received, waiting spawned tasks..."); - break; - } + let report_pool = pool.clone(); + let report_handle = tokio::spawn(async move { + let pool = report_pool.clone(); + loop { + sleep(Duration::from_millis(100)).await; + report_pgpool(pool.clone()); } + }); + + if let Some(signal) = shutdown.next().await { + warn!("{signal} received, waiting for spawned tasks..."); } + report_handle.abort(); + account_stream.stop().await?; transactions_stream.stop().await?; download_metadata_stream.stop().await?; + snapshot_stream.stop().await?; pool.close().await; Ok::<(), anyhow::Error>(()) } - -#[allow(dead_code)] -pub async fn run(config: ConfigIngester) -> anyhow::Result<()> { - // connect to Redis - let client = redis::Client::open(config.redis.url.clone())?; - let connection = client.get_multiplexed_tokio_connection().await?; - - // check stream length for the metrics in spawned task - let jh_metrics_xlen = tokio::spawn({ - let connection = connection.clone(); - let streams = config - .redis - .streams - .iter() - .map(|config| config.stream.to_string()) - .collect::>(); - async move { metrics_xlen(connection, &streams).await } - }); - tokio::pin!(jh_metrics_xlen); - - // open connection to postgres - let pgpool = pg_create_pool(config.postgres).await?; - tokio::spawn({ - let pgpool = pgpool.clone(); - async move { metrics_pgpool(pgpool).await } - }); - - // create redis stream reader - let (mut redis_messages, redis_tasks_fut) = RedisStream::new(config.redis, connection).await?; - tokio::pin!(redis_tasks_fut); - - // program transforms related - let pt_accounts = Arc::new(ProgramTransformer::new( - pgpool.clone(), - create_download_metadata_notifier(pgpool.clone(), config.download_metadata.clone())?, - )); - let pt_transactions = Arc::new(ProgramTransformer::new( - pgpool.clone(), - create_download_metadata_notifier(pgpool.clone(), config.download_metadata)?, - )); - let pt_max_tasks_in_process = config.program_transformer.max_tasks_in_process; - let mut pt_tasks = JoinSet::new(); - let pt_tasks_len = Arc::new(AtomicUsize::new(0)); - - tokio::spawn({ - let pt_tasks_len = Arc::clone(&pt_tasks_len); - - async move { - loop { - program_transformer_tasks_total_set(pt_tasks_len.load(Ordering::Relaxed)); - sleep(Duration::from_millis(100)).await; - } - } - }); - - // read and process messages in the loop - let mut shutdown = create_shutdown()?; - loop { - pt_tasks_len.store(pt_tasks.len(), Ordering::Relaxed); - - let redis_messages_recv = if pt_tasks.len() == pt_max_tasks_in_process { - pending().boxed() - } else { - redis_messages.recv().boxed() - }; - let pt_tasks_next = if pt_tasks.is_empty() { - pending().boxed() - } else { - pt_tasks.join_next().boxed() - }; - - let msg = tokio::select! { - result = &mut jh_metrics_xlen => match result { - Ok(Ok(_)) => unreachable!(), - Ok(Err(error)) => break Err(error), - Err(error) => break Err(error.into()), - }, - Some(signal) = shutdown.next() => { - warn!("{signal} received, waiting spawned tasks..."); - break Ok(()); - }, - result = &mut redis_tasks_fut => break result, - msg = redis_messages_recv => match msg { - Some(msg) => msg, - None => break Ok(()), - }, - result = pt_tasks_next => { - if let Some(result) = result { - result??; - } - continue; - } - }; - - pt_tasks.spawn({ - let pt_accounts = Arc::clone(&pt_accounts); - let pt_transactions = Arc::clone(&pt_transactions); - async move { - let result = match &msg.get_data() { - ProgramTransformerInfo::Account(account) => { - pt_accounts.handle_account_update(account).await - } - ProgramTransformerInfo::Transaction(transaction) => { - pt_transactions.handle_transaction(transaction).await - } - ProgramTransformerInfo::MetadataJson(_download_metadata_info) => Ok(()), - }; - - macro_rules! log_or_bail { - ($action:path, $msg:expr, $error:ident) => { - match msg.get_data() { - ProgramTransformerInfo::Account(account) => { - $action!("{} account {}: {:?}", $msg, account.pubkey, $error) - } - ProgramTransformerInfo::Transaction(transaction) => { - $action!( - "{} transaction {}: {:?}", - $msg, - transaction.signature, - $error - ) - } - ProgramTransformerInfo::MetadataJson(_download_metadata_info) => { - todo!() - } - } - }; - } - - match result { - Ok(()) => program_transformer_task_status_inc( - ProgramTransformerTaskStatusKind::Success, - ), - Err(ProgramTransformerError::NotImplemented) => { - program_transformer_task_status_inc( - ProgramTransformerTaskStatusKind::NotImplemented, - ) - } - Err(ProgramTransformerError::DeserializationError(error)) => { - program_transformer_task_status_inc( - ProgramTransformerTaskStatusKind::DeserializationError, - ); - log_or_bail!(warn, "failed to deserialize", error) - } - Err(ProgramTransformerError::ParsingError(error)) => { - program_transformer_task_status_inc( - ProgramTransformerTaskStatusKind::ParsingError, - ); - log_or_bail!(warn, "failed to parse", error) - } - Err(ProgramTransformerError::DatabaseError(error)) => { - log_or_bail!(anyhow::bail, "database error for", error) - } - Err(ProgramTransformerError::AssetIndexError(error)) => { - log_or_bail!(anyhow::bail, "indexing error for ", error) - } - Err(error) => { - log_or_bail!(anyhow::bail, "failed to handle", error) - } - } - - msg.ack() - } - }); - }?; - - tokio::select! { - Some(signal) = shutdown.next() => { - anyhow::bail!("{signal} received, force shutdown..."); - } - result = async move { - // shutdown `prefetch` channel (but not Receiver) - redis_messages.shutdown(); - // wait all `program_transformer` spawned tasks - while let Some(result) = pt_tasks.join_next().await { - result??; - } - // wait all `ack` spawned tasks - if !redis_tasks_fut.is_terminated() { - redis_tasks_fut.await?; - } - // shutdown database connection - pgpool.close().await; - Ok::<(), anyhow::Error>(()) - } => result, - } -} - -fn create_download_metadata_notifier( - pgpool: PgPool, - config: ConfigIngesterDownloadMetadata, -) -> anyhow::Result { - let max_attempts = config.max_attempts.try_into()?; - Ok(Box::new(move |info: DownloadMetadataInfo| -> BoxFuture< - 'static, - Result<(), Box>, - > { - let pgpool = pgpool.clone(); - Box::pin(async move { - let data = serde_json::to_value(info)?; - - let mut hasher = Sha256::new(); - hasher.input(TASK_TYPE.as_bytes()); - hasher.input(serde_json::to_vec(&data)?.as_slice()); - let hash = hasher.result_str(); - - let model = tasks::ActiveModel { - id: ActiveValue::Set(hash), - task_type: ActiveValue::Set(TASK_TYPE.to_owned()), - data: ActiveValue::Set(data), - status: ActiveValue::Set(TaskStatus::Pending), - created_at: ActiveValue::Set(Utc::now().naive_utc()), - locked_until: ActiveValue::Set(None), - locked_by: ActiveValue::Set(None), - max_attempts: ActiveValue::Set(max_attempts), - attempts: ActiveValue::Set(0), - duration: ActiveValue::Set(None), - errors: ActiveValue::Set(None), - }; - let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pgpool); - - match model.insert(&conn).await.map(|_mode| ()) { - // skip unique_violation error - Err(DbErr::Query(RuntimeErr::SqlxError(SqlxError::Database(dberr)))) if dberr.code() == Some(Cow::Borrowed("23505")) => {}, - value => value?, - }; - download_metadata_inserted_total_inc(); - - Ok(()) - }) - })) -} diff --git a/grpc-ingest/src/main.rs b/grpc-ingest/src/main.rs index f467d396..907e1e00 100644 --- a/grpc-ingest/src/main.rs +++ b/grpc-ingest/src/main.rs @@ -1,9 +1,6 @@ use { crate::{ - config::{ - load as config_load, ConfigDownloadMetadata, ConfigGrpc, ConfigIngester, - ConfigPrometheus, - }, + config::{load as config_load, ConfigGrpc, ConfigIngester, ConfigPrometheus}, prom::run_server as prometheus_run_server, tracing::init as tracing_init, }, @@ -13,7 +10,6 @@ use { }; mod config; -mod download_metadata; mod grpc; mod ingester; mod postgres; @@ -46,9 +42,6 @@ enum ArgsAction { /// Run ingester process (process events from Redis) #[command(name = "ingester")] Ingester, - /// Run metadata downloader - #[command(name = "download-metadata")] - DownloadMetadata, } #[tokio::main] @@ -71,19 +64,13 @@ async fn main() -> anyhow::Result<()> { let config = config_load::(&args.config) .await .with_context(|| format!("failed to parse config from: {}", args.config))?; - grpc::run_v2(config).await + grpc::run(config).await } ArgsAction::Ingester => { let config = config_load::(&args.config) .await .with_context(|| format!("failed to parse config from: {}", args.config))?; - ingester::run_v2(config).await - } - ArgsAction::DownloadMetadata => { - let config = config_load::(&args.config) - .await - .with_context(|| format!("failed to parse config from: {}", args.config))?; - download_metadata::run(config).await + ingester::run(config).await } } } diff --git a/grpc-ingest/src/postgres.rs b/grpc-ingest/src/postgres.rs index dd29ca73..07e0ff92 100644 --- a/grpc-ingest/src/postgres.rs +++ b/grpc-ingest/src/postgres.rs @@ -7,7 +7,6 @@ use { postgres::{PgConnectOptions, PgPoolOptions}, PgPool, }, - tokio::time::{sleep, Duration}, }; pub async fn create_pool(config: ConfigIngesterPostgres) -> anyhow::Result { @@ -24,10 +23,3 @@ pub fn report_pgpool(pgpool: PgPool) { pgpool_connections_set(PgpoolConnectionsKind::Total, pgpool.size() as usize); pgpool_connections_set(PgpoolConnectionsKind::Idle, pgpool.num_idle()); } - -pub async fn metrics_pgpool(pgpool: PgPool) { - loop { - report_pgpool(pgpool.clone()); - sleep(Duration::from_millis(100)).await; - } -} diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index 83b51011..33f2c622 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -162,10 +162,6 @@ pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) { .set(size as i64) } -pub fn program_transformer_tasks_total_set(size: usize) { - PROGRAM_TRANSFORMER_TASKS_TOTAL.set(size as i64) -} - pub fn ingest_tasks_total_inc(stream: &str) { INGEST_TASKS_TOTAL.with_label_values(&[stream]).inc() } @@ -287,7 +283,3 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKin .with_label_values(&[kind.to_str()]) .inc() } - -pub fn download_metadata_inserted_total_inc() { - DOWNLOAD_METADATA_INSERTED_TOTAL.inc() -} diff --git a/grpc-ingest/src/redis.rs b/grpc-ingest/src/redis.rs index feef7228..8510683c 100644 --- a/grpc-ingest/src/redis.rs +++ b/grpc-ingest/src/redis.rs @@ -1,9 +1,6 @@ use { crate::{ - config::{ - ConfigIngestStream, ConfigIngesterRedis, ConfigIngesterRedisStreamType, - REDIS_STREAM_DATA_KEY, - }, + config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY}, prom::{ ingest_tasks_reset, ingest_tasks_total_dec, ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc, redis_xlen_set, @@ -11,7 +8,7 @@ use { }, }, das_core::DownloadMetadataInfo, - futures::future::{BoxFuture, Fuse, FutureExt}, + futures::future::BoxFuture, program_transformers::{AccountInfo, TransactionInfo}, redis::{ aio::MultiplexedConnection, @@ -22,19 +19,8 @@ use { AsyncCommands, ErrorKind as RedisErrorKind, RedisResult, Value as RedisValue, }, solana_sdk::{pubkey::Pubkey, signature::Signature}, - std::{ - collections::HashMap, - convert::Infallible, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - }, - tokio::{ - sync::mpsc, - task::JoinSet, - time::{sleep, Duration, Instant}, - }, + std::{collections::HashMap, sync::Arc}, + tokio::time::{sleep, Duration}, topograph::{ executor::{Executor, Nonblock, Tokio}, prelude::*, @@ -660,24 +646,6 @@ pub async fn report_xlen( Ok(()) } -pub async fn metrics_xlen( - mut connection: C, - streams: &[String], -) -> anyhow::Result { - loop { - let mut pipe = redis::pipe(); - for stream in streams { - pipe.xlen(stream); - } - let xlens: Vec = pipe.query_async(&mut connection).await?; - - for (stream, xlen) in streams.iter().zip(xlens.into_iter()) { - redis_xlen_set(stream, xlen); - } - - sleep(Duration::from_millis(100)).await; - } -} pub async fn xgroup_create( connection: &mut C, @@ -707,463 +675,6 @@ pub async fn xgroup_create( Ok(()) } -#[derive(Debug)] -struct RedisStreamInfo { - group: String, - consumer: String, - stream_name: String, - stream_type: ConfigIngesterRedisStreamType, - xack_batch_max_size: usize, - xack_batch_max_idle: Duration, - xack_max_in_process: usize, -} - -#[derive(Debug)] -pub enum ProgramTransformerInfo { - Account(AccountInfo), - Transaction(TransactionInfo), - MetadataJson(DownloadMetadataInfo), -} - -#[derive(Debug)] -pub struct RedisStreamMessageInfo { - id: String, - data: ProgramTransformerInfo, - ack_tx: mpsc::UnboundedSender, -} - -impl RedisStreamMessageInfo { - fn parse( - stream: &RedisStreamInfo, - StreamId { id, map }: StreamId, - ack_tx: mpsc::UnboundedSender, - ) -> anyhow::Result { - let to_anyhow = |error: String| anyhow::anyhow!(error); - - let data = match map.get(REDIS_STREAM_DATA_KEY) { - Some(RedisValue::Data(vec)) => match stream.stream_type { - ConfigIngesterRedisStreamType::Account => { - let SubscribeUpdateAccount { account, slot, .. } = - Message::decode(vec.as_ref())?; - - let account = account.ok_or_else(|| { - anyhow::anyhow!("received invalid SubscribeUpdateAccount") - })?; - - ProgramTransformerInfo::Account(AccountInfo { - slot, - pubkey: Pubkey::try_from(account.pubkey.as_slice())?, - owner: Pubkey::try_from(account.owner.as_slice())?, - data: account.data, - }) - } - ConfigIngesterRedisStreamType::Transaction => { - let SubscribeUpdateTransaction { transaction, slot } = - Message::decode(vec.as_ref())?; - - let transaction = transaction.ok_or_else(|| { - anyhow::anyhow!("received invalid SubscribeUpdateTransaction") - })?; - let tx = transaction.transaction.ok_or_else(|| { - anyhow::anyhow!( - "received invalid transaction in SubscribeUpdateTransaction" - ) - })?; - let message = tx.message.ok_or_else(|| { - anyhow::anyhow!("received invalid message in SubscribeUpdateTransaction") - })?; - let meta = transaction.meta.ok_or_else(|| { - anyhow::anyhow!("received invalid meta in SubscribeUpdateTransaction") - })?; - - let mut account_keys = - create_pubkey_vec(message.account_keys).map_err(to_anyhow)?; - for pubkey in - create_pubkey_vec(meta.loaded_writable_addresses).map_err(to_anyhow)? - { - account_keys.push(pubkey); - } - for pubkey in - create_pubkey_vec(meta.loaded_readonly_addresses).map_err(to_anyhow)? - { - account_keys.push(pubkey); - } - - ProgramTransformerInfo::Transaction(TransactionInfo { - slot, - signature: Signature::try_from(transaction.signature.as_slice())?, - account_keys, - message_instructions: create_message_instructions(message.instructions) - .map_err(to_anyhow)?, - meta_inner_instructions: create_meta_inner_instructions( - meta.inner_instructions, - ) - .map_err(to_anyhow)?, - }) - } - ConfigIngesterRedisStreamType::MetadataJson => { - let info: DownloadMetadataInfo = serde_json::from_slice(vec.as_ref())?; - - ProgramTransformerInfo::MetadataJson(info) - } - }, - Some(_) => anyhow::bail!( - "invalid data (key: {:?}) from stream {:?}", - REDIS_STREAM_DATA_KEY, - stream.stream_name - ), - None => anyhow::bail!( - "failed to get data (key: {:?}) from stream {:?}", - REDIS_STREAM_DATA_KEY, - stream.stream_name - ), - }; - Ok(Self { id, data, ack_tx }) - } - - pub const fn get_data(&self) -> &ProgramTransformerInfo { - &self.data - } - - pub fn ack(self) -> anyhow::Result<()> { - self.ack_tx - .send(self.id) - .map_err(|_error| anyhow::anyhow!("failed to send message to ack channel")) - } -} - -#[derive(Debug)] -pub struct RedisStream { - shutdown: Arc, - messages_rx: mpsc::Receiver, -} - -#[allow(dead_code)] -async fn run_ack( - stream: Arc, - connection: MultiplexedConnection, - mut ack_rx: mpsc::UnboundedReceiver, -) -> anyhow::Result<()> { - let mut ids = vec![]; - let deadline = sleep(stream.xack_batch_max_idle); - tokio::pin!(deadline); - let mut tasks = JoinSet::new(); - - let result = loop { - let terminated = tokio::select! { - msg = ack_rx.recv() => match msg { - Some(msg) => { - ids.push(msg); - if ids.len() < stream.xack_batch_max_size { - continue; - } - false - } - None => true, - }, - _ = &mut deadline => false, - }; - - let ids = std::mem::take(&mut ids); - deadline - .as_mut() - .reset(Instant::now() + stream.xack_batch_max_idle); - if !ids.is_empty() { - tasks.spawn({ - let stream = Arc::clone(&stream); - let mut connection = connection.clone(); - async move { - match redis::pipe() - .atomic() - .xack(&stream.stream_name, &stream.group, &ids) - .xdel(&stream.stream_name, &ids) - .query_async::<_, redis::Value>(&mut connection) - .await - { - Ok(_) => { - debug!("Acknowledged and deleted idle messages: {:?}", ids); - redis_xack_inc(&stream.stream_name, ids.len()); - } - Err(e) => { - error!("Failed to acknowledge or delete idle messages: {:?}", e); - } - } - redis_xack_inc(&stream.stream_name, ids.len()); - Ok::<(), anyhow::Error>(()) - } - }); - while tasks.len() >= stream.xack_max_in_process { - if let Some(result) = tasks.join_next().await { - result??; - } - } - } - - if terminated { - break Ok(()); - } - }; - - while let Some(result) = tasks.join_next().await { - result??; - } - - result -} - -impl RedisStream { - pub async fn new( - config: ConfigIngesterRedis, - mut connection: MultiplexedConnection, - ) -> anyhow::Result<(Self, Fuse>>)> { - // create group with consumer per stream - for stream in config.streams.iter() { - xgroup_create( - &mut connection, - stream.stream, - &config.group, - &config.consumer, - ) - .await?; - } - - // shutdown flag - let shutdown = Arc::new(AtomicBool::new(false)); - - // create stream info wrapped by Arc - let mut ack_tasks = vec![]; - let streams = config - .streams - .iter() - .map(|stream| { - let (ack_tx, ack_rx) = mpsc::unbounded_channel(); - let info = Arc::new(RedisStreamInfo { - group: config.group.clone(), - consumer: config.consumer.clone(), - stream_name: stream.stream.to_string(), - stream_type: stream.stream_type, - xack_batch_max_size: stream.xack_batch_max_size, - xack_batch_max_idle: stream.xack_batch_max_idle, - xack_max_in_process: stream.xack_max_in_process, - }); - ack_tasks.push((Arc::clone(&info), ack_rx)); - (stream.stream, (ack_tx, info)) - }) - .collect::>(); - - // spawn xack tasks - let ack_jh_vec = ack_tasks - .into_iter() - .map(|(stream, ack_rx)| { - let connection = connection.clone(); - tokio::spawn(async move { Self::run_ack(stream, connection, ack_rx).await }) - }) - .collect::>(); - - // spawn prefetch task - let (messages_tx, messages_rx) = mpsc::channel(config.prefetch_queue_size); - let jh_prefetch = tokio::spawn({ - let shutdown = Arc::clone(&shutdown); - async move { Self::run_prefetch(config, streams, connection, messages_tx, shutdown).await } - }); - - // merge spawned xack / prefetch tasks - let spawned_tasks = async move { - jh_prefetch.await??; - for jh in ack_jh_vec.into_iter() { - jh.await??; - } - Ok::<(), anyhow::Error>(()) - }; - - Ok(( - Self { - shutdown, - messages_rx, - }, - spawned_tasks.boxed().fuse(), - )) - } - - pub async fn recv(&mut self) -> Option { - self.messages_rx.recv().await - } - - pub fn shutdown(mut self) { - self.shutdown.store(true, Ordering::Relaxed); - tokio::spawn(async move { while self.messages_rx.recv().await.is_some() {} }); - } - - async fn run_prefetch( - config: ConfigIngesterRedis, - streams: HashMap<&str, (mpsc::UnboundedSender, Arc)>, - mut connection: MultiplexedConnection, - messages_tx: mpsc::Sender, - shutdown: Arc, - ) -> anyhow::Result<()> { - // read pending first - for (ack_tx, stream) in streams.values() { - let mut start = "-".to_owned(); - while !shutdown.load(Ordering::Relaxed) { - let StreamPendingCountReply { ids: pending_ids } = redis::cmd("XPENDING") - .arg(&stream.stream_name) - .arg(&stream.group) - .arg(&start) - .arg("+") - .arg(config.xpending_max) - .arg(&stream.consumer) // we can't use `xpending_count` because it doesn't support `consumer` filter - .query_async(&mut connection) - .await?; - - // drop first item if we do not start from the beginning - let used_ids = if start == "-" { 0.. } else { 1.. }; - let ids_str = pending_ids[used_ids] - .iter() - .map(|pending| pending.id.as_str()) - .collect::>(); - - // check that we fetched all pendings and update start - match pending_ids.last() { - Some(id) => { - if id.id == start { - break; - } else { - start = id.id.clone(); - } - } - None => break, - } - - let StreamClaimReply { ids: pendings } = connection - .xclaim( - &stream.stream_name, - &stream.group, - &stream.consumer, - 0, - &ids_str, - ) - .await?; - for pending in pendings { - let item = RedisStreamMessageInfo::parse(stream, pending, ack_tx.clone())?; - messages_tx.send(item).await.map_err(|_error| { - anyhow::anyhow!("failed to send item to prefetch channel") - })?; - } - } - } - - // exit if need to handle only pending - if config.xpending_only { - return Ok(()); - } - - let streams_keys = streams.keys().collect::>(); - let streams_ids = (0..streams_keys.len()).map(|_| ">").collect::>(); - - while !shutdown.load(Ordering::Relaxed) { - let opts = StreamReadOptions::default() - .count(config.xreadgroup_max) - .group(&config.group, &config.consumer); - let results: StreamReadReply = connection - .xread_options(&streams_keys, &streams_ids, &opts) - .await?; - - if results.keys.is_empty() { - sleep(Duration::from_millis(5)).await; - continue; - } - - for StreamKey { key, ids } in results.keys { - let (ack_tx, stream) = match streams.get(key.as_str()) { - Some(value) => value, - None => anyhow::bail!("unknown stream: {:?}", key), - }; - - for id in ids { - let item = RedisStreamMessageInfo::parse(stream, id, ack_tx.clone())?; - messages_tx.send(item).await.map_err(|_error| { - anyhow::anyhow!("failed to send item to prefetch channel") - })?; - } - } - } - - Ok(()) - } - - async fn run_ack( - stream: Arc, - connection: MultiplexedConnection, - mut ack_rx: mpsc::UnboundedReceiver, - ) -> anyhow::Result<()> { - let mut ids = vec![]; - let deadline = sleep(stream.xack_batch_max_idle); - tokio::pin!(deadline); - let mut tasks = JoinSet::new(); - - let result = loop { - let terminated = tokio::select! { - msg = ack_rx.recv() => match msg { - Some(msg) => { - ids.push(msg); - if ids.len() < stream.xack_batch_max_size { - continue; - } - false - } - None => true, - }, - _ = &mut deadline => false, - }; - - let ids = std::mem::take(&mut ids); - deadline - .as_mut() - .reset(Instant::now() + stream.xack_batch_max_idle); - if !ids.is_empty() { - tasks.spawn({ - let stream = Arc::clone(&stream); - let mut connection = connection.clone(); - async move { - match redis::pipe() - .atomic() - .xack(&stream.stream_name, &stream.group, &ids) - .xdel(&stream.stream_name, &ids) - .query_async::<_, redis::Value>(&mut connection) - .await - { - Ok(info) => { - info!("Acknowledged and deleted idle messages: {:?}", info); - redis_xack_inc(&stream.stream_name, ids.len()); - } - Err(e) => { - error!("Failed to acknowledge or delete idle messages: {:?}", e); - } - } - redis_xack_inc(&stream.stream_name, ids.len()); - Ok::<(), anyhow::Error>(()) - } - }); - while tasks.len() >= stream.xack_max_in_process { - if let Some(result) = tasks.join_next().await { - result??; - } - } - } - - if terminated { - break Ok(()); - } - }; - - while let Some(result) = tasks.join_next().await { - result??; - } - - result - } -} - pub struct TrackedPipeline { pipeline: redis::Pipeline, counts: HashMap,