Skip to content

Commit

Permalink
fix: FlexBuffer serialization ambiguity for binary/string/list(u8) (#…
Browse files Browse the repository at this point in the history
…1859)

After the recent switch to ByteBuffer, FlexBuffer would have am
ambiguity serializing the different sequence variants. This was
resulting in Binary ScalarValues written to files to be reinterpreted as
List(u8) on read.
    
Instead, we explicitly handle serialization.
  • Loading branch information
a10y authored Jan 8, 2025
1 parent c16e4de commit 01f4f92
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 2 additions & 6 deletions vortex-array/src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::array::PrimitiveArray;
use crate::builders::{ArrayBuilder, ListBuilder};
use crate::compute::{scalar_at, slice};
use crate::encoding::ids;
use crate::stats::{Stat, StatisticsVTable, StatsSet};
use crate::stats::{StatisticsVTable, StatsSet};
use crate::validity::{LogicalValidity, Validity, ValidityMetadata, ValidityVTable};
use crate::variants::{ListArrayTrait, PrimitiveArrayTrait, VariantsVTable};
use crate::visitor::{ArrayVisitor, VisitorVTable};
Expand Down Expand Up @@ -174,11 +174,7 @@ impl IntoCanonical for ListArray {
}
}

impl StatisticsVTable<ListArray> for ListEncoding {
fn compute_statistics(&self, _array: &ListArray, _stat: Stat) -> VortexResult<StatsSet> {
Ok(StatsSet::default())
}
}
impl StatisticsVTable<ListArray> for ListEncoding {}

impl ListArrayTrait for ListArray {}

Expand Down
6 changes: 3 additions & 3 deletions vortex-sampling-compressor/src/sampling_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ impl Default for SamplingCompressor<'_> {
}

impl<'a> SamplingCompressor<'a> {
pub fn new(compressors: HashSet<CompressorRef<'a>>) -> Self {
pub fn new(compressors: impl Into<HashSet<CompressorRef<'a>>>) -> Self {
Self::new_with_options(compressors, Default::default())
}

pub fn new_with_options(
compressors: HashSet<CompressorRef<'a>>,
compressors: impl Into<HashSet<CompressorRef<'a>>>,
options: CompressConfig,
) -> Self {
Self {
compressors,
compressors: compressors.into(),
options,
path: Vec::new(),
depth: 0,
Expand Down
4 changes: 4 additions & 0 deletions vortex-scalar/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ vortex-error = { workspace = true }
vortex-flatbuffers = { workspace = true, optional = true }
vortex-proto = { workspace = true, optional = true }

[dev-dependencies]
flexbuffers = { workspace = true }
rstest = { workspace = true }

[lints]
workspace = true

Expand Down
43 changes: 40 additions & 3 deletions vortex-scalar/src/serde/serde.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Formatter;

use serde::de::{Error, SeqAccess, Visitor};
use serde::ser::SerializeSeq;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use vortex_buffer::{BufferString, ByteBuffer};

Expand All @@ -25,9 +26,19 @@ impl Serialize for InnerScalarValue {
Self::Null => ().serialize(serializer),
Self::Bool(b) => b.serialize(serializer),
Self::Primitive(p) => p.serialize(serializer),
Self::Buffer(buffer) => buffer.as_slice().serialize(serializer),
Self::BufferString(buffer) => buffer.as_str().serialize(serializer),
Self::List(l) => l.serialize(serializer),
// NOTE: we explicitly handle the serialization of bytes, strings and lists so as not
// to create ambiguities amongst them. The serde data model has specific representations
// of binary data, UTF-8 strings and sequences.
// See https://serde.rs/data-model.html.
Self::Buffer(buffer) => serializer.serialize_bytes(buffer.as_slice()),
Self::BufferString(buffer) => serializer.serialize_str(buffer.as_str()),
Self::List(l) => {
let mut seq = serializer.serialize_seq(Some(l.len()))?;
for item in l.iter() {
seq.serialize_element(item)?;
}
seq.end()
}
}
}
}
Expand Down Expand Up @@ -197,3 +208,29 @@ impl<'de> Deserialize<'de> for PValue {
})
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use flexbuffers::{FlexbufferSerializer, Reader};
use rstest::rstest;
use vortex_dtype::{Nullability, PType};

use super::*;
use crate::Scalar;

#[rstest]
#[case(Scalar::binary(ByteBuffer::copy_from(b"hello"), Nullability::NonNullable).into_value())]
#[case(Scalar::utf8("hello", Nullability::NonNullable).into_value())]
#[case(Scalar::primitive(1u8, Nullability::NonNullable).into_value())]
#[case(Scalar::list(Arc::new(PType::U8.into()), vec![Scalar::primitive(1u8, Nullability::NonNullable)], Nullability::NonNullable).into_value())]
fn test_scalar_value_serde_roundtrip(#[case] scalar_value: ScalarValue) {
let mut serializer = FlexbufferSerializer::new();
scalar_value.serialize(&mut serializer).unwrap();
let written = serializer.take_buffer();
let reader = Reader::get_root(written.as_ref()).unwrap();
let scalar_read_back = ScalarValue::deserialize(reader).unwrap();
assert_eq!(scalar_value, scalar_read_back);
}
}

0 comments on commit 01f4f92

Please sign in to comment.