From 202f021d3373de8d562806bf8fb0f11fbf402e69 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Fri, 30 Jun 2023 13:44:52 +0200 Subject: [PATCH] feat: Parse message metadata and report E2E latency --- doc/src/schema.adoc | 5 +++ src/behaviors/emqttb_behavior_sub.erl | 40 ++++++++++++-------- src/conf/emqttb_conf.erl | 2 +- src/conf/emqttb_conf_model.erl | 3 +- src/scenarios/emqttb_scenario_pubsub_fwd.erl | 1 + src/scenarios/emqttb_scenario_sub.erl | 8 ++++ 6 files changed, 41 insertions(+), 18 deletions(-) diff --git a/doc/src/schema.adoc b/doc/src/schema.adoc index e04eb91..88f4ef3 100644 --- a/doc/src/schema.adoc +++ b/doc/src/schema.adoc @@ -1,6 +1,11 @@ :!sectids: = Documentation +[id=cluster.node_name] +== Node name + +Note: erlang distribution is disabled when node name is `undefined`. + [id=restapi.enabled] == Enable REST API `+--restapi+` CLI argument enables REST API (by default it's available at http://127.0.0.0:8017), and it also means that the script keeps running after completing the scenarios. diff --git a/src/behaviors/emqttb_behavior_sub.erl b/src/behaviors/emqttb_behavior_sub.erl index 738c9c4..1a48ab0 100644 --- a/src/behaviors/emqttb_behavior_sub.erl +++ b/src/behaviors/emqttb_behavior_sub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ %%================================================================================ -define(CNT_SUB_MESSAGES(GRP), {emqttb_received_messages, GRP}). +-define(CNT_SUB_LATENCY(GRP), {emqttb_e2e_latency, GRP}). -define(AVG_SUB_TIME, subscribe). %%================================================================================ @@ -35,25 +36,24 @@ init_per_group(Group, #{ topic := Topic - , qos := QoS + , qos := _QoS } = Opts) when is_binary(Topic) -> SubCnt = emqttb_metrics:new_counter(?CNT_SUB_MESSAGES(Group), [ {help, <<"Number of received messages">>} , {labels, [group]} ]), + LatCnt = emqttb_metrics:new_rolling_average(?CNT_SUB_LATENCY(Group), + [ {help, <<"End-to-end latency">>} + , {labels, [group]} + ]), emqttb_worker:new_opstat(Group, ?AVG_SUB_TIME), - Expiry = maps:get(expiry, Opts, 0), - CleanStart = maps:get(clean_start, Opts, true), - HostShift = maps:get(host_shift, Opts, 0), - HostSelection = maps:get(host_selection, Opts, random), - #{ topic => Topic - , sub_counter => SubCnt - , qos => QoS - , expiry => Expiry - , clean_start => CleanStart - , host_shift => HostShift - , host_selection => HostSelection - }. + Defaults = #{ expiry => 0 + , clean_start => true + , host_shift => 0 + , host_selection => random + , parse_metadata => false + }, + maps:merge(Defaults, Opts #{sub_counter => SubCnt, latency_counter => LatCnt}). init(SubOpts0 = #{topic := T, qos := QoS, expiry := Expiry, clean_start := CleanStart}) -> SubOpts = maps:with([host_shift, host_selection], SubOpts0), @@ -65,9 +65,19 @@ init(SubOpts0 = #{topic := T, qos := QoS, expiry := Expiry, clean_start := Clean emqttb_worker:call_with_counter(?AVG_SUB_TIME, emqtt, subscribe, [Conn, emqttb_worker:format_topic(T), QoS]), Conn. -handle_message(#{sub_counter := Cnt}, Conn, {publish, #{client_pid := Pid}}) when +handle_message(#{sub_counter := Cnt, parse_metadata := ParseMetadata}, + Conn, + {publish, #{client_pid := Pid, payload := Payload}}) when Pid =:= Conn -> emqttb_metrics:counter_inc(Cnt, 1), + case ParseMetadata of + true -> + {_Id, _SeqNo, TS} = emqttb_behavior_pub:parse_metadata(Payload), + Dt = os:system_time(microsecond) - TS, + emqttb_metrics:rolling_average_observe(?CNT_SUB_LATENCY(emqttb_worker:my_group()), Dt); + false -> + ok + end, {ok, Conn}; handle_message(_, Conn, _) -> {ok, Conn}. diff --git a/src/conf/emqttb_conf.erl b/src/conf/emqttb_conf.erl index d4e2495..4fd0d72 100644 --- a/src/conf/emqttb_conf.erl +++ b/src/conf/emqttb_conf.erl @@ -68,7 +68,7 @@ reload() -> end. patch(Patch) -> - logger:notice("Patching configuration: ~p", [Patch]), + logger:debug("Patching configuration: ~p", [Patch]), case lee:patch(?MYMODEL, ?MYCONF, Patch) of {ok, _, _} -> true; _ -> false diff --git a/src/conf/emqttb_conf_model.erl b/src/conf/emqttb_conf_model.erl index f598ea7..a84203d 100644 --- a/src/conf/emqttb_conf_model.erl +++ b/src/conf/emqttb_conf_model.erl @@ -43,8 +43,7 @@ model() -> , cluster => #{ node_name => {[value, os_env], - #{ oneliner => "Enable clustering" - , type => atom() + #{ type => atom() , default => undefined }} } diff --git a/src/scenarios/emqttb_scenario_pubsub_fwd.erl b/src/scenarios/emqttb_scenario_pubsub_fwd.erl index 130c494..f2b36de 100644 --- a/src/scenarios/emqttb_scenario_pubsub_fwd.erl +++ b/src/scenarios/emqttb_scenario_pubsub_fwd.erl @@ -172,6 +172,7 @@ subscribe_stage() -> , clean_start => true , host_shift => 0 , host_selection => HostSelection + , parse_metadata => true }, emqttb_group:ensure(#{ id => ?SUB_GROUP , client_config => my_conf([group]) diff --git a/src/scenarios/emqttb_scenario_sub.erl b/src/scenarios/emqttb_scenario_sub.erl index 19be16a..288ff65 100644 --- a/src/scenarios/emqttb_scenario_sub.erl +++ b/src/scenarios/emqttb_scenario_sub.erl @@ -91,12 +91,20 @@ model() -> , cli_operand => "qos" , cli_short => $q }} + , parse_metadata => + {[value, cli_param], + #{ oneliner => "Extract metadata from message payloads" + , type => boolean() + , default => false + , cli_operand => "parse-metadata" + }} }. run() -> SubOpts = #{ topic => my_conf([topic]) , qos => my_conf([qos]) , expiry => my_conf([expiry]) + , parse_metadata => my_conf([parse_metadata]) }, emqttb_group:ensure(#{ id => ?GROUP , client_config => my_conf([group])