Skip to content

Commit

Permalink
Updates from Kafka upstream. (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
tychedelia authored Feb 8, 2023
1 parent b62f904 commit 30425e3
Show file tree
Hide file tree
Showing 152 changed files with 3,902 additions and 419 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#### Enhancements:

- Add derrive builder for all messages.
- Add derive builder for all messages.

## v0.1.0

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ snap = "1.0.5"
flate2 = "1.0.20"
string = "0.3.0"
derive_builder = "0.10.2"
paste = "1.0.7"
paste = "1.0.7"
79 changes: 77 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,84 @@ Unlike other Kafka protocol implementations, this project uses code generation t
including different protocol versions. See [Kafka's repo](https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message)
for an example of protocol schema.

The code generator fetches the Kafka [repo](https://github.com/apache/kafka) and uses the default branch to generate the code.
## Versioning

## For Developers
Protocol messages are generated against the most recent stable Kafka release, currently [3.3.2](https://github.com/apache/kafka/releases/tag/3.3.2).

Although the Kafka protocol remains relatively stable and strives to be backwards compatible, new fields are occasionally
added. In order to ensure forward compatibility with the protocol, this crate marks all exported items as `#[non_exhaustive]`.
Protocol messages can be constructed using either `Default::default` or their provided [builder](https://docs.rs/derive_builder/latest/derive_builder/).

## Working with messages

Using `Default::default`:
```rust
use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::StrBytes;

let mut header = RequestHeader::default();
header.client_id = Some(StrBytes::from_str("my-client"));
header.request_api_key = ApiKey::MetadataKey as i16;
header.request_api_version = 12;

let mut request = MetadataRequest::default();
request.topics = None;
request.allow_auto_topic_creation = true;
```

Using `kafka_protocol::protocol::Builder`:
```rust
use kafka_protocol::messages::{ApiKey, MetadataRequest, RequestHeader};
use kafka_protocol::protocol::{Builder, StrBytes};

let header = RequestHeader::builder()
.client_id(Some(StrBytes::from_str("my-client")))
.request_api_key(ApiKey::MetadataKey as i16)
.request_api_version(12)
.build();
let request = MetadataRequest::builder()
.topics(None)
.allow_auto_topic_creation(true)
.build();
```
### Serialization

Once a message has been created, it can be serialized using [`Encodable`], writing
the struct to a provided [`bytes::BytesMut`]. The API version for the given message
matching the version specified in the request header must be provided.

```rust
use bytes::BytesMut;
use kafka_protocol::messages::MetadataRequest;
use kafka_protocol::protocol::Encodable;

let mut bytes = BytesMut::new();
let request = MetadataRequest::default();
request.encode(&mut bytes, 12).unwrap();
```

### Deserialization

Messages can be decoded using [`Decodable`] and providing the matching API version from their
corresponding request.

```rust
use bytes::Bytes;
use kafka_protocol::messages::ApiVersionsRequest;
use kafka_protocol::protocol::Decodable;

let bytes: [u8; 25] = [
0x12, 0x61, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d,
0x6b, 0x61, 0x66, 0x6b, 0x61, 0x2d, 0x6a, 0x61,
0x76, 0x61, 0x06, 0x32, 0x2e, 0x38, 0x2e, 0x30,
0x00
];

let res = ApiVersionsRequest::decode(&mut Bytes::from(bytes.to_vec()), 3).unwrap();
```

### Development

Run `cargo run -p protocol_codegen` in the root path of this repo to generate/update the Rust code via the latest Kafka
protocol schema.
Expand Down
2 changes: 1 addition & 1 deletion protocol_codegen/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ serde_plain = "0.3.0"
parse-display = "0.1.1"
json_comments = "0.2.0"
assert-json-diff = "1.0.1"
git2 = "0.13"
git2 = "0.16"
tempfile = "3.2.0"
uuid = { version = "0.8.2", features = ["v4", "serde" ] }
15 changes: 14 additions & 1 deletion protocol_codegen/src/generate_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::io::Write;
use std::collections::{BTreeSet, BTreeMap};

use failure::Error;
use git2::Oid;

mod code_writer;
pub mod expr;
Expand All @@ -23,11 +24,21 @@ pub fn run() -> Result<(), Error> {

// Download messages from head of Kafka repo
println!("Cloning kafka repo");
git2::build::RepoBuilder::new()
let repo = git2::build::RepoBuilder::new()
.fetch_options(git2::FetchOptions::new())
.with_checkout(git2::build::CheckoutBuilder::new())
.clone("https://github.com/apache/kafka.git", &input_tmpdir.as_path())?;

// Checkout the release commit
// https://github.com/apache/kafka/releases/tag/3.3.2
// checking out a tag with git2 is annoying -- we pin to the tag's commit sha instead
let release_commit = "b66af662e61082cb8def576ded1fe5cee37e155f";
println!("Checking out release {}", release_commit);
let oid = Oid::from_str(release_commit).unwrap();
let commit = repo.find_commit(oid).expect("Could not find release commit!").into_object();
repo.checkout_tree(&commit, None).unwrap();
repo.set_head_detached(commit.id()).unwrap();

// Clear output directory
for file in fs::read_dir(output_path)? {
let file = file?;
Expand Down Expand Up @@ -131,6 +142,7 @@ pub fn run() -> Result<(), Error> {
writeln!(module_file)?;

writeln!(module_file, "/// Wrapping enum for all requests in the Kafka protocol.")?;
writeln!(module_file, "#[non_exhaustive]")?;
writeln!(module_file, "#[derive(Debug, Clone, PartialEq)]")?;
writeln!(module_file, "pub enum RequestKind {{")?;
for (_, request_type) in request_types.iter() {
Expand All @@ -141,6 +153,7 @@ pub fn run() -> Result<(), Error> {
writeln!(module_file)?;

writeln!(module_file, "/// Wrapping enum for all responses in the Kafka protocol.")?;
writeln!(module_file, "#[non_exhaustive]")?;
writeln!(module_file, "#[derive(Debug, Clone, PartialEq)]")?;
writeln!(module_file, "pub enum ResponseKind {{")?;
for (_, response_type) in response_types.iter() {
Expand Down
18 changes: 17 additions & 1 deletion protocol_codegen/src/generate_messages/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ fn write_struct_def<W: Write>(
}

writeln!(w, "/// Valid versions: {}", valid_versions)?;
writeln!(w, "#[non_exhaustive]")?;
writeln!(w, "#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]")?;
write!(w, "pub struct {} ", name)?;
w.block(|w| {
Expand All @@ -738,6 +739,21 @@ fn write_struct_def<W: Write>(
writeln!(w)?;
writeln!(w)?;

write!(w, "impl Builder for {} ", name)?;
w.block(|w| {
writeln!(w, "type Builder = {}Builder;", name)?;
writeln!(w)?;
write!(w, "fn builder() -> Self::Builder")?;
w.block(|w| {
writeln!(w, "{}Builder::default()", name)?;
Ok(())
})?;
writeln!(w)?;
Ok(())
})?;
writeln!(w)?;
writeln!(w)?;

if map_key.is_some() {
write!(w, "impl MapEncodable for {} ", name)?;
} else {
Expand Down Expand Up @@ -887,7 +903,7 @@ fn write_file_header<W: Write>(
writeln!(w)?;
writeln!(w, "use crate::protocol::{{")?;
writeln!(w, " Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, EncodeError, DecodeError, Message, HeaderVersion, VersionRange,")?;
writeln!(w, " types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{{ByteBuf, ByteBufMut}}")?;
writeln!(w, " types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{{ByteBuf, ByteBufMut}}, Builder")?;
writeln!(w, "}};")?;
writeln!(w)?;
writeln!(w)?;
Expand Down
2 changes: 2 additions & 0 deletions src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,7 @@ impl TryFrom<i16> for ApiKey {
}

/// Wrapping enum for all requests in the Kafka protocol.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub enum RequestKind {
/// ProduceRequest,
Expand Down Expand Up @@ -1139,6 +1140,7 @@ pub enum RequestKind {
}

/// Wrapping enum for all responses in the Kafka protocol.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq)]
pub enum ResponseKind {
/// ProduceResponse,
Expand Down
11 changes: 10 additions & 1 deletion src/messages/add_offsets_to_txn_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use uuid::Uuid;

use crate::protocol::{
Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, EncodeError, DecodeError, Message, HeaderVersion, VersionRange,
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}, Builder
};


/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddOffsetsToTxnRequest {
/// The transactional id corresponding to the transaction.
Expand All @@ -44,6 +45,14 @@ pub struct AddOffsetsToTxnRequest {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddOffsetsToTxnRequest {
    type Builder = AddOffsetsToTxnRequestBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl Encodable for AddOffsetsToTxnRequest {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
if version >= 3 {
Expand Down
11 changes: 10 additions & 1 deletion src/messages/add_offsets_to_txn_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use uuid::Uuid;

use crate::protocol::{
Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, EncodeError, DecodeError, Message, HeaderVersion, VersionRange,
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}, Builder
};


/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddOffsetsToTxnResponse {
/// Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
Expand All @@ -34,6 +35,14 @@ pub struct AddOffsetsToTxnResponse {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddOffsetsToTxnResponse {
    type Builder = AddOffsetsToTxnResponseBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl Encodable for AddOffsetsToTxnResponse {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
types::Int32.encode(buf, &self.throttle_time_ms)?;
Expand Down
20 changes: 19 additions & 1 deletion src/messages/add_partitions_to_txn_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use uuid::Uuid;

use crate::protocol::{
Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, EncodeError, DecodeError, Message, HeaderVersion, VersionRange,
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}, Builder
};


/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddPartitionsToTxnTopic {
/// The partition indexes to add to the transaction
Expand All @@ -29,6 +30,14 @@ pub struct AddPartitionsToTxnTopic {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddPartitionsToTxnTopic {
    type Builder = AddPartitionsToTxnTopicBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl MapEncodable for AddPartitionsToTxnTopic {
type Key = super::TopicName;
fn encode<B: ByteBufMut>(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<(), EncodeError> {
Expand Down Expand Up @@ -125,6 +134,7 @@ impl Message for AddPartitionsToTxnTopic {
}

/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddPartitionsToTxnRequest {
/// The transactional id corresponding to the transaction.
Expand All @@ -151,6 +161,14 @@ pub struct AddPartitionsToTxnRequest {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddPartitionsToTxnRequest {
    type Builder = AddPartitionsToTxnRequestBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl Encodable for AddPartitionsToTxnRequest {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
if version >= 3 {
Expand Down
29 changes: 28 additions & 1 deletion src/messages/add_partitions_to_txn_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ use uuid::Uuid;

use crate::protocol::{
Encodable, Decodable, MapEncodable, MapDecodable, Encoder, Decoder, EncodeError, DecodeError, Message, HeaderVersion, VersionRange,
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}
types, write_unknown_tagged_fields, compute_unknown_tagged_fields_size, StrBytes, buf::{ByteBuf, ByteBufMut}, Builder
};


/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddPartitionsToTxnPartitionResult {
/// The response error code.
Expand All @@ -29,6 +30,14 @@ pub struct AddPartitionsToTxnPartitionResult {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddPartitionsToTxnPartitionResult {
    type Builder = AddPartitionsToTxnPartitionResultBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl MapEncodable for AddPartitionsToTxnPartitionResult {
type Key = i32;
fn encode<B: ByteBufMut>(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<(), EncodeError> {
Expand Down Expand Up @@ -101,6 +110,7 @@ impl Message for AddPartitionsToTxnPartitionResult {
}

/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddPartitionsToTxnTopicResult {
/// The results for each partition
Expand All @@ -112,6 +122,14 @@ pub struct AddPartitionsToTxnTopicResult {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddPartitionsToTxnTopicResult {
    type Builder = AddPartitionsToTxnTopicResultBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl MapEncodable for AddPartitionsToTxnTopicResult {
type Key = super::TopicName;
fn encode<B: ByteBufMut>(&self, key: &Self::Key, buf: &mut B, version: i16) -> Result<(), EncodeError> {
Expand Down Expand Up @@ -208,6 +226,7 @@ impl Message for AddPartitionsToTxnTopicResult {
}

/// Valid versions: 0-3
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, derive_builder::Builder)]
pub struct AddPartitionsToTxnResponse {
/// Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
Expand All @@ -224,6 +243,14 @@ pub struct AddPartitionsToTxnResponse {
pub unknown_tagged_fields: BTreeMap<i32, Vec<u8>>,
}

// Exposes the derive_builder-generated builder through the crate-wide `Builder` trait.
impl Builder for AddPartitionsToTxnResponse {
    type Builder = AddPartitionsToTxnResponseBuilder;

    fn builder() -> Self::Builder {
        Self::Builder::default()
    }
}

impl Encodable for AddPartitionsToTxnResponse {
fn encode<B: ByteBufMut>(&self, buf: &mut B, version: i16) -> Result<(), EncodeError> {
types::Int32.encode(buf, &self.throttle_time_ms)?;
Expand Down
Loading

0 comments on commit 30425e3

Please sign in to comment.