diff --git a/lib/logflare/logs/log_event.ex b/lib/logflare/logs/log_event.ex
index d670eacbf..39c0771a4 100644
--- a/lib/logflare/logs/log_event.ex
+++ b/lib/logflare/logs/log_event.ex
@@ -84,6 +84,7 @@ defmodule Logflare.LogEvent do
Logflare.LogEvent
|> struct!(le_map)
+ |> transform()
|> validate()
end
@@ -182,6 +183,72 @@ defmodule Logflare.LogEvent do
end)
end
+ @spec transform(LE.t()) :: LE.t()
+ defp transform(%LE{valid: false} = le), do: le
+
+ defp transform(%LE{valid: true} = le) do
+ with {:ok, le} <- dashes_to_underscore(le),
+ {:ok, le} <- copy_fields(le) do
+ le
+ else
+ {:error, message} ->
+ %{
+ le
+ | valid: false,
+ pipeline_error: %LE.PipelineError{
+ stage: "transform",
+ type: "transform",
+ message: message
+ }
+ }
+ end
+ end
+
+ defp dashes_to_underscore(le) do
+ new_body =
+ Iteraptor.reduce(le.body, %{}, fn {k, v}, acc ->
+ new_path =
+ Enum.map(k, fn
+ key when is_binary(key) -> String.replace(key, "-", "_")
+ key -> key
+ end)
+
+ put_in(acc, Enum.map(new_path, &Access.key(&1, %{})), v)
+ end)
+
+ {:ok, %{le | body: new_body}}
+ end
+
+ defp copy_fields(%LE{source: %Source{transform_copy_fields: nil}} = le), do: {:ok, le}
+
+ defp copy_fields(le) do
+ instructions = String.split(le.source.transform_copy_fields, ~r/\n/, trim: true)
+
+ new_body =
+ for instruction <- instructions, instruction = String.trim(instruction), reduce: le.body do
+ acc ->
+ case String.split(instruction, ":", parts: 2) do
+ [from, to] ->
+ from = String.replace_prefix(from, "m.", "metadata.")
+ from_path = String.split(from, ".")
+
+ to = String.replace_prefix(to, "m.", "metadata.")
+ to_path = String.split(to, ".")
+
+ if value = get_in(acc, from_path) do
+ put_in(acc, Enum.map(to_path, &Access.key(&1, %{})), value)
+ else
+ acc
+ end
+
+ _ ->
+ acc
+ end
+ end
+
+ {:ok, Map.put(le, :body, new_body)}
+ end
+
# used to stringify changeset errors
# TODO: move to utils
defp changeset_error_to_string(changeset) do
diff --git a/lib/logflare/sources/source.ex b/lib/logflare/sources/source.ex
index 8840b3bab..d2e285c34 100644
--- a/lib/logflare/sources/source.ex
+++ b/lib/logflare/sources/source.ex
@@ -28,7 +28,8 @@ defmodule Logflare.Source do
:notifications,
:custom_event_message_keys,
:backends,
- :retention_days
+ :retention_days,
+ :transform_copy_fields
]}
defp env_dataset_id_append,
do: Application.get_env(:logflare, Logflare.Google)[:dataset_id_append]
@@ -131,6 +132,7 @@ defmodule Logflare.Source do
field(:v2_pipeline, :boolean, default: false)
field(:suggested_keys, :string, default: "")
field(:retention_days, :integer, virtual: true)
+ field(:transform_copy_fields, :string)
# Causes a shitstorm
# field :bigquery_schema, Ecto.Term
@@ -181,7 +183,8 @@ defmodule Logflare.Source do
:drop_lql_string,
:v2_pipeline,
:suggested_keys,
- :retention_days
+ :retention_days,
+ :transform_copy_fields
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
@@ -207,7 +210,8 @@ defmodule Logflare.Source do
:drop_lql_string,
:v2_pipeline,
:suggested_keys,
- :retention_days
+ :retention_days,
+ :transform_copy_fields
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
diff --git a/lib/logflare_web/templates/source/edit.html.eex b/lib/logflare_web/templates/source/edit.html.eex
index 09c3f954c..4d87bc6f9 100644
--- a/lib/logflare_web/templates/source/edit.html.eex
+++ b/lib/logflare_web/templates/source/edit.html.eex
@@ -187,19 +187,6 @@
<%= link "Test webhook", to: Routes.source_path(@conn, :test_alerts, @source), class: "btn btn-primary form-button", role: "button" %>
<% end %>
- <%= section_header("Custom Event Message") %>
-
Comma separated list of metadata keys to customize the log event message. Use message
to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root ($.
) is implied here.
- <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
-
- <%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
- <%= error_tag aa, :custom_event_message_keys %>
-
- Key values will be concatenated with a pipe ( | ).
-
-
-
- <%= submit "Update keys", class: "btn btn-primary form-button" %>
- <% end %>
<%= section_header("Public Access") %>
Public paths let you share your logs with other people without having to give them a login. Generate a public token
and this source will be accessible by anyone with this url. Regenerate a public path if needed.
@@ -228,18 +215,50 @@
<%= button "#{if @source.validate_schema, do: "Disable", else: "Enable"} schema validation", to: Routes.source_path(@conn, :toggle_schema_validation, @source), class: "btn btn-primary form-button" %>
- <%= section_header("Drop Log Events") %>
+
+
+ Pipeline Rules
+
+ <%= section_header("Drop Log Events") %>
Use an LQL statement to drop log events on ingest.
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn drop -> %>
<%= text_input drop, :drop_lql_string, placeholder: "", class: "form-control form-control-margin" %>
<%= error_tag drop, :drop_lql_string %>
+
+ <%= submit "Update LQL", class: "btn btn-primary form-button" %>
+ <% end %>
+
+ <%= section_header("Custom Event Message") %>
+ Comma separated list of metadata keys to customize the log event message. Use message
to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root ($.
) is implied here.
+ <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
+
+ <%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
+ <%= error_tag aa, :custom_event_message_keys %>
- Currently in beta.
+ Key values will be concatenated with a pipe ( | ).
- <%= submit "Update LQL", class: "btn btn-primary form-button" %>
+
+ <%= submit "Update keys", class: "btn btn-primary form-button" %>
+ <% end %>
+
+
+ <%= section_header("Copy fields") %>
+
+ Copy fields to different paths at ingestion. Rules are executed sequentailly. Invalid rules are ignored.
+
+ <%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
+
+ <%= textarea e, :transform_copy_fields, placeholder: "metadata.my_field:my.destination.path", class: "form-control form-control-margin tw-h-[120px]" %>
+ <%= error_tag e, :transform_copy_fields %>
+
+ Separate each rule with a new line. Use the pattern from.field:to.other.field
to specify the path to copy a field.
+
+
+ <%= submit "Update field copying rules", class: "btn btn-primary form-button" %>
<% end %>
+
<%= section_header("Backend TTL") %>
<%= if @conn.assigns.user.billing_enabled do %>
diff --git a/priv/repo/migrations/20241121183147_add_transform_copy_fields_column_to_sources_table.exs b/priv/repo/migrations/20241121183147_add_transform_copy_fields_column_to_sources_table.exs
new file mode 100644
index 000000000..2c6e49885
--- /dev/null
+++ b/priv/repo/migrations/20241121183147_add_transform_copy_fields_column_to_sources_table.exs
@@ -0,0 +1,10 @@
+defmodule Logflare.Repo.Migrations.AddTransformCopyFieldsColumnToSourcesTable do
+ use Ecto.Migration
+
+ def change do
+
+ alter table(:sources) do
+ add :transform_copy_fields, :string
+ end
+ end
+end
diff --git a/test/logflare/backends_test.exs b/test/logflare/backends_test.exs
index 299723515..e124af127 100644
--- a/test/logflare/backends_test.exs
+++ b/test/logflare/backends_test.exs
@@ -19,6 +19,7 @@ defmodule Logflare.BackendsTest do
alias Logflare.Rules
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends.SourceSupWorker
+ alias Logflare.LogEvent
setup do
start_supervised!(AllLogsLogged)
diff --git a/test/logflare/log_event_test.exs b/test/logflare/log_event_test.exs
index d166691ec..16a7f7950 100644
--- a/test/logflare/log_event_test.exs
+++ b/test/logflare/log_event_test.exs
@@ -36,6 +36,95 @@ defmodule Logflare.LogEventTest do
assert body["metadata"]["my"] == "key"
end
+ describe "make/2 transformations" do
+ test "dashes to underscores", %{source: source} do
+ assert %LogEvent{
+ body: %{
+ "test_field" => _
+ }
+ } = LogEvent.make(%{"test-field" => 123}, %{source: source})
+ end
+
+ test "field copying - nested", %{source: source} do
+ source = %{
+ source
+ | transform_copy_fields: """
+ food:my.field
+ """
+ }
+
+ assert %LogEvent{
+ body: %{
+ "my" => %{
+ "field" => 123
+ },
+ "food" => _
+ }
+ } = LogEvent.make(%{"food" => 123}, %{source: source})
+ end
+
+ test "field copying - top level", %{source: source} do
+ source = %{
+ source
+ | transform_copy_fields: """
+ food:field
+ """
+ }
+
+ assert %LogEvent{
+ body: %{
+ "field" => 123,
+ "food" => 123
+ }
+ } = LogEvent.make(%{"food" => 123}, %{source: source})
+ end
+
+ test "field copying - multiple", %{source: source} do
+ source = %{
+ source
+ | transform_copy_fields: """
+ food:field
+ field:123
+ """
+ }
+
+ assert %LogEvent{
+ body: %{
+ "123" => 123,
+ "field" => 123,
+ "food" => 123
+ }
+ } = LogEvent.make(%{"food" => 123}, %{source: source})
+ end
+
+ test "field copying - dashes in field", %{source: source} do
+ source = %{
+ source
+ | transform_copy_fields: """
+ my-food:field
+ """
+ }
+
+ assert %LogEvent{body: body} = LogEvent.make(%{"my-food" => 123}, %{source: source})
+ assert Map.drop(body, ["id", "timestamp"]) == %{"my_food" => 123}
+ end
+
+ test "field copying - invalid instructions are ignored", %{source: source} do
+ source = %{
+ source
+ | transform_copy_fields: """
+ food field
+ food-field
+ foodfield
+ missing:field
+ """
+ }
+
+ assert %LogEvent{body: body} = LogEvent.make(%{"food" => 123}, %{source: source})
+ assert Map.drop(body, ["id", "timestamp"]) == %{"food" => 123}
+ end
+ end
+
test "make/2 with metadata string", %{source: source} do
assert %LogEvent{
body: body,
diff --git a/test/logflare_web/controllers/api/source_controller_test.exs b/test/logflare_web/controllers/api/source_controller_test.exs
index 7616c38e7..18eddca71 100644
--- a/test/logflare_web/controllers/api/source_controller_test.exs
+++ b/test/logflare_web/controllers/api/source_controller_test.exs
@@ -157,6 +157,10 @@ defmodule LogflareWeb.Api.SourceControllerTest do
end
end
+ describe "transformations" do
+
+ end
+
describe "retention_days" do
setup do
Logflare.Google.BigQuery
diff --git a/test/logflare_web/controllers/source_controller_test.exs b/test/logflare_web/controllers/source_controller_test.exs
index 120f8fd5e..6dd007cc6 100644
--- a/test/logflare_web/controllers/source_controller_test.exs
+++ b/test/logflare_web/controllers/source_controller_test.exs
@@ -191,6 +191,49 @@ defmodule LogflareWeb.SourceControllerTest do
end
end
+
+ describe "edit" do
+ setup %{conn: conn} do
+ insert(:plan, name: "Free")
+ free_user = insert(:user)
+ insert(:team, user: free_user)
+
+ [conn: conn, free_user: free_user]
+ end
+
+ test "pipeline", %{conn: conn, free_user: user} do
+ source = insert(:source, user: user)
+ html =
+ conn
+ |> login_user(user)
+ |> get(Routes.source_path(conn, :edit, source))
+ |> html_response(200)
+
+ assert html =~ "Pipeline Rules"
+ assert html =~ "Copy fields"
+ assert html =~ "Update field copying rules"
+
+
+ conn =
+ conn
+ |> recycle()
+ |> login_user(user)
+ |> patch("/sources/#{source.id}", %{
+ source: %{transform_copy_fields: """
+ test:123
+ 123:1234
+ """}
+ })
+
+ assert html_response(conn, 302) =~ "redirected"
+ assert Phoenix.Flash.get(conn.assigns.flash, :info) == "Source updated!"
+
+ updated = Sources.get_by(id: source.id)
+
+ assert updated.transform_copy_fields != nil
+ end
+ end
+
describe "dashboard single tenant" do
TestUtils.setup_single_tenant(seed_user: true)