diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index a435869dbece..9549cfeeb3b8 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -138,7 +138,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" dependencies = [ "bigdecimal", - "bzip2", + "bzip2 0.4.4", "crc32fast", "digest", "libflate", @@ -411,7 +411,7 @@ version = "0.4.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df895a515f70646414f4b45c0b79082783b80552b373a68283012928df56f522" dependencies = [ - "bzip2", + "bzip2 0.4.4", "flate2", "futures-core", "futures-io", @@ -928,6 +928,16 @@ dependencies = [ "libc", ] +[[package]] +name = "bzip2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bafdbf26611df8c14810e268ddceda071c297570a5fb360ceddf617fe417ef58" +dependencies = [ + "bzip2-sys", + "libc", +] + [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -1232,7 +1242,7 @@ dependencies = [ "async-compression", "async-trait", "bytes", - "bzip2", + "bzip2 0.5.0", "chrono", "dashmap", "datafusion-catalog", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 01bf03f32e8e..9bf530a9d6ac 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -92,7 +92,7 @@ async-compression = { version = "0.4.0", features = [ ], optional = true } async-trait = { workspace = true } bytes = { workspace = true } -bzip2 = { version = "0.4.3", optional = true } +bzip2 = { version = "0.5.0", optional = true } chrono = { workspace = true } dashmap = { workspace = true } datafusion-catalog = { workspace = true } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index e5ce28e73806..b4167900d4c2 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -106,6 +106,28 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result(BzEncoder); + +#[cfg(feature = "compression")] +impl Write for AutoFinishBzEncoder { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.0.flush() + } +} + +#[cfg(feature = "compression")] +impl Drop for AutoFinishBzEncoder { + fn drop(&mut self) { + let _ = self.0.try_finish(); + } +} + /// Returns file groups [`Vec>`] for scanning `partitions` of `filename` pub fn partitioned_file_groups( path: &str, @@ -147,9 +169,10 @@ pub fn partitioned_file_groups( Box::new(encoder) } #[cfg(feature = "compression")] - FileCompressionType::BZIP2 => { - Box::new(BzEncoder::new(file, BzCompression::default())) - } + FileCompressionType::BZIP2 => Box::new(AutoFinishBzEncoder(BzEncoder::new( + file, + BzCompression::default(), + ))), #[cfg(not(feature = "compression"))] FileCompressionType::GZIP | FileCompressionType::BZIP2 @@ -183,8 +206,8 @@ pub fn partitioned_file_groups( } } - // Must drop the stream before creating ObjectMeta below as drop - // triggers finish for ZstdEncoder which writes additional data + // Must drop the stream before creating ObjectMeta below as drop triggers + // finish for ZstdEncoder/BzEncoder which writes additional data for mut w in writers.into_iter() { w.flush().unwrap(); }