diff --git a/lib/logflare/backends.ex b/lib/logflare/backends.ex index 517a800cf..f83acd1ed 100644 --- a/lib/logflare/backends.ex +++ b/lib/logflare/backends.ex @@ -217,7 +217,6 @@ defmodule Logflare.Backends do @spec ingest_logs([log_param()], Source.t()) :: :ok @spec ingest_logs([log_param()], Source.t(), Backend.t() | nil) :: :ok def ingest_logs(event_params, source, backend \\ nil) do - :ok = Source.Supervisor.ensure_started(source) {log_events, errors} = split_valid_events(source, event_params) count = Enum.count(log_events) increment_counters(source, count) @@ -230,34 +229,32 @@ defmodule Logflare.Backends do defp split_valid_events(source, event_params) do event_params |> Enum.reduce({[], []}, fn param, {events, errors} -> - le = - param - |> case do - %LogEvent{source: %Source{}} = le -> - le + param + |> case do + %LogEvent{source: %Source{}} = le -> + le - %LogEvent{} = le -> - %{le | source: source} + %LogEvent{} = le -> + %{le | source: source} - param -> - LogEvent.make(param, %{source: source}) - end - |> Logs.maybe_mark_le_dropped_by_lql() - |> LogEvent.apply_custom_event_message() - - cond do - le.drop -> + param -> + LogEvent.make(param, %{source: source}) + end + |> Logs.maybe_mark_le_dropped_by_lql() + |> LogEvent.apply_custom_event_message() + |> case do + %{drop: true} = le -> do_telemetry(:drop, le) {events, errors} - not le.valid -> + %{valid: false} = le -> do_telemetry(:invalid, le) {events, errors} - le.pipeline_error -> + %{pipeline_error: err} = le when err != nil -> {events, [le.pipeline_error.message | errors]} - true -> + le -> {[le | events], errors} end end) diff --git a/lib/logflare/logs/logs.ex b/lib/logflare/logs/logs.ex index 319cad4f2..f437e8da8 100644 --- a/lib/logflare/logs/logs.ex +++ b/lib/logflare/logs/logs.ex @@ -69,23 +69,15 @@ defmodule Logflare.Logs do le end - def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: drop_lql_string}} = le) - when is_nil(drop_lql_string) do - le - end + def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: nil}} = le), do: le + def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: ""}} = le), do: le + def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_filters: []}} = le), do: le - def maybe_mark_le_dropped_by_lql( - %LE{body: _body, source: %{drop_lql_string: drop_lql_string, drop_lql_filters: filters}} = - le - ) - when is_binary(drop_lql_string) do - cond do - not Enum.empty?(filters) and - SourceRouting.route_with_lql_rules?(le, %Rule{lql_filters: filters}) -> - Map.put(le, :drop, true) - - true -> - le + def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_filters: filters}} = le) do + if SourceRouting.route_with_lql_rules?(le, %Rule{lql_filters: filters}) do + %{le | drop: true} + else + le end end diff --git a/mix.exs b/mix.exs index 42b8755d3..2cf9698b1 100644 --- a/mix.exs +++ b/mix.exs @@ -132,7 +132,7 @@ defmodule Logflare.Mixfile do # Test {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}, - {:mimic, "~> 1.0", only: :test}, + {:mimic, "~> 1.0", only: [:dev, :test]}, {:stream_data, "~> 0.6.0", only: [:dev, :test]}, # Pagination diff --git a/test/profiling/ingest_logs.exs b/test/profiling/ingest_logs.exs new file mode 100644 index 000000000..6fdcf7085 --- /dev/null +++ b/test/profiling/ingest_logs.exs @@ -0,0 +1,34 @@ +alias Logflare.Sources + +Mimic.copy(Broadway) + +Mimic.stub(Broadway, :push_messages, fn _, _ -> :ok end) + +ver = System.argv() |> Enum.at(0) + +source = + Sources.get(:"f74e843a-e09d-42e1-b2bc-1915e75b53c5") + |> Sources.refresh_source_metrics_for_ingest() + |> Sources.preload_defaults() + |> case do + s when ver == "v1" -> %{s | v2_pipeline: false} + s when ver == "v2" -> %{s | v2_pipeline: true} + end + +:ok = Logflare.Source.Supervisor.ensure_started(source) + +batch = for _ <- 1..1000, do: %{message: "some message"} + +if ver == "v1" do + Logflare.Logs.ingest_logs(batch, source) +else + Logflare.Backends.ingest_logs(batch, source) +end + +# Current: 2024-06-02 v1 +# CNT ACC (ms) OWN (ms) +# 2,085,049 3441.462 3439.073 + +# Current: 2024-06-02 v2 +# CNT ACC (ms) OWN (ms) +# 1,706,387 2816.090 2810.488