Skip to content

Commit

Permalink
feat: http1 for elastic
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc committed May 16, 2024
1 parent 4e0ddfb commit 75129e3
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 14 deletions.
22 changes: 21 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,26 @@ services:
# read_only: true
depends_on:
- db

filebeat:
image: docker.elastic.co/beats/filebeat:8.13.3-arm64
# https://github.com/docker/swarmkit/issues/1951
# Need to override user so we can access the log files, and docker.sock
# user: root
ports:
- "8000:8000"
# networks:
# - elastic
volumes:
- ./test/filebeat.yml:/usr/share/filebeat/filebeat.yml
# - filebeat:/usr/share/filebeat/data
# - /var/log/audit/:/var/log/audit/:ro
# environment:
# - ELASTICSEARCH_HOST=${ELASTICSEARCH_HOST:-node1}
# - KIBANA_HOST=${KIBANA_HOST:-node1}
# - ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME:-elastic}
# - ELASTICSEARCH_PASSWORD=${ELASTICSEARCH_PASSWORD:-changeme}
# disable strict permission checks
command: ["--strict.perms=false"]
volumes:
pg-data:

5 changes: 4 additions & 1 deletion lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ defmodule Logflare.Application do
name: Logflare.FinchQuery, pools: %{default: [protocol: :http2, count: max(base * 2, 10)]}},
{Finch, name: Logflare.FinchGoth, pools: %{default: [protocol: :http2, count: 1]}},
{Finch,
name: Logflare.FinchDefault, pools: %{default: [protocol: :http2, count: max(base, 5)]}}
name: Logflare.FinchDefault, pools: %{default: [protocol: :http1, count: max(base, 5)]}},
{Finch,
name: Logflare.FinchDefaultHttp1,
pools: %{default: [protocol: :http1, count: max(base, 5) * 3]}}
]
end

Expand Down
1 change: 1 addition & 0 deletions lib/logflare/backends/adaptor/elastic_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ defmodule Logflare.Backends.Adaptor.ElasticAdaptor do
backend
| config: %{
url: backend.config.url,
http: "http1",
headers:
if basic_auth do
%{"Authorization" => "Basic #{basic_auth}"}
Expand Down
43 changes: 31 additions & 12 deletions lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
typedstruct do
field(:config, %{
url: String.t(),
headers: map()
headers: map(),
http: String.t()
})

field(:backend, Backend.t())
Expand Down Expand Up @@ -43,8 +44,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do

@impl Logflare.Backends.Adaptor
def cast_config(params) do
{%{}, %{url: :string, headers: :map}}
|> Ecto.Changeset.cast(params, [:url, :headers])
{%{}, %{url: :string, headers: :map, http: :string}}
|> Ecto.Changeset.cast(params, [:url, :headers, :http])
end

@impl Logflare.Backends.Adaptor
Expand All @@ -54,7 +55,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@impl Logflare.Backends.Adaptor
def validate_config(changeset) do
changeset
|> Ecto.Changeset.validate_required([:url])
|> Ecto.Changeset.validate_required([:url, :http])
|> Ecto.Changeset.validate_format(:url, ~r/https?\:\/\/.+/)
end

Expand All @@ -80,14 +81,27 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@moduledoc false
use Tesla, docs: false

plug(Tesla.Middleware.Telemetry)
plug(Tesla.Middleware.JSON)

def send(opts) do
opts
|> Keyword.put_new(:method, :post)
|> Keyword.update(:headers, [], &Map.to_list/1)
|> request()
adaptor =
if Keyword.get(opts, :http) == "http1" do
{Tesla.Adapter.Finch, name: Logflare.FinchDefaultHttp1, receive_timeout: 5_000}
else
{Tesla.Adapter.Finch, name: Logflare.FinchDefault, receive_timeout: 5_000}
end

opts =
opts
|> Keyword.put_new(:method, :post)
|> Keyword.update(:headers, [], &Map.to_list/1)

Tesla.client(
[
Tesla.Middleware.Telemetry,
Tesla.Middleware.JSON
],
adaptor
)
|> request(opts)
end
end

Expand Down Expand Up @@ -146,7 +160,12 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
end

defp process_data(log_event_bodies, %{config: %{} = config}) do
Client.send(url: config.url, body: log_event_bodies, headers: config[:headers] || %{})
Client.send(
url: config.url,
body: log_event_bodies,
headers: config[:headers] || %{},
http: config.http
)
end

# Broadway transformer for custom producer
Expand Down
10 changes: 10 additions & 0 deletions test/filebeat.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

filebeat.inputs:
- type: http_endpoint
enabled: true
listen_address: 0.0.0.0
content_type: ""

output.console:
pretty: true

0 comments on commit 75129e3

Please sign in to comment.