Skip to content

Commit

Permalink
Ensure trackables occupy slots in the runner's concurreny pool. (#59)
Browse files Browse the repository at this point in the history
*Note*: this allows a single caller checkout (FLAME.call) to
return multiple trackables that exceed the runners max_concurrency
limits. This is intentional and concurrency limits will be properly
tracked and freed as the trackables go down and their checkouts
are released.
  • Loading branch information
chrismccord authored Sep 5, 2024
1 parent 999688a commit 0d55f3e
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 42 deletions.
47 changes: 29 additions & 18 deletions lib/flame/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -289,13 +289,13 @@ defmodule FLAME.Pool do
|> Keyword.put_new(:link, true)

case Runner.place_child(runner_pid, child_spec, place_opts) do
{:ok, child_pid} = result ->
{{:ok, child_pid}, _trackable_pids = []} = result ->
# we are placing the link back on the parent node, but we are protected
# from racing the link on the child FLAME because the terminator on
# the remote flame is monitoring the caller and will terminator the child
# if we go away
if Keyword.fetch!(place_opts, :link), do: Process.link(child_pid)
{:cancel, {:replace, child_pid}, result}
{:cancel, {:replace, [child_pid]}, result}

:ignore ->
{:cancel, :ok, :ignore}
Expand Down Expand Up @@ -330,7 +330,11 @@ defmodule FLAME.Pool do
send_cancel(pid, ref, :catch)
:erlang.raise(kind, reason, __STACKTRACE__)
else
{:cancel, reason, result} ->
{:cancel, :ok, {result, [_ | _] = trackable_pids}} ->
send_cancel(pid, ref, {:replace, trackable_pids})
result

{:cancel, reason, {result, [] = _trackable_pids}} ->
send_cancel(pid, ref, reason)
result
end
Expand Down Expand Up @@ -481,8 +485,8 @@ defmodule FLAME.Pool do

def handle_info({:cancel, ref, caller_pid, reason}, state) do
case reason do
{:replace, child_pid} ->
{:noreply, replace_caller(state, ref, caller_pid, child_pid)}
{:replace, child_pids} ->
{:noreply, replace_caller(state, ref, caller_pid, child_pids)}

reason when reason in [:ok, :timeout, :catch] ->
{:noreply, checkin_runner(state, ref, caller_pid, reason)}
Expand Down Expand Up @@ -511,23 +515,30 @@ defmodule FLAME.Pool do
end
end

defp replace_caller(state, checkout_ref, caller_pid, child_pid) do
# replace caller with child pid and do not inc concurrency counts since we are replacing
defp replace_caller(%Pool{} = state, checkout_ref, caller_pid, [_ | _] = child_pids) do
# replace caller with child pids and increase concurrency counts for the runner
%{^caller_pid => %Caller{checkout_ref: ^checkout_ref} = caller} = state.callers
Process.demonitor(caller.monitor_ref, [:flush])

new_caller = %Caller{
checkout_ref: checkout_ref,
monitor_ref: Process.monitor(child_pid),
runner_ref: caller.runner_ref
}
new_callers = Map.delete(state.callers, caller_pid)

new_callers =
state.callers
|> Map.delete(caller_pid)
|> Map.put(child_pid, new_caller)
Enum.reduce(child_pids, new_callers, fn child_pid, acc ->
new_caller = %Caller{
checkout_ref: checkout_ref,
monitor_ref: Process.monitor(child_pid),
runner_ref: caller.runner_ref
}

Map.put(acc, child_pid, new_caller)
end)

%Pool{state | callers: new_callers}
inc_runner_count(
%Pool{state | callers: new_callers},
caller.runner_ref,
# subtract 1 because the caller we are replacing is already in the count
length(child_pids) - 1
)
end

defp checkin_runner(state, ref, caller_pid, reason)
Expand Down Expand Up @@ -677,10 +688,10 @@ defmodule FLAME.Pool do
{runner, new_state}
end

defp inc_runner_count(%Pool{} = state, ref) do
defp inc_runner_count(%Pool{} = state, ref, amount \\ 1) do
new_runners =
Map.update!(state.runners, ref, fn %RunnerState{} = runner ->
%RunnerState{runner | count: runner.count + 1}
%RunnerState{runner | count: runner.count + amount}
end)

%Pool{state | runners: new_runners}
Expand Down
39 changes: 23 additions & 16 deletions lib/flame/runner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ defmodule FLAME.Runner do
end)

case result do
{:ok, value} ->
:ok = checkin(runner_pid, ref)
value
{:ok, {value, trackable_pids}} ->
:ok = checkin(runner_pid, ref, trackable_pids)
{value, trackable_pids}

{kind, reason} ->
:ok = checkin(runner_pid, ref)
:ok = checkin(runner_pid, ref, [])

case kind do
:exit -> exit(reason)
Expand All @@ -151,8 +151,8 @@ defmodule FLAME.Runner do
GenServer.call(runner_pid, :checkout)
end

defp checkin(runner_pid, ref) do
GenServer.call(runner_pid, {:checkin, ref})
defp checkin(runner_pid, ref, trackable_pids) do
GenServer.call(runner_pid, {:checkin, ref, trackable_pids})
end

@impl true
Expand Down Expand Up @@ -253,8 +253,6 @@ defmodule FLAME.Runner do
end

def handle_call(:checkout, {from_pid, _tag}, state) do
ref = Process.monitor(from_pid)

state =
case maybe_diff_code_paths(state) do
{new_state, nil} ->
Expand All @@ -270,12 +268,20 @@ defmodule FLAME.Runner do
new_state
end

{:reply, {ref, state.runner, state.backend_state}, put_checkout(state, from_pid, ref)}
{new_state, ref} = put_checkout(state, from_pid)
{:reply, {ref, new_state.runner, new_state.backend_state}, new_state}
end

def handle_call({:checkin, ref}, _from, state) do
def handle_call({:checkin, ref, trackable_pids}, _from, state) do
Process.demonitor(ref, [:flush])
{:reply, :ok, drop_checkout(state, ref)}

new_state =
Enum.reduce(trackable_pids, state, fn pid, acc ->
{acc, _ref} = put_checkout(acc, pid)
acc
end)

{:reply, :ok, drop_checkout(new_state, ref)}
end

def handle_call({:remote_boot, base_sync_stream, _timeout}, _from, state) do
Expand All @@ -301,7 +307,7 @@ defmodule FLAME.Runner do
terminator: term
} = new_runner

:ok =
{:ok, _} =
remote_call!(runner, new_backend_state, runner.boot_timeout, false, fn ->
# ensure app is fully started if parent connects before up
if otp_app, do: {:ok, _} = Application.ensure_all_started(otp_app)
Expand Down Expand Up @@ -407,8 +413,9 @@ defmodule FLAME.Runner do
result
end

defp put_checkout(state, from_pid, ref) when is_pid(from_pid) do
%{state | checkouts: Map.put(state.checkouts, ref, from_pid)}
defp put_checkout(state, from_pid) when is_pid(from_pid) do
ref = Process.monitor(from_pid)
{%{state | checkouts: Map.put(state.checkouts, ref, from_pid)}, ref}
end

defp drop_checkout(state, ref) when is_reference(ref) do
Expand Down Expand Up @@ -463,9 +470,9 @@ defmodule FLAME.Runner do
if track_resources? do
{result, pids} = FLAME.track_resources(result, [], node(remote_pid))
send(remote_pid, {parent_ref, pids})
{:ok, result}
{:ok, {result, pids}}
else
{:ok, result}
{:ok, {result, []}}
end

{:DOWN, ^remote_monitor_ref, :process, ^remote_pid, reason} ->
Expand Down
75 changes: 75 additions & 0 deletions test/flame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -543,5 +543,80 @@ defmodule FLAME.FLAMETest do
send(pid, {trackable.ref, :stop})
assert_receive {:DOWN, _, _, ^runner, {:shutdown, :idle}}, 1000
end

@tag runner: [
min: 0,
max: 1,
max_concurrency: 1,
idle_shutdown_after: 100,
track_resources: true
]
test "remote with tracking max concurrency", %{runner_sup: runner_sup} = config do
non_trackable = URI.new!("/")

call = fn count ->
ref = make_ref()

trackables =
for _ <- 1..count,
do: %MyTrackable{
name: :"#{config.test}_trackable_#{System.unique_integer()}",
ref: ref
}

[{%{"yes" => trackables, "no" => non_trackable}}]
end

[{map}] = FLAME.call(config.test, fn -> call.(2) end)

assert [{:undefined, runner, :worker, [FLAME.Runner]}] =
Supervisor.which_children(runner_sup)

Process.monitor(runner)
assert map_size(map) == 2
assert ^non_trackable = map["no"]
assert [%MyTrackable{} = trackable1, %MyTrackable{} = trackable2] = map["yes"]

# original trackables still occupies the slots
assert Process.alive?(trackable1.pid)
assert Process.alive?(trackable2.pid)
refute_receive {:DOWN, _, _, ^runner, _}, 1000

# check in the trackable 1
send(trackable1.pid, {trackable1.ref, :stop})

# no idle down because second trackable still alive
refute_receive {:DOWN, _, _, ^runner, _}, 1000

# trackable2 occupies the only available slot, so next call times out
caught =
try do
FLAME.call(config.test, fn -> call.(1) end, timeout: 1000)
catch
kind, reason -> {kind, reason}
end

assert {:exit, {:timeout, _}} = caught

# check in the trackable 2
send(trackable2.pid, {trackable2.ref, :stop})

# runner is now free for more work on open slot
[{map}] = FLAME.call(config.test, fn -> call.(1) end)

assert [{:undefined, runner, :worker, [FLAME.Runner]}] =
Supervisor.which_children(runner_sup)

Process.monitor(runner)
assert map_size(map) == 2
assert ^non_trackable = map["no"]
assert [%MyTrackable{pid: pid} = trackable] = map["yes"]

# check in the trackable
send(pid, {trackable.ref, :stop})

# runner idles down
assert_receive {:DOWN, _, _, ^runner, {:shutdown, :idle}}, 1000
end
end
end
16 changes: 8 additions & 8 deletions test/runner_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ defmodule FLAME.RunnerTest do
)

assert Runner.remote_boot(runner, nil) == :ok
assert Runner.call(runner, self(), fn -> :works end) == :works
assert Runner.call(runner, self(), fn -> :works end) == {:works, []}
assert_receive :stopped
end

Expand All @@ -113,9 +113,9 @@ defmodule FLAME.RunnerTest do

assert Runner.remote_boot(runner, nil) == :ok
assert Runner.remote_boot(runner, nil) == {:error, :already_booted}
assert Runner.call(runner, self(), fn -> :works end) == :works
assert Runner.call(runner, self(), fn -> :works end) == {:works, []}
refute_receive :stopped
assert Runner.call(runner, self(), fn -> :still_works end) == :still_works
assert Runner.call(runner, self(), fn -> :still_works end) == {:still_works, []}
ref = Process.monitor(runner)
assert Runner.shutdown(runner) == :ok
assert_receive :stopped
Expand Down Expand Up @@ -169,7 +169,7 @@ defmodule FLAME.RunnerTest do
assert {:exit, {%RuntimeError{message: "boom"}, _}} = error
refute_receive :stopped
refute_receive {:DOWN, _ref, :process, ^runner, _}
assert Runner.call(runner, self(), fn -> :works end) == :works
assert Runner.call(runner, self(), fn -> :works end) == {:works, []}
assert Runner.shutdown(runner) == :ok
end
end
Expand Down Expand Up @@ -210,7 +210,7 @@ defmodule FLAME.RunnerTest do

refute_receive :stopped
refute_receive {:DOWN, _ref, :process, ^runner, _}
assert Runner.call(runner, self(), fn -> :works end, timeout: 1234) == :works
assert Runner.call(runner, self(), fn -> :works end, timeout: 1234) == {:works, []}
assert Runner.shutdown(runner) == :ok
end
end
Expand All @@ -231,7 +231,7 @@ defmodule FLAME.RunnerTest do
Process.unlink(runner)
Process.monitor(runner)
assert Runner.remote_boot(runner, nil) == :ok
assert Runner.call(runner, self(), fn -> :works end) == :works
assert Runner.call(runner, self(), fn -> :works end) == {:works, []}
assert_receive :stopped, timeout * 2
assert_receive {:DOWN, _ref, :process, ^runner, _}
end
Expand Down Expand Up @@ -264,7 +264,7 @@ defmodule FLAME.RunnerTest do

Process.monitor(runner)
assert Runner.remote_boot(runner, stream) == :ok
assert Runner.call(runner, self(), fn -> :works end, timeout: 1234) == :works
assert Runner.call(runner, self(), fn -> :works end, timeout: 1234) == {:works, []}
assert Runner.shutdown(runner) == :ok
# called on remote boot
assert_receive {CodeSyncMock, {_mock, :extract}}
Expand All @@ -278,7 +278,7 @@ defmodule FLAME.RunnerTest do

Process.monitor(runner)
assert Runner.remote_boot(runner, nil) == :ok
assert Runner.call(runner, self(), fn -> :works end, timeout: 1234) == :works
assert Runner.call(runner, self(), fn -> :works end, timeout: 1234) == {:works, []}
assert Runner.shutdown(runner) == :ok
refute_receive {CodeSyncMock, _}
end
Expand Down

0 comments on commit 0d55f3e

Please sign in to comment.