diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a7aa3a5a18c..0fab693cf8f 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -923,8 +923,31 @@ readers(QName) -> {node(), 0} end. +get_writer_pid(Q) -> + case amqqueue:get_pid(Q) of + none -> + %% the stream is still starting; wait up to 5 seconds + %% and ask the coordinator as it has the Pid sooner + #{name := StreamId} = amqqueue:get_type_state(Q), + get_writer_pid(StreamId, 50); + Pid -> + Pid + end. + +get_writer_pid(_StreamId, 0) -> + stream_not_found; +get_writer_pid(StreamId, N) -> + case rabbit_stream_coordinator:writer_pid(StreamId) of + {ok, Pid} -> + Pid; + _ -> + timer:sleep(100), + get_writer_pid(StreamId, N - 1) + end. + + init(Q) when ?is_amqqueue(Q) -> - Leader = amqqueue:get_pid(Q), + Leader = get_writer_pid(Q), QName = amqqueue:get_name(Q), #{name := StreamId} = amqqueue:get_type_state(Q), %% tell us about leader changes so we can fail over