Skip to content

Commit

Permalink
implement Encode and Decode for Replica
Browse files Browse the repository at this point in the history
  • Loading branch information
noib3 committed Jan 28, 2024
1 parent 6e60d46 commit d33fc20
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 107 deletions.
2 changes: 1 addition & 1 deletion src/backlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl Iterator for BackloggedInsertions<'_> {
impl core::iter::FusedIterator for BackloggedInsertions<'_> {}

#[cfg(feature = "encode")]
mod encode {
pub(crate) mod encode {
use super::*;
use crate::encode::{Decode, DecodeWithCtx, Encode, IntDecodeError};
use crate::version_map::encode::BaseMapDecodeError;
Expand Down
184 changes: 79 additions & 105 deletions src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,16 @@ impl Replica {
return Err(DecodeError::ChecksumFailed);
}

let Some((
run_tree,
lamport_clock,
mut version_map,
mut deletion_map,
backlog,
)) = old_encode::decode(encoded.bytes())
let Ok((
(
run_tree,
lamport_clock,
mut version_map,
mut deletion_map,
backlog,
),
_,
)) = <Self as crate::encode::Decode>::decode(encoded.bytes())
else {
return Err(DecodeError::InvalidData);
};
Expand Down Expand Up @@ -516,9 +519,10 @@ impl Replica {
#[cfg_attr(docsrs, doc(cfg(feature = "encode")))]
#[inline]
pub fn encode(&self) -> EncodedReplica {
let bytes = old_encode::encode(self);
let checksum = checksum(&bytes);
EncodedReplica::new(PROTOCOL_VERSION, checksum, bytes)
let mut buf = Vec::new();
crate::encode::Encode::encode(self, &mut buf);
let checksum = checksum(&buf);
EncodedReplica::new(PROTOCOL_VERSION, checksum, buf.into())
}

/// Creates a new `Replica` with the given [`ReplicaId`] but with the same
Expand Down Expand Up @@ -975,7 +979,6 @@ pub type LamportTs = u64;
///
/// See [this](https://en.wikipedia.org/wiki/Lamport_timestamp) for more.
#[derive(Copy, Clone)]
#[cfg_attr(feature = "encode", derive(serde::Serialize, serde::Deserialize))]
pub struct LamportClock(LamportTs);

impl core::fmt::Debug for LamportClock {
Expand Down Expand Up @@ -1046,7 +1049,10 @@ pub type DeletionTs = u64;
#[cfg(feature = "encode")]
mod encode {
use super::*;
use crate::backlog::encode::BacklogDecodeError;
use crate::encode::{Decode, Encode, IntDecodeError};
use crate::run_tree::encode::RunTreeDecodeError;
use crate::version_map::encode::BaseMapDecodeError;

impl Encode for LamportClock {
#[inline(always)]
Expand All @@ -1066,122 +1072,90 @@ mod encode {
}
}

impl Encode for RunClock {
impl Encode for Replica {
#[inline(always)]
fn encode(&self, buf: &mut Vec<u8>) {
self.0.encode(buf)
self.run_tree.encode(buf);
self.lamport_clock.encode(buf);
self.version_map.encode(buf);
self.deletion_map.encode(buf);
self.backlog.encode(buf);
}
}

impl Decode for RunClock {
type Value = Self;

type Error = IntDecodeError;
pub(crate) enum ReplicaDecodeError {
Backlog(BacklogDecodeError),
DeletionMap(BaseMapDecodeError<DeletionTs>),
Int(IntDecodeError),
RunTree(RunTreeDecodeError),
VersionMap(BaseMapDecodeError<Length>),
}

impl From<BacklogDecodeError> for ReplicaDecodeError {
#[inline(always)]
fn decode(buf: &[u8]) -> Result<(Self, &[u8]), IntDecodeError> {
RunTs::decode(buf).map(|(ts, buf)| (Self(ts), buf))
fn from(err: BacklogDecodeError) -> Self {
Self::Backlog(err)
}
}
}

#[cfg(feature = "encode")]
mod old_encode {
use serde::{de, ser};

use super::*;

type EncodedFields =
(RunTree, LamportClock, VersionMap, DeletionMap, Backlog);

#[inline]
pub(super) fn encode(replica: &Replica) -> Box<[u8]> {
let mut encoded = Vec::new();

encode_field(&mut encoded, &replica.run_tree);
encode_field(&mut encoded, &replica.lamport_clock);
encode_field(&mut encoded, &replica.version_map);
encode_field(&mut encoded, &replica.deletion_map);
encode_field(&mut encoded, &replica.backlog);

encoded.into_boxed_slice()
impl From<BaseMapDecodeError<DeletionTs>> for ReplicaDecodeError {
#[inline(always)]
fn from(err: BaseMapDecodeError<DeletionTs>) -> Self {
Self::DeletionMap(err)
}
}

#[inline]
pub(super) fn decode(bytes: &[u8]) -> Option<EncodedFields> {
let (run_tree, bytes) = decode_field(bytes)?;
let (lamport_clock, bytes) = decode_field(bytes)?;
let (version_map, bytes) = decode_field(bytes)?;
let (deletion_map, bytes) = decode_field(bytes)?;
let (backlog, bytes) = decode_field(bytes)?;

if bytes.is_empty() {
Some((run_tree, lamport_clock, version_map, deletion_map, backlog))
} else {
None
impl From<IntDecodeError> for ReplicaDecodeError {
#[inline(always)]
fn from(err: IntDecodeError) -> Self {
Self::Int(err)
}
}

#[inline]
fn encode_field<T>(buf: &mut Vec<u8>, field: &T)
where
T: ser::Serialize,
{
// Make room for the length of the encoded field.
let len_bytes = [0; 8];
buf.extend_from_slice(&len_bytes);

let len_before = buf.len();
serialize(buf, field);
let len_after = buf.len();

// We convert the usize into a u64 before turning it into a byte array
// to guarantee that the latter is always 8 bytes long. If we don't the
// length will vary depending on the architecture.
let len_bytes = ((len_after - len_before) as u64).to_le_bytes();

buf[len_before - 8..len_before].copy_from_slice(&len_bytes);
impl From<RunTreeDecodeError> for ReplicaDecodeError {
#[inline(always)]
fn from(err: RunTreeDecodeError) -> Self {
Self::RunTree(err)
}
}

#[inline]
fn decode_field<'a, T>(buf: &'a [u8]) -> Option<(T, &'a [u8])>
where
T: de::Deserialize<'a>,
{
// The first 8 bytes represent the length of the encoded field.
let (len_bytes, rest) = if buf.len() >= 8 {
buf.split_at(8)
} else {
return None;
};

let len_bytes: [u8; 8] = len_bytes.try_into().ok()?;

let len = u64::from_le_bytes(len_bytes) as usize;
impl From<BaseMapDecodeError<Length>> for ReplicaDecodeError {
#[inline(always)]
fn from(err: BaseMapDecodeError<Length>) -> Self {
Self::VersionMap(err)
}
}

let (encoded_field, rest) = if rest.len() >= len {
rest.split_at(len)
} else {
return None;
};
impl core::fmt::Display for ReplicaDecodeError {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
let err: &dyn core::fmt::Display = match self {
Self::Backlog(err) => err,
Self::DeletionMap(err) => err,
Self::Int(err) => err,
Self::RunTree(err) => err,
Self::VersionMap(err) => err,
};

deserialize::<T>(encoded_field).map(|field| (field, rest))
write!(f, "Replica: couldn't be decoded: {err}")
}
}

#[inline]
fn serialize<T>(buf: &mut Vec<u8>, value: &T)
where
T: ser::Serialize,
{
bincode::serialize_into(buf, value).expect("failed to serialize")
}
impl Decode for Replica {
type Value = (RunTree, LamportClock, VersionMap, DeletionMap, Backlog);

#[inline]
fn deserialize<'a, T>(bytes: &'a [u8]) -> Option<T>
where
T: de::Deserialize<'a>,
{
bincode::deserialize(bytes).ok()
type Error = ReplicaDecodeError;

#[inline(always)]
fn decode(buf: &[u8]) -> Result<(Self::Value, &[u8]), Self::Error> {
let (run_tree, buf) = RunTree::decode(buf)?;
let (lamport_clock, buf) = LamportClock::decode(buf)?;
let (version_map, buf) = VersionMap::decode(buf)?;
let (deletion_map, buf) = DeletionMap::decode(buf)?;
let (backlog, buf) = Backlog::decode(buf)?;
let this =
(run_tree, lamport_clock, version_map, deletion_map, backlog);
Ok((this, buf))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/run_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1115,7 +1115,7 @@ pub(crate) type DebugAsSelf<'a> =
gtree::DebugAsSelf<'a, RUN_TREE_ARITY, EditRun>;

#[cfg(feature = "encode")]
mod encode {
pub(crate) mod encode {
use core::mem;

use super::*;
Expand Down

0 comments on commit d33fc20

Please sign in to comment.