From 4126a9239c227b24a536625800442c23997dd5c9 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 15:26:56 +0100 Subject: [PATCH 01/24] add `Encode` and `Decode` traits --- src/encode.rs | 15 +++++++++++++++ src/lib.rs | 17 ++++++++++------- 2 files changed, 25 insertions(+), 7 deletions(-) create mode 100644 src/encode.rs diff --git a/src/encode.rs b/src/encode.rs new file mode 100644 index 0000000..458e05e --- /dev/null +++ b/src/encode.rs @@ -0,0 +1,15 @@ +use std::error::Error as StdError; + +/// TODO: docs +pub(crate) trait Encode { + /// TODO: docs + fn encode(&self, buf: &mut Vec); +} + +/// TODO: docs +pub(crate) trait Decode: Sized { + type Error: StdError; + + /// TODO: docs + fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error>; +} diff --git a/src/lib.rs b/src/lib.rs index 5cde0f1..e67c607 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -129,6 +129,10 @@ extern crate alloc; mod anchor; mod backlog; mod deletion; +#[cfg(feature = "encode")] +mod encode; +#[cfg(feature = "encode")] +mod encoded_replica; mod gtree; mod insertion; mod replica; @@ -144,6 +148,12 @@ pub use anchor::{Anchor, AnchorBias}; use backlog::Backlog; pub use backlog::{BackloggedDeletions, BackloggedInsertions}; pub use deletion::Deletion; +#[cfg(feature = "encode")] +use encode::{Decode, Encode}; +#[cfg(feature = "encode")] +use encoded_replica::{checksum, checksum_array}; +#[cfg(feature = "encode")] +pub use encoded_replica::{DecodeError, EncodedReplica}; use gtree::{Gtree, LeafIdx}; pub use insertion::Insertion; pub use replica::Replica; @@ -156,13 +166,6 @@ pub use text::Text; use utils::*; use version_map::{DeletionMap, VersionMap}; -#[cfg(feature = "encode")] -mod encoded_replica; -#[cfg(feature = "encode")] -use encoded_replica::{checksum, checksum_array}; -#[cfg(feature = "encode")] -pub use encoded_replica::{DecodeError, EncodedReplica}; - /// The version of the protocol cola uses to represent `EncodedReplica`s and /// `CrdtEdit`s. /// From 0ad14678975424b3d1c631c463a1374ad0b64bba Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 15:54:14 +0100 Subject: [PATCH 02/24] add `impl_deserialize` and `impl_serialize` --- src/encode.rs | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ 2 files changed, 70 insertions(+) diff --git a/src/encode.rs b/src/encode.rs index 458e05e..8587944 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -13,3 +13,71 @@ pub(crate) trait Decode: Sized { /// TODO: docs fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error>; } + +pub(crate) use serde::{impl_deserialize, impl_serialize}; + +#[cfg(feature = "serde")] +mod serde { + macro_rules! impl_deserialize { + ($ty:ty) => { + impl<'de> ::serde::de::Deserialize<'de> for $ty { + #[inline] + fn deserialize(deserializer: D) -> Result + where + D: ::serde::de::Deserializer<'de>, + { + struct Visitor; + + impl<'de> ::serde::de::Visitor<'de> for Visitor { + type Value = $ty; + + #[inline] + fn expecting( + &self, + formatter: &mut ::core::fmt::Formatter, + ) -> ::core::fmt::Result { + formatter.write_str("a byte slice") + } + + #[inline] + fn visit_bytes( + self, + v: &[u8], + ) -> Result + where + E: ::serde::de::Error, + { + ::decode(v) + .map(|(value, _rest)| value) + .map_err(E::custom) + } + } + + deserializer.deserialize_bytes(Visitor) + } + } + }; + } + + macro_rules! impl_serialize { + ($ty:ty) => { + impl ::serde::ser::Serialize for $ty { + #[inline] + fn serialize( + &self, + serializer: S, + ) -> Result + where + S: ::serde::ser::Serializer, + { + let mut buf = Vec::new(); + ::encode(&self, &mut buf); + serializer.serialize_bytes(&buf) + } + } + }; + } + + pub(crate) use impl_deserialize; + pub(crate) use impl_serialize; +} diff --git a/src/lib.rs b/src/lib.rs index e67c607..29e810c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,6 +148,8 @@ pub use anchor::{Anchor, AnchorBias}; use backlog::Backlog; pub use backlog::{BackloggedDeletions, BackloggedInsertions}; pub use deletion::Deletion; +#[cfg(feature = "serde")] +use encode::{impl_deserialize, impl_serialize}; #[cfg(feature = "encode")] use encode::{Decode, Encode}; #[cfg(feature = "encode")] From 75916556b995eb91832df3e41834a92b676c94c6 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 19:24:34 +0100 Subject: [PATCH 03/24] add `Int` for encoding and decoding integer types --- src/encode.rs | 104 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/lib.rs | 2 +- 2 files changed, 102 insertions(+), 4 deletions(-) diff --git a/src/encode.rs b/src/encode.rs index 8587944..8e86392 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -7,13 +7,111 @@ pub(crate) trait Encode { } /// TODO: docs -pub(crate) trait Decode: Sized { +pub(crate) trait Decode { + type Value: Sized; + type Error: StdError; /// TODO: docs - fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error>; + fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error>; +} + +/// A variable-length encoded integer. +pub(crate) struct Int(I); + +impl Int { + #[inline] + pub(crate) fn new(integer: I) -> Self { + Self(integer) + } +} + +impl_int_encode!(u8); +impl_int_encode!(u16); +impl_int_encode!(u32); +impl_int_encode!(u64); + +impl_int_decode!(u8); +impl_int_decode!(u16); +impl_int_decode!(u32); +impl_int_decode!(u64); + +impl Encode for Int { + #[inline(always)] + fn encode(&self, buf: &mut Vec) { + Int(self.0 as u64).encode(buf) + } } +impl Decode for Int { + type Value = usize; + + type Error = core::convert::Infallible; + + #[inline] + fn decode(buf: &[u8]) -> Result<(usize, &[u8]), Self::Error> { + Int::::decode(buf).map(|(value, rest)| (value as usize, rest)) + } +} + +macro_rules! impl_int_encode { + ($ty:ty) => { + impl Encode for Int<$ty> { + #[inline] + fn encode(&self, buf: &mut Vec) { + let array = self.0.to_le_bytes(); + + let num_trailing_zeros = array + .iter() + .rev() + .copied() + .take_while(|&byte| byte == 0) + .count(); + + let len = array.len() - num_trailing_zeros; + + buf.push(len as u8); + + buf.extend_from_slice(&array[..len]); + } + } + }; +} + +use impl_int_encode; + +macro_rules! impl_int_decode { + ($ty:ty) => { + impl Decode for Int<$ty> { + type Value = $ty; + + type Error = core::convert::Infallible; + + #[inline] + fn decode(buf: &[u8]) -> Result<($ty, &[u8]), Self::Error> { + let Some((&len, buf)) = buf.split_first() else { todo!() }; + + if len as usize > buf.len() { + todo!(); + } + + let mut array = [0u8; ::core::mem::size_of::<$ty>()]; + + let (bytes, buf) = buf.split_at(len as usize); + + array[..bytes.len()].copy_from_slice(bytes); + + let int = <$ty>::from_le_bytes(array); + + Ok((int, buf)) + } + } + }; +} + +use impl_int_decode; + +#[cfg(feature = "serde")] pub(crate) use serde::{impl_deserialize, impl_serialize}; #[cfg(feature = "serde")] @@ -29,7 +127,7 @@ mod serde { struct Visitor; impl<'de> ::serde::de::Visitor<'de> for Visitor { - type Value = $ty; + type Value = <$ty as $crate::Decode>::Value; #[inline] fn expecting( diff --git a/src/lib.rs b/src/lib.rs index 29e810c..499df0b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,7 +151,7 @@ pub use deletion::Deletion; #[cfg(feature = "serde")] use encode::{impl_deserialize, impl_serialize}; #[cfg(feature = "encode")] -use encode::{Decode, Encode}; +use encode::{Decode, Encode, Int}; #[cfg(feature = "encode")] use encoded_replica::{checksum, checksum_array}; #[cfg(feature = "encode")] From 82eb4c8b85f4ca1db1278504204434d83abba6b4 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 19:24:51 +0100 Subject: [PATCH 04/24] implement `Encode` and `Decode` for `Anchor` --- src/anchor.rs | 62 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/src/anchor.rs b/src/anchor.rs index 111a024..a323d77 100644 --- a/src/anchor.rs +++ b/src/anchor.rs @@ -62,6 +62,25 @@ impl Anchor { } } +/// A bias to use when creating an [`Anchor`]. +/// +/// This is used in the +/// [`Replica::create_anchor()`][crate::Replica::create_anchor] method to +/// create a new [`Anchor`]. See the documentation of that method for more +/// information. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[cfg_attr( + any(feature = "encode", feature = "serde"), + derive(serde::Serialize, serde::Deserialize) +)] +pub enum AnchorBias { + /// The anchor should attach to the left. + Left, + + /// The anchor should attach to the right. + Right, +} + /// TODO: docs #[derive(Copy, Clone, PartialEq, Eq)] #[cfg_attr( @@ -126,21 +145,32 @@ impl InnerAnchor { } } -/// A bias to use when creating an [`Anchor`]. -/// -/// This is used in the -/// [`Replica::create_anchor()`][crate::Replica::create_anchor] method to -/// create a new [`Anchor`]. See the documentation of that method for more -/// information. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -#[cfg_attr( - any(feature = "encode", feature = "serde"), - derive(serde::Serialize, serde::Deserialize) -)] -pub enum AnchorBias { - /// The anchor should attach to the left. - Left, +#[cfg(feature = "encode")] +mod encode { + use super::*; + use crate::{Decode, Encode, Int}; + + impl Encode for InnerAnchor { + #[inline] + fn encode(&self, buf: &mut Vec) { + Int::new(self.replica_id()).encode(buf); + Int::new(self.run_ts()).encode(buf); + Int::new(self.offset()).encode(buf); + } + } - /// The anchor should attach to the right. - Right, + impl Decode for InnerAnchor { + type Value = Self; + + type Error = core::convert::Infallible; + + #[inline] + fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { + let (replica_id, buf) = Int::::decode(buf)?; + let (run_ts, buf) = Int::::decode(buf)?; + let (offset, buf) = Int::::decode(buf)?; + let anchor = Self::new(replica_id, offset, run_ts); + Ok((anchor, buf)) + } + } } From 643c8e3e5c6ba5c2eed7c03240f778494b93eee9 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 19:25:03 +0100 Subject: [PATCH 05/24] implement `Encode` and `Decode` for `Text` --- src/text.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/text.rs b/src/text.rs index 1655198..54f11b7 100644 --- a/src/text.rs +++ b/src/text.rs @@ -109,3 +109,34 @@ impl Text { self.range.clone() } } + +#[cfg(feature = "encode")] +mod encode { + use super::*; + use crate::{Decode, Encode, Int}; + + impl Encode for Text { + #[inline] + fn encode(&self, buf: &mut Vec) { + Int::new(self.inserted_by).encode(buf); + Int::new(self.start()).encode(buf); + let len = self.end() - self.start(); + Int::new(len).encode(buf); + } + } + + impl Decode for Text { + type Value = Self; + + type Error = core::convert::Infallible; + + #[inline] + fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { + let (inserted_by, buf) = Int::::decode(buf)?; + let (start, buf) = Int::::decode(buf)?; + let (len, buf) = Int::::decode(buf)?; + let text = Self { inserted_by, range: start..start + len }; + Ok((text, buf)) + } + } +} From 26c1e6952f19ff515a2fbba75524e6dcfd30f43f Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 16:32:05 +0100 Subject: [PATCH 06/24] implement `Encode` and `Decode` for `Insertion` --- src/insertion.rs | 126 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 121 insertions(+), 5 deletions(-) diff --git a/src/insertion.rs b/src/insertion.rs index dae3418..1d131e0 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -1,5 +1,5 @@ use crate::anchor::InnerAnchor as Anchor; -use crate::*; +use crate::{LamportTs, Length, ReplicaId, RunTs, Text}; /// An insertion in CRDT coordinates. /// @@ -10,10 +10,6 @@ use crate::*; /// /// See the documentation of those methods for more information. #[derive(Debug, Clone, PartialEq, Eq)] -#[cfg_attr( - any(feature = "encode", feature = "serde"), - derive(serde::Serialize, serde::Deserialize) -)] pub struct Insertion { /// The anchor point of the insertion. anchor: Anchor, @@ -91,3 +87,123 @@ impl Insertion { &self.text } } + +#[cfg(feature = "encode")] +mod encode { + use super::*; + use crate::{Decode, Encode, Int}; + + impl Insertion { + #[inline] + fn encode_anchor(&self, run: InsertionRun, buf: &mut Vec) { + match run { + InsertionRun::BeginsNew => self.anchor.encode(buf), + InsertionRun::ContinuesExisting => {}, + } + } + + #[inline] + fn decode_anchor<'buf>( + run: InsertionRun, + text: &Text, + run_ts: RunTs, + buf: &'buf [u8], + ) -> Result<(Anchor, &'buf [u8]), ::Error> { + match run { + InsertionRun::BeginsNew => Anchor::decode(buf), + + InsertionRun::ContinuesExisting => { + let anchor = + Anchor::new(text.inserted_by(), text.start(), run_ts); + Ok((anchor, buf)) + }, + } + } + } + + impl Encode for Insertion { + #[inline] + fn encode(&self, buf: &mut Vec) { + self.text.encode(buf); + Int::new(self.run_ts).encode(buf); + Int::new(self.lamport_ts).encode(buf); + let run = InsertionRun::new(self); + run.encode(buf); + self.encode_anchor(run, buf); + } + } + + impl Decode for Insertion { + type Value = Self; + + type Error = core::convert::Infallible; + + #[inline] + fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { + let (text, buf) = Text::decode(buf)?; + let (run_ts, buf) = Int::::decode(buf)?; + let (lamport_ts, buf) = Int::::decode(buf)?; + let (run, buf) = InsertionRun::decode(buf)?; + let (anchor, buf) = Self::decode_anchor(run, &text, run_ts, buf)?; + let insertion = Self::new(anchor, text, run_ts, lamport_ts); + Ok((insertion, buf)) + } + } + + /// TODO: docs + enum InsertionRun { + /// TODO: docs + BeginsNew, + + /// TODO: docs + ContinuesExisting, + } + + impl InsertionRun { + #[inline] + fn new(insertion: &Insertion) -> Self { + let is_continuation = insertion.anchor.replica_id() + == insertion.text.inserted_by() + && insertion.anchor.offset() == insertion.text.start(); + + if is_continuation { + Self::ContinuesExisting + } else { + Self::BeginsNew + } + } + } + + impl Encode for InsertionRun { + #[inline] + fn encode(&self, buf: &mut Vec) { + let is_continuation = matches!(self, Self::ContinuesExisting); + buf.push(is_continuation as u8); + } + } + + impl Decode for InsertionRun { + type Value = Self; + + type Error = core::convert::Infallible; + + #[inline] + fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { + let Some((&first_byte, rest)) = buf.split_first() else { todo!() }; + + let this = match first_byte { + 0 => Self::BeginsNew, + 1 => Self::ContinuesExisting, + _other => todo!(), + }; + + Ok((this, rest)) + } + } +} + +#[cfg(feature = "serde")] +mod serde { + crate::impl_deserialize!(super::Insertion); + crate::impl_serialize!(super::Insertion); +} From 76973ea75cd0449057d11dac76f1d6a1aa4c80a3 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 22:00:57 +0100 Subject: [PATCH 07/24] properly handle errors in `Int`'s `Decode` impl --- src/anchor.rs | 4 ++-- src/encode.rs | 52 ++++++++++++++++++++++++++++++++++++++---------- src/insertion.rs | 6 +++--- src/lib.rs | 2 -- src/text.rs | 4 ++-- 5 files changed, 49 insertions(+), 19 deletions(-) diff --git a/src/anchor.rs b/src/anchor.rs index a323d77..d1b0cbb 100644 --- a/src/anchor.rs +++ b/src/anchor.rs @@ -148,7 +148,7 @@ impl InnerAnchor { #[cfg(feature = "encode")] mod encode { use super::*; - use crate::{Decode, Encode, Int}; + use crate::encode::{Decode, Encode, Int, IntDecodeError}; impl Encode for InnerAnchor { #[inline] @@ -162,7 +162,7 @@ mod encode { impl Decode for InnerAnchor { type Value = Self; - type Error = core::convert::Infallible; + type Error = IntDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { diff --git a/src/encode.rs b/src/encode.rs index 8e86392..930ec97 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -1,4 +1,4 @@ -use std::error::Error as StdError; +use core::fmt::Display; /// TODO: docs pub(crate) trait Encode { @@ -10,7 +10,7 @@ pub(crate) trait Encode { pub(crate) trait Decode { type Value: Sized; - type Error: StdError; + type Error: Display; /// TODO: docs fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error>; @@ -26,6 +26,35 @@ impl Int { } } +/// An error that can occur when decoding an [`Int`]. +pub(crate) enum IntDecodeError { + /// The buffer passed to `Int::decode` is empty. This is always an error, + /// even if the integer being decoded is zero. + EmptyBuffer, + + /// The actual byte length of the buffer is less than what was specified + /// in the prefix. + LengthLessThanPrefix { prefix: u8, actual: u8 }, +} + +impl Display for IntDecodeError { + #[inline] + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::EmptyBuffer => f.write_str( + "Int couldn't be decoded because the buffer is empty", + ), + Self::LengthLessThanPrefix { prefix, actual } => { + write!( + f, + "Int couldn't be decoded because the buffer's length is \ + {actual}, but the prefix specified a length of {prefix}", + ) + }, + } + } +} + impl_int_encode!(u8); impl_int_encode!(u16); impl_int_encode!(u32); @@ -46,7 +75,7 @@ impl Encode for Int { impl Decode for Int { type Value = usize; - type Error = core::convert::Infallible; + type Error = IntDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<(usize, &[u8]), Self::Error> { @@ -85,14 +114,18 @@ macro_rules! impl_int_decode { impl Decode for Int<$ty> { type Value = $ty; - type Error = core::convert::Infallible; + type Error = $crate::encode::IntDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<($ty, &[u8]), Self::Error> { - let Some((&len, buf)) = buf.split_first() else { todo!() }; + let (&len, buf) = + buf.split_first().ok_or(IntDecodeError::EmptyBuffer)?; if len as usize > buf.len() { - todo!(); + return Err(IntDecodeError::LengthLessThanPrefix { + prefix: len, + actual: buf.len() as u8, + }); } let mut array = [0u8; ::core::mem::size_of::<$ty>()]; @@ -110,7 +143,6 @@ macro_rules! impl_int_decode { } use impl_int_decode; - #[cfg(feature = "serde")] pub(crate) use serde::{impl_deserialize, impl_serialize}; @@ -127,7 +159,7 @@ mod serde { struct Visitor; impl<'de> ::serde::de::Visitor<'de> for Visitor { - type Value = <$ty as $crate::Decode>::Value; + type Value = <$ty as $crate::encode::Decode>::Value; #[inline] fn expecting( @@ -145,7 +177,7 @@ mod serde { where E: ::serde::de::Error, { - ::decode(v) + ::decode(v) .map(|(value, _rest)| value) .map_err(E::custom) } @@ -169,7 +201,7 @@ mod serde { S: ::serde::ser::Serializer, { let mut buf = Vec::new(); - ::encode(&self, &mut buf); + ::encode(&self, &mut buf); serializer.serialize_bytes(&buf) } } diff --git a/src/insertion.rs b/src/insertion.rs index 1d131e0..a5d76c1 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -91,7 +91,7 @@ impl Insertion { #[cfg(feature = "encode")] mod encode { use super::*; - use crate::{Decode, Encode, Int}; + use crate::encode::{Decode, Encode, Int, IntDecodeError}; impl Insertion { #[inline] @@ -136,14 +136,14 @@ mod encode { impl Decode for Insertion { type Value = Self; - type Error = core::convert::Infallible; + type Error = IntDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { let (text, buf) = Text::decode(buf)?; let (run_ts, buf) = Int::::decode(buf)?; let (lamport_ts, buf) = Int::::decode(buf)?; - let (run, buf) = InsertionRun::decode(buf)?; + let (run, buf) = InsertionRun::decode(buf).unwrap(); let (anchor, buf) = Self::decode_anchor(run, &text, run_ts, buf)?; let insertion = Self::new(anchor, text, run_ts, lamport_ts); Ok((insertion, buf)) diff --git a/src/lib.rs b/src/lib.rs index 499df0b..749aa27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,8 +151,6 @@ pub use deletion::Deletion; #[cfg(feature = "serde")] use encode::{impl_deserialize, impl_serialize}; #[cfg(feature = "encode")] -use encode::{Decode, Encode, Int}; -#[cfg(feature = "encode")] use encoded_replica::{checksum, checksum_array}; #[cfg(feature = "encode")] pub use encoded_replica::{DecodeError, EncodedReplica}; diff --git a/src/text.rs b/src/text.rs index 54f11b7..d0f5164 100644 --- a/src/text.rs +++ b/src/text.rs @@ -113,7 +113,7 @@ impl Text { #[cfg(feature = "encode")] mod encode { use super::*; - use crate::{Decode, Encode, Int}; + use crate::encode::{Decode, Encode, Int, IntDecodeError}; impl Encode for Text { #[inline] @@ -128,7 +128,7 @@ mod encode { impl Decode for Text { type Value = Self; - type Error = core::convert::Infallible; + type Error = IntDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { From b5f2f4ab3ca4a94d56bfb699a7dada2d4d8e93e6 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 22:12:05 +0100 Subject: [PATCH 08/24] properly handle errors in `Insertion`'s `Decode` impl --- src/insertion.rs | 70 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 5 deletions(-) diff --git a/src/insertion.rs b/src/insertion.rs index a5d76c1..19508e3 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -133,17 +133,48 @@ mod encode { } } + pub(crate) enum InsertionDecodeError { + Int(IntDecodeError), + Run(InsertionRunDecodeError), + } + + impl From for InsertionDecodeError { + #[inline] + fn from(err: IntDecodeError) -> Self { + Self::Int(err) + } + } + + impl From for InsertionDecodeError { + #[inline] + fn from(err: InsertionRunDecodeError) -> Self { + Self::Run(err) + } + } + + impl core::fmt::Display for InsertionDecodeError { + #[inline] + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let err: &dyn core::fmt::Display = match self { + Self::Int(err) => err, + Self::Run(err) => err, + }; + + write!(f, "InsertionRun couldn't be decoded: {err}") + } + } + impl Decode for Insertion { type Value = Self; - type Error = IntDecodeError; + type Error = InsertionDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { let (text, buf) = Text::decode(buf)?; let (run_ts, buf) = Int::::decode(buf)?; let (lamport_ts, buf) = Int::::decode(buf)?; - let (run, buf) = InsertionRun::decode(buf).unwrap(); + let (run, buf) = InsertionRun::decode(buf)?; let (anchor, buf) = Self::decode_anchor(run, &text, run_ts, buf)?; let insertion = Self::new(anchor, text, run_ts, lamport_ts); Ok((insertion, buf)) @@ -182,19 +213,48 @@ mod encode { } } + pub(crate) enum InsertionRunDecodeError { + EmptyBuffer, + InvalidByte(u8), + } + + impl core::fmt::Display for InsertionRunDecodeError { + #[inline] + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + match self { + Self::EmptyBuffer => f.write_str( + "InsertionRun couldn't be decoded because the buffer is \ + empty", + ), + Self::InvalidByte(byte) => { + write!( + f, + "InsertionRun cannot be decoded from byte {}, it \ + must be 0 or 1", + byte, + ) + }, + } + } + } + impl Decode for InsertionRun { type Value = Self; - type Error = core::convert::Infallible; + type Error = InsertionRunDecodeError; #[inline] fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> { - let Some((&first_byte, rest)) = buf.split_first() else { todo!() }; + let (&first_byte, rest) = buf + .split_first() + .ok_or(InsertionRunDecodeError::EmptyBuffer)?; let this = match first_byte { 0 => Self::BeginsNew, 1 => Self::ContinuesExisting, - _other => todo!(), + other => { + return Err(InsertionRunDecodeError::InvalidByte(other)) + }, }; Ok((this, rest)) From 17f1c9290755c09dc2ec8448de087f53bd8701e8 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 22:19:21 +0100 Subject: [PATCH 09/24] encode `Int` with a single byte when possible --- src/encode.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/encode.rs b/src/encode.rs index 930ec97..4b71483 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -88,7 +88,16 @@ macro_rules! impl_int_encode { impl Encode for Int<$ty> { #[inline] fn encode(&self, buf: &mut Vec) { - let array = self.0.to_le_bytes(); + let int = self.0; + + // We can encode the entire integer with a single byte if it + // falls within this range. + if int == 0 || (int > 8 && int <= u8::MAX as $ty) { + buf.push(int as u8); + return; + } + + let array = int.to_le_bytes(); let num_trailing_zeros = array .iter() @@ -121,6 +130,11 @@ macro_rules! impl_int_decode { let (&len, buf) = buf.split_first().ok_or(IntDecodeError::EmptyBuffer)?; + if len == 0 || len > 8 { + let int = len as $ty; + return Ok((int, buf)); + } + if len as usize > buf.len() { return Err(IntDecodeError::LengthLessThanPrefix { prefix: len, From b5a6b34220ea637ab4d96140a7eca2fa614066db Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Wed, 24 Jan 2024 22:34:27 +0100 Subject: [PATCH 10/24] fix docs --- src/insertion.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/insertion.rs b/src/insertion.rs index 19508e3..76d6f6c 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -3,12 +3,15 @@ use crate::{LamportTs, Length, ReplicaId, RunTs, Text}; /// An insertion in CRDT coordinates. /// -/// This struct is created by the [`inserted`](Replica::inserted) method on the -/// [`Replica`] owned by the peer that performed the insertion, and can be -/// integrated by another [`Replica`] via the -/// [`integrate_insertion`](Replica::integrate_insertion) method. +/// This struct is created by the [`inserted`] method on the [`Replica`] owned +/// by the peer that performed the insertion, and can be integrated by another +/// [`Replica`] via the [`integrate_insertion`] method. /// /// See the documentation of those methods for more information. +/// +/// [`Replica`]: crate::Replica +/// [`inserted`]: crate::Replica::inserted +/// [`integrate_insertion`]: crate::Replica::integrate_insertion #[derive(Debug, Clone, PartialEq, Eq)] pub struct Insertion { /// The anchor point of the insertion. From aa33e8b9cc95825fb6fb68e3459a12f4ad8dc5bf Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 12:27:15 +0100 Subject: [PATCH 11/24] test `Encode`ing and `Decode`ing of `Int`s --- src/encode.rs | 106 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/src/encode.rs b/src/encode.rs index 4b71483..0d0c806 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -27,6 +27,7 @@ impl Int { } /// An error that can occur when decoding an [`Int`]. +#[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) enum IntDecodeError { /// The buffer passed to `Int::decode` is empty. This is always an error, /// even if the integer being decoded is zero. @@ -225,3 +226,108 @@ mod serde { pub(crate) use impl_deserialize; pub(crate) use impl_serialize; } + +#[cfg(test)] +mod tests { + use super::*; + + impl core::fmt::Debug for IntDecodeError { + #[inline] + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + core::fmt::Display::fmt(self, f) + } + } + + /// Tests that some integers can be encoded with a single byte. + #[test] + fn encode_int_single_byte() { + let ints = core::iter::once(0).chain(9..=u8::MAX as u64); + + let mut buf = Vec::new(); + + for int in ints { + Int::new(int).encode(&mut buf); + assert_eq!(buf.len(), 1); + let (decoded, rest) = Int::::decode(&buf).unwrap(); + assert_eq!(int, decoded); + assert!(rest.is_empty()); + buf.clear(); + } + } + + /// Tests that integers are encoded using the correct number of bytes. + #[test] + fn encode_int_num_bytes() { + let ints = (1..=8).chain([ + u8::MAX as u64 + 1, + u16::MAX as u64, + u16::MAX as u64 + 1, + u32::MAX as u64, + u32::MAX as u64 + 1, + u64::MAX, + ]); + + let mut buf = Vec::new(); + + // The highest number that can be represented with this many bytes. + let max_num_with_n_bytes = |n_bytes: u8| { + let bits = n_bytes * 8; + // We use a u128 here to avoid overlowing if `n_bytes` is 8. + ((1u128 << bits) - 1) as u64 + }; + + for int in ints { + Int::new(int).encode(&mut buf); + + let expected_len = (1..=8) + .map(|n_bytes| (n_bytes, max_num_with_n_bytes(n_bytes))) + .find_map(|(n_bytes, max_for_bytes)| { + (int <= max_for_bytes).then_some(n_bytes) + }) + .unwrap(); + + assert_eq!(buf[0], expected_len); + + assert_eq!(buf[1..].len() as u8, expected_len); + + let (decoded, rest) = Int::::decode(&buf).unwrap(); + + assert_eq!(int, decoded); + + assert!(rest.is_empty()); + + buf.clear(); + } + } + + /// Tests that decoding an `Int` fails if the buffer is empty. + #[test] + fn encode_int_fails_if_buffer_empty() { + let mut buf = Vec::new(); + + Int::new(42u32).encode(&mut buf); + + buf.clear(); + + assert_eq!( + Int::::decode(&buf).unwrap_err(), + IntDecodeError::EmptyBuffer + ); + } + + /// Tests that decoding an `Int` fails if the length specified in the + /// prefix is greater than the actual length of the buffer. + #[test] + fn encode_int_fails_if_buffer_too_short() { + let mut buf = Vec::new(); + + Int::new(u8::MAX as u16 + 1).encode(&mut buf); + + buf.pop(); + + assert_eq!( + Int::::decode(&buf).unwrap_err(), + IntDecodeError::LengthLessThanPrefix { prefix: 2, actual: 1 } + ); + } +} From e26e4721f2fa35be70deb033b1152ecc69636c72 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 12:50:19 +0100 Subject: [PATCH 12/24] fix logic to determine value of `InsertionRun` --- src/encode.rs | 2 +- src/insertion.rs | 13 ++++++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/encode.rs b/src/encode.rs index 0d0c806..f06b9ad 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -78,7 +78,7 @@ impl Decode for Int { type Error = IntDecodeError; - #[inline] + #[inline(always)] fn decode(buf: &[u8]) -> Result<(usize, &[u8]), Self::Error> { Int::::decode(buf).map(|(value, rest)| (value as usize, rest)) } diff --git a/src/insertion.rs b/src/insertion.rs index 76d6f6c..1eca64b 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -196,9 +196,20 @@ mod encode { impl InsertionRun { #[inline] fn new(insertion: &Insertion) -> Self { + // To determine whether this insertion is a continuation of an + // existing insertion run we simply check: + // + // 1: the `ReplicaId`s of the anchor and the text. Clearly they + // must match because you can't continue someone else's + // insertion; + // + // 2: the `RunTs` of the anchor and the insertion. Since that + // counter is only incremented when a new insertion run begins, + // we know that if they match then this insertion must continue + // an existing run. let is_continuation = insertion.anchor.replica_id() == insertion.text.inserted_by() - && insertion.anchor.offset() == insertion.text.start(); + && insertion.anchor.run_ts() == insertion.run_ts(); if is_continuation { Self::ContinuesExisting From 81f725763e3f74fa5c6b07f1ad3f0ec845cdf86d Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 13:29:46 +0100 Subject: [PATCH 13/24] document `InsertionRun` --- src/insertion.rs | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/src/insertion.rs b/src/insertion.rs index 1eca64b..91e4fdb 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -184,12 +184,44 @@ mod encode { } } - /// TODO: docs + /// Whether an [`Insertion`] begins a new run or continues an existing one. + /// + /// This is used when encoding and decoding [`Insertion`]s to determine + /// whether their [`Anchor`] needs to be encoded. + /// + /// Most of the time when people edit a document they insert a bunch of + /// characters in a single run before moving the cursor or deleting some + /// text, and we can use this pattern to save some bytes. + /// + /// For example, if someone types "foo" sequentially in a blank document, + /// we'll create the following insertions (assuming a `ReplicaId` of 1 and + /// omitting fields that aren't relevant to this discussion): + /// + /// ```text + /// f -> Insertion { anchor: zero, text: 1.0..1, .. }, + /// o -> Insertion { anchor: 1.1, text: 1.1..2, .. }, + /// o -> Insertion { anchor: 1.2, text: 1.2..3, .. }, + /// ``` + /// + /// The first insertion begins a new run, but from then on every + /// Insertion's anchor is the same as the start of its text. + /// + /// This means that we can save space when encoding by omitting the anchor + /// and adding a flag that indicates that it should be derived from the + /// text and the run timestamp. + /// + /// This enum corresponds to that flag. enum InsertionRun { - /// TODO: docs + /// The [`Insertion`] begins a new run. + /// + /// In this case we also encode the insertion's [`Anchor`]. BeginsNew, - /// TODO: docs + /// The [`Insertion`] continues an existing run. + /// + /// In this case we can avoid encoding the insertion's [`Anchor`] + /// because it can be fully decoded from the insertion's [`Text`] and + /// [`RunTs`]. ContinuesExisting, } From e98703136c2ae74b54487e183fe5308b3cc1a0c9 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 16:40:38 +0100 Subject: [PATCH 14/24] document `Text`'s `Encode` impl --- src/text.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/text.rs b/src/text.rs index d0f5164..b3a69a3 100644 --- a/src/text.rs +++ b/src/text.rs @@ -120,6 +120,12 @@ mod encode { fn encode(&self, buf: &mut Vec) { Int::new(self.inserted_by).encode(buf); Int::new(self.start()).encode(buf); + // We encode the length of the text because it's often smaller than + // its end, especially for longer editing sessions. + // + // For example, if a user inserts a character after already having + // inserted 1000 before, it's better to encode `1000, 1` rather + // than `1000, 1001`. let len = self.end() - self.start(); Int::new(len).encode(buf); } From c82a5a54c6825b39325fd297e96747213d155db7 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 17:07:54 +0100 Subject: [PATCH 15/24] document `Int` --- src/encode.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/encode.rs b/src/encode.rs index f06b9ad..a5421dd 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -17,6 +17,43 @@ pub(crate) trait Decode { } /// A variable-length encoded integer. +/// +/// This is a newtype around integers that can be `Encode`d using a variable +/// number of bytes based on their value. When encoding, we: +/// +/// - turn the integer into the corresponding little-endian byte array. The +/// resulting array will have a fixed length equal to `mem::size_of::`; +/// +/// - ignore any trailing zeroes in the resulting byte array; +/// +/// - push the length of the resulting byte slice; +/// +/// - push the byte slice; +/// +/// For example, `256u64` gets encoded as `[2, 0, 1]`. +/// +/// With this scheme we could potentially encode integers up to +/// `2 ^ (255 * 8) - 1`, which is ridiculously overkill for our use case since +/// we only need to encode integers up to `u64::MAX`. +/// +/// Because of this, we actually use the first byte to encode the integer +/// itself if it's either 0 or between 9 and 255. We don't do this for 1..=8 +/// because we need to reserve those to represent the number of bytes that +/// follow. +/// +/// A few examples: +/// +/// - `0` is encoded as `[0]`; +/// +/// - `1` is encoded as `[1, 1]`; +/// +/// - `8` is encoded as `[1, 8]`; +/// +/// - `9` is encoded as `[9]`; +/// +/// - `255` is encoded as `[255]`; +/// +/// - numbers greater than 255 are always encoded as `[length, ..bytes..]`. pub(crate) struct Int(I); impl Int { From b9648af909babb90561b3c5a07662993e450a4fa Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 17:47:58 +0100 Subject: [PATCH 16/24] implement `visit_seq` inside `impl_deserialize!` --- src/encode.rs | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/encode.rs b/src/encode.rs index a5421dd..c64ce32 100644 --- a/src/encode.rs +++ b/src/encode.rs @@ -233,6 +233,27 @@ mod serde { .map(|(value, _rest)| value) .map_err(E::custom) } + + #[inline] + fn visit_seq( + self, + mut seq: A, + ) -> Result + where + A: ::serde::de::SeqAccess<'de>, + { + let size = seq.size_hint().unwrap_or(0); + let mut buf = + ::alloc::vec::Vec::::with_capacity(size); + while let Some(byte) = seq.next_element()? { + buf.push(byte); + } + ::decode( + &buf, + ) + .map(|(value, _rest)| value) + .map_err(::custom) + } } deserializer.deserialize_bytes(Visitor) From adf6396f817976d62686364978c3cb51b0358541 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 17:38:42 +0100 Subject: [PATCH 17/24] test serde's `Serialize` and `Deserialize` impls --- Cargo.toml | 1 + tests/common/mod.rs | 1 + tests/encode.rs | 8 ++++-- tests/serde.rs | 67 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 tests/serde.rs diff --git a/Cargo.toml b/Cargo.toml index 0d72bd0..cdf520c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ sha2 = { version = "0.10", optional = true } criterion = "0.5" rand = "0.8" rand_chacha = "0.3" +serde_json = "1" traces = { path = "./traces" } [[bench]] diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 7ccb117..1d1124e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -39,6 +39,7 @@ impl PartialEq for &str { } #[derive(Clone, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Edit { Insertion(cola::Insertion, String), Deletion(cola::Deletion), diff --git a/tests/encode.rs b/tests/encode.rs index 1641413..60f2eef 100644 --- a/tests/encode.rs +++ b/tests/encode.rs @@ -2,6 +2,7 @@ mod encode { use cola::Replica; + /// Tests an encode-decode round-trip of an empty `Replica`. #[test] fn encode_empty() { let replica = Replica::new(1, 42); @@ -10,16 +11,17 @@ mod encode { assert!(replica.eq_decoded(&decoded)); } + /// Tests an encode-decode round-trip of a `Replica` that has gone through + /// the `automerge` trace. #[test] - #[allow(unused_must_use)] fn encode_automerge() { let automerge = traces::automerge().chars_to_bytes(); let mut replica = Replica::new(1, automerge.start_content().len()); for (start, end, text) in automerge.edits() { - replica.deleted(start..end); - replica.inserted(start, text.len()); + let _ = replica.deleted(start..end); + let _ = replica.inserted(start, text.len()); } let encoded = replica.encode(); diff --git a/tests/serde.rs b/tests/serde.rs new file mode 100644 index 0000000..efd553d --- /dev/null +++ b/tests/serde.rs @@ -0,0 +1,67 @@ +mod common; + +#[cfg(feature = "serde")] +mod serde { + use traces::{ConcurrentTraceInfos, Crdt, Edit}; + + use super::common::{self, Replica}; + + type Encode = fn(&common::Edit) -> Vec; + + type Decode = fn(Vec) -> common::Edit; + + fn test_trace( + trace: ConcurrentTraceInfos, + encode: Encode, + decode: Decode, + ) { + let ConcurrentTraceInfos { trace, mut peers, final_content, .. } = + trace; + + for edit in trace.edits() { + match edit { + Edit::Insertion(idx, offset, text) => { + peers[*idx].local_insert(*offset, text); + peers[*idx].assert_invariants(); + }, + Edit::Deletion(idx, start, end) => { + peers[*idx].local_delete(*start, *end); + peers[*idx].assert_invariants(); + }, + Edit::Merge(idx, edit) => { + let encoded = encode(edit); + let decoded = decode(encoded); + peers[*idx].remote_merge(&decoded); + peers[*idx].assert_invariants(); + }, + } + } + + for replica in &mut peers { + replica.merge_backlogged(); + } + + for replica in &peers { + assert_eq!(replica.buffer, final_content); + } + } + + /// Tests that the `friends-forever` trace converges if we serialize and + /// deserialize every edit before applying it. + #[test] + fn serde_friends_forever_round_trip() { + test_trace( + traces::friends_forever(), + serde_json_encode, + serde_json_decode, + ); + } + + fn serde_json_encode(edit: &common::Edit) -> Vec { + serde_json::to_vec(edit).unwrap() + } + + fn serde_json_decode(buf: Vec) -> common::Edit { + serde_json::from_slice(&buf).unwrap() + } +} From ddf16e37ba4d91cae775ca54eb4fed75fc488663 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 18:13:30 +0100 Subject: [PATCH 18/24] fix typo --- src/insertion.rs | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/insertion.rs b/src/insertion.rs index 91e4fdb..39c073b 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -179,7 +179,7 @@ mod encode { let (lamport_ts, buf) = Int::::decode(buf)?; let (run, buf) = InsertionRun::decode(buf)?; let (anchor, buf) = Self::decode_anchor(run, &text, run_ts, buf)?; - let insertion = Self::new(anchor, text, run_ts, lamport_ts); + let insertion = Self::new(anchor, text, lamport_ts, run_ts); Ok((insertion, buf)) } } @@ -313,3 +313,27 @@ mod serde { crate::impl_deserialize!(super::Insertion); crate::impl_serialize!(super::Insertion); } + +#[cfg(all(test, feature = "encode"))] +mod encode_tests { + use super::*; + use crate::encode::{Decode, Encode}; + + impl core::fmt::Debug for encode::InsertionDecodeError { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + core::fmt::Display::fmt(self, f) + } + } + + #[test] + fn encode_insertion_round_trip_0() { + let anchor = Anchor::new(1, 1, 1); + let text = Text::new(2, 0..1); + let insertion = Insertion::new(anchor, text, 3, 0); + let mut buf = Vec::new(); + insertion.encode(&mut buf); + let (decoded, rest) = Insertion::decode(&buf).unwrap(); + assert_eq!(insertion, decoded); + assert!(rest.is_empty()); + } +} From ad89163356f827c59152a70d9d785c302796c956 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 18:13:55 +0100 Subject: [PATCH 19/24] print `RunTs` in `InnerAnchor`'s `Debug` if `f.alternate()` is on --- src/anchor.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/anchor.rs b/src/anchor.rs index d1b0cbb..2c07cf2 100644 --- a/src/anchor.rs +++ b/src/anchor.rs @@ -102,6 +102,12 @@ impl core::fmt::Debug for InnerAnchor { fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { if self == &Self::zero() { write!(f, "zero") + } else if f.alternate() { + write!( + f, + "{:x}.{} in {}", + self.replica_id, self.offset, self.contained_in + ) } else { write!(f, "{:x}.{}", self.replica_id, self.offset) } From 639b6af1117751f3da1a9e95a26fa4d248b8064a Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 18:14:19 +0100 Subject: [PATCH 20/24] tests: derive `PartialEq` and `Eq` for `Edit` --- tests/common/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 1d1124e..9e57b7e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -38,7 +38,7 @@ impl PartialEq for &str { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub enum Edit { Insertion(cola::Insertion, String), From 54a9ae2ae85a4659464a41413288d5365d4029c4 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 18:46:05 +0100 Subject: [PATCH 21/24] tests: check total sizes of `Replica`, `Insertion`s and `Deletion`s after running trace --- Cargo.toml | 1 + tests/serde.rs | 126 ++++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 109 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cdf520c..813f09c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ serde = { version = "1.0", features = ["derive"], optional = true } sha2 = { version = "0.10", optional = true } [dev-dependencies] +bincode = "1.3" criterion = "0.5" rand = "0.8" rand_chacha = "0.3" diff --git a/tests/serde.rs b/tests/serde.rs index efd553d..d94b1d7 100644 --- a/tests/serde.rs +++ b/tests/serde.rs @@ -2,18 +2,48 @@ mod common; #[cfg(feature = "serde")] mod serde { - use traces::{ConcurrentTraceInfos, Crdt, Edit}; + use serde::de::DeserializeOwned; + use serde::ser::Serialize; + use traces::{ConcurrentTraceInfos, Crdt, Edit, SequentialTrace}; - use super::common::{self, Replica}; + use super::common; - type Encode = fn(&common::Edit) -> Vec; + trait Encoder { + const NAME: &'static str; + fn encode(value: &T) -> Vec; + fn decode(buf: Vec) -> T; + } + + struct SerdeJson; + + impl Encoder for SerdeJson { + const NAME: &'static str = "serde_json"; + + fn encode(value: &T) -> Vec { + serde_json::to_vec(value).unwrap() + } + + fn decode(buf: Vec) -> T { + serde_json::from_slice(&buf).unwrap() + } + } + + struct Bincode; - type Decode = fn(Vec) -> common::Edit; + impl Encoder for Bincode { + const NAME: &'static str = "bincode"; - fn test_trace( - trace: ConcurrentTraceInfos, - encode: Encode, - decode: Decode, + fn encode(value: &T) -> Vec { + bincode::serialize(value).unwrap() + } + + fn decode(buf: Vec) -> T { + bincode::deserialize(&buf).unwrap() + } + } + + fn test_trace( + trace: ConcurrentTraceInfos, ) { let ConcurrentTraceInfos { trace, mut peers, final_content, .. } = trace; @@ -29,8 +59,8 @@ mod serde { peers[*idx].assert_invariants(); }, Edit::Merge(idx, edit) => { - let encoded = encode(edit); - let decoded = decode(encoded); + let encoded = E::encode(edit); + let decoded = E::decode(encoded); peers[*idx].remote_merge(&decoded); peers[*idx].assert_invariants(); }, @@ -50,18 +80,78 @@ mod serde { /// deserialize every edit before applying it. #[test] fn serde_friends_forever_round_trip() { - test_trace( - traces::friends_forever(), - serde_json_encode, - serde_json_decode, + test_trace::<2, SerdeJson>(traces::friends_forever()); + } + + /// Runs a trace and prints the total size of the serialized `Insertion`s + /// and `Deletion`s. + fn serde_sizes(trace: &SequentialTrace) { + let trace = trace.chars_to_bytes(); + + let mut replica = cola::Replica::new(1, trace.start_content().len()); + + let mut insertions = Vec::new(); + + let mut deletions = Vec::new(); + + for (start, end, text) in trace.edits() { + if end > start { + let deletion = replica.deleted(start..end); + deletions.push(E::encode(&deletion)); + } + + if !text.is_empty() { + let insertion = replica.inserted(start, text.len()); + insertions.push(E::encode(&insertion)); + } + } + + let printed_size = |num_bytes: usize| { + let num_bytes = num_bytes as f64; + + if num_bytes < 1024.0 { + format!("{} B", num_bytes) + } else if num_bytes < 1024.0 * 1024.0 { + format!("{:.2} KB", num_bytes / 1024.0) + } else if num_bytes < 1024.0 * 1024.0 * 1024.0 { + format!("{:.2} MB", num_bytes / 1024.0 / 1024.0) + } else { + format!("{:.2} GB", num_bytes / 1024.0 / 1024.0 / 1024.0) + } + }; + + let replica_size = E::encode(&replica.encode()).len(); + + println!("{} | Replica: {}", E::NAME, printed_size(replica_size)); + + let total_insertions_size = + insertions.iter().map(Vec::len).sum::(); + + println!( + "{} | Total insertions: {}", + E::NAME, + printed_size(total_insertions_size) + ); + + let total_deletions_size = + deletions.iter().map(Vec::len).sum::(); + + println!( + "{} | Total deletions: {}", + E::NAME, + printed_size(total_deletions_size) ); } - fn serde_json_encode(edit: &common::Edit) -> Vec { - serde_json::to_vec(edit).unwrap() + // `cargo t --features=serde serde_automerge_json_sizes -- --nocapture` + #[test] + fn serde_automerge_json_sizes() { + serde_sizes::(&traces::automerge()); } - fn serde_json_decode(buf: Vec) -> common::Edit { - serde_json::from_slice(&buf).unwrap() + // `cargo t --features=serde serde_automerge_bincode_sizes -- --nocapture` + #[test] + fn serde_automerge_bincode_sizes() { + serde_sizes::(&traces::automerge()); } } From 8bc0eb7fdfe7a5a699101b6f2c21712aa0602257 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 19:02:20 +0100 Subject: [PATCH 22/24] update `CHANGELOG` --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 02c5790..4066153 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## [Unreleased] +### Changed + +- the `Serialize` impl of `Insertion` now produces 3-4x smaller payloads, + depending on the data format used ([#5](https://github.com/nomad/cola/pull/5)); + ## [0.3.0] - Jan 9 2024 ### Added From 078a2690da337b067d600f9313344a6adf6a0fa0 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 19:12:48 +0100 Subject: [PATCH 23/24] implement `Serialize` and `Deserialize` for `Text` based on encoding --- src/insertion.rs | 4 ++-- src/lib.rs | 2 -- src/text.rs | 7 ++++++- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/insertion.rs b/src/insertion.rs index 39c073b..99fb7ba 100644 --- a/src/insertion.rs +++ b/src/insertion.rs @@ -310,8 +310,8 @@ mod encode { #[cfg(feature = "serde")] mod serde { - crate::impl_deserialize!(super::Insertion); - crate::impl_serialize!(super::Insertion); + crate::encode::impl_deserialize!(super::Insertion); + crate::encode::impl_serialize!(super::Insertion); } #[cfg(all(test, feature = "encode"))] diff --git a/src/lib.rs b/src/lib.rs index 749aa27..3bcd7d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -148,8 +148,6 @@ pub use anchor::{Anchor, AnchorBias}; use backlog::Backlog; pub use backlog::{BackloggedDeletions, BackloggedInsertions}; pub use deletion::Deletion; -#[cfg(feature = "serde")] -use encode::{impl_deserialize, impl_serialize}; #[cfg(feature = "encode")] use encoded_replica::{checksum, checksum_array}; #[cfg(feature = "encode")] diff --git a/src/text.rs b/src/text.rs index b3a69a3..5e80b4a 100644 --- a/src/text.rs +++ b/src/text.rs @@ -10,7 +10,6 @@ use crate::*; /// [`inserted_by`](Text::inserted_by) and /// [`temporal_range`](Text::temporal_range) methods respectively. #[derive(Clone, PartialEq, Eq, Hash)] -#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))] pub struct Text { pub(crate) inserted_by: ReplicaId, pub(crate) range: Range, @@ -146,3 +145,9 @@ mod encode { } } } + +#[cfg(feature = "serde")] +mod serde { + crate::encode::impl_deserialize!(super::Text); + crate::encode::impl_serialize!(super::Text); +} From 07802a9bf5121b4f14c2153fb546e067b6a26339 Mon Sep 17 00:00:00 2001 From: Riccardo Mazzarini Date: Thu, 25 Jan 2024 19:23:17 +0100 Subject: [PATCH 24/24] tests: add `Zstd` --- Cargo.toml | 1 + tests/serde.rs | 39 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 813f09c..27482c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ rand = "0.8" rand_chacha = "0.3" serde_json = "1" traces = { path = "./traces" } +zstd = "0.13" [[bench]] name = "traces" diff --git a/tests/serde.rs b/tests/serde.rs index d94b1d7..5bf7c12 100644 --- a/tests/serde.rs +++ b/tests/serde.rs @@ -10,6 +10,9 @@ mod serde { trait Encoder { const NAME: &'static str; + fn name() -> impl std::fmt::Display { + Self::NAME + } fn encode(value: &T) -> Vec; fn decode(buf: Vec) -> T; } @@ -42,6 +45,24 @@ mod serde { } } + struct Zstd(std::marker::PhantomData); + + impl Encoder for Zstd { + const NAME: &'static str = "zstd'd "; + + fn name() -> impl std::fmt::Display { + format!("{}{}", Self::NAME, E::name()) + } + + fn encode(value: &T) -> Vec { + zstd::stream::encode_all(&*E::encode(value), 0).unwrap() + } + + fn decode(buf: Vec) -> T { + E::decode(zstd::stream::decode_all(&*buf).unwrap()) + } + } + fn test_trace( trace: ConcurrentTraceInfos, ) { @@ -122,14 +143,14 @@ mod serde { let replica_size = E::encode(&replica.encode()).len(); - println!("{} | Replica: {}", E::NAME, printed_size(replica_size)); + println!("{} | Replica: {}", E::name(), printed_size(replica_size)); let total_insertions_size = insertions.iter().map(Vec::len).sum::(); println!( "{} | Total insertions: {}", - E::NAME, + E::name(), printed_size(total_insertions_size) ); @@ -138,7 +159,7 @@ mod serde { println!( "{} | Total deletions: {}", - E::NAME, + E::name(), printed_size(total_deletions_size) ); } @@ -154,4 +175,16 @@ mod serde { fn serde_automerge_bincode_sizes() { serde_sizes::(&traces::automerge()); } + + // `cargo t --features=serde serde_automerge_compressed_json_sizes -- --nocapture` + #[test] + fn serde_automerge_compressed_json_sizes() { + serde_sizes::>(&traces::automerge()); + } + + // `cargo t --features=serde serde_automerge_compressed_bincode_sizes -- --nocapture` + #[test] + fn serde_automerge_compressed_bincode_sizes() { + serde_sizes::>(&traces::automerge()); + } }