Skip to content

Commit

Permalink
Merge pull request #94 from zmstone/feat-add-zstd-compression
Browse files Browse the repository at this point in the history
feat: add zstd compression support
  • Loading branch information
zmstone authored Jan 20, 2025
2 parents ae7df49 + a8df8c4 commit de2d276
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 23 deletions.
22 changes: 14 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ This library provides:

See [brod](https://github.com/kafka4beam/brod) for a complete kafka client implementation.

## Compression Support
## Compression

Since 4.0, this lib no longer includes [snappyer](https://github.com/kafka4beam/snappyer) and
[lz4b](https://github.com/kafka4beam/lz4b) as rebar dependencies.
However `kafka_protocol` still defaults to use `snappyer` and `lz4b_frame` for compress and
decompress.
Since version 4.0, `kafka_protocol` no longer includes compression libraries as dependencies.
You must add your desired dependencies to the wrapping project's rebar or mix config.

### Provide compression module overrides
| Compression Algorithm | Default Library |
|-----------------------|----------------------------------------------------|
| Snappy | [snappyer](https://github.com/kafka4beam/snappyer) |
| Lz4 | [lz4b](https://github.com/kafka4beam/lz4b) |
| Zstd | [zstd](https://github.com/silviucpp/ezstd) |

### Override default compression dependencies

User may override default compression libs with modules having below APIs implemented:

Expand All @@ -31,12 +35,14 @@ There are two approaches to inject such dynamic dependencies to `kakfa_protocol`

#### Set application environment

e.g. Set `{provide_compression, [{snappy, my_snappy_module}, {lz4, my_lz4_module}]}`
e.g. Set `{provide_compression, [{snappy, my_snappy_module}, {lz4, my_lz4_module}, {zstd, my_zstd_module}]}`
in `kafka_protocol` application environment, (or provide from sys.config).

Starting from 4.2, the compression modules are cached in `persistent_term`, which can be overridden by calling `kpro:provide_compression`.

#### Call `kpro:provide_compression`

e.g. `kpro:provide_compression([{snappy, my_snappy_module}, {lz4, my_lz4_module}]).`
e.g. `kpro:provide_compression([{snappy, my_snappy_module}, {lz4, my_lz4_module}, {zstd, my_zstd_module}]).`

## Test (`make eunit`)

Expand Down
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* 4.2.0
- Add support for `zstd` compression codec

* 4.1.10
- Resolve timeout value for discover and connect
- partition leader
Expand Down
6 changes: 6 additions & 0 deletions include/kpro_private.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
-define(gzip, gzip).
-define(snappy, snappy).
-define(lz4, lz4).
-define(zstd, zstd).

%% Compression attributes
-define(KPRO_COMPRESS_NONE, 0).
-define(KPRO_COMPRESS_GZIP, 1).
-define(KPRO_COMPRESS_SNAPPY, 2).
-define(KPRO_COMPRESS_LZ4, 3).
-define(KPRO_COMPRESS_ZSTD, 4).

-define(KPRO_COMPRESSION_MASK, 2#111).
-define(KPRO_IS_GZIP_ATTR(ATTR),
Expand All @@ -38,6 +40,9 @@
((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_SNAPPY)).
-define(KPRO_IS_LZ4_ATTR(ATTR),
((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_LZ4)).
-define(KPRO_IS_ZSTD_ATTR(ATTR),
((?KPRO_COMPRESSION_MASK band ATTR) =:= ?KPRO_COMPRESS_ZSTD)).


-define(KPRO_TS_TYPE_CREATE, 0).
-define(KPRO_TS_TYPE_APPEND, 2#1000).
Expand Down Expand Up @@ -95,6 +100,7 @@
-define(KAFKA_0_11, 11).
-define(KAFKA_1_0, 100).
-define(KAFKA_1_1, 110).
-define(KAFKA_2_1, 210).

-ifdef(OTP_RELEASE).
-define(BIND_STACKTRACE(Var), :Var).
Expand Down
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
[ { test,
[ {deps,
[ {snappyer, "1.2.9"},
{lz4b, "0.0.11"}
{lz4b, "0.0.11"},
{ezstd, "1.1.0"}
]}
]
}
Expand Down
6 changes: 3 additions & 3 deletions src/kpro.erl
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@

%%%_* APIs =====================================================================

%% @doc Set snappy or lz4 compression modules.
%% This should override the default usage of `snappyer' and `lz4b_frame'.
-spec provide_compression([{?snappy | ?lz4, module()}]) -> ok.
%% @doc Set `snappy', `lz4' or `zstd' compression modules.
%% The module must implement `compress/1' and and `decompress/1'.
-spec provide_compression([{?snappy | ?lz4 | ?zstd, module()}]) -> ok.
provide_compression(Modules) -> kpro_compress:provide(Modules).

%% Get batch magic version from produce API version.
Expand Down
22 changes: 14 additions & 8 deletions src/kpro_compress.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%%% Copyright (c) 2018-2021, Klarna Bank AB (publ)
%%% Copyright (c) 2022-2025, Kafka4beam contributors.
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand All @@ -24,9 +25,9 @@

-include("kpro_private.hrl").

%% @doc Set snappy or lz4 compression modules.
%% This should override the default usage of `snappyer' and `lz4b_frame'.
-spec provide([{snappy | lz4, module()}]) -> ok.
%% @doc Set snappy, lz4 or zstd compression modules.
%% This should override the default usage of `snappyer', `lz4b_frame' and `ezstd'.
-spec provide([{snappy | lz4 | zstd, module()}]) -> ok.
provide(Libs) ->
lists:foreach(fun({Name, Module}) ->
persistent_term:put({?MODULE, Name}, Module)
Expand All @@ -37,12 +38,14 @@ provide(Libs) ->
codec_to_method(A) when ?KPRO_IS_GZIP_ATTR(A) -> ?gzip;
codec_to_method(A) when ?KPRO_IS_SNAPPY_ATTR(A) -> ?snappy;
codec_to_method(A) when ?KPRO_IS_LZ4_ATTR(A) -> ?lz4;
codec_to_method(A) when ?KPRO_IS_ZSTD_ATTR(A) -> ?zstd;
codec_to_method(_) -> ?no_compression.

%% @doc Translate compression method to bits for kafka batch attributes.
method_to_codec(?gzip) -> ?KPRO_COMPRESS_GZIP;
method_to_codec(?snappy) -> ?KPRO_COMPRESS_SNAPPY;
method_to_codec(?lz4) -> ?KPRO_COMPRESS_LZ4;
method_to_codec(?zstd) -> ?KPRO_COMPRESS_ZSTD;
method_to_codec(?no_compression) -> ?KPRO_COMPRESS_NONE.

%% @doc Compress encoded batch.
Expand All @@ -56,7 +59,8 @@ compress(Name, IoData) -> do_compress(Name, IoData).
decompress(?no_compression, Bin) -> Bin;
decompress(?gzip, Bin) -> zlib:gunzip(Bin);
decompress(?snappy, Bin) -> java_snappy_unpack(Bin);
decompress(?lz4, Bin) -> do_decompress(?lz4, Bin).
decompress(?lz4, Bin) -> do_decompress(?lz4, Bin);
decompress(?zstd, Bin) -> do_decompress(?zstd, Bin).

%%%_* Internals ================================================================

Expand Down Expand Up @@ -90,10 +94,12 @@ do_decompress(Name, Bin) ->
Module = get_module(Name),
iodata(Module:decompress(Bin)).

get_module(snappy) ->
get_module(snappy, snappyer);
get_module(lz4) ->
get_module(lz4, lz4b_frame).
get_module(?snappy) ->
get_module(?snappy, snappyer);
get_module(?lz4) ->
get_module(?lz4, lz4b_frame);
get_module(?zstd) ->
get_module(?zstd, ezstd).

get_module(Name, Default) ->
persistent_term:get({?MODULE, Name}, Default).
Expand Down
2 changes: 1 addition & 1 deletion test/kpro_batch_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ encode_decode_test_() ->
?assertMatch(<<"v">>, Value)
end,
MagicVersions = [0, 1, 2],
CompressionOpts = [no_compression, gzip, snappy, lz4],
CompressionOpts = [no_compression, gzip, snappy, lz4, zstd],
[{atom_to_list(CompressionOpt), " magic v" ++ integer_to_list(MagicV),
fun() -> F(MagicV, CompressionOpt) end} ||
CompressionOpt <- CompressionOpts,
Expand Down
13 changes: 11 additions & 2 deletions test/kpro_produce_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,29 @@ non_monotoic_ts_in_batch_test() ->
end.

%% batches can be encoded by caller before making a produce request
encode_batch_beforehand_test() ->
encode_batch_beforehand(Compression) ->
{_, Vsn} = get_api_vsn_range(),
Batch = [#{ts => kpro_lib:now_ts(),
value => make_value(?LINE),
headers => []}],
Magic = kpro_lib:produce_api_vsn_to_magic_vsn(Vsn),
Bin = kpro:encode_batch(Magic, Batch, no_compression),
Bin = kpro:encode_batch(Magic, Batch, Compression),
Req = kpro_req_lib:produce(Vsn, topic(), ?PARTI, Bin),
with_connection(
fun(Pid) ->
{ok, Rsp} = kpro:request_sync(Pid, Req, ?TIMEOUT),
?ASSERT_RESPONSE_NO_ERROR(Vsn, Rsp)
end).

encode_batch_beforehand_test_() ->
Methods0 = [?no_compression, ?gzip, ?snappy],
Methods = case kpro_test_lib:get_kafka_version() >= ?KAFKA_2_1 of
true -> Methods0 ++ [?zstd];
false -> Methods0
end,
[{atom_to_list(Method), fun() -> encode_batch_beforehand(Method) end}
|| Method <- Methods].

%% async send test
async_send_test() ->
{_, Vsn} = get_api_vsn_range(),
Expand Down

0 comments on commit de2d276

Please sign in to comment.