Skip to content

Commit

Permalink
feat(client): get Header at height from Client
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Oct 31, 2024
1 parent b9b19c9 commit 13d6fab
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 7 deletions.
2 changes: 2 additions & 0 deletions example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ async fn main() {
tracing::info!("Synced chain up to block {}", update.tip.height);
tracing::info!("Chain tip: {}", update.tip.hash);
let recent = update.recent_history;
let header = client.get_header(update.tip.height).await.unwrap().unwrap();
assert_eq!(header.block_hash(), update.tip.hash);
tracing::info!("Recent history:");
for (height, hash) in recent {
tracing::info!("Height: {}", height);
Expand Down
30 changes: 28 additions & 2 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,38 @@ impl<H: HeaderStore> Chain<H> {
}

// This header chain contains a block hash in memory
pub(crate) fn header_at_height(&self, height: u32) -> Option<&Header> {
pub(crate) fn cached_header_at_height(&self, height: u32) -> Option<&Header> {
self.header_chain.header_at_height(height)
}

// Fetch a header from the cache or disk.
pub(crate) async fn fetch_header(
&mut self,
height: u32,
) -> Result<Option<Header>, HeaderPersistenceError<H::Error>> {
match self.header_chain.header_at_height(height) {
Some(header) => Ok(Some(header.clone())),
None => {
let mut db = self.db.lock().await;
let header_opt = db.header_at(height).await;
if let Err(_) = header_opt {
self.dialog
.send_warning(Warning::FailedPersistance {
warning: format!(
"Unexpected error fetching a header from the header store at height {height}"
),
})
.await;
}
header_opt.map_err(|e| HeaderPersistenceError::Database(e))
}
}
}

// The hash at the given height, potentially checking on disk
pub(crate) async fn blockhash_at_height(&self, height: u32) -> Option<BlockHash> {
match self
.header_at_height(height)
.cached_header_at_height(height)
.map(|header| header.block_hash())
{
Some(hash) => Some(hash),
Expand Down Expand Up @@ -852,6 +876,7 @@ mod tests {
vec![block_8, block_9, new_block_10, block_11],
chain.header_chain.values()
);
assert_eq!(chain.fetch_header(10).await.unwrap().unwrap(), new_block_10);
// A new peer sending us these headers should not do anything
let dup_sync = chain.sync_chain(batch_4).await;
assert_eq!(11, chain.height());
Expand Down Expand Up @@ -922,6 +947,7 @@ mod tests {
let chain_sync = chain.sync_chain(batch_4).await;
assert!(chain_sync.is_ok());
assert_eq!(chain.height(), 3);
assert_eq!(chain.fetch_header(3).await.unwrap().unwrap(), block_3);
assert_eq!(
chain.header_chain.values(),
vec![new_block_1, new_block_2, block_3]
Expand Down
29 changes: 27 additions & 2 deletions src/core/client.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use bitcoin::block::Header;
#[cfg(feature = "silent-payments")]
use bitcoin::BlockHash;
use bitcoin::ScriptBuf;
Expand All @@ -9,8 +10,8 @@ use tokio::sync::mpsc::Sender;
use crate::{IndexedBlock, TrustedPeer, TxBroadcast};

use super::{
error::ClientError,
messages::{ClientMessage, NodeMessage, SyncUpdate},
error::{ClientError, FetchHeaderError},
messages::{ClientMessage, HeaderRequest, NodeMessage, SyncUpdate},
};

/// A [`Client`] allows for communication with a running node.
Expand Down Expand Up @@ -189,6 +190,30 @@ macro_rules! impl_core_client {
.map_err(|_| ClientError::SendError)
}

/// Get a header at the specified height, if it exists.
///
/// # Note
///
/// The height of the chain is the canonical index of the header in the chain.
/// For example, the genesis block is at a height of zero.
///
/// # Errors
///
/// If the node has stopped running.
pub async fn get_header(
&self,
height: u32,
) -> Result<Option<Header>, FetchHeaderError> {
let (tx, rx) =
tokio::sync::oneshot::channel::<Result<Option<Header>, FetchHeaderError>>();
let message = HeaderRequest::new(tx, height);
self.ntx
.send(ClientMessage::GetHeader(message))
.await
.map_err(|_| FetchHeaderError::SendError)?;
rx.await.map_err(|_| FetchHeaderError::RecvError)?
}

/// Starting at the configured anchor checkpoint, look for block inclusions with newly added scripts.
///
/// # Errors
Expand Down
37 changes: 37 additions & 0 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,40 @@ impl core::fmt::Display for ClientError {
}

impl_sourceless_error!(ClientError);

/// Errors occuring when the client is fetching headers from the node.
#[derive(Debug)]
pub enum FetchHeaderError {
/// The channel to the node was likely closed and dropped from memory.
/// This implies the node is not running.
SendError,
/// The database operation failed while attempting to find the header.
DatabaseOptFailed {
/// The message from the backend describing the failure.
error: String,
},
/// The channel to the client was likely closed by the node and dropped from memory.
RecvError,
}

impl core::fmt::Display for FetchHeaderError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FetchHeaderError::SendError => {
write!(f, "the receiver of this message was dropped from memory.")
}
FetchHeaderError::DatabaseOptFailed { error } => {
write!(
f,
"the database operation failed while attempting to find the header: {error}"
)
}
FetchHeaderError::RecvError => write!(
f,
"the channel to the client was likely closed by the node and dropped from memory."
),
}
}
}

impl_sourceless_error!(FetchHeaderError);
28 changes: 26 additions & 2 deletions src/core/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
TrustedPeer, TxBroadcast,
};

use super::node::NodeState;
use super::{error::FetchHeaderError, node::NodeState};

/// Messages receivable by a running node.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -94,7 +94,7 @@ impl FailurePayload {
}

/// Commands to issue a node.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum ClientMessage {
/// Stop the node.
Shutdown,
Expand All @@ -115,6 +115,22 @@ pub(crate) enum ClientMessage {
SetDuration(Duration),
/// Add another known peer to connect to.
AddPeer(TrustedPeer),
/// Request a header from a specified height.
GetHeader(HeaderRequest),
}

type HeaderSender = tokio::sync::oneshot::Sender<Result<Option<Header>, FetchHeaderError>>;

#[derive(Debug)]
pub(crate) struct HeaderRequest {
pub(crate) oneshot: HeaderSender,
pub(crate) height: u32,
}

impl HeaderRequest {
pub(crate) fn new(oneshot: HeaderSender, height: u32) -> Self {
Self { oneshot, height }
}
}

/// Warnings a node may issue while running.
Expand Down Expand Up @@ -153,6 +169,8 @@ pub enum Warning {
/// Additional context as to why block syncing failed.
warning: String,
},
/// A channel that was supposed to receive a message was dropped.
ChannelDropped,
}

impl core::fmt::Display for Warning {
Expand Down Expand Up @@ -198,6 +216,12 @@ impl core::fmt::Display for Warning {
"A peer sent us a peer-to-peer message the node did not request."
)
}
Warning::ChannelDropped => {
write!(
f,
"A channel that was supposed to receive a message was dropped."
)
}
}
}
}
9 changes: 8 additions & 1 deletion src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
checkpoints::{HeaderCheckpoint, HeaderCheckpoints},
error::HeaderSyncError,
},
core::peer_map::PeerMap,
core::{error::FetchHeaderError, peer_map::PeerMap},
db::traits::{HeaderStore, PeerStore},
filters::{cfheader_chain::AppendAttempt, error::CFilterSyncError},
ConnectionType, FailurePayload, TrustedPeer, TxBroadcastPolicy,
Expand Down Expand Up @@ -316,6 +316,13 @@ impl<H: HeaderStore, P: PeerStore> Node<H, P> {
ClientMessage::AddPeer(peer) => {
let mut peer_map = self.peer_map.lock().await;
peer_map.add_trusted_peer(peer);
},
ClientMessage::GetHeader(request) => {
let mut chain = self.chain.lock().await;
let header_opt = chain.fetch_header(request.height).await;
if let Err(_) = request.oneshot.send(header_opt.map_err(|e| FetchHeaderError::DatabaseOptFailed { error: e.to_string() })) {
self.dialog.send_warning(Warning::ChannelDropped).await
};
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ async fn test_mine_after_reorg() {
// Reorganize the blocks
let old_best = best;
let old_height = rpc.get_block_count().unwrap();
let fetched_header = client.get_header(10).await.unwrap().unwrap();
assert_eq!(old_best, fetched_header.block_hash());
invalidate_block(&rpc, &best).await;
mine_blocks(&rpc, &miner, 2, 1).await;
let best = rpc.get_best_block_hash().unwrap();
Expand Down

0 comments on commit 13d6fab

Please sign in to comment.