Skip to content

Commit

Permalink
Encode UUID as FixedLenByteArray in parquet_derive (#5773)
Browse files Browse the repository at this point in the history
* fix uuid derive

* fix byte array length handling

* test lengths

* fmt
  • Loading branch information
conradludgate authored May 18, 2024
1 parent dfe0f26 commit 30762e8
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 35 deletions.
125 changes: 90 additions & 35 deletions parquet_derive/src/parquet_field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,19 @@ impl Field {
Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
Type::Reference(_, ref second_type)
| Type::Vec(ref second_type)
| Type::Array(ref second_type)
| Type::Array(ref second_type, _)
| Type::Slice(ref second_type) => match **second_type {
Type::TypePath(_) => Some(self.optional_definition_levels()),
_ => unimplemented!("Unsupported nesting encountered"),
},
},
Type::Reference(_, ref first_type)
| Type::Vec(ref first_type)
| Type::Array(ref first_type)
| Type::Array(ref first_type, _)
| Type::Slice(ref first_type) => match **first_type {
Type::TypePath(_) => None,
Type::Vec(ref second_type)
| Type::Array(ref second_type)
| Type::Array(ref second_type, _)
| Type::Slice(ref second_type) => match **second_type {
Type::TypePath(_) => None,
Type::Reference(_, ref third_type) => match **third_type {
Expand All @@ -161,7 +161,7 @@ impl Field {
match **second_type {
Type::TypePath(_) => Some(self.optional_definition_levels()),
Type::Vec(ref third_type)
| Type::Array(ref third_type)
| Type::Array(ref third_type, _)
| Type::Slice(ref third_type) => match **third_type {
Type::TypePath(_) => Some(self.optional_definition_levels()),
Type::Reference(_, ref fourth_type) => match **fourth_type {
Expand Down Expand Up @@ -316,25 +316,23 @@ impl Field {
let logical_type = self.ty.logical_type();
let repetition = self.ty.repetition();
let converted_type = self.ty.converted_type();
let length = self.ty.length();

let mut builder = quote! {
ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
};

if let Some(converted_type) = converted_type {
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.with_converted_type(#converted_type)
.build().unwrap().into()
)
}
} else {
quote! {
fields.push(ParquetType::primitive_type_builder(#field_name, #physical_type)
.with_logical_type(#logical_type)
.with_repetition(#repetition)
.build().unwrap().into()
)
}
builder = quote! { #builder.with_converted_type(#converted_type) };
}

if let Some(length) = length {
builder = quote! { #builder.with_length(#length) };
}

quote! { fields.push(#builder.build().unwrap().into()) }
}

fn option_into_vals(&self) -> proc_macro2::TokenStream {
Expand Down Expand Up @@ -394,7 +392,7 @@ impl Field {
quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
}
Some(ThirdPartyType::Uuid) => {
quote! { (&rec.#field_name.to_string()[..]).into() }
quote! { rec.#field_name.as_bytes().to_vec().into() }
}
_ => {
if self.is_a_byte_buf {
Expand Down Expand Up @@ -430,7 +428,7 @@ impl Field {
}
}
Some(ThirdPartyType::Uuid) => {
quote! { ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap() }
quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
}
_ => match &self.ty {
Type::TypePath(_) => match self.ty.last_part().as_str() {
Expand Down Expand Up @@ -469,7 +467,7 @@ impl Field {
#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq)]
enum Type {
Array(Box<Type>),
Array(Box<Type>, syn::Expr),
Option(Box<Type>),
Slice(Box<Type>),
Vec(Box<Type>),
Expand Down Expand Up @@ -542,7 +540,7 @@ impl Type {
Type::TypePath(_) => parent_ty.unwrap_or(ty),
Type::Option(ref first_type)
| Type::Vec(ref first_type)
| Type::Array(ref first_type)
| Type::Array(ref first_type, _)
| Type::Slice(ref first_type)
| Type::Reference(_, ref first_type) => {
Type::leaf_type_recursive_helper(first_type, Some(ty))
Expand All @@ -560,7 +558,7 @@ impl Type {
Type::TypePath(ref type_) => type_,
Type::Option(ref first_type)
| Type::Vec(ref first_type)
| Type::Array(ref first_type)
| Type::Array(ref first_type, _)
| Type::Slice(ref first_type)
| Type::Reference(_, ref first_type) => match **first_type {
Type::TypePath(ref type_) => type_,
Expand Down Expand Up @@ -607,7 +605,7 @@ impl Type {
let leaf_type = self.leaf_type_recursive();

match leaf_type {
Type::Array(ref first_type) => {
Type::Array(ref first_type, _length) => {
if let Type::TypePath(_) = **first_type {
if last_part == "u8" {
return BasicType::FIXED_LEN_BYTE_ARRAY;
Expand Down Expand Up @@ -638,17 +636,38 @@ impl Type {
}
"f32" => BasicType::FLOAT,
"f64" => BasicType::DOUBLE,
"String" | "str" | "Uuid" => BasicType::BYTE_ARRAY,
"String" | "str" => BasicType::BYTE_ARRAY,
"Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
f => unimplemented!("{} currently is not supported", f),
}
}

fn length(&self) -> Option<syn::Expr> {
let last_part = self.last_part();
let leaf_type = self.leaf_type_recursive();

// `[u8; N]` => Some(N)
if let Type::Array(ref first_type, length) = leaf_type {
if let Type::TypePath(_) = **first_type {
if last_part == "u8" {
return Some(length.clone());
}
}
}

match last_part.trim() {
// Uuid => [u8; 16] => Some(16)
"Uuid" => Some(syn::parse_quote!(16)),
_ => None,
}
}

fn logical_type(&self) -> proc_macro2::TokenStream {
let last_part = self.last_part();
let leaf_type = self.leaf_type_recursive();

match leaf_type {
Type::Array(ref first_type) => {
Type::Array(ref first_type, _length) => {
if let Type::TypePath(_) = **first_type {
if last_part == "u8" {
return quote! { None };
Expand Down Expand Up @@ -789,7 +808,7 @@ impl Type {

fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
let inner_type = Type::from_type(f, ta.elem.as_ref());
Type::Array(Box::new(inner_type))
Type::Array(Box::new(inner_type), ta.len.clone())
}

fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
Expand Down Expand Up @@ -1091,6 +1110,7 @@ mod test {
a_fix_byte_buf: [u8; 10],
a_complex_option: ::std::option::Option<&Vec<u8>>,
a_complex_vec: &::std::vec::Vec<&Option<u8>>,
a_uuid: ::uuid::Uuid,
}
};

Expand All @@ -1110,7 +1130,42 @@ mod test {
BasicType::BYTE_ARRAY,
BasicType::FIXED_LEN_BYTE_ARRAY,
BasicType::BYTE_ARRAY,
BasicType::INT32
BasicType::INT32,
BasicType::FIXED_LEN_BYTE_ARRAY,
]
)
}

#[test]
fn test_type_length() {
let snippet: proc_macro2::TokenStream = quote! {
struct LotsOfInnerTypes {
a_buf: ::std::vec::Vec<u8>,
a_number: i32,
a_verbose_option: ::std::option::Option<bool>,
a_silly_string: String,
a_fix_byte_buf: [u8; 10],
a_complex_option: ::std::option::Option<&Vec<u8>>,
a_complex_vec: &::std::vec::Vec<&Option<u8>>,
a_uuid: ::uuid::Uuid,
}
};

let fields = extract_fields(snippet);
let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
let lengths: Vec<_> = converted_fields.iter().map(|ty| ty.length()).collect();

assert_eq!(
lengths,
vec![
None,
None,
None,
None,
Some(syn::parse_quote!(10)),
None,
None,
Some(syn::parse_quote!(16)),
]
)
}
Expand Down Expand Up @@ -1328,8 +1383,8 @@ mod test {
let when = Field::from(&fields[0]);
assert_eq!(when.writer_snippet().to_string(),(quote!{
{
let vals : Vec<_> = records.iter().map(|rec| (&rec.unique_id.to_string()[..]).into() ).collect();
if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
if let ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], None, None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
Expand All @@ -1349,7 +1404,7 @@ mod test {
}
}).collect();

if let ColumnWriter::ByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
if let ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
} else {
panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
Expand All @@ -1371,13 +1426,13 @@ mod test {
assert_eq!(when.reader_snippet().to_string(),(quote!{
{
let mut vals = Vec::new();
if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader {
if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
typed.read_records(num_records, None, None, &mut vals)?;
} else {
panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
}
for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
r.unique_id = ::uuid::Uuid::parse_str(vals[i].data().convert()).unwrap();
r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
}
}
}).to_string());
Expand Down
1 change: 1 addition & 0 deletions parquet_derive_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ rust-version = { workspace = true }
parquet = { workspace = true }
parquet_derive = { path = "../parquet_derive", default-features = false }
chrono = { workspace = true }
uuid = { version = "1", features = ["v4"] }
5 changes: 5 additions & 0 deletions parquet_derive_test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct ACompleteRecord<'a> {
pub borrowed_maybe_a_string: &'a Option<String>,
pub borrowed_maybe_a_str: &'a Option<&'a str>,
pub now: chrono::NaiveDateTime,
pub uuid: uuid::Uuid,
pub byte_vec: Vec<u8>,
pub maybe_byte_vec: Option<Vec<u8>>,
pub borrowed_byte_vec: &'a [u8],
Expand All @@ -61,6 +62,7 @@ struct APartiallyCompleteRecord {
pub double: f64,
pub now: chrono::NaiveDateTime,
pub date: chrono::NaiveDate,
pub uuid: uuid::Uuid,
pub byte_vec: Vec<u8>,
}

Expand Down Expand Up @@ -105,6 +107,7 @@ mod tests {
OPTIONAL BINARY borrowed_maybe_a_string (STRING);
OPTIONAL BINARY borrowed_maybe_a_str (STRING);
REQUIRED INT64 now (TIMESTAMP_MILLIS);
REQUIRED FIXED_LEN_BYTE_ARRAY (16) uuid (UUID);
REQUIRED BINARY byte_vec;
OPTIONAL BINARY maybe_byte_vec;
REQUIRED BINARY borrowed_byte_vec;
Expand Down Expand Up @@ -144,6 +147,7 @@ mod tests {
borrowed_maybe_a_string: &maybe_a_string,
borrowed_maybe_a_str: &maybe_a_str,
now: chrono::Utc::now().naive_local(),
uuid: uuid::Uuid::new_v4(),
byte_vec: vec![0x65, 0x66, 0x67],
maybe_byte_vec: Some(vec![0x88, 0x89, 0x90]),
borrowed_byte_vec: &borrowed_byte_vec,
Expand Down Expand Up @@ -179,6 +183,7 @@ mod tests {
double: std::f64::NAN,
now: chrono::Utc::now().naive_local(),
date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
uuid: uuid::Uuid::new_v4(),
byte_vec: vec![0x65, 0x66, 0x67],
}];

Expand Down

0 comments on commit 30762e8

Please sign in to comment.