Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure IPC stream messages are contiguous #6321

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion arrow-flight/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,8 @@ fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);

let mut schema = vec![];
writer::write_message(&mut schema, encoded_data, pair.1)?;
let already_written_len = 0;
writer::write_message(&mut schema, already_written_len, encoded_data, pair.1)?;
Ok(IpcMessage(schema.into()))
}

Expand Down
171 changes: 131 additions & 40 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::CONTINUATION_MARKER;
/// IPC write options used to control the behaviour of the [`IpcDataGenerator`]
#[derive(Debug, Clone)]
pub struct IpcWriteOptions {
/// Write padding after memory buffers to this multiple of bytes.
/// Write padding to ensure that each data buffer is aligned to this multiple of bytes.
/// Must be 8, 16, 32, or 64 - defaults to 64.
alignment: u8,
/// The legacy format is for releases before 0.15.0, and uses metadata V4
Expand Down Expand Up @@ -909,12 +909,12 @@ impl DictionaryTracker {
pub struct FileWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
schema: SchemaRef,
/// The number of bytes between each block of bytes, as an offset for random access
block_offsets: usize,
Comment on lines -916 to -917
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this was doing so I'm not sure I understand what it means that it is going away.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the former mechanism for tracking absolute position in the output file. It was named according to the only thing it was used for: setting Block::offsets. Since it's now used to track how much padding should be written after flatbuffers Messages and to make its expected value more obvious, I renamed it.

/// Dictionary blocks that will be written as part of the IPC footer
dictionary_blocks: Vec<crate::Block>,
/// Record blocks that will be written as part of the IPC footer
Expand Down Expand Up @@ -964,14 +964,20 @@ impl<W: Write> FileWriter<W> {
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
// write magic to header aligned on alignment boundary
let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
let header_size = super::ARROW_MAGIC.len() + pad_len;

let mut written_len = 0;

// write magic and padding
writer.write_all(&super::ARROW_MAGIC)?;
written_len += super::ARROW_MAGIC.len();
let pad_len = pad_to_alignment(8, written_len);
writer.write_all(&PADDING[..pad_len])?;
written_len += pad_len;

// write the schema, set the written bytes to the schema + header
#[allow(deprecated)]
let preserve_dict_id = write_options.preserve_dict_id;

#[allow(deprecated)]
let mut dictionary_tracker =
DictionaryTracker::new_with_preserve_dict_id(true, preserve_dict_id);
Expand All @@ -980,12 +986,20 @@ impl<W: Write> FileWriter<W> {
&mut dictionary_tracker,
&write_options,
);
let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;

let (meta, data) = write_message(&mut writer, written_len, encoded_message, &write_options)?;

// The schema message has no body
debug_assert_eq!(data, 0);

// written bytes = padded_magic + schema
written_len += meta;

Ok(Self {
writer,
written_len,
write_options,
schema: Arc::new(schema.clone()),
block_offsets: meta + data + header_size,
dictionary_blocks: vec![],
record_blocks: vec![],
finished: false,
Expand Down Expand Up @@ -1015,23 +1029,32 @@ impl<W: Write> FileWriter<W> {
)?;

for encoded_dictionary in encoded_dictionaries {
let (meta, data) =
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;

let block = crate::Block::new(self.block_offsets as i64, meta as i32, data as i64);
let block = crate::Block::new(self.written_len as i64, meta as i32, data as i64);
self.dictionary_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
}

let (meta, data) = write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
// add a record block for the footer
let block = crate::Block::new(
self.block_offsets as i64,
self.written_len as i64,
meta as i32, // TODO: is this still applicable?
data as i64,
);
self.record_blocks.push(block);
self.block_offsets += meta + data;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -1044,7 +1067,7 @@ impl<W: Write> FileWriter<W> {
}

// write EOS
write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

let mut fbb = FlatBufferBuilder::new();
let dictionaries = fbb.create_vector(&self.dictionary_blocks);
Expand Down Expand Up @@ -1139,6 +1162,8 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
pub struct StreamWriter<W> {
/// The object to write to
writer: W,
/// The number of bytes written
written_len: usize,
/// IPC write options
write_options: IpcWriteOptions,
/// Whether the writer footer has been written, and the writer is finished
Expand Down Expand Up @@ -1194,9 +1219,14 @@ impl<W: Write> StreamWriter<W> {
&mut dictionary_tracker,
&write_options,
);
write_message(&mut writer, encoded_message, &write_options)?;
let (meta, data) = write_message(&mut writer, 0, encoded_message, &write_options)?;

// The schema message has no body
debug_assert_eq!(data, 0);

Ok(Self {
writer,
written_len: meta,
write_options,
finished: false,
dictionary_tracker,
Expand All @@ -1218,10 +1248,22 @@ impl<W: Write> StreamWriter<W> {
.expect("StreamWriter is configured to not error on dictionary replacement");

for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_dictionary,
&self.write_options,
)?;
self.written_len += meta + data;
}

write_message(&mut self.writer, encoded_message, &self.write_options)?;
let (meta, data) = write_message(
&mut self.writer,
self.written_len,
encoded_message,
&self.write_options,
)?;
self.written_len += meta + data;
Ok(())
}

Expand All @@ -1233,7 +1275,7 @@ impl<W: Write> StreamWriter<W> {
));
}

write_continuation(&mut self.writer, &self.write_options, 0)?;
self.written_len += write_continuation(&mut self.writer, &self.write_options, 0)?;

self.finished = true;

Expand Down Expand Up @@ -1326,48 +1368,50 @@ pub struct EncodedData {
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(
mut writer: W,
already_written_len: usize,
encoded: EncodedData,
write_options: &IpcWriteOptions,
) -> Result<(usize, usize), ArrowError> {
Comment on lines 1369 to 1374
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think technically this would be a breaking change as the method is publicly exposed to the API. Could maybe deprecate this and create it as a new method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's preferable I can certainly do that; I think that was the first thing I wrote. Then it seemed that this function is only public for use in arrow-flight, so I prioritized keeping things simpler.

let arrow_data_len = encoded.arrow_data.len();
if arrow_data_len % usize::from(write_options.alignment) != 0 {
if already_written_len % 8 != 0 {
return Err(ArrowError::MemoryError(
"Arrow data not aligned".to_string(),
"Writing an IPC Message unaligned to 8 bytes".to_string(),
));
}

let a = usize::from(write_options.alignment - 1);
let buffer = encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = if write_options.write_legacy_ipc_format {
let continuation_size = if write_options.write_legacy_ipc_format {
4
} else {
8
};
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
let padding_bytes = aligned_size - flatbuf_size - prefix_size;
let flatbuf_size = encoded.ipc_message.len();
assert_ne!(flatbuf_size, 0);

let padding_size = pad_to_alignment(
write_options.alignment,
already_written_len + continuation_size + flatbuf_size,
);
let padded_size = continuation_size + flatbuf_size + padding_size;
assert_eq!(
(already_written_len + padded_size) % write_options.alignment as usize,
0
);

// write continuation, flatbuf, and padding
write_continuation(
&mut writer,
write_options,
(aligned_size - prefix_size) as i32,
(padded_size - continuation_size) as i32,
)?;

// write the flatbuf
if flatbuf_size > 0 {
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&PADDING[..padding_bytes])?;
writer.write_all(&encoded.ipc_message)?;
writer.write_all(&PADDING[..padding_size])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
let body_len = if !encoded.arrow_data.is_empty() {
write_body_buffers(&mut writer, &encoded.arrow_data, write_options.alignment)?
} else {
0
};

Ok((aligned_size, body_len))
Ok((padded_size, body_len))
}

fn write_body_buffers<W: Write>(
Expand Down Expand Up @@ -2877,4 +2921,51 @@ mod tests {
assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
}

#[test]
fn test_embedded_stream_is_valid() {
const IPC_ALIGNMENT: usize = 8;
// We write a valid file then try to read it as a stream.
let num_cols = 2;
let mut fields = Vec::new();
for i in 0..num_cols {
let field = Field::new(format!("col_{}", i), DataType::Decimal128(38, 10), true);
fields.push(field);
}
let schema = Schema::new(fields);

for num_batches in [0, 1, 4] {
let mut writer = FileWriter::try_new_with_options(
Vec::new(),
&schema,
IpcWriteOptions::try_new(IPC_ALIGNMENT, false, MetadataVersion::V5).unwrap(),
)
.unwrap();

let mut batches = Vec::new();
for b in 0..num_batches {
let mut arrays = Vec::new();
for i in 0..num_cols {
let array = Decimal128Array::from(vec![(i * b) as i128; 777]);
arrays.push(Arc::new(array) as Arc<dyn Array>);
}
let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
writer.write(&batch).unwrap();
batches.push(batch);
}

writer.finish().unwrap();

let mut out: Vec<u8> = writer.into_inner().unwrap();
let stream_reader = StreamReader::try_new(Cursor::new(&mut out[IPC_ALIGNMENT..]), None).unwrap();

assert_eq!(stream_reader.schema().as_ref(), &schema);

let mut stream_batches = Vec::new();
for batch in stream_reader {
stream_batches.push(batch.unwrap());
}
assert_eq!(stream_batches, batches);
}
}
}
Loading