Skip to content

Commit

Permalink
fix(event-stream): consume any extra messages when closing
Browse files Browse the repository at this point in the history
  • Loading branch information
paulswartz committed Oct 4, 2023
1 parent c01d0f5 commit 683518e
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions apps/api_web/lib/api_web/event_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,17 @@ defmodule ApiWeb.EventStream do
:proc_lib.hibernate(__MODULE__, :hibernate_loop, [state])

{:close, conn} ->
Supervisor.server_unsubscribe(state.pid)
conn
unsubscribe(%{state | conn: conn})

{:error, :closed} ->
Supervisor.server_unsubscribe(state.pid)
state.conn
unsubscribe(state)

{:error, error} ->
Logger.warning(
"#{__MODULE__} unexpected error in hibernate_loop self=#{inspect(self())} server=#{inspect(state.pid)} reason=#{inspect(error)}"
)

Supervisor.server_unsubscribe(state.pid)
state.conn
unsubscribe(state)
end
end

Expand Down Expand Up @@ -117,6 +114,31 @@ defmodule ApiWeb.EventStream do
end
end

defp unsubscribe(%{pid: pid} = state) when is_pid(pid) do
Supervisor.server_unsubscribe(pid)
unsubscribe(%{state | pid: nil})
end

defp unsubscribe(state) do
# consume any extra messages received after unsubscribing
receive do
{:events, _} ->
unsubscribe(state)

:timeout ->
unsubscribe(state)

{:error, _} ->
unsubscribe(state)

{:DOWN, _, :process, _pid, _reason} ->
unsubscribe(state)
after
0 ->
state.conn
end
end

@spec ensure_timer(state) :: state
defp ensure_timer(%{timer: nil, timeout: timeout} = state) do
ref = Process.send_after(self(), :timeout, timeout)
Expand Down

0 comments on commit 683518e

Please sign in to comment.