Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: feat: bitcoin elections #5457

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
c623ac5
feat: start of block witnesser
kylezs Nov 28, 2024
630e31c
feat: instantiate bitcoin voter, and block witnesser clean up (#5461)
kylezs Nov 29, 2024
5c12dab
nearly complete deposit channel witnessing btc
kylezs Nov 29, 2024
27ea2ab
WIP
kylezs Nov 29, 2024
d88316c
WIP
kylezs Dec 5, 2024
bf4ec56
WIP
kylezs Dec 5, 2024
7ffcec4
WIP
kylezs Dec 5, 2024
e1ac718
WIP
kylezs Nov 28, 2024
c90c6d8
WIP: Implement merging of existing headers and new consensus.
MxmUrw Dec 5, 2024
b40e1fb
Fix compilation after rebase.
MxmUrw Dec 5, 2024
6671d97
WIP: Fill in most of the logic.
MxmUrw Dec 5, 2024
f3ae4a5
Integrate block height witnesser in btc electoral system.
MxmUrw Dec 6, 2024
7d3ea93
debugging and reorder some code
kylezs Dec 9, 2024
d174c65
don't run old btc witnessing
kylezs Dec 9, 2024
411a53d
back fill missing headers
kylezs Dec 9, 2024
de44d31
Return range of blocks whose headers were witnessed in `on_finalize`.
MxmUrw Dec 9, 2024
d0aab3a
Add special case for when the next block to witness from is zero to t…
MxmUrw Dec 9, 2024
1baf7d7
chore: force use of constructor for CorruptStorageError
kylezs Dec 10, 2024
8fa7f00
use range instead of block
kylezs Dec 10, 2024
f8d9747
Multiple changes:
MxmUrw Dec 10, 2024
17375b3
Divide logic between enum variants (#5488)
kylezs Dec 11, 2024
58225cd
Witnesser WIP: Update `ChainProgress` enum variant (#5494)
MxmUrw Dec 11, 2024
057eac0
fix: use end()
kylezs Dec 12, 2024
2683778
WIP: Fix block height witnesser bugs (#5498)
MxmUrw Dec 12, 2024
95bc09c
wip compiles
kylezs Dec 18, 2024
870c5bc
WIP actually process deposits
kylezs Dec 18, 2024
6e436ca
refactor: use range of block ranges, and single reorg range (#5501)
kylezs Dec 18, 2024
b5fd6b7
WIP
kylezs Dec 19, 2024
5971673
WIP
kylezs Dec 19, 2024
dcbc040
refactor: rewrite BHW as a state machine (#5515)
MxmUrw Dec 19, 2024
3f90235
fix: initialise bw from first consensus
kylezs Dec 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
419 changes: 237 additions & 182 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ reqwest = { version = "0.11.4" }
rlp = { version = "0.5.2", default-features = false }
rocksdb = { version = "0.21.0" }
serde_bytes = { version = "0.11.14", default-features = false }
serde-big-array = "0.5.1"
serde_path_to_error = "*"
serde_with = { version = "3.11.0", default-features = false }
scale-decode = { version = "0.13" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ where
move |header| {
let btc_client = btc_client.clone();
async move {
let block = btc_client.block(header.hash).await;
let block = btc_client.block(header.hash).await.expect("TODO: Delete this");
(header.data, block.txdata)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ where
Swapping(_) |
LiquidityProvider(_) |
LiquidityPools(_) |
SolanaElections(_) => {},
SolanaElections(_) |
BitcoinElections(_) => {},
};

Ok(())
Expand Down
49 changes: 41 additions & 8 deletions engine/src/btc/retry_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl BtcRetryRpcClient {

#[async_trait::async_trait]
pub trait BtcRetryRpcApi: Clone {
async fn block(&self, block_hash: BlockHash) -> VerboseBlock;
async fn block(&self, block_hash: BlockHash) -> anyhow::Result<VerboseBlock>;

async fn block_hash(&self, block_number: cf_chains::btc::BlockNumber) -> BlockHash;

Expand All @@ -61,19 +61,25 @@ pub trait BtcRetryRpcApi: Clone {

async fn average_block_fee_rate(&self, block_hash: BlockHash) -> cf_chains::btc::BtcAmount;

async fn best_block_header(&self) -> BlockHeader;
async fn best_block_header(&self) -> anyhow::Result<BlockHeader>;

async fn block_header(
&self,
block_number: cf_chains::btc::BlockNumber,
) -> anyhow::Result<BlockHeader>;
}

#[async_trait::async_trait]
impl BtcRetryRpcApi for BtcRetryRpcClient {
async fn block(&self, block_hash: BlockHash) -> VerboseBlock {
async fn block(&self, block_hash: BlockHash) -> anyhow::Result<VerboseBlock> {
self.retry_client
.request(
.request_with_limit(
RequestLog::new("block".to_string(), Some(format!("{block_hash}"))),
Box::pin(move |client| {
#[allow(clippy::redundant_async_block)]
Box::pin(async move { client.block(block_hash).await })
}),
2,
)
.await
}
Expand Down Expand Up @@ -134,9 +140,9 @@ impl BtcRetryRpcApi for BtcRetryRpcClient {
.await
}

async fn best_block_header(&self) -> BlockHeader {
async fn best_block_header(&self) -> anyhow::Result<BlockHeader> {
self.retry_client
.request(
.request_with_limit(
RequestLog::new("best_block_header".to_string(), None),
Box::pin(move |client| {
#[allow(clippy::redundant_async_block)]
Expand All @@ -147,6 +153,28 @@ impl BtcRetryRpcApi for BtcRetryRpcClient {
Ok(header)
})
}),
2,
)
.await
}

async fn block_header(
&self,
block_number: cf_chains::btc::BlockNumber,
) -> anyhow::Result<BlockHeader> {
self.retry_client
.request_with_limit(
RequestLog::new("block_header".to_string(), Some(block_number.to_string())),
Box::pin(move |client| {
#[allow(clippy::redundant_async_block)]
Box::pin(async move {
let block_hash = client.block_hash(block_number).await?;
let header = client.block_header(block_hash).await?;
assert_eq!(header.hash, block_hash);
Ok(header)
})
}),
2,
)
.await
}
Expand Down Expand Up @@ -200,7 +228,7 @@ pub mod mocks {

#[async_trait::async_trait]
impl BtcRetryRpcApi for BtcRetryRpcClient {
async fn block(&self, block_hash: BlockHash) -> VerboseBlock;
async fn block(&self, block_hash: BlockHash) -> anyhow::Result<VerboseBlock>;

async fn block_hash(&self, block_number: cf_chains::btc::BlockNumber) -> BlockHash;

Expand All @@ -210,7 +238,12 @@ pub mod mocks {

async fn average_block_fee_rate(&self, block_hash: BlockHash) -> cf_chains::btc::BtcAmount;

async fn best_block_header(&self) -> BlockHeader;
async fn best_block_header(&self) -> anyhow::Result<BlockHeader>;

async fn block_header(
&self,
block_number: cf_chains::btc::BlockNumber,
) -> anyhow::Result<BlockHeader>;
}
}
}
74 changes: 42 additions & 32 deletions engine/src/elections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
},
};
use anyhow::anyhow;
use cf_chains::instances::ChainInstanceAlias;
use cf_primitives::MILLISECONDS_PER_BLOCK;
use cf_utilities::{future_map::FutureMap, task_scope::Scope, UnendingStream};
use futures::{stream, StreamExt, TryStreamExt};
Expand All @@ -31,28 +32,30 @@ const MAXIMUM_SHARED_DATA_CACHE_ITEMS: usize = 1024;
const MAXIMUM_CONCURRENT_VOTER_REQUESTS: u32 = 32;
const INITIAL_VOTER_REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

pub type ChainInstance<Chain> = <Chain as ChainInstanceAlias>::Instance;

pub struct Voter<
Instance: 'static,
StateChainClient: ElectoralApi<Instance> + SignedExtrinsicApi + ChainApi,
VoterClient: CompositeVoterApi<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner> + Send + Sync + 'static,
Chain: cf_chains::Chain + 'static,
StateChainClient: ElectoralApi<Chain, ChainInstance<Chain>> + SignedExtrinsicApi + ChainApi,
VoterClient: CompositeVoterApi<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner> + Send + Sync + 'static,
> where
state_chain_runtime::Runtime:
pallet_cf_elections::Config<Instance>,
pallet_cf_elections::Config<ChainInstance<Chain>>,
{
state_chain_client: Arc<StateChainClient>,
voter: RetrierClient<VoterClient>,
_phantom: core::marker::PhantomData<Instance>,
_phantom: core::marker::PhantomData<Chain>,
}

impl<
Instance: Send + Sync + 'static,
StateChainClient: ElectoralApi<Instance> + SignedExtrinsicApi + ChainApi,
VoterClient: CompositeVoterApi<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner> + Clone + Send + Sync + 'static,
> Voter<Instance, StateChainClient, VoterClient>
Chain: cf_chains::Chain + 'static,
StateChainClient: ElectoralApi<Chain, ChainInstance<Chain>> + SignedExtrinsicApi + ChainApi,
VoterClient: CompositeVoterApi<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner> + Clone + Send + Sync + 'static,
> Voter<Chain, StateChainClient, VoterClient>
where
state_chain_runtime::Runtime:
pallet_cf_elections::Config<Instance>,
pallet_cf_elections::Call<state_chain_runtime::Runtime, Instance>:
pallet_cf_elections::Config<ChainInstance<Chain>>,
pallet_cf_elections::Call<state_chain_runtime::Runtime, ChainInstance<Chain>>:
std::convert::Into<state_chain_runtime::RuntimeCall>,
{
pub fn new(
Expand All @@ -76,9 +79,9 @@ where

pub async fn continuously_vote(self) {
loop {
info!("Beginning voting");
info!("{}: Beginning voting", Chain::NAME);
if let Err(error) = self.reset_and_continuously_vote().await {
error!("Voting reset due to error: '{}'", error);
error!("{}: Voting reset due to error: '{error}'", Chain::NAME);
}
}
}
Expand All @@ -87,41 +90,46 @@ where
let mut rng = rand::rngs::OsRng;
let latest_unfinalized_block = self.state_chain_client.latest_unfinalized_block();
if let Some(_electoral_data) = self.state_chain_client.electoral_data(latest_unfinalized_block).await {
let (_, _, block_header, _) = self.state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, Instance>::ignore_my_votes {}).await.until_in_block().await?;
tracing::info!("{}: Got some electoral data", Chain::NAME);
let (_, _, block_header, _) = self.state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, ChainInstance<Chain>>::ignore_my_votes {}).await.until_in_block().await?;

if let Some(electoral_data) = self.state_chain_client.electoral_data(block_header.into()).await {
tracing::info!("{}: Got some electoral data 2", Chain::NAME);
stream::iter(electoral_data.current_elections).map(|(election_identifier, election_data)| {
let state_chain_client = &self.state_chain_client;
async move {
if election_data.option_existing_vote.is_some() {
state_chain_client.finalize_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, Instance>::delete_vote {
state_chain_client.finalize_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, ChainInstance<Chain>>::delete_vote {
election_identifier,
}).await.until_in_block().await?;
}
Ok::<_, anyhow::Error>(())
}
}).buffer_unordered(32).try_collect::<Vec<_>>().await?;

self.state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, Instance>::stop_ignoring_my_votes {}).await.until_in_block().await?;
tracing::info!("{}: Submitting signed extrinsic", Chain::NAME);
self.state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, ChainInstance<Chain>>::stop_ignoring_my_votes {}).await.until_in_block().await?;
tracing::info!("{}: Submitted signed extrinsic", Chain::NAME);
}
}

let mut unfinalized_block_stream = self.state_chain_client.unfinalized_block_stream().await;
// TEMP: Half block time to hack BTC voting.
const BLOCK_TIME: std::time::Duration =
std::time::Duration::from_millis(MILLISECONDS_PER_BLOCK);
std::time::Duration::from_millis(MILLISECONDS_PER_BLOCK / 2);
let mut submit_interval = tokio::time::interval(BLOCK_TIME);
let mut pending_submissions = BTreeMap::<
CompositeElectionIdentifierOf<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner>,
CompositeElectionIdentifierOf<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner>,
(
<<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::PartialVote,
<<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::Vote,
<<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::PartialVote,
<<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::Vote,
)
>::default();
let mut vote_tasks = FutureMap::default();
let mut shared_data_cache = HashMap::<
SharedDataHash,
(
<<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::SharedData,
<<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::SharedData,
std::time::Instant,
)
>::default();
Expand Down Expand Up @@ -149,10 +157,10 @@ where
let state_chain_client = &self.state_chain_client;
async move {
for (election_identifier, _) in votes.iter() {
info!("Submitting vote for election: '{:?}'", election_identifier);
info!("{}: Submitting vote for election: '{election_identifier:?}'", Chain::NAME);
}
// TODO: Use block hash you got this vote tasks details from as the based of the mortal of the extrinsic
state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, Instance>::vote {
state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, ChainInstance<Chain>>::vote {
authority_votes: BTreeMap::from_iter(votes).try_into().unwrap(/*Safe due to chunking*/),
}).await;
}
Expand All @@ -161,9 +169,9 @@ where
let (election_identifier, result_vote) = vote_tasks.next_or_pending() => {
match result_vote {
Ok(vote) => {
info!("Voting task for election: '{:?}' succeeded.", election_identifier);
info!("{}: Voting task for election: '{election_identifier:?}' succeeded.", Chain::NAME);
// Create the partial_vote early so that SharedData can be provided as soon as the vote has been generated, rather than only after it is submitted.
let partial_vote = <<<state_chain_runtime::Runtime as pallet_cf_elections::Config<Instance>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::vote_into_partial_vote(&vote, |shared_data| {
let partial_vote = <<<state_chain_runtime::Runtime as pallet_cf_elections::Config<ChainInstance<Chain>>>::ElectoralSystemRunner as ElectoralSystemRunner>::Vote as VoteStorage>::vote_into_partial_vote(&vote, |shared_data| {
let shared_data_hash = SharedDataHash::of(&shared_data);
if shared_data_cache.len() > MAXIMUM_SHARED_DATA_CACHE_ITEMS {
for shared_data_hash in shared_data_cache.keys().cloned().take(shared_data_cache.len() - MAXIMUM_SHARED_DATA_CACHE_ITEMS).collect::<Vec<_>>() {
Expand All @@ -177,7 +185,7 @@ where
pending_submissions.insert(election_identifier, (partial_vote, vote));
},
Err(error) => {
warn!("Voting task for election '{:?}' failed with error: '{:?}'.", election_identifier, error);
warn!("{}: Voting task for election '{election_identifier:?}' failed with error: '{error:?}'.", Chain::NAME);
}
}
},
Expand All @@ -190,16 +198,18 @@ where
added_to_cache.elapsed() < LIFETIME_OF_SHARED_DATA_IN_CACHE
});

tracing::info!("{}: Unfinalised next, getting electoral_data", Chain::NAME);
if let Some(electoral_data) = self.state_chain_client.electoral_data(block_info).await {
tracing::info!("{}: Unfinalised next, got some electoral_data: {:?}", Chain::NAME, electoral_data);
if electoral_data.contributing {
for (election_identifier, election_data) in electoral_data.current_elections {
if election_data.is_vote_desired {
if !vote_tasks.contains_key(&election_identifier) {
info!("Voting task for election: '{:?}' initiate.", election_identifier);
info!("{}: Voting task for election: '{election_identifier:?}' initiate.", Chain::NAME);
vote_tasks.insert(
election_identifier,
Box::pin(self.voter.request_with_limit(
RequestLog::new("vote".to_string(), Some(format!("{election_identifier:?}"))), // Add some identifier for `Instance`.
RequestLog::new("vote".to_string(), Some(format!("{}: {election_identifier:?}", Chain::NAME))),
Box::pin(move |client| {
let election_data = election_data.clone();
#[allow(clippy::redundant_async_block)]
Expand All @@ -214,7 +224,7 @@ where
))
);
} else {
info!("Voting task for election: '{:?}' not initiated as a task is already running for that election.", election_identifier);
info!("{}: Voting task for election: '{election_identifier:?}' not initiated as a task is already running for that election.", Chain::NAME);
}
}
}
Expand All @@ -232,7 +242,7 @@ where
let final_probability = 1.0 / (core::cmp::max(1, core::cmp::min(reference_details.count, electoral_data.authority_count)) as f64);

if rng.gen_bool((1.0 - lerp_factor) * initial_probability + lerp_factor * final_probability) {
self.state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, Instance>::provide_shared_data {
self.state_chain_client.submit_signed_extrinsic(pallet_cf_elections::Call::<state_chain_runtime::Runtime, ChainInstance<Chain>>::provide_shared_data {
shared_data: shared_data.clone(),
}).await;
}
Expand All @@ -242,10 +252,10 @@ where
} else {
// We expect this to happen when a validator joins the set, since they won't be contributing, but will be a validator.
// Therefore they get Some() from `electoral_data` but `contributing` is false, until we reset the voting by throwing an error here.
return Err(anyhow!("Validator has just joined the authority set, or has been unexpectedly set as not contributing."));
return Err(anyhow!("{}: Validator has just joined the authority set, or has been unexpectedly set as not contributing.", Chain::NAME));
}
} else {
info!("Not voting as not an authority.");
info!("{}: Not voting as not an authority.", Chain::NAME);
}
} else break Ok(()),
}
Expand Down
2 changes: 2 additions & 0 deletions engine/src/elections/voter_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,6 @@ macro_rules! generate_voter_api_tuple_impls {
}
}

generate_voter_api_tuple_impls!(tuple_1_impls: ((A, A0)));
generate_voter_api_tuple_impls!(tuple_2_impls: ((A, A0), (B, B0)));
generate_voter_api_tuple_impls!(tuple_7_impls: ((A, A0), (B, B0), (C, C0), (D, D0), (EE, E0), (FF, F0), (GG, G0)));
Loading
Loading