Skip to content

Commit

Permalink
feat: ingestion time field copying, dashes replacement
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc committed Nov 22, 2024
1 parent da52044 commit ba140e7
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 19 deletions.
67 changes: 67 additions & 0 deletions lib/logflare/logs/log_event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ defmodule Logflare.LogEvent do

Logflare.LogEvent
|> struct!(le_map)
|> transform()
|> validate()
end

Expand Down Expand Up @@ -182,6 +183,72 @@ defmodule Logflare.LogEvent do
end)
end

# Applies the ingestion-time transformations to a log event.
#
# Events already marked invalid are passed through untouched. For valid events
# the body keys are normalised first and the source's copy rules are applied to
# the normalised body. If either step reports `{:error, message}`, the original
# event is returned flagged as invalid with a "transform" pipeline error.
@spec transform(LE.t()) :: LE.t()
defp transform(%LE{valid: false} = le), do: le

defp transform(%LE{valid: true} = le) do
  with {:ok, underscored} <- dashes_to_underscore(le),
       {:ok, copied} <- copy_fields(underscored) do
    copied
  else
    {:error, message} -> mark_transform_failed(le, message)
  end
end

# Flags the event as invalid and records why the transform stage rejected it.
defp mark_transform_failed(le, message) do
  error = %LE.PipelineError{stage: "transform", type: "transform", message: message}

  %{le | valid: false, pipeline_error: error}
end

# Replaces every "-" with "_" in the binary keys of the event body, at every
# nesting level.
#
# The body is walked structurally instead of being rebuilt from flattened key
# paths, so the shape of the data is preserved: lists stay lists (maps inside
# them are still renamed), non-binary keys are kept as they are, and scalar
# values, including strings that contain dashes, are never modified.
#
# Always returns `{:ok, event}` so it can be chained in a `with` pipeline.
defp dashes_to_underscore(le) do
  {:ok, %{le | body: underscore_keys(le.body)}}
end

# Structs (dates, decimals, ...) are opaque values, not nested payload maps.
defp underscore_keys(%_{} = struct), do: struct

defp underscore_keys(%{} = map) do
  Map.new(map, fn {key, value} -> {underscore_key(key), underscore_keys(value)} end)
end

defp underscore_keys(list) when is_list(list), do: Enum.map(list, &underscore_keys/1)
defp underscore_keys(other), do: other

defp underscore_key(key) when is_binary(key), do: String.replace(key, "-", "_")
defp underscore_key(key), do: key

# Applies the source's field-copy rules to the event body.
#
# `transform_copy_fields` holds one rule per line in the form `from.path:to.path`.
# A leading `m.` on either side is shorthand for `metadata.`. Rules are applied
# in order against the progressively updated body, so a later rule can read a
# field written by an earlier one.
#
# A rule is skipped, leaving the body untouched, when:
#   * it has no `:` separator,
#   * the source path is missing, holds `nil`, or passes through a non-map value,
#   * the destination path would have to pass through an existing non-map value.
#
# Skipping instead of raising keeps a bad rule from crashing ingestion. A `false`
# value is a real value and is copied.
#
# Always returns `{:ok, event}`.
defp copy_fields(%LE{source: %Source{transform_copy_fields: nil}} = le), do: {:ok, le}

defp copy_fields(le) do
  new_body =
    le.source.transform_copy_fields
    |> String.split("\n", trim: true)
    |> Enum.reduce(le.body, &apply_copy_rule/2)

  {:ok, %{le | body: new_body}}
end

# Applies a single `from:to` rule to `body`, returning `body` unchanged when the
# rule cannot be applied.
defp apply_copy_rule(rule, body) do
  with [from, to] <- rule |> String.trim() |> String.split(":", parts: 2),
       {:ok, value} <- fetch_copy_source(body, copy_rule_path(from)),
       {:ok, updated} <- put_copy_target(body, copy_rule_path(to), value) do
    updated
  else
    _ -> body
  end
end

# Expands the `m.` shorthand and splits a dotted path into its key segments.
defp copy_rule_path(path) do
  path
  |> String.replace_prefix("m.", "metadata.")
  |> String.split(".")
end

# Walks `path` through nested maps. `nil` is treated the same as a missing key.
defp fetch_copy_source(nil, _path), do: :error
defp fetch_copy_source(value, []), do: {:ok, value}

defp fetch_copy_source(%{} = map, [key | rest]) do
  case Map.fetch(map, key) do
    {:ok, value} -> fetch_copy_source(value, rest)
    :error -> :error
  end
end

defp fetch_copy_source(_not_a_map, _path), do: :error

# Writes `value` at `path`, creating intermediate maps as needed. Returns
# `:error` rather than raising when an existing intermediate value is not a map.
defp put_copy_target(%{} = map, [key], value), do: {:ok, Map.put(map, key, value)}

defp put_copy_target(%{} = map, [key | rest], value) do
  with {:ok, nested} <- put_copy_target(Map.get(map, key, %{}), rest, value) do
    {:ok, Map.put(map, key, nested)}
  end
end

defp put_copy_target(_not_a_map, _path, _value), do: :error

# used to stringify changeset errors
# TODO: move to utils
defp changeset_error_to_string(changeset) do
Expand Down
10 changes: 7 additions & 3 deletions lib/logflare/sources/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ defmodule Logflare.Source do
:notifications,
:custom_event_message_keys,
:backends,
:retention_days
:retention_days,
:transform_copy_fields
]}
defp env_dataset_id_append,
do: Application.get_env(:logflare, Logflare.Google)[:dataset_id_append]
Expand Down Expand Up @@ -131,6 +132,7 @@ defmodule Logflare.Source do
field(:v2_pipeline, :boolean, default: false)
field(:suggested_keys, :string, default: "")
field(:retention_days, :integer, virtual: true)
field(:transform_copy_fields, :string)
# Causes a shitstorm
# field :bigquery_schema, Ecto.Term

Expand Down Expand Up @@ -181,7 +183,8 @@ defmodule Logflare.Source do
:drop_lql_string,
:v2_pipeline,
:suggested_keys,
:retention_days
:retention_days,
:transform_copy_fields
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
Expand All @@ -207,7 +210,8 @@ defmodule Logflare.Source do
:drop_lql_string,
:v2_pipeline,
:suggested_keys,
:retention_days
:retention_days,
:transform_copy_fields
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
Expand Down
51 changes: 35 additions & 16 deletions lib/logflare_web/templates/source/edit.html.eex
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,6 @@
<%= link "Test webhook", to: Routes.source_path(@conn, :test_alerts, @source), class: "btn btn-primary form-button", role: "button" %>
<% end %>

<%= section_header("Custom Event Message") %>
<p>Comma separated list of metadata keys to customize the log event message. Use <code>message</code> to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root (<code>$.</code>) is implied here.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
<div class="form-group">
<%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
<%= error_tag aa, :custom_event_message_keys %>
<small class="form-text text-muted">
Key values will be concatenated with a pipe ( | ).
</small>
</div>

<%= submit "Update keys", class: "btn btn-primary form-button" %>
<% end %>
<%= section_header("Public Access") %>
<p>Public paths let you share your logs with other people without having to give them a login. Generate a public token
and this source will be accessible by anyone with this url. Regenerate a public path if needed.</p>
Expand Down Expand Up @@ -228,18 +215,50 @@
</p>
<%= button "#{if @source.validate_schema, do: "Disable", else: "Enable"} schema validation", to: Routes.source_path(@conn, :toggle_schema_validation, @source), class: "btn btn-primary form-button" %>

<%= section_header("Drop Log Events") %>

<hr/>
<h3>Pipeline Rules</h3>

<%= section_header("Drop Log Events") %>
<p>Use an LQL statement to drop log events on ingest.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn drop -> %>
<div class="form-group">
<%= text_input drop, :drop_lql_string, placeholder: "", class: "form-control form-control-margin" %>
<%= error_tag drop, :drop_lql_string %>
</div>
<%= submit "Update LQL", class: "btn btn-primary form-button" %>
<% end %>

<%= section_header("Custom Event Message") %>
<p>Comma separated list of metadata keys to customize the log event message. Use <code>message</code> to target the default log event message. Use JSONpath queries to target more complex nested data structures. The root (<code>$.</code>) is implied here.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn aa -> %>
<div class="form-group">
<%= text_input aa, :custom_event_message_keys, placeholder: "metadata.field.key, metadata.field.another_key", class: "form-control form-control-margin" %>
<%= error_tag aa, :custom_event_message_keys %>
<small class="form-text text-muted">
Currently in beta.
Key values will be concatenated with a pipe ( | ).
</small>
</div>
<%= submit "Update LQL", class: "btn btn-primary form-button" %>

<%= submit "Update keys", class: "btn btn-primary form-button" %>
<% end %>


<%= section_header("Copy fields") %>
<p>
Copy fields to different paths at ingestion. Rules are executed sequentially. Invalid rules are ignored.
</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
<div class="form-group">
<%= textarea e, :transform_copy_fields, placeholder: "metadata.my_field:my.destination.path", class: "form-control form-control-margin tw-h-[120px]" %>
<%= error_tag e, :transform_copy_fields %>
<small class="form-text text-muted">
Separate each rule with a new line. Use the pattern <code>from.field:to.other.field</code> to specify the path to copy a field.
</small>
</div>
<%= submit "Update field copying rules", class: "btn btn-primary form-button" %>
<% end %>
<hr/>

<%= section_header("Backend TTL") %>
<%= if @conn.assigns.user.billing_enabled do %>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
defmodule Logflare.Repo.Migrations.AddTransformCopyFieldsColumnToSourcesTable do
  @moduledoc false
  use Ecto.Migration

  # Adds the column that stores a source's field-copy rules.
  #
  # The rules are free-form, newline-separated text entered through a textarea,
  # so the column is unbounded `:text`. A `:string` column is created as a
  # length-limited varchar (255 characters on PostgreSQL), which a longer rule
  # list would overflow.
  def change do
    alter table(:sources) do
      add :transform_copy_fields, :text
    end
  end
end
1 change: 1 addition & 0 deletions test/logflare/backends_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ defmodule Logflare.BackendsTest do
alias Logflare.Rules
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends.SourceSupWorker
alias Logflare.LogEvent

setup do
start_supervised!(AllLogsLogged)
Expand Down
89 changes: 89 additions & 0 deletions test/logflare/log_event_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,95 @@ defmodule Logflare.LogEventTest do
assert body["metadata"]["my"] == "key"
end

# Covers the transformations that LogEvent.make/2 applies to the event body:
# dash-to-underscore key renaming and the source's `transform_copy_fields` rules.
describe "make/2 transformations" do
# A dashed top-level key is expected to come out with underscores.
test "dashes to underscores", %{source: source} do
assert %LogEvent{
body: %{
"test_field" => _
}
} = LogEvent.make(%{"test-field" => 123}, %{source: source})
end

# A dotted destination path creates the intermediate map; the original field stays.
test "field copying - nested", %{source: source} do
source = %{
source
| transform_copy_fields: """
food:my.field
"""
}

assert %LogEvent{
body: %{
"my" => %{
"field" => 123
},
"food" => _
}
} = LogEvent.make(%{"food" => 123}, %{source: source})
end

# Copying to a single-segment destination adds a sibling top-level key.
test "field copying - top level", %{source: source} do
source = %{
source
| transform_copy_fields: """
food:field
"""
}

assert %LogEvent{
body: %{
"field" => 123,
"food" => 123
}
} = LogEvent.make(%{"food" => 123}, %{source: source})
end

# Rules run in order: the second rule reads the field written by the first.
test "field copying - multiple", %{source: source} do
source = %{
source
| transform_copy_fields: """
food:field
field:123
"""
}

assert %LogEvent{
body: %{
"123" => 123,
"field" => 123,
"food" => 123
}
} = LogEvent.make(%{"food" => 123}, %{source: source})
end

# The rule names the dashed key, but the body key has become `my_food` by the
# time the rule is looked up, so the rule matches nothing and no copy is made.
test "field copying - dashes in field", %{source: source} do
source = %{
source
| transform_copy_fields: """
my-food:field
"""
}

assert %LogEvent{body: body} = LogEvent.make(%{"my-food" => 123}, %{source: source})
assert Map.drop(body, ["id", "timestamp"]) == %{"my_food" => 123}
end

# Lines without a `:` separator and rules whose source field is absent must
# leave the body unchanged.
test "field copying - invalid instructions are ignored", %{source: source} do
source = %{
source
| transform_copy_fields: """
food field
food-field
foodfield
missing:field
"""
}

assert %LogEvent{body: body} = LogEvent.make(%{"food" => 123}, %{source: source})
assert Map.drop(body, ["id", "timestamp"]) == %{"food" => 123}
end
end

test "make/2 with metadata string", %{source: source} do
assert %LogEvent{
body: body,
Expand Down
4 changes: 4 additions & 0 deletions test/logflare_web/controllers/api/source_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ defmodule LogflareWeb.Api.SourceControllerTest do
end
end

# TODO: placeholder - this describe block contains no tests yet.
describe "transformations" do

end

describe "retention_days" do
setup do
Logflare.Google.BigQuery
Expand Down
43 changes: 43 additions & 0 deletions test/logflare_web/controllers/source_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,49 @@ defmodule LogflareWeb.SourceControllerTest do
end
end


describe "edit" do
  setup %{conn: conn} do
    insert(:plan, name: "Free")
    free_user = insert(:user)
    insert(:team, user: free_user)

    [conn: conn, free_user: free_user]
  end

  # Renders the edit page, then submits copy rules and checks they are persisted.
  test "pipeline", %{conn: conn, free_user: user} do
    source = insert(:source, user: user)

    html =
      conn
      |> login_user(user)
      |> get(Routes.source_path(conn, :edit, source))
      |> html_response(200)

    assert html =~ "Pipeline Rules"
    assert html =~ "Copy fields"
    assert html =~ "Update field copying rules"

    conn =
      conn
      |> recycle()
      |> login_user(user)
      |> patch("/sources/#{source.id}", %{
        source: %{transform_copy_fields: "test:123\n123:1234\n"}
      })

    assert html_response(conn, 302) =~ "redirected"
    assert Phoenix.Flash.get(conn.assigns.flash, :info) == "Source updated!"

    updated = Sources.get_by(id: source.id)

    # Check the stored rules themselves: a bare `!= nil` check would also pass
    # if the wrong value had been saved.
    assert updated.transform_copy_fields =~ "test:123"
    assert updated.transform_copy_fields =~ "123:1234"
  end
end

describe "dashboard single tenant" do
TestUtils.setup_single_tenant(seed_user: true)

Expand Down

0 comments on commit ba140e7

Please sign in to comment.