From 320e4ee1a07d4af031d6ba686fb3135b2e8c6209 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Tue, 15 Aug 2023 08:06:48 +1000
Subject: [PATCH] cargo fmt non-generated code in generated crate

---
 src/compression.rs        |   6 +-
 src/compression/gzip.rs   |  13 +-
 src/compression/none.rs   |   6 +-
 src/compression/snappy.rs |  18 +--
 src/lib.rs                |   8 +-
 src/protocol/buf.rs       |   5 +-
 src/protocol/mod.rs       |  20 +--
 src/protocol/types.rs     | 272 ++++++++++++++++++++++++++-----------
 src/records.rs            | 279 ++++++++++++++++++++++++++++----------
 tests/api_versions.rs     |   8 +-
 tests/fetch_response.rs   |  14 +-
 tests/request_header.rs   |  47 +++----
 12 files changed, 476 insertions(+), 220 deletions(-)

diff --git a/src/compression.rs b/src/compression.rs
index c14346e..db90b73 100644
--- a/src/compression.rs
+++ b/src/compression.rs
@@ -4,15 +4,15 @@
 //! allows encoding and decoding records into a [`Record`](crate::records::Record).
 
 use crate::protocol::buf::{ByteBuf, ByteBufMut};
-use crate::protocol::{EncodeError, DecodeError};
+use crate::protocol::{DecodeError, EncodeError};
 
+mod gzip;
 mod none;
 mod snappy;
-mod gzip;
 
+pub use gzip::Gzip;
 pub use none::None;
 pub use snappy::Snappy;
-pub use gzip::Gzip;
 
 /// A trait for record compression algorithms.
 pub trait Compressor {
diff --git a/src/compression/gzip.rs b/src/compression/gzip.rs
index dd74ee4..8c8da5f 100644
--- a/src/compression/gzip.rs
+++ b/src/compression/gzip.rs
@@ -1,13 +1,13 @@
 use std::io::Write;
 
-use bytes::{Bytes, BytesMut};
 use bytes::buf::BufMut;
+use bytes::{Bytes, BytesMut};
+use flate2::write::{GzDecoder, GzEncoder};
 use flate2::Compression;
-use flate2::write::{GzEncoder, GzDecoder};
 use log::error;
 
 use crate::protocol::buf::{ByteBuf, ByteBufMut};
-use crate::protocol::{EncodeError, DecodeError};
+use crate::protocol::{DecodeError, EncodeError};
 
 use super::{Compressor, Decompressor};
 
@@ -29,7 +29,7 @@ impl Compressor for Gzip {
     type BufMut = BytesMut;
     fn compress(buf: &mut B, f: F) -> Result
     where
-        F: FnOnce(&mut Self::BufMut) -> Result
+        F: FnOnce(&mut Self::BufMut) -> Result,
     {
         // Write uncompressed bytes into a temporary buffer
         let mut tmp = BytesMut::new();
@@ -48,13 +48,14 @@ impl Decompressor for Gzip {
     type Buf = Bytes;
     fn decompress(buf: &mut B, f: F) -> Result
     where
-        F: FnOnce(&mut Self::Buf) -> Result
+        F: FnOnce(&mut Self::Buf) -> Result,
     {
         let mut tmp = BytesMut::new();
 
         // Decompress directly from the input buffer
         let mut d = GzDecoder::new((&mut tmp).writer());
-        d.write_all(&buf.copy_to_bytes(buf.remaining())).map_err(decompression_err)?;
+        d.write_all(&buf.copy_to_bytes(buf.remaining()))
+            .map_err(decompression_err)?;
         d.finish().map_err(decompression_err)?;
 
         f(&mut tmp.into())
diff --git a/src/compression/none.rs b/src/compression/none.rs
index d10c813..0dfe6f2 100644
--- a/src/compression/none.rs
+++ b/src/compression/none.rs
@@ -1,5 +1,5 @@
 use crate::protocol::buf::{ByteBuf, ByteBufMut};
-use crate::protocol::{EncodeError, DecodeError};
+use crate::protocol::{DecodeError, EncodeError};
 
 use super::{Compressor, Decompressor};
 
@@ -10,7 +10,7 @@ impl Compressor for None {
     type BufMut = B;
     fn compress(buf: &mut B, f: F) -> Result
     where
-        F: FnOnce(&mut Self::BufMut) -> Result
+        F: FnOnce(&mut Self::BufMut) -> Result,
     {
         f(buf)
     }
@@ -20,7 +20,7 @@ impl Decompressor for None {
     type Buf = B;
     fn decompress(buf: &mut B, f: F) -> Result
     where
-        F: FnOnce(&mut Self::Buf) -> Result
+        F: FnOnce(&mut Self::Buf) -> Result,
     {
         f(buf)
     }
diff --git a/src/compression/snappy.rs b/src/compression/snappy.rs
index 6abb555..57ddea6 100644
---
a/src/compression/snappy.rs +++ b/src/compression/snappy.rs @@ -1,9 +1,9 @@ use bytes::{Bytes, BytesMut}; -use snap::raw::*; use log::error; +use snap::raw::*; use crate::protocol::buf::{ByteBuf, ByteBufMut}; -use crate::protocol::{EncodeError, DecodeError}; +use crate::protocol::{DecodeError, EncodeError}; use super::{Compressor, Decompressor}; @@ -15,7 +15,7 @@ impl Compressor for Snappy { type BufMut = BytesMut; fn compress(buf: &mut B, f: F) -> Result where - F: FnOnce(&mut Self::BufMut) -> Result + F: FnOnce(&mut Self::BufMut) -> Result, { // Write uncompressed bytes into a temporary buffer let mut tmp = BytesMut::new(); @@ -24,10 +24,12 @@ impl Compressor for Snappy { // Compress directly into the target buffer let start_pos = buf.offset(); let compress_gap = buf.put_gap(max_compress_len(tmp.len())); - let actual_len = Encoder::new().compress(&tmp, buf.gap_buf(compress_gap)).map_err(|e| { - error!("Failed to compress buffer: {}", e); - EncodeError - })?; + let actual_len = Encoder::new() + .compress(&tmp, buf.gap_buf(compress_gap)) + .map_err(|e| { + error!("Failed to compress buffer: {}", e); + EncodeError + })?; buf.seek(start_pos + actual_len); Ok(res) @@ -38,7 +40,7 @@ impl Decompressor for Snappy { type Buf = Bytes; fn decompress(buf: &mut B, f: F) -> Result where - F: FnOnce(&mut Self::Buf) -> Result + F: FnOnce(&mut Self::Buf) -> Result, { // Allocate a temporary buffer to hold the uncompressed bytes let buf = buf.copy_to_bytes(buf.remaining()); diff --git a/src/lib.rs b/src/lib.rs index 3ff69c5..e485cc0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,7 +83,7 @@ //! # api_versions_req.client_software_version = StrBytes::from_str("1.0"); //! # api_versions_req.client_software_name = StrBytes::from_str("example-client"); //! # api_versions_req.encode(&mut buf, 3); -//! +//! //! let api_key = buf.peek_bytes(0..2).get_i16(); //! let api_version = buf.peek_bytes(2..4).get_i16(); //! let header_version = ApiKey::try_from(api_key).unwrap().request_header_version(api_version); @@ -107,11 +107,11 @@ #[macro_use] extern crate log; +pub mod compression; +pub mod error; #[allow(clippy::all)] pub mod messages; -pub mod records; -pub mod compression; pub mod protocol; -pub mod error; +pub mod records; pub use error::ResponseError; diff --git a/src/protocol/buf.rs b/src/protocol/buf.rs index 5d1f723..120f4a0 100644 --- a/src/protocol/buf.rs +++ b/src/protocol/buf.rs @@ -257,7 +257,10 @@ pub trait ByteBufMut: BufMut { /// Put a gap of `len` at the current buffer offset. fn put_gap(&mut self, len: usize) -> Gap { - let res = Gap { offset: self.offset(), len }; + let res = Gap { + offset: self.offset(), + len, + }; self.seek(res.offset + len); res } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index cc180c3..2be7f4b 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,19 +1,19 @@ //! Most types are used internally in encoding/decoding, and are not required by typical use cases //! for interacting with the protocol. However, types can be used for decoding partial messages, //! or rewriting parts of an encoded message. 
-use std::string::FromUtf8Error; -use std::{error::Error, str::Utf8Error}; use std::borrow::Borrow; -use std::ops::RangeBounds; -use std::collections::BTreeMap; use std::cmp; +use std::collections::BTreeMap; +use std::ops::RangeBounds; +use std::string::FromUtf8Error; +use std::{error::Error, str::Utf8Error}; use buf::{ByteBuf, ByteBufMut}; use self::buf::NotEnoughBytesError; -pub mod types; pub mod buf; +pub mod types; /// A string type backed by [`bytes::Bytes`]. pub type StrBytes = string::String; @@ -54,7 +54,6 @@ impl From for DecodeError { #[derive(Debug)] pub struct EncodeError; - impl std::fmt::Display for EncodeError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "EncodeError") @@ -131,7 +130,12 @@ pub trait Decodable: Sized { pub(crate) trait MapEncodable: Sized { type Key; - fn encode(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<(), EncodeError>; + fn encode( + &self, + key: &Self::Key, + buf: &mut B, + version: i16, + ) -> Result<(), EncodeError>; fn compute_size(&self, key: &Self::Key, version: i16) -> Result; } @@ -199,4 +203,4 @@ pub trait Builder { /// Retrieve the builder for this protocol item. fn builder() -> Self::Builder; -} \ No newline at end of file +} diff --git a/src/protocol/types.rs b/src/protocol/types.rs index 8012027..ed0190d 100644 --- a/src/protocol/types.rs +++ b/src/protocol/types.rs @@ -4,13 +4,16 @@ //! types that use zigzag encoded to represent length as a "compact" representation. //! //! It is unnecessary to interact directly with these types for most use cases. -use std::string::String as StdString; use std::hash::Hash; +use std::string::String as StdString; use indexmap::IndexMap; use string::TryFrom; -use super::{DecodeError, EncodeError, Encoder, Decoder, Encodable, Decodable, MapEncodable, MapDecodable, NewType, StrBytes}; +use super::{ + Decodable, DecodeError, Decoder, Encodable, EncodeError, Encoder, MapDecodable, MapEncodable, + NewType, StrBytes, +}; use crate::protocol::buf::{ByteBuf, ByteBufMut}; macro_rules! define_copy_impl { @@ -84,7 +87,7 @@ macro_rules! define_simple_ints { ) } -define_simple_ints!{ +define_simple_ints! { Int8: i8 [put_i8, try_get_i8], Int16: i16 [put_i16, try_get_i16], UInt16: u16 [put_u16, try_get_u16], @@ -125,7 +128,7 @@ impl> Decoder for UnsignedVarInt { let mut value = 0; for i in 0..5 { let b = buf.try_get_u8()? as u32; - value |= (b & 0x7F) << (i*7); + value |= (b & 0x7F) << (i * 7); if b < 0x80 { break; } @@ -172,7 +175,7 @@ impl> Decoder for UnsignedVarLong { let mut value = 0; for i in 0..10 { let b = buf.try_get_u8()? 
as u64; - value |= (b & 0x7F) << (i*7); + value |= (b & 0x7F) << (i * 7); if b < 0x80 { break; } @@ -297,7 +300,11 @@ impl Encoder> for String { } impl Encoder> for String { - fn encode(&self, buf: &mut B, value: Option<&StdString>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&StdString>, + ) -> Result<(), EncodeError> { String.encode(buf, value.map(|s| s.as_str())) } fn compute_size(&self, value: Option<&StdString>) -> Result { @@ -306,7 +313,11 @@ impl Encoder> for String { } impl Encoder<&Option> for String { - fn encode(&self, buf: &mut B, value: &Option) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option, + ) -> Result<(), EncodeError> { String.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option) -> Result { @@ -367,7 +378,7 @@ impl Decoder> for String { let mut strbuf = vec![0; n as usize]; buf.try_copy_to_slice(&mut strbuf)?; Ok(Some(std::string::String::from_utf8(strbuf)?)) - }, + } n => { error!("String length is negative ({})", n); Err(DecodeError) @@ -383,7 +394,7 @@ impl> Decoder> for String { n if n >= 0 => { let strbuf = StrBytes::try_from(buf.try_get_bytes(n as usize)?)?; Ok(Some(strbuf.into())) - }, + } n => { error!("String length is negative ({})", n); Err(DecodeError) @@ -398,7 +409,7 @@ impl Decoder for String { Ok(None) => { error!("String length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -411,7 +422,7 @@ impl> Decoder for String { Ok(None) => { error!("String length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -455,7 +466,11 @@ impl Encoder> for CompactString { } impl Encoder> for CompactString { - fn encode(&self, buf: &mut B, value: Option<&StdString>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&StdString>, + ) -> Result<(), EncodeError> { CompactString.encode(buf, value.map(|s| s.as_str())) } fn compute_size(&self, value: Option<&StdString>) -> Result { @@ -473,7 +488,11 @@ impl> Encoder> for CompactString { } impl Encoder<&Option> for CompactString { - fn encode(&self, buf: &mut B, value: &Option) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option, + ) -> Result<(), EncodeError> { CompactString.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option) -> Result { @@ -522,10 +541,10 @@ impl Decoder> for CompactString { match UnsignedVarInt.decode(buf)? { 0 => Ok(None), n => { - let mut strbuf = vec![0; (n-1) as usize]; + let mut strbuf = vec![0; (n - 1) as usize]; buf.try_copy_to_slice(&mut strbuf)?; Ok(Some(std::string::String::from_utf8(strbuf)?)) - }, + } } } } @@ -535,9 +554,9 @@ impl> Decoder> for CompactString { match UnsignedVarInt.decode(buf)? 
{ 0 => Ok(None), n => { - let strbuf = StrBytes::try_from(buf.try_get_bytes((n-1) as usize)?)?; + let strbuf = StrBytes::try_from(buf.try_get_bytes((n - 1) as usize)?)?; Ok(Some(strbuf.into())) - }, + } } } } @@ -548,7 +567,7 @@ impl Decoder for CompactString { Ok(None) => { error!("CompactString length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -561,7 +580,7 @@ impl> Decoder for CompactString { Ok(None) => { error!("CompactString length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -603,7 +622,11 @@ impl Encoder> for Bytes { } impl Encoder>> for Bytes { - fn encode(&self, buf: &mut B, value: Option<&Vec>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&Vec>, + ) -> Result<(), EncodeError> { Bytes.encode(buf, value.map(|s| s.as_slice())) } fn compute_size(&self, value: Option<&Vec>) -> Result { @@ -612,7 +635,11 @@ impl Encoder>> for Bytes { } impl Encoder<&Option>> for Bytes { - fn encode(&self, buf: &mut B, value: &Option>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option>, + ) -> Result<(), EncodeError> { Bytes.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option>) -> Result { @@ -621,7 +648,11 @@ impl Encoder<&Option>> for Bytes { } impl Encoder> for Bytes { - fn encode(&self, buf: &mut B, value: Option<&bytes::Bytes>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&bytes::Bytes>, + ) -> Result<(), EncodeError> { Bytes.encode(buf, value.map(|s| &**s)) } fn compute_size(&self, value: Option<&bytes::Bytes>) -> Result { @@ -630,7 +661,11 @@ impl Encoder> for Bytes { } impl Encoder<&Option> for Bytes { - fn encode(&self, buf: &mut B, value: &Option) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option, + ) -> Result<(), EncodeError> { Bytes.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option) -> Result { @@ -673,7 +708,7 @@ impl Decoder>> for Bytes { let mut data = vec![0; n as usize]; buf.try_copy_to_slice(&mut data)?; Ok(Some(data)) - }, + } n => { error!("Data length is negative ({})", n); Err(DecodeError) @@ -686,9 +721,7 @@ impl Decoder> for Bytes { fn decode(&self, buf: &mut B) -> Result, DecodeError> { match Int32.decode(buf)? 
{ -1 => Ok(None), - n if n >= 0 => { - Ok(Some(buf.try_get_bytes(n as usize)?)) - }, + n if n >= 0 => Ok(Some(buf.try_get_bytes(n as usize)?)), n => { error!("Data length is negative ({})", n); Err(DecodeError) @@ -703,7 +736,7 @@ impl Decoder> for Bytes { Ok(None) => { error!("Data length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -716,7 +749,7 @@ impl Decoder for Bytes { Ok(None) => { error!("Data length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -760,7 +793,11 @@ impl Encoder> for CompactBytes { } impl Encoder>> for CompactBytes { - fn encode(&self, buf: &mut B, value: Option<&Vec>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&Vec>, + ) -> Result<(), EncodeError> { CompactBytes.encode(buf, value.map(|s| s.as_slice())) } fn compute_size(&self, value: Option<&Vec>) -> Result { @@ -769,7 +806,11 @@ impl Encoder>> for CompactBytes { } impl Encoder<&Option>> for CompactBytes { - fn encode(&self, buf: &mut B, value: &Option>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option>, + ) -> Result<(), EncodeError> { CompactBytes.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option>) -> Result { @@ -778,7 +819,11 @@ impl Encoder<&Option>> for CompactBytes { } impl Encoder> for CompactBytes { - fn encode(&self, buf: &mut B, value: Option<&bytes::Bytes>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&bytes::Bytes>, + ) -> Result<(), EncodeError> { CompactBytes.encode(buf, value.map(|s| &**s)) } fn compute_size(&self, value: Option<&bytes::Bytes>) -> Result { @@ -787,7 +832,11 @@ impl Encoder> for CompactBytes { } impl Encoder<&Option> for CompactBytes { - fn encode(&self, buf: &mut B, value: &Option) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option, + ) -> Result<(), EncodeError> { CompactBytes.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option) -> Result { @@ -827,10 +876,10 @@ impl Decoder>> for CompactBytes { match UnsignedVarInt.decode(buf)? { 0 => Ok(None), n => { - let mut data = vec![0; (n-1) as usize]; + let mut data = vec![0; (n - 1) as usize]; buf.try_copy_to_slice(&mut data)?; Ok(Some(data)) - }, + } } } } @@ -839,9 +888,7 @@ impl Decoder> for CompactBytes { fn decode(&self, buf: &mut B) -> Result, DecodeError> { match UnsignedVarInt.decode(buf)? { 0 => Ok(None), - n => { - Ok(Some(buf.try_get_bytes((n-1) as usize)?)) - }, + n => Ok(Some(buf.try_get_bytes((n - 1) as usize)?)), } } } @@ -852,7 +899,7 @@ impl Decoder> for CompactBytes { Ok(None) => { error!("Data length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -865,7 +912,7 @@ impl Decoder for CompactBytes { Ok(None) => { error!("Data length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -876,7 +923,7 @@ impl Decoder for CompactBytes { #[derive(Debug, Copy, Clone, Default)] pub struct Struct { /// The version of the struct. 
- pub version: i16 + pub version: i16, } impl Encoder<&T> for Struct { @@ -895,7 +942,11 @@ impl Decoder for Struct { } impl Encoder<(&T::Key, &T)> for Struct { - fn encode(&self, buf: &mut B, (key, value): (&T::Key, &T)) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + (key, value): (&T::Key, &T), + ) -> Result<(), EncodeError> { value.encode(key, buf, self.version) } fn compute_size(&self, (key, value): (&T::Key, &T)) -> Result { @@ -952,7 +1003,11 @@ impl Encoder<&'a T>> Encoder> for Array { } impl Encoder<&'a T>> Encoder>> for Array { - fn encode(&self, buf: &mut B, value: Option<&Vec>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&Vec>, + ) -> Result<(), EncodeError> { self.encode(buf, value.map(|s| s.as_slice())) } fn compute_size(&self, value: Option<&Vec>) -> Result { @@ -961,7 +1016,11 @@ impl Encoder<&'a T>> Encoder>> for Array { } impl Encoder<&'a T>> Encoder<&Option>> for Array { - fn encode(&self, buf: &mut B, value: &Option>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option>, + ) -> Result<(), EncodeError> { self.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option>) -> Result { @@ -987,8 +1046,14 @@ impl Encoder<&'a T>> Encoder<&Vec> for Array { } } -impl Encoder<(&'a K, &'a V)>> Encoder>> for Array { - fn encode(&self, buf: &mut B, value: Option<&IndexMap>) -> Result<(), EncodeError> { +impl Encoder<(&'a K, &'a V)>> Encoder>> + for Array +{ + fn encode( + &self, + buf: &mut B, + value: Option<&IndexMap>, + ) -> Result<(), EncodeError> { if let Some(a) = value { if a.len() > i32::MAX as usize { error!("Array is too long to encode ({} items)", a.len()); @@ -1025,8 +1090,14 @@ impl Encoder<(&'a K, &'a V)>> Encoder Encoder<(&'a K, &'a V)>> Encoder<&Option>> for Array { - fn encode(&self, buf: &mut B, value: &Option>) -> Result<(), EncodeError> { +impl Encoder<(&'a K, &'a V)>> Encoder<&Option>> + for Array +{ + fn encode( + &self, + buf: &mut B, + value: &Option>, + ) -> Result<(), EncodeError> { self.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option>) -> Result { @@ -1035,7 +1106,11 @@ impl Encoder<(&'a K, &'a V)>> Encoder<&Option Encoder<(&'a K, &'a V)>> Encoder<&IndexMap> for Array { - fn encode(&self, buf: &mut B, value: &IndexMap) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &IndexMap, + ) -> Result<(), EncodeError> { self.encode(buf, Some(value)) } fn compute_size(&self, value: &IndexMap) -> Result { @@ -1053,7 +1128,7 @@ impl> Decoder>> for Array { result.push(self.0.decode(buf)?); } Ok(Some(result)) - }, + } n => { error!("Array length is negative ({})", n); Err(DecodeError) @@ -1068,7 +1143,7 @@ impl> Decoder> for Array { Ok(None) => { error!("Array length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -1086,7 +1161,7 @@ impl> Decoder>> for Ar result.insert(k, v); } Ok(Some(result)) - }, + } n => { error!("Array length is negative ({})", n); Err(DecodeError) @@ -1101,7 +1176,7 @@ impl> Decoder> for Array { Ok(None) => { error!("Array length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -1138,7 +1213,7 @@ impl Encoder<&'a T>> Encoder> for CompactArray { error!("CompactArray is too long to encode ({} items)", a.len()); Err(EncodeError) } else if let Some(fixed_size) = self.0.fixed_size() { - Ok(UnsignedVarInt.compute_size((a.len() as u32) + 1)? + a.len()*fixed_size) + Ok(UnsignedVarInt.compute_size((a.len() as u32) + 1)? 
+ a.len() * fixed_size) } else { let mut total_size = UnsignedVarInt.compute_size((a.len() as u32) + 1)?; for item in a { @@ -1153,7 +1228,11 @@ impl Encoder<&'a T>> Encoder> for CompactArray { } impl Encoder<&'a T>> Encoder>> for CompactArray { - fn encode(&self, buf: &mut B, value: Option<&Vec>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: Option<&Vec>, + ) -> Result<(), EncodeError> { self.encode(buf, value.map(|s| s.as_slice())) } fn compute_size(&self, value: Option<&Vec>) -> Result { @@ -1162,7 +1241,11 @@ impl Encoder<&'a T>> Encoder>> for CompactArray } impl Encoder<&'a T>> Encoder<&Option>> for CompactArray { - fn encode(&self, buf: &mut B, value: &Option>) -> Result<(), EncodeError> { + fn encode( + &self, + buf: &mut B, + value: &Option>, + ) -> Result<(), EncodeError> { self.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option>) -> Result { @@ -1188,8 +1271,14 @@ impl Encoder<&'a T>> Encoder<&Vec> for CompactArray { } } -impl Encoder<(&'a K, &'a V)>> Encoder>> for CompactArray { - fn encode(&self, buf: &mut B, value: Option<&IndexMap>) -> Result<(), EncodeError> { +impl Encoder<(&'a K, &'a V)>> Encoder>> + for CompactArray +{ + fn encode( + &self, + buf: &mut B, + value: Option<&IndexMap>, + ) -> Result<(), EncodeError> { if let Some(a) = value { // Use >= because we're going to add one to the length if a.len() >= std::u32::MAX as usize { @@ -1214,7 +1303,7 @@ impl Encoder<(&'a K, &'a V)>> Encoder Encoder<(&'a K, &'a V)>> Encoder Encoder<(&'a K, &'a V)>> Encoder<&Option>> for CompactArray { - fn encode(&self, buf: &mut B, value: &Option>) -> Result<(), EncodeError> { +impl Encoder<(&'a K, &'a V)>> Encoder<&Option>> + for CompactArray +{ + fn encode( + &self, + buf: &mut B, + value: &Option>, + ) -> Result<(), EncodeError> { self.encode(buf, value.as_ref()) } fn compute_size(&self, value: &Option>) -> Result { @@ -1237,8 +1332,14 @@ impl Encoder<(&'a K, &'a V)>> Encoder<&Option Encoder<(&'a K, &'a V)>> Encoder<&IndexMap> for CompactArray { - fn encode(&self, buf: &mut B, value: &IndexMap) -> Result<(), EncodeError> { +impl Encoder<(&'a K, &'a V)>> Encoder<&IndexMap> + for CompactArray +{ + fn encode( + &self, + buf: &mut B, + value: &IndexMap, + ) -> Result<(), EncodeError> { self.encode(buf, Some(value)) } fn compute_size(&self, value: &IndexMap) -> Result { @@ -1251,12 +1352,12 @@ impl> Decoder>> for CompactArray { match UnsignedVarInt.decode(buf)? 
{ 0 => Ok(None), n => { - let mut result = Vec::with_capacity((n-1) as usize); + let mut result = Vec::with_capacity((n - 1) as usize); for _ in 1..n { result.push(self.0.decode(buf)?); } Ok(Some(result)) - }, + } } } } @@ -1267,7 +1368,7 @@ impl> Decoder> for CompactArray { Ok(None) => { error!("CompactArray length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -1285,7 +1386,7 @@ impl> Decoder>> for Co result.insert(k, v); } Ok(Some(result)) - }, + } } } } @@ -1296,7 +1397,7 @@ impl> Decoder> for CompactArr Ok(None) => { error!("CompactArray length is negative (-1)"); Err(DecodeError) - }, + } Ok(Some(s)) => Ok(s), Err(e) => Err(e), } @@ -1309,7 +1410,11 @@ mod tests { use std::fmt::Debug; - fn test_encoder_decoder Encoder<&'a V> + Decoder>(encoder: E, value: V, mut expected: &[u8]) { + fn test_encoder_decoder Encoder<&'a V> + Decoder>( + encoder: E, + value: V, + mut expected: &[u8], + ) { let mut buf = vec![]; encoder.encode(&mut buf, &value).unwrap(); assert_eq!(buf, expected); @@ -1336,32 +1441,47 @@ mod tests { test_encoder_decoder(VarLong, 1, &[2]); test_encoder_decoder(VarLong, -2, &[3]); test_encoder_decoder(VarLong, 300, &[216, 4]); - test_encoder_decoder(VarLong, std::i64::MAX, &[254, 255, 255, 255, 255, 255, 255, 255, 255, 1]); - test_encoder_decoder(VarLong, std::i64::MIN, &[255, 255, 255, 255, 255, 255, 255, 255, 255, 1]); + test_encoder_decoder( + VarLong, + std::i64::MAX, + &[254, 255, 255, 255, 255, 255, 255, 255, 255, 1], + ); + test_encoder_decoder( + VarLong, + std::i64::MIN, + &[255, 255, 255, 255, 255, 255, 255, 255, 255, 1], + ); } #[test] fn smoke_string_encoder_decoder() { - test_encoder_decoder(String, std::string::String::from("hello"), &[0, 5, 104, 101, 108, 108, 111]); + test_encoder_decoder( + String, + std::string::String::from("hello"), + &[0, 5, 104, 101, 108, 108, 111], + ); test_encoder_decoder(String, None::, &[255, 255]); } #[test] fn smoke_compact_string_encoder_decoder() { - test_encoder_decoder(CompactString, std::string::String::from("hello"), &[6, 104, 101, 108, 108, 111]); + test_encoder_decoder( + CompactString, + std::string::String::from("hello"), + &[6, 104, 101, 108, 108, 111], + ); test_encoder_decoder(CompactString, None::, &[0]); } #[test] fn smoke_bytes_encoder_decoder() { - test_encoder_decoder(Bytes, vec![1,2,3,4], &[0, 0, 0, 4, 1, 2, 3, 4]); + test_encoder_decoder(Bytes, vec![1, 2, 3, 4], &[0, 0, 0, 4, 1, 2, 3, 4]); test_encoder_decoder(Bytes, None::>, &[255, 255, 255, 255]); } #[test] fn smoke_compact_bytes_encoder_decoder() { - test_encoder_decoder(CompactBytes, vec![1,2,3,4], &[5, 1, 2, 3, 4]); + test_encoder_decoder(CompactBytes, vec![1, 2, 3, 4], &[5, 1, 2, 3, 4]); test_encoder_decoder(CompactBytes, None::>, &[0]); } - } diff --git a/src/records.rs b/src/records.rs index dcbb0bc..81b1645 100644 --- a/src/records.rs +++ b/src/records.rs @@ -31,12 +31,15 @@ //! } //! 
``` use bytes::Bytes; +use crc::{Crc, CRC_32_CKSUM, CRC_32_ISCSI}; use indexmap::IndexMap; -use log::{error}; -use crc::{CRC_32_CKSUM, CRC_32_ISCSI, Crc}; +use log::error; use string::TryFrom; -use crate::protocol::{Encoder, Decoder, EncodeError, DecodeError, StrBytes, types, buf::{ByteBuf, ByteBufMut, gap}}; +use crate::protocol::{ + buf::{gap, ByteBuf, ByteBufMut}, + types, DecodeError, Decoder, EncodeError, Encoder, StrBytes, +}; use super::compression::{self as cmpr, Compressor, Decompressor}; use std::cmp::Ordering; @@ -152,11 +155,11 @@ impl RecordBatchEncoder { pub fn encode<'a, B, I>( buf: &mut B, records: I, - options: &RecordEncodeOptions + options: &RecordEncodeOptions, ) -> Result<(), EncodeError> where B: ByteBufMut, - I: Iterator + Clone, + I: Iterator + Clone, { match options.version { 0..=1 => Self::encode_legacy(buf, records, options), @@ -167,11 +170,11 @@ impl RecordBatchEncoder { fn encode_legacy_records<'a, B, I>( buf: &mut B, records: I, - options: &RecordEncodeOptions + options: &RecordEncodeOptions, ) -> Result<(), EncodeError> where B: ByteBufMut, - I: Iterator + Clone, + I: Iterator + Clone, { for record in records { record.encode_legacy(buf, options)?; @@ -181,11 +184,11 @@ impl RecordBatchEncoder { fn encode_legacy<'a, B, I>( buf: &mut B, records: I, - options: &RecordEncodeOptions + options: &RecordEncodeOptions, ) -> Result<(), EncodeError> where B: ByteBufMut, - I: Iterator + Clone, + I: Iterator + Clone, { if options.compression == Compression::None { // No wrapper needed @@ -200,7 +203,11 @@ impl RecordBatchEncoder { Record::encode_legacy_static(buf, options, |buf| { // Timestamp if options.version > 0 { - let min_timestamp = records.clone().map(|r| r.timestamp).min().unwrap_or_default(); + let min_timestamp = records + .clone() + .map(|r| r.timestamp) + .min() + .unwrap_or_default(); types::Int64.encode(buf, min_timestamp)?; }; @@ -211,15 +218,22 @@ impl RecordBatchEncoder { let size_gap = buf.put_typed_gap(gap::I32); let value_start = buf.offset(); match options.compression { - Compression::Snappy => cmpr::Snappy::compress(buf, |buf| Self::encode_legacy_records(buf, records, &inner_opts))?, - Compression::Gzip => cmpr::Gzip::compress(buf, |buf| Self::encode_legacy_records(buf, records, &inner_opts))?, + Compression::Snappy => cmpr::Snappy::compress(buf, |buf| { + Self::encode_legacy_records(buf, records, &inner_opts) + })?, + Compression::Gzip => cmpr::Gzip::compress(buf, |buf| { + Self::encode_legacy_records(buf, records, &inner_opts) + })?, _ => unimplemented!(), } let value_end = buf.offset(); let value_size = value_end - value_start; if value_size > i32::MAX as usize { - error!("Record batch was too large to encode ({} bytes)", value_size); + error!( + "Record batch was too large to encode ({} bytes)", + value_size + ); return Err(EncodeError); } buf.fill_typed_gap(size_gap, value_size as i32); @@ -229,17 +243,17 @@ impl RecordBatchEncoder { } Ok(()) } - + fn encode_new_records<'a, B, I>( buf: &mut B, records: I, min_offset: i64, min_timestamp: i64, - options: &RecordEncodeOptions + options: &RecordEncodeOptions, ) -> Result<(), EncodeError> where B: ByteBufMut, - I: Iterator, + I: Iterator, { for record in records { record.encode_new(buf, min_offset, min_timestamp, options)?; @@ -250,11 +264,11 @@ impl RecordBatchEncoder { fn encode_new_batch<'a, B, I>( buf: &mut B, records: &mut I, - options: &RecordEncodeOptions + options: &RecordEncodeOptions, ) -> Result where B: ByteBufMut, - I: Iterator + Clone, + I: Iterator + Clone, { let mut record_peeker = 
records.clone(); @@ -265,21 +279,47 @@ impl RecordBatchEncoder { }; // Determine how many additional records can be included in the batch - let num_records = record_peeker.take_while(|record| { - record.transactional == first_record.transactional && - record.control == first_record.control && - record.partition_leader_epoch == first_record.partition_leader_epoch && - record.producer_id == first_record.producer_id && - record.producer_epoch == first_record.producer_epoch && - (record.offset as i32).wrapping_sub(record.sequence) == (first_record.offset as i32).wrapping_sub(first_record.sequence) - }).count() + 1; + let num_records = record_peeker + .take_while(|record| { + record.transactional == first_record.transactional + && record.control == first_record.control + && record.partition_leader_epoch == first_record.partition_leader_epoch + && record.producer_id == first_record.producer_id + && record.producer_epoch == first_record.producer_epoch + && (record.offset as i32).wrapping_sub(record.sequence) + == (first_record.offset as i32).wrapping_sub(first_record.sequence) + }) + .count() + + 1; // Aggregate various record properties - let min_offset = records.clone().take(num_records).map(|r| r.offset).min().expect("Batch contains at least one element"); - let max_offset = records.clone().take(num_records).map(|r| r.offset).max().expect("Batch contains at least one element"); - let min_timestamp = records.clone().take(num_records).map(|r| r.timestamp).min().expect("Batch contains at least one element"); - let max_timestamp = records.clone().take(num_records).map(|r| r.timestamp).max().expect("Batch contains at least one element"); - let base_sequence = first_record.sequence.wrapping_sub((first_record.offset - min_offset) as i32); + let min_offset = records + .clone() + .take(num_records) + .map(|r| r.offset) + .min() + .expect("Batch contains at least one element"); + let max_offset = records + .clone() + .take(num_records) + .map(|r| r.offset) + .max() + .expect("Batch contains at least one element"); + let min_timestamp = records + .clone() + .take(num_records) + .map(|r| r.timestamp) + .min() + .expect("Batch contains at least one element"); + let max_timestamp = records + .clone() + .take(num_records) + .map(|r| r.timestamp) + .max() + .expect("Batch contains at least one element"); + let base_sequence = first_record + .sequence + .wrapping_sub((first_record.offset - min_offset) as i32); // Base offset types::Int64.encode(buf, min_offset)?; @@ -328,7 +368,10 @@ impl RecordBatchEncoder { // Record count if num_records > i32::MAX as usize { - error!("Too many records to encode in one batch ({} records)", num_records); + error!( + "Too many records to encode in one batch ({} records)", + num_records + ); return Err(EncodeError); } types::Int32.encode(buf, num_records as i32)?; @@ -336,9 +379,15 @@ impl RecordBatchEncoder { // Records let records = records.take(num_records); match options.compression { - Compression::None => cmpr::None::compress(buf, |buf| Self::encode_new_records(buf, records, min_offset, min_timestamp, options))?, - Compression::Snappy => cmpr::Snappy::compress(buf, |buf| Self::encode_new_records(buf, records, min_offset, min_timestamp, options))?, - Compression::Gzip => cmpr::Gzip::compress(buf, |buf| Self::encode_new_records(buf, records, min_offset, min_timestamp, options))?, + Compression::None => cmpr::None::compress(buf, |buf| { + Self::encode_new_records(buf, records, min_offset, min_timestamp, options) + })?, + Compression::Snappy => cmpr::Snappy::compress(buf, 
|buf| { + Self::encode_new_records(buf, records, min_offset, min_timestamp, options) + })?, + Compression::Gzip => cmpr::Gzip::compress(buf, |buf| { + Self::encode_new_records(buf, records, min_offset, min_timestamp, options) + })?, _ => unimplemented!(), } @@ -347,7 +396,10 @@ impl RecordBatchEncoder { // Fill size gap let batch_size = batch_end - batch_start; if batch_size > i32::MAX as usize { - error!("Record batch was too large to encode ({} bytes)", batch_size); + error!( + "Record batch was too large to encode ({} bytes)", + batch_size + ); return Err(EncodeError); } @@ -363,11 +415,11 @@ impl RecordBatchEncoder { fn encode_new<'a, B, I>( buf: &mut B, mut records: I, - options: &RecordEncodeOptions + options: &RecordEncodeOptions, ) -> Result<(), EncodeError> where B: ByteBufMut, - I: Iterator + Clone, + I: Iterator + Clone, { while Self::encode_new_batch(buf, &mut records, options)? {} Ok(()) @@ -384,7 +436,7 @@ impl RecordBatchDecoder { Ok(records) } fn decode_batch(buf: &mut B, records: &mut Vec) -> Result<(), DecodeError> { - let version = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET+1))?[0] as i8; + let version = buf.try_peek_bytes(MAGIC_BYTE_OFFSET..(MAGIC_BYTE_OFFSET + 1))?[0] as i8; debug!("Decoding record batch (version: {})", version); match version { 0..=1 => Record::decode_legacy(buf, version, records), @@ -392,7 +444,7 @@ impl RecordBatchDecoder { _ => { error!("Unknown record batch version ({})", version); Err(DecodeError) - }, + } } } fn decode_new_records( @@ -407,7 +459,11 @@ impl RecordBatchDecoder { } Ok(()) } - fn decode_new_batch(buf: &mut B, version: i8, records: &mut Vec) -> Result<(), DecodeError> { + fn decode_new_batch( + buf: &mut B, + version: i8, + records: &mut Vec, + ) -> Result<(), DecodeError> { // Base offset let min_offset = types::Int64.decode(buf)?; @@ -434,9 +490,12 @@ impl RecordBatchDecoder { // CRC let supplied_crc: u32 = types::UInt32.decode(buf)?; let actual_crc = CASTAGNOLI.checksum(buf); - + if supplied_crc != actual_crc { - error!("Cyclic redundancy check failed ({} != {})", supplied_crc, actual_crc); + error!( + "Cyclic redundancy check failed ({} != {})", + supplied_crc, actual_crc + ); return Err(DecodeError); } @@ -502,9 +561,15 @@ impl RecordBatchDecoder { // Records match compression { - Compression::None => cmpr::None::decompress(buf, |buf| Self::decode_new_records(buf, &batch_decode_info, version, records))?, - Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| Self::decode_new_records(buf, &batch_decode_info, version, records))?, - Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| Self::decode_new_records(buf, &batch_decode_info, version, records))?, + Compression::None => cmpr::None::decompress(buf, |buf| { + Self::decode_new_records(buf, &batch_decode_info, version, records) + })?, + Compression::Snappy => cmpr::Snappy::decompress(buf, |buf| { + Self::decode_new_records(buf, &batch_decode_info, version, records) + })?, + Compression::Gzip => cmpr::Gzip::decompress(buf, |buf| { + Self::decode_new_records(buf, &batch_decode_info, version, records) + })?, _ => unimplemented!(), }; @@ -513,10 +578,14 @@ impl RecordBatchDecoder { } impl Record { - fn encode_legacy_static(buf: &mut B, options: &RecordEncodeOptions, content_writer: F) -> Result<(), EncodeError> + fn encode_legacy_static( + buf: &mut B, + options: &RecordEncodeOptions, + content_writer: F, + ) -> Result<(), EncodeError> where B: ByteBufMut, - F: FnOnce(&mut B) -> Result<(), EncodeError> + F: FnOnce(&mut B) -> Result<(), EncodeError>, { 
types::Int64.encode(buf, 0)?; let size_gap = buf.put_typed_gap(gap::I32); @@ -528,7 +597,10 @@ impl Record { let compression = options.compression as i8; if compression > 2 + options.version { - error!("Compression algorithm '{:?}' is unsupported for record version '{}'", options.compression, options.version); + error!( + "Compression algorithm '{:?}' is unsupported for record version '{}'", + options.compression, options.version + ); return Err(EncodeError); } types::Int8.encode(buf, compression)?; @@ -550,7 +622,11 @@ impl Record { Ok(()) } - fn encode_legacy(&self, buf: &mut B, options: &RecordEncodeOptions) -> Result<(), EncodeError> { + fn encode_legacy( + &self, + buf: &mut B, + options: &RecordEncodeOptions, + ) -> Result<(), EncodeError> { if self.transactional || self.control { error!("Transactional and control records are not supported in this version of the protocol!"); return Err(EncodeError); @@ -571,7 +647,13 @@ impl Record { Ok(()) }) } - fn encode_new(&self, buf: &mut B, min_offset: i64, min_timestamp: i64, options: &RecordEncodeOptions) -> Result<(), EncodeError> { + fn encode_new( + &self, + buf: &mut B, + min_offset: i64, + min_timestamp: i64, + options: &RecordEncodeOptions, + ) -> Result<(), EncodeError> { // Size let size = self.compute_size_new(min_offset, min_timestamp, options)?; if size > i32::MAX as usize { @@ -586,7 +668,10 @@ impl Record { // Timestamp delta let timestamp_delta = self.timestamp - min_timestamp; if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 { - error!("Timestamps within batch are too far apart ({}, {})", min_timestamp, self.timestamp); + error!( + "Timestamps within batch are too far apart ({}, {})", + min_timestamp, self.timestamp + ); return Err(EncodeError); } types::VarInt.encode(buf, timestamp_delta as i32)?; @@ -594,7 +679,10 @@ impl Record { // Offset delta let offset_delta = self.offset - min_offset; if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 { - error!("Timestamps within batch are too far apart ({}, {})", min_offset, self.offset); + error!( + "Timestamps within batch are too far apart ({}, {})", + min_offset, self.offset + ); return Err(EncodeError); } types::VarInt.encode(buf, offset_delta as i32)?; @@ -632,7 +720,10 @@ impl Record { for (k, v) in &self.headers { // Key len if k.len() > i32::MAX as usize { - error!("Record header key was too large to encode ({} bytes)", k.len()); + error!( + "Record header key was too large to encode ({} bytes)", + k.len() + ); return Err(EncodeError); } types::VarInt.encode(buf, k.len() as i32)?; @@ -643,7 +734,10 @@ impl Record { // Value if let Some(v) = v.as_ref() { if v.len() > i32::MAX as usize { - error!("Record header value was too large to encode ({} bytes)", v.len()); + error!( + "Record header value was too large to encode ({} bytes)", + v.len() + ); return Err(EncodeError); } types::VarInt.encode(buf, v.len() as i32)?; @@ -655,7 +749,12 @@ impl Record { Ok(()) } - fn compute_size_new(&self, min_offset: i64, min_timestamp: i64, _options: &RecordEncodeOptions) -> Result { + fn compute_size_new( + &self, + min_offset: i64, + min_timestamp: i64, + _options: &RecordEncodeOptions, + ) -> Result { let mut total_size = 0; // Attributes @@ -664,7 +763,10 @@ impl Record { // Timestamp delta let timestamp_delta = self.timestamp - min_timestamp; if timestamp_delta > i32::MAX as i64 || timestamp_delta < i32::MIN as i64 { - error!("Timestamps within batch are too far apart ({}, {})", min_timestamp, self.timestamp); + error!( + "Timestamps within batch 
are too far apart ({}, {})", + min_timestamp, self.timestamp + ); return Err(EncodeError); } total_size += types::VarInt.compute_size(timestamp_delta as i32)?; @@ -672,7 +774,10 @@ impl Record { // Offset delta let offset_delta = self.offset - min_offset; if offset_delta > i32::MAX as i64 || offset_delta < i32::MIN as i64 { - error!("Timestamps within batch are too far apart ({}, {})", min_offset, self.offset); + error!( + "Timestamps within batch are too far apart ({}, {})", + min_offset, self.offset + ); return Err(EncodeError); } total_size += types::VarInt.compute_size(offset_delta as i32)?; @@ -710,7 +815,10 @@ impl Record { for (k, v) in &self.headers { // Key len if k.len() > i32::MAX as usize { - error!("Record header key was too large to encode ({} bytes)", k.len()); + error!( + "Record header key was too large to encode ({} bytes)", + k.len() + ); return Err(EncodeError); } total_size += types::VarInt.compute_size(k.len() as i32)?; @@ -721,7 +829,10 @@ impl Record { // Value if let Some(v) = v.as_ref() { if v.len() > i32::MAX as usize { - error!("Record header value was too large to encode ({} bytes)", v.len()); + error!( + "Record header value was too large to encode ({} bytes)", + v.len() + ); return Err(EncodeError); } total_size += types::VarInt.compute_size(v.len() as i32)?; @@ -733,7 +844,11 @@ impl Record { Ok(total_size) } - fn decode_legacy(buf: &mut B, version: i8, records: &mut Vec) -> Result<(), DecodeError> { + fn decode_legacy( + buf: &mut B, + version: i8, + records: &mut Vec, + ) -> Result<(), DecodeError> { let offset = types::Int64.decode(buf)?; let size: i32 = types::Int32.decode(buf)?; if size < 0 { @@ -747,9 +862,12 @@ impl Record { // CRC let supplied_crc: u32 = types::UInt32.decode(buf)?; let actual_crc = IEEE.checksum(buf); - + if supplied_crc != actual_crc { - error!("Cyclic redundancy check failed ({} != {})", supplied_crc, actual_crc); + error!( + "Cyclic redundancy check failed ({} != {})", + supplied_crc, actual_crc + ); return Err(DecodeError); } @@ -817,7 +935,11 @@ impl Record { Ok(()) } - fn decode_new(buf: &mut B, batch_decode_info: &BatchDecodeInfo, _version: i8) -> Result { + fn decode_new( + buf: &mut B, + batch_decode_info: &BatchDecodeInfo, + _version: i8, + ) -> Result { // Size let size: i32 = types::VarInt.decode(buf)?; if size < 0 { @@ -846,20 +968,23 @@ impl Record { Ordering::Less => { error!("Unexpected negative record key length ({} bytes)", key_len); return Err(DecodeError); - }, + } Ordering::Equal => None, - Ordering::Greater => Some(buf.try_get_bytes(key_len as usize)?) + Ordering::Greater => Some(buf.try_get_bytes(key_len as usize)?), }; // Value let value_len: i32 = types::VarInt.decode(buf)?; let value = match value_len.cmp(&-1) { Ordering::Less => { - error!("Unexpected negative record value length ({} bytes)", value_len); + error!( + "Unexpected negative record value length ({} bytes)", + value_len + ); return Err(DecodeError); - }, + } Ordering::Equal => None, - Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?) 
+ Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?), }; // Headers @@ -875,7 +1000,10 @@ impl Record { // Key len let key_len: i32 = types::VarInt.decode(buf)?; if key_len < 0 { - error!("Unexpected negative record header key length ({} bytes)", key_len); + error!( + "Unexpected negative record header key length ({} bytes)", + key_len + ); return Err(DecodeError); } @@ -888,11 +1016,14 @@ impl Record { // Value let value = match value_len.cmp(&-2) { Ordering::Less => { - error!("Unexpected negative record header value length ({} bytes)", value_len); - return Err(DecodeError); - }, + error!( + "Unexpected negative record header value length ({} bytes)", + value_len + ); + return Err(DecodeError); + } Ordering::Equal => None, - Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?) + Ordering::Greater => Some(buf.try_get_bytes(value_len as usize)?), }; headers.insert(key, value); diff --git a/tests/api_versions.rs b/tests/api_versions.rs index 8e9a917..3fb5fc5 100644 --- a/tests/api_versions.rs +++ b/tests/api_versions.rs @@ -1,14 +1,12 @@ -use kafka_protocol::messages::ApiVersionsRequest; use bytes::Bytes; +use kafka_protocol::messages::ApiVersionsRequest; use kafka_protocol::protocol::Decodable; #[test] fn api_versions() { let bytes: [u8; 25] = [ - 0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, - 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61, - 0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30, - 0x00 + 0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, + 0x61, 0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30, 0x00, ]; let res = ApiVersionsRequest::decode(&mut Bytes::from(bytes.to_vec()), 3).unwrap(); diff --git a/tests/fetch_response.rs b/tests/fetch_response.rs index 5b0c160..85536b8 100644 --- a/tests/fetch_response.rs +++ b/tests/fetch_response.rs @@ -1,9 +1,5 @@ use bytes::Bytes; -use kafka_protocol::{ - protocol::Decodable, - messages::FetchResponse, - records::RecordBatchDecoder, -}; +use kafka_protocol::{messages::FetchResponse, protocol::Decodable, records::RecordBatchDecoder}; const HEADERS: [u8; 45] = [ // Throttle time @@ -32,9 +28,9 @@ const FIRST_RECORD: [u8; 79] = [ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, // Producer epoch 0x0, 0x0, // First sequence 0x0, 0x0, 0x0, 0x0, // Number of records - 0x0, 0x0, 0x0, 0x1, // Size + 0x0, 0x0, 0x0, 0x1, // Size 0x22, // Attributes - 0x1, // Timestamp delta + 0x1, // Timestamp delta 0xd0, 0xf, // Offset delta 0x2, // Key 0xa, 0x68, 0x65, 0x6c, 0x6c, 0x6f, // Value @@ -56,9 +52,9 @@ const SECOND_RECORD: [u8; 79] = [ 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, // Producer epoch 0x0, 0x0, // First sequence 0x0, 0x0, 0x0, 0x0, // Number of records - 0x0, 0x0, 0x0, 0x1, // Size + 0x0, 0x0, 0x0, 0x1, // Size 0x22, // Attributes - 0x1, // Timestamp delta + 0x1, // Timestamp delta 0xd0, 0xf, // Offset delta 0x2, // Key 0xa, 0x68, 0x69, 0x69, 0x69, 0x69, // Value diff --git a/tests/request_header.rs b/tests/request_header.rs index b9ecb3f..07086f7 100644 --- a/tests/request_header.rs +++ b/tests/request_header.rs @@ -1,23 +1,24 @@ -use std::convert::TryFrom; -use kafka_protocol::messages::{RequestHeader, ApiKey}; -use kafka_protocol::protocol::Decodable; -use kafka_protocol::protocol::buf::ByteBuf; -use bytes::{Bytes, Buf}; - -#[test] -fn request_header() { - let mut bytes = Bytes::from(vec![ - 0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, - 0x00, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e, 0x63, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00 - ]); - - let api_key = 
bytes.peek_bytes(0..2).get_i16(); - let api_version = bytes.peek_bytes(2..4).get_i16(); - let header_version = ApiKey::try_from(api_key).unwrap().request_header_version(api_version); - let res = RequestHeader::decode(&mut bytes, header_version).unwrap(); - - assert_eq!(res.request_api_key, 18); - assert_eq!(res.request_api_version, 3); - assert_eq!(res.client_id.unwrap().to_string(), "adminclient-1"); -} \ No newline at end of file +use bytes::{Buf, Bytes}; +use kafka_protocol::messages::{ApiKey, RequestHeader}; +use kafka_protocol::protocol::buf::ByteBuf; +use kafka_protocol::protocol::Decodable; +use std::convert::TryFrom; + +#[test] +fn request_header() { + let mut bytes = Bytes::from(vec![ + 0x00, 0x12, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0d, 0x61, 0x64, 0x6d, 0x69, 0x6e, + 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2d, 0x31, 0x00, + ]); + + let api_key = bytes.peek_bytes(0..2).get_i16(); + let api_version = bytes.peek_bytes(2..4).get_i16(); + let header_version = ApiKey::try_from(api_key) + .unwrap() + .request_header_version(api_version); + let res = RequestHeader::decode(&mut bytes, header_version).unwrap(); + + assert_eq!(res.request_api_key, 18); + assert_eq!(res.request_api_version, 3); + assert_eq!(res.client_id.unwrap().to_string(), "adminclient-1"); +}