Skip to content

Commit

Permalink
chore: reorganise buffers behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
hauleth committed Oct 12, 2023
1 parent dbc5bf0 commit 9cf9dc5
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 58 deletions.
5 changes: 3 additions & 2 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Logflare.Backends do
alias Logflare.Backends.SourcesSup
alias Logflare.Backends.SourceSup

alias Logflare.Buffers.Buffer
alias Logflare.Buffers.MemoryBuffer
alias Logflare.LogEvent
alias Logflare.Repo
Expand Down Expand Up @@ -127,10 +128,10 @@ defmodule Logflare.Backends do
The ingestion pipeline then pulls from the buffer and dispatches log events to the correct backends.
"""
@type log_param :: map()
@spec ingest_logs(list(log_param()), Source.t()) :: :ok
@spec ingest_logs([log_param()], Source.t()) :: :ok
def ingest_logs(log_events, source) do
via = via_source(source, :buffer)
MemoryBuffer.add_many(via, log_events)
Buffer.add_many(MemoryBuffer, via, log_events)
:ok
end

Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/backends/adaptor/postgres_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
alias Logflare.Backends
alias Logflare.Backends.SourceBackend
alias Logflare.Backends.SourceDispatcher
alias Logflare.Buffers.Buffer
alias Logflare.Buffers.MemoryBuffer

import Ecto.Changeset
Expand Down Expand Up @@ -159,7 +160,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

@impl GenServer
def handle_call({:ingest, log_events}, _from, %{config: _config} = state) do
MemoryBuffer.add_many(state.buffer_pid, log_events)
Buffer.add_many(state.buffer_module, state.buffer_pid, log_events)
{:reply, :ok, state}
end
end
3 changes: 2 additions & 1 deletion lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
alias Logflare.Backends.Adaptor.WebhookAdaptor
alias Logflare.Backends.SourceBackend
alias Logflare.Backends.SourceDispatcher
alias Logflare.Buffers.Buffer
alias Logflare.Buffers.MemoryBuffer

typedstruct enforce: true do
Expand Down Expand Up @@ -66,7 +67,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@impl true
def handle_call({:ingest, log_events}, _from, %{config: _config} = state) do
# TODO: queue, send concurrently
MemoryBuffer.add_many(state.buffer_pid, log_events)
Buffer.add_many(state.buffer_module, state.buffer_pid, log_events)
{:reply, :ok, state}
end

Expand Down
44 changes: 37 additions & 7 deletions lib/logflare/buffers/buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ defmodule Logflare.Buffers.Buffer do
@moduledoc """
Defines a behaviour for a buffer.
"""
@doc """
Adds a payload to the buffer.
"""
@callback add(identifier(), payload :: term()) :: :ok

@doc """
Adds a list of payloads to the buffer.
Expand All @@ -23,12 +19,46 @@ defmodule Logflare.Buffers.Buffer do
@callback length(identifier()) :: non_neg_integer()

@doc """
Returns one item from the buffer
Returns multiple items from the buffer
"""
@callback pop_many(identifier(), non_neg_integer()) :: [term()]

@doc """
Adds payload to the buffer.
"""
@spec add(module(), identifier(), term()) :: :ok
def add(mod, ident, payload),
do: mod.add_many(ident, [payload])

@doc """
Adds a list of payloads to the buffer.
"""
@callback pop(identifier) :: term()
@spec add_many(module(), identifier(), [term()]) :: :ok
def add_many(mod, ident, payloads) when is_list(payloads),
do: mod.add_many(ident, payloads)

@doc """
Clears the buffer and removes all enqueued items.
"""
@spec clear(module(), identifier()) :: :ok
def clear(mod, ident), do: mod.clear(ident)

@doc """
Returns the length of the buffer
"""
@spec length(module(), identifier()) :: non_neg_integer()
def length(mod, ident), do: mod.length(ident)

@doc """
Returns single item from the buffer
"""
@spec pop(module(), identifier()) :: term()
def pop(mod, ident), do: mod.pop_many(ident, 1)

@doc """
Returns multiple items from the buffer
"""
@callback pop_many(identifier(), non_neg_integer()) :: [term()]
@spec pop_many(module(), identifier(), non_neg_integer()) :: [term()]
def pop_many(mod, ident, count) when is_integer(count) and count > 0,
do: mod.pop_many(ident, count)
end
6 changes: 4 additions & 2 deletions lib/logflare/buffers/buffer_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Logflare.Buffers.BufferProducer do
"""
use GenStage

alias Logflare.Buffers.Buffer

def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end
Expand Down Expand Up @@ -33,11 +35,11 @@ defmodule Logflare.Buffers.BufferProducer do
end

defp resolve_demand(
%{buffer_module: module, buffer_pid: pid, demand: prev_demand} = state,
%{demand: prev_demand} = state,
new_demand \\ 0
) do
total_demand = prev_demand + new_demand
{:ok, items} = module.pop_many(pid, total_demand)
{:ok, items} = Buffer.pop_many(state.buffer_module, state.buffer_pid, total_demand)

{items, %{state | demand: total_demand - length(items)}}
end
Expand Down
54 changes: 22 additions & 32 deletions lib/logflare/buffers/memory_buffer.ex
Original file line number Diff line number Diff line change
@@ -1,66 +1,58 @@
defmodule Logflare.Buffers.MemoryBuffer do
@moduledoc """
This is an implementation of an in-memory buffer, using `:queue`.any()
All operations are syncronous.
All operations are synchronous.
"""
alias Logflare.{Buffers.Buffer, Buffers.MemoryBuffer}
use GenServer
@behaviour Buffer

# GenServer state and init callbacks
defstruct queue: nil

def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, [], opts)
end
@behaviour Logflare.Buffers.Buffer

@impl true
def init(_) do
{:ok, %MemoryBuffer{queue: :queue.new()}}
end
use GenServer

# API

@impl Buffer
def add(identifier, payload) do
GenServer.call(identifier, {:add, [payload]})
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def add_many(identifier, payloads) do
GenServer.call(identifier, {:add, payloads})
end

@impl Buffer
def pop(identifier) do
GenServer.call(identifier, {:pop, 1})
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def pop_many(identifier, number) do
GenServer.call(identifier, {:pop, number})
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def clear(identifier) do
GenServer.call(identifier, :clear)
end

@impl Buffer
@impl Logflare.Buffers.Buffer
def length(identifier) do
GenServer.call(identifier, :length)
end

# GenServer state and init callbacks
defstruct queue: nil, size: 0, proc_name: nil

def start_link(opts \\ []) do
proc_name = opts[:proc_name] || opts[:name]

GenServer.start_link(__MODULE__, %{proc_name: proc_name}, opts)
end

@impl GenServer
def init(%{proc_name: proc_name}) do
{:ok, %__MODULE__{queue: :queue.new(), proc_name: proc_name || inspect(self())}}
end

# GenServer callbacks

@impl true
@impl GenServer
def handle_call({:add, payloads}, _from, state) do
to_join = :queue.from_list(payloads)
new_queue = :queue.join(state.queue, to_join)
{:reply, :ok, %{state | queue: new_queue}}
end

@impl true
def handle_call({:pop, number}, _from, state) do
{items, new_queue} =
case :queue.len(state.queue) do
Expand All @@ -78,12 +70,10 @@ defmodule Logflare.Buffers.MemoryBuffer do
{:reply, {:ok, items}, %{state | queue: new_queue}}
end

@impl true
def handle_call(:clear, _from, state) do
{:reply, :ok, %{state | queue: :queue.new()}}
end

@impl true
def handle_call(:length, _from, state) do
{:reply, :queue.len(state.queue), state}
end
Expand Down
29 changes: 16 additions & 13 deletions test/logflare/buffers/memory_buffer_test.exs
Original file line number Diff line number Diff line change
@@ -1,38 +1,41 @@
defmodule Logflare.Queues.MemoryBufferTest do
@moduledoc false
use ExUnit.Case, async: true
alias Logflare.Buffers.MemoryBuffer

alias Logflare.Buffers.Buffer

@subject Logflare.Buffers.MemoryBuffer

@job %{some: "job"}

setup do
pid = start_supervised!(MemoryBuffer)
pid = start_supervised!(@subject)
{:ok, pid: pid}
end

test "can enqueue jobs", %{pid: pid} do
assert :ok = MemoryBuffer.add(pid, @job)
assert :ok = MemoryBuffer.add_many(pid, [@job])
assert MemoryBuffer.length(pid) == 2
assert :ok = Buffer.add(@subject, pid, @job)
assert :ok = Buffer.add_many(@subject, pid, [@job])
assert Buffer.length(@subject, pid) == 2
end

test "can pop n jobs from queue", %{pid: pid} do
job1 = %{some: "job1"}
job2 = %{some: "job2"}
job3 = %{some: "job3"}
job4 = %{some: "job4"}
assert :ok = MemoryBuffer.add_many(pid, [job1, job2])
assert :ok = MemoryBuffer.add_many(pid, [job3, job4])
assert :ok = Buffer.add_many(@subject, pid, [job1, job2])
assert :ok = Buffer.add_many(@subject, pid, [job3, job4])

# should use fifo
assert {:ok, [%{some: "job1"}, %{some: "job2"}]} = MemoryBuffer.pop_many(pid, 2)
assert {:ok, [%{some: "job3"}, %{some: "job4"}]} = MemoryBuffer.pop_many(pid, 5)
assert {:ok, []} = MemoryBuffer.pop_many(pid, 5)
assert {:ok, [%{some: "job1"}, %{some: "job2"}]} = Buffer.pop_many(@subject, pid, 2)
assert {:ok, [%{some: "job3"}, %{some: "job4"}]} = Buffer.pop_many(@subject, pid, 5)
assert {:ok, []} = Buffer.pop_many(@subject, pid, 5)
end

test "can clear jobs from queue", %{pid: pid} do
assert :ok = MemoryBuffer.add(pid, @job)
MemoryBuffer.clear(pid)
assert MemoryBuffer.length(pid) == 0
assert :ok = Buffer.add(@subject, pid, @job)
Buffer.clear(@subject, pid)
assert Buffer.length(@subject, pid) == 0
end
end

0 comments on commit 9cf9dc5

Please sign in to comment.