Skip to content

Commit

Permalink
Merge pull request #44 from Logflare/feat/request-throttling
Browse files Browse the repository at this point in the history
feat: request throttling
  • Loading branch information
Ziinc authored Nov 23, 2023
2 parents 49b5b7a + dedc09a commit 0f0e1e9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
8 changes: 8 additions & 0 deletions lib/counter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ defmodule Loadfest.Counter do
def add(batch_size) do
GenServer.cast(__MODULE__, {:add, batch_size})
end
def requests() do
GenServer.call(__MODULE__, :requests)
end

def handle_cast({:add, size}, state) do
:counters.add(state.ref, idx(:requests), 1)
Expand All @@ -29,6 +32,11 @@ defmodule Loadfest.Counter do
{:noreply, state}
end

def handle_call(:requests, _caller, state) do
value = :counters.get(state.ref, idx(:requests))
{:reply, value, state}
end

def handle_info(:log, state) do
stats = %{
requests: :counters.get(state.ref, idx(:requests)),
Expand Down
18 changes: 11 additions & 7 deletions lib/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Loadfest.Pipeline do
alias Broadway.Message
require Logger
@source_names Application.get_env(:loadfest, :source_names)
@max_rps Application.get_env(:loadfest, :max_rps, 100_000)
defmodule Producer do
use GenStage

Expand Down Expand Up @@ -32,7 +33,7 @@ defmodule Loadfest.Pipeline do
module: {Loadfest.Pipeline.Producer, []}
],
processors: [
default: [concurrency: 50, max_demand: 1]
default: [concurrency: min(@max_rps, 50), max_demand: 1]
]
)
end
Expand All @@ -45,14 +46,17 @@ defmodule Loadfest.Pipeline do
batch: message.data
}

request = Loadfest.Client.send(name, body)
rps = Loadfest.Counter.requests()
if rps < @max_rps do
request = Loadfest.Client.send(name, body)

if request.status == 200 do
Loadfest.Counter.add(length(message.data))
end
if request.status == 200 do
Loadfest.Counter.add(length(message.data))
end

if request.status >= 400 do
Logger.warning("#{request.status} | #{inspect(request.body)}")
if request.status >= 400 do
Logger.warning("#{request.status} | #{inspect(request.body)}")
end
end

message
Expand Down

0 comments on commit 0f0e1e9

Please sign in to comment.