diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs index 0d2cb85c75..30c758e402 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/btc.rs @@ -54,8 +54,7 @@ where } } }) - .shared(scope) - .chunk_by_vault(vaults) + .chunk_by_vault(vaults, scope) .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) .await .btc_deposits(witness_call.clone()) diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs index de269f6aa5..c3eb841a08 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/dot.rs @@ -64,8 +64,7 @@ where DotUnfinalisedSource::new(dot_client.clone()) .then(|header| async move { header.data.iter().filter_map(filter_map_events).collect() }) .strictly_monotonic() - .shared(scope) - .chunk_by_vault(vaults.clone()) + .chunk_by_vault(vaults.clone(), scope) .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) .await // Deposit witnessing diff --git a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/eth.rs b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/eth.rs index f45c28120c..c1d116c9a6 100644 --- a/api/bin/chainflip-ingress-egress-tracker/src/witnessing/eth.rs +++ b/api/bin/chainflip-ingress-egress-tracker/src/witnessing/eth.rs @@ -51,8 +51,7 @@ where let vaults = epoch_source.vaults().await; let eth_source = EthSource::new(eth_client.clone()) .strictly_monotonic() - .shared(scope) - .chunk_by_vault(vaults); + .chunk_by_vault(vaults, scope); let eth_source_deposit_addresses = eth_source .clone() diff --git a/engine/src/witness/btc.rs b/engine/src/witness/btc.rs index 4bdbc5a5f2..c7e03d7f02 100644 --- a/engine/src/witness/btc.rs +++ b/engine/src/witness/btc.rs @@ -104,8 +104,7 @@ where btc_source .clone() - .shared(scope) - .chunk_by_time(epoch_source.clone()) + .chunk_by_time(epoch_source.clone(), scope) .chain_tracking(state_chain_client.clone(), btc_client.clone()) .logging("chain tracking") .spawn(scope); @@ -129,7 +128,7 @@ where // Pre-witnessing stream. strictly_monotonic_source .clone() - .chunk_by_vault(vaults.clone()) + .chunk_by_vault(vaults.clone(), scope) .deposit_addresses(scope, unfinalised_state_chain_stream, state_chain_client.clone()) .await .btc_deposits(prewitness_call) @@ -140,8 +139,7 @@ where strictly_monotonic_source .lag_safety(SAFETY_MARGIN) .logging("safe block produced") - .shared(scope) - .chunk_by_vault(vaults) + .chunk_by_vault(vaults, scope) .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) .await .btc_deposits(process_call.clone()) diff --git a/engine/src/witness/common/chain_source/extension.rs b/engine/src/witness/common/chain_source/extension.rs index 7214e2d623..cb18ce8214 100644 --- a/engine/src/witness/common/chain_source/extension.rs +++ b/engine/src/witness/common/chain_source/extension.rs @@ -84,34 +84,45 @@ pub trait ChainSourceExt: ChainSource { /// Chunk the chain source by time (in blocks). Some consumers do not care about the exact /// external chain block number they start and end but we only want to run it for the epoch /// duration (as measured approximately by the State Chain blocks we consume). - fn chunk_by_time>>( + fn chunk_by_time<'env, Epochs: Into>>( self, epochs: Epochs, - ) -> ChunkedByTimeBuilder> + scope: &Scope<'env, anyhow::Error>, + ) -> ChunkedByTimeBuilder>> where - Self: ExternalChainSource + Sized, + Self: ExternalChainSource + Sized + 'env, + Self::Client: Clone, + Self::Data: Clone, { - ChunkedByTimeBuilder::new(ChunkByTime::new(self), epochs.into()) + // Note the use of the shared adapter which ensures that chunked adapter uses + // the same underlying stream and client for each epoch: + ChunkedByTimeBuilder::new(ChunkByTime::new(self.shared(scope)), epochs.into()) } /// Chunk the chain source by vault. We specifically want to chunk the chain source from the /// block the epoch starts at for a particular chain. This ensures we don't miss witnesses, and /// allows us to only run for those epochs we are interested in. fn chunk_by_vault< + 'env, ExtraInfo, ExtraHistoricInfo, Vaults: Into>, >( self, vaults: Vaults, - ) -> ChunkedByVaultBuilder> + scope: &Scope<'env, anyhow::Error>, + ) -> ChunkedByVaultBuilder, ExtraInfo, ExtraHistoricInfo>> where - Self: ExternalChainSource + Sized, + Self: ExternalChainSource + Sized + 'env, + Self::Client: Clone, + Self::Data: Clone, state_chain_runtime::Runtime: RuntimeHasChain, ExtraInfo: Clone + Send + Sync + 'static, ExtraHistoricInfo: Clone + Send + Sync + 'static, { - ChunkedByVaultBuilder::new(ChunkByVault::new(self), vaults.into()) + // Note the use of the shared adapter which ensures that chunked adapter uses + // the same underlying stream and client for each epoch: + ChunkedByVaultBuilder::new(ChunkByVault::new(self.shared(scope)), vaults.into()) } } impl ChainSourceExt for T {} diff --git a/engine/src/witness/dot.rs b/engine/src/witness/dot.rs index 868e02a44b..b15d4bf88a 100644 --- a/engine/src/witness/dot.rs +++ b/engine/src/witness/dot.rs @@ -202,7 +202,7 @@ where unfinalised_source .clone() - .chunk_by_time(epoch_source.clone()) + .chunk_by_time(epoch_source.clone(), scope) .chain_tracking(state_chain_client.clone(), dot_client.clone()) .logging("chain tracking") .spawn(scope); @@ -226,8 +226,7 @@ where // Pre-witnessing unfinalised_source .strictly_monotonic() - .shared(scope) - .chunk_by_vault(vaults.clone()) + .chunk_by_vault(vaults.clone(), scope) .deposit_addresses(scope, unfinalized_state_chain_stream, state_chain_client.clone()) .await .dot_deposits(prewitness_call) @@ -241,8 +240,7 @@ where .then(|header| async move { header.data.iter().filter_map(filter_map_events).collect::>() }) - .shared(scope) - .chunk_by_vault(vaults) + .chunk_by_vault(vaults, scope) .deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone()) .await // Deposit witnessing diff --git a/engine/src/witness/eth.rs b/engine/src/witness/eth.rs index e5763733ab..94a89944de 100644 --- a/engine/src/witness/eth.rs +++ b/engine/src/witness/eth.rs @@ -119,8 +119,7 @@ where eth_source .clone() - .shared(scope) - .chunk_by_time(epoch_source.clone()) + .chunk_by_time(epoch_source.clone(), scope) .chain_tracking(state_chain_client.clone(), eth_client.clone()) .logging("chain tracking") .spawn(scope); @@ -128,11 +127,8 @@ where let vaults = epoch_source.vaults().await; // ===== Prewitnessing stream ===== - let prewitness_source = eth_source - .clone() - .strictly_monotonic() - .shared(scope) - .chunk_by_vault(vaults.clone()); + let prewitness_source = + eth_source.clone().strictly_monotonic().chunk_by_vault(vaults.clone(), scope); let prewitness_source_deposit_addresses = prewitness_source .clone() @@ -194,8 +190,7 @@ where .strictly_monotonic() .lag_safety(SAFETY_MARGIN) .logging("safe block produced") - .shared(scope) - .chunk_by_vault(vaults); + .chunk_by_vault(vaults, scope); let eth_safe_vault_source_deposit_addresses = eth_safe_vault_source .clone() diff --git a/engine/src/witness/eth/key_manager.rs b/engine/src/witness/eth/key_manager.rs index 5f9877754b..7a5bb601b6 100644 --- a/engine/src/witness/eth/key_manager.rs +++ b/engine/src/witness/eth/key_manager.rs @@ -235,7 +235,7 @@ mod tests { .await; EthSource::new(retry_client.clone()) - .chunk_by_vault(vault_source) + .chunk_by_vault(vault_source, scope) .key_manager_witnessing( |call, _| async move { println!("Witnessed call: {:?}", call);