Skip to content

Commit

Permalink
fix: properly handle finalized events + code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
nmammeri committed Dec 4, 2024
1 parent feb19b5 commit cbfad94
Showing 1 changed file with 122 additions and 103 deletions.
225 changes: 122 additions & 103 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use jsonrpsee::{
error::{ErrorObject, ErrorObjectOwned},
ErrorCode,
},
PendingSubscriptionSink, RpcModule, Subscription,
PendingSubscriptionSink, RpcModule,
};
use order_fills::OrderFills;
use pallet_cf_governance::GovCallHash;
Expand Down Expand Up @@ -70,7 +70,7 @@ use state_chain_runtime::{
Block, Hash, NetworkFee, SolanaInstance,
};
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, VecDeque},
marker::PhantomData,
sync::Arc,
};
Expand Down Expand Up @@ -1977,103 +1977,134 @@ where
self.chain_head_api().subscribe_unbounded("chainHead_v1_follow", [false]).await
else {
pending_sink
.reject(CfApiError::from(internal_error("chainHead_v1_follow subscription failed")))
.reject(internal_error("chainHead_v1_follow subscription failed"))
.await;
return;
};

// depending on only_finalized param, construct the blocks stream from the subscription
let seed_state = (only_finalized, subscription);
let blocks_stream = Box::pin(stream::unfold(seed_state, |mut state| async move {
while let Ok(event) = get_next_chain_head_event(&mut state.1).await {
if state.0 {
// only interested in initialized and finalized events
match event {
FollowEvent::Initialized(mut ev_initialized) => {
// Initialized event guarantees that finalized_block_hashes is of
// at least one element, ordered by increasing block number
let hash = ev_initialized.finalized_block_hashes.pop().unwrap();
return Some((hash, state))
},
FollowEvent::Finalized(mut ev_finalized) => {
let hash = ev_finalized.finalized_block_hashes.pop().unwrap();
return Some((hash, state))
},
_ => continue,
}
// construct either best or finalized blocks stream from the chain head subscription
let blocks_stream = Box::pin(stream::unfold(
(VecDeque::<Hash>::new(), subscription, only_finalized),
|(mut st_items_to_stream, mut st_subscription, st_only_finalized)| async move {
// before checking for events, stream any remaining items from the previous run
// This can happen since finalization can produce an array of finalized_block_hashes
// unlike best block production which always produces only 1 best_block_hash
if let Some(hash) = st_items_to_stream.pop_front() {
Some((hash, (st_items_to_stream, st_subscription, st_only_finalized)))
} else {
// only interested in bestBlockChanged events
match event {
FollowEvent::BestBlockChanged(ev_best_block) =>
return Some((ev_best_block.best_block_hash, state)),
_ => continue,
while let Some(result) = st_subscription.next().await {
match result {
Ok((event, _subs_id)) => {
if st_only_finalized {
// when only_finalized is set, handle both initialized and
// finalized events
match event {
FollowEvent::Initialized(mut ev_initialized) => {
// Initialized event guarantees that
// finalized_block_hashes is of at least one
// element, the last element is the current
// final block.
return Some((
ev_initialized.finalized_block_hashes.pop()
.expect("received FollowEvent::Initialized event with less than 1 hashes"),
(
VecDeque::with_capacity(0),
st_subscription,
st_only_finalized,
)
))
},
FollowEvent::Finalized(ev_finalized) => {
// In case of Finalized event, send the first
// element and send the remaining as part of
// the unfold state
let mut block_hashes =
VecDeque::from(ev_finalized.finalized_block_hashes);
return Some((
block_hashes.pop_front()
.expect("received FollowEvent::Finalized event with less than 1 hashes"),
(
block_hashes,
st_subscription,
st_only_finalized,
),
))
},
_ => continue,
}
} else {
// when !only_finalized, handle only bestBlockChanged events
match event {
// Ignore st_init_block to not send it twice
FollowEvent::BestBlockChanged(ev_best_block) =>
return Some((
ev_best_block.best_block_hash,
(
VecDeque::with_capacity(0),
st_subscription,
st_only_finalized,
),
)),
_ => continue,
}
}
},
Err(e) => {
log::warn!("ChainHead subscription error {:?}", e);
return None // Close the stream on subscription error
},
}
}
None // Close the steam, if subscription is closed
}
}
None
}));

// create the initial item and state
let info = self.client.info();
let block_hash = if only_finalized { info.finalized_hash } else { info.best_hash };
let block_number = if only_finalized { info.finalized_number } else { info.best_number };

let (initial_item, initial_state) = match f(&self.client, block_hash, None) {
Ok(initial) => initial,
Err(e) => {
log::warn!(target: "cf-rpc", "Error in subscription initialization: {:?}", e);
pending_sink.reject(e).await;
return;
},
};

let stream = futures::stream::iter(std::iter::once(Ok(BlockUpdate {
block_hash,
block_number,
data: initial_item.clone(),
})))
.chain(blocks_stream.filter_map({
let client = self.client.clone();

let mut previous_item = initial_item;
let mut previous_state = initial_state;

move |hash| {
futures::future::ready(match f(&client, hash, Some(&previous_state)) {
Ok((new_item, new_state)) if !only_on_changes || new_item != previous_item => {
previous_item = new_item.clone();
previous_state = new_state;

if let Ok(Some(header)) = client.header(hash) {
Some(Ok(BlockUpdate {
block_hash: hash,
block_number: *header.number(),
data: new_item,
}))
} else if end_on_error {
Some(Err(internal_error(format!(
"Cannot fetch block header for block {:?}",
hash
))))
} else {
None
}
},
Err(error) => {
log::warn!("Subscription Error: {error}.");
if end_on_error {
log::warn!("Closing Subscription.");
Some(Err(ErrorObjectOwned::from(error)))
} else {
None
}
},
_ => None,
})
}
}))
.take_while(|item| futures::future::ready(item.is_ok()))
.map(Result::unwrap);
));

let stream = blocks_stream
.filter_map({
let client = self.client.clone();

let mut previous_item = None;
let mut previous_state = None;

move |hash| {
futures::future::ready(match f(&client, hash, previous_state.as_ref()) {
Ok((new_item, new_state))
if !only_on_changes || Some(&new_item) != previous_item.as_ref() =>
{
previous_item = Some(new_item.clone());
previous_state = Some(new_state);

if let Ok(Some(header)) = client.header(hash) {
Some(Ok(BlockUpdate {
block_hash: hash,
block_number: *header.number(),
data: new_item,
}))
} else if end_on_error {
Some(Err(internal_error(format!(
"Could not fetch block header for block {:?}",
hash
))))
} else {
None
}
},
Err(error) => {
log::warn!("Subscription Error: {error}.");
if end_on_error {
log::warn!("Closing Subscription.");
Some(Err(ErrorObjectOwned::from(error)))
} else {
None
}
},
_ => None,
})
}
})
.take_while(|item| futures::future::ready(item.is_ok()))
.map(Result::unwrap);

self.executor.spawn(
"cf-rpc-update-subscription",
Expand Down Expand Up @@ -2101,18 +2132,6 @@ where
}
}

async fn get_next_chain_head_event(
sub: &mut Subscription,
) -> Result<FollowEvent<Hash>, CfApiError> {
match sub.next().await {
Some(result) => match result {
Ok((event, _subs_id)) => Ok(event),
Err(e) => Err(CfApiError::from(internal_error(format!("ChainHead event error {e}")))),
},
None => Err(CfApiError::from(internal_error("ChainHead deserialization error"))),
}
}

#[cfg(test)]
mod test {
use std::collections::BTreeSet;
Expand Down

0 comments on commit cbfad94

Please sign in to comment.