Skip to content

Commit

Permalink
Add deprecation warnings for everything related to dict_id (apache#…
Browse files Browse the repository at this point in the history
…6873)

* Add deprecation warnings for everything related to `dict_id`

* Add allow deprecations where necessary
  • Loading branch information
brancz authored Dec 16, 2024
1 parent c4dbf0d commit fc6936a
Show file tree
Hide file tree
Showing 18 changed files with 158 additions and 17 deletions.
9 changes: 9 additions & 0 deletions arrow-flight/src/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,10 @@ fn prepare_field_for_flight(
)
.with_metadata(field.metadata().clone())
} else {
#[allow(deprecated)]
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());

#[allow(deprecated)]
Field::new_dict(
field.name(),
field.data_type().clone(),
Expand Down Expand Up @@ -583,7 +585,9 @@ fn prepare_schema_for_flight(
)
.with_metadata(field.metadata().clone())
} else {
#[allow(deprecated)]
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
#[allow(deprecated)]
Field::new_dict(
field.name(),
field.data_type().clone(),
Expand Down Expand Up @@ -650,10 +654,12 @@ struct FlightIpcEncoder {

impl FlightIpcEncoder {
fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
#[allow(deprecated)]
let preserve_dict_id = options.preserve_dict_id();
Self {
options,
data_gen: IpcDataGenerator::default(),
#[allow(deprecated)]
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
error_on_replacement,
preserve_dict_id,
Expand Down Expand Up @@ -1541,6 +1547,7 @@ mod tests {
async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
let expected_schema = batches.first().unwrap().schema();

#[allow(deprecated)]
let encoder = FlightDataEncoderBuilder::default()
.with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
.with_dictionary_handling(DictionaryHandling::Resend)
Expand Down Expand Up @@ -1568,6 +1575,7 @@ mod tests {
HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
);

#[allow(deprecated)]
let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);

let got = prepare_schema_for_flight(&schema, &mut dictionary_tracker, false);
Expand Down Expand Up @@ -1598,6 +1606,7 @@ mod tests {
options: &IpcWriteOptions,
) -> (Vec<FlightData>, FlightData) {
let data_gen = IpcDataGenerator::default();
#[allow(deprecated)]
let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());

Expand Down
1 change: 1 addition & 0 deletions arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub struct IpcMessage(pub Bytes);

fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
let data_gen = writer::IpcDataGenerator::default();
#[allow(deprecated)]
let mut dict_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
Expand Down
1 change: 1 addition & 0 deletions arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub fn batches_to_flight_data(
let mut flight_data = vec![];

let data_gen = writer::IpcDataGenerator::default();
#[allow(deprecated)]
let mut dictionary_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());

Expand Down
27 changes: 16 additions & 11 deletions arrow-integration-test/src/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
_ => data_type,
};

#[allow(deprecated)]
let mut field = Field::new_dict(name, data_type, nullable, dict_id, dict_is_ordered);
field.set_metadata(metadata);
Ok(field)
Expand All @@ -274,17 +275,21 @@ pub fn field_to_json(field: &Field) -> serde_json::Value {
};

match field.data_type() {
DataType::Dictionary(ref index_type, ref value_type) => serde_json::json!({
"name": field.name(),
"nullable": field.is_nullable(),
"type": data_type_to_json(value_type),
"children": children,
"dictionary": {
"id": field.dict_id().unwrap(),
"indexType": data_type_to_json(index_type),
"isOrdered": field.dict_is_ordered().unwrap(),
}
}),
DataType::Dictionary(ref index_type, ref value_type) => {
#[allow(deprecated)]
let dict_id = field.dict_id().unwrap();
serde_json::json!({
"name": field.name(),
"nullable": field.is_nullable(),
"type": data_type_to_json(value_type),
"children": children,
"dictionary": {
"id": dict_id,
"indexType": data_type_to_json(index_type),
"isOrdered": field.dict_is_ordered().unwrap(),
}
})
}
_ => serde_json::json!({
"name": field.name(),
"nullable": field.is_nullable(),
Expand Down
3 changes: 3 additions & 0 deletions arrow-integration-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ pub fn array_from_json(
Ok(Arc::new(array))
}
DataType::Dictionary(key_type, value_type) => {
#[allow(deprecated)]
let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::JsonError(format!("Unable to find dict_id for field {field:?}"))
})?;
Expand Down Expand Up @@ -930,10 +931,12 @@ pub fn dictionary_array_from_json(
let null_buf = create_null_buf(&json_col);

// build the key data into a buffer, then construct values separately
#[allow(deprecated)]
let key_field = Field::new_dict(
"key",
dict_key.clone(),
field.is_nullable(),
#[allow(deprecated)]
field
.dict_id()
.expect("Dictionary fields must have a dict_id value"),
Expand Down
1 change: 1 addition & 0 deletions arrow-integration-test/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ mod tests {
Field::new("c30", DataType::Duration(TimeUnit::Millisecond), false),
Field::new("c31", DataType::Duration(TimeUnit::Microsecond), false),
Field::new("c32", DataType::Duration(TimeUnit::Nanosecond), false),
#[allow(deprecated)]
Field::new_dict(
"c33",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ async fn upload_data(
let (mut upload_tx, upload_rx) = mpsc::channel(10);

let options = arrow::ipc::writer::IpcWriteOptions::default();
#[allow(deprecated)]
let mut dict_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let data_gen = writer::IpcDataGenerator::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl FlightService for FlightServiceImpl {
.ok_or_else(|| Status::not_found(format!("Could not find flight. {key}")))?;

let options = arrow::ipc::writer::IpcWriteOptions::default();
#[allow(deprecated)]
let mut dictionary_tracker =
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
let data_gen = writer::IpcDataGenerator::default();
Expand Down
5 changes: 5 additions & 0 deletions arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ pub fn schema_to_fb_offset<'a>(
impl From<crate::Field<'_>> for Field {
fn from(field: crate::Field) -> Field {
let arrow_field = if let Some(dictionary) = field.dictionary() {
#[allow(deprecated)]
Field::new_dict(
field.name().unwrap(),
get_data_type(field, true),
Expand Down Expand Up @@ -519,6 +520,7 @@ pub(crate) fn build_field<'a>(
match dictionary_tracker {
Some(tracker) => Some(get_fb_dictionary(
index_type,
#[allow(deprecated)]
tracker.set_dict_id(field),
field
.dict_is_ordered()
Expand All @@ -527,6 +529,7 @@ pub(crate) fn build_field<'a>(
)),
None => Some(get_fb_dictionary(
index_type,
#[allow(deprecated)]
field
.dict_id()
.expect("Dictionary type must have a dictionary id"),
Expand Down Expand Up @@ -1143,13 +1146,15 @@ mod tests {
),
true,
),
#[allow(deprecated)]
Field::new_dict(
"dictionary<int32, utf8>",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
123,
true,
),
#[allow(deprecated)]
Field::new_dict(
"dictionary<uint8, uint32>",
DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::UInt32)),
Expand Down
14 changes: 14 additions & 0 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ fn create_array(
let index_node = reader.next_node(field)?;
let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];

#[allow(deprecated)]
let dict_id = field.dict_id().ok_or_else(|| {
ArrowError::ParseError(format!("Field {field} does not have dict id"))
})?;
Expand Down Expand Up @@ -617,6 +618,7 @@ fn read_dictionary_impl(
}

let id = batch.id();
#[allow(deprecated)]
let fields_using_this_dictionary = schema.fields_with_dict_id(id);
let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
ArrowError::InvalidArgumentError(format!("dictionary id {id} not found in schema"))
Expand Down Expand Up @@ -1725,6 +1727,7 @@ mod tests {
let mut writer = crate::writer::FileWriter::try_new_with_options(
&mut buf,
batch.schema_ref(),
#[allow(deprecated)]
IpcWriteOptions::default().with_preserve_dict_id(false),
)
.unwrap();
Expand Down Expand Up @@ -1869,13 +1872,15 @@ mod tests {
let key_dict_keys = Int8Array::from_iter_values([0, 0, 2, 1, 1, 3]);
let key_dict_array = DictionaryArray::new(key_dict_keys, values);

#[allow(deprecated)]
let keys_field = Arc::new(Field::new_dict(
"keys",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
true, // It is technically not legal for this field to be null.
1,
false,
));
#[allow(deprecated)]
let values_field = Arc::new(Field::new_dict(
"values",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
Expand Down Expand Up @@ -1956,6 +1961,7 @@ mod tests {
#[test]
fn test_roundtrip_stream_dict_of_list_of_dict() {
// list
#[allow(deprecated)]
let list_data_type = DataType::List(Arc::new(Field::new_dict(
"item",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
Expand All @@ -1967,6 +1973,7 @@ mod tests {
test_roundtrip_stream_dict_of_list_of_dict_impl::<i32, i32>(list_data_type, offsets);

// large list
#[allow(deprecated)]
let list_data_type = DataType::LargeList(Arc::new(Field::new_dict(
"item",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
Expand All @@ -1985,6 +1992,7 @@ mod tests {
let dict_array = DictionaryArray::new(keys, Arc::new(values));
let dict_data = dict_array.into_data();

#[allow(deprecated)]
let list_data_type = DataType::FixedSizeList(
Arc::new(Field::new_dict(
"item",
Expand Down Expand Up @@ -2075,6 +2083,7 @@ mod tests {

let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
#[allow(deprecated)]
let keys_field = Arc::new(Field::new_dict(
"keys",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
Expand All @@ -2085,6 +2094,7 @@ mod tests {

let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
#[allow(deprecated)]
let values_field = Arc::new(Field::new_dict(
"values",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
Expand Down Expand Up @@ -2150,6 +2160,7 @@ mod tests {
.unwrap();

let gen = IpcDataGenerator {};
#[allow(deprecated)]
let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let (_, encoded) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
Expand Down Expand Up @@ -2187,6 +2198,7 @@ mod tests {
.unwrap();

let gen = IpcDataGenerator {};
#[allow(deprecated)]
let mut dict_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
let (_, encoded) = gen
.encoded_batch(&batch, &mut dict_tracker, &Default::default())
Expand Down Expand Up @@ -2326,6 +2338,7 @@ mod tests {
["a", "b"]
.iter()
.map(|name| {
#[allow(deprecated)]
Field::new_dict(
name.to_string(),
DataType::Dictionary(
Expand Down Expand Up @@ -2360,6 +2373,7 @@ mod tests {
let mut writer = crate::writer::StreamWriter::try_new_with_options(
&mut buf,
batch.schema().as_ref(),
#[allow(deprecated)]
crate::writer::IpcWriteOptions::default().with_preserve_dict_id(false),
)
.expect("Failed to create StreamWriter");
Expand Down
2 changes: 2 additions & 0 deletions arrow-ipc/src/reader/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ mod tests {
"test1",
DataType::RunEndEncoded(
Arc::new(Field::new("run_ends".to_string(), DataType::Int32, false)),
#[allow(deprecated)]
Arc::new(Field::new_dict(
"values".to_string(),
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
Expand Down Expand Up @@ -353,6 +354,7 @@ mod tests {
let mut writer = StreamWriter::try_new_with_options(
&mut buffer,
&schema,
#[allow(deprecated)]
IpcWriteOptions::default().with_preserve_dict_id(false),
)
.expect("Failed to create StreamWriter");
Expand Down
Loading

0 comments on commit fc6936a

Please sign in to comment.