Skip to content

Commit

Permalink
implement Decode for Backlog
Browse files Browse the repository at this point in the history
  • Loading branch information
noib3 committed Jan 28, 2024
1 parent 0b48b27 commit 78bbb1f
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 22 deletions.
207 changes: 186 additions & 21 deletions src/backlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,16 +111,6 @@ impl InsertionsBacklog {

self.insertions.insert(offset, insertion);
}

#[inline(always)]
fn iter(&self) -> impl Iterator<Item = &Insertion> + '_ {
self.insertions.iter()
}

#[inline(always)]
fn len(&self) -> usize {
self.insertions.len()
}
}

/// Stores the backlogged [`Deletion`]s of a particular replica.
Expand Down Expand Up @@ -169,16 +159,6 @@ impl DeletionsBacklog {

self.deletions.insert(offset, deletion);
}

#[inline(always)]
fn iter(&self) -> impl Iterator<Item = &Deletion> + '_ {
self.deletions.iter()
}

#[inline(always)]
fn len(&self) -> usize {
self.deletions.len()
}
}

/// An iterator over the backlogged deletions that are ready to be
Expand Down Expand Up @@ -306,20 +286,124 @@ impl core::iter::FusedIterator for BackloggedInsertions<'_> {}
#[cfg(feature = "encode")]
mod encode {
use super::*;
use crate::encode::Encode;
use crate::encode::{Decode, DecodeWithCtx, Encode, IntDecodeError};
use crate::version_map::encode::BaseMapDecodeError;

impl InsertionsBacklog {
#[inline(always)]
fn iter(&self) -> impl Iterator<Item = &Insertion> + '_ {
self.insertions.iter()
}

#[inline(always)]
fn len(&self) -> usize {
self.insertions.len()
}

#[inline(always)]
fn push(&mut self, insertion: Insertion) {
self.insertions.push_back(insertion);
}
}

impl DeletionsBacklog {
#[inline(always)]
fn iter(&self) -> impl Iterator<Item = &Deletion> + '_ {
self.deletions.iter()
}

#[inline(always)]
fn len(&self) -> usize {
self.deletions.len()
}

#[inline(always)]
fn push(&mut self, deletion: Deletion) {
self.deletions.push_back(deletion);
}
}

impl Encode for Backlog {
#[inline]
fn encode(&self, buf: &mut Vec<u8>) {
(self.insertions.len() as u64).encode(buf);

for (id, insertions) in &self.insertions {
ReplicaIdInsertions::new(*id, insertions).encode(buf);
}

(self.deletions.len() as u64).encode(buf);

for (id, deletions) in &self.deletions {
ReplicaIdDeletions::new(*id, deletions).encode(buf);
}
}
}

pub(crate) enum BacklogDecodeError {
Int(IntDecodeError),
VersionMap(BaseMapDecodeError<Length>),
}

impl From<IntDecodeError> for BacklogDecodeError {
#[inline(always)]
fn from(err: IntDecodeError) -> Self {
Self::Int(err)
}
}

impl From<BaseMapDecodeError<Length>> for BacklogDecodeError {
#[inline(always)]
fn from(err: BaseMapDecodeError<Length>) -> Self {
Self::VersionMap(err)
}
}

impl core::fmt::Display for BacklogDecodeError {
#[inline]
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
let err: &dyn core::fmt::Display = match self {
Self::VersionMap(err) => err,
Self::Int(err) => err,
};

write!(f, "Backlog: couldn't be decoded: {err}")
}
}

impl Decode for Backlog {
type Value = Self;

type Error = BacklogDecodeError;

#[inline]
fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error> {
let (num_replicas, mut buf) = u64::decode(buf)?;

let mut insertions = ReplicaIdMap::default();

for _ in 0..num_replicas {
let ((), new_buf) =
ReplicaIdInsertions::decode(buf, &mut insertions)?;
buf = new_buf;
}

let (num_replicas, mut buf) = u64::decode(buf)?;

let mut deletions = ReplicaIdMap::default();

for _ in 0..num_replicas {
let ((), new_buf) =
ReplicaIdDeletions::decode(buf, &mut deletions)?;
buf = new_buf;
}

let this = Self { insertions, deletions };

Ok((this, buf))
}
}

struct ReplicaIdInsertions<'a> {
replica_id: ReplicaId,
insertions: &'a InsertionsBacklog,
Expand Down Expand Up @@ -353,6 +437,49 @@ mod encode {
}
}

impl DecodeWithCtx for ReplicaIdInsertions<'_> {
type Value = ();

type Ctx = ReplicaIdMap<InsertionsBacklog>;

type Error = BacklogDecodeError;

#[inline]
fn decode<'buf>(
buf: &'buf [u8],
ctx: &mut Self::Ctx,
) -> Result<((), &'buf [u8]), Self::Error> {
let (replica_id, buf) = ReplicaId::decode(buf)?;

let (num_insertions, mut buf) = u64::decode(buf)?;

let mut insertions = InsertionsBacklog::default();

for _ in 0..num_insertions {
let (anchor, new_buf) = InnerAnchor::decode(buf)?;
let (start, new_buf) = Length::decode(new_buf)?;
let (len, new_buf) = Length::decode(new_buf)?;
let (run_ts, new_buf) = RunTs::decode(new_buf)?;
let (lamport_ts, new_buf) = LamportTs::decode(new_buf)?;

let insertion = Insertion::new(
anchor,
Text::new(replica_id, start..start + len),
lamport_ts,
run_ts,
);

insertions.push(insertion);

buf = new_buf;
}

ctx.insert(replica_id, insertions);

Ok(((), buf))
}
}

struct ReplicaIdDeletions<'a> {
replica_id: ReplicaId,
deletions: &'a DeletionsBacklog,
Expand Down Expand Up @@ -383,4 +510,42 @@ mod encode {
}
}
}

impl DecodeWithCtx for ReplicaIdDeletions<'_> {
type Value = ();

type Ctx = ReplicaIdMap<DeletionsBacklog>;

type Error = BacklogDecodeError;

#[inline]
fn decode<'buf>(
buf: &'buf [u8],
ctx: &mut Self::Ctx,
) -> Result<((), &'buf [u8]), Self::Error> {
let (replica_id, buf) = ReplicaId::decode(buf)?;

let (num_deletions, mut buf) = u64::decode(buf)?;

let mut deletions = DeletionsBacklog::default();

for _ in 0..num_deletions {
let (start, new_buf) = InnerAnchor::decode(buf)?;
let (end, new_buf) = InnerAnchor::decode(new_buf)?;
let (version_map, new_buf) = VersionMap::decode(new_buf)?;
let (deletion_ts, new_buf) = DeletionTs::decode(new_buf)?;

let deletion =
Deletion::new(start, end, version_map, deletion_ts);

deletions.push(deletion);

buf = new_buf;
}

ctx.insert(replica_id, deletions);

Ok(((), buf))
}
}
}
2 changes: 1 addition & 1 deletion src/version_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl PartialOrd for VersionMap {
}

#[cfg(feature = "encode")]
mod encode {
pub(crate) mod encode {
use super::*;
use crate::encode::{Decode, Encode, IntDecodeError};

Expand Down

0 comments on commit 78bbb1f

Please sign in to comment.