diff --git a/CODEOWNERS b/CODEOWNERS index 84ee4eb07..e53e31331 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -11,15 +11,16 @@ /ci-scripts/ @smrz2001 @ceramicnetwork/eng-reviewers /core/ @nathanielc @ceramicnetwork/eng-reviewers /event/ @nathanielc @ceramicnetwork/eng-reviewers +/flight/ @nathanielc @ceramicnetwork/eng-reviewers /fpm/ @dbcfd @ceramicnetwork/eng-reviewers /kubo-rpc-server/ @nathanielc @ceramicnetwork/eng-reviewers /kubo-rpc/ @nathanielc @ceramicnetwork/eng-reviewers /metrics/ @dav1do @ceramicnetwork/eng-reviewers /migrations/ @dav1do @ceramicnetwork/eng-reviewers +/olap/ @nathanielc @ceramicnetwork/eng-reviewers /one/ @stbrody @ceramicnetwork/eng-reviewers /p2p/ @nathanielc @ceramicnetwork/eng-reviewers /recon/ @AaronGoldman @ceramicnetwork/eng-reviewers /service/ @dav1do @ceramicnetwork/eng-reviewers /store/ @dav1do @ceramicnetwork/eng-reviewers -/olap/ @nathanielc @ceramicnetwork/eng-reviewers /validation/ @dav1do @ceramicnetwork/eng-reviewers diff --git a/Cargo.lock b/Cargo.lock index 6b827e5e8..33852cb73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1791,21 +1791,34 @@ name = "ceramic-olap" version = "0.35.0" dependencies = [ "anyhow", + "arrow", + "arrow-flight", + "ceramic-arrow-test", + "ceramic-core", + "ceramic-flight", "ceramic-metrics", + "cid 0.11.1", "clap 4.5.4", "datafusion", + "datafusion-federation", + "datafusion-flight-sql-table-provider", + "expect-test", "futures", "git-version", "hyper 0.14.28", + "json-patch", "multibase 0.9.1", "multihash 0.19.1", "multihash-codetable", "multihash-derive 0.9.0", "names", "prometheus-client", + "serde_json", "signal-hook", "signal-hook-tokio", + "test-log", "tokio", + "tonic 0.11.0", "tracing", ] @@ -2849,6 +2862,20 @@ dependencies = [ "tonic 0.11.0", ] +[[package]] +name = "datafusion-flight-sql-table-provider" +version = "0.2.1" +source = "git+https://github.com/datafusion-contrib/datafusion-federation.git?branch=main#c79097031341f4109bec1cdbf893c239e5bcf928" +dependencies = [ + "arrow", + "arrow-flight", + "async-trait", + "datafusion", + "datafusion-federation", + "futures", + "tonic 0.11.0", +] + [[package]] name = "datafusion-functions" version = "41.0.0" @@ -3684,6 +3711,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "fluent-uri" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "flume" version = "0.10.14" @@ -5117,6 +5153,18 @@ dependencies = [ "smallvec", ] +[[package]] +name = "json-patch" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" +dependencies = [ + "jsonptr", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "json-syntax" version = "0.9.6" @@ -5136,6 +5184,17 @@ dependencies = [ "smallvec", ] +[[package]] +name = "jsonptr" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +dependencies = [ + "fluent-uri", + "serde", + "serde_json", +] + [[package]] name = "k256" version = "0.11.6" diff --git a/Cargo.toml b/Cargo.toml index 7dff78948..44970659d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,9 @@ crossterm = "0.25" ctrlc = "3.2.2" dag-jose = "0.2" datafusion = "41.0.0" +datafusion-federation = "0.2" datafusion-flight-sql-server = "0.2" 
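+# Consumed directly from git; Cargo.lock pins the exact main-branch revision.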
+datafusion-flight-sql-table-provider = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
 deadqueue = "0.2.3"
 derivative = "2.2"
 derive_more = "0.99.17"
@@ -150,6 +152,7 @@ paste = "1.0.9"
 phf = "0.11"
 prometheus-client = "0.22"
 proptest = "1"
+# TODO: upgrade to 0.12
 prost = "0.11"
 prost-build = "0.11.1"
 quic-rpc = { version = "0.3.2", default-features = false }
diff --git a/flight/src/server.rs b/flight/src/server.rs
index 4fa20c0b8..3b2a71be2 100644
--- a/flight/src/server.rs
+++ b/flight/src/server.rs
@@ -39,7 +39,7 @@ pub fn new_server(
     feed: Arc,
 ) -> anyhow::Result {
     let ctx = SessionContext::new_with_config(
-        SessionConfig::new().with_default_catalog_and_schema("flight_sql", "ceramic"),
+        SessionConfig::new().with_default_catalog_and_schema("ceramic", "v0"),
     );
     ctx.register_table("conclusion_feed", Arc::new(FeedTable::new(feed)))?;
     let svc = FlightServiceServer::new(FlightSqlService::new(ctx.state()));
diff --git a/olap/Cargo.toml b/olap/Cargo.toml
index f25d797cf..bb5652b4e 100644
--- a/olap/Cargo.toml
+++ b/olap/Cargo.toml
@@ -9,19 +9,34 @@ repository.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+arrow.workspace = true
+arrow-flight.workspace = true
+ceramic-arrow-test.workspace = true
 ceramic-metrics.workspace = true
+cid.workspace = true
 clap.workspace = true
 datafusion.workspace = true
+datafusion-federation.workspace = true
+datafusion-flight-sql-table-provider.workspace = true
 futures.workspace = true
 git-version = "0.3.9"
 hyper.workspace = true
+json-patch = "2.0.0"
 multibase.workspace = true
 multihash.workspace = true
 multihash-codetable = { version = "0.1.3", features = ["sha2"] }
 multihash-derive = "0.9.0"
 names.workspace = true
 prometheus-client.workspace = true
+serde_json.workspace = true
 signal-hook = "0.3.17"
 signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
 tokio = { workspace = true, features = ["fs", "rt-multi-thread"] }
+tonic.workspace = true
 tracing.workspace = true
+
+[dev-dependencies]
+test-log.workspace = true
+ceramic-flight.workspace = true
+expect-test.workspace = true
+ceramic-core.workspace = true
diff --git a/olap/src/aggregator.rs b/olap/src/aggregator.rs
deleted file mode 100644
index f8b119ee9..000000000
--- a/olap/src/aggregator.rs
+++ /dev/null
@@ -1,12 +0,0 @@
-use std::future::Future;
-
-use anyhow::Result;
-use datafusion::execution::context::SessionContext;
-
-pub async fn run(shutdown_signal: impl Future<Output = ()>) -> Result<()> {
-    // Create datafusion context
-    let _ctx = SessionContext::new();
-
-    shutdown_signal.await;
-    Ok(())
-}
diff --git a/olap/src/aggregator/ceramic_patch.rs b/olap/src/aggregator/ceramic_patch.rs
new file mode 100644
index 000000000..0e100ded7
--- /dev/null
+++ b/olap/src/aggregator/ceramic_patch.rs
@@ -0,0 +1,147 @@
+use std::sync::Arc;
+
+use arrow::{
+    array::{Array as _, ArrayBuilder as _, ArrayRef, StringBuilder},
+    datatypes::DataType,
+};
+use datafusion::{
+    common::{
+        cast::{as_binary_array, as_string_array},
+        exec_datafusion_err, Result,
+    },
+    logical_expr::{
+        PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
+    },
+};
+
+/// Applies a Ceramic data event to a document state, returning the new document state.
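+///
+/// The UDWF takes four arguments: the event CID (Binary), the previous event
+/// CID (Binary), the previous document state (Utf8 JSON), and the event data
+/// (Utf8), which is an initial JSON document for init events, a JSON patch for
+/// data events, and null for time events.
+///
+/// For illustration, a sketch of the window expression the aggregator builds
+/// over it (mirroring `process_feed_batch` in `aggregator/mod.rs`):
+///
+/// ```ignore
+/// df.window(vec![Expr::WindowFunction(WindowFunction::new(
+///     WindowFunctionDefinition::WindowUDF(Arc::new(CeramicPatch::new_udwf())),
+///     vec![col("event_cid"), col("previous"), col("previous_state"), col("data")],
+/// ))
+/// .partition_by(vec![col("stream_cid")])
+/// .order_by(vec![col("index").sort(true, true)])
+/// .build()?
+/// .alias("new_state")])?;
+/// ```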
+#[derive(Debug)]
+pub struct CeramicPatch {
+    signature: Signature,
+}
+
+impl CeramicPatch {
+    pub fn new_udwf() -> WindowUDF {
+        WindowUDF::new_from_impl(Self::new())
+    }
+    fn new() -> Self {
+        Self {
+            signature: Signature::new(
+                TypeSignature::Exact(vec![
+                    DataType::Binary,
+                    DataType::Binary,
+                    DataType::Utf8,
+                    DataType::Utf8,
+                ]),
+                Volatility::Immutable,
+            ),
+        }
+    }
+}
+
+impl WindowUDFImpl for CeramicPatch {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "ceramic_patch"
+    }
+
+    fn signature(&self) -> &datafusion::logical_expr::Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(DataType::Utf8)
+    }
+
+    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+        Ok(Box::new(CeramicPatchEvaluator))
+    }
+}
+
+#[derive(Debug)]
+struct CeramicPatchEvaluator;
+
+impl CeramicPatchEvaluator {
+    fn apply_patch(patch: &str, previous_state: &str) -> Result<String> {
+        let patch: Vec<json_patch::PatchOperation> = serde_json::from_str(patch)
+            .map_err(|err| exec_datafusion_err!("Error parsing patch: {err}"))?;
+        let mut new_state: serde_json::Value = serde_json::from_str(previous_state)
+            .map_err(|err| exec_datafusion_err!("Error parsing previous state: {err}"))?;
+        json_patch::patch(&mut new_state, &patch)
+            .map_err(|err| exec_datafusion_err!("Error applying JSON patch: {err}"))?;
+        serde_json::to_string(&new_state)
+            .map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
+    }
+}
+
+impl PartitionEvaluator for CeramicPatchEvaluator {
+    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
+        let event_cids = as_binary_array(&values[0])?;
+        let previous_cids = as_binary_array(&values[1])?;
+        let previous_states = as_string_array(&values[2])?;
+        let patches = as_string_array(&values[3])?;
+        let mut new_states = StringBuilder::new();
+        for i in 0..num_rows {
+            if previous_cids.is_null(i) {
+                // Init event: the patch value is the initial state.
+                if patches.is_null(i) {
+                    // If we have an init event without data, use an empty object as the
+                    // initial state. This feels like a leaky abstraction; is this expected
+                    // based on the Ceramic spec?
+                    new_states.append_value("{}");
+                } else {
+                    new_states.append_value(patches.value(i));
+                }
+            } else if let Some(previous_state) = if !previous_states.is_null(i) {
+                // We know the previous state already
+                Some(previous_states.value(i))
+            } else {
+                // Iterate backwards until we find the previous state among the new states.
+                let previous_cid = previous_cids.value(i);
+                let mut j = i;
+                loop {
+                    if j == 0 {
+                        break None;
+                    }
+                    j -= 1;
+                    if event_cids.value(j) == previous_cid {
+                        break Some(value_at(&new_states, j));
+                    }
+                }
+            } {
+                if patches.is_null(i) {
+                    // We have a time event; the new state is just the previous state.
+                    //
+                    // Allow the clippy warning, as previous_state is a reference back into
+                    // new_states, so we need to copy the data to a new location before we
+                    // can copy it back into new_states.
+                    #[allow(clippy::unnecessary_to_owned)]
+                    new_states.append_value(previous_state.to_string());
+                } else {
+                    new_states.append_value(CeramicPatchEvaluator::apply_patch(
+                        patches.value(i),
+                        previous_state,
+                    )?);
+                }
+            } else {
+                // Unreachable when data is well formed.
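+                // A previous CID whose state appears neither in doc_state nor
+                // earlier in this batch violates the contract documented on
+                // process_feed_batch; append null rather than failing the batch.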
+                new_states.append_null();
+            }
+        }
+        Ok(Arc::new(new_states.finish()))
+    }
+}
+
+fn value_at(builder: &StringBuilder, idx: usize) -> &str {
+    let start = builder.offsets_slice()[idx] as usize;
+    let stop = if idx < builder.len() {
+        builder.offsets_slice()[idx + 1] as usize
+    } else {
+        builder.values_slice().len()
+    };
+    std::str::from_utf8(&builder.values_slice()[start..stop])
+        .expect("new states should always be valid utf8")
+}
diff --git a/olap/src/aggregator/mod.rs b/olap/src/aggregator/mod.rs
new file mode 100644
index 000000000..a62cd895d
--- /dev/null
+++ b/olap/src/aggregator/mod.rs
@@ -0,0 +1,628 @@
+//! Aggregation functions for Ceramic Model Instance Document streams.
+//! Applies each new event to the previous state of the stream, producing the stream state at
+//! each event in the stream.
+mod ceramic_patch;
+
+use std::{any::Any, future::Future, sync::Arc};
+
+use anyhow::{Context as _, Result};
+use arrow::datatypes::{DataType, Field, Fields, SchemaBuilder};
+use arrow_flight::sql::client::FlightSqlServiceClient;
+use ceramic_patch::CeramicPatch;
+use datafusion::{
+    catalog::{CatalogProvider, SchemaProvider},
+    common::{Column, JoinType},
+    dataframe::{DataFrame, DataFrameWriteOptions},
+    datasource::{file_format::parquet::ParquetFormat, listing::ListingOptions},
+    error::DataFusionError,
+    execution::context::SessionContext,
+    functions_array::extract::array_element,
+    logical_expr::{
+        col, expr::WindowFunction, Cast, Expr, ExprFunctionExt as _, WindowFunctionDefinition,
+    },
+    scalar::ScalarValue,
+    sql::TableReference,
+};
+use datafusion_federation::sql::{SQLFederationProvider, SQLSchemaProvider};
+use datafusion_flight_sql_table_provider::FlightSQLExecutor;
+use tonic::transport::Endpoint;
+
+pub struct Config {
+    pub flight_sql_endpoint: String,
+}
+
+pub async fn run(
+    config: impl Into<Config>,
+    shutdown_signal: impl Future<Output = ()>,
+) -> Result<()> {
+    let config = config.into();
+    // Create federated datafusion state
+    let state = datafusion_federation::default_session_state();
+    let client = new_client(config.flight_sql_endpoint.clone()).await?;
+    let executor = Arc::new(FlightSQLExecutor::new(config.flight_sql_endpoint, client));
+    let provider = Arc::new(SQLFederationProvider::new(executor));
+    let schema_provider = Arc::new(
+        SQLSchemaProvider::new_with_tables(provider, vec!["conclusion_feed".to_string()]).await?,
+    );
+
+    // Create datafusion context
+    let ctx = SessionContext::new_with_state(state);
+
+    // Register federated catalog
+    ctx.register_catalog("ceramic", Arc::new(SQLCatalog { schema_provider }));
+
+    // Configure doc_state listing table
+    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let listing_options =
+        ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet");
+
+    // TODO place this table in object store, see AES-284
+    ctx.register_listing_table(
+        "doc_state",
+        "./db/doc_state",
+        listing_options,
+        Some(Arc::new(
+            SchemaBuilder::from(&Fields::from([
+                Arc::new(Field::new("index", DataType::UInt64, false)),
+                Arc::new(Field::new("stream_cid", DataType::Binary, false)),
+                Arc::new(Field::new("event_type", DataType::UInt8, false)),
+                Arc::new(Field::new("controller", DataType::Utf8, false)),
+                Arc::new(Field::new("event_cid", DataType::Binary, false)),
+                Arc::new(Field::new("state", DataType::Utf8, false)),
+            ]))
+            .finish(),
+        )),
+        None,
+    )
+    .await?;
+
+    let conclusion_feed = ctx
+        .table(TableReference::full("ceramic", "v0", "conclusion_feed"))
+        .await?
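+        // The feed delivers the data column as Binary; cast it to Utf8 so the
+        // ceramic_patch UDWF can parse it as JSON text.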
+        .select(vec![
+            col("index"),
+            col("event_type"),
+            col("stream_cid"),
+            col("controller"),
+            col("event_cid"),
+            Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
+            col("previous"),
+        ])?;
+
+    // TODO call this in a loop, see AES-291
+    process_feed_batch(ctx, conclusion_feed).await?;
+
+    shutdown_signal.await;
+    Ok(())
+}
+
+/// Creates a new [FlightSqlServiceClient] connected to the passed endpoint.
+async fn new_client(dsn: String) -> Result<FlightSqlServiceClient<tonic::transport::Channel>> {
+    let endpoint = Endpoint::new(dsn).map_err(tx_error_to_df)?;
+    let channel = endpoint.connect().await.map_err(tx_error_to_df)?;
+    Ok(FlightSqlServiceClient::new(channel))
+}
+
+fn tx_error_to_df(err: tonic::transport::Error) -> DataFusionError {
+    DataFusionError::External(format!("failed to connect: {err:?}").into())
+}
+
+// Process events from the conclusion feed, producing a new document state for each input event.
+// The session context must have a registered `doc_state` table with stream_cid, event_cid, and
+// state columns.
+//
+// The events in the conclusion feed must:
+// * have stream_cid, event_cid, previous, and data columns,
+// * have previous CIDs that either already exist in `doc_state` or are contained within the
+//   current conclusion_feed batch,
+// * carry data that is a valid JSON document (init events) or a valid JSON patch (data events).
+async fn process_feed_batch(ctx: SessionContext, conclusion_feed: DataFrame) -> Result<()> {
+    let doc_state = ctx.table("doc_state").await?.select_columns(&[
+        "stream_cid",
+        "index",
+        "event_type",
+        "controller",
+        "event_cid",
+        "state",
+    ])?;
+
+    conclusion_feed
+        // MIDs only ever use the first previous, so we can optimize the join by selecting the
+        // first element of the previous array.
+        .select(vec![
+            col("index"),
+            col("event_type"),
+            col("stream_cid"),
+            col("controller"),
+            col("event_cid"),
+            Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
+            array_element(col("previous"), Expr::Literal(ScalarValue::Int8(Some(1))))
+                .alias("previous"),
+        ])?
+        .join_on(
+            doc_state,
+            JoinType::Left,
+            [col("previous").eq(col("doc_state.event_cid"))],
+        )?
+        // Project joined columns to just the ones we need
+        .select(vec![
+            col(Column {
+                relation: Some(TableReference::from("?table?")),
+                name: "index".to_string(),
+            })
+            .alias("index"),
+            col(Column {
+                relation: Some(TableReference::from("?table?")),
+                name: "event_type".to_string(),
+            })
+            .alias("event_type"),
+            col(Column {
+                relation: Some(TableReference::from("?table?")),
+                name: "stream_cid".to_string(),
+            })
+            .alias("stream_cid"),
+            col(Column {
+                relation: Some(TableReference::from("?table?")),
+                name: "controller".to_string(),
+            })
+            .alias("controller"),
+            col(Column {
+                relation: Some(TableReference::from("?table?")),
+                name: "event_cid".to_string(),
+            })
+            .alias("event_cid"),
+            col("previous"),
+            col("doc_state.state").alias("previous_state"),
+            col("data"),
+        ])?
+        .window(vec![Expr::WindowFunction(WindowFunction::new(
+            WindowFunctionDefinition::WindowUDF(Arc::new(CeramicPatch::new_udwf())),
+            vec![
+                col("event_cid"),
+                col("previous"),
+                col("previous_state"),
+                col("data"),
+            ],
+        ))
+        .partition_by(vec![col("stream_cid")])
+        .order_by(vec![col("index").sort(true, true)])
+        .build()?
+        .alias("new_state")])?
+        // Project and rename columns to match the doc_state table schema
+        .select_columns(&[
+            "index",
+            "stream_cid",
+            "event_type",
+            "controller",
+            "event_cid",
+            "new_state",
+        ])?
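+        // The rename and write below append the computed states to doc_state,
+        // where later batches read them as previous_state inputs.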
+ .with_column_renamed("new_state", "state")? + // Write states to the doc_state table + .write_table("doc_state", DataFrameWriteOptions::new()) + .await + .context("computing states")?; + Ok(()) +} + +struct SQLCatalog { + schema_provider: Arc, +} + +impl CatalogProvider for SQLCatalog { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + vec!["v0".to_string()] + } + + fn schema(&self, _name: &str) -> Option> { + Some(self.schema_provider.clone()) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr as _; + + use super::*; + + use arrow::{array::RecordBatch, util::pretty::pretty_format_batches}; + use ceramic_arrow_test::CidString; + use ceramic_core::StreamIdType; + use ceramic_flight::{ + conclusion_events_to_record_batch, ConclusionData, ConclusionEvent, ConclusionInit, + ConclusionTime, + }; + use cid::Cid; + use datafusion::{ + common::Constraints, + logical_expr::{ + expr::ScalarFunction, CreateMemoryTable, DdlStatement, EmptyRelation, LogicalPlan, + ScalarUDF, + }, + }; + use expect_test::expect; + use test_log::test; + + async fn do_test(conclusion_feed: RecordBatch) -> anyhow::Result { + do_pass(init_ctx().await?, conclusion_feed).await + } + async fn init_ctx() -> anyhow::Result { + let ctx = SessionContext::new(); + ctx.execute_logical_plan(LogicalPlan::Ddl(DdlStatement::CreateMemoryTable( + CreateMemoryTable { + name: "doc_state".into(), + constraints: Constraints::empty(), + input: LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new( + SchemaBuilder::from(&Fields::from([ + Arc::new(Field::new("index", DataType::UInt64, false)), + Arc::new(Field::new("stream_cid", DataType::Binary, false)), + Arc::new(Field::new("event_type", DataType::UInt8, false)), + Arc::new(Field::new("controller", DataType::Utf8, false)), + Arc::new(Field::new("event_cid", DataType::Binary, false)), + Arc::new(Field::new("state", DataType::Utf8, false)), + ])) + .finish() + .try_into() + .unwrap(), + ), + }) + .into(), + if_not_exists: false, + or_replace: false, + column_defaults: vec![], + }, + ))) + .await?; + Ok(ctx) + } + async fn do_pass( + ctx: SessionContext, + conclusion_feed: RecordBatch, + ) -> anyhow::Result { + let conclusion_feed = ctx.read_batch(conclusion_feed)?; + process_feed_batch(ctx.clone(), conclusion_feed).await?; + let cid_string = Arc::new(ScalarUDF::from(CidString::new())); + let doc_state = ctx + .table("doc_state") + .await? + .select(vec![ + col("index"), + Expr::ScalarFunction(ScalarFunction::new_udf( + cid_string.clone(), + vec![col("stream_cid")], + )) + .alias("stream_cid"), + col("event_type"), + col("controller"), + Expr::ScalarFunction(ScalarFunction::new_udf(cid_string, vec![col("event_cid")])) + .alias("event_cid"), + col("state"), + ])? + .collect() + .await?; + Ok(pretty_format_batches(&doc_state)?) + } + + #[test(tokio::test)] + async fn single_init_event() -> anyhow::Result<()> { + let doc_state = do_test(conclusion_events_to_record_batch(&[ + ConclusionEvent::Data(ConclusionData { + index: 0, + event_cid: Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![], + data: r#"{"a":0}"#.into(), + }), + ])?) 
+ .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + Ok(()) + } + #[test(tokio::test)] + async fn multiple_data_events() -> anyhow::Result<()> { + let doc_state = do_test(conclusion_events_to_record_batch(&[ + ConclusionEvent::Data(ConclusionData { + index: 1, + event_cid: Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![], + data: r#"{"a":0}"#.into(), + }), + ConclusionEvent::Data(ConclusionData { + index: 2, + event_cid: Cid::from_str( + "baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?], + data: r#"[{"op":"replace", "path": "/a", "value":1}]"#.into(), + }), + ])?) 
+ .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"a":1} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + Ok(()) + } + #[test(tokio::test)] + async fn multiple_data_and_time_events() -> anyhow::Result<()> { + let doc_state = do_test(conclusion_events_to_record_batch(&[ + ConclusionEvent::Data(ConclusionData { + index: 0, + event_cid: Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![], + data: r#"{"a":0}"#.into(), + }), + ConclusionEvent::Time(ConclusionTime { + index: 1, + event_cid: Cid::from_str( + "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?], + }), + ConclusionEvent::Data(ConclusionData { + index: 2, + event_cid: Cid::from_str( + "baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", + )?], + data: r#"[{"op":"replace", "path": "/a", "value":1}]"#.into(), + }), + ])?) 
+ .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"a":0} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"a":1} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + Ok(()) + } + + #[test(tokio::test)] + async fn multiple_single_event_passes() -> anyhow::Result<()> { + // Test multiple passes where a single event for the stream is present in the conclusion + // feed for each pass. + let ctx = init_ctx().await?; + let doc_state = do_pass( + ctx.clone(), + conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { + index: 0, + event_cid: Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![], + data: r#"{"a":0}"#.into(), + })])?, + ) + .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + let doc_state = do_pass( + ctx.clone(), + conclusion_events_to_record_batch(&[ConclusionEvent::Time(ConclusionTime { + index: 1, + event_cid: Cid::from_str( + "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?], + })])?, + ) + .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | 
controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"a":0} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + let doc_state = do_pass( + ctx, + conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { + index: 2, + event_cid: Cid::from_str( + "baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", + )?], + data: r#"[{"op":"replace", "path": "/a", "value":1}]"#.into(), + })])?, + ) + .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"a":0} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"a":1} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + Ok(()) + } + #[test(tokio::test)] + async fn multiple_passes() -> anyhow::Result<()> { + let ctx = init_ctx().await?; + let doc_state = do_pass( + ctx.clone(), + conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { + index: 0, + event_cid: Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![], + data: r#"{"a":0}"#.into(), + })])?, + ) + .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + 
+-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + let doc_state = do_pass( + ctx.clone(), + conclusion_events_to_record_batch(&[ + ConclusionEvent::Time(ConclusionTime { + index: 1, + event_cid: Cid::from_str( + "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi", + )?], + }), + ConclusionEvent::Data(ConclusionData { + index: 2, + event_cid: Cid::from_str( + "baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du", + )?, + init: ConclusionInit { + stream_cid: Cid::from_str( + "baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu", + )?, + stream_type: StreamIdType::Model as u8, + controller: "did:key:bob".to_string(), + dimensions: vec![], + }, + previous: vec![Cid::from_str( + "baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq", + )?], + data: r#"[{"op":"replace", "path": "/a", "value":1}]"#.into(), + }), + ])?, + ) + .await?; + expect![[r#" + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | index | stream_cid | event_type | controller | event_cid | state | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"a":0} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"a":1} | + +-------+-------------------------------------------------------------+------------+-------------+-------------------------------------------------------------+---------+"#]].assert_eq(&doc_state.to_string()); + Ok(()) + } +} diff --git a/olap/src/lib.rs b/olap/src/lib.rs index d610fb077..5f0f58d1a 100644 --- a/olap/src/lib.rs +++ b/olap/src/lib.rs @@ -1,3 +1,6 @@ +//! OLAP Aggregator process for aggregating Ceramic Model Instance Document streams. +#![warn(missing_docs)] + mod aggregator; mod metrics; @@ -28,11 +31,20 @@ enum Command { #[derive(Args, Debug)] struct DaemonOpts { + /// Endpoint of a Flight SQL server for the conclusion feed. + #[arg( + short, + long, + default_value = "127.0.0.1:5102", + env = "CERAMIC_OLAP_FLIGHT_SQL_ENDPOINT" + )] + flight_sql_endpoint: String, + /// Bind address of the metrics endpoint. 
#[arg( short, long, - default_value = "127.0.0.1:9464", + default_value = "127.0.0.1:9465", env = "CERAMIC_OLAP_METRICS_BIND_ADDRESS" )] metrics_bind_address: String, @@ -78,6 +90,14 @@ enum LogFormat { Json, } +impl From<&DaemonOpts> for aggregator::Config { + fn from(value: &DaemonOpts) -> Self { + Self { + flight_sql_endpoint: value.flight_sql_endpoint.clone(), + } + } +} + /// Run the ceramic one binary process pub async fn run() -> Result<()> { let args = Cli::parse(); @@ -130,7 +150,7 @@ async fn daemon(opts: DaemonOpts) -> Result<()> { let signals_handle = tokio::spawn(handle_signals(signals, tx)); // Start aggregator - aggregator::run(async move { + aggregator::run(&opts, async move { let _ = rx.await; }) .await?; diff --git a/olap/src/metrics.rs b/olap/src/metrics.rs index 1c77aa78c..b7a2f89dc 100644 --- a/olap/src/metrics.rs +++ b/olap/src/metrics.rs @@ -1,3 +1,4 @@ +//! Metrics server use std::{convert::Infallible, net::SocketAddr}; use anyhow::Result;