Skip to content

Commit

Permalink
fix(event-stream): close immediately if CheckForShutdown is not running
Browse files Browse the repository at this point in the history
  • Loading branch information
paulswartz committed Oct 4, 2023
1 parent ce2d6c5 commit bc50cf8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
15 changes: 10 additions & 5 deletions apps/api_web/lib/api_web/event_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,23 @@ defmodule ApiWeb.EventStream do

@spec initialize(Plug.Conn.t(), module) :: state
def initialize(conn, module, timeout \\ 30_000) do
{:ok, pid} = Supervisor.server_subscribe(conn, module)
Logger.debug("#{__MODULE__} connected self=#{inspect(self())} server=#{inspect(pid)}")
Process.monitor(pid)

conn =
conn
|> put_resp_header("content-type", "text/event-stream")
# prevent nginx from buffering the stream
|> put_resp_header("x-accel-buffering", "no")
|> send_chunked(200)

ensure_timer(%__MODULE__{conn: conn, pid: pid, timeout: timeout})
if CheckForShutdown.running?() do
{:ok, pid} = Supervisor.server_subscribe(conn, module)
Logger.debug("#{__MODULE__} connected self=#{inspect(self())} server=#{inspect(pid)}")
Process.monitor(pid)

ensure_timer(%__MODULE__{conn: conn, pid: pid, timeout: timeout})
else
send(self(), {:close, conn})
%__MODULE__{conn: conn, pid: nil, timeout: timeout}
end
end

@spec hibernate_loop(state) :: Plug.Conn.t()
Expand Down
13 changes: 3 additions & 10 deletions apps/api_web/test/api_web/event_stream_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -136,29 +136,23 @@ defmodule ApiWeb.EventStreamTest do
on_exit(fn -> assert_stopped(state.pid) end)
end

test "closes the connection once CheckForShutdown.shutdown() is called (events)", %{
test "closes immediately if CheckForShutdown is not running", %{
conn: conn
} do
CheckForShutdown.shutdown()

state = initialize(conn, @module)
assert_receive_data()

send(self(), {:events, []})

assert {:close, conn} = receive_result(state)

assert chunks(conn) == ""
on_exit(fn -> assert_stopped(state.pid) end)
end

test "closes the connection once CheckForShutdown.shutdown() is called (timeout)", %{
conn: conn
} do
CheckForShutdown.shutdown()

state = initialize(conn, @module)
assert_receive_data()
CheckForShutdown.shutdown()

send(self(), :timeout)

Expand All @@ -171,10 +165,9 @@ defmodule ApiWeb.EventStreamTest do
test "closes the connection once CheckForShutdown.shutdown() is called (unknown message)", %{
conn: conn
} do
CheckForShutdown.shutdown()

state = initialize(conn, @module)
assert_receive_data()
CheckForShutdown.shutdown()

send(self(), :unknown_message)

Expand Down

0 comments on commit bc50cf8

Please sign in to comment.