From 5d992a3b14f87a0ce300232d9ee7022f61770d45 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 25 Nov 2024 05:58:54 -0800 Subject: [PATCH] Allow reading Parquet maps that lack a `values` field (#6730) * allow value-less maps * remove commented out code * use recently added map_no_value.parquet from parquet-testing --- parquet-testing | 2 +- parquet/src/arrow/arrow_reader/mod.rs | 39 ++++++++++++ parquet/src/arrow/schema/complex.rs | 7 ++- parquet/src/record/reader.rs | 91 ++++++++++++++++++++------- 4 files changed, 113 insertions(+), 26 deletions(-) diff --git a/parquet-testing b/parquet-testing index 550368ca77b9..4439a223a315 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 550368ca77b97231efead39251a96bd6f8f08c6e +Subproject commit 4439a223a315cf874746d3b5da25e6a6b2a2b16e diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1488395b4eec..f351c25bd3ab 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -4066,4 +4066,43 @@ mod tests { } } } + + #[test] + fn test_map_no_value() { + // File schema: + // message schema { + // required group my_map (MAP) { + // repeated group key_value { + // required int32 key; + // optional int32 value; + // } + // } + // required group my_map_no_v (MAP) { + // repeated group key_value { + // required int32 key; + // } + // } + // required group my_list (LIST) { + // repeated group list { + // required int32 element; + // } + // } + // } + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{testdata}/map_no_value.parquet"); + let file = File::open(path).unwrap(); + + let mut reader = ParquetRecordBatchReaderBuilder::try_new(file) + .unwrap() + .build() + .unwrap(); + let out = reader.next().unwrap().unwrap(); + assert_eq!(out.num_rows(), 3); + assert_eq!(out.num_columns(), 3); + // my_map_no_v and my_list columns should now be equivalent + let c0 = out.column(1).as_list::(); + let c1 = out.column(2).as_list::(); + assert_eq!(c0.len(), c1.len()); + c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r)); + } } diff --git a/parquet/src/arrow/schema/complex.rs b/parquet/src/arrow/schema/complex.rs index e487feabb848..dbc81d1ee195 100644 --- a/parquet/src/arrow/schema/complex.rs +++ b/parquet/src/arrow/schema/complex.rs @@ -271,8 +271,13 @@ impl Visitor { return Err(arrow_err!("Child of map field must be repeated")); } + // According to the specification the values are optional (#1642). + // In this case, return the keys as a list. + if map_key_value.get_fields().len() == 1 { + return self.visit_list(map_type, context); + } + if map_key_value.get_fields().len() != 2 { - // According to the specification the values are optional (#1642) return Err(arrow_err!( "Child of map field must have two children, found {}", map_key_value.get_fields().len() diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs index be58f5c5ad10..5e8dc9714875 100644 --- a/parquet/src/record/reader.rs +++ b/parquet/src/record/reader.rs @@ -217,11 +217,15 @@ impl TreeBuilder { Repetition::REPEATED, "Invalid map type: {field:?}" ); - assert_eq!( - key_value_type.get_fields().len(), - 2, - "Invalid map type: {field:?}" - ); + // Parquet spec allows no value. In that case treat as a list. #1642 + if key_value_type.get_fields().len() != 1 { + // If not a list, then there can only be 2 fields in the struct + assert_eq!( + key_value_type.get_fields().len(), + 2, + "Invalid map type: {field:?}" + ); + } path.push(String::from(key_value_type.name())); @@ -239,25 +243,35 @@ impl TreeBuilder { row_group_reader, )?; - let value_type = &key_value_type.get_fields()[1]; - let value_reader = self.reader_tree( - value_type.clone(), - path, - curr_def_level + 1, - curr_rep_level + 1, - paths, - row_group_reader, - )?; + if key_value_type.get_fields().len() == 1 { + path.pop(); + Reader::RepeatedReader( + field, + curr_def_level, + curr_rep_level, + Box::new(key_reader), + ) + } else { + let value_type = &key_value_type.get_fields()[1]; + let value_reader = self.reader_tree( + value_type.clone(), + path, + curr_def_level + 1, + curr_rep_level + 1, + paths, + row_group_reader, + )?; - path.pop(); + path.pop(); - Reader::KeyValueReader( - field, - curr_def_level, - curr_rep_level, - Box::new(key_reader), - Box::new(value_reader), - ) + Reader::KeyValueReader( + field, + curr_def_level, + curr_rep_level, + Box::new(key_reader), + Box::new(value_reader), + ) + } } // A repeated field that is neither contained by a `LIST`- or // `MAP`-annotated group nor annotated by `LIST` or `MAP` @@ -1459,8 +1473,7 @@ mod tests { } #[test] - #[should_panic(expected = "Invalid map type")] - fn test_file_reader_rows_invalid_map_type() { + fn test_file_reader_rows_nested_map_type() { let schema = " message spark_schema { OPTIONAL group a (MAP) { @@ -1823,6 +1836,36 @@ mod tests { assert_eq!(rows, expected_rows); } + #[test] + fn test_map_no_value() { + // File schema: + // message schema { + // required group my_map (MAP) { + // repeated group key_value { + // required int32 key; + // optional int32 value; + // } + // } + // required group my_map_no_v (MAP) { + // repeated group key_value { + // required int32 key; + // } + // } + // required group my_list (LIST) { + // repeated group list { + // required int32 element; + // } + // } + // } + let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap(); + + // the my_map_no_v and my_list columns should be equivalent lists by this point + for row in rows { + let cols = row.into_columns(); + assert_eq!(cols[1].1, cols[2].1); + } + } + fn test_file_reader_rows(file_name: &str, schema: Option) -> Result> { let file = get_test_file(file_name); let file_reader: Box = Box::new(SerializedFileReader::new(file)?);