Skip to content

Commit

Permalink
make record compression into default features (#86)
Browse files Browse the repository at this point in the history
Co-authored-by: charlotte <charlotte.c.mcelwain@gmail.com>
  • Loading branch information
rukai and tychedelia authored Sep 24, 2024
1 parent bea5f24 commit 9d3cd22
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 8 deletions.
20 changes: 15 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,27 @@ client = []
# Enable this feature if you are implementing a kafka protocol compatible broker.
# It will enable encoding of responses and decoding of requests.
broker = []
default = ["client", "broker"]

# Enable compression of records using gzip
gzip = ["dep:flate2"]
# Enable compression of records using zstd
zstd = ["dep:zstd"]
# Enable compression of records using snap
snap = ["dep:snap"]
# Enable compression of records using lz4
lz4 = ["dep:lz4"]

default = ["client", "broker", "gzip", "zstd", "snap", "lz4"]

[dependencies]
bytes = "1.0.1"
uuid = "1.3.0"
indexmap = "2.0.0"
crc = "3.0.0"
snap = "1.0.5"
flate2 = "1.0.20"
zstd = "0.13"
lz4 = "1.24"
snap = { version = "1.0.5", optional = true }
flate2 = { version = "1.0.20", optional = true }
zstd = { version = "0.13", optional = true }
lz4 = { version = "1.24", optional = true }
paste = "1.0.7"
crc32c = "0.6.4"
anyhow = "1.0.80"
Expand Down
8 changes: 8 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,24 @@
use crate::protocol::buf::{ByteBuf, ByteBufMut};
use anyhow::Result;

#[cfg(feature = "gzip")]
mod gzip;
#[cfg(feature = "lz4")]
mod lz4;
mod none;
#[cfg(feature = "snappy")]
mod snappy;
#[cfg(feature = "zstd")]
mod zstd;

#[cfg(feature = "gzip")]
pub use gzip::Gzip;
#[cfg(feature = "lz4")]
pub use lz4::Lz4;
pub use none::None;
#[cfg(feature = "snappy")]
pub use snappy::Snappy;
#[cfg(feature = "zstd")]
pub use zstd::Zstd;

/// A trait for record compression algorithms.
Expand Down
3 changes: 1 addition & 2 deletions src/compression/lz4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ mod test {
use bytes::BytesMut;
use std::fmt::Write;
use std::str;
use zstd::zstd_safe::WriteBuf;

#[test]
fn test_lz4() {
Expand All @@ -74,7 +73,7 @@ mod test {
.unwrap();

Lz4::decompress(&mut compressed, |buf| -> Result<()> {
let decompressed_str = str::from_utf8(buf.as_slice()).unwrap();
let decompressed_str = str::from_utf8(buf.as_ref()).unwrap();
assert_eq!(decompressed_str, "hello lz4");
Ok(())
})
Expand Down
28 changes: 27 additions & 1 deletion src/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,19 +238,27 @@ impl RecordBatchEncoder {
compressor(&mut encoded_buf, buf, options.compression)?;
} else {
match options.compression {
#[cfg(feature = "snappy")]
Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
#[cfg(feature = "gzip")]
Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
#[cfg(feature = "lz4")]
Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
#[cfg(feature = "zstd")]
Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
Self::encode_legacy_records(buf, records, &inner_opts)
})?,
_ => unimplemented!(),
c => {
return Err(anyhow!(
"Support for {c:?} is not enabled as a cargo feature"
))
}
}
}

Expand Down Expand Up @@ -415,18 +423,27 @@ impl RecordBatchEncoder {
Compression::None => cmpr::None::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "snappy")]
Compression::Snappy => cmpr::Snappy::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "gzip")]
Compression::Gzip => cmpr::Gzip::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "lz4")]
Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
#[cfg(feature = "zstd")]
Compression::Zstd => cmpr::Zstd::compress(buf, |buf| {
Self::encode_new_records(buf, records, min_offset, min_timestamp, options)
})?,
c => {
return Err(anyhow!(
"Support for {c:?} is not enabled as a cargo feature"
))
}
}
}
let batch_end = buf.offset();
Expand Down Expand Up @@ -618,18 +635,27 @@ impl RecordBatchDecoder {
Compression::None => cmpr::None::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "snappy")]
Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "gzip")]
Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "zstd")]
Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
#[cfg(feature = "lz4")]
Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| {
Self::decode_new_records(buf, &batch_decode_info, version, records)
})?,
c => {
return Err(anyhow!(
"Support for {c:?} is not enabled as a cargo feature"
))
}
};
}

Expand Down

0 comments on commit 9d3cd22

Please sign in to comment.