Skip to content

Commit

Permalink
fix: BufferCounter removal and consolidation to BufferProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc committed May 2, 2024
1 parent 4cf5ada commit f88421f
Show file tree
Hide file tree
Showing 22 changed files with 248 additions and 565 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.6.5
1.6.6
12 changes: 12 additions & 0 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Logflare.Backends do
alias Logflare.Logs
alias Logflare.Logs.SourceRouting
alias Logflare.SystemMetrics
alias Logflare.PubSubRates
import Ecto.Query

@adaptor_mapping %{
Expand Down Expand Up @@ -379,6 +380,17 @@ defmodule Logflare.Backends do
end
end

@doc """
Retrieves the cluster-wide buffer size stored in cache for a given
backend/source combination.

When `backend` is `nil`, returns the buffer size aggregated across all
backends of the source.
"""
def buffer_len(source, backend \\ nil)

def buffer_len(%Source{token: source_token}, nil) do
  PubSubRates.Cache.get_cluster_buffers(source_token)
end

def buffer_len(%Source{token: source_token}, backend) do
  PubSubRates.Cache.get_cluster_buffers(source_token, backend.token)
end

@doc """
Lists the latest recent logs of all caches across the cluster.
Performs a check to ensure that the cache is started. If not started yet globally, it will start the cache locally.
Expand Down
22 changes: 5 additions & 17 deletions lib/logflare/backends/adaptor/bigquery_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
alias Logflare.Source.BigQuery.Pipeline
alias Logflare.Source.BigQuery.Schema
alias Logflare.Source.BigQuery.Pipeline
alias Logflare.Source.BigQuery.BufferCounter
alias Logflare.Users
alias Logflare.Billing
use Supervisor
Expand Down Expand Up @@ -35,17 +34,11 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do

with :ok <- Backends.register_backend_for_ingest_dispatch(source, backend) do
children = [
{BufferCounter,
[
source_id: source.id,
source_token: source.token,
backend_token: backend.token,
name: Backends.via_source(source, BufferCounter, backend.id)
]},
{Pipeline,
[
source: source,
backend_id: backend.id,
backend_token: backend.token,
bigquery_project_id: project_id,
bigquery_dataset_id: dataset_id,
name: Backends.via_source(source, Pipeline, backend.id)
Expand All @@ -70,25 +63,20 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
backend_id = Keyword.get(opts, :backend_id)
source = Sources.Cache.get_by_id(source_id)

buffer_counter_via = Backends.via_source(source, {BufferCounter, backend_id})

messages =
for le <- log_events,
do: %Broadway.Message{
data: le,
acknowledger: {__MODULE__, buffer_counter_via, nil}
acknowledger: {__MODULE__, nil, nil}
}

with {:ok, _count} <- BufferCounter.inc(buffer_counter_via, Enum.count(messages)) do
Backends.via_source(source, {Pipeline, backend_id})
|> Broadway.push_messages(messages)
end
Backends.via_source(source, {Pipeline, backend_id})
|> Broadway.push_messages(messages)

:ok
end

def ack(via, successful, failed) do
BufferCounter.decr(via, Enum.count(successful) + Enum.count(failed))
# Broadway acknowledger callback. With BufferCounter removed there is no
# counter left to decrement, so acknowledgement is a deliberate no-op.
def ack(_via, _successful, _failed), do: :ok

Expand Down
6 changes: 5 additions & 1 deletion lib/logflare/backends/adaptor/postgres_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

import Ecto.Changeset

typedstruct enforce: true do
typedstruct do
field(:config, %{
url: String.t(),
schema: String.t(),
Expand All @@ -34,6 +34,8 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

field(:source, Source.t())
field(:backend, Backend.t())
field(:backend_token, String.t())
field(:source_token, atom())
field(:pipeline_name, tuple())
end

Expand Down Expand Up @@ -181,6 +183,8 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
state = %__MODULE__{
config: backend.config,
backend: backend,
backend_token: if(backend, do: backend.token, else: nil),
source_token: source.token,
source: source,
pipeline_name: Backends.via_source(source, Pipeline, backend.id)
}
Expand Down
6 changes: 4 additions & 2 deletions lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,16 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.Pipeline do

alias Broadway.Message
alias Logflare.Backends.Adaptor.PostgresAdaptor
alias Logflare.Buffers.BufferProducer
alias Logflare.Backends.BufferProducer

@spec start_link(PostgresAdaptor.t()) :: {:ok, pid()}
def start_link(adaptor_state) do
Broadway.start_link(__MODULE__,
name: adaptor_state.pipeline_name,
producer: [
module: {BufferProducer, []},
module:
{BufferProducer,
[source_token: adaptor_state.source_token, backend_token: adaptor_state.backend_token]},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
Expand Down
15 changes: 12 additions & 3 deletions lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do

@behaviour Logflare.Backends.Adaptor

typedstruct enforce: true do
typedstruct do
field(:config, %{
url: String.t(),
headers: map()
})

field(:backend, Backend.t())
field(:pipeline_name, tuple())
field(:backend_token, String.t())
field(:source_token, atom())
end

# API
Expand Down Expand Up @@ -64,6 +66,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
state = %__MODULE__{
config: backend.config,
backend: backend,
backend_token: if(backend, do: backend.token, else: nil),
source_token: source.token,
pipeline_name: Backends.via_source(source, __MODULE__.Pipeline, backend.id)
}

Expand Down Expand Up @@ -92,7 +96,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@moduledoc false
use Broadway
alias Broadway.Message
alias Logflare.Buffers.BufferProducer
alias Logflare.Backends.BufferProducer
alias Logflare.Backends.Adaptor.WebhookAdaptor
alias Logflare.Backends.Adaptor.WebhookAdaptor.Client

Expand All @@ -105,7 +109,12 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
fullsweep_after: 100
],
producer: [
module: {BufferProducer, []},
module:
{BufferProducer,
[
source_token: adaptor_state.source_token,
backend_token: adaptor_state.backend_token
]},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
Expand Down
115 changes: 115 additions & 0 deletions lib/logflare/backends/buffer_producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
defmodule Logflare.Backends.BufferProducer do
  @moduledoc """
  A generic Broadway producer that doesn't actually produce anything on demand.

  Messages are pushed into the pipeline externally via `Broadway.push_messages/2`
  (which arrives here as GenStage buffering) or the `{:add_to_buffer, items}`
  message. The producer also periodically broadcasts its estimated local buffer
  length to the cluster (sharded PubSub topic) and to the source's channel topic.
  """
  use GenStage
  alias Logflare.Source
  alias Logflare.PubSubRates

  @default_broadcast_interval 2_000

  def start_link(opts) when is_list(opts) do
    GenStage.start_link(__MODULE__, opts)
  end

  @impl GenStage
  def init(opts) do
    state =
      Enum.into(opts, %{
        buffer_module: nil,
        buffer_pid: nil,
        demand: 0,
        # TODO: broadcast by id instead.
        source_token: nil,
        backend_token: nil,
        broadcast_interval: @default_broadcast_interval,
        last_broadcast: DateTime.utc_now()
      })

    schedule_broadcast(state.broadcast_interval)
    {:producer, state, buffer_size: 10_000}
  end

  @impl GenStage
  def handle_info(:resolve, state) do
    {items, state} = resolve_demand(state)
    {:noreply, items, state}
  end

  def handle_info(:broadcast, state) do
    do_async_broadcast(state)
    schedule_broadcast(state.broadcast_interval)
    {:noreply, [], state}
  end

  # Merge only the broadcast timestamp into the *current* state. The broadcast
  # task runs concurrently with this producer, so replacing the whole state
  # with the task's stale snapshot (the previous `{:update_state, state}`
  # approach) raced with `handle_demand/2` and could clobber demand that
  # accumulated while the task was running.
  def handle_info({:update_last_broadcast, %DateTime{} = ts}, state) do
    {:noreply, [], %{state | last_broadcast: ts}}
  end

  # Backwards compatibility: accept a full-state replacement from any
  # still-running task spawned before an upgrade. New code no longer sends this.
  def handle_info({:update_state, new_state}, _state) do
    {:noreply, [], new_state}
  end

  def handle_info({:add_to_buffer, items}, state) do
    {:noreply, items, state}
  end

  # Spawns a linked task that measures the local GenStage buffer and, when
  # non-empty (or when the last broadcast is older than 2x the interval),
  # broadcasts buffer stats cluster-wide and to the source's channel topic.
  defp do_async_broadcast(state) do
    pid = self()

    Task.start_link(fn ->
      pool_size = Application.get_env(:logflare, Logflare.PubSub)[:pool_size]
      len = GenStage.estimate_buffered_count(pid)
      local_buffer = %{Node.self() => %{len: len}}

      should_broadcast? =
        len != 0 or
          DateTime.diff(DateTime.utc_now(), state.last_broadcast, :millisecond) >
            state.broadcast_interval * 2

      if should_broadcast? do
        # Shard the PubSub topic by source token to spread broadcast load.
        shard = :erlang.phash2(state.source_token, pool_size)

        cluster_broadcast_payload =
          if state.backend_token do
            {:buffers, state.source_token, state.backend_token, local_buffer}
          else
            {:buffers, state.source_token, local_buffer}
          end

        Phoenix.PubSub.broadcast(
          Logflare.PubSub,
          "buffers:shard-#{shard}",
          cluster_broadcast_payload
        )

        # Channel broadcast. The cluster-wide total is only fetched here, when
        # a broadcast actually happens, instead of on every tick.
        payload = %{
          buffer: PubSubRates.Cache.get_cluster_buffers(state.source_token),
          source_token: state.source_token,
          backend_token: state.backend_token
        }

        Source.ChannelTopics.broadcast_buffer(payload)

        send(pid, {:update_last_broadcast, DateTime.utc_now()})
      end
    end)
  end

  @impl GenStage
  def handle_demand(demand, state) do
    {items, state} = resolve_demand(state, demand)
    {:noreply, items, state}
  end

  # Accumulates demand without emitting anything — items only enter the
  # pipeline through Broadway.push_messages/2 or {:add_to_buffer, items}.
  defp resolve_demand(%{demand: prev_demand} = state, new_demand \\ 0) do
    {[], %{state | demand: prev_demand + new_demand}}
  end

  defp schedule_broadcast(interval) do
    Process.send_after(self(), :broadcast, interval)
  end
end
64 changes: 0 additions & 64 deletions lib/logflare/buffers/buffer.ex

This file was deleted.

33 changes: 0 additions & 33 deletions lib/logflare/buffers/buffer_producer.ex

This file was deleted.

Loading

0 comments on commit f88421f

Please sign in to comment.