Skip to content

Commit

Permalink
feat(katana): forked events (#2594)
Browse files Browse the repository at this point in the history
  • Loading branch information
kariy authored Oct 30, 2024
1 parent 8a9dc0c commit e2d96b4
Show file tree
Hide file tree
Showing 7 changed files with 787 additions and 191 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 69 additions & 1 deletion crates/katana/primitives/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct OrderedEvent {
///
/// There JSON-RPC specification does not specify the format of the continuation token,
/// so how the node should handle it is implementation specific.
#[derive(PartialEq, Eq, Debug, Default)]
#[derive(PartialEq, Eq, Debug, Clone, Default)]
pub struct ContinuationToken {
/// The block number to continue from.
pub block_n: u64,
Expand Down Expand Up @@ -61,8 +61,59 @@ impl fmt::Display for ContinuationToken {
}
}

/// Represents a continuation token that can either be a Katana native [`ContinuationToken`] or a
/// continuation token returned by the forked provider.
///
/// This is only used in the `starknet_getEvents` API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MaybeForkedContinuationToken {
/// A continuation token returned by the forked provider.
/// Used to tell Katana to continue fetching events from the forked provider.
///
/// It's a string because there is no a guarantee format.
Forked(String),
/// A Katana specific continuation token. Used to tell Katana the next events to fetch is in the
/// local blocks and not in the forked provider.
Token(ContinuationToken),
}

impl MaybeForkedContinuationToken {
/// Parses a continuation token from a string. It can be either a Katana native
/// [`ContinuationToken`] or a forked token. The forked token is identified by the prefix
/// `FK_`.
pub fn parse(value: &str) -> Result<Self, ContinuationTokenError> {
const FORKED_TOKEN_PREFIX: &str = "FK_";
if let Some(token) = value.strip_prefix(FORKED_TOKEN_PREFIX) {
Ok(MaybeForkedContinuationToken::Forked(token.to_string()))
} else {
let token = ContinuationToken::parse(value)?;
Ok(MaybeForkedContinuationToken::Token(token))
}
}

/// Tries to convert the continuation token to a Katana native [`ContinuationToken`]. `None` if
/// the continuation token is a forked token.
pub fn to_token(self) -> Option<ContinuationToken> {
match self {
MaybeForkedContinuationToken::Token(token) => Some(token),
_ => None,
}
}
}

impl std::fmt::Display for MaybeForkedContinuationToken {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
MaybeForkedContinuationToken::Token(token) => write!(f, "{token}"),
MaybeForkedContinuationToken::Forked(token) => write!(f, "FK_{token}"),
}
}
}

#[cfg(test)]
mod test {
use assert_matches::assert_matches;

use super::*;

#[test]
Expand Down Expand Up @@ -115,4 +166,21 @@ mod test {
ContinuationTokenError::ParseFailed(_)
);
}

#[test]
fn parse_forked_token_works() {
let forked_token = "FK_test_token";
let parsed = MaybeForkedContinuationToken::parse(forked_token).unwrap();
assert_matches!(parsed, MaybeForkedContinuationToken::Forked(s) => {
assert_eq!(s, "test_token")
});

let regular_token = "1e,ff,4";
let parsed = MaybeForkedContinuationToken::parse(regular_token).unwrap();
assert_matches!(parsed, MaybeForkedContinuationToken::Token(t) => {
assert_eq!(t.block_n, 30);
assert_eq!(t.txn_n, 255);
assert_eq!(t.event_n, 4);
});
}
}
1 change: 1 addition & 0 deletions crates/katana/rpc/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ rand.workspace = true
rstest.workspace = true
serde.workspace = true
serde_json.workspace = true
similar-asserts.workspace = true
tempfile.workspace = true
tokio.workspace = true
108 changes: 97 additions & 11 deletions crates/katana/rpc/rpc/src/starknet/forking.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use katana_primitives::block::{BlockIdOrTag, BlockNumber};
use katana_primitives::block::{BlockHash, BlockIdOrTag, BlockNumber};
use katana_primitives::contract::ContractAddress;
use katana_primitives::transaction::TxHash;
use katana_primitives::Felt;
use katana_rpc_types::block::{
MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, MaybePendingBlockWithTxs,
};
use katana_rpc_types::error::starknet::StarknetApiError;
use katana_rpc_types::event::EventsPage;
use katana_rpc_types::receipt::TxReceiptWithBlockInfo;
use katana_rpc_types::state_update::MaybePendingStateUpdate;
use katana_rpc_types::transaction::Tx;
use starknet::core::types::TransactionStatus;
use starknet::core::types::{EventFilter, TransactionStatus};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Provider, ProviderError};
use url::Url;
Expand All @@ -20,6 +23,12 @@ pub enum Error {

#[error("Block out of range")]
BlockOutOfRange,

#[error("Not allowed to use block tag as a block identifier")]
BlockTagNotAllowed,

#[error("Unexpected pending data")]
UnexpectedPendingData,
}

#[derive(Debug)]
Expand Down Expand Up @@ -50,6 +59,22 @@ impl ForkedClient {
}

impl<P: Provider> ForkedClient<P> {
pub async fn get_block_number_by_hash(&self, hash: BlockHash) -> Result<BlockNumber, Error> {
use starknet::core::types::MaybePendingBlockWithTxHashes as StarknetRsMaybePendingBlockWithTxHashes;

let block = self.provider.get_block_with_tx_hashes(BlockIdOrTag::Hash(hash)).await?;
// Pending block doesn't have a hash yet, so if we get a pending block, we return an error.
let StarknetRsMaybePendingBlockWithTxHashes::Block(block) = block else {
return Err(Error::UnexpectedPendingData);
};

if block.block_number > self.block {
Err(Error::BlockOutOfRange)
} else {
Ok(block.block_number)
}
}

pub async fn get_transaction_by_hash(&self, hash: TxHash) -> Result<Tx, Error> {
let tx = self.provider.get_transaction_by_hash(hash).await?;
Ok(tx.into())
Expand Down Expand Up @@ -108,7 +133,7 @@ impl<P: Provider> ForkedClient<P> {
block.block_number
}
starknet::core::types::MaybePendingBlockWithTxHashes::PendingBlock(_) => {
panic!("shouldn't be possible to be pending")
return Err(Error::UnexpectedPendingData);
}
};

Expand All @@ -119,9 +144,7 @@ impl<P: Provider> ForkedClient<P> {
Ok(tx?.into())
}

BlockIdOrTag::Tag(_) => {
panic!("shouldn't be possible to be tag")
}
BlockIdOrTag::Tag(_) => Err(Error::BlockTagNotAllowed),
}
}

Expand All @@ -141,7 +164,7 @@ impl<P: Provider> ForkedClient<P> {
}

starknet::core::types::MaybePendingBlockWithTxs::PendingBlock(_) => {
panic!("shouldn't be possible to be pending")
Err(Error::UnexpectedPendingData)
}
}
}
Expand All @@ -159,7 +182,7 @@ impl<P: Provider> ForkedClient<P> {
}
}
starknet::core::types::MaybePendingBlockWithReceipts::PendingBlock(_) => {
panic!("shouldn't be possible to be pending")
return Err(Error::UnexpectedPendingData);
}
}

Expand All @@ -179,7 +202,7 @@ impl<P: Provider> ForkedClient<P> {
}
}
starknet::core::types::MaybePendingBlockWithTxHashes::PendingBlock(_) => {
panic!("shouldn't be possible to be pending")
return Err(Error::UnexpectedPendingData);
}
}

Expand All @@ -201,7 +224,7 @@ impl<P: Provider> ForkedClient<P> {
}
}
BlockIdOrTag::Tag(_) => {
panic!("shouldn't be possible to be tag")
return Err(Error::BlockTagNotAllowed);
}
_ => {}
}
Expand All @@ -228,21 +251,84 @@ impl<P: Provider> ForkedClient<P> {
}
}
BlockIdOrTag::Tag(_) => {
panic!("shouldn't be possible to be tag")
return Err(Error::BlockTagNotAllowed);
}
_ => {}
}

let state_update = self.provider.get_state_update(block_id).await?;
Ok(state_update.into())
}

// NOTE(kariy): The reason why I don't just use EventFilter as a param, bcs i wanna make sure
// the from/to blocks are not None. maybe should do the same for other methods that accept a
// BlockId in some way?
pub async fn get_events(
&self,
from: BlockNumber,
to: BlockNumber,
address: Option<ContractAddress>,
keys: Option<Vec<Vec<Felt>>>,
continuation_token: Option<String>,
chunk_size: u64,
) -> Result<EventsPage, Error> {
if from > self.block || to > self.block {
return Err(Error::BlockOutOfRange);
}

let from_block = Some(BlockIdOrTag::Number(from));
let to_block = Some(BlockIdOrTag::Number(to));
let address = address.map(Felt::from);
let filter = EventFilter { from_block, to_block, address, keys };

let events = self.provider.get_events(filter, continuation_token, chunk_size).await?;

Ok(events)
}
}

impl From<Error> for StarknetApiError {
fn from(value: Error) -> Self {
match value {
Error::Provider(provider_error) => provider_error.into(),
Error::BlockOutOfRange => StarknetApiError::BlockNotFound,
Error::BlockTagNotAllowed | Error::UnexpectedPendingData => {
StarknetApiError::UnexpectedError { reason: value.to_string() }
}
}
}
}

#[cfg(test)]
mod tests {
use katana_primitives::felt;
use url::Url;

use super::*;

const SEPOLIA_URL: &str = "https://api.cartridge.gg/x/starknet/sepolia";
const FORK_BLOCK_NUMBER: BlockNumber = 268_471;

#[tokio::test]
async fn get_block_hash() {
let url = Url::parse(SEPOLIA_URL).unwrap();
let client = ForkedClient::new_http(url, FORK_BLOCK_NUMBER);

// -----------------------------------------------------------------------
// Block before the forked block

// https://sepolia.voyager.online/block/0x4dfd88ba652622450c7758b49ac4a2f23b1fa8e6676297333ea9c97d0756c7a
let hash = felt!("0x4dfd88ba652622450c7758b49ac4a2f23b1fa8e6676297333ea9c97d0756c7a");
let number =
client.get_block_number_by_hash(hash).await.expect("failed to get block number");
assert_eq!(number, 268469);

// -----------------------------------------------------------------------
// Block after the forked block (exists only in the forked chain)

// https://sepolia.voyager.online/block/0x335a605f2c91873f8f830a6e5285e704caec18503ca28c18485ea6f682eb65e
let hash = felt!("0x335a605f2c91873f8f830a6e5285e704caec18503ca28c18485ea6f682eb65e");
let err = client.get_block_number_by_hash(hash).await.expect_err("should return an error");
assert!(matches!(err, Error::BlockOutOfRange));
}
}
Loading

0 comments on commit e2d96b4

Please sign in to comment.