From 8333cc9021f54bab1b941712fce5e2fdae60787a Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 20 Jun 2024 13:35:28 -0600 Subject: [PATCH] feat: adds migration logic from IPFS (#326) The migration logic is exposed via the CLI. --- Cargo.lock | 43 +- one/Cargo.toml | 15 +- one/src/cbor_value.rs | 285 ------------- one/src/ethereum_rpc.rs | 175 -------- one/src/events.rs | 586 -------------------------- one/src/lib.rs | 32 +- one/src/migrations.rs | 155 +++++++ service/Cargo.toml | 6 +- service/src/event/migration.rs | 337 +++++++++++++++ service/src/event/mod.rs | 3 +- service/src/event/service.rs | 37 +- service/src/lib.rs | 2 +- service/src/tests/migration.rs | 310 ++++++++++++++ service/src/tests/mod.rs | 1 + store/src/lib.rs | 6 +- store/src/sql/entities/event.rs | 7 +- store/src/sql/entities/event_block.rs | 4 + store/src/sql/entities/hash.rs | 13 +- 18 files changed, 891 insertions(+), 1126 deletions(-) delete mode 100644 one/src/cbor_value.rs delete mode 100644 one/src/ethereum_rpc.rs delete mode 100644 one/src/events.rs create mode 100644 one/src/migrations.rs create mode 100644 service/src/event/migration.rs create mode 100644 service/src/tests/migration.rs diff --git a/Cargo.lock b/Cargo.lock index 84d2c8be6..3df66fb10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1233,43 +1233,38 @@ name = "ceramic-one" version = "0.24.0" dependencies = [ "anyhow", + "async-stream", + "async-trait", "ceramic-api", "ceramic-api-server", "ceramic-core", + "ceramic-event", "ceramic-kubo-rpc", "ceramic-kubo-rpc-server", "ceramic-metrics", "ceramic-p2p", "ceramic-service", "ceramic-store", - "chrono", "cid 0.11.1", "clap 4.5.4", - "dirs", - "enum-as-inner", "expect-test", "futures", "git-version", - "glob", - "hex", "home", "hyper", "iroh-bitswap", "iroh-rpc-client", "iroh-rpc-types", "libp2p", - "minicbor", "multiaddr", "multibase 0.9.1", "multihash 0.19.1", "multihash-codetable", "multihash-derive 0.9.0", "names", - "ordered-float", "prometheus-client", "recon", - "reqwest", - "serde_json", + "serde_ipld_dagcbor", "signal-hook", "signal-hook-tokio", "swagger", @@ -1333,11 +1328,14 @@ dependencies = [ "ceramic-store", "cid 0.11.1", "expect-test", + "futures", "hex", "ipld-core", "iroh-bitswap", + "iroh-car", "multibase 0.9.1", "multihash-codetable", + "multihash-derive 0.9.0", "paste", "rand 0.8.5", "recon", @@ -2225,15 +2223,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dirs" -version = "5.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" -dependencies = [ - "dirs-sys", -] - [[package]] name = "dirs-next" version = "2.0.0" @@ -2244,18 +2233,6 @@ dependencies = [ "dirs-sys-next", ] -[[package]] -name = "dirs-sys" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" -dependencies = [ - "libc", - "option-ext", - "redox_users", - "windows-sys 0.48.0", -] - [[package]] name = "dirs-sys-next" version = "0.1.2" @@ -5738,12 +5715,6 @@ dependencies = [ "tokio-stream", ] -[[package]] -name = "option-ext" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" - [[package]] name = "ordered-float" version = "4.2.0" diff --git a/one/Cargo.toml b/one/Cargo.toml index c64df005d..4dcf899c7 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -10,42 +10,37 @@ publish = false [dependencies] anyhow.workspace = true +async-stream.workspace = true +async-trait.workspace = true ceramic-api-server.workspace = true ceramic-api.workspace = true ceramic-core.workspace = true +ceramic-event.workspace = true ceramic-kubo-rpc = { path = "../kubo-rpc", features = ["http"] } ceramic-kubo-rpc-server.workspace = true ceramic-metrics.workspace = true ceramic-p2p.workspace = true ceramic-service.workspace = true ceramic-store.workspace = true -chrono.workspace = true cid.workspace = true clap.workspace = true -dirs = "5.0.1" -enum-as-inner = "0.6.0" futures.workspace = true git-version = "0.3" -glob = "0.3.1" -hex.workspace = true home = "0.5" hyper.workspace = true iroh-bitswap.workspace = true iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true libp2p.workspace = true -minicbor.workspace = true multiaddr.workspace = true multibase.workspace = true -multihash.workspace = true multihash-codetable.workspace = true multihash-derive.workspace = true +multihash.workspace = true names.workspace = true -ordered-float = "4.1.1" prometheus-client.workspace = true recon.workspace = true -reqwest.workspace = true -serde_json.workspace = true +serde_ipld_dagcbor.workspace = true signal-hook = "0.3.17" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } swagger.workspace = true diff --git a/one/src/cbor_value.rs b/one/src/cbor_value.rs deleted file mode 100644 index 11321de72..000000000 --- a/one/src/cbor_value.rs +++ /dev/null @@ -1,285 +0,0 @@ -use anyhow::{anyhow, Error, Result}; -use cid::Cid; -use enum_as_inner::EnumAsInner; -use minicbor::{data::Tag, data::Type, Decoder}; -use ordered_float::OrderedFloat; -use std::{collections::BTreeMap, ops::Index}; -use tracing::debug; - -/// CborValue -/// a enum for working with dynamic values from decoding CBOR values. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, EnumAsInner)] -pub enum CborValue { - // not clear if Indef should be treated distinctly from the finite versions - /// Simple Values (20 false | 21 true) - /// https://datatracker.ietf.org/doc/html/rfc8949#fpnoconttbl2 - Bool(bool), - /// Simple Values (22 null) - /// https://datatracker.ietf.org/doc/html/rfc8949#fpnoconttbl2 - Null, - /// Simple Values (23 undefined) - /// https://datatracker.ietf.org/doc/html/rfc8949#fpnoconttbl2 - Undefined, - /// Major type 0: An unsigned integer in the range 0..(2**64)-1 inclusive. - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - U64(u64), - /// Major type 1: A negative integer in the range -2**64..-1 inclusive. - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - /// note: we store Major type 1: A negative integer less then -2**63 in a i128 - I64(i64), - /// Major type 1: A negative integer in the range -2**64..-1 inclusive. - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - /// note: we store Major type 1: A negative integer grater then -2**63 in a i64 - Int(i128), // Cbor ints range from [ -(2^64) , 2^64-1 ] to fit this i65 we use i128 - /// Major type 7: - /// Floating-point numbers and simple values, as well as the "break" stop code. - /// https://datatracker.ietf.org/doc/html/rfc8949#fpnocont - /// note: we store Simple values and breaks separately - F64(OrderedFloat), // this makes it possible to put CborValue in BTreeMap - /// Simple Values (20 false | 21 true) - /// https://datatracker.ietf.org/doc/html/rfc8949#fpnoconttbl2 - /// 24..31 (reserved) - /// 32..255 (unassigned) - Simple(u8), - /// Major type 2: A byte string. - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - Bytes(Vec), - /// Major type 3: A text string - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - String(String), - /// Major type 4: An array of data items. - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - Array(Vec), - /// Major type 5: A map of pairs of data items. - /// https://datatracker.ietf.org/doc/html/rfc8949#section-3.1 - Map(BTreeMap), - /// Major type 6: - /// A tagged data item ("tag") whose tag number, an integer in the range 0..(2**64)-1 inclusive - /// https://datatracker.ietf.org/doc/html/rfc8949#tags - /// https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml - /// tag(42) = Cid - Tag((Tag, Box)), - /// Major type 7: 0xff Break signal - /// https://datatracker.ietf.org/doc/html/rfc8949#break - Break, // 0xff - /// Used to represent a byte that was not valid for its location. - /// You should not see this from a well formed CBOR value. - Unknown(u8), -} - -impl CborValue { - /// Parse bytes to a single CborValue - /// This will error on CBOR seq. - pub fn parse(bytes: &[u8]) -> Result { - let mut decoder = Decoder::new(bytes); - let c = CborValue::next(&mut decoder); - if decoder.position() != bytes.len() { - debug!("decoder not at end {}/{}", decoder.position(), bytes.len()); - return Err(anyhow!("CborValue decode error not at end")); - } - c - } - fn next(decoder: &mut Decoder) -> Result { - match decoder.datatype() { - Ok(Type::Bool) => Ok(CborValue::Bool(decoder.bool()?)), - Ok(Type::Null) => { - decoder.null()?; - Ok(CborValue::Null) - } - Ok(Type::Undefined) => { - decoder.undefined()?; - Ok(CborValue::Undefined) - } - Ok(Type::U8) => Ok(CborValue::U64(decoder.u8()?.into())), - Ok(Type::U16) => Ok(CborValue::U64(decoder.u16()?.into())), - Ok(Type::U32) => Ok(CborValue::U64(decoder.u32()?.into())), - Ok(Type::U64) => Ok(CborValue::U64(decoder.u64()?)), - Ok(Type::I8) => Ok(CborValue::I64(decoder.i8()?.into())), - Ok(Type::I16) => Ok(CborValue::I64(decoder.i16()?.into())), - Ok(Type::I32) => Ok(CborValue::I64(decoder.i32()?.into())), - Ok(Type::I64) => Ok(CborValue::I64(decoder.i64()?)), - Ok(Type::Int) => Ok(CborValue::Int(decoder.int()?.into())), - Ok(Type::F16) => Ok(CborValue::F64(OrderedFloat(decoder.f16()?.into()))), - Ok(Type::F32) => Ok(CborValue::F64(OrderedFloat(decoder.f32()?.into()))), - Ok(Type::F64) => Ok(CborValue::F64(OrderedFloat(decoder.f64()?))), - Ok(Type::Simple) => Ok(CborValue::Simple(decoder.simple()?)), - Ok(Type::Bytes) => Ok(CborValue::Bytes(decoder.bytes()?.to_vec())), - Ok(Type::BytesIndef) => Ok(CborValue::Undefined), // TODO: support Type::BytesIndef - Ok(Type::String) => Ok(CborValue::String(decoder.str()?.to_string())), - Ok(Type::StringIndef) => Ok(CborValue::Undefined), // TODO: support Type::StringIndef - Ok(Type::Array) => { - let mut array = Vec::new(); - for _ in 0..decoder.array().unwrap().unwrap() { - let value = CborValue::next(decoder)?; - array.push(value); - } - Ok(CborValue::Array(array)) - } - Ok(Type::ArrayIndef) => { - let mut array = Vec::new(); - loop { - let value = CborValue::next(decoder)?; - if let CborValue::Break = value { - break; - } - array.push(value) - } - Ok(CborValue::Array(array)) - } - Ok(Type::Map) => { - let mut map = BTreeMap::new(); - for _ in 0..decoder.map().unwrap().unwrap() { - let key = CborValue::next(decoder)?; - let value = CborValue::next(decoder)?; - map.insert(key, value); - } - Ok(CborValue::Map(map)) - } - Ok(Type::MapIndef) => Ok(CborValue::Undefined), // TODO: support Type::MapIndef - Ok(Type::Tag) => { - let tag: Tag = decoder.tag()?; - let value = CborValue::next(decoder)?; - Ok(CborValue::Tag((tag, Box::new(value)))) - } - Ok(Type::Break) => Ok(CborValue::Break), - Ok(Type::Unknown(additional)) => Ok(CborValue::Unknown(additional)), - Err(e) => Err(e.into()), - } - } - - /// Accesses a deeply nested CborValue when the path keys are all strings. - pub fn path(&self, parts: &[&str]) -> CborValue { - match parts.split_first() { - None => self.clone(), // if there are no parts this is thing at the path. - Some((first, rest)) => { - let CborValue::Map(map) = &self else { - return CborValue::Undefined; // if self is not a map there is no thing at the path. - }; - let Some(next) = map.get(&first.to_string().into()) else { - return CborValue::Undefined; // if the next part is not in the map there is no thing at the path. - }; - next.path(rest) - } - } - } - - /// Accesses the underlying CborValue for a specified tag type. - pub fn untag(&self, tag: u64) -> Option<&CborValue> { - if let CborValue::Tag((Tag::Unassigned(inner_tag), boxed_value)) = &self { - if inner_tag == &tag { - return Some(boxed_value); - } - } - None - } - - /// get a printable string of the type_name - pub fn type_name(&self) -> &str { - match self { - CborValue::Bool(_) => "bool", - CborValue::Null => "null", - CborValue::Undefined => "undefined", - CborValue::U64(_) => "u64", - CborValue::I64(_) => "i64", - CborValue::Int(_) => "int", - CborValue::F64(_) => "f64", - CborValue::Simple(_) => "simple", - CborValue::Bytes(_) => "bytes", - CborValue::String(_) => "string", - CborValue::Array(_) => "array", - CborValue::Map(_) => "map", - CborValue::Tag(_) => "tag", - CborValue::Break => "break", - CborValue::Unknown(_) => "unknown", - } - } - - /// If the CborValue is a CborValue::Array get the CborValue at index. - /// If it is not a CborValue::Array or index is out of bounds returns None. - pub fn get_index(&self, index: usize) -> Option<&CborValue> { - self.as_array()?.get(index) - } - - /// If the CborValue is a CborValue::Map get the CborValue at index. - /// If it is not a CborValue::Map or key is missing returns None. - pub fn get_key(&self, key: &str) -> Option<&CborValue> { - self.as_map()?.get(&CborValue::String(key.to_owned())) - } -} - -impl From<&[u8]> for CborValue { - fn from(bytes: &[u8]) -> Self { - CborValue::Bytes(bytes.to_vec()) - } -} - -impl TryInto> for CborValue { - type Error = Error; - fn try_into(self) -> Result, Self::Error> { - match self { - CborValue::Bytes(bytes) => Ok(bytes.to_vec()), - CborValue::String(string) => Ok(string.as_bytes().to_vec()), - _ => Err(anyhow!("{} not Bytes or String", self.type_name())), - } - } -} - -impl From<&str> for CborValue { - fn from(string: &str) -> Self { - CborValue::String(string.to_string()) - } -} - -impl From for CborValue { - fn from(string: String) -> Self { - CborValue::String(string) - } -} - -impl TryInto for CborValue { - type Error = Error; - fn try_into(self) -> Result { - if let Some(CborValue::Bytes(cid_bytes)) = self.untag(42) { - Cid::try_from(&cid_bytes[1..]).map_err(|e| e.into()) - } else { - Err(anyhow!("{} not a CID", self.type_name())) - } - } -} - -impl TryInto for CborValue { - type Error = Error; - fn try_into(self) -> Result { - if let CborValue::String(s) = self { - Ok(s) - } else { - Err(anyhow!("{} not a String", self.type_name())) - } - } -} - -impl Index for CborValue { - type Output = Self; - - /// Returns a reference to the value corresponding to the supplied key. - /// - /// # Panics - /// - /// Panics if the key is not present in the `Vec`. - fn index(&self, index: usize) -> &Self { - &self.as_array().unwrap()[index] - } -} - -impl Index<&CborValue> for CborValue { - type Output = Self; - - /// Returns a reference to the value corresponding to the supplied key. - /// - /// # Panics - /// - /// Panics if the key is not present in the `BTreeMap`. - fn index(&self, key: &CborValue) -> &Self { - &self.as_map().unwrap()[key] - } -} diff --git a/one/src/ethereum_rpc.rs b/one/src/ethereum_rpc.rs deleted file mode 100644 index a6a780cb1..000000000 --- a/one/src/ethereum_rpc.rs +++ /dev/null @@ -1,175 +0,0 @@ -use anyhow::{anyhow, Context, Result}; -use cid::{multihash, Cid}; -use multihash::Multihash; -use tracing::debug; - -pub(crate) struct RootTime { - pub root: Cid, - pub block_hash: String, - pub timestamp: i64, -} - -pub trait EthRpc { - async fn root_time_by_transaction_cid(&self, cid: Cid) -> Result; -} - -pub(crate) struct HttpEthRpc { - url: String, - client: reqwest::Client, -} - -impl HttpEthRpc { - pub fn new(url: String) -> Self { - Self { - url, - client: reqwest::Client::new(), - } - } - - async fn eth_block_number(&self) -> Result { - // Get the latest block number. - // curl https://mainnet.infura.io/v3/{api_token} \ - // -X POST \ - // -H "Content-Type: application/json" \ - // -d '{"jsonrpc":"2.0","method":"eth_blockNumber","params": [],"id":1}' - // >> {"jsonrpc": "2.0", "id": 1, "result": "0x127cc18"} - Ok(self - .client - .post(&self.url) - .json(&serde_json::json!({ - "jsonrpc": "2.0", - "method": "eth_blockNumber", - "params": [], - "id": 1, - })) - .send() - .await? - .json() - .await?) - } - - async fn eth_block_by_hash(&self, block_hash: &str) -> Result { - // curl https://mainnet.infura.io/v3/{api_token} \ - // -X POST \ - // -H "Content-Type: application/json" \ - // -d '{"jsonrpc":"2.0","method":"eth_getBlockByHash","params": ["0x{block_hash}",false],"id":1}' - // >> {"jsonrpc": "2.0", "id": 1, "result": {"number": "0x105f34f", "timestamp": "0x644fe98b"}} - Ok(self - .client - .post(&self.url) - .json(&serde_json::json!({ - "jsonrpc": "2.0", - "method": "eth_getBlockByHash", - "params": [block_hash, false], - "id": 1, - })) - .send() - .await? - .json() - .await?) - } - - async fn eth_transaction_by_hash(&self, transaction_hash: &str) -> Result { - // Get the block_hash and input from the transaction. - // curl https://mainnet.infura.io/v3/{api_token} \ - // -X POST \ - // -H "Content-Type: application/json" \ - // -d '{"jsonrpc":"2.0","method":"eth_getTransactionByHash", - // "params":["0xBF7BC715A09DEA3177866AC4FC294AC9800EE2B49E09C55F56078579BFBBF158"],"id":1}' - // >> {"jsonrpc":"2.0", "id":1, "result": { - // "blockHash": "0x783cd5a6febe13d08ac0d59fa7e666483d5e476542b29688a6f0bec3d15febd4", - // "blockNumber": "0x105f34f", - // "input": "0x97ad09eb41b6408c1b4be5016f652396ef47c0982c36d5877ebb874919bae3a9b854d8e1" - // }} - Ok(self - .client - .post(&self.url) - .json(&serde_json::json!({ - "jsonrpc": "2.0", - "method": "eth_getTransactionByHash", - "params": [transaction_hash], - "id": 1, - })) - .send() - .await? - .json() - .await?) - } -} - -impl EthRpc for HttpEthRpc { - async fn root_time_by_transaction_cid(&self, cid: Cid) -> Result { - // transaction to blockHash, blockNumber, input - let tx_hash = format!("0x{}", hex::encode(cid.hash().digest())); - let json: serde_json::Value = self.eth_transaction_by_hash(&tx_hash).await?; - debug!("txByHash response: {}", json); - let result = json.get("result").context("missing field result")?; - let block_hash = result - .get("blockHash") - .context("missing field result.blockHash")?; - let block_hash = block_hash - .as_str() - .context("missing field result.blockHash was not a string")?; - let block_number = result - .get("blockNumber") - .context("missing field result.blockNumber")?; - let block_number = block_number - .as_str() - .context("missing field result.blockNumber was not a string")?; - let block_number = get_timestamp_from_hex_string(block_number) - .context("missing field result.blockNumber was not a hex timestamp")?; - let input = result.get("input").context("missing field result.input")?; - let input = input - .as_str() - .context("missing field result.input was not a string")?; - let root = get_root_from_input(input)?; - - // Get the block with block_hash if latest block number minus result.number grater then 3 the block is stable. - let Some(Ok(latest_block_number)) = self.eth_block_number().await?["result"] - .as_str() - .map(get_timestamp_from_hex_string) - else { - return Err(anyhow!("latest_block_number not found")); - }; - if latest_block_number - block_number < 3 { - return Err(anyhow!("latest_block_number - block_number < 3")); - } - - // Get the block time. - let json: serde_json::Value = self.eth_block_by_hash(block_hash).await?; - debug!("blockByHash response: {}", json); - - let Some(timestamp) = json["result"]["timestamp"].as_str() else { - return Err(anyhow!( - "missing field result.timestamp or was not a string" - )); - }; - let timestamp = get_timestamp_from_hex_string(timestamp)?; - Ok(RootTime { - root, - block_hash: block_hash.to_string(), - timestamp, - }) - } -} - -fn get_root_from_input(input: &str) -> Result { - if let Some(input) = input.strip_prefix("0x97ad09eb") { - // Strip "0x97ad09eb" from the input and convert it into a cidv1 - dag-cbor - (sha2-256 : 256) - // 0x12 -> sha2-256 - // 0x20 -> 256 bits of hash - let root_bytes = [vec![0x12_u8, 0x20], hex::decode(input)?.to_vec()].concat(); - Ok(Cid::new_v1(0x71, Multihash::from_bytes(&root_bytes)?)) - } else { - Err(anyhow!("input is not anchor-cbor")) - } -} - -fn get_timestamp_from_hex_string(ts: &str) -> Result { - // Strip "0x" from the timestamp and convert it to a u64 - if let Some(ts) = ts.strip_prefix("0x") { - Ok(i64::from_str_radix(ts, 16)?) - } else { - Err(anyhow!("timestamp is not valid hex")) - } -} diff --git a/one/src/events.rs b/one/src/events.rs deleted file mode 100644 index f0cd44d44..000000000 --- a/one/src/events.rs +++ /dev/null @@ -1,586 +0,0 @@ -use crate::ethereum_rpc::{EthRpc, HttpEthRpc, RootTime}; -use crate::CborValue; -use anyhow::{anyhow, bail, Context, Result}; -use ceramic_core::{ssi, Base64UrlString, DidDocument, Jwk}; -use ceramic_metrics::init_local_tracing; -use ceramic_service::CeramicEventService; -use ceramic_store::{Migrations, SqlitePool, SqliteRootStore}; -use chrono::{SecondsFormat, TimeZone, Utc}; -use cid::{multibase, multihash, Cid}; -use clap::{Args, Subcommand}; -use glob::{glob, Paths}; -use iroh_bitswap::Store; -use multihash::Multihash; -use std::{fs, path::PathBuf, str::FromStr}; -use tracing::{debug, info, warn}; - -#[derive(Subcommand, Debug)] -pub enum EventsCommand { - /// Slurp events into the local event database. - Slurp(SlurpOpts), - Validate(ValidateOpts), -} - -#[derive(Args, Debug)] -pub struct SlurpOpts { - /// The path to the ipfs_repo [eg: ~/.ipfs/blocks] - #[clap(long, short, value_parser)] - input_ipfs_path: Option, - - /// The path to the input_ceramic_db [eg: ~/.ceramic-one/db.sqlite3] - #[clap(long, short = 'c', value_parser)] - input_ceramic_db: Option, - - /// The path to the output_ceramic_db [eg: ~/.ceramic-one/db.sqlite3] - #[clap(long, short, value_parser)] - output_ceramic_path: Option, -} - -#[derive(Args, Debug)] -pub struct ValidateOpts { - /// Path to storage directory - #[arg(short, long, env = "CERAMIC_ONE_STORE_DIR")] - store_dir: Option, - - /// CID of the block to validate - #[arg(short, long)] - cid: Cid, - - /// Ethereum RPC URL, e.g. ETHEREUM_RPC_URL=https://mainnet.infura.io/v3/ - #[arg(short, long, env = "ETHEREUM_RPC_URL")] - ethereum_rpc_url: String, -} - -pub async fn events(cmd: EventsCommand) -> Result<()> { - if let Err(e) = init_local_tracing() { - eprintln!("Failed to initialize tracing: {}", e); - } - match cmd { - EventsCommand::Slurp(opts) => slurp(opts).await, - EventsCommand::Validate(opts) => validate(opts).await, - } -} - -async fn slurp(opts: SlurpOpts) -> Result<()> { - let home: PathBuf = dirs::home_dir().unwrap_or("/data/".into()); - let default_output_ceramic_path: PathBuf = home.join(".ceramic-one/db.sqlite3"); - - let output_ceramic_path = opts - .output_ceramic_path - .unwrap_or(default_output_ceramic_path); - let output_ceramic_path = output_ceramic_path.display().to_string(); - info!( - "{} Opening output ceramic SQLite DB at: {}", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - output_ceramic_path - ); - - let pool = SqlitePool::connect(&output_ceramic_path, Migrations::Apply) - .await - .context("Failed to connect to database")?; - let block_store = CeramicEventService::new(pool).await.unwrap(); - - if let Some(input_ceramic_db) = opts.input_ceramic_db { - migrate_from_database(input_ceramic_db, &block_store).await?; - } - if let Some(input_ipfs_path) = opts.input_ipfs_path { - migrate_from_filesystem(input_ipfs_path, &block_store).await?; - } - Ok(()) -} - -fn get_store_dir(opts: &ValidateOpts) -> Result { - let home: PathBuf = dirs::home_dir().unwrap_or("/data/".into()); - let store_dir = opts - .store_dir - .as_ref() - .cloned() - .unwrap_or(home.join(".ceramic-one/")); - info!( - "{} Opening ceramic SQLite DB at: {}", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - store_dir.display() - ); - let path = store_dir.join("db.sqlite3").display().to_string(); - Ok(path) -} - -async fn validate(opts: ValidateOpts) -> Result<()> { - let path = get_store_dir(&opts)?; - let pool = SqlitePool::connect(&path, Migrations::Apply) - .await - .context("Failed to connect to database")?; - let block_store = CeramicEventService::new(pool.clone()) - .await - .with_context(|| "Failed to create block store")?; - let root_store = SqliteRootStore::new(pool) - .await - .with_context(|| "Failed to create root store")?; - - // Validate that the CID is either DAG-CBOR or DAG-JOSE - if (opts.cid.codec() != 0x71) && (opts.cid.codec() != 0x85) { - bail!("CID {} is not a valid Ceramic event", opts.cid); - } - - // If the CID is a DAG-JOSE, the event is either a signed Data Event or a signed Init Event, both of which can be - // validated similarly. - if opts.cid.codec() == 0x85 { - let controller = validate_data_event_envelope(&opts.cid, &block_store).await; - match controller { - Ok(controller) => { - info!( - "DataEvent({}) validated as authored by Controller({})", - opts.cid, controller - ) - } - Err(e) => warn!("{} failed with error: {:?}", opts.cid, e), - } - return Ok(()); - } - // If the CID is a DAG-CBOR, the event is either a Time Event or an unsigned Init Event. In this case, we'll pull - // the block from the store and use the presence of the "proof" field to determine whether this is a Time Event or - // an unsigned Init Event. - // - // When validating events from Recon, the block store will be a CARFileBlockStore that wraps the CAR file received - // over Recon. - let block = block_store.get(&opts.cid).await?; - let event = CborValue::parse(&block.data)?; - if event.get_key("proof").is_some() { - let timestamp = validate_time_event( - &opts.cid, - &block_store, - &root_store, - &HttpEthRpc::new(opts.ethereum_rpc_url), - ) - .await; - match timestamp { - Ok(timestamp) => { - let rfc3339 = Utc - .timestamp_opt(timestamp, 0) - .unwrap() - .to_rfc3339_opts(SecondsFormat::Secs, true); - info!( - "TimeEvent({}) validated at Timestamp({}) = {}", - opts.cid, timestamp, rfc3339 - ) - } - Err(e) => warn!("{} failed with error: {:?}", opts.cid, e), - } - } else { - if event.get_key("data").is_some() { - warn!("UnsignedDataEvent({}) is not signed", opts.cid); - } - validate_data_event_payload(&opts.cid, &block_store, None).await?; - } - - Ok(()) -} - -async fn validate_data_event_envelope( - cid: &Cid, - block_store: &CeramicEventService, -) -> Result { - let block = block_store.get(cid).await?; - let envelope = CborValue::parse(&block.data)?; - let signatures_0 = envelope - .get_key("signatures") - .context("Not an envelope")? - .get_index(0) - .context("Not an envelope")?; - let protected = signatures_0 - .get_key("protected") - .context("Not an envelope")? - .as_bytes() - .context("Not an envelope")? - .as_slice(); - // Deserialize protected as JSON - let protected_json: serde_json::Value = serde_json::from_slice(protected)?; - let controller = protected_json.get("kid").unwrap().as_str().unwrap(); - let protected: Base64UrlString = protected.into(); - let signature: Base64UrlString = signatures_0 - .get_key("signature") - .context("Not an envelope")? - .as_bytes() - .context("Not an envelope")? - .as_slice() - .into(); - let payload: Base64UrlString = envelope - .get_key("payload") - .context("Not an envelope")? - .as_bytes() - .context("Not an envelope")? - .as_slice() - .into(); - let compact = format!("{}.{}.{}", protected, payload, signature); - let did = DidDocument::new(controller); - let jwk = Jwk::new(&did).await.unwrap(); - ssi::jws::decode_verify(&compact, &jwk)?; - validate_data_event_payload(cid, block_store, Some(did.id.clone())).await?; - Ok(did.id) -} - -// TODO: Validate the Data Event payload structure -// TODO: Validate CACAO -async fn validate_data_event_payload( - _payload_cid: &Cid, - _block_store: &CeramicEventService, - _controller: Option, -) -> Result<()> { - Ok(()) -} - -// To validate a Time Event, we need to prove: -// - TimeEvent/prev == TimeEvent/proof/root/${TimeEvent/path} -// - TimeEvent/proof/root is in the Root Store -// -// - If root not in local root store try to read it from tx_hash. -// - Validated time is the time from the Root Store -async fn validate_time_event( - cid: &Cid, - block_store: &CeramicEventService, - root_store: &SqliteRootStore, - eth_rpc: &impl EthRpc, -) -> Result { - let block = block_store.get(cid).await?; - let time_event = CborValue::parse(&block.data)?; - // Destructure the proof to get the tag and the value - let proof_cid: Cid = time_event.path(&["proof"]).try_into()?; - let prev: Cid = time_event.path(&["prev"]).try_into()?; - let proof_block = block_store.get(&proof_cid).await?; - let proof_cbor = CborValue::parse(&proof_block.data)?; - let proof_root: Cid = proof_cbor.path(&["root"]).try_into()?; - let path: String = time_event.path(&["path"]).try_into()?; - let tx_hash_cid: Cid = proof_cbor.path(&["txHash"]).try_into()?; - - // If prev not in root then TimeEvent is not valid. - if !prev_in_root(prev, proof_root, path, block_store).await? { - return Err(anyhow!("prev {} not in root {}", prev, proof_root)); - } - - // if root in root_store return timestamp. - // note: at some point we will need a negative cache to exponentially backoff eth_getTransactionByHash - if let Ok(Some(timestamp)) = root_store.get(tx_hash_cid.hash().digest()).await { - return Ok(timestamp); - } - - // else eth_transaction_by_hash - - let RootTime { - root: transaction_root, - block_hash, - timestamp, - } = eth_rpc.root_time_by_transaction_cid(tx_hash_cid).await?; - debug!("root: {}, timestamp: {}", transaction_root, timestamp); - - if transaction_root == proof_root { - root_store - .put( - tx_hash_cid.hash().digest(), - transaction_root.hash().digest(), - block_hash, - timestamp, - ) - .await?; - Ok(timestamp) - } else { - Err(anyhow!( - "root from transaction {} != root from proof {}", - transaction_root, - proof_root - )) - } -} - -async fn prev_in_root( - prev: Cid, - root: Cid, - path: String, - block_store: &CeramicEventService, -) -> Result { - let mut current_cid = root; - for segment in path.split('/') { - let block = block_store.get(¤t_cid).await?; - let current = CborValue::parse(&block.data)?; - current_cid = current.as_array().unwrap()[usize::from_str(segment)?] - .clone() - .try_into()?; - } - Ok(prev == current_cid) -} - -async fn migrate_from_filesystem( - input_ipfs_path: PathBuf, - store: &CeramicEventService, -) -> Result<()> { - // the block store is split in to 1024 directories and then the blocks stored as files. - // the dir structure is the penultimate two characters as dir then the b32 sha256 multihash of the block - // The leading "B" for the b32 sha256 multihash is left off - // ~/.ipfs/blocks/QV/CIQOHMGEIKMPYHAUTL57JSEZN64SIJ5OIHSGJG4TJSSJLGI3PBJLQVI.data // cspell:disable-line - let p = input_ipfs_path - .join("**/*") - .to_str() - .expect("expect utf8") - .to_owned(); - info!( - "{} Opening IPFS Repo at: {}", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - &p - ); - let paths: Paths = glob(&p).unwrap(); - - let mut count = 0; - let mut err_count = 0; - - for path in paths { - let path = path.unwrap().as_path().to_owned(); - if !path.is_file() { - continue; - } - - let Ok((_base, hash_bytes)) = - multibase::decode("B".to_string() + path.file_stem().unwrap().to_str().unwrap()) - else { - info!( - "{} {:?} is not a base32upper multihash.", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - path.display() - ); - err_count += 1; - continue; - }; - let Ok(hash) = Multihash::from_bytes(&hash_bytes) else { - info!( - "{} {:?} is not a base32upper multihash.", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - path.display() - ); - err_count += 1; - continue; - }; - let cid = Cid::new_v1(0x71, hash); - let blob = fs::read(&path).unwrap(); - - if count % 10000 == 0 { - info!( - "{} {} {} ok:{}, err:{}", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - path.display(), - cid, - count, - err_count - ); - } - - let block = iroh_bitswap::Block::new(blob.into(), cid); - let result = store.put(&block).await; - if result.is_err() { - info!( - "{} err: {} {:?}", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - path.display(), - result - ); - err_count += 1; - continue; - } - count += 1; - } - - info!( - "{} count={}, err_count={}", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - count, - err_count - ); - Ok(()) -} - -async fn migrate_from_database( - input_ceramic_db: PathBuf, - store: &CeramicEventService, -) -> Result<()> { - let input_ceramic_db_filename = input_ceramic_db.to_str().expect("expect utf8"); - info!( - "{} Importing blocks from {}.", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - input_ceramic_db_filename - ); - let result = store.merge_from_sqlite(input_ceramic_db_filename).await; - info!( - "{} Done importing blocks from {}.", - Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), - input_ceramic_db_filename - ); - Ok(result?) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::ethereum_rpc::EthRpc; - use ceramic_store::SqlitePool; - use multihash_codetable::{Code, MultihashDigest}; - use test_log::test; - - struct HardCodedEthRpc {} - impl EthRpc for HardCodedEthRpc { - async fn root_time_by_transaction_cid(&self, cid: Cid) -> Result { - let _ = cid; - Ok(RootTime { - root: Cid::from_str("bafyreicbwzaiyg2l4uaw6zjds3xupqeyfq3nlb36xodusgn24ou3qvgy4e") // cspell:disable-line - .unwrap(), - timestamp: 1682958731, - block_hash: "0x783cd5a6febe13d08ac0d59fa7e666483d5e476542b29688a6f0bec3d15febd4" - .to_string(), - }) - } - } - struct NeverCalledEthRpc {} - impl EthRpc for NeverCalledEthRpc { - async fn root_time_by_transaction_cid(&self, cid: Cid) -> Result { - let _cid = cid; - panic!("If we get here the test failed"); - } - } - - #[test(tokio::test)] - async fn test_validate_time_event() { - // todo: add a negative test. - // Create an in-memory SQLite pool - let pool = SqlitePool::connect_in_memory().await.unwrap(); - - // Create a new SQLiteBlockStore and SQLiteRootStore - let block_store = CeramicEventService::new(pool.clone()).await.unwrap(); - let root_store = SqliteRootStore::new(pool).await.unwrap(); - - // Add all the blocks for the Data Event & Time Event to the block store - let blocks = vec![ - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy - "a4626964d82a58260001850112207ac18e1235f2f7a84548eeb543a33f89 - 79eb88566a2dfc3d9596c55a683b7517647061746873302f302f302f302f - 302f302f302f302f302f306470726576d82a58260001850112207ac18e12 - 35f2f7a84548eeb543a33f8979eb88566a2dfc3d9596c55a683b75176570 - 726f6f66d82a58250001711220664fe7627b86f38a74cfbbcdb702d77fe9 - 38533e5402b6ce867777078d706df5", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/ - "a464726f6f74d82a5825000171122041b6408c1b4be5016f652396ef47c0 - 982c36d5877ebb874919bae3a9b854d8e166747848617368d82a58260001 - 93011b20bf7bc715a09dea3177866ac4fc294ac9800ee2b49e09c55f5607 - 8579bfbbf158667478547970656a6628627974657333322967636861696e - 4964686569703135353a31", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/ - "83d82a58250001711220e71cf20b225c878c15446675b1a209d14448eee5 - f51f5f378b192c7c64cfe1f0d82a5825000171122002920cb2496290eca3 - dff550072251d8b70b8995c243cf7808cbc305960c7962d82a5825000171 - 12202d5794752351a770a7eba07e67a0d29119ac7a67fd32eacbffb690fc - 3e4f7ffb", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/ - "82d82a58250001711220b44e4681002e0c7bce248e9c91d49d85a3229aa7 - bdd1c43c7d245e3a7b8fc24ed82a582500017112206cf2b9460adabb65f5 - 9369aeb45eaeab359501ea9a183efaf68c72af2dbaaa27", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/ - "82d82a582500017112200a07aa83a2ad17ed2809359d5c81cfd46213fa6b - 1a215639d3b973f122c8c04cd82a5825000171122079c9e24051148f5ec6 - 6ceb6ea7c3ef24b3d219794a3ce738306380676eb92af0", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/ - "82d82a58250001711220a3261c31bfce9e22eb83ed305b017cb2b1b2edd2 - a90a294dd09f40201887020dd82a582500017112202096f43b3646196715 - a7cd7c503b020ebd21e1a4856952029782fb43abe3e54b", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/ - "82d82a58250001711220838a384925aa1d757b17b2a22607130d333efb75 - ab9523250d0d17c2e5cbbfc7d82a5825000171122035f019bfe0ae32bc3f - 47acc4babc8526f98185696fc3b5eb45757f8b05f7de0e", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/ - "82d82a58250001711220c0c93bdc49b93ac0786346a0118567ca66e4cfbd - 1d4c0519618c83ecbaa6e2aad82a582500017112202f3352cde99e1fe491 - 18f2b5d598369add98957792c00b525e36757468086fcb", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/0/ - "82d82a582500017112204a813e0e1151f11776d02e88897fb615ae1ba3c6 - 43fc5a486ed3378fa5fcf49dd82a58250001711220269d5384e44b53c54b - 5035b15bf13a8f4e3513e5ead4a23bfdeaa4af141ad37c", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/0/0/ - "82d82a582500017112201e48beab1c3fef5b29838492361155bd5c9c6389 - 98bccc2da00625db5a9359cfd82a582500017112200d1bf984f229cddb85 - 6fea0c8a6fd5c716defa3926b27b1ffc8308a29be4006c", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/0/0/0/ - "82d82a58250001711220015067f14cae18a15faebcacd1d07c1c3dcf2a24 - 2334c7148de4d235f27308c6d82a58250001711220749e6401d5f860a457 - 5c94f1742a8976f6d625fa47f1923132de758184e4b599", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/0/0/0/0/ - "82d82a58260001850112207ac18e1235f2f7a84548eeb543a33f8979eb88 - 566a2dfc3d9596c55a683b7517d82a58260001850112209ef4cd6403d5ed - 4ebeb221809d141fbedb6686b6866a9c6e9230b802fd6353cd", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/0/0/0/0/0/ // cspell:disable-line - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/prev - "a2677061796c6f6164582401711220cb63d41a0489a815f44ee0a771bd70 - 2f21a717bce67fcac4c4be0f14a25ad71f6a7369676e61747572657381a2 - 6970726f74656374656458817b22616c67223a224564445341222c226b69 - 64223a226469643a6b65793a7a364d6b675356337441757737675557714b - 4355593761653675574e7871596764775068554a624a6846394546586d39 - 237a364d6b675356337441757737675557714b4355593761653675574e78 - 71596764775068554a624a6846394546586d39227d697369676e61747572 - 6558403bc9687175be61ecd54a4caf82c5c8bd1938c36e5285edf26d0cca - 64597c9a99a1234eee4fa4798cfadb1c17cbe828fef73a5ab24dc50a1935 - f3bae2b37b7103", - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/proof/root/0/0/0/0/0/0/0/0/0/0/link // cspell:disable-line - // bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy/prev/link - "a26464617461a7646e616d6568426c657373696e67657669657773a16661 - 7574686f72a164747970656f646f63756d656e744163636f756e74667363 - 68656d61a66474797065666f626a656374652464656673a16a4772617068 - 514c444944a4647479706566737472696e67657469746c656a4772617068 - 514c444944677061747465726e788f5e6469643a5b612d7a412d5a302d39 - 2e2123242526272a2b5c2f3d3f5e5f607b7c7d7e2d5d2b3a5b612d7a412d - 5a302d392e2123242526272a2b5c2f3d3f5e5f607b7c7d7e2d5d2a3a3f5b - 612d7a412d5a302d392e2123242526272a2b5c2f3d3f5e5f607b7c7d7e2d - 5d2a3a3f5b612d7a412d5a302d392e2123242526272a2b5c2f3d3f5e5f60 - 7b7c7d7e2d5d2a24696d61784c656e67746818646724736368656d61782c - 68747470733a2f2f6a736f6e2d736368656d612e6f72672f64726166742f - 323032302d31322f736368656d616872657175697265648162746f6a7072 - 6f70657274696573a262746fa1642472656672232f24646566732f477261 - 7068514c4449446474657874a2647479706566737472696e67696d61784c - 656e67746818f0746164646974696f6e616c50726f70657274696573f467 - 76657273696f6e63312e306972656c6174696f6e73a06b64657363726970 - 74696f6e6a4120626c657373696e676f6163636f756e7452656c6174696f - 6ea16474797065646c69737466686561646572a363736570656d6f64656c - 656d6f64656c52ce01040171710b0009686d6f64656c2d76316b636f6e74 - 726f6c6c6572738178386469643a6b65793a7a364d6b6753563374417577 - 37675557714b4355593761653675574e7871596764775068554a624a6846 - 394546586d39", - ]; - for block in blocks { - // Strip whitespace and decode the block from hex - let block = hex::decode(block.replace(['\n', ' '], "")).unwrap(); - // Create the CID and store the block. - let hash = Code::Sha2_256.digest(block.as_slice()); - let block = iroh_bitswap::Block::new(block.into(), Cid::new_v1(0x71, hash)); - block_store.put(&block).await.unwrap(); - } - - assert_eq!( - validate_time_event( - &Cid::try_from("bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy") // cspell:disable-line - .unwrap(), - &block_store, - &root_store, - &HardCodedEthRpc {}, - ) - .await - .unwrap(), - 1682958731 - ); - // Call validation a second time with an invalid NeverCalledEthRpc. - // The validation should still work because the result from the previous call was cached. - assert_eq!( - validate_time_event( - &Cid::try_from("bafyreihu557meceujusxajkaro3epfe6nnzjgbjaxsapgtml7ox5ezb5qy") // cspell:disable-line - .unwrap(), - &block_store, - &root_store, - &NeverCalledEthRpc {}, - ) - .await - .unwrap(), - 1682958731 - ); - } -} diff --git a/one/src/lib.rs b/one/src/lib.rs index 171b0d13b..f86df96f9 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -1,12 +1,10 @@ //! Ceramic implements a single binary ceramic node. #![warn(missing_docs)] -mod cbor_value; -mod ethereum_rpc; -mod events; mod http; mod http_metrics; mod metrics; +mod migrations; mod network; use std::{env, path::PathBuf, time::Duration}; @@ -34,8 +32,6 @@ use tracing::{debug, info, warn}; use crate::network::Ipfs; -pub use cbor_value::CborValue; - #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { @@ -47,9 +43,9 @@ struct Cli { enum Command { /// Run a daemon process Daemon(DaemonOpts), - /// Event store tools + /// Perform various migrations #[command(subcommand)] - Events(events::EventsCommand), + Migrations(migrations::EventsCommand), } #[derive(Args, Debug)] @@ -83,10 +79,6 @@ struct DaemonOpts { )] extra_ceramic_peer_addresses: Vec, - /// Path to storage directory - #[arg(short, long, env = "CERAMIC_ONE_STORE_DIR")] - store_dir: Option, - /// Bind address of the metrics endpoint. #[arg( short, @@ -149,6 +141,16 @@ struct DaemonOpts { env = "CERAMIC_ONE_IDLE_CONNS_TIMEOUT_MS" )] idle_conns_timeout_ms: u64, + + #[command(flatten)] + db_opts: DBOpts, +} + +#[derive(Args, Debug)] +struct DBOpts { + /// Path to storage directory + #[arg(short, long, env = "CERAMIC_ONE_STORE_DIR")] + store_dir: Option, } #[derive(ValueEnum, Debug, Clone, Default)] @@ -226,14 +228,14 @@ pub async fn run() -> Result<()> { let args = Cli::parse(); match args.command { Command::Daemon(opts) => Daemon::run(opts).await, - Command::Events(opts) => events::events(opts).await, + Command::Migrations(opts) => migrations::migrate(opts).await, } } type InterestInterest = FullInterests; type ModelInterest = ReconInterestProvider; -impl DaemonOpts { +impl DBOpts { fn default_directory(&self) -> PathBuf { // 1 path from options // 2 path $HOME/.ceramic-one @@ -281,7 +283,7 @@ struct Daemon; impl Daemon { async fn run(opts: DaemonOpts) -> Result<()> { - let db = opts.get_database().await?; + let db = opts.db_opts.get_database().await?; // we should be able to consolidate the Store traits now that they all rely on &self, but for now we use // static dispatch and require compile-time type information, so we pass all the types we need in, even @@ -354,7 +356,7 @@ impl Daemon { ); debug!(?opts, "using daemon options"); - let dir = opts.default_directory(); + let dir = opts.db_opts.default_directory(); debug!("using directory: {}", dir.display()); // Setup tokio-metrics diff --git a/one/src/migrations.rs b/one/src/migrations.rs new file mode 100644 index 000000000..c94e5559e --- /dev/null +++ b/one/src/migrations.rs @@ -0,0 +1,155 @@ +use std::path::PathBuf; + +use anyhow::{anyhow, Result}; +use async_stream::try_stream; +use async_trait::async_trait; +use ceramic_event::unvalidated; +use ceramic_metrics::{config::Config as MetricsConfig, init_local_tracing}; +use ceramic_service::{Block, BoxedBlock}; +use cid::Cid; +use clap::{Args, Subcommand}; +use futures::Stream; +use multihash_codetable::{Code, Multihash, MultihashDigest}; +use tracing::{debug, info}; + +use crate::{DBOpts, Info}; + +#[derive(Subcommand, Debug)] +pub enum EventsCommand { + /// Migrate raw event blocks from IPFS. + FromIpfs(FromIpfsOpts), +} + +#[derive(Args, Debug)] +pub struct FromIpfsOpts { + /// The path to the ipfs_repo [eg: ~/.ipfs/blocks] + #[clap(long, short, value_parser, env = "CERAMIC_ONE_INPUT_IPFS_PATH")] + input_ipfs_path: PathBuf, + + /// Path to storage directory + #[clap(long, short, env = "CERAMIC_ONE_OUTPUT_STORE_PATH")] + output_store_path: Option, + + /// Unique key used to find other Ceramic peers via the DHT + #[arg(long, default_value = "testnet-clay", env = "CERAMIC_ONE_NETWORK")] + network: crate::Network, + + /// Unique id when the network type is 'local'. + #[arg(long, env = "CERAMIC_ONE_LOCAL_NETWORK_ID")] + local_network_id: Option, +} + +impl From<&FromIpfsOpts> for DBOpts { + fn from(value: &FromIpfsOpts) -> Self { + Self { + store_dir: value.output_store_path.clone(), + } + } +} + +pub async fn migrate(cmd: EventsCommand) -> Result<()> { + if let Err(e) = init_local_tracing() { + eprintln!("Failed to initialize tracing: {}", e); + } + let info = Info::new().await?; + + let mut metrics_config = MetricsConfig { + export: false, + tracing: false, + log_format: ceramic_metrics::config::LogFormat::MultiLine, + ..Default::default() + }; + info.apply_to_metrics_config(&mut metrics_config); + ceramic_metrics::MetricsHandle::new(metrics_config.clone()) + .await + .expect("failed to initialize metrics"); + match cmd { + EventsCommand::FromIpfs(opts) => from_ipfs(opts).await, + } +} + +async fn from_ipfs(opts: FromIpfsOpts) -> Result<()> { + let network = opts.network.to_network(&opts.local_network_id)?; + let db_opts: DBOpts = (&opts).into(); + let crate::Databases::Sqlite(db) = db_opts.get_database().await?; + let blocks = blocks_from_filesystem(opts.input_ipfs_path); + db.event_store.migrate_from_ipfs(network, blocks).await?; + Ok(()) +} +fn blocks_from_filesystem(input_ipfs_path: PathBuf) -> impl Stream> { + // the block store is split in to 1024 directories and then the blocks stored as files. + // the dir structure is the penultimate two characters as dir then the b32 sha256 multihash of the block + // The leading "B" for the b32 sha256 multihash is left off + // ~/.ipfs/blocks/QV/CIQOHMGEIKMPYHAUTL57JSEZN64SIJ5OIHSGJG4TJSSJLGI3PBJLQVI.data // cspell:disable-line + info!(path = %input_ipfs_path.display(), "opening IPFS repo"); + + let mut dirs = Vec::new(); + dirs.push(input_ipfs_path); + + try_stream! { + while !dirs.is_empty() { + let mut entries = tokio::fs::read_dir(dirs.pop().unwrap()).await?; + while let Some(entry) = entries.next_entry().await? { + if entry.metadata().await?.is_dir() { + dirs.push(entry.path()) + } else if let Some(block) = block_from_path(entry.path()).await?{ + yield block + } + } + } + } +} +async fn block_from_path(block_path: PathBuf) -> Result> { + if !block_path.is_file() { + return Ok(None); + } + + let Ok((_base, hash_bytes)) = + multibase::decode("B".to_string() + block_path.file_stem().unwrap().to_str().unwrap()) + else { + debug!(path = %block_path.display(), "block filename is not valid base32upper"); + return Ok(None); + }; + let Ok(hash) = Multihash::from_bytes(&hash_bytes) else { + debug!(path = %block_path.display(), "block filename is not a valid multihash"); + return Ok(None); + }; + let blob = tokio::fs::read(&block_path).await?; + let blob_hash = match hash.code() { + 0x12 => Code::Sha2_256.digest(&blob), + code => return Err(anyhow!("unsupported hash {code}")), + }; + if blob_hash != hash { + return Err(anyhow!( + "block data does not match hash: path={}", + block_path.display() + )); + } + // If we can decode the block as a JWS envelope then we can assume the block is dag-jose + // encoded. + const DAG_CBOR: u64 = 0x71; + const DAG_JOSE: u64 = 0x85; + let result: Result = serde_ipld_dagcbor::from_slice(&blob); + let cid = if result.is_ok() { + Cid::new_v1(DAG_JOSE, hash) + } else { + Cid::new_v1(DAG_CBOR, hash) + }; + Ok(Some(Box::new(FSBlock { + cid, + path: block_path, + }))) +} +struct FSBlock { + cid: Cid, + path: PathBuf, +} +#[async_trait] +impl Block for FSBlock { + fn cid(&self) -> Cid { + self.cid + } + async fn data(&self) -> Result> { + Ok(tokio::fs::read(&self.path).await?) + } +} diff --git a/service/Cargo.toml b/service/Cargo.toml index 9428156e3..64166d12b 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -15,12 +15,15 @@ ceramic-core.workspace = true ceramic-event.workspace = true ceramic-store.workspace = true cid.workspace = true +futures.workspace = true hex.workspace = true ipld-core.workspace = true -serde_ipld_dagcbor.workspace = true iroh-bitswap.workspace = true +multibase.workspace = true multihash-codetable.workspace = true +multihash-derive.workspace = true recon.workspace = true +serde_ipld_dagcbor.workspace = true thiserror.workspace = true tokio.workspace = true tracing.workspace = true @@ -38,3 +41,4 @@ tmpdir.workspace = true tokio.workspace = true tracing-subscriber.workspace = true uuid.workspace = true +iroh-car.workspace = true diff --git a/service/src/event/migration.rs b/service/src/event/migration.rs new file mode 100644 index 000000000..0172ef9bd --- /dev/null +++ b/service/src/event/migration.rs @@ -0,0 +1,337 @@ +use std::collections::{BTreeMap, BTreeSet}; + +use anyhow::{anyhow, bail, Context as _, Result}; +use ceramic_core::{EventId, Network}; +use ceramic_event::unvalidated::{self, signed::cacao::Capability}; +use ceramic_store::{ + BlockHash, CeramicOneEvent, EventBlockRaw, EventInsertable, EventInsertableBody, SqlitePool, +}; +use cid::Cid; +use futures::{Stream, StreamExt, TryStreamExt}; +use ipld_core::ipld::Ipld; +use thiserror::Error; +use tracing::{debug, error, info, instrument, Level}; + +use super::service::{Block, BoxedBlock}; + +pub struct Migrator { + network: Network, + blocks: BTreeMap>, + batch: Vec, + + // All unsigned init payloads we have found. + unsigned_init_payloads: BTreeSet, + // All unsigned init payloads we have found that are referenced from a signed init event + // envelope. + // We use two sets because we do not know the order in which we find blocks. + // Simply removing the referenced blocks from the unsigned_init_payloads set as we find them is + // not sufficient. + referenced_unsigned_init_payloads: BTreeSet, + + error_count: usize, + event_count: usize, +} + +impl Migrator { + pub async fn new( + network: Network, + blocks: impl Stream>, + ) -> Result { + let blocks = blocks + .map(|block| block.map(|block| (block.cid(), block))) + .try_collect::>() + .await?; + Ok(Self { + network, + blocks, + batch: Default::default(), + unsigned_init_payloads: Default::default(), + referenced_unsigned_init_payloads: Default::default(), + error_count: 0, + event_count: 0, + }) + } + + fn find_block(&self, cid: &Cid) -> Result<&BoxedBlock, AnalyzeError> { + if let Some(block) = self.blocks.get(cid) { + Ok(block) + } else { + Err(AnalyzeError::MissingBlock(*cid)) + } + } + + #[instrument(skip(self, sql_pool), ret(level = Level::DEBUG))] + pub async fn migrate(mut self, sql_pool: &SqlitePool) -> Result<()> { + let cids: Vec = self.blocks.keys().cloned().collect(); + for cid in cids { + let ret = self.process_block(cid).await; + if let Err(err) = ret { + self.error_count += 1; + error!(%cid, %err, "error processing block"); + } + if self.batch.len() > 1000 { + self.write_batch(sql_pool).await? + } + } + self.write_batch(sql_pool).await?; + + self.process_unreferenced_init_payloads(sql_pool).await?; + + info!( + event_count = self.event_count, + error_count = self.error_count, + "migration finished" + ); + Ok(()) + } + // Decodes the block and if it is a Ceramic event, it and related blocks are constructed into an + // event. + #[instrument(skip(self), ret(level = Level::DEBUG))] + async fn process_block(&mut self, cid: Cid) -> Result<()> { + let block = self.find_block(&cid)?; + let data: Vec = block.data().await?; + let event: Result, _> = serde_ipld_dagcbor::from_slice(&data); + match event { + Ok(unvalidated::RawEvent::Unsigned(_)) => { + self.unsigned_init_payloads.insert(cid); + } + Ok(unvalidated::RawEvent::Signed(event)) => { + self.process_signed_event(cid, data, event).await? + } + Ok(unvalidated::RawEvent::Time(event)) => { + self.process_time_event(cid, data, *event).await? + } + // Ignore blocks that are not Ceramic events + Err(_) => {} + } + + Ok(()) + } + // For any unsigned init payload blocks there are two possibilities: + // 1. It is an unsigned init event + // 2. It is the payload of another signed init event. + // Therefore once we have processed all other events then the remaining init payloads must be + // the blocks that are unsigned init events. + #[instrument(skip(self), ret(level = Level::DEBUG))] + async fn process_unreferenced_init_payloads(&mut self, sql_pool: &SqlitePool) -> Result<()> { + let init_events: Vec<_> = self + .unsigned_init_payloads + .difference(&self.referenced_unsigned_init_payloads) + .cloned() + .collect(); + for cid in init_events { + let block = self.find_block(&cid)?; + let data = block.data().await?; + let payload: unvalidated::init::Payload = serde_ipld_dagcbor::from_slice(&data)?; + let mut event_builder = EventBuilder::new( + cid, + payload.header().model().to_vec(), + payload.header().controllers()[0].clone(), + cid, + ); + event_builder.add_root(cid, data); + self.batch + .push(event_builder.build(self.network.clone()).await?); + if self.batch.len() > 1000 { + self.write_batch(sql_pool).await? + } + } + self.write_batch(sql_pool).await?; + Ok(()) + } + async fn write_batch(&mut self, sql_pool: &SqlitePool) -> Result<()> { + CeramicOneEvent::insert_many(sql_pool, &self.batch).await?; + self.event_count += self.batch.len(); + self.batch.truncate(0); + Ok(()) + } + + // Find and add all blocks related to this signed event + #[instrument(skip(self, data, event), ret(level = Level::DEBUG))] + async fn process_signed_event( + &mut self, + cid: Cid, + data: Vec, + event: unvalidated::signed::Envelope, + ) -> Result<()> { + let link = event + .link() + .ok_or_else(|| anyhow!("event envelope must have a link"))?; + let block = self.find_block(&link)?; + let payload_data = block.data().await?; + let payload: unvalidated::Payload = + serde_ipld_dagcbor::from_slice(&payload_data).context("decoding payload")?; + let mut event_builder = match payload { + unvalidated::Payload::Init(payload) => { + self.referenced_unsigned_init_payloads.insert(link); + EventBuilder::new( + cid, + payload.header().model().to_vec(), + payload.header().controllers()[0].clone(), + cid, + ) + } + unvalidated::Payload::Data(payload) => { + let init_payload = self.find_init_payload(payload.id()).await?; + EventBuilder::new( + cid, + init_payload.header().model().to_vec(), + init_payload.header().controllers()[0].clone(), + *payload.id(), + ) + } + }; + if let Some(capability_cid) = event.capability() { + debug!(%capability_cid, "found cap chain"); + let block = self.find_block(&capability_cid)?; + let data = block.data().await?; + // Parse capability to ensure it is valid + let capability: Capability = serde_ipld_dagcbor::from_slice(&data)?; + debug!(%capability_cid, ?capability, "capability"); + event_builder.add_block(capability_cid, data); + } + event_builder.add_block(link, payload_data.clone()); + event_builder.add_root(cid, data); + self.batch + .push(event_builder.build(self.network.clone()).await?); + Ok(()) + } + async fn find_init_payload(&self, cid: &Cid) -> Result> { + let init_block = self.find_block(cid)?; + let init_data = init_block.data().await?; + let init: unvalidated::RawEvent = + serde_ipld_dagcbor::from_slice(&init_data).context("decoding init envelope")?; + match init { + unvalidated::RawEvent::Time(_) => bail!("init event must not be a time event"), + unvalidated::RawEvent::Signed(init) => { + let init_payload_block = self.find_block( + &init + .link() + .ok_or_else(|| anyhow!("init envelope must have a link"))?, + )?; + let init_payload_data = init_payload_block.data().await?; + + serde_ipld_dagcbor::from_slice(&init_payload_data).context("decoding init payload") + } + unvalidated::RawEvent::Unsigned(payload) => Ok(payload), + } + } + // Find and add all blocks related to this time event + #[instrument(skip(self, data, event), ret(level = Level::DEBUG))] + async fn process_time_event( + &mut self, + cid: Cid, + data: Vec, + event: unvalidated::RawTimeEvent, + ) -> Result<()> { + let init = event.id(); + let init_block = self.find_block(&init)?; + let init_event: unvalidated::signed::Envelope = + serde_ipld_dagcbor::from_slice(&init_block.data().await?)?; + if let Some(init_link) = init_event.link() { + let payload_block = self.find_block(&init_link)?; + let payload: unvalidated::init::Payload = + serde_ipld_dagcbor::from_slice(&payload_block.data().await?)?; + let mut event_builder = EventBuilder::new( + cid, + payload.header().model().to_vec(), + payload.header().controllers()[0].clone(), + init, + ); + event_builder.add_root(cid, data.clone()); + let proof_id = event.proof(); + let block = self.find_block(&proof_id)?; + let data = block.data().await?; + let proof: unvalidated::Proof = serde_ipld_dagcbor::from_slice(&data)?; + event_builder.add_block(proof_id, data); + let mut curr = proof.root(); + for index in event.path().split('/') { + if curr == event.prev() { + // The time event's previous link is the same as the last link in the proof. + // That block should already be included independently no need to include it here. + break; + } + let idx: usize = index.parse().context("parsing path segment as index")?; + let block = self.find_block(&curr)?; + let data = block.data().await?; + let edge: unvalidated::ProofEdge = serde_ipld_dagcbor::from_slice(&data)?; + // Add edge data to event + event_builder.add_block(curr, data); + + // Follow path + if let Some(link) = edge.get(idx) { + curr = *link; + } else { + error!(%curr, "missing block"); + break; + } + } + + self.batch + .push(event_builder.build(self.network.clone()).await?); + } + Ok(()) + } +} + +#[derive(Error, Debug)] +enum AnalyzeError { + #[error("missing linked block from event {0}")] + MissingBlock(Cid), +} + +struct EventBuilder { + event_cid: Cid, + blocks: Vec, + + sep: Vec, + controller: String, + init: Cid, +} + +impl EventBuilder { + fn new(event_cid: Cid, sep: Vec, controller: String, init: Cid) -> Self { + Self { + blocks: Default::default(), + event_cid, + sep, + controller, + init, + } + } + + fn add_root(&mut self, cid: Cid, data: Vec) { + self._add_block(cid, data, true) + } + fn add_block(&mut self, cid: Cid, data: Vec) { + self._add_block(cid, data, false) + } + fn _add_block(&mut self, cid: Cid, data: Vec, root: bool) { + self.blocks.push(EventBlockRaw { + event_cid: self.event_cid.to_bytes(), + codec: cid.codec() as i64, + root, + idx: self.blocks.len() as i32, + multihash: BlockHash::new(cid.hash().to_owned()), + bytes: data, + }) + } + + async fn build(self, network: Network) -> Result { + let event_id = EventId::builder() + .with_network(&network) + .with_sep("model", &self.sep) + .with_controller(&self.controller) + .with_init(&self.init) + .with_event(&self.event_cid) + .build(); + Ok(EventInsertable::try_new( + event_id, + EventInsertableBody { + cid: self.event_cid, + deliverable: false, + blocks: self.blocks, + }, + )?) + } +} diff --git a/service/src/event/mod.rs b/service/src/event/mod.rs index f7dce573a..fdf734ee7 100644 --- a/service/src/event/mod.rs +++ b/service/src/event/mod.rs @@ -1,5 +1,6 @@ +mod migration; mod ordering_task; mod service; mod store; -pub use service::CeramicEventService; +pub use service::{Block, BoxedBlock, CeramicEventService}; diff --git a/service/src/event/service.rs b/service/src/event/service.rs index ddb864fb5..aa18e3fc5 100644 --- a/service/src/event/service.rs +++ b/service/src/event/service.rs @@ -1,16 +1,21 @@ use std::collections::{HashMap, HashSet}; -use ceramic_core::EventId; +use async_trait::async_trait; +use ceramic_core::{EventId, Network}; use ceramic_event::unvalidated; use ceramic_store::{CeramicOneEvent, EventInsertable, EventInsertableBody, SqlitePool}; use cid::Cid; +use futures::Stream; use ipld_core::ipld::Ipld; use recon::ReconItem; use tracing::{trace, warn}; -use super::ordering_task::{ - DeliverableEvent, DeliverableMetadata, DeliverableTask, DeliveredEvent, OrderingState, - OrderingTask, StreamEvents, +use super::{ + migration::Migrator, + ordering_task::{ + DeliverableEvent, DeliverableMetadata, DeliverableTask, DeliveredEvent, OrderingState, + OrderingTask, StreamEvents, + }, }; use crate::{Error, Result}; @@ -25,6 +30,16 @@ pub struct CeramicEventService { pub(crate) pool: SqlitePool, delivery_task: DeliverableTask, } +/// An object that represents an IPFS block where the data can be loaded async. +#[async_trait] +pub trait Block { + /// Report the CID of the block. + fn cid(&self) -> Cid; + /// Asynchronously load the block data. + /// This data should not be cached in memory as block data is accessed randomly. + async fn data(&self) -> anyhow::Result>; +} +pub type BoxedBlock = Box; impl CeramicEventService { /// Create a new CeramicEventStore @@ -53,6 +68,20 @@ impl CeramicEventService { delivery_task, }) } + pub async fn migrate_from_ipfs( + &self, + network: Network, + blocks: impl Stream>, + ) -> Result<()> { + let migrator = Migrator::new(network, blocks) + .await + .map_err(Error::new_fatal)?; + migrator + .migrate(&self.pool) + .await + .map_err(Error::new_fatal)?; + Ok(()) + } /// merge_from_sqlite takes the filepath to a sqlite file. /// If the file dose not exist the ATTACH DATABASE command will create it. diff --git a/service/src/lib.rs b/service/src/lib.rs index e42c6d02e..60de0b329 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -5,7 +5,7 @@ mod interest; mod tests; pub use error::Error; -pub use event::CeramicEventService; +pub use event::{Block, BoxedBlock, CeramicEventService}; pub use interest::CeramicInterestService; pub(crate) type Result = std::result::Result; diff --git a/service/src/tests/migration.rs b/service/src/tests/migration.rs new file mode 100644 index 000000000..58031cd94 --- /dev/null +++ b/service/src/tests/migration.rs @@ -0,0 +1,310 @@ +use std::{collections::BTreeSet, io::Cursor, str::FromStr}; + +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use ceramic_core::{DidDocument, EventId, Network, StreamId}; +use ceramic_event::unvalidated; +use cid::Cid; +use futures::{stream::BoxStream, Stream, StreamExt as _, TryStreamExt as _}; +use ipld_core::{codec::Codec, ipld, ipld::Ipld}; +use iroh_car::CarReader; +use multihash_codetable::{Code, MultihashDigest}; +use rand::{thread_rng, Rng, RngCore}; +use recon::Key; +use serde_ipld_dagcbor::codec::DagCborCodec; +use test_log::test; + +use crate::{Block, BoxedBlock, CeramicEventService}; + +struct InMemBlock { + cid: Cid, + data: Vec, +} +#[async_trait] +impl Block for InMemBlock { + fn cid(&self) -> Cid { + self.cid + } + async fn data(&self) -> Result> { + Ok(self.data.clone()) + } +} +async fn blocks_from_cars(cars: Vec>) -> impl Stream> { + let mut stream: Option>> = None; + for car in cars { + let reader = CarReader::new(Cursor::new(car)).await.unwrap(); + let s = reader + .stream() + .map_err(|err| anyhow!("{err}")) + .map(|block| block.map(|(cid, data)| Box::new(InMemBlock { cid, data }) as BoxedBlock)); + if let Some(prev_stream) = stream.take() { + stream = Some(prev_stream.chain(s).boxed()); + } else { + stream = Some(s.boxed()); + } + } + // Error if the stream is not Some, all tests pass at least one car file so this should not + // occur. + stream.unwrap() +} +async fn test_migration(cars: Vec>) { + let expected_events: BTreeSet<_> = cars + .iter() + .map(|car| multibase::encode(multibase::Base::Base64Url, car)) + .collect(); + let blocks = blocks_from_cars(cars).await; + let conn = ceramic_store::SqlitePool::connect_in_memory() + .await + .unwrap(); + let service = CeramicEventService::new(conn).await.unwrap(); + service + .migrate_from_ipfs(Network::Local(42), blocks) + .await + .unwrap(); + let actual_events: BTreeSet<_> = recon::Store::range_with_values( + &service, + &EventId::min_value()..&EventId::max_value(), + 0, + usize::MAX, + ) + .await + .unwrap() + .map(|(_event_id, car)| multibase::encode(multibase::Base::Base64Url, car)) + .collect(); + assert_eq!(expected_events, actual_events) +} +async fn signer() -> unvalidated::signed::JwkSigner { + unvalidated::signed::JwkSigner::new( + DidDocument::new("did:key:z6MktBynAPLrEyeS7pVthbiyScmfu8n5V7boXgxyo5q3SZRR#z6MktBynAPLrEyeS7pVthbiyScmfu8n5V7boXgxyo5q3SZRR"), + "df9ecf4c79e5ad77701cfc88c196632b353149d85810a381f469f8fc05dc1b92", + ) + .await + .unwrap() +} +async fn random_unsigned_init_event() -> unvalidated::Event { + let model = + StreamId::from_str("kjzl6hvfrbw6c90uwoyz8j519gxma787qbsfjtrarkr1huq1g1s224k7hopvsyg") + .unwrap(); + let model = model.to_vec(); + let mut unique = vec![0; 12]; + thread_rng().fill_bytes(&mut unique); + let data = ipld_core::ipld!({"key": thread_rng().gen::()}); + + let payload = unvalidated::Builder::init() + .with_controller("did:key:z6MktBynAPLrEyeS7pVthbiyScmfu8n5V7boXgxyo5q3SZRR".to_string()) + .with_sep("model".to_string(), model) + .with_unique(unique) + .with_data(data) + .build(); + + payload.into() +} +async fn random_signed_init_event() -> unvalidated::signed::Event { + let model = + StreamId::from_str("kjzl6hvfrbw6c90uwoyz8j519gxma787qbsfjtrarkr1huq1g1s224k7hopvsyg") + .unwrap(); + let model = model.to_vec(); + let mut unique = vec![0; 12]; + thread_rng().fill_bytes(&mut unique); + let data = ipld_core::ipld!({"key": thread_rng().gen::()}); + + let payload = unvalidated::Builder::init() + .with_controller("did:key:z6MktBynAPLrEyeS7pVthbiyScmfu8n5V7boXgxyo5q3SZRR".to_string()) + .with_sep("model".to_string(), model) + .with_unique(unique) + .with_data(data) + .build(); + + unvalidated::signed::Event::from_payload(unvalidated::Payload::Init(payload), signer().await) + .unwrap() +} +// We do not yet have the ability to sign events with a CACAO. +// So for now we test against a hard coded event signed with a CACAO. +const CACAO_CAR:&str="mO6Jlcm9vdHOB2CpYJgABhQESIN12WhnV8Y3aMRxZYqUSpaO4mSQnbGxEpeC8jMsldwv6Z3ZlcnNpb24BigQBcRIgtP3Gc62zs2I/pu98uctnwBAYUUrgyLjnPaxYwOnBytajYWihYXRnZWlwNDM2MWFwqWNhdWR4OGRpZDprZXk6ejZNa3I0YTNaM0ZGYUpGOFlxV25XblZIcWQ1ZURqWk45YkRTVDZ3VlpDMWhKODFQY2V4cHgYMjAyNC0wNi0xOVQyMDowNDo0Mi40NjRaY2lhdHgYMjAyNC0wNi0xMlQyMDowNDo0Mi40NjRaY2lzc3g7ZGlkOnBraDplaXAxNTU6MToweDM3OTRkNGYwNzdjMDhkOTI1ZmY4ZmY4MjAwMDZiNzM1MzI5OWIyMDBlbm9uY2Vqd1BpQ09jcGtsbGZkb21haW5kdGVzdGd2ZXJzaW9uYTFpcmVzb3VyY2VzgWtjZXJhbWljOi8vKmlzdGF0ZW1lbnR4PEdpdmUgdGhpcyBhcHBsaWNhdGlvbiBhY2Nlc3MgdG8gc29tZSBvZiB5b3VyIGRhdGEgb24gQ2VyYW1pY2FzomFzeIQweGIyNjY5OTkyNjM0NDZkZGI5YmY1ODg4MjVlOWFjMDhiNTQ1ZTY1NWY2MDc3ZThkODU3OWE4ZDY2MzljMTE2N2M1NmY3ZGFlN2FjNzBmN2ZhZWQ4YzE0MWFmOWUxMjRhN2ViNGY3NzQyM2E1NzJiMzYxNDRhZGE4ZWYyMjA2Y2RhMWNhdGZlaXAxOTHTAQFxEiB0pJYyJOf2PmUMzjHeHCaX9ETIA0AKnHWwb1l0CfEy/KJkZGF0YaFkc3RlcBkCWGZoZWFkZXKkY3NlcGVtb2RlbGVtb2RlbFgozgECAYUBEiAGE66nrxdaqHUZ5oBVN6FulPnXix/we9MdpVKJHSR4uGZ1bmlxdWVMxwa2TBHgC66/1V9xa2NvbnRyb2xsZXJzgXg7ZGlkOnBraDplaXAxNTU6MToweDM3OTRkNGYwNzdjMDhkOTI1ZmY4ZmY4MjAwMDZiNzM1MzI5OWIyMDCFAwGFARIg3XZaGdXxjdoxHFlipRKlo7iZJCdsbESl4LyMyyV3C/qiZ3BheWxvYWRYJAFxEiB0pJYyJOf2PmUMzjHeHCaX9ETIA0AKnHWwb1l0CfEy/GpzaWduYXR1cmVzgaJpcHJvdGVjdGVkWMx7ImFsZyI6IkVkRFNBIiwiY2FwIjoiaXBmczovL2JhZnlyZWlmdTd4ZGhobG50d25yZDdqeHBwczQ0d3o2YWNhbWZjc3hhemM0b29wbm1sZGFvdHFvazJ5Iiwia2lkIjoiZGlkOmtleTp6Nk1rcjRhM1ozRkZhSkY4WXFXblduVkhxZDVlRGpaTjliRFNUNndWWkMxaEo4MVAjejZNa3I0YTNaM0ZGYUpGOFlxV25XblZIcWQ1ZURqWk45YkRTVDZ3VlpDMWhKODFQIn1pc2lnbmF0dXJlWEB19qFT2VTY3D/LT8MYvVi0fK4tfVCgB3tMZ18ZPG+Tc4CSxm+R+Q6u57MEUWXUf1dBzBU0l1Un3lxurDlSueID"; +fn new_cacao_signed_data_event() -> Vec { + multibase::decode(CACAO_CAR).unwrap().1 +} + +async fn random_signed_data_event() -> Vec> { + let init = random_signed_init_event().await; + let data = ipld_core::ipld!({"key": thread_rng().gen::()}); + let payload = unvalidated::Builder::data() + .with_id(init.envelope_cid()) + .with_prev(init.envelope_cid()) + .with_data(data) + .build(); + vec![ + init.into(), + unvalidated::signed::Event::from_payload( + unvalidated::Payload::Data(payload), + signer().await, + ) + .unwrap() + .into(), + ] +} + +fn random_cid() -> cid::Cid { + use multihash_codetable::Code; + use multihash_derive::MultihashDigest; + + let mut data = [0u8; 8]; + rand::Rng::fill(&mut rand::thread_rng(), &mut data); + let hash = Code::Sha2_256.digest(&data); + cid::Cid::new_v1(0x00, hash) +} + +fn cid_from_dag_cbor(data: &[u8]) -> Cid { + Cid::new_v1( + >::CODE, + Code::Sha2_256.digest(data), + ) +} +async fn random_time_event() -> Vec> { + let init = random_signed_init_event().await; + let mut prev = init.envelope_cid(); + let mut witness_nodes = Vec::new(); + for _ in 0..10 { + let (idx, edge) = if thread_rng().gen() { + (0, ipld!([prev, random_cid()])) + } else { + (1, ipld!([random_cid(), prev])) + }; + let edge_bytes = serde_ipld_dagcbor::to_vec(&edge).unwrap(); + prev = cid_from_dag_cbor(&edge_bytes); + witness_nodes.push((idx, edge)); + } + let (root_idx, root_edge) = witness_nodes.pop().unwrap(); + + let mut builder = unvalidated::Builder::time() + .with_id(init.envelope_cid()) + .with_tx( + "eip155:11155111".to_string(), + random_cid(), + "f(bytes32)".to_string(), + ) + .with_root(root_idx, root_edge); + for (idx, edge) in witness_nodes.into_iter().rev() { + builder = builder.with_witness_node(idx, edge); + } + let time = builder.build().unwrap(); + vec![init.into(), Box::new(time).into()] +} +#[test(tokio::test)] +async fn unsigned_init_event() { + test_migration(vec![random_unsigned_init_event() + .await + .encode_car() + .await + .unwrap()]) + .await; +} +#[test(tokio::test)] +async fn many_unsigned_init_events() { + test_migration(vec![ + random_unsigned_init_event() + .await + .encode_car() + .await + .unwrap(), + random_unsigned_init_event() + .await + .encode_car() + .await + .unwrap(), + random_unsigned_init_event() + .await + .encode_car() + .await + .unwrap(), + ]) + .await; +} +#[test(tokio::test)] +async fn signed_init_event() { + test_migration(vec![random_signed_init_event() + .await + .encode_car() + .await + .unwrap()]) + .await; +} +#[test(tokio::test)] +async fn many_signed_init_events() { + test_migration(vec![ + random_signed_init_event().await.encode_car().await.unwrap(), + random_signed_init_event().await.encode_car().await.unwrap(), + random_signed_init_event().await.encode_car().await.unwrap(), + ]) + .await; +} +#[test(tokio::test)] +async fn signed_data_event() { + let mut cars = Vec::new(); + for event in random_signed_data_event().await { + cars.push(event.encode_car().await.unwrap()); + } + test_migration(cars).await; +} +#[test(tokio::test)] +async fn many_signed_data_events() { + let mut cars = Vec::new(); + for _ in 0..3 { + for event in random_signed_data_event().await { + cars.push(event.encode_car().await.unwrap()); + } + } + test_migration(cars).await; +} +#[test(tokio::test)] +async fn cacao_signed_data_event() { + test_migration(vec![new_cacao_signed_data_event()]).await; +} +#[test(tokio::test)] +async fn time_event() { + let mut cars = Vec::new(); + for event in random_time_event().await { + cars.push(event.encode_car().await.unwrap()); + } + test_migration(cars).await; +} +#[test(tokio::test)] +async fn many_time_events() { + let mut cars = Vec::new(); + for _ in 0..3 { + for event in random_time_event().await { + cars.push(event.encode_car().await.unwrap()); + } + } + test_migration(cars).await; +} +#[test(tokio::test)] +async fn all_events() { + let mut cars = Vec::new(); + + cars.push(new_cacao_signed_data_event()); + + for _ in 0..3 { + cars.push( + random_unsigned_init_event() + .await + .encode_car() + .await + .unwrap(), + ); + } + for _ in 0..3 { + cars.push(random_signed_init_event().await.encode_car().await.unwrap()); + } + for _ in 0..3 { + for event in random_signed_data_event().await { + cars.push(event.encode_car().await.unwrap()); + } + } + for _ in 0..3 { + for event in random_time_event().await { + cars.push(event.encode_car().await.unwrap()); + } + } + test_migration(cars).await; +} diff --git a/service/src/tests/mod.rs b/service/src/tests/mod.rs index 644d3081b..c052c0958 100644 --- a/service/src/tests/mod.rs +++ b/service/src/tests/mod.rs @@ -1,5 +1,6 @@ mod event; mod interest; +mod migration; mod ordering; use ceramic_core::{DidDocument, EventId, Network, StreamId}; diff --git a/store/src/lib.rs b/store/src/lib.rs index 39747ffbd..a81d357e3 100644 --- a/store/src/lib.rs +++ b/store/src/lib.rs @@ -9,9 +9,9 @@ mod sql; pub use error::Error; pub use metrics::{Metrics, StoreMetricsMiddleware}; pub use sql::{ - entities::EventInsertable, entities::EventInsertableBody, CeramicOneBlock, CeramicOneEvent, - CeramicOneEventBlock, CeramicOneInterest, InsertResult, InsertedEvent, Migrations, SqlitePool, - SqliteRootStore, SqliteTransaction, + entities::{BlockHash, EventBlockRaw, EventInsertable, EventInsertableBody}, + CeramicOneBlock, CeramicOneEvent, CeramicOneEventBlock, CeramicOneInterest, InsertResult, + InsertedEvent, Migrations, SqlitePool, SqliteRootStore, SqliteTransaction, }; pub(crate) type Result = std::result::Result; diff --git a/store/src/sql/entities/event.rs b/store/src/sql/entities/event.rs index fb8ae1692..363ee7ae6 100644 --- a/store/src/sql/entities/event.rs +++ b/store/src/sql/entities/event.rs @@ -5,10 +5,9 @@ use iroh_car::{CarHeader, CarReader, CarWriter}; use std::collections::BTreeSet; -use crate::{ - sql::entities::{BlockRow, EventBlockRaw}, - Error, Result, -}; +pub use crate::sql::entities::EventBlockRaw; + +use crate::{sql::entities::BlockRow, Error, Result}; pub async fn rebuild_car(blocks: Vec) -> Result>> { if blocks.is_empty() { diff --git a/store/src/sql/entities/event_block.rs b/store/src/sql/entities/event_block.rs index 58b80f2f6..9f9e352ba 100644 --- a/store/src/sql/entities/event_block.rs +++ b/store/src/sql/entities/event_block.rs @@ -12,7 +12,9 @@ use crate::{ Error, Result, }; +// TODO: make type private #[derive(Debug, Clone)] +#[allow(missing_docs)] pub struct EventBlockRaw { pub event_cid: Vec, pub codec: i64, @@ -91,6 +93,7 @@ impl sqlx::FromRow<'_, SqliteRow> for EventBlockRaw { } impl EventBlockRaw { + #[allow(missing_docs)] pub fn try_new( event_cid: &Cid, idx: i32, @@ -135,6 +138,7 @@ impl EventBlockRaw { }) } + /// CID of the block pub fn cid(&self) -> Cid { Cid::new_v1(self.codec as u64, self.multihash.clone().into_inner()) } diff --git a/store/src/sql/entities/hash.rs b/store/src/sql/entities/hash.rs index a914b808f..f94d6cd0c 100644 --- a/store/src/sql/entities/hash.rs +++ b/store/src/sql/entities/hash.rs @@ -4,27 +4,30 @@ use sqlx::{sqlite::SqliteRow, Row as _}; use crate::{Error, Result}; -#[derive(Debug, Clone, PartialEq, Eq, sqlx::Type)] +// TODO: make type private +#[allow(missing_docs)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::Type)] pub struct BlockHash(Multihash<64>); impl BlockHash { + #[allow(missing_docs)] pub fn new(hash: Multihash<64>) -> Self { Self(hash) } - pub fn try_from_vec(data: &[u8]) -> Result { + pub(crate) fn try_from_vec(data: &[u8]) -> Result { Ok(Self(Multihash::from_bytes(data).map_err(Error::new_app)?)) } - pub fn to_bytes(&self) -> Vec { + pub(crate) fn to_bytes(&self) -> Vec { self.0.to_bytes() } - pub fn inner(&self) -> &Multihash<64> { + pub(crate) fn inner(&self) -> &Multihash<64> { &self.0 } - pub fn into_inner(self) -> Multihash<64> { + pub(crate) fn into_inner(self) -> Multihash<64> { self.0 } }