fix: use event service rather than store to enforce deliverability/ordering for migrations (#403)

* fix: use event service rather than store to enforce deliverability/ordering for migrations

* refactor: use unvalidated::Event to manage carfile instead of blocks directly

Changed TimeEvent to use Vec<ProofEdge> instead of a Vec<Ipld> that had to be an IPLD list.
This avoids a bug I introduced in the migration by flattening the vec, which caused every
witness node entry to end up in its own block.
dav1do authored Jul 1, 2024
1 parent f1ee03a commit 3379fa7
Showing 6 changed files with 85 additions and 81 deletions.
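
A minimal standalone sketch (simplified stand-in values, not code from this repo) of why the grouping matters: each ProofEdge keeps one witness node's entries together as a single DAG-CBOR list, while flattening into one Vec<Ipld> loses that grouping, which is how the migration ended up writing every witness node entry as its own block.

```rust
use ipld_core::ipld::Ipld;

/// A witness node in the time-event proof is itself an IPLD list.
type ProofEdge = Vec<Ipld>;

fn main() {
    // Two witness nodes, each a two-entry list (Ipld::Null stands in for links).
    let edges: Vec<ProofEdge> = vec![
        vec![Ipld::Null, Ipld::Null],
        vec![Ipld::Null, Ipld::Null],
    ];

    // Keeping Vec<ProofEdge>: two witness nodes, so two blocks when encoded.
    assert_eq!(edges.len(), 2);

    // Flattening to Vec<Ipld>: four top-level entries, so every witness node
    // entry would land in its own block.
    let flattened: Vec<Ipld> = edges.into_iter().flatten().collect();
    assert_eq!(flattened.len(), 4);
}
```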
12 changes: 9 additions & 3 deletions event/src/unvalidated/builder.rs
@@ -3,7 +3,7 @@ use anyhow::{anyhow, bail};
use cid::Cid;
use ipld_core::ipld::Ipld;

use super::cid_from_dag_cbor;
use super::{cid_from_dag_cbor, ProofEdge};

/// Builder for constructing events.
pub struct Builder;
@@ -394,8 +394,14 @@ impl TimeBuilder<TimeBuilderWithRoot> {
.state
.witness_nodes
.into_iter()
.map(|(_index, edge)| edge)
.collect();
.map(|(_index, edge)| match edge {
Ipld::List(v) => Ok(v),
ipld => {
tracing::info!(?ipld, "Time event witness node is not a list");
Err(anyhow!("Time event witness node is not a list"))
}
})
.collect::<anyhow::Result<Vec<ProofEdge>>>()?;

let event = unvalidated::RawTimeEvent::new(self.state.id, prev, proof_cid, path);
Ok(unvalidated::TimeEvent::new(event, proof, blocks_in_path))
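
For reference, the `.collect::<anyhow::Result<Vec<ProofEdge>>>()?` idiom above short-circuits on the first witness node that is not an IPLD list; a standalone sketch of the same validation (helper name hypothetical):

```rust
use anyhow::{anyhow, Result};
use ipld_core::ipld::Ipld;

type ProofEdge = Vec<Ipld>;

/// Convert raw witness nodes into proof edges, failing fast on any non-list entry.
fn to_proof_edges(nodes: Vec<Ipld>) -> Result<Vec<ProofEdge>> {
    nodes
        .into_iter()
        .map(|node| match node {
            Ipld::List(entries) => Ok(entries),
            other => Err(anyhow!("time event witness node is not a list: {other:?}")),
        })
        // Collecting an iterator of Results into Result<Vec<_>> stops at the first Err.
        .collect()
}
```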
16 changes: 13 additions & 3 deletions event/src/unvalidated/event.rs
@@ -118,6 +118,16 @@ where
serde_ipld_dagcbor::from_slice(proof_bytes).context("decoding proof")?;
let blocks_in_path =
Self::get_time_event_witness_blocks(&event, &proof, car_blocks)?;
let blocks_in_path = blocks_in_path
.into_iter()
.map(|block| match block {
Ipld::List(l) => Ok(l),
ipld => {
tracing::info!(?ipld, "Time Event witness node is not a list");
Err(anyhow!("Time Event witness node must be a list"))
}
})
.collect::<anyhow::Result<Vec<ProofEdge>>>()?;

Ok((
event_cid,
@@ -228,12 +238,12 @@ impl<D> From<signed::Envelope> for RawEvent<D> {
pub struct TimeEvent {
event: RawTimeEvent,
proof: Proof,
blocks_in_path: Vec<Ipld>,
blocks_in_path: Vec<ProofEdge>,
}

impl TimeEvent {
/// Create a new time event from its parts
pub fn new(event: RawTimeEvent, proof: Proof, blocks_in_path: Vec<Ipld>) -> Self {
pub fn new(event: RawTimeEvent, proof: Proof, blocks_in_path: Vec<ProofEdge>) -> Self {
Self {
event,
proof,
@@ -363,7 +373,7 @@ impl Proof {
}
}

/// Proof edge
/// Proof edge TODO: rename witness node
pub type ProofEdge = Vec<Ipld>;

#[cfg(test)]
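
Since ProofEdge is just `Vec<Ipld>` (the TODO above notes it may be renamed to "witness node"), decoding one witness block is a single DAG-CBOR deserialization. A bare sketch of the step the migrator performs for each block on the proof path, assuming the serde_ipld_dagcbor and anyhow crates already used elsewhere in this diff:

```rust
use anyhow::Context;
use ipld_core::ipld::Ipld;

type ProofEdge = Vec<Ipld>;

/// Decode the raw bytes of one witness-node block into its list of entries.
fn decode_edge(data: &[u8]) -> anyhow::Result<ProofEdge> {
    serde_ipld_dagcbor::from_slice(data).context("dag cbor decode")
}
```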
1 change: 1 addition & 0 deletions service/Cargo.toml
@@ -19,6 +19,7 @@ futures.workspace = true
hex.workspace = true
ipld-core.workspace = true
iroh-bitswap.workspace = true
iroh-car.workspace = true
multibase.workspace = true
multihash-codetable.workspace = true
multihash-derive.workspace = true
128 changes: 59 additions & 69 deletions service/src/event/migration.rs
@@ -3,21 +3,21 @@ use std::collections::{BTreeMap, BTreeSet};
use anyhow::{anyhow, bail, Context as _, Result};
use ceramic_core::{EventId, Network};
use ceramic_event::unvalidated::{self, signed::cacao::Capability};
use ceramic_store::{
BlockHash, CeramicOneEvent, EventBlockRaw, EventInsertable, EventInsertableBody, SqlitePool,
};
use cid::Cid;
use futures::{Stream, StreamExt, TryStreamExt};
use ipld_core::ipld::Ipld;
use thiserror::Error;
use tracing::{debug, error, info, instrument, Level};

use crate::CeramicEventService;

use super::service::{Block, BoxedBlock};

pub struct Migrator {
pub struct Migrator<'a> {
service: &'a CeramicEventService,
network: Network,
blocks: BTreeMap<Cid, Box<dyn Block>>,
batch: Vec<EventInsertable>,
batch: Vec<(EventId, Vec<u8>)>,

// All unsigned init payloads we have found.
unsigned_init_payloads: BTreeSet<Cid>,
@@ -32,8 +32,9 @@ pub struct Migrator {
event_count: usize,
}

impl Migrator {
impl<'a> Migrator<'a> {
pub async fn new(
service: &'a CeramicEventService,
network: Network,
blocks: impl Stream<Item = Result<BoxedBlock>>,
) -> Result<Self> {
@@ -43,6 +44,7 @@ impl Migrator {
.await?;
Ok(Self {
network,
service,
blocks,
batch: Default::default(),
unsigned_init_payloads: Default::default(),
@@ -60,8 +62,8 @@
}
}

#[instrument(skip(self, sql_pool), ret(level = Level::DEBUG))]
pub async fn migrate(mut self, sql_pool: &SqlitePool) -> Result<()> {
#[instrument(skip(self), ret(level = Level::DEBUG))]
pub async fn migrate(mut self) -> Result<()> {
let cids: Vec<Cid> = self.blocks.keys().cloned().collect();
for cid in cids {
let ret = self.process_block(cid).await;
@@ -70,12 +72,12 @@
error!(%cid, err=format!("{err:#}"), "error processing block");
}
if self.batch.len() > 1000 {
self.write_batch(sql_pool).await?
self.write_batch().await?
}
}
self.write_batch(sql_pool).await?;
self.write_batch().await?;

self.process_unreferenced_init_payloads(sql_pool).await?;
self.process_unreferenced_init_payloads().await?;

info!(
event_count = self.event_count,
@@ -96,11 +98,9 @@ impl Migrator {
self.unsigned_init_payloads.insert(cid);
}
Ok(unvalidated::RawEvent::Signed(event)) => {
self.process_signed_event(cid, data, event).await?
}
Ok(unvalidated::RawEvent::Time(event)) => {
self.process_time_event(cid, data, *event).await?
self.process_signed_event(cid, event).await?
}
Ok(unvalidated::RawEvent::Time(event)) => self.process_time_event(cid, *event).await?,
// Ignore blocks that are not Ceramic events
Err(_) => {}
}
@@ -113,7 +113,7 @@
// Therefore once we have processed all other events then the remaining init payloads must be
// the blocks that are unsigned init events.
#[instrument(skip(self), ret(level = Level::DEBUG))]
async fn process_unreferenced_init_payloads(&mut self, sql_pool: &SqlitePool) -> Result<()> {
async fn process_unreferenced_init_payloads(&mut self) -> Result<()> {
let init_events: Vec<_> = self
.unsigned_init_payloads
.difference(&self.referenced_unsigned_init_payloads)
@@ -123,35 +123,41 @@ impl Migrator {
let block = self.find_block(&cid).context("finding init event block")?;
let data = block.data().await?;
let payload: unvalidated::init::Payload<Ipld> = serde_ipld_dagcbor::from_slice(&data)?;
let mut event_builder = EventBuilder::new(
let event_builder = EventBuilder::new(
cid,
payload.header().model().to_vec(),
payload.header().controllers()[0].clone(),
cid,
);
event_builder.add_root(cid, data);
let event = unvalidated::Event::from(payload);
self.batch
.push(event_builder.build(self.network.clone()).await?);
.push(event_builder.build(&self.network, event).await?);
if self.batch.len() > 1000 {
self.write_batch(sql_pool).await?
self.write_batch().await?
}
}
self.write_batch(sql_pool).await?;
self.write_batch().await?;
Ok(())
}
async fn write_batch(&mut self, sql_pool: &SqlitePool) -> Result<()> {
CeramicOneEvent::insert_many(sql_pool, self.batch.iter()).await?;
async fn write_batch(&mut self) -> Result<()> {
let items = self
.batch
.iter()
.map(|(id, body)| recon::ReconItem::new(id, body))
.collect::<Vec<_>>();
self.service
.insert_events_from_carfiles_recon(&items)
.await?;
self.event_count += self.batch.len();
self.batch.truncate(0);
Ok(())
}

// Find and add all blocks related to this signed event
#[instrument(skip(self, data, event), ret(level = Level::DEBUG))]
#[instrument(skip(self, event), ret(level = Level::DEBUG))]
async fn process_signed_event(
&mut self,
cid: Cid,
data: Vec<u8>,
event: unvalidated::signed::Envelope,
) -> Result<()> {
let link = event
@@ -163,7 +169,7 @@ impl Migrator {
let payload_data = block.data().await?;
let payload: unvalidated::Payload<Ipld> =
serde_ipld_dagcbor::from_slice(&payload_data).context("decoding payload")?;
let mut event_builder = match payload {
let event_builder = match &payload {
unvalidated::Payload::Init(payload) => {
self.referenced_unsigned_init_payloads.insert(link);
EventBuilder::new(
@@ -183,21 +189,22 @@
)
}
};
let mut capability = None;
if let Some(capability_cid) = event.capability() {
debug!(%capability_cid, "found cap chain");
let block = self
.find_block(&capability_cid)
.context("finding capability block")?;
let data = block.data().await?;
// Parse capability to ensure it is valid
let capability: Capability = serde_ipld_dagcbor::from_slice(&data)?;
debug!(%capability_cid, ?capability, "capability");
event_builder.add_block(capability_cid, data);
let cap: Capability = serde_ipld_dagcbor::from_slice(&data)?;
debug!(%capability_cid, ?cap, "capability");
capability = Some((capability_cid, cap));
}
event_builder.add_block(link, payload_data.clone());
event_builder.add_root(cid, data);
let s = unvalidated::signed::Event::new(cid, event, link, payload, capability);
let event = unvalidated::Event::from(s);
self.batch
.push(event_builder.build(self.network.clone()).await?);
.push(event_builder.build(&self.network, event).await?);
Ok(())
}
async fn find_init_payload(&self, cid: &Cid) -> Result<unvalidated::init::Payload<Ipld>> {
@@ -223,28 +230,26 @@ impl Migrator {
}
}
// Find and add all blocks related to this time event
#[instrument(skip(self, data, event), ret(level = Level::DEBUG))]
#[instrument(skip(self, event), ret(level = Level::DEBUG))]
async fn process_time_event(
&mut self,
cid: Cid,
data: Vec<u8>,
event: unvalidated::RawTimeEvent,
) -> Result<()> {
let init = event.id();
let init_payload = self.find_init_payload(&event.id()).await?;
let mut event_builder = EventBuilder::new(
let event_builder = EventBuilder::new(
cid,
init_payload.header().model().to_vec(),
init_payload.header().controllers()[0].clone(),
init,
);
event_builder.add_root(cid, data.clone());
let proof_id = event.proof();
let block = self.find_block(&proof_id).context("finding proof block")?;
let data = block.data().await?;
let proof: unvalidated::Proof = serde_ipld_dagcbor::from_slice(&data)?;
event_builder.add_block(proof_id, data);
let mut curr = proof.root();
let mut proof_edges = Vec::new();
for index in event.path().split('/') {
if curr == event.prev() {
// The time event's previous link is the same as the last link in the proof.
@@ -256,20 +261,22 @@ impl Migrator {
let data = block.data().await.context("fetch block data")?;
let edge: unvalidated::ProofEdge =
serde_ipld_dagcbor::from_slice(&data).context("dag cbor decode")?;
// Add edge data to event
event_builder.add_block(curr, data);

// Follow path
if let Some(Ipld::Link(link)) = edge.get(idx) {
curr = *link;
let maybe_link = edge.get(idx).cloned();
// Add edge data to event
proof_edges.push(edge);
if let Some(Ipld::Link(link)) = maybe_link {
curr = link;
} else {
error!(%curr, "missing block");
break;
}
}

let time = unvalidated::TimeEvent::new(event, proof, proof_edges);
let event: unvalidated::Event<Ipld> = unvalidated::Event::from(Box::new(time));
self.batch
.push(event_builder.build(self.network.clone()).await?);
.push(event_builder.build(&self.network, event).await?);
Ok(())
}
}
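
The loop above walks the proof path from the root toward the event's `prev` link, decoding each witness node and collecting it as a ProofEdge. A condensed, standalone sketch of that traversal (the `load` closure is a hypothetical stand-in for block lookup plus DAG-CBOR decoding; error handling simplified relative to the diff, which logs and breaks on a missing block):

```rust
use anyhow::{anyhow, Context, Result};
use cid::Cid;
use ipld_core::ipld::Ipld;

type ProofEdge = Vec<Ipld>;

/// Follow a time-event proof path such as "0/1/0" from the proof root,
/// collecting every witness node visited, stopping once `prev` is reached.
fn collect_proof_edges(
    root: Cid,
    prev: Cid,
    path: &str,
    load: impl Fn(&Cid) -> Result<ProofEdge>,
) -> Result<Vec<ProofEdge>> {
    let mut curr = root;
    let mut edges = Vec::new();
    for segment in path.split('/') {
        if curr == prev {
            // The time event's previous link is the last link in the proof.
            break;
        }
        let idx: usize = segment.parse().context("parsing proof path segment")?;
        let edge = load(&curr)?;
        // Clone the link before the edge is moved into the collected list.
        let next = edge.get(idx).cloned();
        edges.push(edge);
        match next {
            Some(Ipld::Link(link)) => curr = link,
            _ => return Err(anyhow!("missing witness block at {curr}")),
        }
    }
    Ok(edges)
}
```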
@@ -282,8 +289,6 @@ enum AnalyzeError {

struct EventBuilder {
event_cid: Cid,
blocks: Vec<EventBlockRaw>,

sep: Vec<u8>,
controller: String,
init: Cid,
@@ -292,42 +297,27 @@ impl EventBuilder {
impl EventBuilder {
fn new(event_cid: Cid, sep: Vec<u8>, controller: String, init: Cid) -> Self {
Self {
blocks: Default::default(),
event_cid,
sep,
controller,
init,
}
}

fn add_root(&mut self, cid: Cid, data: Vec<u8>) {
self._add_block(cid, data, true)
}
fn add_block(&mut self, cid: Cid, data: Vec<u8>) {
self._add_block(cid, data, false)
}
fn _add_block(&mut self, cid: Cid, data: Vec<u8>, root: bool) {
self.blocks.push(EventBlockRaw {
event_cid: self.event_cid.to_bytes(),
codec: cid.codec() as i64,
root,
idx: self.blocks.len() as i32,
multihash: BlockHash::new(cid.hash().to_owned()),
bytes: data,
})
}

async fn build(self, network: Network) -> Result<EventInsertable> {
async fn build(
self,
network: &Network,
event: unvalidated::Event<Ipld>,
) -> Result<(EventId, Vec<u8>)> {
let event_id = EventId::builder()
.with_network(&network)
.with_network(network)
.with_sep("model", &self.sep)
.with_controller(&self.controller)
.with_init(&self.init)
.with_event(&self.event_cid)
.build();
Ok(EventInsertable::try_new(
event_id,
EventInsertableBody::new(self.event_cid, self.blocks, false),
)?)

let body = event.encode_car().await?;
Ok((event_id, body))
}
}
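
With the EventBuilder change, each migrated event reduces to an `(EventId, Vec<u8>)` pair — the recon order key plus the event encoded as a CAR file — which `write_batch` later wraps in `recon::ReconItem`s and hands to `insert_events_from_carfiles_recon`. A minimal sketch of the pair construction as a hypothetical free function mirroring `EventBuilder::build` (parameter names taken from the diff):

```rust
use anyhow::Result;
use ceramic_core::{EventId, Network};
use ceramic_event::unvalidated;
use cid::Cid;
use ipld_core::ipld::Ipld;

/// Build the recon order key and the CAR-encoded body for one migrated event.
async fn order_key_and_car(
    network: &Network,
    sep: Vec<u8>,       // the stream's model bytes
    controller: String, // first controller from the init header
    init: Cid,          // CID of the stream's init event
    event_cid: Cid,     // CID of the event being migrated
    event: unvalidated::Event<Ipld>,
) -> Result<(EventId, Vec<u8>)> {
    let event_id = EventId::builder()
        .with_network(network)
        .with_sep("model", &sep)
        .with_controller(&controller)
        .with_init(&init)
        .with_event(&event_cid)
        .build();

    // encode_car serializes the event into a single CAR file, replacing the
    // old per-block EventBlockRaw inserts.
    let body = event.encode_car().await?;
    Ok((event_id, body))
}
```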
7 changes: 2 additions & 5 deletions service/src/event/service.rs
@@ -85,13 +85,10 @@ impl CeramicEventService {
network: Network,
blocks: impl Stream<Item = anyhow::Result<BoxedBlock>>,
) -> Result<()> {
let migrator = Migrator::new(network, blocks)
.await
.map_err(Error::new_fatal)?;
migrator
.migrate(&self.pool)
let migrator = Migrator::new(self, network, blocks)
.await
.map_err(Error::new_fatal)?;
migrator.migrate().await.map_err(Error::new_fatal)?;
Ok(())
}
