Skip to content

Commit

Permalink
Merge branch 'main' into metadata_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
rtso authored May 14, 2024
2 parents 318d6e5 + 4b2db91 commit 26cd8ef
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
10 changes: 10 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub struct IndexerGrpcProcessorConfig {
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
pub enable_verbose_logging: Option<bool>,

#[serde(default = "IndexerGrpcProcessorConfig::default_grpc_response_item_timeout_in_secs")]
pub grpc_response_item_timeout_in_secs: u64,

#[serde(default)]
pub transaction_filter: TransactionFilter,
}
Expand All @@ -65,6 +69,11 @@ impl IndexerGrpcProcessorConfig {
pub const fn default_pb_channel_txn_chunk_size() -> usize {
100_000
}

/// Default timeout for grpc response item in seconds. Defaults to 60 seconds.
pub const fn default_grpc_response_item_timeout_in_secs() -> u64 {
60
}
}

#[async_trait::async_trait]
Expand All @@ -85,6 +94,7 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.per_table_chunk_sizes.clone(),
self.enable_verbose_logging,
self.transaction_filter.clone(),
self.grpc_response_item_timeout_in_secs,
)
.await
.context("Failed to build worker")?;
Expand Down
47 changes: 35 additions & 12 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ use url::Url;
// of 50 means that we could potentially have at least 4.8GB of data in memory at any given time and that we should provision
// machines accordingly.
pub const BUFFER_SIZE: usize = 100;
// Consumer thread will wait X seconds before panicking if it doesn't receive any data
pub const CONSUMER_THREAD_TIMEOUT_IN_SECS: u64 = 60 * 5;
pub const PROCESSOR_SERVICE_TYPE: &str = "processor";

pub struct Worker {
Expand All @@ -63,6 +61,7 @@ pub struct Worker {
pub per_table_chunk_sizes: AHashMap<String, usize>,
pub enable_verbose_logging: Option<bool>,
pub transaction_filter: TransactionFilter,
pub grpc_response_item_timeout_in_secs: u64,
}

impl Worker {
Expand All @@ -83,6 +82,7 @@ impl Worker {
per_table_chunk_sizes: AHashMap<String, usize>,
enable_verbose_logging: Option<bool>,
transaction_filter: TransactionFilter,
grpc_response_item_timeout_in_secs: u64,
) -> Result<Self> {
let processor_name = processor_config.name();
info!(processor_name = processor_name, "[Parser] Kicking off");
Expand Down Expand Up @@ -117,6 +117,7 @@ impl Worker {
per_table_chunk_sizes,
enable_verbose_logging,
transaction_filter,
grpc_response_item_timeout_in_secs,
})
}

Expand Down Expand Up @@ -306,7 +307,8 @@ impl Worker {
let chain_id = self
.grpc_chain_id
.expect("GRPC chain ID has not been fetched yet!");

let grpc_response_item_timeout =
std::time::Duration::from_secs(self.grpc_response_item_timeout_in_secs);
tokio::spawn(async move {
let task_index_str = task_index.to_string();
let step = ProcessorStep::ProcessedBatch.get_step();
Expand All @@ -321,6 +323,7 @@ impl Worker {
&stream_address,
receiver_clone.clone(),
task_index,
grpc_response_item_timeout,
)
.await;

Expand Down Expand Up @@ -600,9 +603,36 @@ async fn fetch_transactions(
stream_address: &str,
receiver: kanal::AsyncReceiver<TransactionsPBResponse>,
task_index: usize,
grpc_response_item_timeout: std::time::Duration,
) -> TransactionsPBResponse {
let pb_channel_fetch_time = std::time::Instant::now();
let txn_pb_res = receiver.recv().await;
let txn_pb_res = match tokio::time::timeout(grpc_response_item_timeout, receiver.recv()).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(_)) => {
error!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
stream_address = stream_address,
"[Parser][T#{}] Consumer thread failed to receive transactions",
task_index
);
Err(anyhow::anyhow!(
"Consumer thread failed to receive transactions"
))
},
Err(_) => {
error!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
stream_address = stream_address,
"[Parser][T#{}] Consumer thread timed out waiting for transactions",
task_index
);
Err(anyhow::anyhow!(
"Consumer thread timed out waiting for transactions"
))
},
};
// Track how much time this task spent waiting for a pb bundle
PB_CHANNEL_FETCH_WAIT_TIME_SECS
.with_label_values(&[processor_name, &task_index.to_string()])
Expand All @@ -611,15 +641,8 @@ async fn fetch_transactions(
match txn_pb_res {
Ok(txn_pb) => txn_pb,
Err(_e) => {
error!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
stream_address = stream_address,
"[Parser][T#{}] Consumer thread timed out waiting for transactions",
task_index
);
panic!(
"[Parser][T#{}] Consumer thread timed out waiting for transactions",
"[Parser][T#{}] Consumer thread failed or timed out waiting for transactions",
task_index
);
},
Expand Down

0 comments on commit 26cd8ef

Please sign in to comment.