Skip to content

Commit

Permalink
perf: BufferLimiter optimization, use PubSubRates.Cache instead (#2141)
Browse files Browse the repository at this point in the history
* perf: BufferLimiter optimization, use PubSubRates.Cache to limit incoming requests.

* chore: compilation warnings
  • Loading branch information
Ziinc authored Jul 12, 2024
1 parent f073f89 commit bd8e7e6
Show file tree
Hide file tree
Showing 12 changed files with 136 additions and 31 deletions.
47 changes: 42 additions & 5 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -416,22 +416,59 @@ defmodule Logflare.Backends do
Checks if a local buffer is full.
"""
def local_pending_buffer_full?(%Source{} = source) do
local_pending_buffer_len(source) > @max_pending_buffer_len
get_and_cache_local_pending_buffer_len(source) > @max_pending_buffer_len
end

@doc """
Uses the buffers cache in PubSubRates.Cache to determine if pending buffer is full.
Much more performant than not using the cache.
"""
def cached_local_pending_buffer_full?(%Source{} = source) do
cached_local_pending_buffer_len(source) > @max_pending_buffer_len
end

@doc """
Get local pending buffer len of a source/backend combination, and caches it at the same time.
"""
@spec get_and_cache_local_pending_buffer_len(
Source.t() | integer(),
Backend.t() | nil | integer()
) ::
integer()
def get_and_cache_local_pending_buffer_len(source, backend \\ nil) do
len =
case IngestEventQueue.count_pending({source, backend}) do
len when is_integer(len) -> len
_ -> 0
end

payload = %{Node.self() => %{len: len}}

if is_map(source) do
PubSubRates.Cache.cache_buffers(source.id, Map.get(backend || %{}, :id), payload)
else
PubSubRates.Cache.cache_buffers(source, backend, payload)
end

len
end

@doc """
Get local pending buffer len of a source/backend combination
"""
def local_pending_buffer_len(source, backend \\ nil) do
case IngestEventQueue.count_pending({source, backend}) do
len when is_integer(len) -> len
_ -> 0
@spec cached_local_pending_buffer_len(Source.t(), Backend.t() | nil) :: non_neg_integer()
def cached_local_pending_buffer_len(source, backend \\ nil) do
PubSubRates.Cache.get_local_buffer(source.id, Map.get(backend || %{}, :id))
|> case do
%{len: len} -> len
other -> other
end
end

@doc """
Retrieves cluster-wide pending buffer size stored in cache for a given backend/source combination.
"""
@spec cached_pending_buffer_len(Source.t(), Backend.t() | nil) :: non_neg_integer()
def cached_pending_buffer_len(%Source{} = source, backend \\ nil) do
PubSubRates.Cache.get_cluster_buffers(source.id, Map.get(backend || %{}, :id))
end
Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/backends/adaptor/bigquery_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
alias Logflare.Users
alias Logflare.Sources
alias Logflare.Billing
alias Logflare.Backends
use Supervisor
require Logger

Expand Down Expand Up @@ -51,7 +52,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
initial_count: 1,
resolve_count: fn state ->
source = Sources.refresh_source_metrics_for_ingest(source)
len = Backends.local_pending_buffer_len(source, backend)
len = Backends.get_and_cache_local_pending_buffer_len(source, backend)
handle_resolve_count(state, len, source.metrics.avg)
end
},
Expand Down
14 changes: 5 additions & 9 deletions lib/logflare/backends/ingest_event_queue/broadcast_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ defmodule Logflare.Backends.IngestEventQueue.BroadcastWorker do
A worker that broadcasts all source-backend buffer statistics periodically for the entire node.
Broadcasts cluster buffer length of a given queue (an integer) locally.
Broadcasts local buffer length of a given queue globally
Broadcasts local buffer length of a given queue globally.
"""
use GenServer
alias Logflare.Source
alias Logflare.Backends.IngestEventQueue
alias Logflare.PubSubRates
alias Logflare.Backends
require Logger
@ets_table_mapper :ingest_event_queue_mapping

Expand All @@ -20,7 +20,7 @@ defmodule Logflare.Backends.IngestEventQueue.BroadcastWorker do

def init(opts) do
state = %{interval: Keyword.get(opts, :interval, @default_interval)}
Process.send_after(self(), :global_broadcast, state.interval * 2)
Process.send_after(self(), :global_broadcast, state.interval)
Process.send_after(self(), :local_broadcast, state.interval)
{:ok, state}
end
Expand Down Expand Up @@ -53,12 +53,8 @@ defmodule Logflare.Backends.IngestEventQueue.BroadcastWorker do
{:noreply, state}
end

defp global_broadcast_producer_buffer_len({source_id, backend_id} = sid_bid) do
len =
case IngestEventQueue.count_pending(sid_bid) do
v when is_integer(v) -> v
_ -> 0
end
defp global_broadcast_producer_buffer_len({source_id, backend_id}) do
len = Backends.get_and_cache_local_pending_buffer_len(source_id, backend_id)

local_buffer = %{Node.self() => %{len: len}}
PubSubRates.global_broadcast_rate({:buffers, source_id, backend_id, local_buffer})
Expand Down
6 changes: 2 additions & 4 deletions lib/logflare/backends/ingest_event_queue/queue_janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
use GenServer
alias Logflare.Backends.IngestEventQueue
alias Logflare.Sources
alias Logflare.Backends
require Logger
@default_interval 1_000
@default_remainder 100
Expand Down Expand Up @@ -53,10 +54,7 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
sid_bid = {state.source_id, state.backend_id}
# clear out ingested events
pending_size =
case IngestEventQueue.count_pending(sid_bid) do
value when is_integer(value) -> value
_ -> 0
end
Backends.get_and_cache_local_pending_buffer_len(state.source_id, state.backend_id)

if pending_size > state.remainder do
# drop all ingested
Expand Down
13 changes: 13 additions & 0 deletions lib/logflare/pubsub_rates/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ defmodule Logflare.PubSubRates.Cache do
%{id: __MODULE__, start: {Cachex, :start_link, [@cache, [stats: stats]]}}
end

def clear() do
Cachex.clear(__MODULE__)
end

def cache_rates(source_id, rates) when is_atom(source_id) do
{:ok, val} = Cachex.get(__MODULE__, {source_id, "rates"})

Expand Down Expand Up @@ -63,6 +67,15 @@ defmodule Logflare.PubSubRates.Cache do
Cachex.get(__MODULE__, {source_id, backend_id, "buffers"})
end

@spec get_local_buffer(non_neg_integer(), non_neg_integer() | nil) :: map()
def get_local_buffer(source_id, backend_id) do
Cachex.get(__MODULE__, {source_id, backend_id, "buffers"})
|> case do
{:ok, val} when val != nil -> Map.get(val, Node.self(), 0)
_ -> 0
end
end

@doc """
Returns the sum of all buffers across the cluster for a given source and backend combination.
"""
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare_web/controllers/plugs/buffer_limiter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule LogflareWeb.Plugs.BufferLimiter do
def init(_opts), do: nil

def call(%{assigns: %{source: source}} = conn, _opts \\ []) do
if Backends.local_pending_buffer_full?(source) do
if Backends.cached_local_pending_buffer_full?(source) do
conn
|> send_resp(429, "Buffer full: Too many requests")
|> halt()
Expand Down
8 changes: 4 additions & 4 deletions test/logflare/backends/bigquery_adaptor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,16 @@ defmodule Logflare.Backends.BigQueryAdaptorTest do

assert {:ok, _} = Backends.ingest_logs([log_event], source)

assert Backends.local_pending_buffer_len(source, nil) == 1
assert Backends.local_pending_buffer_len(source, backend) == 1
assert Backends.get_and_cache_local_pending_buffer_len(source, nil) == 1
assert Backends.get_and_cache_local_pending_buffer_len(source, backend) == 1
:timer.sleep(2000)

TestUtils.retry_assert(fn ->
assert_receive ^ref
end)

assert Backends.local_pending_buffer_len(source, nil) == 0
assert Backends.local_pending_buffer_len(source, backend) == 0
assert Backends.get_and_cache_local_pending_buffer_len(source, nil) == 0
assert Backends.get_and_cache_local_pending_buffer_len(source, backend) == 0
end
end

Expand Down
6 changes: 3 additions & 3 deletions test/logflare/backends/ingest_events_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ defmodule Logflare.Backends.IngestEventQueueTest do
IngestEventQueue.add_to_table({source, backend}, [le])
IngestEventQueue.add_to_table({source, nil}, [le])
:timer.sleep(300)
assert Backends.local_pending_buffer_len(source) == 1
assert Backends.local_pending_buffer_len(source, backend) == 1
assert Backends.get_and_cache_local_pending_buffer_len(source) == 1
assert Backends.get_and_cache_local_pending_buffer_len(source, backend) == 1
assert PubSubRates.Cache.get_cluster_buffers(source.id, backend.id) == 1
assert PubSubRates.Cache.get_cluster_buffers(source.id, nil) == 1
end
Expand All @@ -147,7 +147,7 @@ defmodule Logflare.Backends.IngestEventQueueTest do
assert {:ok, [_]} = DemandWorker.fetch({source, backend}, 5)
assert IngestEventQueue.get_table_size({source, backend}) == 1
assert IngestEventQueue.count_pending({source, backend}) == 0
assert Backends.local_pending_buffer_len(source, backend) == 0
assert Backends.get_and_cache_local_pending_buffer_len(source, backend) == 0
end

test "QueueJanitor cleans up :ingested events" do
Expand Down
59 changes: 55 additions & 4 deletions test/logflare/backends_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule Logflare.BackendsTest do
alias Logflare.Source.V1SourceSup
alias Logflare.PubSubRates
alias Logflare.Logs.SourceRouting
alias Logflare.PubSubRates
alias Logflare.Backends.IngestEventQueue

setup do
start_supervised!(AllLogsLogged)
Expand Down Expand Up @@ -186,14 +188,14 @@ defmodule Logflare.BackendsTest do
end

test "can get length of queue", %{source: source} do
assert Backends.local_pending_buffer_len(source, nil) == 0
assert Backends.local_pending_buffer_len(source) == 0
assert Backends.get_and_cache_local_pending_buffer_len(source, nil) == 0
assert Backends.get_and_cache_local_pending_buffer_len(source) == 0
events = for _n <- 1..5, do: build(:log_event, source: source, some: "event")
assert {:ok, 5} = Backends.ingest_logs(events, source)
assert Backends.local_pending_buffer_len(source) > 0
assert Backends.get_and_cache_local_pending_buffer_len(source) > 0
# Producer will pop from the queue
:timer.sleep(1_500)
assert Backends.local_pending_buffer_len(source) == 0
assert Backends.get_and_cache_local_pending_buffer_len(source) == 0
end
end

Expand Down Expand Up @@ -370,6 +372,7 @@ defmodule Logflare.BackendsTest do
# - transformation of params to log events
# - BQ max insertion rate
@tag :benchmark
@tag :skip
test "BQ - v1 Logs vs v2 Logs vs v2 Backend", %{user: user} do
[source1, source2] = insert_pair(:source, user: user, rules: [])
# start_supervised!({Pipeline, [rls, name: @pipeline_name]})
Expand Down Expand Up @@ -404,6 +407,7 @@ defmodule Logflare.BackendsTest do
# This benchmarks two areas:
# - rules dispatching, with and without any rules
@tag :benchmark
@tag :skip
test "backend rules routing benchmarking", %{user: user} do
backend = insert(:backend, user: user)
[source1, source2] = insert_pair(:source, user: user, rules: [])
Expand Down Expand Up @@ -445,4 +449,51 @@ defmodule Logflare.BackendsTest do
)
end
end

@tag :benchmark
@tag timeout: :infinity
@tag :skip
# benchmark results:
# using the buffers cache results in >5899.70x higher ips for 50k inputs
# memory usage is 3.4x higher without cache
# reductions is 3455x higher without cache
test "local_pending_buffer_len" do
user = insert(:user)
source = insert(:source, user: user)
backend = insert(:backend, user: user)
{:ok, tid} = IngestEventQueue.upsert_tid({source, backend})
sb = {source.id, backend.id}

Benchee.run(
%{
"with cache" => fn {_input, _resource} ->
Backends.cached_local_pending_buffer_len(source, backend)
end,
"without caching" => fn {_input, _resource} ->
Backends.get_and_cache_local_pending_buffer_len(source, backend)
end
},
inputs: %{
"50k" => for(_ <- 1..50_000, do: build(:log_event)),
"10k" => for(_ <- 1..10_000, do: build(:log_event)),
"1k" => for(_ <- 1..1_000, do: build(:log_event))
},
# insert the batch
before_scenario: fn input ->
:ets.delete_all_objects(tid)
IngestEventQueue.add_to_table(sb, input)

PubSubRates.Cache.cache_buffers(source.id, backend.id, %{
Node.self() => %{len: length(input)}
})

{input, nil}
end,
time: 3,
warmup: 1,
memory_time: 3,
reduction_time: 3,
print: [configuration: false]
)
end
end
7 changes: 7 additions & 0 deletions test/logflare_web/plugs/buffer_limiter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do
use LogflareWeb.ConnCase
alias LogflareWeb.Plugs.BufferLimiter
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends

setup do
conn = build_conn(:post, "/api/logs", %{"message" => "some text"})
Expand All @@ -17,6 +18,9 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do
IngestEventQueue.add_to_table({source, nil}, [le])
end

# get and cache the value
Backends.get_and_cache_local_pending_buffer_len(source, nil)

conn =
conn
|> assign(:source, source)
Expand All @@ -33,6 +37,9 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do
IngestEventQueue.mark_ingested({source, nil}, [le])
end

# get and cache the value
Backends.get_and_cache_local_pending_buffer_len(source, nil)

conn =
conn
|> assign(:source, source)
Expand Down
1 change: 1 addition & 0 deletions test/support/conn_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ defmodule LogflareWeb.ConnCase do

on_exit(fn ->
Logflare.Backends.IngestEventQueue.delete_all_mappings()
Logflare.PubSubRates.Cache.clear()
end)

:ok
Expand Down
1 change: 1 addition & 0 deletions test/support/data_case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Logflare.DataCase do

on_exit(fn ->
Logflare.Backends.IngestEventQueue.delete_all_mappings()
Logflare.PubSubRates.Cache.clear()
end)

:ok
Expand Down

0 comments on commit bd8e7e6

Please sign in to comment.