Skip to content

Commit

Permalink
Feat: Private Broker Channel Witnessing (#5383)
Browse files Browse the repository at this point in the history
* feat: private broker channel witnessing

* fix: address review
  • Loading branch information
msgmaxim authored Nov 6, 2024
1 parent 403cfff commit ed9ecd0
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ where
.chunk_by_vault(vaults, scope)
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.private_deposit_channels(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.btc_deposits(witness_call.clone())
.egress_items(scope, state_chain_stream, state_chain_client)
.await
Expand Down
12 changes: 10 additions & 2 deletions engine/src/witness/btc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub async fn start<
prewitness_call: PrewitnessCall,
state_chain_client: Arc<StateChainClient>,
state_chain_stream: StateChainStream,
unfinalised_state_chain_stream: impl StreamApi<UNFINALIZED>,
unfinalised_state_chain_stream: impl StreamApi<UNFINALIZED> + Clone,
epoch_source: EpochSourceBuilder<'_, '_, StateChainClient, (), ()>,
db: Arc<PersistentKeyDB>,
) -> Result<()>
Expand Down Expand Up @@ -126,7 +126,13 @@ where
block_source
.clone()
.chunk_by_vault(vaults.clone(), scope)
.deposit_addresses(scope, unfinalised_state_chain_stream, state_chain_client.clone())
.deposit_addresses(
scope,
unfinalised_state_chain_stream.clone(),
state_chain_client.clone(),
)
.await
.private_deposit_channels(scope, unfinalised_state_chain_stream, state_chain_client.clone())
.await
.btc_deposits(prewitness_call)
.logging("pre-witnessing")
Expand Down Expand Up @@ -164,6 +170,8 @@ where
.chunk_by_vault(vaults, scope)
.deposit_addresses(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.private_deposit_channels(scope, state_chain_stream.clone(), state_chain_client.clone())
.await
.btc_deposits(process_call.clone())
.egress_items(scope, state_chain_stream, state_chain_client.clone())
.await
Expand Down
34 changes: 22 additions & 12 deletions engine/src/witness/btc/deposits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use pallet_cf_ingress_egress::{DepositChannelDetails, DepositWitness};
use state_chain_runtime::BitcoinInstance;

use super::super::common::chunked_chain_source::chunked_by_vault::{
builder::ChunkedByVaultBuilder, ChunkedByVault,
builder::ChunkedByVaultBuilder, private_deposit_channels::BrokerPrivateChannels, ChunkedByVault,
};
use crate::{
btc::rpc::VerboseTransaction,
Expand Down Expand Up @@ -39,7 +39,7 @@ impl<Inner: ChunkedByVault> ChunkedByVaultBuilder<Inner> {
Inner: ChunkedByVault<
Index = u64,
Hash = BlockHash,
Data = (((), Vec<VerboseTransaction>), Addresses<Inner>),
Data = ((((), Vec<VerboseTransaction>), Addresses<Inner>), BrokerPrivateChannels),
Chain = Bitcoin,
>,
ProcessCall: Fn(state_chain_runtime::RuntimeCall, EpochIndex) -> ProcessingFut
Expand All @@ -55,25 +55,35 @@ impl<Inner: ChunkedByVault> ChunkedByVaultBuilder<Inner> {
self.then(move |epoch, header| {
let process_call = process_call.clone();
async move {
// TODO: Make addresses a Map of some kind?
let ((((), txs), deposit_channels), private_channels) = header.data;

let vault_addresses = {
use cf_chains::btc::{
deposit_address::DepositAddress, AggKey, CHANGE_ADDRESS_SALT,
};

let key: &AggKey = &epoch.info.0;

let maybe_previous_vault_address =
key.previous.map(|key| DepositAddress::new(key, CHANGE_ADDRESS_SALT));
let current_vault_address =
DepositAddress::new(key.current, CHANGE_ADDRESS_SALT);

[current_vault_address].into_iter().chain(maybe_previous_vault_address)
// Take all current private broker chanenls and use them to build a list of all
// deposit addresses that we should check for vault swaps. Note that we
// monitor previous epoch key (if exists) in addition to the current one, which
// means we get up to two deposit addresses per broker. A special case is the
// "change address" that doesn't have a broker associated with it.
[key.current].into_iter().chain(key.previous).flat_map(|key| {
[(None, CHANGE_ADDRESS_SALT)]
.into_iter()
.chain(private_channels.clone().into_iter().map(
|(broker_id, channel_id)| (Some(broker_id), channel_id as u32),
))
.map(move |(maybe_broker_id, channel_id)| {
(maybe_broker_id, DepositAddress::new(key, channel_id))
})
})
};

// TODO: Make addresses a Map of some kind?
let (((), txs), deposit_channels) = header.data;

for vault_address in vault_addresses {
// TODO: provide broker id (along with broker fees) in the call
for (_maybe_broker_id, vault_address) in vault_addresses {
for tx in &txs {
if let Some(call) =
super::vault_swaps::try_extract_vault_swap_call(tx, &vault_address)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod continuous;
pub mod deposit_addresses;
pub mod egress_items;
pub mod monitored_items;
pub mod private_deposit_channels;

use cf_chains::{Chain, ChainCrypto};
use futures_util::StreamExt;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use super::{builder::ChunkedByVaultBuilder, monitored_items::MonitoredSCItems, ChunkedByVault};
use cf_chains::Chain;
use cf_primitives::{AccountId, ChannelId};
use cf_utilities::task_scope::Scope;
use std::sync::Arc;

pub type BrokerPrivateChannels = Vec<(AccountId, ChannelId)>;

use crate::{
state_chain_observer::client::{
storage_api::StorageApi, stream_api::StreamApi, STATE_CHAIN_CONNECTION,
},
witness::common::RuntimeHasChain,
};

impl<Inner: ChunkedByVault> ChunkedByVaultBuilder<Inner> {
pub async fn private_deposit_channels<
'env,
StateChainStream,
StateChainClient,
const IS_FINALIZED: bool,
>(
self,
scope: &Scope<'env, anyhow::Error>,
state_chain_stream: StateChainStream,
state_chain_client: Arc<StateChainClient>,
) -> ChunkedByVaultBuilder<
MonitoredSCItems<
Inner,
BrokerPrivateChannels,
impl Fn(
<Inner::Chain as Chain>::ChainBlockNumber,
&BrokerPrivateChannels,
) -> BrokerPrivateChannels
+ Send
+ Sync
+ Clone
+ 'static,
>,
>
where
state_chain_runtime::Runtime: RuntimeHasChain<Inner::Chain>,
StateChainStream: StreamApi<IS_FINALIZED>,
StateChainClient: StorageApi + Send + Sync + 'static,
{
let state_chain_client_c = state_chain_client.clone();

ChunkedByVaultBuilder::new(
MonitoredSCItems::new(
self.source,
scope,
state_chain_stream,
state_chain_client,
move |block_hash| {
let state_chain_client = state_chain_client_c.clone();
async move {
state_chain_client
.storage_map::<pallet_cf_swapping::BrokerPrivateBtcChannels<
state_chain_runtime::Runtime,
>, Vec<_>>(block_hash)
.await
.expect(STATE_CHAIN_CONNECTION)
}
},
// Private channels are not reusable (at least at the moment), so we
// don't need to check for their expiration:
|index, addresses: &BrokerPrivateChannels| {
assert!(<Inner::Chain as Chain>::is_block_witness_root(index));
addresses.clone()
},
)
.await,
self.parameters,
)
}
}
2 changes: 1 addition & 1 deletion state-chain/pallets/cf-swapping/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ pub mod pallet {
StorageMap<_, Twox64Concat, Asset, AssetAmount, ValueQuery>;

#[pallet::storage]
pub(crate) type BrokerPrivateBtcChannels<T: Config> =
pub type BrokerPrivateBtcChannels<T: Config> =
StorageMap<_, Identity, T::AccountId, ChannelId, OptionQuery>;

#[pallet::event]
Expand Down

0 comments on commit ed9ecd0

Please sign in to comment.