Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow callers to define the scaling strategy of the FLAME pool #51

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 66 additions & 74 deletions lib/flame/pool.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
defmodule FLAME.Pool.RunnerState do
  @moduledoc false

  # Tracks one booted runner: its active caller count, pid, and monitor ref.
  @type t :: %__MODULE__{}
  defstruct [:count, :pid, :monitor_ref]
end

defmodule FLAME.Pool.WaitingState do
  @moduledoc false

  # Tracks one queued caller: its GenServer `from`, monitor ref, and deadline.
  @type t :: %__MODULE__{}
  defstruct [:from, :monitor_ref, :deadline]
end

Expand All @@ -26,7 +28,7 @@ defmodule FLAME.Pool do

children = [
...,
{FLAME.Pool, name: MyRunner, min: 1, max: 10, max_concurrency: 100}
{FLAME.Pool, name: MyRunner, min: 1, max: 10}
]

See `start_link/1` for supported options.
Expand All @@ -40,11 +42,12 @@ defmodule FLAME.Pool do
alias FLAME.{Pool, Runner, Queue, CodeSync}
alias FLAME.Pool.{RunnerState, WaitingState, Caller}

@default_max_concurrency 100
@default_strategy {Pool.PerRunnerMaxConcurrencyStrategy, [max_concurrency: 100]}
@boot_timeout 30_000
@idle_shutdown_after 30_000
@async_boot_debounce 1_000

@type t :: %__MODULE__{}
defstruct name: nil,
runner_sup: nil,
task_sup: nil,
Expand All @@ -55,7 +58,7 @@ defmodule FLAME.Pool do
min_idle_shutdown_after: nil,
min: nil,
max: nil,
max_concurrency: nil,
strategy: nil,
callers: %{},
waiting: Queue.new(),
runners: %{},
Expand Down Expand Up @@ -89,8 +92,7 @@ defmodule FLAME.Pool do

* `:max` - The maximum number of runners to elastically grow to in the pool.

* `:max_concurrency` - The maximum number of concurrent executions per runner before
booting new runners or queueing calls. Defaults to `100`.
* `:strategy` - The strategy to use. Defaults to `FLAME.Pool.PerRunnerMaxConcurrencyStrategy`.

* `:single_use` - if `true`, runners will be terminated after each call completes.
Defaults `false`.
Expand Down Expand Up @@ -183,7 +185,7 @@ defmodule FLAME.Pool do
],
min: 1,
max: 1,
max_concurrency: 10,
strategy: {FLAME.Pool.PerRunnerMaxConcurrencyStrategy, [max_concurrency: 10]},
backend: {FLAME.FlyBackend,
cpu_kind: "performance", cpus: 4, memory_mb: 8192,
token: System.fetch_env!("FLY_API_TOKEN"),
Expand All @@ -203,7 +205,7 @@ defmodule FLAME.Pool do
:min_idle_shutdown_after,
:min,
:max,
:max_concurrency,
:strategy,
:backend,
:log,
:single_use,
Expand Down Expand Up @@ -417,7 +419,7 @@ defmodule FLAME.Pool do
boot_timeout: boot_timeout,
idle_shutdown_after: Keyword.get(opts, :idle_shutdown_after, @idle_shutdown_after),
min_idle_shutdown_after: Keyword.get(opts, :min_idle_shutdown_after, :infinity),
max_concurrency: Keyword.get(opts, :max_concurrency, @default_max_concurrency),
strategy: Keyword.get(opts, :strategy, @default_strategy),
on_grow_start: opts[:on_grow_start],
on_grow_end: opts[:on_grow_end],
on_shrink: opts[:on_shrink],
Expand Down Expand Up @@ -489,21 +491,16 @@ defmodule FLAME.Pool do
{:noreply, checkout_runner(state, deadline, from)}
end

defp runner_count(state) do
map_size(state.runners) + map_size(state.pending_runners)
def runner_count(state) do
map_size(state.runners)
end

defp waiting_count(%Pool{waiting: %Queue{} = waiting}) do
def waiting_count(%Pool{waiting: %Queue{} = waiting}) do
Queue.size(waiting)
end

defp min_runner(state) do
if map_size(state.runners) == 0 do
nil
else
{_ref, min} = Enum.min_by(state.runners, fn {_, %RunnerState{count: count}} -> count end)
min
end
def pending_count(state) do
map_size(state.pending_runners)
end

defp replace_caller(state, checkout_ref, caller_pid, child_pid) do
Expand Down Expand Up @@ -544,26 +541,17 @@ defmodule FLAME.Pool do
end

defp checkout_runner(%Pool{} = state, deadline, from, monitor_ref \\ nil) do
min_runner = min_runner(state)
runner_count = runner_count(state)
{strategy_module, strategy_opts} = state.strategy

cond do
min_runner && min_runner.count < state.max_concurrency ->
reply_runner_checkout(state, min_runner, from, monitor_ref)
actions = strategy_module.checkout_runner(state, strategy_opts)

runner_count < state.max ->
if state.async_boot_timer ||
map_size(state.pending_runners) * state.max_concurrency > waiting_count(state) do
waiting_in(state, deadline, from)
else
state
|> async_boot_runner()
|> waiting_in(deadline, from)
end

true ->
waiting_in(state, deadline, from)
end
Enum.reduce(actions, state, fn action, acc ->
case action do
:wait -> waiting_in(acc, deadline, from)
:scale -> async_boot_runner(acc)
{:checkout, runner} -> reply_runner_checkout(acc, runner, from, monitor_ref)
end
end)
end

defp reply_runner_checkout(state, %RunnerState{} = runner, from, monitor_ref) do
Expand Down Expand Up @@ -630,17 +618,29 @@ defmodule FLAME.Pool do
end

defp async_boot_runner(%Pool{on_grow_start: on_grow_start, name: name} = state) do
new_count = runner_count(state) + 1
{strategy_module, strategy_opts} = state.strategy

task =
Task.Supervisor.async_nolink(state.task_sup, fn ->
if on_grow_start, do: on_grow_start.(%{count: new_count, name: name, pid: self()})
current_count = runner_count(state) + pending_count(state)
new_count = strategy_module.desired_count(state, strategy_opts)

start_child_runner(state)
end)
num_tasks = max(new_count - current_count, 0)

new_pending = Map.put(state.pending_runners, task.ref, task.pid)
%Pool{state | pending_runners: new_pending}
if num_tasks do
tasks =
for _ <- 1..num_tasks do
Task.Supervisor.async_nolink(state.task_sup, fn ->
if on_grow_start, do: on_grow_start.(%{count: new_count, name: name, pid: self()})
start_child_runner(state)
end)
end

pending_runners = Map.new(tasks, &{&1.ref, &1.pid})
new_pending = Map.merge(state.pending_runners, pending_runners)

%Pool{state | pending_runners: new_pending}
else
state
end
end

defp start_child_runner(%Pool{} = state, runner_opts \\ []) do
Expand Down Expand Up @@ -754,8 +754,15 @@ defmodule FLAME.Pool do
defp handle_down(%Pool{} = state, {:DOWN, ref, :process, pid, reason}) do
state = maybe_drop_waiting(state, pid)

%{
callers: callers,
runners: runners,
pending_runners: pending_runners,
strategy: {strategy_module, strategy_opts}
} = state

state =
case state.callers do
case callers do
%{^pid => %Caller{monitor_ref: ^ref} = caller} ->
drop_caller(state, pid, caller)

Expand All @@ -764,16 +771,16 @@ defmodule FLAME.Pool do
end

state =
case state.runners do
case runners do
%{^ref => _} -> drop_child_runner(state, ref)
%{} -> state
end

case state.pending_runners do
case pending_runners do
%{^ref => _} ->
state = %Pool{state | pending_runners: Map.delete(state.pending_runners, ref)}
# we rate limit this to avoid many failed async boot attempts
if has_unmet_servicable_demand?(state) do
if strategy_module.has_unmet_servicable_demand?(state, strategy_opts) do
state
|> maybe_on_grow_end(pid, {:exit, reason})
|> schedule_async_boot_runner()
Expand All @@ -787,7 +794,7 @@ defmodule FLAME.Pool do
end

defp maybe_on_grow_end(%Pool{on_grow_end: on_grow_end} = state, pid, result) do
new_count = runner_count(state)
new_count = runner_count(state) + pending_count(state)
meta = %{count: new_count, name: state.name, pid: pid}

case result do
Expand All @@ -799,17 +806,12 @@ defmodule FLAME.Pool do
end

defp maybe_on_shrink(%Pool{} = state) do
new_count = runner_count(state)
new_count = runner_count(state) + pending_count(state)
if state.on_shrink, do: state.on_shrink.(%{count: new_count, name: state.name})

state
end

defp has_unmet_servicable_demand?(%Pool{} = state) do
waiting_count(state) > map_size(state.pending_runners) * state.max_concurrency and
runner_count(state) < state.max
end

defp handle_runner_async_up(%Pool{} = state, pid, ref) when is_pid(pid) and is_reference(ref) do
%{^ref => task_pid} = state.pending_runners
Process.demonitor(ref, [:flush])
Expand All @@ -818,25 +820,15 @@ defmodule FLAME.Pool do
{runner, new_state} = put_runner(new_state, pid)
new_state = maybe_on_grow_end(new_state, task_pid, :ok)

# pop waiting callers up to max_concurrency, but we must handle:
# 1. the case where we have no waiting callers
# 2. the case where we process a DOWN for the new runner as we pop DOWNs
# looking for fresh waiting
# 3. if we still have waiting callers at the end, boot more runners if we have capacity
Enum.reduce_while(1..state.max_concurrency, new_state, fn i, acc ->
with {:ok, %RunnerState{} = runner} <- Map.fetch(acc.runners, runner.monitor_ref),
true <- i <= acc.max_concurrency do
case pop_next_waiting_caller(acc) do
{%WaitingState{} = next, acc} ->
{:cont, reply_runner_checkout(acc, runner, next.from, next.monitor_ref)}

{nil, acc} ->
{:halt, acc}
end
else
_ -> {:halt, acc}
end
end)
{strategy_module, strategy_opts} = state.strategy

pop = fn state -> pop_next_waiting_caller(state) end

checkout = fn state, runner, from, monitor_ref ->
reply_runner_checkout(state, runner, from, monitor_ref)
end

strategy_module.assign_waiting_callers(new_state, runner, pop, checkout, strategy_opts)
end

defp deadline(timeout) when is_integer(timeout) do
Expand Down
79 changes: 79 additions & 0 deletions lib/flame/pool/per_runner_max_concurrency_strategy.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule FLAME.Pool.PerRunnerMaxConcurrencyStrategy do
  @moduledoc """
  Default `FLAME.Pool.Strategy` that caps concurrent executions per runner.

  Callers are checked out on the least-loaded runner until that runner reaches
  `:max_concurrency`. When every runner is saturated and the pool is below
  `pool.max`, the pool is asked to scale one runner at a time while the caller
  waits in the queue.

  ## Options

    * `:max_concurrency` - required. The maximum number of concurrent
      executions allowed on a single runner.
  """

  alias FLAME.Pool

  @behaviour FLAME.Pool.Strategy

  @impl true
  def checkout_runner(%Pool{} = pool, opts) do
    min_runner = min_runner(pool)
    runner_count = Pool.runner_count(pool) + Pool.pending_count(pool)
    max_concurrency = Keyword.fetch!(opts, :max_concurrency)

    cond do
      # Reuse the least-loaded runner while it still has spare capacity.
      min_runner && min_runner.count < max_concurrency ->
        [{:checkout, min_runner}]

      # Below the pool ceiling: only request a boot when no boot debounce is
      # active and the already-pending runners cannot absorb the waiters.
      runner_count < pool.max ->
        if pool.async_boot_timer ||
             map_size(pool.pending_runners) * max_concurrency > Pool.waiting_count(pool) do
          [:wait]
        else
          [:scale, :wait]
        end

      # At the pool ceiling: the caller must wait for a slot to free up.
      true ->
        [:wait]
    end
  end

  @impl true
  def assign_waiting_callers(
        %Pool{} = pool,
        %Pool.RunnerState{} = runner,
        pop_next_waiting_caller,
        reply_runner_checkout,
        opts
      ) do
    max_concurrency = Keyword.fetch!(opts, :max_concurrency)

    # Pop waiting callers up to max_concurrency, but we must handle:
    # 1. the case where we have no waiting callers
    # 2. the case where we process a DOWN for the new runner as we pop DOWNs
    #    looking for fresh waiting callers
    {pool, _assigned_concurrency} =
      Enum.reduce_while(1..max_concurrency, {pool, 0}, fn _i, {pool, assigned_concurrency} ->
        # Re-fetch the runner each pass in case it went DOWN mid-loop. The
        # strict `<` bound keeps assignments at exactly max_concurrency.
        with {:ok, %Pool.RunnerState{} = runner} <- Map.fetch(pool.runners, runner.monitor_ref),
             true <- assigned_concurrency < max_concurrency do
          case pop_next_waiting_caller.(pool) do
            {%Pool.WaitingState{} = next, pool} ->
              pool = reply_runner_checkout.(pool, runner, next.from, next.monitor_ref)
              {:cont, {pool, assigned_concurrency + 1}}

            {nil, pool} ->
              {:halt, {pool, assigned_concurrency}}
          end
        else
          _ -> {:halt, {pool, assigned_concurrency}}
        end
      end)

    pool
  end

  @impl true
  def desired_count(%Pool{} = pool, _opts) do
    # Grow one runner at a time beyond what is already running or booting.
    Pool.runner_count(pool) + Pool.pending_count(pool) + 1
  end

  @impl true
  def has_unmet_servicable_demand?(%Pool{} = pool, _opts) do
    runner_count = Pool.runner_count(pool) + Pool.pending_count(pool)
    Pool.waiting_count(pool) > 0 and runner_count < pool.max
  end

  # Returns the runner with the fewest active callers, or nil when none exist.
  defp min_runner(pool) do
    if map_size(pool.runners) == 0 do
      nil
    else
      {_ref, min} =
        Enum.min_by(pool.runners, fn {_, %Pool.RunnerState{count: count}} -> count end)

      min
    end
  end
end
27 changes: 27 additions & 0 deletions lib/flame/pool/strategy.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule FLAME.Pool.Strategy do
  @moduledoc """
  Behaviour for customizing how a `FLAME.Pool` checks out runners and scales.

  A strategy is configured on the pool as `{strategy_module, strategy_opts}`;
  the `opts` keyword list is passed to every callback. The default is
  `FLAME.Pool.PerRunnerMaxConcurrencyStrategy`.
  """

  alias FLAME.Pool

  @typedoc """
  An instruction returned by `c:checkout_runner/2`:

    * `:wait` - queue the caller until a runner frees up
    * `:scale` - ask the pool to boot more runners
    * `{:checkout, runner}` - check the caller out on the given runner
  """
  @type action ::
          :wait
          | :scale
          | {:checkout, Pool.RunnerState.t()}

  @doc """
  Decides how the pool should service an incoming caller.

  Returns a list of `t:action/0` values which the pool applies in order.
  """
  @callback checkout_runner(state :: Pool.t(), opts :: Keyword.t()) :: list(action)

  @typedoc "Pops the next waiting caller, returning `{waiting | nil, pool}`."
  @type pop_next_waiting_caller_fun :: (Pool.t() -> {Pool.WaitingState.t() | nil, Pool.t()})

  @typedoc "Replies to a waiting caller with a checkout on the given runner."
  @type reply_runner_checkout_fun ::
          (Pool.t(), Pool.RunnerState.t(), pid(), reference() -> Pool.t())

  @doc """
  Assigns queued callers to a freshly booted runner.

  Invoked after `runner` comes up. The strategy uses the provided functions to
  pop waiting callers and reply with checkouts, returning the updated pool.
  """
  @callback assign_waiting_callers(
              state :: Pool.t(),
              runner :: Pool.RunnerState.t(),
              pop_next_waiting_caller :: pop_next_waiting_caller_fun(),
              reply_runner_checkout :: reply_runner_checkout_fun(),
              opts :: Keyword.t()
            ) ::
              Pool.t()

  @doc """
  Returns the total number of runners (running plus pending) the pool should
  have after a scale-up.
  """
  @callback desired_count(state :: Pool.t(), opts :: Keyword.t()) :: non_neg_integer()

  @doc """
  Returns `true` when there is waiting demand the pool could still service by
  booting more runners (used to rate-limit retries after failed boots).
  """
  @callback has_unmet_servicable_demand?(state :: Pool.t(), opts :: Keyword.t()) :: boolean()
end
Loading