Skip to content

Commit

Permalink
refactor: shared source inside chunked_by adapters (#4232)
Browse files Browse the repository at this point in the history
* refactor: shared source inside chunked_by adapters

* chore: address review
  • Loading branch information
msgmaxim authored Nov 16, 2023
1 parent d96fada commit aa50279
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ where
}
}
})
.shared(scope)
.chunk_by_vault(vaults)
.chunk_by_vault(vaults, scope)
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.btc_deposits(witness_call.clone())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ where
DotUnfinalisedSource::new(dot_client.clone())
.then(|header| async move { header.data.iter().filter_map(filter_map_events).collect() })
.strictly_monotonic()
.shared(scope)
.chunk_by_vault(vaults.clone())
.chunk_by_vault(vaults.clone(), scope)
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
// Deposit witnessing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ where
let vaults = epoch_source.vaults().await;
let eth_source = EthSource::new(eth_client.clone())
.strictly_monotonic()
.shared(scope)
.chunk_by_vault(vaults);
.chunk_by_vault(vaults, scope);

let eth_source_deposit_addresses = eth_source
.clone()
Expand Down
8 changes: 3 additions & 5 deletions engine/src/witness/btc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ where

btc_source
.clone()
.shared(scope)
.chunk_by_time(epoch_source.clone())
.chunk_by_time(epoch_source.clone(), scope)
.chain_tracking(state_chain_client.clone(), btc_client.clone())
.logging("chain tracking")
.spawn(scope);
Expand All @@ -129,7 +128,7 @@ where
// Pre-witnessing stream.
strictly_monotonic_source
.clone()
.chunk_by_vault(vaults.clone())
.chunk_by_vault(vaults.clone(), scope)
.deposit_addresses(scope, unfinalised_state_chain_stream, state_chain_client.clone())
.await
.btc_deposits(prewitness_call)
Expand All @@ -140,8 +139,7 @@ where
strictly_monotonic_source
.lag_safety(SAFETY_MARGIN)
.logging("safe block produced")
.shared(scope)
.chunk_by_vault(vaults)
.chunk_by_vault(vaults, scope)
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.btc_deposits(process_call.clone())
Expand Down
25 changes: 18 additions & 7 deletions engine/src/witness/common/chain_source/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,34 +84,45 @@ pub trait ChainSourceExt: ChainSource {
/// Chunk the chain source by time (in blocks). Some consumers do not care about the exact
/// external chain block number they start and end but we only want to run it for the epoch
/// duration (as measured approximately by the State Chain blocks we consume).
fn chunk_by_time<Epochs: Into<EpochSource<(), ()>>>(
fn chunk_by_time<'env, Epochs: Into<EpochSource<(), ()>>>(
self,
epochs: Epochs,
) -> ChunkedByTimeBuilder<ChunkByTime<Self>>
scope: &Scope<'env, anyhow::Error>,
) -> ChunkedByTimeBuilder<ChunkByTime<SharedSource<Self>>>
where
Self: ExternalChainSource + Sized,
Self: ExternalChainSource + Sized + 'env,
Self::Client: Clone,
Self::Data: Clone,
{
ChunkedByTimeBuilder::new(ChunkByTime::new(self), epochs.into())
// Note the use of the shared adapter which ensures that chunked adapter uses
// the same underlying stream and client for each epoch:
ChunkedByTimeBuilder::new(ChunkByTime::new(self.shared(scope)), epochs.into())
}

/// Chunk the chain source by vault. We specifically want to chunk the chain source from the
/// block the epoch starts at for a particular chain. This ensures we don't miss witnesses, and
/// allows us to only run for those epochs we are interested in.
fn chunk_by_vault<
'env,
ExtraInfo,
ExtraHistoricInfo,
Vaults: Into<VaultSource<Self::Chain, ExtraInfo, ExtraHistoricInfo>>,
>(
self,
vaults: Vaults,
) -> ChunkedByVaultBuilder<ChunkByVault<Self, ExtraInfo, ExtraHistoricInfo>>
scope: &Scope<'env, anyhow::Error>,
) -> ChunkedByVaultBuilder<ChunkByVault<SharedSource<Self>, ExtraInfo, ExtraHistoricInfo>>
where
Self: ExternalChainSource + Sized,
Self: ExternalChainSource + Sized + 'env,
Self::Client: Clone,
Self::Data: Clone,
state_chain_runtime::Runtime: RuntimeHasChain<Self::Chain>,
ExtraInfo: Clone + Send + Sync + 'static,
ExtraHistoricInfo: Clone + Send + Sync + 'static,
{
ChunkedByVaultBuilder::new(ChunkByVault::new(self), vaults.into())
// Note the use of the shared adapter which ensures that chunked adapter uses
// the same underlying stream and client for each epoch:
ChunkedByVaultBuilder::new(ChunkByVault::new(self.shared(scope)), vaults.into())
}
}
impl<T: ChainSource> ChainSourceExt for T {}
8 changes: 3 additions & 5 deletions engine/src/witness/dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where

unfinalised_source
.clone()
.chunk_by_time(epoch_source.clone())
.chunk_by_time(epoch_source.clone(), scope)
.chain_tracking(state_chain_client.clone(), dot_client.clone())
.logging("chain tracking")
.spawn(scope);
Expand All @@ -226,8 +226,7 @@ where
// Pre-witnessing
unfinalised_source
.strictly_monotonic()
.shared(scope)
.chunk_by_vault(vaults.clone())
.chunk_by_vault(vaults.clone(), scope)
.deposit_addresses(scope, unfinalized_state_chain_stream, state_chain_client.clone())
.await
.dot_deposits(prewitness_call)
Expand All @@ -241,8 +240,7 @@ where
.then(|header| async move {
header.data.iter().filter_map(filter_map_events).collect::<Vec<_>>()
})
.shared(scope)
.chunk_by_vault(vaults)
.chunk_by_vault(vaults, scope)
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
// Deposit witnessing
Expand Down
13 changes: 4 additions & 9 deletions engine/src/witness/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,16 @@ where

eth_source
.clone()
.shared(scope)
.chunk_by_time(epoch_source.clone())
.chunk_by_time(epoch_source.clone(), scope)
.chain_tracking(state_chain_client.clone(), eth_client.clone())
.logging("chain tracking")
.spawn(scope);

let vaults = epoch_source.vaults().await;

// ===== Prewitnessing stream =====
let prewitness_source = eth_source
.clone()
.strictly_monotonic()
.shared(scope)
.chunk_by_vault(vaults.clone());
let prewitness_source =
eth_source.clone().strictly_monotonic().chunk_by_vault(vaults.clone(), scope);

let prewitness_source_deposit_addresses = prewitness_source
.clone()
Expand Down Expand Up @@ -194,8 +190,7 @@ where
.strictly_monotonic()
.lag_safety(SAFETY_MARGIN)
.logging("safe block produced")
.shared(scope)
.chunk_by_vault(vaults);
.chunk_by_vault(vaults, scope);

let eth_safe_vault_source_deposit_addresses = eth_safe_vault_source
.clone()
Expand Down
2 changes: 1 addition & 1 deletion engine/src/witness/eth/key_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ mod tests {
.await;

EthSource::new(retry_client.clone())
.chunk_by_vault(vault_source)
.chunk_by_vault(vault_source, scope)
.key_manager_witnessing(
|call, _| async move {
println!("Witnessed call: {:?}", call);
Expand Down

0 comments on commit aa50279

Please sign in to comment.