diff --git a/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx b/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
index 4452e6a07..3b72c4abd 100644
--- a/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
+++ b/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
@@ -55,6 +55,64 @@ To ingest by batch, send your request with the following JSON body:
 
 Note that if you have multiple sources with the same name, it will result in an error on ingestion and the log event will be discarded.
 
+## Pipeline Transformations
+
+At ingestion, the pipeline will perform the following in sequence:
+
+1. Add the `id`, `timestamp`, and `event_message` fields.
+
+   - `event_message` will be populated from the special `message` field if missing.
+
+2. Ensure that all field names adhere to the BigQuery column requirements. See [key transformation](#key-transformation) for more details.
+
+   - Any fields that are automatically adjusted will be prefixed with an underscore (`_`).
+
+3. If set, fields will be copied in sequence. See [field copying](#copy-fields) for more details.
+
+### Key Transformation
+
+When logging objects, your object keys will be transformed automatically to comply with the respective backend in use. For example, BigQuery requires that column names contain only letters (a-z, A-Z), numbers (0-9), or underscores (\_), and start with a letter or underscore. This will be handled automatically for you when ingesting data.
+
+### Copy Fields
+
+A source can be configured to copy fields from one path to another. This allows for augmentation of the event at ingestion time, before it is inserted into the underlying backend.
+
+A `:` symbol is used as a delimiter between the source field and the destination field. The pattern is `source:destination`. Dot syntax is used for specifying nested paths.
+
+For example, to copy a nested field to the top level (for performance reasons or otherwise):
+
+```text
+metadata.nested_field:top_field
+```
+
+Multiple rules can be specified and chained, as they are executed in sequence.
+
+```text
+metadata.my_nested.field:top
+top:top_level_copy
+top:backup_copy
+```
+
+In this example, 3 additional fields will be created: `top`, `top_level_copy`, and `backup_copy`.
+
+As field name transformations to the BigQuery specification occur before this step, the modified field names must be used.
+
+For example, if a payload with dashes is ingested:
+
+```json
+{ "top-level": 123, ...}
+```
+
+The field will be converted into `_top_level` in the field name transformation step. We will then have to refer to it as such in the copy fields configuration:
+
+```text
+_top_level:my_copied_field
+```
+
+:::note
+Destination field names must match the BigQuery column name specification, or the event risks being rejected.
+:::
+
 ## Adaptive Schema
 
 As your logging needs change, Logflare is capable of detecting and adjusting the database schema accordingly. This allows you to focus on analyzing your logs instead of having to manage your logging pipeline manually.
@@ -107,10 +165,6 @@ On high ingestion volume, Logflare will sample incoming events instead of checki
 From 10-100 events per second, sample rate is 0.1. From 100-1,000 events per second, sample rate is 0.01. From 1,000-10,000 events per second, sample rate is 0.001. Above 10,000 events per second, sample rate is 0.0001.
 :::
 
-### Key Transformation
-
-When logging object, your object keys will be transformed automatically to comply with the respective backend in use. For example, BigQuery column requirements require that names only contain letters (a-z, A-Z), numbers (0-9), or underscores (\_), and it must start with a letter or underscore. This will be automatically handled for you when ingesting data.
-
 ### Schema Changes
 
 Schema changes are applied automatically. If this is not the desired behaviour, you can disable this by locking the schema in the source's settings.
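Before moving to the implementation, a standalone sketch of the copy-fields semantics documented above. This is illustrative only — bare maps and made-up field names, with none of Logflare's surrounding validation; the real implementation follows in `log_event.ex` below.

```elixir
# Illustrative sketch of the documented `source:destination` rules.
rules = """
metadata.my_nested.field:top
top:top_level_copy
"""

body = %{"metadata" => %{"my_nested" => %{"field" => 123}}}

body =
  for rule <- String.split(rules, "\n", trim: true), reduce: body do
    acc ->
      case String.split(rule, ":", parts: 2) do
        [from, to] ->
          case get_in(acc, String.split(from, ".")) do
            # a missing source path leaves the event untouched
            nil -> acc
            value -> put_in(acc, Enum.map(String.split(to, "."), &Access.key(&1, %{})), value)
          end

        # lines without a `:` are invalid rules and are skipped
        _ ->
          acc
      end
  end

# body now contains "top" => 123 and "top_level_copy" => 123
# alongside the original "metadata" tree.
```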
diff --git a/lib/logflare/logs/log_event.ex b/lib/logflare/logs/log_event.ex
index d670eacbf..cfeadacc3 100644
--- a/lib/logflare/logs/log_event.ex
+++ b/lib/logflare/logs/log_event.ex
@@ -7,6 +7,7 @@ defmodule Logflare.LogEvent do
   alias Logflare.Source
   alias __MODULE__, as: LE
   alias Logflare.Logs.Validators.{EqDeepFieldTypes, BigQuerySchemaChange}
+  alias Logflare.Logs.IngestTransformers
 
   require Logger
 
@@ -84,6 +85,7 @@ defmodule Logflare.LogEvent do
 
     Logflare.LogEvent
     |> struct!(le_map)
+    |> transform()
     |> validate()
   end
 
@@ -182,6 +184,62 @@ defmodule Logflare.LogEvent do
     end)
   end
 
+  @spec transform(LE.t()) :: LE.t()
+  defp transform(%LE{valid: false} = le), do: le
+
+  defp transform(%LE{valid: true} = le) do
+    with {:ok, le} <- bigquery_spec(le),
+         {:ok, le} <- copy_fields(le) do
+      le
+    else
+      {:error, message} ->
+        %{
+          le
+          | valid: false,
+            pipeline_error: %LE.PipelineError{
+              stage: "transform",
+              type: "transform",
+              message: message
+            }
+        }
+    end
+  end
+
+  defp bigquery_spec(le) do
+    new_body = IngestTransformers.transform(le.body, :to_bigquery_column_spec)
+    {:ok, %{le | body: new_body}}
+  end
+
+  defp copy_fields(%LE{source: %Source{transform_copy_fields: nil}} = le), do: {:ok, le}
+
+  defp copy_fields(le) do
+    instructions = String.split(le.source.transform_copy_fields, ~r/\n/, trim: true)
+
+    new_body =
+      for instruction <- instructions, instruction = String.trim(instruction), reduce: le.body do
+        acc ->
+          case String.split(instruction, ":", parts: 2) do
+            [from, to] ->
+              from = String.replace_prefix(from, "m.", "metadata.")
+              from_path = String.split(from, ".")
+
+              to = String.replace_prefix(to, "m.", "metadata.")
+              to_path = String.split(to, ".")
+
+              if value = get_in(acc, from_path) do
+                put_in(acc, Enum.map(to_path, &Access.key(&1, %{})), value)
+              else
+                acc
+              end
+
+            _ ->
+              acc
+          end
+      end
+
+    {:ok, Map.put(le, :body, new_body)}
+  end
+
   # used to stringify changeset errors
   # TODO: move to utils
   defp changeset_error_to_string(changeset) do
diff --git a/lib/logflare/logs/logs.ex b/lib/logflare/logs/logs.ex
index f368b6c0c..ec4d2c2df 100644
--- a/lib/logflare/logs/logs.ex
+++ b/lib/logflare/logs/logs.ex
@@ -7,7 +7,6 @@ defmodule Logflare.Logs do
   alias Logflare.{SystemMetrics, Source, Sources}
   alias Logflare.Logs.SourceRouting
   alias Logflare.Logs.IngestTypecasting
-  alias Logflare.Logs.IngestTransformers
   alias Logflare.Rule
   alias Logflare.Backends.IngestEventQueue
 
@@ -19,7 +18,6 @@ defmodule Logflare.Logs do
     |> Enum.map(fn log ->
       log
       |> IngestTypecasting.maybe_apply_transform_directives()
-      |> IngestTransformers.transform(:to_bigquery_column_spec)
       |> LE.make(%{source: source})
       |> maybe_mark_le_dropped_by_lql()
       |> maybe_ingest_and_broadcast()
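One detail in `copy_fields/1` above worth calling out: the destination path is wrapped in `Access.key/2` before the `put_in/3` call. A minimal sketch of why (plain standard-library behavior, field names invented):

```elixir
# put_in/3 with a bare ["my", "field"] path raises if the intermediate
# "my" map does not exist yet. Access.key/2 with a %{} default creates
# the missing intermediate maps on the way down instead.
body = %{"food" => 123}

path = Enum.map(["my", "field"], &Access.key(&1, %{}))
put_in(body, path, get_in(body, ["food"]))
#=> %{"food" => 123, "my" => %{"field" => 123}}
```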
diff --git a/lib/logflare/sources/source.ex b/lib/logflare/sources/source.ex
index 8840b3bab..d2e285c34 100644
--- a/lib/logflare/sources/source.ex
+++ b/lib/logflare/sources/source.ex
@@ -28,7 +28,8 @@ defmodule Logflare.Source do
            :notifications,
            :custom_event_message_keys,
            :backends,
-           :retention_days
+           :retention_days,
+           :transform_copy_fields
          ]}
 
   defp env_dataset_id_append, do: Application.get_env(:logflare, Logflare.Google)[:dataset_id_append]
 
@@ -131,6 +132,7 @@ defmodule Logflare.Source do
     field(:v2_pipeline, :boolean, default: false)
     field(:suggested_keys, :string, default: "")
     field(:retention_days, :integer, virtual: true)
+    field(:transform_copy_fields, :string)
 
     # Causes a shitstorm
     # field :bigquery_schema, Ecto.Term
 
@@ -181,7 +183,8 @@ defmodule Logflare.Source do
       :drop_lql_string,
       :v2_pipeline,
       :suggested_keys,
-      :retention_days
+      :retention_days,
+      :transform_copy_fields
     ])
     |> cast_embed(:notifications, with: &Notifications.changeset/2)
     |> put_single_tenant_postgres_changes()
@@ -207,7 +210,8 @@ defmodule Logflare.Source do
       :drop_lql_string,
       :v2_pipeline,
       :suggested_keys,
-      :retention_days
+      :retention_days,
+      :transform_copy_fields
     ])
     |> cast_embed(:notifications, with: &Notifications.changeset/2)
     |> put_single_tenant_postgres_changes()
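With the field castable in both changesets above, persisting rules programmatically looks roughly like the sketch below. The changeset function name is an assumption — only the `cast/3` key lists are visible in these hunks:

```elixir
# Hypothetical usage sketch: persisting copy-field rules on an existing source.
# Assumes a public changeset (e.g. Logflare.Source.changeset/2) that casts
# :transform_copy_fields as shown in the hunks above.
attrs = %{transform_copy_fields: "metadata.request_id:request_id"}

source
|> Logflare.Source.changeset(attrs)
|> Logflare.Repo.update()
```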
diff --git a/lib/logflare_web/templates/source/edit.html.eex b/lib/logflare_web/templates/source/edit.html.eex
index 09c3f954c..4d87bc6f9 100644
--- a/lib/logflare_web/templates/source/edit.html.eex
+++ b/lib/logflare_web/templates/source/edit.html.eex
@@ -187,19 +187,6 @@
       <%= link "Test webhook", to: Routes.source_path(@conn, :test_alerts, @source), class: "btn btn-primary form-button", role: "button" %>
     <% end %>
 
-    <%= section_header("Custom Event Message") %>
-    <p>
-      Comma separated list of metadata keys to customize the log event message. Use message to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root ($.) is implied here.
-    </p>
-    <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
-      <%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
-      <%= error_tag aa, :custom_event_message_keys %>
-      <small class="form-text text-muted">
-        Key values will be concatenated with a pipe ( | ).
-      </small>
-
-      <%= submit "Update keys", class: "btn btn-primary form-button" %>
-    <% end %>
 
     <%= section_header("Public Access") %>
     <p>
       Public paths let you share your logs with other people without having to give them a login. Generate a public token and this source will be accessible by anyone with this url. Regenerate a public path if needed.
@@ -228,18 +215,50 @@
 
       <%= button "#{if @source.validate_schema, do: "Disable", else: "Enable"} schema validation", to: Routes.source_path(@conn, :toggle_schema_validation, @source), class: "btn btn-primary form-button" %>
 
-    <%= section_header("Drop Log Events") %>
+
+    <h4>
+      Pipeline Rules
+    </h4>
+
+    <%= section_header("Drop Log Events") %>
     <p>
       Use an LQL statement to drop log events on ingest.
     </p>
     <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn drop -> %>
       <%= text_input drop, :drop_lql_string, placeholder: "", class: "form-control form-control-margin" %>
       <%= error_tag drop, :drop_lql_string %>
+
+      <%= submit "Update LQL", class: "btn btn-primary form-button" %>
+    <% end %>
+
+    <%= section_header("Custom Event Message") %>
+    <p>
+      Comma separated list of metadata keys to customize the log event message. Use message to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root ($.) is implied here.
+    </p>
+    <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
+      <%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
+      <%= error_tag aa, :custom_event_message_keys %>
       <small class="form-text text-muted">
-        Currently in beta.
+        Key values will be concatenated with a pipe ( | ).
       </small>
-      <%= submit "Update LQL", class: "btn btn-primary form-button" %>
+
+      <%= submit "Update keys", class: "btn btn-primary form-button" %>
     <% end %>
+
+    <%= section_header("Copy fields") %>
+    <p>
+      Copy fields to different paths at ingestion. Rules are executed sequentially. Invalid rules are ignored.
+    </p>
+    <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
+      <%= textarea e, :transform_copy_fields, placeholder: "metadata.my_field:my.destination.path", class: "form-control form-control-margin tw-h-[120px]" %>
+      <%= error_tag e, :transform_copy_fields %>
+      <small class="form-text text-muted">
+        Separate each rule with a new line. Use the pattern from.field:to.other.field to specify the path to copy a field.
+      </small>
+
+      <%= submit "Update field copying rules", class: "btn btn-primary form-button" %>
+    <% end %>
 
     <%= section_header("Backend TTL") %>
     <%= if @conn.assigns.user.billing_enabled do %>
diff --git a/priv/repo/migrations/20241121183147_add_transform_copy_fields_column_to_sources_table.exs b/priv/repo/migrations/20241121183147_add_transform_copy_fields_column_to_sources_table.exs
new file mode 100644
index 000000000..2c6e49885
--- /dev/null
+++ b/priv/repo/migrations/20241121183147_add_transform_copy_fields_column_to_sources_table.exs
@@ -0,0 +1,10 @@
+defmodule Logflare.Repo.Migrations.AddTransformCopyFieldsColumnToSourcesTable do
+  use Ecto.Migration
+
+  def change do
+    alter table(:sources) do
+      add :transform_copy_fields, :string
+    end
+  end
+end
+
diff --git a/test/logflare/backends_test.exs b/test/logflare/backends_test.exs
index 299723515..e124af127 100644
--- a/test/logflare/backends_test.exs
+++ b/test/logflare/backends_test.exs
@@ -19,6 +19,7 @@ defmodule Logflare.BackendsTest do
   alias Logflare.Rules
   alias Logflare.Backends.IngestEventQueue
   alias Logflare.Backends.SourceSupWorker
+  alias Logflare.LogEvent
 
   setup do
     start_supervised!(AllLogsLogged)
diff --git a/test/logflare/log_event_test.exs b/test/logflare/log_event_test.exs
index d166691ec..42f1816e2 100644
--- a/test/logflare/log_event_test.exs
+++ b/test/logflare/log_event_test.exs
@@ -36,6 +36,95 @@
     assert body["metadata"]["my"] == "key"
   end
 
+  describe "make/2 transformations" do
+    test "dashes to underscores", %{source: source} do
+      assert %LogEvent{
+               body: %{
+                 "_test_field" => _
+               }
+             } = LogEvent.make(%{"test-field" => 123}, %{source: source})
+    end
+
+    test "field copying - nested", %{source: source} do
+      source = %{
+        source
+        | transform_copy_fields: """
+          food:my.field
+          """
+      }
+
+      assert %LogEvent{
+               body: %{
+                 "my" => %{
+                   "field" => 123
+                 },
+                 "food" => _
+               }
+             } = LogEvent.make(%{"food" => 123}, %{source: source})
+    end
+
+    test "field copying - top level", %{source: source} do
+      source = %{
+        source
+        | transform_copy_fields: """
+          food:field
+          """
+      }
+
+      assert %LogEvent{
+               body: %{
+                 "field" => 123,
+                 "food" => 123
+               }
+             } = LogEvent.make(%{"food" => 123}, %{source: source})
+    end
+
+    test "field copying - multiple", %{source: source} do
+      source = %{
+        source
+        | transform_copy_fields: """
+          food:field
+          field:123
+          """
+      }
+
+      assert %LogEvent{
+               body: %{
+                 "123" => 123,
+                 "field" => 123,
+                 "food" => 123
+               }
+             } = LogEvent.make(%{"food" => 123}, %{source: source})
+    end
+
+    test "field copying - dashes in field", %{source: source} do
+      source = %{
+        source
+        | transform_copy_fields: """
+          _my_food:field
+          """
+      }
+
+      assert %LogEvent{body: body} = LogEvent.make(%{"my-food" => 123}, %{source: source})
+      assert Map.drop(body, ["id", "timestamp"]) == %{"_my_food" => 123, "field" => 123}
+    end
+
+    test "field copying - invalid instructions are ignored", %{source: source} do
+      source = %{
+        source
+        | transform_copy_fields: """
+          food field
+          food-field
+          foodfield
+          missing:field
+          """
+      }
+
+      assert %LogEvent{body: body} = LogEvent.make(%{"food" => 123}, %{source: source})
+      assert Map.drop(body, ["id", "timestamp"]) == %{"food" => 123}
+    end
+  end
+
   test "make/2 with metadata string", %{source: source} do
     assert %LogEvent{
              body: body,
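A note on the migration earlier in this diff: Ecto's `:string` column type maps to `varchar(255)` on Postgres by default, so a long list of copy rules will hit the 255-character cap. If that ever becomes a problem, a follow-up migration widening the column would look like this (hypothetical, not part of this diff):

```elixir
defmodule Logflare.Repo.Migrations.WidenTransformCopyFields do
  use Ecto.Migration

  def change do
    alter table(:sources) do
      # :text removes the varchar(255) cap that :string carries by default
      modify :transform_copy_fields, :text
    end
  end
end
```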
diff --git a/test/logflare_web/controllers/api/source_controller_test.exs b/test/logflare_web/controllers/api/source_controller_test.exs
index 7616c38e7..262101f7e 100644
--- a/test/logflare_web/controllers/api/source_controller_test.exs
+++ b/test/logflare_web/controllers/api/source_controller_test.exs
@@ -157,6 +157,9 @@ defmodule LogflareWeb.Api.SourceControllerTest do
     end
   end
 
+  describe "transformations" do
+  end
+
   describe "retention_days" do
     setup do
       Logflare.Google.BigQuery
diff --git a/test/logflare_web/controllers/source_controller_test.exs b/test/logflare_web/controllers/source_controller_test.exs
index 120f8fd5e..f6eb8d3bf 100644
--- a/test/logflare_web/controllers/source_controller_test.exs
+++ b/test/logflare_web/controllers/source_controller_test.exs
@@ -191,6 +191,50 @@ defmodule LogflareWeb.SourceControllerTest do
     end
   end
 
+  describe "edit" do
+    setup %{conn: conn} do
+      insert(:plan, name: "Free")
+      free_user = insert(:user)
+      insert(:team, user: free_user)
+
+      [conn: conn, free_user: free_user]
+    end
+
+    test "pipeline", %{conn: conn, free_user: user} do
+      source = insert(:source, user: user)
+
+      html =
+        conn
+        |> login_user(user)
+        |> get(Routes.source_path(conn, :edit, source))
+        |> html_response(200)
+
+      assert html =~ "Pipeline Rules"
+      assert html =~ "Copy fields"
+      assert html =~ "Update field copying rules"
+
+      conn =
+        conn
+        |> recycle()
+        |> login_user(user)
+        |> patch("/sources/#{source.id}", %{
+          source: %{
+            transform_copy_fields: """
+            test:123
+            123:1234
+            """
+          }
+        })
+
+      assert html_response(conn, 302) =~ "redirected"
+      assert Phoenix.Flash.get(conn.assigns.flash, :info) == "Source updated!"
+
+      updated = Sources.get_by(id: source.id)
+
+      assert updated.transform_copy_fields != nil
+    end
+  end
+
   describe "dashboard single tenant" do
     TestUtils.setup_single_tenant(seed_user: true)
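Finally, an end-to-end sketch of the feature as exercised by the tests above — a hypothetical iex session, assuming a fully-populated source (e.g. from the test factory) rather than a bare struct:

```elixir
# Copy a nested metadata field to the top level via the new pipeline.
source = %{source | transform_copy_fields: "metadata.level:level"}

le = Logflare.LogEvent.make(%{"metadata" => %{"level" => "error"}}, %{source: source})

le.body["level"]
#=> "error"

# The original field stays in place: rules copy, they do not move.
le.body["metadata"]["level"]
#=> "error"
```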