Skip to content

Commit

Permalink
Merge pull request #54 from emqx/metadat
Browse files Browse the repository at this point in the history
Metadata
  • Loading branch information
ieQu1 authored Jun 30, 2023
2 parents 2e527eb + 202f021 commit 92a7a7a
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 28 deletions.
5 changes: 5 additions & 0 deletions doc/src/schema.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
:!sectids:
= Documentation

[id=cluster.node_name]
== Node name

Note: erlang distribution is disabled when node name is `undefined`.

[id=restapi.enabled]
== Enable REST API
`+--restapi+` CLI argument enables REST API (by default it's available at http://127.0.0.0:8017), and it also means that the script keeps running after completing the scenarios.
Expand Down
40 changes: 25 additions & 15 deletions src/behaviors/emqttb_behavior_sub.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
%%================================================================================

-define(CNT_SUB_MESSAGES(GRP), {emqttb_received_messages, GRP}).
-define(CNT_SUB_LATENCY(GRP), {emqttb_e2e_latency, GRP}).
-define(AVG_SUB_TIME, subscribe).

%%================================================================================
Expand All @@ -35,25 +36,24 @@

init_per_group(Group,
#{ topic := Topic
, qos := QoS
, qos := _QoS
} = Opts) when is_binary(Topic) ->
SubCnt = emqttb_metrics:new_counter(?CNT_SUB_MESSAGES(Group),
[ {help, <<"Number of received messages">>}
, {labels, [group]}
]),
LatCnt = emqttb_metrics:new_rolling_average(?CNT_SUB_LATENCY(Group),
[ {help, <<"End-to-end latency">>}
, {labels, [group]}
]),
emqttb_worker:new_opstat(Group, ?AVG_SUB_TIME),
Expiry = maps:get(expiry, Opts, 0),
CleanStart = maps:get(clean_start, Opts, true),
HostShift = maps:get(host_shift, Opts, 0),
HostSelection = maps:get(host_selection, Opts, random),
#{ topic => Topic
, sub_counter => SubCnt
, qos => QoS
, expiry => Expiry
, clean_start => CleanStart
, host_shift => HostShift
, host_selection => HostSelection
}.
Defaults = #{ expiry => 0
, clean_start => true
, host_shift => 0
, host_selection => random
, parse_metadata => false
},
maps:merge(Defaults, Opts #{sub_counter => SubCnt, latency_counter => LatCnt}).

init(SubOpts0 = #{topic := T, qos := QoS, expiry := Expiry, clean_start := CleanStart}) ->
SubOpts = maps:with([host_shift, host_selection], SubOpts0),
Expand All @@ -65,9 +65,19 @@ init(SubOpts0 = #{topic := T, qos := QoS, expiry := Expiry, clean_start := Clean
emqttb_worker:call_with_counter(?AVG_SUB_TIME, emqtt, subscribe, [Conn, emqttb_worker:format_topic(T), QoS]),
Conn.

handle_message(#{sub_counter := Cnt}, Conn, {publish, #{client_pid := Pid}}) when
handle_message(#{sub_counter := Cnt, parse_metadata := ParseMetadata},
Conn,
{publish, #{client_pid := Pid, payload := Payload}}) when
Pid =:= Conn ->
emqttb_metrics:counter_inc(Cnt, 1),
case ParseMetadata of
true ->
{_Id, _SeqNo, TS} = emqttb_behavior_pub:parse_metadata(Payload),
Dt = os:system_time(microsecond) - TS,
emqttb_metrics:rolling_average_observe(?CNT_SUB_LATENCY(emqttb_worker:my_group()), Dt);
false ->
ok
end,
{ok, Conn};
handle_message(_, Conn, _) ->
{ok, Conn}.
Expand Down
2 changes: 1 addition & 1 deletion src/conf/emqttb_conf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ reload() ->
end.

patch(Patch) ->
logger:notice("Patching configuration: ~p", [Patch]),
logger:debug("Patching configuration: ~p", [Patch]),
case lee:patch(?MYMODEL, ?MYCONF, Patch) of
{ok, _, _} -> true;
_ -> false
Expand Down
7 changes: 3 additions & 4 deletions src/conf/emqttb_conf_model.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,10 @@ model() ->
, prog_name => "emqttb"
}}
, cluster =>
#{ enabled =>
#{ node_name =>
{[value, os_env],
#{ oneliner => "Enable clustering"
, type => boolean()
, default => true
#{ type => atom()
, default => undefined
}}
}
, interval =>
Expand Down
18 changes: 10 additions & 8 deletions src/framework/emqttb_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ post_init() ->
, {emqttb_pushgw, start_link, []}
),
emqttb_logger:setup(),
?CFG([cluster, enabled]) andalso
start_distr(),
maybe_start_distr(),
ok.

start_distr() ->
os:cmd("epmd -daemon"),
Opts = #{dist_listen => true, name_domain => shortnames},
Name = list_to_atom("emqttb-" ++ [$A + rand:uniform($Z-$A) - 1 || _ <- lists:seq(1, 5)]),
net_kernel:start(Name, Opts),
logger:notice("Started distribution with name: ~p", [node()]).
maybe_start_distr() ->
case ?CFG([cluster, node_name]) of
undefined ->
ok;
Name ->
os:cmd("epmd -daemon"),
Opts = #{dist_listen => true, name_domain => shortnames},
net_kernel:start(Name, Opts)
end.
1 change: 1 addition & 0 deletions src/scenarios/emqttb_scenario_pubsub_fwd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ subscribe_stage() ->
, clean_start => true
, host_shift => 0
, host_selection => HostSelection
, parse_metadata => true
},
emqttb_group:ensure(#{ id => ?SUB_GROUP
, client_config => my_conf([group])
Expand Down
8 changes: 8 additions & 0 deletions src/scenarios/emqttb_scenario_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,20 @@ model() ->
, cli_operand => "qos"
, cli_short => $q
}}
, parse_metadata =>
{[value, cli_param],
#{ oneliner => "Extract metadata from message payloads"
, type => boolean()
, default => false
, cli_operand => "parse-metadata"
}}
}.

run() ->
SubOpts = #{ topic => my_conf([topic])
, qos => my_conf([qos])
, expiry => my_conf([expiry])
, parse_metadata => my_conf([parse_metadata])
},
emqttb_group:ensure(#{ id => ?GROUP
, client_config => my_conf([group])
Expand Down

0 comments on commit 92a7a7a

Please sign in to comment.