
feat: ingestion time field copying, dashes replacement (#2263)
* feat: ingestion time field copying, dashes replacement

* chore: reuse IngestTransformers logic

* chore: formatting

* chore: adjust tests to match ingest transformers

* docs: add transformation docs
Ziinc authored Nov 22, 2024
1 parent da52044 commit 532fbc1
Showing 10 changed files with 305 additions and 25 deletions.
62 changes: 58 additions & 4 deletions docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
@@ -55,6 +55,64 @@ To ingest by batch, send your request with the following JSON body:

Note that if you have multiple sources with the same name, ingestion will result in an error and the log event will be discarded.

## Pipeline Transformations

At ingestion, the pipeline will perform the following in sequence:

1. Add in the `id`, `timestamp`, and `event_message` fields.

- `event_message` will be populated from the special `message` field if missing.

2. Ensure that all field names adhere to the BigQuery column requirements. See [key transformation](#key-transformation) for more details.

- Any fields that are automatically adjusted will be prefixed with an underscore (`_`).

3. If set, fields will be copied in sequence. See [field copying](#copy-fields) for more details.
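
For example, ingesting the minimal payload below (a sketch) produces an event whose `event_message` is taken from `message`, with generated `id` and `timestamp` fields added alongside it, before the key transformation and field copying steps run:

```json
{ "message": "user signed in" }
```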

### Key Transformation

When logging objects, your object keys are transformed automatically to comply with the respective backend in use. For example, BigQuery requires that column names contain only letters (a-z, A-Z), numbers (0-9), or underscores (\_), and start with a letter or underscore. This is handled automatically for you when ingesting data.
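
For instance, a payload ingested with a dashed key:

```json
{ "test-field": 123 }
```

will be stored under the adjusted field name `_test_field`, with dashes replaced by underscores and a leading underscore marking the adjustment.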

### Copy Fields

A source can be configured to copy fields from one path to another. This allows the event to be augmented at ingestion time before it is inserted into the underlying backend.

A `:` symbol is used as a delimiter between the source field and destination field. The pattern is `source:destination`. Dot syntax is used for specifying nested paths.

For example, to copy a nested field to the top level (for performance reasons or otherwise):

```text
metadata.nested_field:top_field
```

Multiple rules can be specified and chained, as they are executed in sequence.

```text
metadata.my_nested.field:top
top:top_level_copy
top:backup_copy
```

In this example, 3 additional fields will be created: `top`, `top_level_copy`, and `backup_copy`.
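
For instance, given an ingested payload such as (generated fields like `id` and `timestamp` omitted for brevity):

```json
{ "metadata": { "my_nested": { "field": 123 } } }
```

the stored body will additionally contain `"top": 123`, `"top_level_copy": 123`, and `"backup_copy": 123`, with the original nested field left in place.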

Because field name transformations to the BigQuery specification occur before this step, the modified field names must be used.

For example, if a payload with dashes is ingested:

```json
{ "top-level": 123, ...}
```

The field will be converted into `_top_level` in the field name transformation step. We will then have to refer to it as such in the Copy fields configuration:

```text
_top_level:my_copied_field
```

:::note
Destination field names must match the BigQuery column name specification, or the event risks being rejected.
:::

## Adaptive Schema

As your logging needs change, Logflare detects and adjusts the database schema accordingly. This allows you to focus on analyzing your logs instead of manually managing your logging pipeline.
@@ -107,10 +165,6 @@ On high ingestion volume, Logflare will sample incoming events instead of checki
From 10-100 events per second, sample rate is 0.1. From 100-1,000 events per second, sample rate is 0.01. From 1,000-10,000 events per second, sample rate is 0.001. Above 10,000 events per second, sample rate is 0.0001.
:::
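
As a rough illustration of the thresholds above, a minimal sketch (the module and function names are hypothetical, the exact boundary handling is not specified, and the full rate below 10 events per second is an assumption):

```elixir
defmodule SampleRateSketch do
  @moduledoc "Illustrative mapping of ingest volume to sampling rate; not Logflare's actual implementation."

  @spec rate(non_neg_integer()) :: float()
  def rate(events_per_second) do
    cond do
      # Assumed: below 10 events/second, every event is checked.
      events_per_second < 10 -> 1.0
      # 10-100 events/second
      events_per_second < 100 -> 0.1
      # 100-1,000 events/second
      events_per_second < 1_000 -> 0.01
      # 1,000-10,000 events/second
      events_per_second < 10_000 -> 0.001
      # above 10,000 events/second
      true -> 0.0001
    end
  end
end
```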

### Key Transformation

When logging objects, your object keys are transformed automatically to comply with the respective backend in use. For example, BigQuery requires that column names contain only letters (a-z, A-Z), numbers (0-9), or underscores (\_), and start with a letter or underscore. This is handled automatically for you when ingesting data.

### Schema Changes

Schema changes are made automatically. If this is not the desired behaviour, you can disable it by locking the schema in the source's settings.
58 changes: 58 additions & 0 deletions lib/logflare/logs/log_event.ex
@@ -7,6 +7,7 @@ defmodule Logflare.LogEvent do
alias Logflare.Source
alias __MODULE__, as: LE
alias Logflare.Logs.Validators.{EqDeepFieldTypes, BigQuerySchemaChange}
alias Logflare.Logs.IngestTransformers

require Logger

@@ -84,6 +85,7 @@ defmodule Logflare.LogEvent do

Logflare.LogEvent
|> struct!(le_map)
|> transform()
|> validate()
end

@@ -182,6 +184,62 @@
end)
end

@spec transform(LE.t()) :: LE.t()
defp transform(%LE{valid: false} = le), do: le

defp transform(%LE{valid: true} = le) do
with {:ok, le} <- bigquery_spec(le),
{:ok, le} <- copy_fields(le) do
le
else
{:error, message} ->
%{
le
| valid: false,
pipeline_error: %LE.PipelineError{
stage: "transform",
type: "transform",
message: message
}
}
end
end

defp bigquery_spec(le) do
new_body = IngestTransformers.transform(le.body, :to_bigquery_column_spec)
{:ok, %{le | body: new_body}}
end

defp copy_fields(%LE{source: %Source{transform_copy_fields: nil}} = le), do: {:ok, le}

defp copy_fields(le) do
instructions = String.split(le.source.transform_copy_fields, ~r/\n/, trim: true)

new_body =
for instruction <- instructions, instruction = String.trim(instruction), reduce: le.body do
acc ->
case String.split(instruction, ":", parts: 2) do
[from, to] ->
from = String.replace_prefix(from, "m.", "metadata.")
from_path = String.split(from, ".")

to = String.replace_prefix(to, "m.", "metadata.")
to_path = String.split(to, ".")

if value = get_in(acc, from_path) do
put_in(acc, Enum.map(to_path, &Access.key(&1, %{})), value)
else
acc
end

_ ->
acc
end
end

{:ok, Map.put(le, :body, new_body)}
end

# used to stringify changeset errors
# TODO: move to utils
defp changeset_error_to_string(changeset) do
2 changes: 0 additions & 2 deletions lib/logflare/logs/logs.ex
@@ -7,7 +7,6 @@ defmodule Logflare.Logs do
alias Logflare.{SystemMetrics, Source, Sources}
alias Logflare.Logs.SourceRouting
alias Logflare.Logs.IngestTypecasting
alias Logflare.Logs.IngestTransformers
alias Logflare.Rule
alias Logflare.Backends.IngestEventQueue

@@ -19,7 +18,6 @@
|> Enum.map(fn log ->
log
|> IngestTypecasting.maybe_apply_transform_directives()
|> IngestTransformers.transform(:to_bigquery_column_spec)
|> LE.make(%{source: source})
|> maybe_mark_le_dropped_by_lql()
|> maybe_ingest_and_broadcast()
10 changes: 7 additions & 3 deletions lib/logflare/sources/source.ex
@@ -28,7 +28,8 @@ defmodule Logflare.Source do
:notifications,
:custom_event_message_keys,
:backends,
:retention_days
:retention_days,
:transform_copy_fields
]}
defp env_dataset_id_append,
do: Application.get_env(:logflare, Logflare.Google)[:dataset_id_append]
@@ -131,6 +132,7 @@
field(:v2_pipeline, :boolean, default: false)
field(:suggested_keys, :string, default: "")
field(:retention_days, :integer, virtual: true)
field(:transform_copy_fields, :string)
# Causes a shitstorm
# field :bigquery_schema, Ecto.Term

@@ -181,7 +183,8 @@
:drop_lql_string,
:v2_pipeline,
:suggested_keys,
:retention_days
:retention_days,
:transform_copy_fields
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
@@ -207,7 +210,8 @@
:drop_lql_string,
:v2_pipeline,
:suggested_keys,
:retention_days
:retention_days,
:transform_copy_fields
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
51 changes: 35 additions & 16 deletions lib/logflare_web/templates/source/edit.html.eex
@@ -187,19 +187,6 @@
<%= link "Test webhook", to: Routes.source_path(@conn, :test_alerts, @source), class: "btn btn-primary form-button", role: "button" %>
<% end %>

<%= section_header("Custom Event Message") %>
<p>Comma separated list of metadata keys to customize the log event message. Use <code>message</code> to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root (<code>$.</code>) is implied here.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
<div class="form-group">
<%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
<%= error_tag aa, :custom_event_message_keys %>
<small class="form-text text-muted">
Key values will be concatenated with a pipe ( | ).
</small>
</div>

<%= submit "Update keys", class: "btn btn-primary form-button" %>
<% end %>
<%= section_header("Public Access") %>
<p>Public paths let you share your logs with other people without having to give them a login. Generate a public token
and this source will be accessible by anyone with this url. Regenerate a public path if needed.</p>
@@ -228,18 +215,50 @@
</p>
<%= button "#{if @source.validate_schema, do: "Disable", else: "Enable"} schema validation", to: Routes.source_path(@conn, :toggle_schema_validation, @source), class: "btn btn-primary form-button" %>

<%= section_header("Drop Log Events") %>

<hr/>
<h3>Pipeline Rules</h3>

<%= section_header("Drop Log Events") %>
<p>Use an LQL statement to drop log events on ingest.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn drop -> %>
<div class="form-group">
<%= text_input drop, :drop_lql_string, placeholder: "", class: "form-control form-control-margin" %>
<%= error_tag drop, :drop_lql_string %>
</div>
<%= submit "Update LQL", class: "btn btn-primary form-button" %>
<% end %>

<%= section_header("Custom Event Message") %>
<p>Comma separated list of metadata keys to customize the log event message. Use <code>message</code> to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root (<code>$.</code>) is implied here.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
<div class="form-group">
<%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
<%= error_tag aa, :custom_event_message_keys %>
<small class="form-text text-muted">
Currently in beta.
Key values will be concatenated with a pipe ( | ).
</small>
</div>
<%= submit "Update LQL", class: "btn btn-primary form-button" %>

<%= submit "Update keys", class: "btn btn-primary form-button" %>
<% end %>


<%= section_header("Copy fields") %>
<p>
Copy fields to different paths at ingestion. Rules are executed sequentially. Invalid rules are ignored.
</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
<div class="form-group">
<%= textarea e, :transform_copy_fields, placeholder: "metadata.my_field:my.destination.path", class: "form-control form-control-margin tw-h-[120px]" %>
<%= error_tag e, :transform_copy_fields %>
<small class="form-text text-muted">
Separate each rule with a new line. Use the pattern <code>from.field:to.other.field</code> to specify the path to copy a field.
</small>
</div>
<%= submit "Update field copying rules", class: "btn btn-primary form-button" %>
<% end %>
<hr/>

<%= section_header("Backend TTL") %>
<%= if @conn.assigns.user.billing_enabled do %>
@@ -0,0 +1,10 @@
defmodule Logflare.Repo.Migrations.AddTransformCopyFieldsColumnToSourcesTable do
use Ecto.Migration

def change do

alter table(:sources) do
add :transform_copy_fields, :string
end
end
end
1 change: 1 addition & 0 deletions test/logflare/backends_test.exs
@@ -19,6 +19,7 @@ defmodule Logflare.BackendsTest do
alias Logflare.Rules
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends.SourceSupWorker
alias Logflare.LogEvent

setup do
start_supervised!(AllLogsLogged)
89 changes: 89 additions & 0 deletions test/logflare/log_event_test.exs
@@ -36,6 +36,95 @@ defmodule Logflare.LogEventTest do
assert body["metadata"]["my"] == "key"
end

describe "make/2 transformations" do
test "dashes to underscores", %{source: source} do
assert %LogEvent{
body: %{
"_test_field" => _
}
} = LogEvent.make(%{"test-field" => 123}, %{source: source})
end

test "field copying - nested", %{source: source} do
source = %{
source
| transform_copy_fields: """
food:my.field
"""
}

assert %LogEvent{
body: %{
"my" => %{
"field" => 123
},
"food" => _
}
} = LogEvent.make(%{"food" => 123}, %{source: source})
end

test "field copying - top level", %{source: source} do
source = %{
source
| transform_copy_fields: """
food:field
"""
}

assert %LogEvent{
body: %{
"field" => 123,
"food" => 123
}
} = LogEvent.make(%{"food" => 123}, %{source: source})
end

test "field copying - multiple", %{source: source} do
source = %{
source
| transform_copy_fields: """
food:field
field:123
"""
}

assert %LogEvent{
body: %{
"123" => 123,
"field" => 123,
"food" => 123
}
} = LogEvent.make(%{"food" => 123}, %{source: source})
end

test "field copying - dashes in field", %{source: source} do
source = %{
source
| transform_copy_fields: """
_my_food:field
"""
}

assert %LogEvent{body: body} = LogEvent.make(%{"my-food" => 123}, %{source: source})
assert Map.drop(body, ["id", "timestamp"]) == %{"_my_food" => 123, "field" => 123}
end

test "field copying - invalid instructions are ignored", %{source: source} do
source = %{
source
| transform_copy_fields: """
food field
food-field
foodfield
missing:field
"""
}

assert %LogEvent{body: body} = LogEvent.make(%{"food" => 123}, %{source: source})
assert Map.drop(body, ["id", "timestamp"]) == %{"food" => 123}
end
end

test "make/2 with metadata string", %{source: source} do
assert %LogEvent{
body: body,