Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make serialized Replicas smaller #6

Merged
merged 24 commits into from
Jan 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cd74f28
refactors
noib3 Jan 27, 2024
8dcd286
sketch `RunTree`'s `Encode` impl
noib3 Jan 27, 2024
6e0b47d
implement `Encode` and `Decode` for `bool`
noib3 Jan 27, 2024
e9ffa20
implement `Encode` and `Decode` for `InodeIdx`, `LeafIdx`
noib3 Jan 27, 2024
a1755fc
implement `Encode` for `RunFragment`
noib3 Jan 27, 2024
3cb07f6
implement `DecodeWithCtx` for `RunFragment`
noib3 Jan 27, 2024
d295996
implement `DecodeWithCtx` for the other types
noib3 Jan 27, 2024
b81ba8e
remove `Int`s
noib3 Jan 27, 2024
17bc318
remove the `Length` trait
noib3 Jan 27, 2024
ed54376
implement `Encode` and `Decode` for `NodeIdx`
noib3 Jan 27, 2024
b5f3f69
implement `Encode` and `Decode` for `Inode`
noib3 Jan 27, 2024
c559b2d
remove unused `Serialize` and `Deserialize` impls
noib3 Jan 27, 2024
9e25c4d
correctly encode the number of fragments
noib3 Jan 27, 2024
d9378c9
fix `Gtree`'s `PartialEq` impl
noib3 Jan 27, 2024
e7e9e26
don't push `Lnode`s when decoding, insert them into the right slots
noib3 Jan 27, 2024
848e975
implement `Encode` and `Decode` for `BaseMap`
noib3 Jan 28, 2024
0b48b27
implement `Encode` for `Backlog`
noib3 Jan 28, 2024
78bbb1f
implement `Decode` for `Backlog`
noib3 Jan 28, 2024
4b85c05
implement `Serialize` and `Deserialize` for `Backlog`
noib3 Jan 28, 2024
6e60d46
implement `Encode` and `Decode` for `LamportClock`, `RunClock`
noib3 Jan 28, 2024
d33fc20
implement `Encode` and `Decode` for `Replica`
noib3 Jan 28, 2024
4e8e7ff
implement `Encode` and `Decode` for `EncodedReplica`
noib3 Jan 28, 2024
a5720f8
remove unused deps
noib3 Jan 28, 2024
6652a26
update `CHANGELOG`
noib3 Jan 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

### Changed

- the `Serialize` impl of `EncodedReplica` now produces ~7x smaller payloads,
depending on the data format used
([#6](https://github.com/nomad/cola/pull/6));

- the `Serialize` impl of `Insertion` now produces 3-4x smaller payloads,
depending on the data format used ([#5](https://github.com/nomad/cola/pull/5));

Expand Down
6 changes: 1 addition & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ exclude = ["/.github/*", "/examples/**", "/fuzz/**", "/tests/**"]
features = ["serde"]
rustdoc-args = ["--cfg", "docsrs"]

[lib]
name = "cola"

[features]
encode = ["dep:bincode", "dep:sha2", "dep:serde"]
encode = ["dep:sha2"]
serde = ["encode", "dep:serde"]

[dependencies]
bincode = { version = "1.3", optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
sha2 = { version = "0.10", optional = true }

Expand Down
14 changes: 7 additions & 7 deletions src/anchor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,14 @@ impl InnerAnchor {
#[cfg(feature = "encode")]
mod encode {
use super::*;
use crate::encode::{Decode, Encode, Int, IntDecodeError};
use crate::encode::{Decode, Encode, IntDecodeError};

impl Encode for InnerAnchor {
#[inline]
fn encode(&self, buf: &mut Vec<u8>) {
Int::new(self.replica_id()).encode(buf);
Int::new(self.run_ts()).encode(buf);
Int::new(self.offset()).encode(buf);
self.replica_id().encode(buf);
self.run_ts().encode(buf);
self.offset().encode(buf);
}
}

Expand All @@ -172,9 +172,9 @@ mod encode {

#[inline]
fn decode(buf: &[u8]) -> Result<(Self, &[u8]), Self::Error> {
let (replica_id, buf) = Int::<ReplicaId>::decode(buf)?;
let (run_ts, buf) = Int::<RunTs>::decode(buf)?;
let (offset, buf) = Int::<Length>::decode(buf)?;
let (replica_id, buf) = ReplicaId::decode(buf)?;
let (run_ts, buf) = RunTs::decode(buf)?;
let (offset, buf) = Length::decode(buf)?;
let anchor = Self::new(replica_id, offset, run_ts);
Ok((anchor, buf))
}
Expand Down
276 changes: 273 additions & 3 deletions src/backlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::*;
///
/// See [`Replica::backlogged`] for more information.
#[derive(Debug, Clone, Default, PartialEq)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
pub(crate) struct Backlog {
insertions: ReplicaIdMap<InsertionsBacklog>,
deletions: ReplicaIdMap<DeletionsBacklog>,
Expand Down Expand Up @@ -69,7 +68,6 @@ impl Backlog {

/// Stores the backlogged [`Insertion`]s of a particular replica.
#[derive(Clone, Default, PartialEq)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
struct InsertionsBacklog {
insertions: VecDeque<Insertion>,
}
Expand Down Expand Up @@ -115,7 +113,6 @@ impl InsertionsBacklog {

/// Stores the backlogged [`Deletion`]s of a particular replica.
#[derive(Clone, Default, PartialEq)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
struct DeletionsBacklog {
deletions: VecDeque<Deletion>,
}
Expand Down Expand Up @@ -282,3 +279,276 @@ impl Iterator for BackloggedInsertions<'_> {
}

impl core::iter::FusedIterator for BackloggedInsertions<'_> {}

#[cfg(feature = "encode")]
pub(crate) mod encode {
use super::*;
use crate::encode::{Decode, DecodeWithCtx, Encode, IntDecodeError};
use crate::version_map::encode::BaseMapDecodeError;

impl InsertionsBacklog {
    /// Appends `insertion` to the back of the backlog.
    #[inline(always)]
    fn push(&mut self, insertion: Insertion) {
        let Self { insertions, .. } = self;
        insertions.push_back(insertion);
    }

    /// Returns how many insertions are currently stored.
    #[inline(always)]
    fn len(&self) -> usize {
        let Self { insertions, .. } = self;
        insertions.len()
    }

    /// Returns an iterator over the stored insertions, in the order
    /// yielded by the underlying queue.
    #[inline(always)]
    fn iter(&self) -> impl Iterator<Item = &Insertion> + '_ {
        let Self { insertions, .. } = self;
        insertions.iter()
    }
}

impl DeletionsBacklog {
    /// Appends `deletion` to the back of the backlog.
    #[inline(always)]
    fn push(&mut self, deletion: Deletion) {
        let Self { deletions, .. } = self;
        deletions.push_back(deletion);
    }

    /// Returns how many deletions are currently stored.
    #[inline(always)]
    fn len(&self) -> usize {
        let Self { deletions, .. } = self;
        deletions.len()
    }

    /// Returns an iterator over the stored deletions, in the order
    /// yielded by the underlying queue.
    #[inline(always)]
    fn iter(&self) -> impl Iterator<Item = &Deletion> + '_ {
        let Self { deletions, .. } = self;
        deletions.iter()
    }
}

impl Encode for Backlog {
    /// Appends the binary form of this backlog to `buf`.
    ///
    /// Layout: the number of replicas with backlogged insertions (as a
    /// `u64`), one `ReplicaIdInsertions` entry per replica, then the number
    /// of replicas with backlogged deletions (as a `u64`) followed by one
    /// `ReplicaIdDeletions` entry per replica.
    #[inline]
    fn encode(&self, buf: &mut Vec<u8>) {
        // Insertions section: entry count first, then the entries.
        let insertion_entries = self.insertions.len() as u64;
        insertion_entries.encode(buf);

        for (replica_id, backlog) in &self.insertions {
            let entry = ReplicaIdInsertions::new(*replica_id, backlog);
            entry.encode(buf);
        }

        // Deletions section: same shape as the insertions section.
        let deletion_entries = self.deletions.len() as u64;
        deletion_entries.encode(buf);

        for (replica_id, backlog) in &self.deletions {
            let entry = ReplicaIdDeletions::new(*replica_id, backlog);
            entry.encode(buf);
        }
    }
}

/// The error returned when a `Backlog` (or one of its per-replica entries)
/// cannot be decoded from a byte buffer.
pub(crate) enum BacklogDecodeError {
/// Wraps an `IntDecodeError` raised by one of the nested decoders.
Int(IntDecodeError),
/// Wraps a `BaseMapDecodeError<Length>`.
// NOTE(review): presumably raised while decoding a deletion's
// `VersionMap` -- confirm against `VersionMap`'s `Decode` impl.
VersionMap(BaseMapDecodeError<Length>),
}

impl From<IntDecodeError> for BacklogDecodeError {
#[inline(always)]
fn from(err: IntDecodeError) -> Self {
Self::Int(err)
}
}

impl From<BaseMapDecodeError<Length>> for BacklogDecodeError {
#[inline(always)]
fn from(err: BaseMapDecodeError<Length>) -> Self {
Self::VersionMap(err)
}
}

impl core::fmt::Display for BacklogDecodeError {
    /// Writes a fixed "couldn't be decoded" prefix followed by the
    /// `Display` output of the wrapped error.
    #[inline]
    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
        // Both variants share the same message; only the wrapped error
        // differs.
        match self {
            Self::Int(err) => {
                write!(f, "Backlog: couldn't be decoded: {err}")
            },
            Self::VersionMap(err) => {
                write!(f, "Backlog: couldn't be decoded: {err}")
            },
        }
    }
}

impl Decode for Backlog {
    type Value = Self;

    type Error = BacklogDecodeError;

    /// Reads a `Backlog` from the front of `buf`, returning it together
    /// with the bytes that were not consumed.
    ///
    /// The expected layout mirrors the `Encode` impl: a `u64` entry count
    /// followed by that many per-replica insertion entries, then a `u64`
    /// entry count followed by that many per-replica deletion entries.
    ///
    /// # Errors
    ///
    /// Returns a `BacklogDecodeError` as soon as any nested decoder fails.
    #[inline]
    fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error> {
        // Insertions section.
        let mut insertions = ReplicaIdMap::default();
        let (insertion_entries, mut rest) = u64::decode(buf)?;

        for _ in 0..insertion_entries {
            // Each entry registers itself in `insertions`; only the
            // remaining bytes are handed back.
            let ((), tail) =
                ReplicaIdInsertions::decode(rest, &mut insertions)?;
            rest = tail;
        }

        // Deletions section.
        let mut deletions = ReplicaIdMap::default();
        let (deletion_entries, mut rest) = u64::decode(rest)?;

        for _ in 0..deletion_entries {
            let ((), tail) =
                ReplicaIdDeletions::decode(rest, &mut deletions)?;
            rest = tail;
        }

        Ok((Self { insertions, deletions }, rest))
    }
}

/// Pairs a replica's id with a borrow of its backlogged insertions so that
/// the two can be encoded as a single entry.
struct ReplicaIdInsertions<'a> {
    /// The replica the insertions belong to.
    replica_id: ReplicaId,
    /// The insertions backlogged for `replica_id`.
    insertions: &'a InsertionsBacklog,
}

impl<'a> ReplicaIdInsertions<'a> {
    /// Bundles `replica_id` with a borrow of its `insertions`.
    #[inline]
    fn new(
        replica_id: ReplicaId,
        insertions: &'a InsertionsBacklog,
    ) -> Self {
        ReplicaIdInsertions { replica_id, insertions }
    }
}

impl Encode for ReplicaIdInsertions<'_> {
    /// Appends this entry to `buf`.
    ///
    /// Layout: the replica id, the number of insertions (as a `u64`), then
    /// for every insertion its anchor, the start and length of its text's
    /// temporal range, its run timestamp and its Lamport timestamp.
    #[inline]
    fn encode(&self, buf: &mut Vec<u8>) {
        let Self { replica_id, insertions } = self;

        replica_id.encode(buf);

        let count = insertions.len() as u64;
        count.encode(buf);

        for item in insertions.iter() {
            item.anchor().encode(buf);

            // The range is stored as (start, length) rather than
            // (start, end).
            let temporal = item.text().temporal_range();
            temporal.start.encode(buf);
            temporal.len().encode(buf);

            item.run_ts().encode(buf);
            item.lamport_ts().encode(buf);
        }
    }
}

impl DecodeWithCtx for ReplicaIdInsertions<'_> {
    type Value = ();

    type Ctx = ReplicaIdMap<InsertionsBacklog>;

    type Error = BacklogDecodeError;

    /// Reads one per-replica insertions entry from the front of `buf` and
    /// stores the resulting backlog in `ctx` under the decoded replica id.
    ///
    /// Returns the bytes that were not consumed.
    ///
    /// # Errors
    ///
    /// Returns a `BacklogDecodeError` as soon as any field fails to decode;
    /// in that case `ctx` is left untouched by this call.
    #[inline]
    fn decode<'buf>(
        buf: &'buf [u8],
        ctx: &mut Self::Ctx,
    ) -> Result<((), &'buf [u8]), Self::Error> {
        let (replica_id, rest) = ReplicaId::decode(buf)?;
        let (num_insertions, mut rest) = u64::decode(rest)?;

        let mut backlog = InsertionsBacklog::default();

        for _ in 0..num_insertions {
            // Fields are read in exactly the order the encoder wrote them.
            let (anchor, tail) = InnerAnchor::decode(rest)?;
            let (start, tail) = Length::decode(tail)?;
            let (len, tail) = Length::decode(tail)?;
            let (run_ts, tail) = RunTs::decode(tail)?;
            let (lamport_ts, tail) = LamportTs::decode(tail)?;
            rest = tail;

            // NOTE(review): `start + len` comes straight from the input
            // bytes; a corrupted payload could presumably make it
            // overflow -- confirm whether that needs a dedicated error.
            let text = Text::new(replica_id, start..start + len);

            backlog.push(Insertion::new(anchor, text, lamport_ts, run_ts));
        }

        // NOTE(review): a second entry with the same replica id would
        // replace the first one here -- confirm that is acceptable.
        ctx.insert(replica_id, backlog);

        Ok(((), rest))
    }
}

/// Pairs a replica's id with a borrow of its backlogged deletions so that
/// the two can be encoded as a single entry.
struct ReplicaIdDeletions<'a> {
    /// The replica the deletions belong to.
    replica_id: ReplicaId,
    /// The deletions backlogged for `replica_id`.
    deletions: &'a DeletionsBacklog,
}

impl<'a> ReplicaIdDeletions<'a> {
    /// Bundles `replica_id` with a borrow of its `deletions`.
    #[inline]
    fn new(
        replica_id: ReplicaId,
        deletions: &'a DeletionsBacklog,
    ) -> Self {
        ReplicaIdDeletions { replica_id, deletions }
    }
}

impl Encode for ReplicaIdDeletions<'_> {
    /// Appends this entry to `buf`.
    ///
    /// Layout: the replica id, the number of deletions (as a `u64`), then
    /// for every deletion its start anchor, end anchor, version map and
    /// deletion timestamp.
    #[inline]
    fn encode(&self, buf: &mut Vec<u8>) {
        let Self { replica_id, deletions } = self;

        replica_id.encode(buf);

        let count = deletions.len() as u64;
        count.encode(buf);

        for item in deletions.iter() {
            item.start().encode(buf);
            item.end().encode(buf);
            item.version_map().encode(buf);
            item.deletion_ts().encode(buf);
        }
    }
}

impl DecodeWithCtx for ReplicaIdDeletions<'_> {
    type Value = ();

    type Ctx = ReplicaIdMap<DeletionsBacklog>;

    type Error = BacklogDecodeError;

    /// Reads one per-replica deletions entry from the front of `buf` and
    /// stores the resulting backlog in `ctx` under the decoded replica id.
    ///
    /// Returns the bytes that were not consumed.
    ///
    /// # Errors
    ///
    /// Returns a `BacklogDecodeError` as soon as any field fails to decode;
    /// in that case `ctx` is left untouched by this call.
    #[inline]
    fn decode<'buf>(
        buf: &'buf [u8],
        ctx: &mut Self::Ctx,
    ) -> Result<((), &'buf [u8]), Self::Error> {
        let (replica_id, rest) = ReplicaId::decode(buf)?;
        let (num_deletions, mut rest) = u64::decode(rest)?;

        let mut backlog = DeletionsBacklog::default();

        for _ in 0..num_deletions {
            // Fields are read in exactly the order the encoder wrote them.
            let (start, tail) = InnerAnchor::decode(rest)?;
            let (end, tail) = InnerAnchor::decode(tail)?;
            let (version_map, tail) = VersionMap::decode(tail)?;
            let (deletion_ts, tail) = DeletionTs::decode(tail)?;
            rest = tail;

            backlog.push(Deletion::new(start, end, version_map, deletion_ts));
        }

        // NOTE(review): a second entry with the same replica id would
        // replace the first one here -- confirm that is acceptable.
        ctx.insert(replica_id, backlog);

        Ok(((), rest))
    }
}
}

// `serde` support for `Backlog`, compiled only when the `serde` feature is
// enabled. Both impls are generated by macros defined in `crate::encode`.
// NOTE(review): presumably the generated impls delegate to the `Encode` /
// `Decode` impls of `Backlog` -- confirm against the macro definitions.
#[cfg(feature = "serde")]
mod serde {
// Generates the `Serialize` impl for `Backlog`.
crate::encode::impl_serialize!(super::Backlog);
// Generates the `Deserialize` impl for `Backlog`.
crate::encode::impl_deserialize!(super::Backlog);
}
Loading
Loading