Skip to content

Commit

Permalink
Fix channel crash when publishing to a new stream (#12969)
Browse files Browse the repository at this point in the history
The following scenario led to a channel crash:
1. Publish to a non-existing stream: `perf-test -y 0 -p -e amq.default -t direct -k stream`
2. Declare the stream: `rabbitmqadmin declare queue name=stream queue_type=stream`

There is no pid yet, so we got a function_clause with `none`
```
{function_clause,
   [{osiris_writer,write,
        [none,<0.877.0>,<<"<0.877.0>_-65ZKFz18ll5lau0phi7CsQ">>,1,
         [[0,"Sp",[192,6,5,"B@@ac"]],
          [0,"Sr",
           [193,38,4,
            [[[163,10,<<"x-exchange">>],[161,0,<<>>]],
             [[163,13,<<"x-routing-key">>],[161,6,<<"stream">>]]]]],
          [0,"Su",[160,12,[<<0,19,252,1,0,0,98,171,20,16,108,167>>]]]]],
        [{file,"src/osiris_writer.erl"},{line,158}]},
    {rabbit_stream_queue,deliver0,4,
        [{file,"rabbit_stream_queue.erl"},{line,540}]},
    {rabbit_stream_queue,'-deliver/3-fun-0-',4,
        [{file,"rabbit_stream_queue.erl"},{line,526}]},
    {lists,foldl,3,[{file,"lists.erl"},{line,2146}]},
    {rabbit_queue_type,'-deliver0/4-fun-5-',5,
        [{file,"rabbit_queue_type.erl"},{line,707}]},
    {maps,fold_1,4,[{file,"maps.erl"},{line,860}]},
    {rabbit_queue_type,deliver0,4,
        [{file,"rabbit_queue_type.erl"},{line,704}]},
    {rabbit_queue_type,deliver,4,
        [{file,"rabbit_queue_type.erl"},{line,662}]}]}
```

Co-authored-by: Karl Nilsson <kjnilsson@gmail.com>
(cherry picked from commit 68de3fd)
  • Loading branch information
mkuratczyk authored and mergify[bot] committed Dec 20, 2024
1 parent 9cf304c commit 55dc45c
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -923,8 +923,31 @@ readers(QName) ->
{node(), 0}
end.

get_writer_pid(Q) ->
case amqqueue:get_pid(Q) of
none ->
%% the stream is still starting; wait up to 5 seconds
%% and ask the coordinator as it has the Pid sooner
#{name := StreamId} = amqqueue:get_type_state(Q),
get_writer_pid(StreamId, 50);
Pid ->
Pid
end.

get_writer_pid(_StreamId, 0) ->
stream_not_found;
get_writer_pid(StreamId, N) ->
case rabbit_stream_coordinator:writer_pid(StreamId) of
{ok, Pid} ->
Pid;
_ ->
timer:sleep(100),
get_writer_pid(StreamId, N - 1)
end.


init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
Leader = get_writer_pid(Q),
QName = amqqueue:get_name(Q),
#{name := StreamId} = amqqueue:get_type_state(Q),
%% tell us about leader changes so we can fail over
Expand Down

0 comments on commit 55dc45c

Please sign in to comment.