diff --git a/Cargo.lock b/Cargo.lock index c0f3194955..5112d35e14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8310,6 +8310,7 @@ dependencies = [ "rstest 0.18.2", "serde", "serde_json", + "similar-asserts", "starknet 0.12.0", "tempfile", "thiserror", diff --git a/crates/katana/primitives/src/event.rs b/crates/katana/primitives/src/event.rs index fa65129a9f..9d2d45d711 100644 --- a/crates/katana/primitives/src/event.rs +++ b/crates/katana/primitives/src/event.rs @@ -20,7 +20,7 @@ pub struct OrderedEvent { /// /// There JSON-RPC specification does not specify the format of the continuation token, /// so how the node should handle it is implementation specific. -#[derive(PartialEq, Eq, Debug, Default)] +#[derive(PartialEq, Eq, Debug, Clone, Default)] pub struct ContinuationToken { /// The block number to continue from. pub block_n: u64, @@ -61,8 +61,59 @@ impl fmt::Display for ContinuationToken { } } +/// Represents a continuation token that can either be a Katana native [`ContinuationToken`] or a +/// continuation token returned by the forked provider. +/// +/// This is only used in the `starknet_getEvents` API. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MaybeForkedContinuationToken { + /// A continuation token returned by the forked provider. + /// Used to tell Katana to continue fetching events from the forked provider. + /// + /// It's a string because there is no a guarantee format. + Forked(String), + /// A Katana specific continuation token. Used to tell Katana the next events to fetch is in the + /// local blocks and not in the forked provider. + Token(ContinuationToken), +} + +impl MaybeForkedContinuationToken { + /// Parses a continuation token from a string. It can be either a Katana native + /// [`ContinuationToken`] or a forked token. The forked token is identified by the prefix + /// `FK_`. + pub fn parse(value: &str) -> Result { + const FORKED_TOKEN_PREFIX: &str = "FK_"; + if let Some(token) = value.strip_prefix(FORKED_TOKEN_PREFIX) { + Ok(MaybeForkedContinuationToken::Forked(token.to_string())) + } else { + let token = ContinuationToken::parse(value)?; + Ok(MaybeForkedContinuationToken::Token(token)) + } + } + + /// Tries to convert the continuation token to a Katana native [`ContinuationToken`]. `None` if + /// the continuation token is a forked token. 
+ pub fn to_token(self) -> Option { + match self { + MaybeForkedContinuationToken::Token(token) => Some(token), + _ => None, + } + } +} + +impl std::fmt::Display for MaybeForkedContinuationToken { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MaybeForkedContinuationToken::Token(token) => write!(f, "{token}"), + MaybeForkedContinuationToken::Forked(token) => write!(f, "FK_{token}"), + } + } +} + #[cfg(test)] mod test { + use assert_matches::assert_matches; + use super::*; #[test] @@ -115,4 +166,21 @@ mod test { ContinuationTokenError::ParseFailed(_) ); } + + #[test] + fn parse_forked_token_works() { + let forked_token = "FK_test_token"; + let parsed = MaybeForkedContinuationToken::parse(forked_token).unwrap(); + assert_matches!(parsed, MaybeForkedContinuationToken::Forked(s) => { + assert_eq!(s, "test_token") + }); + + let regular_token = "1e,ff,4"; + let parsed = MaybeForkedContinuationToken::parse(regular_token).unwrap(); + assert_matches!(parsed, MaybeForkedContinuationToken::Token(t) => { + assert_eq!(t.block_n, 30); + assert_eq!(t.txn_n, 255); + assert_eq!(t.event_n, 4); + }); + } } diff --git a/crates/katana/rpc/rpc/Cargo.toml b/crates/katana/rpc/rpc/Cargo.toml index 4f5cd75c30..3d73b86ca4 100644 --- a/crates/katana/rpc/rpc/Cargo.toml +++ b/crates/katana/rpc/rpc/Cargo.toml @@ -45,5 +45,6 @@ rand.workspace = true rstest.workspace = true serde.workspace = true serde_json.workspace = true +similar-asserts.workspace = true tempfile.workspace = true tokio.workspace = true diff --git a/crates/katana/rpc/rpc/src/starknet/forking.rs b/crates/katana/rpc/rpc/src/starknet/forking.rs index e15dbfdc33..4cdfbe07f9 100644 --- a/crates/katana/rpc/rpc/src/starknet/forking.rs +++ b/crates/katana/rpc/rpc/src/starknet/forking.rs @@ -1,13 +1,16 @@ -use katana_primitives::block::{BlockIdOrTag, BlockNumber}; +use katana_primitives::block::{BlockHash, BlockIdOrTag, BlockNumber}; +use katana_primitives::contract::ContractAddress; use katana_primitives::transaction::TxHash; +use katana_primitives::Felt; use katana_rpc_types::block::{ MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, MaybePendingBlockWithTxs, }; use katana_rpc_types::error::starknet::StarknetApiError; +use katana_rpc_types::event::EventsPage; use katana_rpc_types::receipt::TxReceiptWithBlockInfo; use katana_rpc_types::state_update::MaybePendingStateUpdate; use katana_rpc_types::transaction::Tx; -use starknet::core::types::TransactionStatus; +use starknet::core::types::{EventFilter, TransactionStatus}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{JsonRpcClient, Provider, ProviderError}; use url::Url; @@ -20,6 +23,12 @@ pub enum Error { #[error("Block out of range")] BlockOutOfRange, + + #[error("Not allowed to use block tag as a block identifier")] + BlockTagNotAllowed, + + #[error("Unexpected pending data")] + UnexpectedPendingData, } #[derive(Debug)] @@ -50,6 +59,22 @@ impl ForkedClient { } impl ForkedClient
{
+    pub async fn get_block_number_by_hash(&self, hash: BlockHash) -> Result<BlockNumber> {
+        use starknet::core::types::MaybePendingBlockWithTxHashes as StarknetRsMaybePendingBlockWithTxHashes;
+
+        let block = self.provider.get_block_with_tx_hashes(BlockIdOrTag::Hash(hash)).await?;
+        // Pending block doesn't have a hash yet, so if we get a pending block, we return an error.
+        let StarknetRsMaybePendingBlockWithTxHashes::Block(block) = block else {
+            return Err(Error::UnexpectedPendingData);
+        };
+
+        if block.block_number > self.block {
+            Err(Error::BlockOutOfRange)
+        } else {
+            Ok(block.block_number)
+        }
+    }
+
     pub async fn get_transaction_by_hash(&self, hash: TxHash) -> Result<Tx> {
         let tx = self.provider.get_transaction_by_hash(hash).await?;
         Ok(tx.into())
@@ -108,7 +133,7 @@ impl ForkedClient
{
                block.block_number
            }
            starknet::core::types::MaybePendingBlockWithTxHashes::PendingBlock(_) => {
-                panic!("shouldn't be possible to be pending")
+                return Err(Error::UnexpectedPendingData);
            }
        };
@@ -119,9 +144,7 @@ impl ForkedClient
{
                Ok(tx?.into())
            }
-            BlockIdOrTag::Tag(_) => {
-                panic!("shouldn't be possible to be tag")
-            }
+            BlockIdOrTag::Tag(_) => Err(Error::BlockTagNotAllowed),
        }
    }
@@ -141,7 +164,7 @@ impl ForkedClient
{
            }
            starknet::core::types::MaybePendingBlockWithTxs::PendingBlock(_) => {
-                panic!("shouldn't be possible to be pending")
+                Err(Error::UnexpectedPendingData)
            }
        }
    }
@@ -159,7 +182,7 @@ impl ForkedClient
{
                }
            }
            starknet::core::types::MaybePendingBlockWithReceipts::PendingBlock(_) => {
-                panic!("shouldn't be possible to be pending")
+                return Err(Error::UnexpectedPendingData);
            }
        }
@@ -179,7 +202,7 @@ impl ForkedClient
{
                }
            }
            starknet::core::types::MaybePendingBlockWithTxHashes::PendingBlock(_) => {
-                panic!("shouldn't be possible to be pending")
+                return Err(Error::UnexpectedPendingData);
            }
        }
@@ -201,7 +224,7 @@ impl ForkedClient
{
                }
            }
            BlockIdOrTag::Tag(_) => {
-                panic!("shouldn't be possible to be tag")
+                return Err(Error::BlockTagNotAllowed);
            }
            _ => {}
        }
@@ -228,7 +251,7 @@ impl ForkedClient
{
                }
            }
            BlockIdOrTag::Tag(_) => {
-                panic!("shouldn't be possible to be tag")
+                return Err(Error::BlockTagNotAllowed);
            }
            _ => {}
        }
@@ -236,6 +259,32 @@ impl ForkedClient
{ let state_update = self.provider.get_state_update(block_id).await?; Ok(state_update.into()) } + + // NOTE(kariy): The reason why I don't just use EventFilter as a param, bcs i wanna make sure + // the from/to blocks are not None. maybe should do the same for other methods that accept a + // BlockId in some way? + pub async fn get_events( + &self, + from: BlockNumber, + to: BlockNumber, + address: Option, + keys: Option>>, + continuation_token: Option, + chunk_size: u64, + ) -> Result { + if from > self.block || to > self.block { + return Err(Error::BlockOutOfRange); + } + + let from_block = Some(BlockIdOrTag::Number(from)); + let to_block = Some(BlockIdOrTag::Number(to)); + let address = address.map(Felt::from); + let filter = EventFilter { from_block, to_block, address, keys }; + + let events = self.provider.get_events(filter, continuation_token, chunk_size).await?; + + Ok(events) + } } impl From for StarknetApiError { @@ -243,6 +292,43 @@ impl From for StarknetApiError { match value { Error::Provider(provider_error) => provider_error.into(), Error::BlockOutOfRange => StarknetApiError::BlockNotFound, + Error::BlockTagNotAllowed | Error::UnexpectedPendingData => { + StarknetApiError::UnexpectedError { reason: value.to_string() } + } } } } + +#[cfg(test)] +mod tests { + use katana_primitives::felt; + use url::Url; + + use super::*; + + const SEPOLIA_URL: &str = "https://api.cartridge.gg/x/starknet/sepolia"; + const FORK_BLOCK_NUMBER: BlockNumber = 268_471; + + #[tokio::test] + async fn get_block_hash() { + let url = Url::parse(SEPOLIA_URL).unwrap(); + let client = ForkedClient::new_http(url, FORK_BLOCK_NUMBER); + + // ----------------------------------------------------------------------- + // Block before the forked block + + // https://sepolia.voyager.online/block/0x4dfd88ba652622450c7758b49ac4a2f23b1fa8e6676297333ea9c97d0756c7a + let hash = felt!("0x4dfd88ba652622450c7758b49ac4a2f23b1fa8e6676297333ea9c97d0756c7a"); + let number = + client.get_block_number_by_hash(hash).await.expect("failed to get block number"); + assert_eq!(number, 268469); + + // ----------------------------------------------------------------------- + // Block after the forked block (exists only in the forked chain) + + // https://sepolia.voyager.online/block/0x335a605f2c91873f8f830a6e5285e704caec18503ca28c18485ea6f682eb65e + let hash = felt!("0x335a605f2c91873f8f830a6e5285e704caec18503ca28c18485ea6f682eb65e"); + let err = client.get_block_number_by_hash(hash).await.expect_err("should return an error"); + assert!(matches!(err, Error::BlockOutOfRange)); + } +} diff --git a/crates/katana/rpc/rpc/src/starknet/mod.rs b/crates/katana/rpc/rpc/src/starknet/mod.rs index b29f2da609..be7f65200c 100644 --- a/crates/katana/rpc/rpc/src/starknet/mod.rs +++ b/crates/katana/rpc/rpc/src/starknet/mod.rs @@ -22,7 +22,7 @@ use katana_primitives::contract::{ContractAddress, Nonce, StorageKey, StorageVal use katana_primitives::conversion::rpc::legacy_inner_to_rpc_class; use katana_primitives::da::L1DataAvailabilityMode; use katana_primitives::env::BlockEnv; -use katana_primitives::event::ContinuationToken; +use katana_primitives::event::MaybeForkedContinuationToken; use katana_primitives::transaction::{ExecutableTxWithHash, TxHash}; use katana_primitives::Felt; use katana_provider::traits::block::{BlockHashProvider, BlockIdReader, BlockNumberProvider}; @@ -37,6 +37,7 @@ use katana_rpc_types::block::{ PendingBlockWithReceipts, PendingBlockWithTxHashes, PendingBlockWithTxs, }; use katana_rpc_types::error::starknet::StarknetApiError; +use 
katana_rpc_types::event::{EventFilterWithPage, EventsPage}; use katana_rpc_types::receipt::{ReceiptBlock, TxReceiptWithBlockInfo}; use katana_rpc_types::state_update::MaybePendingStateUpdate; use katana_rpc_types::transaction::Tx; @@ -44,7 +45,7 @@ use katana_rpc_types::FeeEstimate; use katana_rpc_types_builder::ReceiptBuilder; use katana_tasks::{BlockingTaskPool, TokioTaskSpawner}; use starknet::core::types::{ - ContractClass, EventsPage, PriceUnit, TransactionExecutionStatus, TransactionStatus, + ContractClass, PriceUnit, ResultPageRequest, TransactionExecutionStatus, TransactionStatus, }; use crate::utils; @@ -490,133 +491,6 @@ impl StarknetApi { } } - // TODO: should document more and possible find a simpler solution(?) - fn events( - &self, - from_block: BlockIdOrTag, - to_block: BlockIdOrTag, - address: Option, - keys: Option>>, - continuation_token: Option, - chunk_size: u64, - ) -> StarknetApiResult { - let provider = self.inner.backend.blockchain.provider(); - - let from = if BlockIdOrTag::Tag(BlockTag::Pending) == from_block { - EventBlockId::Pending - } else { - let num = provider.convert_block_id(from_block)?; - EventBlockId::Num(num.ok_or(StarknetApiError::BlockNotFound)?) - }; - - let to = if BlockIdOrTag::Tag(BlockTag::Pending) == to_block { - EventBlockId::Pending - } else { - let num = provider.convert_block_id(to_block)?; - EventBlockId::Num(num.ok_or(StarknetApiError::BlockNotFound)?) - }; - - let token: Option = match continuation_token { - Some(token) => Some(ContinuationToken::parse(&token)?.into()), - None => None, - }; - - // reserved buffer to fill up with events to avoid reallocations - let mut buffer = Vec::with_capacity(chunk_size as usize); - let filter = utils::events::Filter { address, keys }; - - match (from, to) { - (EventBlockId::Num(from), EventBlockId::Num(to)) => { - let cursor = utils::events::fetch_events_at_blocks( - provider, - from..=to, - &filter, - chunk_size, - token, - &mut buffer, - )?; - - Ok(EventsPage { - events: buffer, - continuation_token: cursor.map(|c| c.into_rpc_cursor().to_string()), - }) - } - - (EventBlockId::Num(from), EventBlockId::Pending) => { - let latest = provider.latest_number()?; - let int_cursor = utils::events::fetch_events_at_blocks( - provider, - from..=latest, - &filter, - chunk_size, - token.clone(), - &mut buffer, - )?; - - // if the internal cursor is Some, meaning the buffer is full and we havent - // reached the latest block. 
- if let Some(c) = int_cursor { - return Ok(EventsPage { - events: buffer, - continuation_token: Some(c.into_rpc_cursor().to_string()), - }); - } - - if let Some(executor) = self.pending_executor() { - let cursor = utils::events::fetch_pending_events( - &executor, - &filter, - chunk_size, - token, - &mut buffer, - )?; - - Ok(EventsPage { - events: buffer, - continuation_token: Some(cursor.into_rpc_cursor().to_string()), - }) - } else { - let cursor = Cursor::new_block(latest + 1); - Ok(EventsPage { - events: buffer, - continuation_token: Some(cursor.into_rpc_cursor().to_string()), - }) - } - } - - (EventBlockId::Pending, EventBlockId::Pending) => { - if let Some(executor) = self.pending_executor() { - let cursor = utils::events::fetch_pending_events( - &executor, - &filter, - chunk_size, - token, - &mut buffer, - )?; - - Ok(EventsPage { - events: buffer, - continuation_token: Some(cursor.into_rpc_cursor().to_string()), - }) - } else { - let latest = provider.latest_number()?; - let cursor = Cursor::new_block(latest); - - Ok(EventsPage { - events: buffer, - continuation_token: Some(cursor.into_rpc_cursor().to_string()), - }) - } - } - - (EventBlockId::Pending, EventBlockId::Num(_)) => { - Err(StarknetApiError::UnexpectedError { - reason: "Invalid block range; `from` block must be lower than `to`".to_string(), - }) - } - } - } - async fn transaction_status(&self, hash: TxHash) -> StarknetApiResult { let status = self .on_io_blocking_task(move |this| { @@ -928,4 +802,300 @@ impl StarknetApi { Err(StarknetApiError::BlockNotFound) } } + + async fn events(&self, filter: EventFilterWithPage) -> StarknetApiResult { + let EventFilterWithPage { event_filter, result_page_request } = filter; + let ResultPageRequest { continuation_token, chunk_size } = result_page_request; + + self.on_io_blocking_task(move |this| { + let from = match event_filter.from_block { + Some(id) => id, + None => BlockIdOrTag::Number(0), + }; + + let to = match event_filter.to_block { + Some(id) => id, + None => BlockIdOrTag::Tag(BlockTag::Pending), + }; + + let keys = event_filter.keys.filter(|keys| !(keys.len() == 1 && keys.is_empty())); + let continuation_token = if let Some(token) = continuation_token { + Some(MaybeForkedContinuationToken::parse(&token)?) + } else { + None + }; + + let events = this.events_inner( + from, + to, + event_filter.address.map(|f| f.into()), + keys, + continuation_token, + chunk_size, + )?; + + Ok(events) + }) + .await + } + + fn forked_client(&self) -> Option<&ForkedClient> { + self.inner.forked_client.as_ref() + } + + // TODO: should document more and possible find a simpler solution(?) + fn events_inner( + &self, + from_block: BlockIdOrTag, + to_block: BlockIdOrTag, + address: Option, + keys: Option>>, + continuation_token: Option, + chunk_size: u64, + ) -> StarknetApiResult { + let provider = self.inner.backend.blockchain.provider(); + + let from = self.resolve_event_block_id_if_forked(from_block)?; + let to = self.resolve_event_block_id_if_forked(to_block)?; + + // reserved buffer to fill up with events to avoid reallocations + let mut events = Vec::with_capacity(chunk_size as usize); + let filter = utils::events::Filter { address, keys: keys.clone() }; + + match (from, to) { + (EventBlockId::Num(from), EventBlockId::Num(to)) => { + // 1. check if the from and to block is lower than the forked block + // 2. 
if both are lower, then we can fetch the events from the provider + + // first determine whether the continuation token is from the forked client + let from_after_forked_if_any = if let Some(client) = &self.inner.forked_client { + let forked_block = *client.block(); + + // if the from block is lower than the forked block, we fetch events from the + // forked client + if from <= forked_block { + // if the to_block is greater than the forked block, we limit the to_block + // up until the forked block + let to = if to <= forked_block { to } else { forked_block }; + + // basically this is to determine that if the token is a katana native + // token, then we can skip fetching from the forked + // network. but if theres no token at all, or the + // token is a forked token, then we need to fetch from the forked network. + // + // TODO: simplify this + let forked_token = Some(continuation_token.clone()).and_then(|t| match t { + None => Some(None), + Some(t) => match t { + MaybeForkedContinuationToken::Token(_) => None, + MaybeForkedContinuationToken::Forked(t) => { + Some(Some(t.to_string())) + } + }, + }); + + // check if the continuation token is a forked continuation token + // if not we skip fetching from forked network + if let Some(token) = forked_token { + let forked_result = futures::executor::block_on( + client.get_events(from, to, address, keys, token, chunk_size), + )?; + + events.extend(forked_result.events); + + // return early if a token is present + if let Some(token) = forked_result.continuation_token { + let token = MaybeForkedContinuationToken::Forked(token); + let continuation_token = Some(token.to_string()); + return Ok(EventsPage { events, continuation_token }); + } + } + } + + // we start from block + 1 because we dont have the events locally and we may + // have fetched it from the forked network earlier + *client.block() + 1 + } else { + from + }; + + let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into())); + let block_range = from_after_forked_if_any..=to; + + let cursor = utils::events::fetch_events_at_blocks( + provider, + block_range, + &filter, + chunk_size, + cursor, + &mut events, + )?; + + let continuation_token = cursor.map(|c| c.into_rpc_cursor().to_string()); + let events_page = EventsPage { events, continuation_token }; + + Ok(events_page) + } + + (EventBlockId::Num(from), EventBlockId::Pending) => { + // 1. check if the from and to block is lower than the forked block + // 2. if both are lower, then we can fetch the events from the provider + + // first determine whether the continuation token is from the forked client + let from_after_forked_if_any = if let Some(client) = &self.inner.forked_client { + let forked_block = *client.block(); + + // if the from block is lower than the forked block, we fetch events from the + // forked client + if from <= forked_block { + // we limit the to_block up until the forked block bcs pending block is + // pointing to a locally block + let to = forked_block; + + // basically this is to determine that if the token is a katana native + // token, then we can skip fetching from the forked + // network. but if theres no token at all, or the + // token is a forked token, then we need to fetch from the forked network. 
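+                    //
+                    // Concretely, the `forked_token` mapping below behaves as follows (the string
+                    // value is only an example):
+                    //   None                         => Some(None)        query the forked provider from the start
+                    //   Some(Forked("abc"))          => Some(Some("abc")) resume the forked provider query
+                    //   Some(Token(<local cursor>))  => None              skip the forked provider entirely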
+ // + // TODO: simplify this + let forked_token = Some(continuation_token.clone()).and_then(|t| match t { + None => Some(None), + Some(t) => match t { + MaybeForkedContinuationToken::Token(_) => None, + MaybeForkedContinuationToken::Forked(t) => { + Some(Some(t.to_string())) + } + }, + }); + + // check if the continuation token is a forked continuation token + // if not we skip fetching from forked network + if let Some(token) = forked_token { + let forked_result = futures::executor::block_on( + client.get_events(from, to, address, keys, token, chunk_size), + )?; + + events.extend(forked_result.events); + + // return early if a token is present + if let Some(token) = forked_result.continuation_token { + let token = MaybeForkedContinuationToken::Forked(token); + let continuation_token = Some(token.to_string()); + return Ok(EventsPage { events, continuation_token }); + } + } + } + + // we start from block + 1 because we dont have the events locally and we may + // have fetched it from the forked network earlier + *client.block() + 1 + } else { + from + }; + + let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into())); + let latest = provider.latest_number()?; + let block_range = from_after_forked_if_any..=latest; + + let int_cursor = utils::events::fetch_events_at_blocks( + provider, + block_range, + &filter, + chunk_size, + cursor.clone(), + &mut events, + )?; + + // if the internal cursor is Some, meaning the buffer is full and we havent + // reached the latest block. + if let Some(c) = int_cursor { + let continuation_token = Some(c.into_rpc_cursor().to_string()); + return Ok(EventsPage { events, continuation_token }); + } + + if let Some(executor) = self.pending_executor() { + let cursor = utils::events::fetch_pending_events( + &executor, + &filter, + chunk_size, + cursor, + &mut events, + )?; + + let continuation_token = Some(cursor.into_rpc_cursor().to_string()); + Ok(EventsPage { events, continuation_token }) + } else { + let cursor = Cursor::new_block(latest + 1); + let continuation_token = Some(cursor.into_rpc_cursor().to_string()); + Ok(EventsPage { events, continuation_token }) + } + } + + (EventBlockId::Pending, EventBlockId::Pending) => { + if let Some(executor) = self.pending_executor() { + let cursor = continuation_token.and_then(|t| t.to_token().map(|t| t.into())); + let new_cursor = utils::events::fetch_pending_events( + &executor, + &filter, + chunk_size, + cursor, + &mut events, + )?; + + let continuation_token = Some(new_cursor.into_rpc_cursor().to_string()); + Ok(EventsPage { events, continuation_token }) + } else { + let latest = provider.latest_number()?; + let new_cursor = Cursor::new_block(latest); + + let continuation_token = Some(new_cursor.into_rpc_cursor().to_string()); + Ok(EventsPage { events, continuation_token }) + } + } + + (EventBlockId::Pending, EventBlockId::Num(_)) => { + Err(StarknetApiError::UnexpectedError { + reason: "Invalid block range; `from` block must be lower than `to`".to_string(), + }) + } + } + } + + // Determine the block number based on its Id. In the case where the block id is a hash, we need + // to check if the block is in the forked client AND within the valid range (ie lower than + // forked block). 
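+    // For example, a hash that only exists on the forked chain *after* the forked block resolves to
+    // `Error::BlockOutOfRange`, which is surfaced to the caller as `StarknetApiError::BlockNotFound`.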
+ fn resolve_event_block_id_if_forked( + &self, + id: BlockIdOrTag, + ) -> StarknetApiResult { + let provider = self.inner.backend.blockchain.provider(); + + let id = match id { + BlockIdOrTag::Tag(BlockTag::Pending) => EventBlockId::Pending, + BlockIdOrTag::Number(num) => EventBlockId::Num(num), + + BlockIdOrTag::Tag(BlockTag::Latest) => { + let num = provider.convert_block_id(id)?; + EventBlockId::Num(num.ok_or(StarknetApiError::BlockNotFound)?) + } + + BlockIdOrTag::Hash(hash) => { + // Check first if the block hash belongs to a local block. + if let Some(num) = provider.convert_block_id(id)? { + EventBlockId::Num(num) + } + // If not, check if the block hash belongs to a forked block. + else if let Some(client) = self.forked_client() { + let num = futures::executor::block_on(client.get_block_number_by_hash(hash))?; + EventBlockId::Num(num) + } + // Otherwise the block hash is not found. + else { + return Err(StarknetApiError::BlockNotFound); + } + } + }; + + Ok(id) + } } diff --git a/crates/katana/rpc/rpc/src/starknet/read.rs b/crates/katana/rpc/rpc/src/starknet/read.rs index 20d5c23173..1aa5a07686 100644 --- a/crates/katana/rpc/rpc/src/starknet/read.rs +++ b/crates/katana/rpc/rpc/src/starknet/read.rs @@ -17,7 +17,7 @@ use katana_rpc_types::transaction::{BroadcastedTx, Tx}; use katana_rpc_types::{ ContractClass, FeeEstimate, FeltAsHex, FunctionCall, SimulationFlagForEstimateFee, }; -use starknet::core::types::{BlockTag, TransactionStatus}; +use starknet::core::types::TransactionStatus; use super::StarknetApi; @@ -120,33 +120,7 @@ impl StarknetApiServer for StarknetApi { } async fn get_events(&self, filter: EventFilterWithPage) -> RpcResult { - self.on_io_blocking_task(move |this| { - let EventFilterWithPage { event_filter, result_page_request } = filter; - - let from = match event_filter.from_block { - Some(id) => id, - None => BlockIdOrTag::Number(0), - }; - - let to = match event_filter.to_block { - Some(id) => id, - None => BlockIdOrTag::Tag(BlockTag::Pending), - }; - - let keys = event_filter.keys.filter(|keys| !(keys.len() == 1 && keys.is_empty())); - - let events = this.events( - from, - to, - event_filter.address.map(|f| f.into()), - keys, - result_page_request.continuation_token, - result_page_request.chunk_size, - )?; - - Ok(events) - }) - .await + Ok(self.events(filter).await?) 
} async fn call( diff --git a/crates/katana/rpc/rpc/tests/forking.rs b/crates/katana/rpc/rpc/tests/forking.rs index 71372f981b..fd2abf34be 100644 --- a/crates/katana/rpc/rpc/tests/forking.rs +++ b/crates/katana/rpc/rpc/tests/forking.rs @@ -4,12 +4,13 @@ use cainome::rs::abigen_legacy; use dojo_test_utils::sequencer::{get_default_test_config, TestSequencer}; use katana_node::config::fork::ForkingConfig; use katana_node::config::SequencingConfig; -use katana_primitives::block::{BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber}; +use katana_primitives::block::{BlockHash, BlockHashOrNumber, BlockIdOrTag, BlockNumber, BlockTag}; use katana_primitives::chain::NamedChainId; +use katana_primitives::event::MaybeForkedContinuationToken; use katana_primitives::genesis::constant::DEFAULT_ETH_FEE_TOKEN_ADDRESS; use katana_primitives::transaction::TxHash; use katana_primitives::{felt, Felt}; -use starknet::core::types::{MaybePendingBlockWithTxHashes, StarknetError}; +use starknet::core::types::{EventFilter, MaybePendingBlockWithTxHashes, StarknetError}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{JsonRpcClient, Provider, ProviderError}; use url::Url; @@ -19,6 +20,8 @@ mod common; const SEPOLIA_CHAIN_ID: Felt = NamedChainId::SN_SEPOLIA; const SEPOLIA_URL: &str = "https://api.cartridge.gg/x/starknet/sepolia"; const FORK_BLOCK_NUMBER: BlockNumber = 268_471; +const FORK_BLOCK_HASH: BlockHash = + felt!("0x208950cfcbba73ecbda1c14e4d58d66a8d60655ea1b9dcf07c16014ae8a93cd"); fn forking_cfg() -> ForkingConfig { ForkingConfig { @@ -34,8 +37,9 @@ type LocalTestVector = Vec<((BlockNumber, BlockHash), TxHash)>; /// a single transaction. /// /// The returned [`TestVector`] is a list of all the locally created blocks and transactions. -async fn setup_test() -> (TestSequencer, impl Provider, LocalTestVector) { +async fn setup_test_inner(no_mining: bool) -> (TestSequencer, impl Provider, LocalTestVector) { let mut config = get_default_test_config(SequencingConfig::default()); + config.sequencing.no_mining = no_mining; config.forking = Some(forking_cfg()); let sequencer = TestSequencer::start(config).await; @@ -47,27 +51,50 @@ async fn setup_test() -> (TestSequencer, impl Provider, LocalTestVector) { abigen_legacy!(FeeToken, "crates/katana/rpc/rpc/tests/test_data/erc20.json"); let contract = FeeToken::new(DEFAULT_ETH_FEE_TOKEN_ADDRESS.into(), sequencer.account()); - // we're in auto mining, each transaction will create a new block - for i in 1..=10 { - let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; - let res = contract.transfer(&Felt::ONE, &amount).send().await.unwrap(); - let _ = dojo_utils::TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); - - let block_num = FORK_BLOCK_NUMBER + i; - - let block_id = BlockIdOrTag::Number(block_num); - let block = provider.get_block_with_tx_hashes(block_id).await.unwrap(); - let block_hash = match block { - MaybePendingBlockWithTxHashes::Block(b) => b.block_hash, - _ => panic!("Expected a block"), - }; - - txs_vector.push(((FORK_BLOCK_NUMBER + i, block_hash), res.transaction_hash)); + if no_mining { + // In no mining mode, bcs we're not producing any blocks, the transactions that we send + // will all be included in the same block (pending). 
+ for _ in 1..=10 { + let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; + let res = contract.transfer(&Felt::ONE, &amount).send().await.unwrap(); + dojo_utils::TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); + + // events in pending block doesn't have block hash and number, so we can safely put + // dummy values here. + txs_vector.push(((0, Felt::ZERO), res.transaction_hash)); + } + } else { + // We're in auto mining, each transaction will create a new block + for i in 1..=10 { + let amount = Uint256 { low: Felt::ONE, high: Felt::ZERO }; + let res = contract.transfer(&Felt::ONE, &amount).send().await.unwrap(); + let _ = + dojo_utils::TransactionWaiter::new(res.transaction_hash, &provider).await.unwrap(); + + let block_num = FORK_BLOCK_NUMBER + i; + + let block_id = BlockIdOrTag::Number(block_num); + let block = provider.get_block_with_tx_hashes(block_id).await.unwrap(); + let block_hash = match block { + MaybePendingBlockWithTxHashes::Block(b) => b.block_hash, + _ => panic!("Expected a block"), + }; + + txs_vector.push(((FORK_BLOCK_NUMBER + i, block_hash), res.transaction_hash)); + } } (sequencer, provider, txs_vector) } +async fn setup_test() -> (TestSequencer, impl Provider, LocalTestVector) { + setup_test_inner(false).await +} + +async fn setup_test_pending() -> (TestSequencer, impl Provider, LocalTestVector) { + setup_test_inner(true).await +} + #[tokio::test] async fn can_fork() -> Result<()> { let (_sequencer, provider, _) = setup_test().await; @@ -82,7 +109,7 @@ async fn can_fork() -> Result<()> { } #[tokio::test] -async fn forked_blocks_from_num() -> Result<()> { +async fn get_blocks_from_num() -> Result<()> { use starknet::core::types::{ MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, MaybePendingBlockWithTxs, }; @@ -202,7 +229,7 @@ async fn forked_blocks_from_num() -> Result<()> { } #[tokio::test] -async fn forked_blocks_from_hash() -> Result<()> { +async fn get_blocks_from_hash() -> Result<()> { use starknet::core::types::{ MaybePendingBlockWithReceipts, MaybePendingBlockWithTxHashes, MaybePendingBlockWithTxs, }; @@ -322,7 +349,7 @@ async fn forked_blocks_from_hash() -> Result<()> { } #[tokio::test] -async fn forked_transactions() -> Result<()> { +async fn get_transactions() -> Result<()> { let (_sequencer, provider, local_only_data) = setup_test().await; // ----------------------------------------------------------------------- @@ -385,3 +412,272 @@ async fn forked_transactions() -> Result<()> { Ok(()) } + +#[tokio::test] +#[rstest::rstest] +#[case(BlockIdOrTag::Number(FORK_BLOCK_NUMBER))] +#[case(BlockIdOrTag::Hash(felt!("0x208950cfcbba73ecbda1c14e4d58d66a8d60655ea1b9dcf07c16014ae8a93cd")))] +async fn get_events_partially_from_forked(#[case] block_id: BlockIdOrTag) -> Result<()> { + let (_sequencer, provider, _) = setup_test().await; + let forked_provider = JsonRpcClient::new(HttpTransport::new(Url::parse(SEPOLIA_URL)?)); + + // ----------------------------------------------------------------------- + // Fetch events partially from forked block. + // + // Here we want to make sure the continuation token is working as expected. + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(block_id), + from_block: Some(block_id), + }; + + // events fetched directly from the forked chain. + let result = forked_provider.get_events(filter.clone(), None, 5).await?; + let events = result.events; + + // events fetched through the forked katana. 
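+    // The chunk size (5) is much smaller than the number of events in the forked block (89, per the
+    // comment in `get_events_all_from_forked`), so this page ends inside the forked range and the
+    // returned continuation token must still be a forked (`FK_`-prefixed) one.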
+ let result = provider.get_events(filter, None, 5).await?; + let forked_events = result.events; + + let token = MaybeForkedContinuationToken::parse(&result.continuation_token.unwrap())?; + assert_matches!(token, MaybeForkedContinuationToken::Forked(_)); + + for (a, b) in events.iter().zip(forked_events) { + assert_eq!(a.block_number, Some(FORK_BLOCK_NUMBER)); + assert_eq!(a.block_hash, Some(FORK_BLOCK_HASH)); + assert_eq!(a.block_number, b.block_number); + assert_eq!(a.block_hash, b.block_hash); + assert_eq!(a.transaction_hash, b.transaction_hash); + assert_eq!(a.from_address, b.from_address); + assert_eq!(a.keys, b.keys); + assert_eq!(a.data, b.data); + } + + Ok(()) +} + +#[tokio::test] +#[rstest::rstest] +#[case(BlockIdOrTag::Number(FORK_BLOCK_NUMBER))] +#[case(BlockIdOrTag::Hash(felt!("0x208950cfcbba73ecbda1c14e4d58d66a8d60655ea1b9dcf07c16014ae8a93cd")))] +async fn get_events_all_from_forked(#[case] block_id: BlockIdOrTag) -> Result<()> { + let (_sequencer, provider, _) = setup_test().await; + let forked_provider = JsonRpcClient::new(HttpTransport::new(Url::parse(SEPOLIA_URL)?)); + + // ----------------------------------------------------------------------- + // Fetch events from the forked block (ie `FORK_BLOCK_NUMBER`) only. + // + // Based on https://sepolia.voyager.online/block/0x208950cfcbba73ecbda1c14e4d58d66a8d60655ea1b9dcf07c16014ae8a93cd, there are only 89 events in the `FORK_BLOCK_NUMBER` block. + // So we set the chunk size to 100 to ensure we get all the events in one request. + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(block_id), + from_block: Some(block_id), + }; + + // events fetched directly from the forked chain. + let result = forked_provider.get_events(filter.clone(), None, 100).await?; + let events = result.events; + + // events fetched through the forked katana. + let result = provider.get_events(filter, None, 100).await?; + let forked_events = result.events; + + assert!(result.continuation_token.is_none()); + + for (a, b) in events.iter().zip(forked_events) { + assert_eq!(a.block_number, Some(FORK_BLOCK_NUMBER)); + assert_eq!(a.block_hash, Some(FORK_BLOCK_HASH)); + assert_eq!(a.block_number, b.block_number); + assert_eq!(a.block_hash, b.block_hash); + assert_eq!(a.transaction_hash, b.transaction_hash); + assert_eq!(a.from_address, b.from_address); + assert_eq!(a.keys, b.keys); + assert_eq!(a.data, b.data); + } + + Ok(()) +} + +#[tokio::test] +async fn get_events_local() -> Result<()> { + let (_sequencer, provider, local_only_data) = setup_test().await; + + // ----------------------------------------------------------------------- + // Get events from the local chain block. + + let filter = EventFilter { + keys: None, + address: None, + to_block: None, + from_block: Some(BlockIdOrTag::Number(FORK_BLOCK_NUMBER + 1)), + }; + + let result = provider.get_events(filter, None, 10).await?; + let forked_events = result.events; + + // compare the events + + for (event, (block, tx)) in forked_events.iter().zip(local_only_data.iter()) { + let (block_number, block_hash) = block; + + assert_eq!(event.transaction_hash, *tx); + assert_eq!(event.block_hash, Some(*block_hash)); + assert_eq!(event.block_number, Some(*block_number)); + } + + Ok(()) +} + +#[tokio::test] +async fn get_events_pending_exhaustive() -> Result<()> { + let (_sequencer, provider, local_only_data) = setup_test_pending().await; + + // ----------------------------------------------------------------------- + // Get events from the local chain pending block. 
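+    //
+    // The sequencer was started with `no_mining`, so all ten transfer transactions from the setup
+    // are still in the open pending block rather than in closed local blocks.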
+ + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(BlockIdOrTag::Tag(BlockTag::Pending)), + from_block: Some(BlockIdOrTag::Tag(BlockTag::Pending)), + }; + + let result = provider.get_events(filter, None, 10).await?; + let events = result.events; + + // This is expected behaviour, as the pending block is not yet closed. + // so there may still more events to come. + assert!(result.continuation_token.is_some()); + + for (event, (_, tx)) in events.iter().zip(local_only_data.iter()) { + assert_eq!(event.transaction_hash, *tx); + // pending events should not have block number and block hash. + assert_eq!(event.block_hash, None); + assert_eq!(event.block_number, None); + } + + Ok(()) +} + +#[tokio::test] +#[rstest::rstest] +#[case(BlockIdOrTag::Number(FORK_BLOCK_NUMBER))] +#[case(BlockIdOrTag::Hash(felt!("0x208950cfcbba73ecbda1c14e4d58d66a8d60655ea1b9dcf07c16014ae8a93cd")))] // FORK_BLOCK_NUMBER hash +async fn get_events_forked_and_local_boundary_exhaustive( + #[case] block_id: BlockIdOrTag, +) -> Result<()> { + let (_sequencer, provider, local_only_data) = setup_test().await; + let forked_provider = JsonRpcClient::new(HttpTransport::new(Url::parse(SEPOLIA_URL)?)); + + // ----------------------------------------------------------------------- + // Get events from that cross the boundaries between forked and local chain block. + // + // Total events in `FORK_BLOCK_NUMBER` block is 89. While `FORK_BLOCK_NUMBER` + 1 is 1 ∴ 89 + 1 + // = 90 events. + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(block_id), + from_block: Some(block_id), + }; + + // events fetched directly from the forked chain. + let result = forked_provider.get_events(filter.clone(), None, 100).await?; + let events = result.events; + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(BlockIdOrTag::Tag(BlockTag::Latest)), + from_block: Some(block_id), + }; + + let result = provider.get_events(filter, None, 100).await?; + let boundary_events = result.events; + + // because we're pointing to latest block, we should not have anymore continuation token. + assert!(result.continuation_token.is_none()); + + let forked_events = &boundary_events[..89]; + let local_events = &boundary_events[89..]; + + similar_asserts::assert_eq!(forked_events, events); + + for (event, (block, tx)) in local_events.iter().zip(local_only_data.iter()) { + let (block_number, block_hash) = block; + + assert_eq!(event.transaction_hash, *tx); + assert_eq!(event.block_hash, Some(*block_hash)); + assert_eq!(event.block_number, Some(*block_number)); + } + + Ok(()) +} + +#[tokio::test] +#[rstest::rstest] +#[case(BlockIdOrTag::Number(FORK_BLOCK_NUMBER - 1))] +#[case(BlockIdOrTag::Hash(felt!("0x4a6a79bfefceb03af4f78758785b0c40ddf9f757e9a8f72f01ecb0aad11e298")))] // FORK_BLOCK_NUMBER - 1 hash +async fn get_events_forked_and_local_boundary_non_exhaustive( + #[case] block_id: BlockIdOrTag, +) -> Result<()> { + let (_sequencer, provider, _) = setup_test().await; + let forked_provider = JsonRpcClient::new(HttpTransport::new(Url::parse(SEPOLIA_URL)?)); + + // ----------------------------------------------------------------------- + // Get events that cross the boundaries between forked and local chain block, but + // not all events from the forked range is fetched. + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(block_id), + from_block: Some(block_id), + }; + + // events fetched directly from the forked chain. 
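+    // A chunk size of 50 does not exhaust the forked range here, so the query below should stop
+    // inside the forked chain and katana should hand back the provider's token with the `FK_` prefix.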
+ let result = forked_provider.get_events(filter.clone(), None, 50).await?; + let forked_events = result.events; + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(BlockIdOrTag::Tag(BlockTag::Pending)), + from_block: Some(block_id), + }; + + let result = provider.get_events(filter, None, 50).await?; + let katana_events = result.events; + + let token = MaybeForkedContinuationToken::parse(&result.continuation_token.unwrap())?; + assert_matches!(token, MaybeForkedContinuationToken::Forked(_)); + similar_asserts::assert_eq!(katana_events, forked_events); + + Ok(()) +} + +#[tokio::test] +#[rstest::rstest] +#[case::doesnt_exist_at_all(felt!("0x123"))] +#[case::after_forked_block_but_on_the_forked_chain(felt!("0x21f4c20f9cc721dbaee2eaf44c79342b37c60f55ac37c13a4bdd6785ac2a5e5"))] +async fn get_events_with_invalid_block_hash(#[case] hash: BlockHash) -> Result<()> { + let (_sequencer, provider, _) = setup_test().await; + + let filter = EventFilter { + keys: None, + address: None, + to_block: Some(BlockIdOrTag::Hash(hash)), + from_block: Some(BlockIdOrTag::Hash(hash)), + }; + + let result = provider.get_events(filter.clone(), None, 5).await.unwrap_err(); + assert_provider_starknet_err!(result, StarknetError::BlockNotFound); + + Ok(()) +}
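Illustrative usage sketch (added for clarity; not part of the patch above): a client paging through `starknet_getEvents` on a forked katana node can treat the continuation token as an opaque string, so it never has to know whether a page was served by the forked provider (token prefixed with `FK_`) or from local blocks. The RPC URL, block range, and chunk size below are placeholders, and the `anyhow`, `starknet`, and `url` crates are assumed as dependencies; the `get_events` call uses the same signature already exercised in the tests above.

use starknet::core::types::{BlockId, EmittedEvent, EventFilter};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Provider};
use url::Url;

/// Collects every event in the inclusive block range `[from, to]`, following
/// continuation tokens until the node reports there are no more pages.
async fn fetch_all_events(
    rpc_url: &str,
    from: u64,
    to: u64,
    chunk_size: u64,
) -> anyhow::Result<Vec<EmittedEvent>> {
    let provider = JsonRpcClient::new(HttpTransport::new(Url::parse(rpc_url)?));

    let filter = EventFilter {
        from_block: Some(BlockId::Number(from)),
        to_block: Some(BlockId::Number(to)),
        address: None,
        keys: None,
    };

    let mut events = Vec::new();
    let mut continuation_token: Option<String> = None;

    loop {
        // A page may come from the forked provider, from local blocks, or span both;
        // the token returned by the node encodes where the next page should resume.
        let page = provider.get_events(filter.clone(), continuation_token, chunk_size).await?;
        events.extend(page.events);

        match page.continuation_token {
            Some(token) => continuation_token = Some(token),
            None => break,
        }
    }

    Ok(events)
}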