Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: reorganise buffers behaviour #1755

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Logflare.Backends do
alias Logflare.Backends.SourcesSup
alias Logflare.Backends.SourceSup

alias Logflare.Buffers.Buffer
alias Logflare.Buffers.MemoryBuffer
alias Logflare.LogEvent
alias Logflare.Repo
Expand Down Expand Up @@ -129,10 +130,10 @@ defmodule Logflare.Backends do
The ingestion pipeline then pulls from the buffer and dispatches log events to the correct backends.
"""
@type log_param :: map()
@spec ingest_logs(list(log_param()), Source.t()) :: :ok
@spec ingest_logs([log_param()], Source.t()) :: :ok
def ingest_logs(log_events, source) do
via = via_source(source, :buffer)
MemoryBuffer.add_many(via, log_events)
Buffer.add_many(MemoryBuffer, via, log_events)
:ok
end

Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/backends/adaptor/postgres_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
alias Logflare.Backends
alias Logflare.Backends.SourceBackend
alias Logflare.Backends.SourceDispatcher
alias Logflare.Buffers.Buffer
alias Logflare.Buffers.MemoryBuffer

@behaviour Logflare.Backends.Adaptor
Expand Down Expand Up @@ -160,7 +161,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

@impl GenServer
def handle_call({:ingest, log_events}, _from, %{config: _config} = state) do
MemoryBuffer.add_many(state.buffer_pid, log_events)
Buffer.add_many(state.buffer_module, state.buffer_pid, log_events)
{:reply, :ok, state}
end
end
3 changes: 2 additions & 1 deletion lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
alias Logflare.Backends.Adaptor.WebhookAdaptor
alias Logflare.Backends.SourceBackend
alias Logflare.Backends.SourceDispatcher
alias Logflare.Buffers.Buffer
alias Logflare.Buffers.MemoryBuffer

@behaviour Logflare.Backends.Adaptor
Expand Down Expand Up @@ -71,7 +72,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@impl GenServer
def handle_call({:ingest, log_events}, _from, %{config: _config} = state) do
# TODO: queue, send concurrently
MemoryBuffer.add_many(state.buffer_pid, log_events)
Buffer.add_many(state.buffer_module, state.buffer_pid, log_events)
{:reply, :ok, state}
end

Expand Down
44 changes: 37 additions & 7 deletions lib/logflare/buffers/buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ defmodule Logflare.Buffers.Buffer do
@moduledoc """
Defines a behaviour for a buffer.
"""
@doc """
Adds a payload to the buffer.
"""
@callback add(identifier(), payload :: term()) :: :ok

@doc """
Adds a list of payloads to the buffer.
Expand All @@ -23,12 +19,46 @@ defmodule Logflare.Buffers.Buffer do
@callback length(identifier()) :: non_neg_integer()

@doc """
Returns one item from the buffer
Returns multiple items from the buffer
"""
@callback pop_many(identifier(), non_neg_integer()) :: [term()]

@doc """
Adds payload to the buffer.
"""
@spec add(module(), identifier(), term()) :: :ok
def add(mod, ident, payload),
do: mod.add_many(ident, [payload])

@doc """
Adds a list of payloads to the buffer.
"""
@callback pop(identifier) :: term()
@spec add_many(module(), identifier(), [term()]) :: :ok
def add_many(mod, ident, payloads) when is_list(payloads),
do: mod.add_many(ident, payloads)

@doc """
Clears the buffer and removes all enqueued items.
"""
@spec clear(module(), identifier()) :: :ok
def clear(mod, ident), do: mod.clear(ident)

@doc """
Returns the length of the buffer
"""
@spec length(module(), identifier()) :: non_neg_integer()
def length(mod, ident), do: mod.length(ident)

@doc """
Returns single item from the buffer
"""
@spec pop(module(), identifier()) :: term()
def pop(mod, ident), do: mod.pop_many(ident, 1)

@doc """
Returns multiple items from the buffer
"""
@callback pop_many(identifier(), non_neg_integer()) :: [term()]
@spec pop_many(module(), identifier(), non_neg_integer()) :: [term()]
def pop_many(mod, ident, count) when is_integer(count) and count > 0,
do: mod.pop_many(ident, count)
end
6 changes: 4 additions & 2 deletions lib/logflare/buffers/buffer_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Logflare.Buffers.BufferProducer do
"""
use GenStage

alias Logflare.Buffers.Buffer

def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
Expand Down Expand Up @@ -33,11 +35,11 @@ defmodule Logflare.Buffers.BufferProducer do
end

defp resolve_demand(
%{buffer_module: module, buffer_pid: pid, demand: prev_demand} = state,
%{demand: prev_demand} = state,
new_demand \\ 0
) do
total_demand = prev_demand + new_demand
{:ok, items} = module.pop_many(pid, total_demand)
{:ok, items} = Buffer.pop_many(state.buffer_module, state.buffer_pid, total_demand)

{items, %{state | demand: total_demand - length(items)}}
end
Expand Down
67 changes: 32 additions & 35 deletions lib/logflare/buffers/memory_buffer.ex
Original file line number Diff line number Diff line change
@@ -1,66 +1,65 @@
defmodule Logflare.Buffers.MemoryBuffer do
@moduledoc """
This is an implementation of an in-memory buffer, using `:queue`.any()
All operations are syncronous.
All operations are synchronous.
"""
alias Logflare.{Buffers.Buffer, Buffers.MemoryBuffer}
use GenServer
@behaviour Buffer

# GenServer state and init callbacks
defstruct queue: nil

def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, [], opts)
end
@behaviour Logflare.Buffers.Buffer

@impl true
def init(_) do
{:ok, %MemoryBuffer{queue: :queue.new()}}
end
use GenServer
use TypedStruct

# API

@impl Buffer
def add(identifier, payload) do
GenServer.call(identifier, {:add, [payload]})
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def add_many(identifier, payloads) do
GenServer.call(identifier, {:add, payloads})
end

@impl Buffer
def pop(identifier) do
GenServer.call(identifier, {:pop, 1})
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def pop_many(identifier, number) do
GenServer.call(identifier, {:pop, number})
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def clear(identifier) do
GenServer.call(identifier, :clear)
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def length(identifier) do
GenServer.call(identifier, :length)
end

# GenServer state and init callbacks
typedstruct module: State do
@moduledoc false

field :queue, :queue.queue() | nil, default: :queue.new()
field :proc_name, atom() | binary() | pid(), enforce: true
field :size, non_neg_integer(), default: 0
end

def start_link(opts \\ []) do
proc_name = opts[:proc_name] || opts[:name]

GenServer.start_link(__MODULE__, %{proc_name: proc_name}, opts)
end

@impl GenServer
def init(%{proc_name: proc_name}) do
{:ok, %State{proc_name: proc_name || self()}}
end

# GenServer callbacks

@impl true
@impl GenServer
def handle_call({:add, payloads}, _from, state) do
to_join = :queue.from_list(payloads)
new_queue = :queue.join(state.queue, to_join)
{:reply, :ok, %{state | queue: new_queue}}
{:reply, :ok, %State{state | queue: new_queue}}
end

@impl true
def handle_call({:pop, number}, _from, state) do
{items, new_queue} =
case :queue.len(state.queue) do
Expand All @@ -75,15 +74,13 @@ defmodule Logflare.Buffers.MemoryBuffer do
{:queue.to_list(popped_queue), queue}
end

{:reply, {:ok, items}, %{state | queue: new_queue}}
{:reply, {:ok, items}, %State{state | queue: new_queue}}
end

@impl true
def handle_call(:clear, _from, state) do
{:reply, :ok, %{state | queue: :queue.new()}}
{:reply, :ok, %State{state | queue: :queue.new()}}
end

@impl true
def handle_call(:length, _from, state) do
{:reply, :queue.len(state.queue), state}
end
Expand Down
29 changes: 16 additions & 13 deletions test/logflare/buffers/memory_buffer_test.exs
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
defmodule Logflare.Queues.MemoryBufferTest do
@moduledoc false
use ExUnit.Case, async: true
alias Logflare.Buffers.MemoryBuffer

alias Logflare.Buffers.Buffer

@subject Logflare.Buffers.MemoryBuffer

@job %{some: "job"}

setup do
pid = start_supervised!(MemoryBuffer)
pid = start_supervised!(@subject)
{:ok, pid: pid}
end

test "can enqueue jobs", %{pid: pid} do
assert :ok = MemoryBuffer.add(pid, @job)
assert :ok = MemoryBuffer.add_many(pid, [@job])
assert MemoryBuffer.length(pid) == 2
assert :ok = Buffer.add(@subject, pid, @job)
assert :ok = Buffer.add_many(@subject, pid, [@job])
assert Buffer.length(@subject, pid) == 2
end

test "can pop n jobs from queue", %{pid: pid} do
job1 = %{some: "job1"}
job2 = %{some: "job2"}
job3 = %{some: "job3"}
job4 = %{some: "job4"}
assert :ok = MemoryBuffer.add_many(pid, [job1, job2])
assert :ok = MemoryBuffer.add_many(pid, [job3, job4])
assert :ok = Buffer.add_many(@subject, pid, [job1, job2])
assert :ok = Buffer.add_many(@subject, pid, [job3, job4])

# should use fifo
assert {:ok, [%{some: "job1"}, %{some: "job2"}]} = MemoryBuffer.pop_many(pid, 2)
assert {:ok, [%{some: "job3"}, %{some: "job4"}]} = MemoryBuffer.pop_many(pid, 5)
assert {:ok, []} = MemoryBuffer.pop_many(pid, 5)
assert {:ok, [%{some: "job1"}, %{some: "job2"}]} = Buffer.pop_many(@subject, pid, 2)
assert {:ok, [%{some: "job3"}, %{some: "job4"}]} = Buffer.pop_many(@subject, pid, 5)
assert {:ok, []} = Buffer.pop_many(@subject, pid, 5)
end

test "can clear jobs from queue", %{pid: pid} do
assert :ok = MemoryBuffer.add(pid, @job)
MemoryBuffer.clear(pid)
assert MemoryBuffer.length(pid) == 0
assert :ok = Buffer.add(@subject, pid, @job)
Buffer.clear(@subject, pid)
assert Buffer.length(@subject, pid) == 0
end
end
Loading