Skip to content

Commit

Permalink
Merge pull request #12716 from rabbitmq/mergify/bp/v4.0.x/pr-12712
Browse files Browse the repository at this point in the history
QQ: reduce memory use when dropping many messages at once. (backport #12712)
  • Loading branch information
michaelklishin authored Nov 13, 2024
2 parents db25654 + 0ef15b0 commit eeaa668
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
21 changes: 20 additions & 1 deletion deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1596,11 +1596,30 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) ->
#?STATE{cfg = #cfg{dead_letter_handler = DLH},
dlx = DlxState} = State = State3,
{_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState),
{State, DlxEffects ++ Effects};
{State, combine_effects(DlxEffects, Effects)};
empty ->
{State0, Effects}
end.

%% combine global counter update effects to avoid bulding a huge list of
%% effects if many messages are dropped at the same time as could happen
%% when the `max_length' is changed via a configuration update.
combine_effects([{mod_call,
rabbit_global_counters,
messages_dead_lettered,
[Reason, rabbit_quorum_queue, Type, NewLen]}],
[{mod_call,
rabbit_global_counters,
messages_dead_lettered,
[Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) ->
[{mod_call,
rabbit_global_counters,
messages_dead_lettered,
[Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem];
combine_effects(New, Old) ->
New ++ Old.


maybe_set_msg_ttl(Msg, RaCmdTs, Header,
#?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) ->
case mc:is(Msg) of
Expand Down
28 changes: 28 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2185,6 +2185,34 @@ update_config_delivery_limit_test(Config) ->

ok.

update_config_max_length_test(Config) ->
QName = rabbit_misc:r("/", queue, ?FUNCTION_NAME_B),
InitConf = #{name => ?FUNCTION_NAME,
queue_resource => QName,
delivery_limit => 20
},
State0 = init(InitConf),
?assertMatch(#{config := #{delivery_limit := 20}},
rabbit_fifo:overview(State0)),

State1 = lists:foldl(fun (Num, FS0) ->
{FS, _} = enq(Config, Num, Num, Num, FS0),
FS
end, State0, lists:seq(1, 100)),
Conf = #{name => ?FUNCTION_NAME,
queue_resource => QName,
max_length => 2,
dead_letter_handler => undefined},
%% assert only one global counter effect is generated rather than 1 per
%% dropped message
{State, ok, Effects} = apply(meta(Config, ?LINE),
rabbit_fifo:make_update_config(Conf), State1),
?assertMatch([{mod_call, rabbit_global_counters, messages_dead_lettered,
[maxlen, rabbit_quorum_queue,disabled, 98]}], Effects),
?assertMatch(#{config := #{max_length := 2},
num_ready_messages := 2}, rabbit_fifo:overview(State)),
ok.

purge_nodes_test(Config) ->
Node = purged@node,
ThisNode = node(),
Expand Down

0 comments on commit eeaa668

Please sign in to comment.