From 75129e3fc8c93a132f2a006a703f4ca6384da87f Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Thu, 9 May 2024 13:47:42 +0800 Subject: [PATCH] feat: http1 for elastic --- docker-compose.yml | 22 +++++++++- lib/logflare/application.ex | 5 ++- .../backends/adaptor/elastic_adaptor.ex | 1 + .../backends/adaptor/webhook_adaptor.ex | 43 +++++++++++++------ test/filebeat.yml | 10 +++++ 5 files changed, 67 insertions(+), 14 deletions(-) create mode 100644 test/filebeat.yml diff --git a/docker-compose.yml b/docker-compose.yml index 0bf5d765e..a609bdc39 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,6 +40,26 @@ services: # read_only: true depends_on: - db - + filebeat: + image: docker.elastic.co/beats/filebeat:8.13.3-arm64 + # https://github.com/docker/swarmkit/issues/1951 + # Need to override user so we can access the log files, and docker.sock + # user: root + ports: + - "8000:8000" + # networks: + # - elastic + volumes: + - ./test/filebeat.yml:/usr/share/filebeat/filebeat.yml + # - filebeat:/usr/share/filebeat/data + # - /var/log/audit/:/var/log/audit/:ro + # environment: + # - ELASTICSEARCH_HOST=${ELASTICSEARCH_HOST:-node1} + # - KIBANA_HOST=${KIBANA_HOST:-node1} + # - ELASTICSEARCH_USERNAME=${ELASTICSEARCH_USERNAME:-elastic} + # - ELASTICSEARCH_PASSWORD=${ELASTICSEARCH_PASSWORD:-changeme} + # disable strict permission checks + command: ["--strict.perms=false"] volumes: pg-data: + diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex index 6640227b3..01b152004 100644 --- a/lib/logflare/application.ex +++ b/lib/logflare/application.ex @@ -253,7 +253,10 @@ defmodule Logflare.Application do name: Logflare.FinchQuery, pools: %{default: [protocol: :http2, count: max(base * 2, 10)]}}, {Finch, name: Logflare.FinchGoth, pools: %{default: [protocol: :http2, count: 1]}}, {Finch, - name: Logflare.FinchDefault, pools: %{default: [protocol: :http2, count: max(base, 5)]}} + name: Logflare.FinchDefault, pools: %{default: [protocol: :http1, count: max(base, 5)]}}, + {Finch, + name: Logflare.FinchDefaultHttp1, + pools: %{default: [protocol: :http1, count: max(base, 5) * 3]}} ] end diff --git a/lib/logflare/backends/adaptor/elastic_adaptor.ex b/lib/logflare/backends/adaptor/elastic_adaptor.ex index 34ddf5f5c..f5c1af2f5 100644 --- a/lib/logflare/backends/adaptor/elastic_adaptor.ex +++ b/lib/logflare/backends/adaptor/elastic_adaptor.ex @@ -38,6 +38,7 @@ defmodule Logflare.Backends.Adaptor.ElasticAdaptor do backend | config: %{ url: backend.config.url, + http: "http1", headers: if basic_auth do %{"Authorization" => "Basic #{basic_auth}"} diff --git a/lib/logflare/backends/adaptor/webhook_adaptor.ex b/lib/logflare/backends/adaptor/webhook_adaptor.ex index 7b0fa5c3f..1c6af3ef9 100644 --- a/lib/logflare/backends/adaptor/webhook_adaptor.ex +++ b/lib/logflare/backends/adaptor/webhook_adaptor.ex @@ -13,7 +13,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do typedstruct do field(:config, %{ url: String.t(), - headers: map() + headers: map(), + http: String.t() }) field(:backend, Backend.t()) @@ -43,8 +44,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do @impl Logflare.Backends.Adaptor def cast_config(params) do - {%{}, %{url: :string, headers: :map}} - |> Ecto.Changeset.cast(params, [:url, :headers]) + {%{}, %{url: :string, headers: :map, http: :string}} + |> Ecto.Changeset.cast(params, [:url, :headers, :http]) end @impl Logflare.Backends.Adaptor @@ -54,7 +55,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do @impl Logflare.Backends.Adaptor def validate_config(changeset) do changeset - |> Ecto.Changeset.validate_required([:url]) + |> Ecto.Changeset.validate_required([:url, :http]) |> Ecto.Changeset.validate_format(:url, ~r/https?\:\/\/.+/) end @@ -80,14 +81,27 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do @moduledoc false use Tesla, docs: false - plug(Tesla.Middleware.Telemetry) - plug(Tesla.Middleware.JSON) - def send(opts) do - opts - |> Keyword.put_new(:method, :post) - |> Keyword.update(:headers, [], &Map.to_list/1) - |> request() + adaptor = + if Keyword.get(opts, :http) == "http1" do + {Tesla.Adapter.Finch, name: Logflare.FinchDefaultHttp1, receive_timeout: 5_000} + else + {Tesla.Adapter.Finch, name: Logflare.FinchDefault, receive_timeout: 5_000} + end + + opts = + opts + |> Keyword.put_new(:method, :post) + |> Keyword.update(:headers, [], &Map.to_list/1) + + Tesla.client( + [ + Tesla.Middleware.Telemetry, + Tesla.Middleware.JSON + ], + adaptor + ) + |> request(opts) end end @@ -146,7 +160,12 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do end defp process_data(log_event_bodies, %{config: %{} = config}) do - Client.send(url: config.url, body: log_event_bodies, headers: config[:headers] || %{}) + Client.send( + url: config.url, + body: log_event_bodies, + headers: config[:headers] || %{}, + http: config.http + ) end # Broadway transformer for custom producer diff --git a/test/filebeat.yml b/test/filebeat.yml new file mode 100644 index 000000000..600a2c3e9 --- /dev/null +++ b/test/filebeat.yml @@ -0,0 +1,10 @@ + +filebeat.inputs: +- type: http_endpoint + enabled: true + listen_address: 0.0.0.0 + content_type: "" + +output.console: + pretty: true +