Skip to content

Commit

Permalink
Merge pull request #1745 from hauleth/chore/cleanup-backends-implemen…
Browse files Browse the repository at this point in the history
…tation

chore: cleanup `backends.ex`
  • Loading branch information
Ziinc authored Oct 12, 2023
2 parents 37b5768 + 63abb1f commit 122e1da
Showing 1 changed file with 14 additions and 25 deletions.
39 changes: 14 additions & 25 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,9 @@ defmodule Logflare.Backends do
|> validate_config()
|> Repo.insert()

case source_backend do
{:ok, updated} ->
restart_source_sup(source)
{:ok, typecast_config_string_map_to_atom_map(updated)}

other ->
other
with {:ok, updated} <- source_backend do
restart_source_sup(source)
{:ok, typecast_config_string_map_to_atom_map(updated)}
end
end

Expand All @@ -76,22 +72,19 @@ defmodule Logflare.Backends do
|> validate_config()
|> Repo.update()

case source_backend_config do
{:ok, updated} ->
restart_source_sup(source_backend.source)
{:ok, typecast_config_string_map_to_atom_map(updated)}

other ->
other
with {:ok, updated} <- source_backend_config do
restart_source_sup(source_backend.source)
{:ok, typecast_config_string_map_to_atom_map(updated)}
end
end

# common config validation function
defp validate_config(changeset) do
type = Ecto.Changeset.get_field(changeset, :type)
mod = @adaptor_mapping[type]

Ecto.Changeset.validate_change(changeset, :config, fn :config, config ->
case @adaptor_mapping[type].cast_and_validate_config(config) do
case mod.cast_and_validate_config(config) do
%{valid?: true} -> []
%{valid?: false, errors: errors} -> for {key, err} <- errors, do: {:"config.#{key}", err}
end
Expand All @@ -102,16 +95,12 @@ defmodule Logflare.Backends do
defp typecast_config_string_map_to_atom_map(nil), do: nil

defp typecast_config_string_map_to_atom_map(%SourceBackend{type: type} = source_backend) do
Map.update!(source_backend, :config, fn config ->
mod = @adaptor_mapping[type]
mod = @adaptor_mapping[type]

typecasted =
config
|> mod.cast_config()
|> Ecto.Changeset.apply_changes()

mod_struct = struct(mod, %{config: typecasted})
mod_struct.config
Map.update!(source_backend, :config, fn config ->
config
|> mod.cast_config()
|> Ecto.Changeset.apply_changes()
end)
end

Expand Down Expand Up @@ -154,7 +143,7 @@ defmodule Logflare.Backends do
Registry.dispatch(SourceDispatcher, source.id, fn entries ->
for {pid, {adaptor_module, :ingest}} <- entries do
# TODO: spawn tasks to do this concurrently
apply(adaptor_module, :ingest, [pid, log_events])
adaptor_module.ingest(pid, log_events)
end
end)

Expand Down

0 comments on commit 122e1da

Please sign in to comment.