diff --git a/Cargo.toml b/Cargo.toml index 631074a..7bafe4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,17 +28,27 @@ client = [] # Enable this feature if you are implementing a kafka protocol compatible broker. # It will enable encoding of responses and decoding of requests. broker = [] -default = ["client", "broker"] + +# Enable compression of records using gzip +gzip = ["dep:flate2"] +# Enable compression of records using zstd +zstd = ["dep:zstd"] +# Enable compression of records using snap +snap = ["dep:snap"] +# Enable compression of records using lz4 +lz4 = ["dep:lz4"] + +default = ["client", "broker", "gzip", "zstd", "snap", "lz4"] [dependencies] bytes = "1.0.1" uuid = "1.3.0" indexmap = "2.0.0" crc = "3.0.0" -snap = "1.0.5" -flate2 = "1.0.20" -zstd = "0.13" -lz4 = "1.24" +snap = { version = "1.0.5", optional = true } +flate2 = { version = "1.0.20", optional = true } +zstd = { version = "0.13", optional = true } +lz4 = { version = "1.24", optional = true } paste = "1.0.7" crc32c = "0.6.4" anyhow = "1.0.80" diff --git a/src/compression.rs b/src/compression.rs index dec14bd..00570be 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -6,16 +6,24 @@ use crate::protocol::buf::{ByteBuf, ByteBufMut}; use anyhow::Result; +#[cfg(feature = "gzip")] mod gzip; +#[cfg(feature = "lz4")] mod lz4; mod none; +#[cfg(feature = "snappy")] mod snappy; +#[cfg(feature = "zstd")] mod zstd; +#[cfg(feature = "gzip")] pub use gzip::Gzip; +#[cfg(feature = "lz4")] pub use lz4::Lz4; pub use none::None; +#[cfg(feature = "snappy")] pub use snappy::Snappy; +#[cfg(feature = "zstd")] pub use zstd::Zstd; /// A trait for record compression algorithms. diff --git a/src/compression/lz4.rs b/src/compression/lz4.rs index d959201..78464ba 100644 --- a/src/compression/lz4.rs +++ b/src/compression/lz4.rs @@ -62,7 +62,6 @@ mod test { use bytes::BytesMut; use std::fmt::Write; use std::str; - use zstd::zstd_safe::WriteBuf; #[test] fn test_lz4() { @@ -74,7 +73,7 @@ mod test { .unwrap(); Lz4::decompress(&mut compressed, |buf| -> Result<()> { - let decompressed_str = str::from_utf8(buf.as_slice()).unwrap(); + let decompressed_str = str::from_utf8(buf.as_ref()).unwrap(); assert_eq!(decompressed_str, "hello lz4"); Ok(()) }) diff --git a/src/records.rs b/src/records.rs index 12aa942..62b0814 100644 --- a/src/records.rs +++ b/src/records.rs @@ -238,19 +238,27 @@ impl RecordBatchEncoder { compressor(&mut encoded_buf, buf, options.compression)?; } else { match options.compression { + #[cfg(feature = "snappy")] Compression::Snappy => cmpr::Snappy::compress(buf, |buf| { Self::encode_legacy_records(buf, records, &inner_opts) })?, + #[cfg(feature = "gzip")] Compression::Gzip => cmpr::Gzip::compress(buf, |buf| { Self::encode_legacy_records(buf, records, &inner_opts) })?, + #[cfg(feature = "lz4")] Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| { Self::encode_legacy_records(buf, records, &inner_opts) })?, + #[cfg(feature = "zstd")] Compression::Zstd => cmpr::Zstd::compress(buf, |buf| { Self::encode_legacy_records(buf, records, &inner_opts) })?, - _ => unimplemented!(), + c => { + return Err(anyhow!( + "Support for {c:?} is not enabled as a cargo feature" + )) + } } } @@ -415,18 +423,27 @@ impl RecordBatchEncoder { Compression::None => cmpr::None::compress(buf, |buf| { Self::encode_new_records(buf, records, min_offset, min_timestamp, options) })?, + #[cfg(feature = "snappy")] Compression::Snappy => cmpr::Snappy::compress(buf, |buf| { Self::encode_new_records(buf, records, min_offset, min_timestamp, options) })?, + #[cfg(feature = "gzip")] Compression::Gzip => cmpr::Gzip::compress(buf, |buf| { Self::encode_new_records(buf, records, min_offset, min_timestamp, options) })?, + #[cfg(feature = "lz4")] Compression::Lz4 => cmpr::Lz4::compress(buf, |buf| { Self::encode_new_records(buf, records, min_offset, min_timestamp, options) })?, + #[cfg(feature = "zstd")] Compression::Zstd => cmpr::Zstd::compress(buf, |buf| { Self::encode_new_records(buf, records, min_offset, min_timestamp, options) })?, + c => { + return Err(anyhow!( + "Support for {c:?} is not enabled as a cargo feature" + )) + } } } let batch_end = buf.offset(); @@ -618,18 +635,27 @@ impl RecordBatchDecoder { Compression::None => cmpr::None::decompress(buf, |buf| { Self::decode_new_records(buf, &batch_decode_info, version, records) })?, + #[cfg(feature = "snappy")] Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| { Self::decode_new_records(buf, &batch_decode_info, version, records) })?, + #[cfg(feature = "gzip")] Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| { Self::decode_new_records(buf, &batch_decode_info, version, records) })?, + #[cfg(feature = "zstd")] Compression::Zstd => cmpr::Zstd::decompress(buf, |buf| { Self::decode_new_records(buf, &batch_decode_info, version, records) })?, + #[cfg(feature = "lz4")] Compression::Lz4 => cmpr::Lz4::decompress(buf, |buf| { Self::decode_new_records(buf, &batch_decode_info, version, records) })?, + c => { + return Err(anyhow!( + "Support for {c:?} is not enabled as a cargo feature" + )) + } }; }