Merge pull request #1681 from Logflare/staging
Release v1.4.1
chasers authored Aug 9, 2023
2 parents 1187ae2 + 8231835 commit 33d0f48
Showing 13 changed files with 147 additions and 70 deletions.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.4.0
1.4.1
52 changes: 45 additions & 7 deletions lib/logflare/logs/log_event.ex
@@ -15,16 +15,22 @@ defmodule Logflare.LogEvent do
@primary_key {:id, :binary_id, []}
typed_embedded_schema do
field :body, :map, default: %{}
embeds_one :source, Source
field :valid, :boolean
field :drop, :boolean, default: false
field :is_from_stale_query, :boolean
field :validation_error, {:array, :string}
field :ingested_at, :utc_datetime_usec
field :sys_uint, :integer
field :params, :map
field :origin_source_id, Ecto.UUID.Atom
field :via_rule, :map

embeds_one :source, Source

embeds_one :pipeline_error, PipelineError do
field :stage, :string
field :type, :string
field :message, :string
end
end

@doc """
@@ -41,8 +47,9 @@ defmodule Logflare.LogEvent do
|> mapper()

%__MODULE__{}
|> cast(params, [:valid, :validation_error, :id, :body])
|> cast(params, [:valid, :id, :body])
|> cast_embed(:source, with: &Source.no_casting_changeset/1)
|> cast_embed(:pipeline_error, with: &pipeline_error_changeset/2)
|> apply_changes()
|> Map.put(:source, source)
end
@@ -54,13 +61,23 @@ defmodule Logflare.LogEvent do
def make(params, %{source: source}) do
changeset =
%__MODULE__{}
|> cast(mapper(params), [:body, :valid, :validation_error])
|> cast(mapper(params), [:body, :valid])
|> cast_embed(:source, with: &Source.no_casting_changeset/1)
|> cast_embed(:pipeline_error, with: &pipeline_error_changeset/2)
|> validate_required([:body])

pipeline_error =
if changeset.valid?,
do: nil,
else: %LE.PipelineError{
stage: "changeset",
type: "validators",
message: changeset_error_to_string(changeset)
}

le_map =
changeset.changes
|> Map.put(:validation_error, changeset_error_to_string(changeset))
|> Map.put(:pipeline_error, pipeline_error)
|> Map.put(:source, source)
|> Map.put(:origin_source_id, source.token)
|> Map.put(:valid, changeset.valid?)
@@ -141,10 +158,19 @@ defmodule Logflare.LogEvent do
|> Enum.reduce_while(true, fn validator, _acc ->
case validator.validate(le) do
:ok ->
{:cont, %{le | valid: true}}
{:cont, %{le | valid: true, pipeline_error: nil}}

{:error, message} ->
{:halt, %{le | valid: false, validation_error: message}}
{:halt,
%{
le
| valid: false,
pipeline_error: %LE.PipelineError{
stage: "validators",
type: "validate",
message: message
}
}}
end
end)
end
@@ -176,6 +202,18 @@ defmodule Logflare.LogEvent do
Kernel.put_in(le.body["event_message"], message)
end

@doc """
Changeset for pipeline errors.
"""
def pipeline_error_changeset(pipeline_error, attrs) do
pipeline_error
|> cast(attrs, [
:stage,
:message
])
|> validate_required([:stage, :message])
end

defp make_message(le, source) do
message = le.body["message"] || le.body["event_message"]

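For context, a minimal sketch of how the new :pipeline_error embed surfaces failures that were previously stored in the :validation_error string. Here `source`, `valid_params`, and `invalid_params` are placeholders, not values from this commit; the PipelineError fields (stage/type/message) and the "changeset"/"validators" values come from the embed and make/2 changes above.

alias Logflare.LogEvent

# A well-formed payload still yields valid: true and no pipeline error
# (mirrors the updated assertion in log_event_test.exs below).
%LogEvent{valid: true, pipeline_error: nil} =
  LogEvent.make(valid_params, %{source: source})

# A payload that fails the changeset now carries a structured error instead of
# the old :validation_error string.
%LogEvent{
  valid: false,
  pipeline_error: %LogEvent.PipelineError{stage: "changeset", type: "validators", message: _msg}
} = LogEvent.make(invalid_params, %{source: source})
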
88 changes: 57 additions & 31 deletions lib/logflare/logs/logs.ex
@@ -23,40 +23,38 @@ defmodule Logflare.Logs do
|> maybe_mark_le_dropped_by_lql()
|> maybe_ingest_and_broadcast()
end)
|> Enum.reduce([], fn le, acc ->
if le.valid do
acc
else
[le.validation_error | acc]
end
end)
|> maybe_acc_errors()
|> case do
[] -> :ok
errors when is_list(errors) -> {:error, errors}
end
end

@spec ingest(Logflare.LogEvent.t()) :: Logflare.LogEvent.t() | {:error, term}
def ingest(%LE{source: %Source{} = source} = le) do
# individual source genservers
Supervisor.ensure_started(source.token)

# error here if this doesn't match
{:ok, _} = BufferCounter.push(le)

RecentLogsServer.push(le)

# all sources genservers
with {:ok, _} <- Supervisor.ensure_started(source.token),
{:ok, _} <- BufferCounter.push(le),
:ok <- RecentLogsServer.push(le),
# tests fail when we match on these for some reason
_ok <- Sources.Counters.increment(source.token),
_ok <- SystemMetrics.AllLogsLogged.increment(:total_logs_logged) do
le
else
{:error, _reason} = e ->
e

Sources.Counters.increment(source.token)
SystemMetrics.AllLogsLogged.increment(:total_logs_logged)

:ok
e ->
{:error, e}
end
end

@spec broadcast(Logflare.LogEvent.t()) :: Logflare.LogEvent.t()
def broadcast(%LE{} = le) do
if le.source.metrics.avg < 5 do
Source.ChannelTopics.broadcast_new(le)
end

le
end

def maybe_mark_le_dropped_by_lql(%LE{source: %{drop_lql_string: drop_lql_string}} = le)
@@ -78,22 +76,50 @@ defmodule Logflare.Logs do
end
end

defp maybe_ingest_and_broadcast(%LE{} = le) do
cond do
le.drop ->
@spec maybe_ingest_and_broadcast(Logflare.LogEvent.t()) :: Logflare.LogEvent.t()
def maybe_ingest_and_broadcast(%LE{} = le) do
with {:drop, false} <- {:drop, le.drop},
{:valid, true} <- {:valid, le.valid},
%LE{} = le <- LE.apply_custom_event_message(le),
%LE{} = le <- ingest(le),
%LE{} = le <- __MODULE__.broadcast(le),
%LE{} = le <- SourceRouting.route_to_sinks_and_ingest(le) do
le
else
{:drop, true} ->
le

le.valid ->
{:valid, false} ->
tap(le, &RejectedLogEvents.ingest/1)

{:error, :buffer_full} ->
le
|> tap(&SourceRouting.route_to_sinks_and_ingest/1)
|> LE.apply_custom_event_message()
|> tap(&ingest/1)
# use module reference namespace for Mimic mocking
|> tap(&__MODULE__.broadcast/1)
|> Map.put(:valid, false)
|> Map.put(:pipeline_error, %LE.PipelineError{
stage: "ingest",
type: "buffer_full",
message: "Source buffer full, please try again in a minute."
})

e ->
Logger.error("Unknown ingest error: " <> inspect(e))

true ->
le
|> tap(&RejectedLogEvents.ingest/1)
|> Map.put(:valid, false)
|> Map.put(:pipeline_error, %LE.PipelineError{
stage: "ingest",
type: "unknown_error",
message: "An unknown error has occured, please contact support if this continues."
})
end
end

defp maybe_acc_errors(log_events) do
Enum.reduce(log_events, [], fn le, acc ->
cond do
le.valid -> acc
le.pipeline_error -> [le.pipeline_error.message | acc]
end
end)
end
end
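The rewritten ingest/1 threads each step through a `with` chain, so any {:error, reason} (such as {:error, :buffer_full} from BufferCounter.push/1) short-circuits back to the caller instead of crashing on a bare match. A standalone sketch of that pattern; the step functions here are stand-ins, not the real Supervisor/BufferCounter/RecentLogsServer APIs:

defmodule IngestSketch do
  def ingest(le) do
    with {:ok, _} <- ensure_started(le),
         {:ok, _} <- push_to_buffer(le),
         :ok <- push_recent(le) do
      le
    else
      {:error, _reason} = e -> e
      e -> {:error, e}
    end
  end

  # Stand-in steps for illustration only.
  defp ensure_started(_le), do: {:ok, :started}
  defp push_to_buffer(_le), do: {:error, :buffer_full}  # simulate a full buffer
  defp push_recent(_le), do: :ok
end

IngestSketch.ingest(%{})
#=> {:error, :buffer_full}
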
4 changes: 2 additions & 2 deletions lib/logflare/source/bigquery/buffer_counter.ex
@@ -167,11 +167,11 @@ defmodule Logflare.Source.BigQuery.BufferCounter do
Looks up the counter reference from the Registry.
"""

@spec lookup_counter(atom) :: {:error, :not_found} | {:ok, any}
@spec lookup_counter(atom) :: {:error, :buffer_counter_not_found} | {:ok, any}
def lookup_counter(source_uuid) when is_atom(source_uuid) do
case Registry.lookup(Logflare.CounterRegistry, {__MODULE__, source_uuid}) do
[{_pid, counter_ref}] -> {:ok, counter_ref}
_error -> {:error, :not_found}
_error -> {:error, :buffer_counter_not_found}
end
end

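The more specific error atom lets callers tell a missing buffer counter apart from other :not_found results. A hedged usage sketch, where `source_token` stands in for a real source's token atom:

alias Logflare.Source.BigQuery.BufferCounter

case BufferCounter.lookup_counter(source_token) do
  {:ok, counter_ref} -> counter_ref
  {:error, :buffer_counter_not_found} -> :counter_not_started
end
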
8 changes: 5 additions & 3 deletions lib/logflare/source/bigquery/pipeline.ex
@@ -20,8 +20,11 @@ defmodule Logflare.Source.BigQuery.Pipeline do
alias Logflare.Sources
alias Logflare.Users

@batcher_multiple 0.5

def start_link(%RLS{source: source, plan: _plan} = rls) do
# procs = calc_procs(source, plan)
max_batchers = (System.schedulers_online() * @batcher_multiple) |> Kernel.floor()

Broadway.start_link(__MODULE__,
name: name(source.token),
producer: [
@@ -32,7 +35,7 @@ defmodule Logflare.Source.BigQuery.Pipeline do
default: [concurrency: 1]
],
batchers: [
bq: [concurrency: 15, batch_size: 250, batch_timeout: 1000]
bq: [concurrency: max_batchers, batch_size: 250, batch_timeout: 1000]
],
context: rls
)
@@ -45,7 +48,6 @@ defmodule Logflare.Source.BigQuery.Pipeline do
|> Message.put_batcher(:bq)
end

@spec handle_batch(:bq, list(Broadway.Message.t()), any, RLS.t()) :: [Broadway.Message.t()]
def handle_batch(:bq, messages, _batch_info, %RLS{} = context) do
stream_batch(context, messages)
end
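A quick sketch of the new batcher sizing taken from the diff above: the hard-coded concurrency of 15 is replaced by half the number of online BEAM schedulers, floored.

batcher_multiple = 0.5
max_batchers = (System.schedulers_online() * batcher_multiple) |> Kernel.floor()
# On an 8-scheduler node: floor(8 * 0.5) => 4 concurrent :bq batch processors,
# instead of the previous fixed 15.
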
5 changes: 3 additions & 2 deletions lib/logflare/source/recent_logs_server.ex
@@ -66,12 +66,13 @@ defmodule Logflare.Source.RecentLogsServer do
{:ok, rls, {:continue, :boot}}
end

@spec push(atom | String.t() | LE.t()) :: :ok
@spec push(LE.t()) :: :ok
def push(%LE{source: %Source{token: source_id}} = log_event) do
GenServer.cast(source_id, {:push, source_id, log_event})
end

def push(source_id, %LE{} = log_event) do
@spec push(atom(), Logflare.LogEvent.t()) :: :ok
def push(source_id, %LE{} = log_event) when is_atom(source_id) do
GenServer.cast(source_id, {:push, source_id, log_event})
end

1 change: 1 addition & 0 deletions lib/logflare/source/supervisor.ex
@@ -221,6 +221,7 @@ defmodule Logflare.Source.Supervisor do
end
end

@spec ensure_started(atom) :: {:ok, :already_started | :started}
def ensure_started(source_id) do
case Process.whereis(source_id) do
nil ->
8 changes: 4 additions & 4 deletions lib/logflare/sources/source_routing.ex
@@ -7,13 +7,13 @@ defmodule Logflare.Logs.SourceRouting do
alias Logflare.Logs
require Logger

@spec route_to_sinks_and_ingest(LE.t()) :: :ok | :noop
@spec route_to_sinks_and_ingest(LE.t()) :: LE.t()
def route_to_sinks_and_ingest(%LE{via_rule: %Rule{} = rule} = le) do
Logger.error(
"LogEvent #{le.id} has already been routed using the rule #{rule.id}, can't proceed!"
)

:noop
le
end

def route_to_sinks_and_ingest(%LE{body: body, source: source, via_rule: nil} = le) do
@@ -31,11 +31,11 @@ defmodule Logflare.Logs.SourceRouting do
do_route(le, rule)

true ->
:noop
le
end
end

:ok
le
end

# routes the log event
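A brief sketch of why the return type changed: route_to_sinks_and_ingest/1 previously returned :ok or :noop, and now always hands back the event, so it can close out the `with` chain in maybe_ingest_and_broadcast/1 (see logs.ex above). Aliases mirror those used in logs.ex.

%LE{} = le = SourceRouting.route_to_sinks_and_ingest(le)
# The returned %LE{} matches the with-clause pattern, so routing composes with
# ingest/1 and broadcast/1 as one pipeline step.
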
2 changes: 1 addition & 1 deletion lib/logflare_web/templates/source/show_rejected.html.eex
@@ -18,7 +18,7 @@
<%= @logs |> Enum.with_index |> Enum.map(fn {log_event, inx} -> %>
<li>
<mark class="log-datestamp" data-timestamp="<%= 1_000_000 * (DateTime.from_naive!(log_event.ingested_at, "Etc/UTC") |> DateTime.to_unix()) %>"><%= log_event.ingested_at %></mark>
<%= log_event.validation_error %>
<%= log_event.pipeline_error.message %>
<a class="metadata-link" data-toggle="collapse" href="#metadata-<%= inx %>"aria-expanded="false">
log payload
</a>
6 changes: 3 additions & 3 deletions test/logflare/log_event_test.exs
@@ -23,7 +23,7 @@ defmodule Logflare.LogEventTest do
source: %_{},
sys_uint: _,
valid: true,
validation_error: "",
pipeline_error: nil,
via_rule: nil
} = LogEvent.make(@valid_params, %{source: source})

@@ -35,14 +35,14 @@ defmodule Logflare.LogEventTest do
params =
Map.merge(@valid_params, %{
"valid" => false,
"validation_error" => "some error"
"pipeline_error" => "some error"
})

assert %LogEvent{
drop: false,
# validity gets overwritten
valid: true,
validation_error: "",
pipeline_error: nil,
source: %_{}
} = LogEvent.make(params, %{source: source})
end
(3 more changed files not shown.)

