Skip to content

Commit

Permalink
feat: elastic adaptor (#2065)
Browse files Browse the repository at this point in the history
* feat: elastic filebeat adaptor

* docs: elastic filebeat docs

* fix: add ui for elastic backend

* feat: http1 for elastic

* docs: add link to filebeat docs

* feat: default webhook value, add default field value changeset helper

* chore: review comments

* chore: only use Tesla.Mock for certain tesla clients

* chore: fix documentation link, sidebar order

* chore: fix failing test

* chore: fix failing tests due to dynamic tesla client

* chore: formatting
  • Loading branch information
Ziinc authored May 24, 2024
1 parent 530b3d9 commit 15c264b
Show file tree
Hide file tree
Showing 19 changed files with 389 additions and 47 deletions.
22 changes: 21 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ services:
# read_only: true
depends_on:
- db

filebeat:
image: docker.elastic.co/beats/filebeat:8.13.3-arm64
# https://github.com/docker/swarmkit/issues/1951
# Need to override user so we can access the log files, and docker.sock
# user: root
ports:
- "8000:8000"
# networks:
# - elastic
volumes:
- ./test/filebeat.yml:/usr/share/filebeat/filebeat.yml
# - filebeat:/usr/share/filebeat/data
# - /var/log/audit/:/var/log/audit/:ro
# environment:
# - ELASTICSEARCH_HOST=${ELASTICSEARCH_HOST:-node1}
# - KIBANA_HOST=${KIBANA_HOST:-node1}
# - ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME:-elastic}
# - ELASTICSEARCH_PASSWORD=${ELASTICSEARCH_PASSWORD:-changeme}
# disable strict permission checks
command: ["--strict.perms=false"]
volumes:
pg-data:

1 change: 1 addition & 0 deletions docs/docs.logflare.com/docs/backends/bigquery/index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
toc_max_heading_level: 3
sidebar_position: 1
---

# BigQuery
Expand Down
21 changes: 21 additions & 0 deletions docs/docs.logflare.com/docs/backends/elastic.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
sidebar_position: 4
---
# Elastic

The Elastic backend is **ingest-only** to a Filebeat HTTP server endpoint.

See Filebeat HTTP input [configuration](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-http_endpoint.html) for details on setting up the input server.

## Behaviour and configurations
### Configuration

The following values are required when creating a webhook backend:

- `url`: (`string`, required) a valid HTTP(S) endpoint.
- `username`: (`string`, optional) used for basic auth.
- `password`: (`string`, optional) used for basic auth.

### Implementation Details

Implementation is based on the [webhook backend](/backends/webhook).
1 change: 1 addition & 0 deletions docs/docs.logflare.com/docs/backends/postgres/index.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
toc_max_heading_level: 3
sidebar_position: 2
---

# PostgreSQL
Expand Down
3 changes: 3 additions & 0 deletions docs/docs.logflare.com/docs/backends/webhook.mdx
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
---
sidebar_position: 3
---
# Webhook

The Webhook backend will send batched events to a desired HTTP(S) destination.
Expand Down
5 changes: 4 additions & 1 deletion lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ defmodule Logflare.Application do
name: Logflare.FinchQuery, pools: %{default: [protocol: :http2, count: max(base * 2, 10)]}},
{Finch, name: Logflare.FinchGoth, pools: %{default: [protocol: :http2, count: 1]}},
{Finch,
name: Logflare.FinchDefault, pools: %{default: [protocol: :http2, count: max(base, 5)]}}
name: Logflare.FinchDefault, pools: %{default: [protocol: :http1, count: max(base, 5)]}},
{Finch,
name: Logflare.FinchDefaultHttp1,
pools: %{default: [protocol: :http1, count: max(base, 5) * 3]}}
]
end

Expand Down
15 changes: 2 additions & 13 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ defmodule Logflare.Backends do
@moduledoc false

alias Logflare.Backends.Adaptor
alias Logflare.Backends.Adaptor.WebhookAdaptor
alias Logflare.Backends.Adaptor.PostgresAdaptor
alias Logflare.Backends.Adaptor.BigQueryAdaptor
alias Logflare.Backends.Adaptor.DatadogAdaptor
alias Logflare.Backends.Backend
alias Logflare.Backends.SourceDispatcher
alias Logflare.Backends.SourceRegistry
Expand All @@ -23,13 +19,6 @@ defmodule Logflare.Backends do
alias Logflare.PubSubRates
import Ecto.Query

@adaptor_mapping %{
webhook: WebhookAdaptor,
postgres: PostgresAdaptor,
bigquery: BigQueryAdaptor,
datadog: DatadogAdaptor
}

defdelegate child_spec(arg), to: __MODULE__.Supervisor

@doc """
Expand Down Expand Up @@ -140,7 +129,7 @@ defmodule Logflare.Backends do
# common config validation function
defp validate_config(%{valid?: true} = changeset) do
type = Ecto.Changeset.get_field(changeset, :type)
mod = @adaptor_mapping[type]
mod = Backend.adaptor_mapping()[type]

Ecto.Changeset.validate_change(changeset, :config, fn :config, config ->
case Adaptor.cast_and_validate_config(mod, config) do
Expand All @@ -156,7 +145,7 @@ defmodule Logflare.Backends do
defp typecast_config_string_map_to_atom_map(nil), do: nil

defp typecast_config_string_map_to_atom_map(%Backend{type: type} = backend) do
mod = @adaptor_mapping[type]
mod = Backend.adaptor_mapping()[type]

Map.update!(backend, :config, fn config ->
(config || %{})
Expand Down
10 changes: 3 additions & 7 deletions lib/logflare/backends/adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,8 @@ defmodule Logflare.Backends.Adaptor do
"""
@spec get_adaptor(Backend.t()) :: module()
def get_adaptor(%Backend{type: type}) do
case type do
:datadog -> __MODULE__.DatadogAdaptor
:webhook -> __MODULE__.WebhookAdaptor
:postgres -> __MODULE__.PostgresAdaptor
:bigquery -> __MODULE__.BigQueryAdaptor
end
mapping = Backend.adaptor_mapping()
mapping[type]
end

@callback start_link(source_backend()) ::
Expand Down Expand Up @@ -70,7 +66,7 @@ defmodule Logflare.Backends.Adaptor do
Validate configuration for given adaptor implementation
"""
@spec cast_and_validate_config(module(), map()) :: Ecto.Changeset.t()
def cast_and_validate_config(mod, params) do
def cast_and_validate_config(mod, params) when is_atom(mod) do
params
|> mod.cast_config()
|> mod.validate_config()
Expand Down
3 changes: 2 additions & 1 deletion lib/logflare/backends/adaptor/datadog_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ defmodule Logflare.Backends.Adaptor.DatadogAdaptor do
backend
| config: %{
url: Map.get(@api_url_mapping, backend.config.region),
headers: %{"dd-api-key" => backend.config.api_key}
headers: %{"dd-api-key" => backend.config.api_key},
http: "http2"
}
}

Expand Down
99 changes: 99 additions & 0 deletions lib/logflare/backends/adaptor/elastic_adaptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
defmodule Logflare.Backends.Adaptor.ElasticAdaptor do
@moduledoc """
Ingestion uses Filebeat HTTP input.
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-http_endpoint.html
Basic auth implementation reference:
https://datatracker.ietf.org/doc/html/rfc7617
"""

use TypedStruct

alias Logflare.Backends.Adaptor.WebhookAdaptor

typedstruct enforce: true do
field(:url, String.t())
# basic auth username and password
field(:username, String.t())
field(:password, String.t())
end

@behaviour Logflare.Backends.Adaptor

def child_spec(arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [arg]}
}
end

@impl Logflare.Backends.Adaptor
def start_link({source, backend}) do
basic_auth = get_basic_auth(backend.config)

backend = %{
backend
| config: %{
url: backend.config.url,
http: "http1",
headers:
if basic_auth do
%{"Authorization" => "Basic #{basic_auth}"}
else
%{}
end
}
}

WebhookAdaptor.start_link({source, backend})
end

defp get_basic_auth(%{username: username, password: password})
when is_binary(username) and is_binary(password) do
Base.encode64(username <> ":" <> password)
end

defp get_basic_auth(_), do: nil

@impl Logflare.Backends.Adaptor
def ingest(pid, log_events, opts) do
WebhookAdaptor.ingest(pid, log_events, opts)
end

@impl Logflare.Backends.Adaptor
def execute_query(_ident, _query), do: {:error, :not_implemented}

@impl Logflare.Backends.Adaptor
def cast_config(params) do
{%{}, %{url: :string, username: :string, password: :string}}
|> Ecto.Changeset.cast(params, [:username, :password, :url])
|> validate_user_pass()
end

defp validate_user_pass(changeset) do
user = Ecto.Changeset.get_field(changeset, :username)
pass = Ecto.Changeset.get_field(changeset, :password)
user_pass = [user, pass]

if user_pass != [nil, nil] and Enum.any?(user_pass, &is_nil/1) do
msg = "Both username and password must be provided for basic auth"

changeset
|> Ecto.Changeset.add_error(:username, msg)
|> Ecto.Changeset.add_error(:password, msg)
else
changeset
end
end

@impl Logflare.Backends.Adaptor
def validate_config(changeset) do
import Ecto.Changeset

changeset
|> validate_required([:url])
end
end
43 changes: 32 additions & 11 deletions lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
typedstruct do
field(:config, %{
url: String.t(),
headers: map()
headers: map(),
http: String.t()
})

field(:backend, Backend.t())
Expand Down Expand Up @@ -43,8 +44,9 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do

@impl Logflare.Backends.Adaptor
def cast_config(params) do
{%{}, %{url: :string, headers: :map}}
|> Ecto.Changeset.cast(params, [:url, :headers])
{%{}, %{url: :string, headers: :map, http: :string}}
|> Ecto.Changeset.cast(params, [:url, :headers, :http])
|> Logflare.Utils.default_field_value(:http, "http2")
end

@impl Logflare.Backends.Adaptor
Expand All @@ -56,6 +58,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
changeset
|> Ecto.Changeset.validate_required([:url])
|> Ecto.Changeset.validate_format(:url, ~r/https?\:\/\/.+/)
|> Ecto.Changeset.validate_inclusion(:http, ["http1", "http2"])
end

# GenServer
Expand All @@ -80,14 +83,27 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@moduledoc false
use Tesla, docs: false

plug(Tesla.Middleware.Telemetry)
plug(Tesla.Middleware.JSON)

def send(opts) do
opts
|> Keyword.put_new(:method, :post)
|> Keyword.update(:headers, [], &Map.to_list/1)
|> request()
adaptor =
if Keyword.get(opts, :http) == "http1" do
{Tesla.Adapter.Finch, name: Logflare.FinchDefaultHttp1, receive_timeout: 5_000}
else
{Tesla.Adapter.Finch, name: Logflare.FinchDefault, receive_timeout: 5_000}
end

opts =
opts
|> Keyword.put_new(:method, :post)
|> Keyword.update(:headers, [], &Map.to_list/1)

Tesla.client(
[
Tesla.Middleware.Telemetry,
Tesla.Middleware.JSON
],
adaptor
)
|> request(opts)
end
end

Expand Down Expand Up @@ -146,7 +162,12 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
end

defp process_data(log_event_bodies, %{config: %{} = config}) do
Client.send(url: config.url, body: log_event_bodies, headers: config[:headers] || %{})
Client.send(
url: config.url,
body: log_event_bodies,
headers: config[:headers] || %{},
http: config.http
)
end

# Broadway transformer for custom producer
Expand Down
14 changes: 11 additions & 3 deletions lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,19 @@ defmodule Logflare.Backends.Backend do
alias Logflare.User
alias Logflare.Rule

@adaptor_types [:bigquery, :webhook, :postgres, :datadog, :elastic]
@adaptor_mapping %{
webhook: Adaptor.WebhookAdaptor,
elastic: Adaptor.ElasticAdaptor,
datadog: Adaptor.DatadogAdaptor,
postgres: Adaptor.PostgresAdaptor,
bigquery: Adaptor.BigQueryAdaptor
}

typed_schema "backends" do
field(:name, :string)
field(:description, :string)
field(:token, Ecto.UUID, autogenerate: true)
field(:type, Ecto.Enum, values: @adaptor_types)
field(:type, Ecto.Enum, values: Map.keys(@adaptor_mapping))
# TODO: maybe use polymorphic embeds
field(:config, :map)
many_to_many(:sources, Source, join_through: "sources_backends")
Expand All @@ -26,11 +32,13 @@ defmodule Logflare.Backends.Backend do
timestamps()
end

def adaptor_mapping(), do: @adaptor_mapping

def changeset(backend, attrs) do
backend
|> cast(attrs, [:type, :config, :user_id, :name, :description])
|> validate_required([:user_id, :type, :config, :name])
|> validate_inclusion(:type, @adaptor_types)
|> validate_inclusion(:type, Map.keys(@adaptor_mapping))
end

@spec child_spec(Source.t(), Backend.t()) :: map()
Expand Down
Loading

0 comments on commit 15c264b

Please sign in to comment.