Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Zstd and Lz4 compression #35

Merged
merged 1 commit into from
Jan 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ indexmap = "2.0.0"
crc = "3.0.0"
snap = "1.0.5"
flate2 = "1.0.20"
zstd = "0.13"
lz4 = "1.24"
string = "0.3.0"
derive_builder = "0.12.0"
paste = "1.0.7"
Expand Down
15 changes: 15 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,31 @@
//! This module has implementations of gzip, Snappy, Zstd, and LZ4, as well as a noop compression
//! format that allows encoding and decoding records into a [`Record`](crate::records::Record).

use std::fmt::Debug;
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

mod gzip;
mod none;
mod snappy;
mod zstd;
mod lz4;

pub use gzip::Gzip;
pub use none::None;
pub use snappy::Snappy;
pub use zstd::Zstd;
pub use lz4::Lz4;

/// Logs a compression failure and converts it into an [`EncodeError`].
///
/// Shared `map_err` adapter for all compression codec implementations.
pub(crate) fn compression_err<E: Debug>(e: E) -> EncodeError {
    error!("Error whilst compressing data: {:?}", e);
    EncodeError
}

/// Logs a decompression failure and converts it into a [`DecodeError`].
///
/// Shared `map_err` adapter for all compression codec implementations.
pub(crate) fn decompression_err<E: Debug>(e: E) -> DecodeError {
    error!("Error whilst decompressing data: {:?}", e);
    DecodeError
}

/// A trait for record compression algorithms.
pub trait Compressor<B: ByteBufMut> {
Expand Down
13 changes: 1 addition & 12 deletions src/compression/gzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,16 @@ use bytes::buf::BufMut;
use bytes::{Bytes, BytesMut};
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use log::error;

use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

use super::{Compressor, Decompressor};
use super::{Compressor, Decompressor, compression_err, decompression_err};

/// Gzip compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
/// for more information.
pub struct Gzip;

/// Logs a gzip compression I/O failure and maps it to an [`EncodeError`].
fn compression_err(e: std::io::Error) -> EncodeError {
    error!("Error whilst compressing data: {}", e);
    EncodeError
}

/// Logs a gzip decompression I/O failure and maps it to a [`DecodeError`].
fn decompression_err(e: std::io::Error) -> DecodeError {
    error!("Error whilst decompressing data: {}", e);
    DecodeError
}

impl<B: ByteBufMut> Compressor<B> for Gzip {
type BufMut = BytesMut;
fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
Expand Down
91 changes: 91 additions & 0 deletions src/compression/lz4.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
use std::io;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

use lz4::{Decoder, EncoderBuilder};

use super::{Compressor, Decompressor, compression_err, decompression_err};

/// LZ4 compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
/// for more information.
pub struct Lz4;

// Compression level handed to the LZ4 encoder builder below.
const COMPRESSION_LEVEL: u32 = 4;

impl<B: ByteBufMut> Compressor<B> for Lz4 {
    type BufMut = BytesMut;

    /// Runs `f` against a scratch buffer, then LZ4-compresses the scratch
    /// contents into `buf`.
    fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
    where
        F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
    {
        // Stage the raw (uncompressed) bytes in a scratch buffer first.
        let mut raw = BytesMut::new();
        let output = f(&mut raw)?;

        // Stream the scratch buffer through an LZ4 frame encoder that writes
        // directly into the target buffer.
        let mut encoder = EncoderBuilder::new()
            .level(COMPRESSION_LEVEL)
            .build(buf.writer())
            .map_err(compression_err)?;
        io::copy(&mut raw.reader(), &mut encoder).map_err(compression_err)?;

        // `finish` hands back the inner writer together with the final frame
        // status; only the status matters here.
        let (_writer, status) = encoder.finish();
        status.map_err(compression_err)?;

        Ok(output)
    }
}

impl<B: ByteBuf> Decompressor<B> for Lz4 {
    type Buf = Bytes;

    /// LZ4-decompresses all of `buf` into a temporary buffer and hands the
    /// result to `f`.
    fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
    where
        F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
    {
        // Drain the compressed input into an owned `Bytes` so it can be
        // wrapped in a reader.
        let compressed = buf.copy_to_bytes(buf.remaining());

        // Decode the LZ4 frame into a scratch buffer.
        let mut scratch = BytesMut::new().writer();
        let mut decoder = Decoder::new(compressed.reader()).map_err(decompression_err)?;
        io::copy(&mut decoder, &mut scratch).map_err(decompression_err)?;

        f(&mut scratch.into_inner().into())
    }
}

#[cfg(test)]
mod test {
    use std::fmt::Write;
    use std::str;

    use bytes::BytesMut;

    use crate::compression::{compression_err, decompression_err, Lz4};
    use crate::compression::{Compressor, Decompressor};
    use crate::protocol::{DecodeError, EncodeError};

    /// Round-trips a small payload through the LZ4 compressor and
    /// decompressor and checks the payload survives intact.
    #[test]
    fn test_lz4() {
        let mut compressed = BytesMut::new();
        let _ = Lz4::compress(&mut compressed, |buf| -> Result<(), EncodeError> {
            buf.write_str("hello lz4").map_err(compression_err)?;
            Ok(())
        })
        .unwrap();

        let _ = Lz4::decompress(&mut compressed, |buf| -> Result<(), DecodeError> {
            // `Bytes` derefs to `[u8]`, so no extra trait import (previously
            // `zstd::zstd_safe::WriteBuf`, an unrelated crate's internal
            // trait) is needed to view it as a slice.
            let decompressed_str = str::from_utf8(&buf[..]).map_err(decompression_err)?;
            assert_eq!(decompressed_str, "hello lz4");
            Ok(())
        })
        .unwrap();
    }
}
18 changes: 4 additions & 14 deletions src/compression/snappy.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use bytes::{Bytes, BytesMut};
use log::error;
use snap::raw::*;

use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

use super::{Compressor, Decompressor};
use super::{Compressor, Decompressor, compression_err, decompression_err};

/// Snappy compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
/// for more information.
Expand All @@ -26,10 +25,7 @@ impl<B: ByteBufMut> Compressor<B> for Snappy {
let compress_gap = buf.put_gap(max_compress_len(tmp.len()));
let actual_len = Encoder::new()
.compress(&tmp, buf.gap_buf(compress_gap))
.map_err(|e| {
error!("Failed to compress buffer: {}", e);
EncodeError
})?;
.map_err(compression_err)?;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 thanks for cleanup

buf.seek(start_pos + actual_len);

Ok(res)
Expand All @@ -44,18 +40,12 @@ impl<B: ByteBuf> Decompressor<B> for Snappy {
{
// Allocate a temporary buffer to hold the uncompressed bytes
let buf = buf.copy_to_bytes(buf.remaining());
let actual_len = decompress_len(&buf).map_err(|e| {
error!("Failed to decompress buffer: {}", e);
DecodeError
})?;
let actual_len = decompress_len(&buf).map_err(decompression_err)?;
let mut tmp = BytesMut::new();
tmp.resize(actual_len, 0);

// Decompress directly from the input buffer
Decoder::new().decompress(&buf, &mut tmp).map_err(|e| {
error!("Failed to decompress buffer: {}", e);
DecodeError
})?;
Decoder::new().decompress(&buf, &mut tmp).map_err(decompression_err)?;

f(&mut tmp.into())
}
Expand Down
81 changes: 81 additions & 0 deletions src/compression/zstd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use crate::protocol::{DecodeError, EncodeError};

use super::{Compressor, Decompressor, compression_err, decompression_err};

/// Zstd compression algorithm. See [Kafka's broker configuration](https://kafka.apache.org/documentation/#brokerconfigs_compression.type)
/// for more information.
pub struct Zstd;

// Compression level passed to `zstd::stream::copy_encode` below.
const COMPRESSION_LEVEL: i32 = 3;

impl<B: ByteBufMut> Compressor<B> for Zstd {
    type BufMut = BytesMut;

    /// Runs `f` against a scratch buffer, then zstd-compresses the scratch
    /// contents into `buf`.
    fn compress<R, F>(buf: &mut B, f: F) -> Result<R, EncodeError>
    where
        F: FnOnce(&mut Self::BufMut) -> Result<R, EncodeError>,
    {
        // Stage the raw (uncompressed) bytes in a scratch buffer first.
        let mut raw = BytesMut::new();
        let output = f(&mut raw)?;

        // Stream the scratch buffer through zstd straight into the target.
        zstd::stream::copy_encode(raw.reader(), buf.writer(), COMPRESSION_LEVEL)
            .map_err(compression_err)?;

        Ok(output)
    }
}

impl<B: ByteBuf> Decompressor<B> for Zstd {
    type Buf = Bytes;

    /// Zstd-decompresses all of `buf` into a temporary buffer and hands the
    /// result to `f`.
    fn decompress<R, F>(buf: &mut B, f: F) -> Result<R, DecodeError>
    where
        F: FnOnce(&mut Self::Buf) -> Result<R, DecodeError>,
    {
        // Drain the compressed input into an owned `Bytes` so it can be
        // wrapped in a reader.
        let compressed = buf.copy_to_bytes(buf.remaining());

        // Decode the zstd stream into a scratch buffer.
        let mut scratch = BytesMut::new().writer();
        zstd::stream::copy_decode(compressed.reader(), &mut scratch)
            .map_err(decompression_err)?;

        f(&mut scratch.into_inner().into())
    }
}

#[cfg(test)]
mod test {
    use std::fmt::Write;
    use std::str;

    use bytes::BytesMut;

    use crate::compression::{compression_err, decompression_err, Zstd};
    use crate::compression::{Compressor, Decompressor};
    use crate::protocol::{DecodeError, EncodeError};

    /// Round-trips a small payload through the Zstd compressor and
    /// decompressor and checks the payload survives intact.
    #[test]
    fn test_zstd() {
        let mut compressed = BytesMut::new();
        let _ = Zstd::compress(&mut compressed, |buf| -> Result<(), EncodeError> {
            buf.write_str("hello zstd").map_err(compression_err)?;
            Ok(())
        })
        .unwrap();

        let _ = Zstd::decompress(&mut compressed, |buf| -> Result<(), DecodeError> {
            // `Bytes` derefs to `[u8]`, so the internal
            // `zstd::zstd_safe::WriteBuf` trait is not needed for a slice
            // view.
            let decompressed_str = str::from_utf8(&buf[..]).map_err(decompression_err)?;
            assert_eq!(decompressed_str, "hello zstd");
            Ok(())
        })
        .unwrap();
    }
}
20 changes: 18 additions & 2 deletions src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ impl RecordBatchEncoder {
Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
_ => unimplemented!(),
}

Expand Down Expand Up @@ -387,7 +393,12 @@ impl RecordBatchEncoder {
Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
_ => unimplemented!(),
Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
}

let batch_end = buf.offset();
Expand Down Expand Up @@ -569,7 +580,12 @@ impl RecordBatchDecoder {
Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
_ => unimplemented!(),
Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
};

Ok(())
Expand Down
Loading