Skip to content

Commit

Permalink
feat: use stream combinators
Browse files Browse the repository at this point in the history
  • Loading branch information
dandanlen committed Dec 6, 2024
1 parent e874ff3 commit 4c0b271
Showing 1 changed file with 40 additions and 72 deletions.
112 changes: 40 additions & 72 deletions state-chain/custom-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use state_chain_runtime::{
Block, Hash, NetworkFee, SolanaInstance,
};
use std::{
collections::{BTreeMap, HashMap, VecDeque},
collections::{BTreeMap, HashMap},
marker::PhantomData,
sync::Arc,
};
Expand Down Expand Up @@ -1984,76 +1984,43 @@ where
};

// construct either best or finalized blocks stream from the chain head subscription
let blocks_stream = Box::pin(stream::unfold(
(VecDeque::<Hash>::new(), subscription),
move |(mut st_items_to_stream, mut st_subscription)| async move {
// before checking for events, stream any remaining items from the previous run
// This can happen since finalization can produce an array of finalized_block_hashes
// unlike best block production which always produces only 1 best_block_hash
if let Some(hash) = st_items_to_stream.pop_front() {
Some((hash, (st_items_to_stream, st_subscription)))
} else {
while let Some(result) = st_subscription.next().await {
match result {
Ok((event, _subs_id)) => {
if only_finalized {
// when only_finalized is set, handle both initialized and
// finalized events
match event {
FollowEvent::Initialized(mut ev_initialized) => {
// Initialized event guarantees that
// finalized_block_hashes is of at least one
// element, the last element is the current
// final block.
return Some((
ev_initialized.finalized_block_hashes.pop()
.expect("received FollowEvent::Initialized event with less than 1 hashes"),
(
VecDeque::with_capacity(0),
st_subscription,
)
))
},
FollowEvent::Finalized(ev_finalized) => {
// In case of Finalized event, send the first
// element and send the remaining as part of
// the unfold state
let mut block_hashes =
VecDeque::from(ev_finalized.finalized_block_hashes);
return Some((
block_hashes.pop_front()
.expect("received FollowEvent::Finalized event with less than 1 hashes"),
(
block_hashes,
st_subscription,
),
))
},
_ => continue,
}
} else {
// when !only_finalized, handle only bestBlockChanged events
match event {
// Ignore st_init_block to not send it twice
FollowEvent::BestBlockChanged(ev_best_block) =>
return Some((
ev_best_block.best_block_hash,
(VecDeque::with_capacity(0), st_subscription),
)),
_ => continue,
}
}
},
Err(e) => {
log::warn!("ChainHead subscription error {:?}", e);
return None // Close the stream on subscription error
},
}
}
None // Close the steam, if subscription is closed
}
},
));
let blocks_stream = stream::unfold(subscription, move |mut sub| async move {
match sub.next().await {
Some(Ok((event, _subs_id))) => Some((event, sub)),
Some(Err(e)) => {
log::warn!("ChainHead subscription error {:?}", e);
None
},
_ => None,
}
})
.filter_map(move |event| async move {
match (only_finalized, event) {
(
true,
FollowEvent::Initialized(sc_rpc_spec_v2::chain_head::Initialized {
finalized_block_hashes,
..
}),
) => Some(finalized_block_hashes),
(
true,
FollowEvent::Finalized(sc_rpc_spec_v2::chain_head::Finalized {
finalized_block_hashes,
..
}),
) => Some(finalized_block_hashes),
(
false,
FollowEvent::BestBlockChanged(sc_rpc_spec_v2::chain_head::BestBlockChanged {
best_block_hash,
}),
) => Some(vec![best_block_hash]),
_ => None,
}
})
.map(stream::iter)
.flatten();

let stream = blocks_stream
.filter_map({
Expand Down Expand Up @@ -2099,7 +2066,8 @@ where
}
})
.take_while(|item| futures::future::ready(item.is_ok()))
.map(Result::unwrap);
.map(Result::unwrap)
.boxed();

self.executor.spawn(
"cf-rpc-update-subscription",
Expand Down

0 comments on commit 4c0b271

Please sign in to comment.