feat: retention days api refactoring (#2221)
* feat: retention days api refactoring

* chore: PR review comments
Ziinc authored Oct 9, 2024
1 parent 72af7e4 commit 77d0998
Showing 28 changed files with 213 additions and 56 deletions.
68 changes: 66 additions & 2 deletions lib/logflare/sources.ex
@@ -18,6 +18,12 @@ defmodule Logflare.Sources do
alias Logflare.SourceSchemas
alias Logflare.User
alias Logflare.Backends
alias Logflare.Billing.Plan
alias Logflare.Billing
alias Logflare.User
alias Logflare.Users
alias Logflare.SingleTenant
alias Logflare.Google.BigQuery

require Logger

@@ -37,6 +43,7 @@ defmodule Logflare.Sources do
def list_sources_by_user(user_id) do
from(s in Source, where: s.user_id == ^user_id)
|> Repo.all()
|> Enum.map(&put_retention_days/1)
end

@spec create_source(map(), User.t()) :: {:ok, Source.t()} | {:error, Ecto.Changeset.t()}
@@ -46,7 +53,7 @@ defmodule Logflare.Sources do
|> Enum.map(fn {k, v} -> {to_string(k), v} end)
|> Map.new()

with {:ok, source} = res <-
with {:ok, source} <-
user
|> Ecto.build_assoc(:sources)
|> Source.update_by_user_changeset(source_params)
@@ -55,7 +62,11 @@ defmodule Logflare.Sources do
create_big_query_schema_and_start_source(source)
end

res
updated =
source
|> put_retention_days()

{:ok, updated}
end
end

@@ -108,6 +119,7 @@ defmodule Logflare.Sources do

def get(source_id) when is_integer(source_id) do
Repo.get(Source, source_id)
|> put_retention_days()
end

def update_source(source) do
@@ -118,12 +130,35 @@ defmodule Logflare.Sources do
source
|> Source.changeset(attrs)
|> Repo.update()
|> post_update(source)
end

defp post_update({:ok, updated}, source) do
# only update the default backend
source = put_retention_days(source)
updated = put_retention_days(updated)

if source.retention_days != updated.retention_days and not SingleTenant.postgres_backend?() do
user = Users.Cache.get(updated.user_id) |> Users.maybe_put_bigquery_defaults()

BigQuery.patch_table_ttl(
updated.token,
updated.retention_days * 86_400_000,
user.bigquery_dataset_id,
user.bigquery_project_id
)
end

{:ok, updated}
end

defp post_update(res, _prev), do: res

def update_source_by_user(source, attrs) do
source
|> Source.update_by_user_changeset(attrs)
|> Repo.update()
|> post_update(source)
end

def update_source_by_user(_source, _plan, %{"notifications_every" => ""}) do
@@ -159,6 +194,7 @@ defmodule Logflare.Sources do
@spec get_by(Keyword.t()) :: Source.t() | nil
def get_by(kw) do
Repo.get_by(Source, kw)
|> put_retention_days()
end

@spec get_by_and_preload(Keyword.t()) :: Source.t() | nil
@@ -169,6 +205,7 @@ defmodule Logflare.Sources do
nil -> nil
s -> preload_defaults(s)
end)
|> put_retention_days()
end

@spec get_by_and_preload(Keyword.t(), Keyword.t()) :: Source.t() | nil
@@ -179,6 +216,7 @@ defmodule Logflare.Sources do
nil -> nil
s -> Repo.preload(s, preloads)
end)
|> put_retention_days()
end

def get_rate_limiter_metrics(source, bucket: :default) do
@@ -402,4 +440,30 @@ defmodule Logflare.Sources do
_ -> false
end
end

def put_retention_days(%Source{} = source) do
user = Users.Cache.get(source.user_id)
plan = Billing.Cache.get_plan_by_user(user)
%{source | retention_days: source_ttl_to_days(source, plan)}
end

def put_retention_days(source), do: source

@doc """
Formats a source TTL to the specified unit
"""
@spec source_ttl_to_days(Source.t(), Plan.t()) :: integer()
def source_ttl_to_days(%Source{bigquery_table_ttl: ttl}, _plan)
when ttl >= 0 and ttl != nil do
round(ttl)
end

# fallback to plan value or default init value
# use min to avoid misrepresenting what user should see, in cases where actual is more than plan.
def source_ttl_to_days(_source, %Plan{limit_source_ttl: ttl}) do
min(
round(GenUtils.default_table_ttl_days()),
round(ttl / :timer.hours(24))
)
end
end
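
For readers checking the fallback clause above, a small worked example of the millisecond-to-day conversion (numbers hypothetical, not from the diff): :timer.hours(24) is 86_400_000 ms, so a Plan limit_source_ttl of 259_200_000 ms comes out to 3 days, and if GenUtils.default_table_ttl_days/0 returned 7 the min/2 would still surface 3.

# Hypothetical values; illustrates the conversion used in source_ttl_to_days/2
plan_ttl_ms = 259_200_000
round(plan_ttl_ms / :timer.hours(24))
# => 3
min(7, 3)
# => 3
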
16 changes: 13 additions & 3 deletions lib/logflare/sources/source.ex
@@ -27,7 +27,8 @@ defmodule Logflare.Source do
:metrics,
:notifications,
:custom_event_message_keys,
:backends
:backends,
:retention_days
]}
defp env_dataset_id_append,
do: Application.get_env(:logflare, Logflare.Google)[:dataset_id_append]
@@ -129,6 +130,7 @@ defmodule Logflare.Source do
field(:drop_lql_string, :string)
field(:v2_pipeline, :boolean, default: false)
field(:suggested_keys, :string, default: "")
field(:retention_days, :integer, virtual: true)
# Causes a shitstorm
# field :bigquery_schema, Ecto.Term

@@ -178,7 +180,8 @@
:drop_lql_filters,
:drop_lql_string,
:v2_pipeline,
:suggested_keys
:suggested_keys,
:retention_days
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
@@ -203,7 +206,8 @@
:drop_lql_filters,
:drop_lql_string,
:v2_pipeline,
:suggested_keys
:suggested_keys,
:retention_days
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> put_single_tenant_postgres_changes()
@@ -216,9 +220,15 @@
|> unique_constraint(:name, name: :sources_name_index)
|> unique_constraint(:token)
|> unique_constraint(:public_token)
|> put_source_ttl_change()
|> validate_source_ttl(source)
end

defp put_source_ttl_change(changeset) do
value = get_field(changeset, :retention_days)
put_change(changeset, :bigquery_table_ttl, value)
end

def validate_source_ttl(changeset, source) do
if source.user_id do
user = Users.get(source.user_id)
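
A hedged sketch of how the new virtual field travels through the changeset above (attrs and values are illustrative, not from the diff): retention_days is cast like any other field, then put_source_ttl_change/1 copies it onto bigquery_table_ttl so the persisted column tracks the API-facing value.

# Illustrative only; assumes a valid %Source{} struct and changeset
changeset = Source.update_by_user_changeset(source, %{"retention_days" => 7})
Ecto.Changeset.get_field(changeset, :retention_days)
# => 7
Ecto.Changeset.get_field(changeset, :bigquery_table_ttl)
# => 7, set by put_source_ttl_change/1
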
6 changes: 6 additions & 0 deletions lib/logflare/users/users.ex
@@ -88,6 +88,9 @@ defmodule Logflare.Users do
user
|> Repo.preload([:sources, :billing_account, :team])
|> maybe_put_bigquery_defaults()
|> Map.update!(:sources, fn sources ->
Enum.map(sources, &Sources.put_retention_days/1)
end)
end

def preload_team(user) do
@@ -104,6 +107,9 @@

def preload_sources(user) do
Repo.preload(user, :sources)
|> Map.update!(:sources, fn sources ->
Enum.map(sources, &Sources.put_retention_days/1)
end)
end

def preload_endpoints(user) do
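
As a rough usage sketch (values assumed, not from the diff), preloaded sources now arrive with the virtual retention_days already resolved:

# Illustrative; actual values depend on each source's BigQuery TTL and the user's plan
user = Users.preload_sources(user)
Enum.map(user.sources, & &1.retention_days)
# => e.g. [3, 7]
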
17 changes: 2 additions & 15 deletions lib/logflare_web/controllers/source_controller.ex
@@ -439,22 +439,9 @@ defmodule LogflareWeb.SourceController do
end
end

def update(%{assigns: %{source: old_source, user: user}} = conn, %{"source" => source_params}) do
changeset = Source.update_by_user_changeset(old_source, source_params)

case Repo.update(changeset) do
def update(%{assigns: %{source: old_source}} = conn, %{"source" => source_params}) do
case Sources.update_source_by_user(old_source, source_params) do
{:ok, source} ->
ttl = source.bigquery_table_ttl

if ttl do
BigQuery.patch_table_ttl(
source.token,
source.bigquery_table_ttl * 86_400_000,
user.bigquery_dataset_id,
user.bigquery_project_id
)
end

:ok = Supervisor.ensure_started(source)

conn
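
The TTL patch that previously lived in this controller action now happens inside Sources.update_source_by_user/2 via post_update/2. A hedged sketch of the resulting call path (params illustrative):

# One call now updates the source and, only when retention_days actually changed,
# patches the BigQuery table TTL (retention_days * 86_400_000 ms) behind the scenes.
{:ok, source} = Sources.update_source_by_user(old_source, %{"retention_days" => "7"})
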
2 changes: 1 addition & 1 deletion lib/logflare_web/templates/source/dashboard.html.heex
@@ -135,7 +135,7 @@
</span>
</div>
</div>
<%= render(LogflareWeb.SharedView, "dashboard_source_metadata.html", conn: @conn, source: source, source_ttl_days: source_ttl_to_days(source, @plan), pipeline_counts: @pipeline_counts) %>
<%= render(LogflareWeb.SharedView, "dashboard_source_metadata.html", conn: @conn, source: source, source_ttl_days: source.retention_days, pipeline_counts: @pipeline_counts) %>
</li>
<% end %>
</ul>
4 changes: 2 additions & 2 deletions lib/logflare_web/templates/source/edit.html.eex
@@ -246,8 +246,8 @@
<p>Set how long to keep data in your backend.</p>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source), fn e -> %>
<div class="form-group">
<%= text_input e, :bigquery_table_ttl, placeholder: "3", class: "form-control form-control-margin" %>
<%= error_tag e, :bigquery_table_ttl %>
<%= text_input e, :retention_days, placeholder: "3", class: "form-control form-control-margin" %>
<%= error_tag e, :retention_days %>
<small class="form-text text-muted">
Days to keep data.
</small>
21 changes: 0 additions & 21 deletions lib/logflare_web/views/source_view.ex
@@ -1,9 +1,6 @@
defmodule LogflareWeb.SourceView do
import LogflareWeb.Helpers.Forms
alias LogflareWeb.Router.Helpers, as: Routes
alias Logflare.Billing.Plan
alias Logflare.Source
alias Logflare.Google.BigQuery.GenUtils
use LogflareWeb, :view

def log_url(route) do
@@ -20,22 +17,4 @@ defmodule LogflareWeb.SourceView do
end
|> URI.to_string()
end

@doc """
Formats a source TTL to the specified unit
"""
@spec source_ttl_to_days(Source.t(), Plan.t()) :: integer()
def source_ttl_to_days(%Source{bigquery_table_ttl: ttl}, _plan)
when ttl >= 0 and ttl != nil do
round(ttl)
end

# fallback to plan value or default init value
# use min to avoid misrepresenting what user should see, in cases where actual is more than plan.
def source_ttl_to_days(_source, %Plan{limit_source_ttl: ttl}) do
min(
round(GenUtils.default_table_ttl_days()),
round(ttl / :timer.hours(24))
)
end
end
1 change: 1 addition & 0 deletions test/logflare/backends/adaptor/datadog_adaptor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ defmodule Logflare.Backends.Adaptor.DatadogAdaptorTest do

setup do
start_supervised!(AllLogsLogged)
insert(:plan)
:ok
end

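This and the remaining test diffs add insert(:plan) because sources are now resolved through Sources.put_retention_days/1, which appears to require the user's billing plan to exist. A minimal sketch of the shared setup pattern, using the factory calls already shown in these diffs:

setup do
  # plan lookup backs Sources.put_retention_days/1
  insert(:plan)
  user = insert(:user)
  source = insert(:source, user: user)
  [user: user, source: source]
end
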
5 changes: 5 additions & 0 deletions test/logflare/backends/buffer_producer_test.exs
@@ -6,6 +6,11 @@ defmodule Logflare.Backends.BufferProducerTest do

import ExUnit.CaptureLog

setup do
insert(:plan)
:ok
end

test "pulls events from IngestEventQueue" do
user = insert(:user)
source = insert(:source, user: user)
1 change: 1 addition & 0 deletions test/logflare/backends/dynamic_pipeline_test.exs
@@ -11,6 +11,7 @@ defmodule Logflare.DynamicPipelineTest do
import ExUnit.CaptureLog

setup do
insert(:plan)
user = insert(:user)
source = insert(:source, user: user)

1 change: 1 addition & 0 deletions test/logflare/backends/elastic_adaptor_test.exs
@@ -12,6 +12,7 @@ defmodule Logflare.Backends.Adaptor.ElasticAdaptorTest do
doctest @subject

setup do
insert(:plan)
start_supervised!(AllLogsLogged)
:ok
end
5 changes: 5 additions & 0 deletions test/logflare/backends/ingest_events_queue_test.exs
@@ -8,6 +8,11 @@ defmodule Logflare.Backends.IngestEventQueueTest do
alias Logflare.Backends
alias Logflare.Backends.IngestEventQueue

setup do
insert(:plan)
:ok
end

test "get_table_size/1 returns nil for non-existing tables" do
assert nil == IngestEventQueue.get_table_size({1, 2, 4})
end
2 changes: 1 addition & 1 deletion test/logflare/backends/webhook_adaptor_test.exs
@@ -11,13 +11,13 @@ defmodule Logflare.Backends.WebhookAdaptorTest do
@subject Logflare.Backends.Adaptor.WebhookAdaptor

setup do
insert(:plan)
start_supervised!(AllLogsLogged)
:ok
end

describe "ingestion tests" do
setup do
insert(:plan)
user = insert(:user)
source = insert(:source, user: user)

1 change: 1 addition & 0 deletions test/logflare/cluster_pubsub_test.exs
@@ -63,6 +63,7 @@ defmodule Logflare.ClusterPubSubTest do

describe "ChannelTopics" do
setup do
insert(:plan)
[source: insert(:source, user: insert(:user))]
end

Expand Down