Skip to content

Commit

Permalink
perf: initial profling scripts of v1 and v2 ingestion (#2111)
Browse files Browse the repository at this point in the history
* perf: initial profling scripts of v1 and v2 ingestion

* chore: fix failing test
  • Loading branch information
Ziinc authored Jun 24, 2024
1 parent abb4f9e commit 11dcb69
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 36 deletions.
35 changes: 16 additions & 19 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ defmodule Logflare.Backends do
@spec ingest_logs([log_param()], Source.t()) :: :ok
@spec ingest_logs([log_param()], Source.t(), Backend.t() | nil) :: :ok
def ingest_logs(event_params, source, backend \\ nil) do
:ok = Source.Supervisor.ensure_started(source)
{log_events, errors} = split_valid_events(source, event_params)
count = Enum.count(log_events)
increment_counters(source, count)
Expand All @@ -230,34 +229,32 @@ defmodule Logflare.Backends do
defp split_valid_events(source, event_params) do
event_params
|> Enum.reduce({[], []}, fn param, {events, errors} ->
le =
param
|> case do
%LogEvent{source: %Source{}} = le ->
le
param
|> case do
%LogEvent{source: %Source{}} = le ->
le

%LogEvent{} = le ->
%{le | source: source}
%LogEvent{} = le ->
%{le | source: source}

param ->
LogEvent.make(param, %{source: source})
end
|> Logs.maybe_mark_le_dropped_by_lql()
|> LogEvent.apply_custom_event_message()

cond do
le.drop ->
param ->
LogEvent.make(param, %{source: source})
end
|> Logs.maybe_mark_le_dropped_by_lql()
|> LogEvent.apply_custom_event_message()
|> case do
%{drop: true} = le ->
do_telemetry(:drop, le)
{events, errors}

not le.valid ->
%{valid: false} = le ->
do_telemetry(:invalid, le)
{events, errors}

le.pipeline_error ->
%{pipeline_error: err} = le when err != nil ->
{events, [le.pipeline_error.message | errors]}

true ->
le ->
{[le | events], errors}
end
end)
Expand Down
24 changes: 8 additions & 16 deletions lib/logflare/logs/logs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,15 @@ defmodule Logflare.Logs do
le
end

def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: drop_lql_string}} = le)
when is_nil(drop_lql_string) do
le
end
def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: nil}} = le), do: le
def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: ""}} = le), do: le
def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_filters: []}} = le), do: le

def maybe_mark_le_dropped_by_lql(
%LE{body: _body, source: %{drop_lql_string: drop_lql_string, drop_lql_filters: filters}} =
le
)
when is_binary(drop_lql_string) do
cond do
not Enum.empty?(filters) and
SourceRouting.route_with_lql_rules?(le, %Rule{lql_filters: filters}) ->
Map.put(le, :drop, true)

true ->
le
def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_filters: filters}} = le) do
if SourceRouting.route_with_lql_rules?(le, %Rule{lql_filters: filters}) do
%{le | drop: true}
else
le
end
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ defmodule Logflare.Mixfile do

# Test
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.0", only: :test},
{:mimic, "~> 1.0", only: [:dev, :test]},
{:stream_data, "~> 0.6.0", only: [:dev, :test]},

# Pagination
Expand Down
34 changes: 34 additions & 0 deletions test/profiling/ingest_logs.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
alias Logflare.Sources

Mimic.copy(Broadway)

Mimic.stub(Broadway, :push_messages, fn _, _ -> :ok end)

ver = System.argv() |> Enum.at(0)

source =
Sources.get(:"f74e843a-e09d-42e1-b2bc-1915e75b53c5")
|> Sources.refresh_source_metrics_for_ingest()
|> Sources.preload_defaults()
|> case do
s when ver == "v1" -> %{s | v2_pipeline: false}
s when ver == "v2" -> %{s | v2_pipeline: true}
end

:ok = Logflare.Source.Supervisor.ensure_started(source)

batch = for _ <- 1..1000, do: %{message: "some message"}

if ver == "v1" do
Logflare.Logs.ingest_logs(batch, source)
else
Logflare.Backends.ingest_logs(batch, source)
end

# Current: 2024-06-02 v1
# CNT ACC (ms) OWN (ms)
# 2,085,049 3441.462 3439.073

# Current: 2024-06-02 v2
# CNT ACC (ms) OWN (ms)
# 1,706,387 2816.090 2810.488

0 comments on commit 11dcb69

Please sign in to comment.