Skip to content

Commit

Permalink
Allow reading Parquet maps that lack a values field (#6730)
Browse files Browse the repository at this point in the history
* allow value-less maps

* remove commented out code

* use recently added map_no_value.parquet from parquet-testing
  • Loading branch information
etseidl authored Nov 25, 2024
1 parent a75da00 commit 5d992a3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 26 deletions.
39 changes: 39 additions & 0 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4066,4 +4066,43 @@ mod tests {
}
}
}

#[test]
fn test_map_no_value() {
// File schema:
// message schema {
// required group my_map (MAP) {
// repeated group key_value {
// required int32 key;
// optional int32 value;
// }
// }
// required group my_map_no_v (MAP) {
// repeated group key_value {
// required int32 key;
// }
// }
// required group my_list (LIST) {
// repeated group list {
// required int32 element;
// }
// }
// }
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{testdata}/map_no_value.parquet");
let file = File::open(path).unwrap();

let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
.unwrap()
.build()
.unwrap();
let out = reader.next().unwrap().unwrap();
assert_eq!(out.num_rows(), 3);
assert_eq!(out.num_columns(), 3);
// my_map_no_v and my_list columns should now be equivalent
let c0 = out.column(1).as_list::<i32>();
let c1 = out.column(2).as_list::<i32>();
assert_eq!(c0.len(), c1.len());
c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
}
}
7 changes: 6 additions & 1 deletion parquet/src/arrow/schema/complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,13 @@ impl Visitor {
return Err(arrow_err!("Child of map field must be repeated"));
}

// According to the specification the values are optional (#1642).
// In this case, return the keys as a list.
if map_key_value.get_fields().len() == 1 {
return self.visit_list(map_type, context);
}

if map_key_value.get_fields().len() != 2 {
// According to the specification the values are optional (#1642)
return Err(arrow_err!(
"Child of map field must have two children, found {}",
map_key_value.get_fields().len()
Expand Down
91 changes: 67 additions & 24 deletions parquet/src/record/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,15 @@ impl TreeBuilder {
Repetition::REPEATED,
"Invalid map type: {field:?}"
);
assert_eq!(
key_value_type.get_fields().len(),
2,
"Invalid map type: {field:?}"
);
// Parquet spec allows no value. In that case treat as a list. #1642
if key_value_type.get_fields().len() != 1 {
// If not a list, then there can only be 2 fields in the struct
assert_eq!(
key_value_type.get_fields().len(),
2,
"Invalid map type: {field:?}"
);
}

path.push(String::from(key_value_type.name()));

Expand All @@ -239,25 +243,35 @@ impl TreeBuilder {
row_group_reader,
)?;

let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
value_type.clone(),
path,
curr_def_level + 1,
curr_rep_level + 1,
paths,
row_group_reader,
)?;
if key_value_type.get_fields().len() == 1 {
path.pop();
Reader::RepeatedReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
)
} else {
let value_type = &key_value_type.get_fields()[1];
let value_reader = self.reader_tree(
value_type.clone(),
path,
curr_def_level + 1,
curr_rep_level + 1,
paths,
row_group_reader,
)?;

path.pop();
path.pop();

Reader::KeyValueReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
Box::new(value_reader),
)
Reader::KeyValueReader(
field,
curr_def_level,
curr_rep_level,
Box::new(key_reader),
Box::new(value_reader),
)
}
}
// A repeated field that is neither contained by a `LIST`- or
// `MAP`-annotated group nor annotated by `LIST` or `MAP`
Expand Down Expand Up @@ -1459,8 +1473,7 @@ mod tests {
}

#[test]
#[should_panic(expected = "Invalid map type")]
fn test_file_reader_rows_invalid_map_type() {
fn test_file_reader_rows_nested_map_type() {
let schema = "
message spark_schema {
OPTIONAL group a (MAP) {
Expand Down Expand Up @@ -1823,6 +1836,36 @@ mod tests {
assert_eq!(rows, expected_rows);
}

#[test]
fn test_map_no_value() {
// File schema:
// message schema {
// required group my_map (MAP) {
// repeated group key_value {
// required int32 key;
// optional int32 value;
// }
// }
// required group my_map_no_v (MAP) {
// repeated group key_value {
// required int32 key;
// }
// }
// required group my_list (LIST) {
// repeated group list {
// required int32 element;
// }
// }
// }
let rows = test_file_reader_rows("map_no_value.parquet", None).unwrap();

// the my_map_no_v and my_list columns should be equivalent lists by this point
for row in rows {
let cols = row.into_columns();
assert_eq!(cols[1].1, cols[2].1);
}
}

fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> {
let file = get_test_file(file_name);
let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?);
Expand Down

0 comments on commit 5d992a3

Please sign in to comment.