diff --git a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs index c8289ff446a..62cdca975b0 100644 --- a/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_client_scenarios/integration_test.rs @@ -29,7 +29,7 @@ use arrow::{ }; use arrow_flight::{ flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, SchemaAsIpc, Ticket, + utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, IpcMessage, Location, Ticket, }; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -72,7 +72,19 @@ async fn upload_data( let (mut upload_tx, upload_rx) = mpsc::channel(10); let options = arrow::ipc::writer::IpcWriteOptions::default(); - let mut schema_flight_data: FlightData = SchemaAsIpc::new(&schema, &options).into(); + let mut dict_tracker = + writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + let data_gen = writer::IpcDataGenerator::default(); + let data = IpcMessage( + data_gen + .schema_to_bytes_with_dictionary_tracker(&schema, &mut dict_tracker, &options) + .ipc_message + .into(), + ); + let mut schema_flight_data = FlightData { + data_header: data.0, + ..Default::default() + }; // arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options); schema_flight_data.flight_descriptor = Some(descriptor.clone()); upload_tx.send(schema_flight_data).await?; @@ -82,7 +94,14 @@ async fn upload_data( if let Some((counter, first_batch)) = original_data_iter.next() { let metadata = counter.to_string().into_bytes(); // Preload the first batch into the channel before starting the request - send_batch(&mut upload_tx, &metadata, first_batch, &options).await?; + send_batch( + &mut upload_tx, + &metadata, + first_batch, + &options, + &mut dict_tracker, + ) + .await?; let outer = client.do_put(Request::new(upload_rx)).await?; let mut inner = outer.into_inner(); @@ -97,7 +116,14 @@ async fn upload_data( // Stream the rest of the batches for (counter, batch) in original_data_iter { let metadata = counter.to_string().into_bytes(); - send_batch(&mut upload_tx, &metadata, batch, &options).await?; + send_batch( + &mut upload_tx, + &metadata, + batch, + &options, + &mut dict_tracker, + ) + .await?; let r = inner .next() @@ -124,12 +150,12 @@ async fn send_batch( metadata: &[u8], batch: &RecordBatch, options: &writer::IpcWriteOptions, + dictionary_tracker: &mut writer::DictionaryTracker, ) -> Result { let data_gen = writer::IpcDataGenerator::default(); - let mut dictionary_tracker = writer::DictionaryTracker::new_with_preserve_dict_id(false, true); let (encoded_dictionaries, encoded_batch) = data_gen - .encoded_batch(batch, &mut dictionary_tracker, options) + .encoded_batch(batch, dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let dictionary_flight_data: Vec = diff --git a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs index 0f404b2ae28..27721750081 100644 --- a/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/arrow-integration-testing/src/flight_server_scenarios/integration_test.rs @@ -119,18 +119,31 @@ impl FlightService for FlightServiceImpl { .ok_or_else(|| Status::not_found(format!("Could not find flight. {key}")))?; let options = arrow::ipc::writer::IpcWriteOptions::default(); + let mut dictionary_tracker = + writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id()); + let data_gen = writer::IpcDataGenerator::default(); + let data = IpcMessage( + data_gen + .schema_to_bytes_with_dictionary_tracker( + &flight.schema, + &mut dictionary_tracker, + &options, + ) + .ipc_message + .into(), + ); + let schema_flight_data = FlightData { + data_header: data.0, + ..Default::default() + }; - let schema = std::iter::once(Ok(SchemaAsIpc::new(&flight.schema, &options).into())); + let schema = std::iter::once(Ok(schema_flight_data)); let batches = flight .chunks .iter() .enumerate() .flat_map(|(counter, batch)| { - let data_gen = writer::IpcDataGenerator::default(); - let mut dictionary_tracker = - writer::DictionaryTracker::new_with_preserve_dict_id(false, true); - let (encoded_dictionaries, encoded_batch) = data_gen .encoded_batch(batch, &mut dictionary_tracker, &options) .expect("DictionaryTracker configured above to not error on replacement"); diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs index dfaf12892a7..32dab762447 100644 --- a/arrow-ipc/src/writer.rs +++ b/arrow-ipc/src/writer.rs @@ -64,7 +64,7 @@ pub struct IpcWriteOptions { /// Flag indicating whether the writer should preserve the dictionary IDs defined in the /// schema or generate unique dictionary IDs internally during encoding. /// - /// Defaults to `true` + /// Defaults to `false` preserve_dict_id: bool, } @@ -113,7 +113,7 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, - preserve_dict_id: true, + preserve_dict_id: false, }), crate::MetadataVersion::V5 => { if write_legacy_ipc_format { @@ -126,7 +126,7 @@ impl IpcWriteOptions { write_legacy_ipc_format, metadata_version, batch_compression_type: None, - preserve_dict_id: true, + preserve_dict_id: false, }) } } @@ -162,7 +162,7 @@ impl Default for IpcWriteOptions { write_legacy_ipc_format: false, metadata_version: crate::MetadataVersion::V5, batch_compression_type: None, - preserve_dict_id: true, + preserve_dict_id: false, } } } @@ -786,7 +786,7 @@ impl DictionaryTracker { written: HashMap::new(), dict_ids: Vec::new(), error_on_replacement, - preserve_dict_id: true, + preserve_dict_id: false, } }