diff --git a/Cargo.toml b/Cargo.toml
index 9ee8ae1..931a6ad 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -27,6 +27,8 @@ indexmap = "2.0.0"
 crc = "3.0.0"
 snap = "1.0.5"
 flate2 = "1.0.20"
+zstd = "0.13"
+lz4 = "1.24"
 string = "0.3.0"
 derive_builder = "0.12.0"
 paste = "1.0.7"
diff --git a/src/compression.rs b/src/compression.rs
index db90b73..ac1f85b 100644
--- a/src/compression.rs
+++ b/src/compression.rs
@@ -3,16 +3,32 @@
 //! This module has implementations of gzip, Snappy, as well as a noop compression format that
 //! allows encoding and decoding records into a [`Record`](crate::records::Record).
 
+use std::fmt::Debug;
+use log::error;
 use crate::protocol::buf::{ByteBuf, ByteBufMut};
 use crate::protocol::{DecodeError, EncodeError};
 
 mod gzip;
 mod none;
 mod snappy;
+mod zstd;
+mod lz4;
 
 pub use gzip::Gzip;
 pub use none::None;
 pub use snappy::Snappy;
+pub use zstd::Zstd;
+pub use lz4::Lz4;
+
+pub(crate) fn compression_err<E: Debug>(e: E) -> EncodeError {
+    error!("Error whilst compressing data: {:?}", e);
+    EncodeError
+}
+
+pub(crate) fn decompression_err<E: Debug>(e: E) -> DecodeError {
+    error!("Error whilst decompressing data: {:?}", e);
+    DecodeError
+}
 
 /// A trait for record compression algorithms.
 pub trait Compressor {
diff --git a/src/compression/gzip.rs b/src/compression/gzip.rs
index 8c8da5f..04e6157 100644
--- a/src/compression/gzip.rs
+++ b/src/compression/gzip.rs
@@ -4,27 +4,16 @@ use bytes::buf::BufMut;
 use bytes::{Bytes, BytesMut};
 use flate2::write::{GzDecoder, GzEncoder};
 use flate2::Compression;
-use log::error;
 
 use crate::protocol::buf::{ByteBuf, ByteBufMut};
 use crate::protocol::{DecodeError, EncodeError};
 
-use super::{Compressor, Decompressor};
+use super::{Compressor, Decompressor, compression_err, decompression_err};
 
 /// Gzip compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
 /// for more information.
 pub struct Gzip;
 
-fn compression_err(e: std::io::Error) -> EncodeError {
-    error!("Error whilst compressing data: {}", e);
-    EncodeError
-}
-
-fn decompression_err(e: std::io::Error) -> DecodeError {
-    error!("Error whilst decompressing data: {}", e);
-    DecodeError
-}
-
 impl Compressor for Gzip {
     type BufMut = BytesMut;
     fn compress<B: ByteBufMut, F, R>(buf: &mut B, f: F) -> Result<R, EncodeError>
diff --git a/src/compression/lz4.rs b/src/compression/lz4.rs
new file mode 100644
index 0000000..af4af60
--- /dev/null
+++ b/src/compression/lz4.rs
@@ -0,0 +1,91 @@
+use std::io;
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use crate::protocol::buf::{ByteBuf, ByteBufMut};
+use crate::protocol::{DecodeError, EncodeError};
+
+use lz4::{Decoder, EncoderBuilder};
+
+use super::{Compressor, Decompressor, compression_err, decompression_err};
+
+/// LZ4 compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
+/// for more information.
+pub struct Lz4;
+
+const COMPRESSION_LEVEL: u32 = 4;
+
+impl Compressor for Lz4 {
+    type BufMut = BytesMut;
+    fn compress<B: ByteBufMut, F, R>(buf: &mut B, f: F) -> Result<R, EncodeError>
+    where
+        F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
+    {
+        // Write uncompressed bytes into a temporary buffer
+        let mut tmp = BytesMut::new();
+        let res = f(&mut tmp)?;
+
+        // Compress from the temporary buffer into the target buffer
+        let mut encoder = EncoderBuilder::new()
+            .level(COMPRESSION_LEVEL)
+            .build(buf.writer())
+            .map_err(compression_err)?;
+        io::copy(&mut tmp.reader(), &mut encoder).map_err(compression_err)?;
+        let (_, result) = encoder.finish();
+        result.map_err(compression_err)?;
+
+        Ok(res)
+    }
+}
+
+impl Decompressor for Lz4 {
+    type Buf = Bytes;
+    fn decompress<B: ByteBuf, F, R>(buf: &mut B, f: F) -> Result<R, DecodeError>
+    where
+        F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
+    {
+        // Allocate a temporary buffer to hold the uncompressed bytes
+        let mut tmp = BytesMut::new().writer();
+
+        // Copy the compressed input into a contiguous buffer
+        let buf = buf.copy_to_bytes(buf.remaining());
+
+        let mut decoder = Decoder::new(buf.reader())
+            .map_err(decompression_err)?;
+        io::copy(&mut decoder, &mut tmp)
+            .map_err(decompression_err)?;
+
+        f(&mut tmp.into_inner().into())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::str;
+    use std::fmt::Write;
+    use bytes::BytesMut;
+    use zstd::zstd_safe::WriteBuf;
+    use crate::compression::{compression_err, decompression_err, Lz4};
+    use crate::compression::{Compressor, Decompressor};
+    use crate::protocol::{DecodeError, EncodeError};
+
+    #[test]
+    fn test_lz4() {
+        let mut compressed = BytesMut::new();
+        let _ = Lz4::compress(
+            &mut compressed,
+            |buf| -> Result<(), EncodeError> {
+                buf.write_str("hello lz4").map_err(compression_err)?;
+                Ok(())
+            }
+        ).unwrap();
+
+        let _ = Lz4::decompress(
+            &mut compressed,
+            |buf| -> Result<(), DecodeError> {
+                let decompressed_str = str::from_utf8(buf.as_slice())
+                    .map_err(decompression_err)?;
+                assert_eq!(decompressed_str, "hello lz4");
+                Ok(())
+            }
+        ).unwrap();
+    }
+}
diff --git a/src/compression/snappy.rs b/src/compression/snappy.rs
index 57ddea6..d5e1aca 100644
--- a/src/compression/snappy.rs
+++ b/src/compression/snappy.rs
@@ -1,11 +1,10 @@
 use bytes::{Bytes, BytesMut};
-use log::error;
 use snap::raw::*;
 
 use crate::protocol::buf::{ByteBuf, ByteBufMut};
 use crate::protocol::{DecodeError, EncodeError};
 
-use super::{Compressor, Decompressor};
+use super::{Compressor, Decompressor, compression_err, decompression_err};
 
 /// Snappy compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
 /// for more information.
@@ -26,10 +25,7 @@ impl Compressor for Snappy {
         let compress_gap = buf.put_gap(max_compress_len(tmp.len()));
         let actual_len = Encoder::new()
             .compress(&tmp, buf.gap_buf(compress_gap))
-            .map_err(|e| {
-                error!("Failed to compress buffer: {}", e);
-                EncodeError
-            })?;
+            .map_err(compression_err)?;
         buf.seek(start_pos + actual_len);
 
         Ok(res)
@@ -44,18 +40,12 @@ impl Decompressor for Snappy {
     {
         // Allocate a temporary buffer to hold the uncompressed bytes
         let buf = buf.copy_to_bytes(buf.remaining());
-        let actual_len = decompress_len(&buf).map_err(|e| {
-            error!("Failed to decompress buffer: {}", e);
-            DecodeError
-        })?;
+        let actual_len = decompress_len(&buf).map_err(decompression_err)?;
         let mut tmp = BytesMut::new();
         tmp.resize(actual_len, 0);
 
         // Decompress directly from the input buffer
-        Decoder::new().decompress(&buf, &mut tmp).map_err(|e| {
-            error!("Failed to decompress buffer: {}", e);
-            DecodeError
-        })?;
+        Decoder::new().decompress(&buf, &mut tmp).map_err(decompression_err)?;
 
         f(&mut tmp.into())
     }
diff --git a/src/compression/zstd.rs b/src/compression/zstd.rs
new file mode 100644
index 0000000..4649824
--- /dev/null
+++ b/src/compression/zstd.rs
@@ -0,0 +1,81 @@
+use bytes::{Buf, BufMut, Bytes, BytesMut};
+use crate::protocol::buf::{ByteBuf, ByteBufMut};
+use crate::protocol::{DecodeError, EncodeError};
+
+use super::{Compressor, Decompressor, compression_err, decompression_err};
+
+/// Zstandard (zstd) compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
+/// for more information.
+pub struct Zstd;
+
+const COMPRESSION_LEVEL: i32 = 3;
+
+impl Compressor for Zstd {
+    type BufMut = BytesMut;
+    fn compress<B: ByteBufMut, F, R>(buf: &mut B, f: F) -> Result<R, EncodeError>
+    where
+        F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
+    {
+        // Write uncompressed bytes into a temporary buffer
+        let mut tmp = BytesMut::new();
+        let res = f(&mut tmp)?;
+
+        // Compress directly into the target buffer
+        zstd::stream::copy_encode(tmp.reader(), buf.writer(), COMPRESSION_LEVEL)
+            .map_err(compression_err)?;
+
+        Ok(res)
+    }
+}
+
+impl Decompressor for Zstd {
+    type Buf = Bytes;
+
+    fn decompress<B: ByteBuf, F, R>(buf: &mut B, f: F) -> Result<R, DecodeError>
+    where
+        F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
+    {
+        // Allocate a temporary buffer to hold the uncompressed bytes
+        let mut tmp = BytesMut::new().writer();
+        // Copy the compressed input into a contiguous buffer
+        let buf = buf.copy_to_bytes(buf.remaining());
+        zstd::stream::copy_decode(buf.reader(), &mut tmp)
+            .map_err(decompression_err)?;
+
+        f(&mut tmp.into_inner().into())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::str;
+    use std::fmt::Write;
+    use bytes::BytesMut;
+    use zstd::zstd_safe::WriteBuf;
+    use crate::compression::{compression_err, decompression_err, Zstd};
+    use crate::compression::{Compressor, Decompressor};
+    use crate::protocol::{DecodeError, EncodeError};
+
+    #[test]
+    fn test_zstd() {
+        let mut compressed = BytesMut::new();
+        let _ = Zstd::compress(
+            &mut compressed,
+            |buf| -> Result<(), EncodeError> {
+                buf.write_str("hello zstd").map_err(compression_err)?;
+                Ok(())
+            }
+        ).unwrap();
+
+        let _ = Zstd::decompress(
+            &mut compressed,
+            |buf| -> Result<(), DecodeError> {
+                let decompressed_str = str::from_utf8(buf.as_slice())
+                    .map_err(decompression_err)?;
+                assert_eq!(decompressed_str, "hello zstd");
+                Ok(())
+            }
+        ).unwrap();
+    }
+}
diff --git a/src/records.rs b/src/records.rs
index a68357e..103fa24 100644
--- a/src/records.rs
+++ b/src/records.rs
@@ -223,6 +223,12 @@ impl RecordBatchEncoder {
             Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
                Self::encode_legacy_records(buf, records, &inner_opts)
            })?,
+            Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
+                Self::encode_legacy_records(buf, records, &inner_opts)
+            })?,
+            Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
+                Self::encode_legacy_records(buf, records, &inner_opts)
+            })?,
             _ => unimplemented!(),
         }
 
@@ -387,7 +393,12 @@ impl RecordBatchEncoder {
             Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
                 Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
             })?,
-            _ => unimplemented!(),
+            Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
+                Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
+            })?,
+            Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
+                Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
+            })?,
         }
 
         let batch_end = buf.offset();
@@ -569,7 +580,12 @@ impl RecordBatchDecoder {
             Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
                 Self::decode_new_records(buf, &batch_decode_info, version, records)
             })?,
-            _ => unimplemented!(),
+            Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
+                Self::decode_new_records(buf, &batch_decode_info, version, records)
+            })?,
+            Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
+                Self::decode_new_records(buf, &batch_decode_info, version, records)
+            })?,
         };
 
         Ok(())
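
For reviewers who want to exercise the new code paths end to end, here is a minimal batch-level sketch. It assumes this is the kafka-protocol crate and that the public `Record`, `RecordEncodeOptions`, and `RecordBatchEncoder::encode` API looks as shown below; none of this is part of the diff itself, and the field values are illustrative only. Note that Kafka only accepts zstd in v2 record batches, so the sketch requests record version 2.

```rust
use bytes::{Bytes, BytesMut};
use kafka_protocol::protocol::EncodeError;
use kafka_protocol::records::{
    Compression, Record, RecordBatchEncoder, RecordEncodeOptions, TimestampType,
};

fn encode_batch_with_zstd() -> Result<BytesMut, EncodeError> {
    // A single record; field names follow the crate's `Record` struct, and the
    // values are illustrative defaults for a non-transactional produce.
    let record = Record {
        transactional: false,
        control: false,
        partition_leader_epoch: 0,
        producer_id: -1,
        producer_epoch: -1,
        timestamp_type: TimestampType::Creation,
        offset: 0,
        sequence: 0,
        timestamp: 0,
        key: None,
        value: Some(Bytes::from_static(b"hello zstd")),
        headers: Default::default(),
    };

    // Select the variant added by this change; zstd requires v2 batches
    // on the wire, so request record version 2.
    let options = RecordEncodeOptions {
        version: 2,
        compression: Compression::Zstd,
    };

    let mut buf = BytesMut::new();
    RecordBatchEncoder::encode(&mut buf, [&record], &options)?;
    // `buf` now holds a complete zstd-compressed v2 record batch.
    Ok(buf)
}
```

Swapping in `Compression::Lz4` exercises the LZ4 arm the same way, and decoding the resulting buffer goes through the matching `RecordBatchDecoder::decode` arms added above.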