From 75b852ff4be57b5ee80cc8230b06b767e928a6ce Mon Sep 17 00:00:00 2001 From: Seph Gentle Date: Tue, 29 Oct 2024 13:49:08 +1100 Subject: [PATCH] wip uplog encoding changes --- src/encoding/mod.rs | 2 +- src/oplog.rs | 69 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index 07d20a0..41c0ad3 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -49,7 +49,7 @@ pub(crate) enum ChunkType { CausalGraph = 21, Operations = 20, // OpTypeAndPosition = 22, - + CGClientIDs = 22, CGEntries = 23, diff --git a/src/oplog.rs b/src/oplog.rs index fb69c6a..29beb25 100644 --- a/src/oplog.rs +++ b/src/oplog.rs @@ -526,14 +526,43 @@ impl OpLog { // // Better to write the chunk header first, and use the "raw output" as the ids, and so on. // - which would all avoid some allocations & copies. - let mut cg_changes = Vec::new(); - push_chunk(&mut cg_changes, ChunkType::CGClientIDs, &ids).unwrap(); - push_chunk(&mut cg_changes, ChunkType::CGEntries, &cg_entries).unwrap(); - + + // Serialize map operations let mut map_ops = Vec::new(); + + let mut last_crdt = LVKey::MAX; + let mut last_key: Option<&str> = None; + for (crdt, key) in map_crdts_to_send { + // We need to write 4 fields: + // - The ID of the map CRDT being edited + // - The CRDT's key + // - The version of the edit + // - The new value + + let write_crdt = crdt != last_crdt; + last_crdt = crdt; + + let write_key = last_key != Some(&key); + last_key = Some(key.as_str()); + + // There are 3 cases for the CRDT: + // 1. We're editing the same CRDT as the previous operation. This is the most common. + // 2. We're editing the root CRDT. Write mapped agent 0. + // 3. We're editing some other CRDT. Write mapped agent a + 1. + if write_crdt { + if crdt == ROOT_CRDT_ID { + + } else { + let av = self.cg.agent_assignment.local_to_agent_version(crdt); + let mapped_agent = write_map.map_and_store(av.0, &self.cg.agent_assignment.client_data, &mut ids); + + } + } + + let crdt_name = self.crdt_name_to_remote(crdt); let entry = self.map_keys.get(&(crdt, key.clone())) .unwrap(); @@ -553,6 +582,33 @@ impl OpLog { } } } + + + + + + // // Serialize map operations + // let mut map_ops = Vec::new(); + // for (crdt, key) in map_crdts_to_send { + // let crdt_name = self.crdt_name_to_remote(crdt); + // let entry = self.map_keys.get(&(crdt, key.clone())) + // .unwrap(); + // for r in diff_rev.iter().rev() { + // // Find all the unknown ops. + // // TODO: Add a flag to trim this to only the most recent ops. + // let start_idx = entry.ops + // .binary_search_by_key(&r.start, |e| e.0) + // .unwrap_or_else(|idx| idx); + // + // for pair in &entry.ops[start_idx..] { + // if pair.0 >= r.end { break; } + // + // // dbg!(pair); + // let rv = self.cg.agent_assignment.local_to_remote_version(pair.0); + // map_ops.push((crdt_name, rv, key.as_str(), pair.1.clone())); + // } + // } + // } // Serialize text operations let mut text_context = ListOperationCtx::new(); @@ -578,6 +634,11 @@ impl OpLog { } } + let mut cg_changes = Vec::new(); + push_chunk(&mut cg_changes, ChunkType::CGClientIDs, &ids).unwrap(); + push_chunk(&mut cg_changes, ChunkType::CGEntries, &cg_entries).unwrap(); + + SerializedOps { cg_changes, map_ops,