diff --git a/apps/emqx_audit/src/emqx_audit.app.src b/apps/emqx_audit/src/emqx_audit.app.src index f741177892c..b771b1494ad 100644 --- a/apps/emqx_audit/src/emqx_audit.app.src +++ b/apps/emqx_audit/src/emqx_audit.app.src @@ -1,6 +1,6 @@ {application, emqx_audit, [ {description, "Audit log for EMQX"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {mod, {emqx_audit_app, []}}, {applications, [kernel, stdlib, emqx]}, diff --git a/apps/emqx_audit/src/emqx_audit.erl b/apps/emqx_audit/src/emqx_audit.erl index 6fad95d0f32..5183737a7b3 100644 --- a/apps/emqx_audit/src/emqx_audit.erl +++ b/apps/emqx_audit/src/emqx_audit.erl @@ -29,6 +29,8 @@ -define(FILTER_REQ, [cert, host_info, has_sent_resp, pid, path_info, peer, ref, sock, streamid]). +-define(CHARS_LIMIT_IN_DB, 1024). + -ifdef(TEST). -define(INTERVAL, 100). -else. @@ -38,8 +40,8 @@ to_audit(#{from := cli, cmd := Cmd, args := Args, duration_ms := DurationMs}) -> #?AUDIT{ operation_id = <<"">>, - operation_type = atom_to_binary(Cmd), - args = Args, + operation_type = truncate_large_term(Cmd), + args = truncate_large_term(Args), operation_result = <<"">>, failure = <<"">>, duration_ms = DurationMs, @@ -65,7 +67,7 @@ to_audit(#{from := erlang_console, function := F, args := Args}) -> http_method = <<"">>, http_request = <<"">>, duration_ms = 0, - args = iolist_to_binary(io_lib:format("~p: ~ts", [F, Args])) + args = truncate_large_term({F, Args}) }; to_audit(#{from := From} = Log) when is_atom(From) -> #{ @@ -93,7 +95,7 @@ to_audit(#{from := From} = Log) when is_atom(From) -> %% request detail http_status_code = StatusCode, http_method = Method, - http_request = Request, + http_request = truncate_large_term(Request), duration_ms = DurationMs, args = <<"">> }. @@ -243,3 +245,6 @@ log_to_file(Level, Meta, #{module := Module} = Handler) -> ) end end. + +truncate_large_term(Req) -> + unicode:characters_to_binary(io_lib:format("~0p", [Req], [{chars_limit, ?CHARS_LIMIT_IN_DB}])). 
diff --git a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl index 2597a892301..a1b22f12d33 100644 --- a/apps/emqx_audit/test/emqx_audit_api_SUITE.erl +++ b/apps/emqx_audit/test/emqx_audit_api_SUITE.erl @@ -94,12 +94,7 @@ t_http_api(_) -> <<"operation_id">> := <<"/configs/global_zone">>, <<"source_ip">> := <<"127.0.0.1">>, <<"source">> := _, - <<"http_request">> := #{ - <<"method">> := <<"put">>, - <<"body">> := #{<<"mqtt">> := #{<<"max_qos_allowed">> := 1}}, - <<"bindings">> := _, - <<"headers">> := #{<<"authorization">> := <<"******">>} - }, + <<"http_request">> := _, <<"http_status_code">> := 200, <<"operation_result">> := <<"success">>, <<"operation_type">> := <<"configs">> @@ -166,7 +161,7 @@ t_cli(_Config) -> <<"operation_id">> := <<"">>, <<"source_ip">> := <<"">>, <<"operation_type">> := <<"conf">>, - <<"args">> := [<<"show">>, <<"log">>], + <<"args">> := <<"[<<\"show\">>,<<\"log\">>]">>, <<"node">> := _, <<"source">> := <<"">>, <<"http_request">> := <<"">> @@ -184,7 +179,7 @@ t_cli(_Config) -> {ok, Res1} = emqx_mgmt_api_test_util:request_api(get, AuditPath, "from=cli", AuthHeader), #{<<"data">> := Data1} = emqx_utils_json:decode(Res1, [return_maps]), ?assertMatch( - [ShowLogEntry, #{<<"operation_type">> := <<"emqx">>, <<"args">> := [<<"start">>]}], + [ShowLogEntry, #{<<"operation_type">> := <<"emqx">>, <<"args">> := <<"[<<\"start\">>]">>}], Data1 ), {ok, Res2} = emqx_mgmt_api_test_util:request_api( diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index e8a4324e343..0392859e060 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -15,6 +15,10 @@ -define(ROOT_KEY_ACTIONS, actions). -define(ROOT_KEY_SOURCES, sources). +-define(tpal(MSG), begin + ct:pal(MSG), + ?tp(notice, MSG, #{}) +end). %% ct setup helpers @@ -946,7 +950,9 @@ t_consume(Config, Opts) -> check_fn := CheckFn, produce_tracepoint := TracePointFn } = Opts, + TestTimeout = maps:get(test_timeout, Opts, 60_000), ?check_trace( + #{timetrap => TestTimeout}, begin ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000), case ConsumerReadyTPFn of @@ -957,9 +963,12 @@ t_consume(Config, Opts) -> Predicate, _NEvents = 1, ConsumerReadyTimeout ) end, + ?tpal("creating connector and source"), ?assertMatch({ok, _}, create_bridge_api(Config)), ?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)), + ?tpal("adding hookpoint"), ok = add_source_hookpoint(Config), + ?tpal("waiting until connected"), ?retry( _Sleep = 200, _Attempts = 20, @@ -968,14 +977,16 @@ t_consume(Config, Opts) -> health_check_channel(Config) ) ), + ?tpal("producing message and waiting for it to be consumed"), ?assertMatch( {_, {ok, _}}, snabbkaffe:wait_async_action( ProduceFn, TracePointFn, - 15_000 + infinity ) ), + ?tpal("waiting for consumed message"), receive {consumed_message, Message} -> CheckFn(Message) diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 55b13fbc68c..3013c0116ef 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -3,10 +3,10 @@ {erl_opts, [debug_info]}. 
{deps, [ {wolff, "4.0.3"}, - {kafka_protocol, "4.1.8"}, + {kafka_protocol, "4.1.9"}, {brod_gssapi, "0.1.3"}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, - {snappyer, "1.2.9"}, + {brod, "4.3.1"}, + {snappyer, "1.2.10"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 4954c0fcdab..7148a68943a 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -3,10 +3,10 @@ {erl_opts, [debug_info]}. {deps, [ {wolff, "4.0.3"}, - {kafka_protocol, "4.1.8"}, + {kafka_protocol, "4.1.9"}, {brod_gssapi, "0.1.3"}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, - {snappyer, "1.2.9"}, + {brod, "4.3.1"}, + {snappyer, "1.2.10"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_datalayers/test/emqx_bridge_datalayers_SUITE.erl b/apps/emqx_bridge_datalayers/test/emqx_bridge_datalayers_SUITE.erl index 56ea76aa1a4..8e9bb5ad730 100644 --- a/apps/emqx_bridge_datalayers/test/emqx_bridge_datalayers_SUITE.erl +++ b/apps/emqx_bridge_datalayers/test/emqx_bridge_datalayers_SUITE.erl @@ -874,7 +874,9 @@ t_bad_timestamp(Config) -> [ #{ error := [ - {error, {bad_timestamp, <<"bad_timestamp">>}} + {error, + {bad_timestamp, + {non_integer_timestamp, <<"bad_timestamp">>}}} ] } ], @@ -883,7 +885,7 @@ t_bad_timestamp(Config) -> {sync, false} -> ?assertEqual( {error, [ - {error, {bad_timestamp, <<"bad_timestamp">>}} + {error, {bad_timestamp, {non_integer_timestamp, <<"bad_timestamp">>}}} ]}, Return ); diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 6eef1a86f52..e889f386b77 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -714,8 +714,13 @@ parse_timestamp([TsBin]) -> {ok, binary_to_integer(TsBin)} catch _:_ -> - {error, TsBin} - end. + {error, {non_integer_timestamp, TsBin}} + end; +parse_timestamp(InvalidTs) -> + %% The timestamp field must be a single integer or a single placeholder. i.e. the + %% following is not allowed: + %% - weather,location=us-midwest,season=summer temperature=82 ${timestamp}00 + {error, {unsupported_placeholder_usage_for_timestamp, InvalidTs}}. continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) -> case line_to_point(Data, Item) of diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index 730622be50b..0aed64575b4 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -915,6 +915,10 @@ t_tag_set_use_literal_value(Config) -> ?assertEqual(TsStr, TimeReturned). t_bad_timestamp(Config) -> + test_bad_timestamp(Config, <<"bad_timestamp">>, non_integer_timestamp), + test_bad_timestamp(Config, <<"${timestamp}000">>, unsupported_placeholder_usage_for_timestamp). 
+ +test_bad_timestamp(Config, Timestamp, ErrTag) -> InfluxDBType = ?config(influxdb_type, Config), InfluxDBName = ?config(influxdb_name, Config), QueryMode = ?config(query_mode, Config), @@ -929,7 +933,7 @@ t_bad_timestamp(Config) -> %% N.B.: this single space characters are relevant <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", "uint_value=${payload.uint_key}u," - "bool=${payload.bool}", " ", "bad_timestamp">>, + "bool=${payload.bool}", " ", Timestamp/binary>>, %% append this to override the config InfluxDBConfigString1 = io_lib:format( @@ -983,16 +987,16 @@ t_bad_timestamp(Config) -> [ #{ error := [ - {error, {bad_timestamp, <<"bad_timestamp">>}} + {error, {bad_timestamp, {ErrTag, _}}} ] } ], ?of_kind(influxdb_connector_send_query_error, Trace) ); {sync, false} -> - ?assertEqual( + ?assertMatch( {error, [ - {error, {bad_timestamp, <<"bad_timestamp">>}} + {error, {bad_timestamp, {ErrTag, _}}} ]}, Return ); diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index f15788f369b..7ac4f2f34fe 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -3,10 +3,10 @@ {erl_opts, [debug_info]}. {deps, [ {wolff, "4.0.3"}, - {kafka_protocol, "4.1.8"}, + {kafka_protocol, "4.1.9"}, {brod_gssapi, "0.1.3"}, - {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.18.0"}}}, - {snappyer, "1.2.9"}, + {brod, "4.3.1"}, + {snappyer, "1.2.10"}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl index 5ea6451e831..0f05642ddc3 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl @@ -36,8 +36,7 @@ connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> {Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3), TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka], TopLevelCfg = maps:with(TopLevelCfgKeys, Params1), - %% `topic' is v2-only - Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1), + Params = maps:with(v1_source_parameters(), Params1), V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg), V1Config = emqx_utils_maps:update_if_present( <<"resource_opts">>, @@ -64,6 +63,14 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) -> %% Internal helper functions %%------------------------------------------------------------------------------------------ +v1_source_parameters() -> + [ + <<"max_batch_bytes">>, + <<"max_rejoin_attempts">>, + <<"offset_commit_interval_seconds">>, + <<"offset_reset_policy">> + ]. + %% The new schema has a single kafka topic, so we take it from topic mapping when %% converting from v1. 
maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) -> diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index 8fe0d635181..e8ba5405fbf 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -92,6 +92,14 @@ fields(source_parameters) -> required => false, desc => ?DESC(group_id) } + )}, + {max_wait_time, + mk( + emqx_schema:timeout_duration_ms(), + #{ + default => <<"1s">>, + desc => ?DESC("max_wait_time") + } )} | Fields ]; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 07c859c7825..e45683e61f0 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -54,6 +54,7 @@ group_id => binary(), key_encoding_mode := encoding_mode(), max_batch_bytes := emqx_schema:bytesize(), + max_wait_time := non_neg_integer(), max_rejoin_attempts := non_neg_integer(), offset_commit_interval_seconds := pos_integer(), offset_reset_policy := offset_reset_policy(), @@ -414,6 +415,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> parameters := #{ key_encoding_mode := KeyEncodingMode, max_batch_bytes := MaxBatchBytes, + max_wait_time := MaxWaitTime, max_rejoin_attempts := MaxRejoinAttempts, offset_commit_interval_seconds := OffsetCommitInterval, offset_reset_policy := OffsetResetPolicy0, @@ -445,6 +447,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> ConsumerConfig = [ {begin_offset, BeginOffset}, {max_bytes, MaxBatchBytes}, + {max_wait_time, MaxWaitTime}, {offset_reset_policy, OffsetResetPolicy} ], GroupConfig = [ diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index d7408f0e524..bf49eee2f47 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -17,7 +17,6 @@ -define(BRIDGE_TYPE_BIN, <<"kafka_consumer">>). -define(CONNECTOR_TYPE_BIN, <<"kafka_consumer">>). -define(SOURCE_TYPE_BIN, <<"kafka_consumer">>). --define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_bridge_kafka]). %%------------------------------------------------------------------------------ %% CT boilerplate diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index eeb8a8b8094..7d4518dad31 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -207,8 +207,9 @@ source_config(Overrides0) -> #{ <<"key_encoding_mode">> => <<"none">>, <<"max_batch_bytes">> => <<"896KB">>, + <<"max_wait_time">> => <<"500ms">>, <<"max_rejoin_attempts">> => <<"5">>, - <<"offset_reset_policy">> => <<"latest">>, + <<"offset_reset_policy">> => <<"earliest">>, <<"topic">> => <<"please override">>, <<"value_encoding_mode">> => <<"none">> }, @@ -217,7 +218,7 @@ source_config(Overrides0) -> <<"resume_interval">> => <<"2s">> } }, - maps:merge(CommonConfig, Overrides). + emqx_utils_maps:deep_merge(CommonConfig, Overrides). 
%%------------------------------------------------------------------------------ %% Testcases @@ -279,6 +280,7 @@ t_consume(Config) -> ok = emqx_bridge_v2_testlib:t_consume( Config, #{ + test_timeout => timer:seconds(20), consumer_ready_tracepoint => ?match_n_events( NumPartitions, #{?snk_kind := kafka_consumer_subscriber_init} diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl index 7d5318e9855..36ff3043ff4 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_api.erl @@ -177,13 +177,18 @@ login(post, #{bindings := #{backend := Backend}, body := Body} = Request) -> request => emqx_utils:redact(Request) }), Redirect; - {error, Reason} -> + {error, Reason0} -> + Reason = emqx_utils:redact(Reason0), ?SLOG(info, #{ msg => "dashboard_sso_login_failed", request => emqx_utils:redact(Request), - reason => emqx_utils:redact(Reason) + reason => Reason }), - {401, #{code => ?BAD_USERNAME_OR_PWD, message => <<"Auth failed">>}} + {401, #{ + code => ?BAD_USERNAME_OR_PWD, + message => <<"Auth failed">>, + reason => Reason + }} end end. diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl index 4d1fa94395e..9494323f0a0 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl @@ -214,25 +214,28 @@ login( }), Data = maps:with([nonce, require_pkce, pkce_verifier], Opts), - State = emqx_dashboard_sso_oidc_session:new(Data), - - case - oidcc:create_redirect_url( - ?PROVIDER_SVR_NAME, - ClientId, - emqx_secret:unwrap(Secret), - Opts#{ - state => State, - client_jwks => ClientJwks, - preferred_auth_methods => AuthMethods - } - ) - of - {ok, [Base, Delimiter, Params]} -> - RedirectUri = <<Base/binary, Delimiter/binary, Params/binary>>, - Redirect = {302, ?RESPHEADERS#{<<"location">> => RedirectUri}, ?REDIRECT_BODY}, - {redirect, Redirect}; - {error, _Reason} = Error -> + case emqx_dashboard_sso_oidc_session:new(Data) of + {ok, State} -> + case + oidcc:create_redirect_url( + ?PROVIDER_SVR_NAME, + ClientId, + emqx_secret:unwrap(Secret), + Opts#{ + state => State, + client_jwks => ClientJwks, + preferred_auth_methods => AuthMethods + } + ) + of + {ok, [Base, Delimiter, Params]} -> + RedirectUri = <<Base/binary, Delimiter/binary, Params/binary>>, + Redirect = {302, ?RESPHEADERS#{<<"location">> => RedirectUri}, ?REDIRECT_BODY}, + {redirect, Redirect}; + {error, _Reason} = Error -> + Error + end; + Error -> + Error end. diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl index 6937968a47c..604b7c1483e 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_session.erl @@ -72,16 +72,23 @@ stop() -> ok. new(Data) -> - State = new_state(), - ets:insert( - ?TAB, - #?TAB{ - state = State, - created_at = ?NOW, - data = Data - } - ), - State. + case ets:whereis(?TAB) of + undefined -> + %% The OIDCC provider may crash for some reason; even though a monitor observes it, + %% a user may still open an OIDC login before the monitor notices the crash + {error, <<"No valid OIDC provider">>}; + _ -> + State = new_state(), + ets:insert( + ?TAB, + #?TAB{ + state = State, + created_at = ?NOW, + data = Data + } + ), + {ok, State} + end. delete(State) -> ets:delete(?TAB, State).
diff --git a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl index 623345f6ec9..b9e2cfa517f 100644 --- a/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl +++ b/apps/emqx_ds_backends/test/emqx_ds_backends_SUITE.erl @@ -562,6 +562,7 @@ t_drop_generation_with_used_once_iterator(Config) -> message(<<"foo/bar">>, <<"2">>, 1) ], ?assertMatch(ok, emqx_ds:store_batch(DB, Msgs0)), + timer:sleep(1_000), [{_, Stream0}] = emqx_ds:get_streams(DB, TopicFilter, StartTime), {ok, Iter0} = emqx_ds:make_iterator(DB, Stream0, TopicFilter, StartTime), diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 997c9c1f0b6..92ef1ed1614 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -326,28 +326,43 @@ dispatch(Context, Topic) -> -spec delete_message(context(), topic()) -> ok. delete_message(Context, Topic) -> - Mod = backend_module(Context), - BackendState = backend_state(Context), - Mod:delete_message(BackendState, Topic). + with_backend_module( + Context, + fun(Mod, BackendState) -> + Mod:delete_message(BackendState, Topic) + end, + ok + ). -spec read_message(context(), topic()) -> {ok, list(message())}. read_message(Context, Topic) -> - Mod = backend_module(Context), - BackendState = backend_state(Context), - Mod:read_message(BackendState, Topic). + with_backend_module( + Context, + fun(Mod, BackendState) -> Mod:read_message(BackendState, Topic) end, + {ok, []} + ). -spec page_read(context(), emqx_maybe:t(topic()), deadline(), non_neg_integer(), non_neg_integer()) -> {ok, has_next(), list(message())}. page_read(Context, Topic, Deadline, Page, Limit) -> - Mod = backend_module(Context), - BackendState = backend_state(Context), - Mod:page_read(BackendState, Topic, Deadline, Page, Limit). + with_backend_module( + Context, + fun(Mod, BackendState) -> Mod:page_read(BackendState, Topic, Deadline, Page, Limit) end, + {ok, false, []} + ). -spec count(context()) -> non_neg_integer(). count(Context) -> - Mod = backend_module(Context), - BackendState = backend_state(Context), - Mod:size(BackendState). + with_backend_module(Context, fun(Mod, BackendState) -> Mod:size(BackendState) end, 0). + +with_backend_module(Context, Fun, Default) -> + case backend_module(Context) of + undefined -> + Default; + Mod -> + BackendState = backend_state(Context), + Fun(Mod, BackendState) + end. -spec start_clear_expired(context()) -> ok. start_clear_expired(Context) -> @@ -371,16 +386,16 @@ store_retained(Context, #message{topic = Topic, payload = Payload} = Msg) -> limit => Limit }); _ -> - Mod = backend_module(Context), - BackendState = backend_state(Context), - Mod:store_retained(BackendState, Msg) + with_backend_module( + Context, + fun(Mod, BackendState) -> Mod:store_retained(BackendState, Msg) end, + ok + ) end. -spec clean(context()) -> ok. clean(Context) -> - Mod = backend_module(Context), - BackendState = backend_state(Context), - Mod:clean(BackendState). + with_backend_module(Context, fun(Mod, BackendState) -> Mod:clean(BackendState) end, ok). -spec update_config(state(), hocon:config(), hocon:config()) -> state(). update_config(State, NewConfig, OldConfig) -> @@ -512,8 +527,7 @@ create(Cfg) -> -spec close(context()) -> ok | {error, term()}. close(Context) -> - Mod = backend_module(Context), - Mod:close(Context). + with_backend_module(Context, fun(Mod, BackendState) -> Mod:close(BackendState) end, ok). -spec load(context()) -> ok. 
load(Context) -> diff --git a/bin/emqx b/bin/emqx index ea17d7db576..1b9c4a6dd77 100755 --- a/bin/emqx +++ b/bin/emqx @@ -374,13 +374,6 @@ maybe_use_portable_dynlibs() { BUILD_INFO="$(cat "${REL_DIR}/BUILD_INFO")" COMPATIBILITY_INFO="$(compatiblity_info 2>/dev/null || true)" if ! (echo -e "$COMPATIBILITY_INFO" | $GREP -q 'CRYPTO_OK'); then - if [[ ${RUN_EMQX_UNSAFE:-} != "I_AGREE" ]] ; then - logerr "Please ensure libcrypto, libncurses and libatomic1 are properly installed via Opearating System package manager." - logerr "You may set environment variable RUN_EMQX_UNSAFE=I_AGREE to workaround this but not recommended from security perspective." - logerr "For more information check: https://docs.emqx.com/en/emqx/latest/faq/deployment.html#emqx-failed-to-start-with-log-message-on-load-function-failed-crypto" - die "Unsafe operation detected." - fi - ## failed to start, might be due to missing libs, try to be portable export LD_LIBRARY_PATH="${LD_LIBRARY_PATH:-$DYNLIBS_DIR}" if [ "$LD_LIBRARY_PATH" != "$DYNLIBS_DIR" ]; then diff --git a/changes/ce/breaking-14063.en.md b/changes/ce/breaking-14063.en.md deleted file mode 100644 index d6f99ce3707..00000000000 --- a/changes/ce/breaking-14063.en.md +++ /dev/null @@ -1,5 +0,0 @@ -For security, user now must set envvar `RUN_EMQX_UNSAFE`="I_AGREE" to load dynlibs in the tar release package. - -This only affects the EMQX tar release package. -This does not affect the OS installation packages (rpm, deb). - diff --git a/changes/ce/fix-14090.en.md b/changes/ce/fix-14090.en.md new file mode 100644 index 00000000000..3551cf4a981 --- /dev/null +++ b/changes/ce/fix-14090.en.md @@ -0,0 +1 @@ +Don't crash the retainer API when the retainer is disabled. diff --git a/changes/ee/14094.en.md b/changes/ee/14094.en.md new file mode 100644 index 00000000000..d4ccea93b8f --- /dev/null +++ b/changes/ee/14094.en.md @@ -0,0 +1,2 @@ +Return an error when logging in with an invalid OIDC provider. + diff --git a/changes/ee/feat-14079.en.md b/changes/ee/feat-14079.en.md new file mode 100644 index 00000000000..90b1852c28f --- /dev/null +++ b/changes/ee/feat-14079.en.md @@ -0,0 +1 @@ +Added the option of setting `max_wait_time` for Kafka Consumer sources. diff --git a/changes/ee/fix-14079.en.md b/changes/ee/fix-14079.en.md new file mode 100644 index 00000000000..5cb24bdf239 --- /dev/null +++ b/changes/ee/fix-14079.en.md @@ -0,0 +1,5 @@ +Fix a Kafka consumer latency issue when multiple partitions share the same partition leader in Kafka. + +When fetching from a Kafka partition leader, a request is blocked until the previously sent fetch requests have returned. +This is because Kafka serves only one in-flight fetch request at a time per connection, which causes head-of-line blocking when more than one partition shares the same partition leader broker. +The fix in this change is to make sure each partition consumer creates its own TCP connection to the partition leader.
diff --git a/changes/ee/fix-14091.en.md b/changes/ee/fix-14091.en.md new file mode 100644 index 00000000000..05af4467fa0 --- /dev/null +++ b/changes/ee/fix-14091.en.md @@ -0,0 +1,11 @@ +A simple fix that removes the `function_clause` error from the log message when the user has provided unsupported write syntax: + +``` +weather,location=us-midwest,season=summer temperature=82 ${timestamp}u +``` + +The error log before this fix: + +``` +pid: <0.558392.0>, info: {"stacktrace":["{emqx_bridge_influxdb_connector,parse_timestamp,[[1719350482910000000,<<\"u\">>]],[{file,\"emqx_bridge_influxdb_connector.erl\"},{line,692}]}", ...], ..., "error":"{error,function_clause}"}, tag: ERROR, msg: resource_exception +``` diff --git a/changes/ee/perf-14077.en.md b/changes/ee/perf-14077.en.md new file mode 100644 index 00000000000..b7aa5a353f3 --- /dev/null +++ b/changes/ee/perf-14077.en.md @@ -0,0 +1 @@ +Avoid storing excessively long audit log content by truncating it before it is written to the database. diff --git a/mix.exs b/mix.exs index 5777a9d09f9..0017b51f59d 100644 --- a/mix.exs +++ b/mix.exs @@ -160,6 +160,8 @@ defmodule EMQXUmbrella.MixProject do common_dep(:sasl_auth), # avlizer currently uses older :erlavro version common_dep(:erlavro), + # version conflict caused by erlavro + common_dep(:snappyer), common_dep(:crc32cer) ] end @@ -278,13 +280,13 @@ defmodule EMQXUmbrella.MixProject do def common_dep(:brod_gssapi), do: {:brod_gssapi, "0.1.3"} def common_dep(:kafka_protocol), - do: {:kafka_protocol, "4.1.8", override: true} + do: {:kafka_protocol, "4.1.9", override: true} - def common_dep(:brod), do: {:brod, github: "kafka4beam/brod", tag: "3.18.0"} + def common_dep(:brod), do: {:brod, "4.3.1"} ## TODO: remove `mix.exs` from `wolff` and remove this override ## TODO: remove `mix.exs` from `pulsar` and remove this override - def common_dep(:snappyer), do: {:snappyer, "1.2.9", override: true} - def common_dep(:crc32cer), do: {:crc32cer, "0.1.8", override: true} + def common_dep(:snappyer), do: {:snappyer, "1.2.10", override: true} + def common_dep(:crc32cer), do: {:crc32cer, "0.1.11", override: true} def common_dep(:jesse), do: {:jesse, github: "emqx/jesse", tag: "1.8.1.1"} def common_dep(:erlavro), do: {:erlavro, github: "emqx/erlavro", tag: "2.10.0", override: true} diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon index 704d9eb29fe..5cce6cd815f 100644 --- a/rel/i18n/emqx_bridge_kafka.hocon +++ b/rel/i18n/emqx_bridge_kafka.hocon @@ -51,7 +51,10 @@ consumer_offset_commit_interval_seconds.label: """Offset Commit Interval""" consumer_max_batch_bytes.desc: -"""Set how many bytes to pull from Kafka in each fetch request. Please note that if the configured value is smaller than the message size in Kafka, it may negatively impact the fetch performance.""" +"""Set how many bytes to pull from Kafka in each fetch request. +Messages are fetched in batches by the consumer, and if the first record batch in the first non-empty +partition of the fetch is larger than this value, the record batch will still be returned to ensure +that the consumer can make progress. As such, this is not an absolute maximum.
Set `1` for minimal latency.""" consumer_max_batch_bytes.label: """Fetch Bytes""" diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon index 3ac6a4e2b27..092d9031cab 100644 --- a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon +++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon @@ -20,4 +20,9 @@ emqx_bridge_kafka_consumer_schema { group_id.label: """Custom Consumer Group Id""" + max_wait_time.desc: + """Maximum amount of time to wait for the Kafka broker to send a fetch response.""" + max_wait_time.label: + """Max Wait Time""" + }