From 5fb674291bd5afc65b3e8f7fda62aa391ee4e4a6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Niemier?=
Date: Wed, 20 Sep 2023 22:50:44 +0200
Subject: [PATCH] ft: support ingesting gzipped data for logs

Close #1699
---
 .../controllers/plugs/bert_parser.ex          | 19 ++++-------
 .../plugs/compressed_body_reader.ex           | 11 +++++++
 .../controllers/plugs/ndjson_parser.ex        | 17 ++++------
 .../controllers/plugs/syslog_parser.ex        | 18 ++++------
 lib/logflare_web/router.ex                    |  3 +-
 mix.exs                                       |  1 +
 mix.lock                                      |  1 +
 .../plugs/compressed_body_reader_test.exs     | 33 +++++++++++++++++++
 .../logflare_web/plugs/ndjson_parser_test.exs | 11 +------
 9 files changed, 68 insertions(+), 46 deletions(-)
 create mode 100644 lib/logflare_web/controllers/plugs/compressed_body_reader.ex
 create mode 100644 test/logflare_web/plugs/compressed_body_reader_test.exs

diff --git a/lib/logflare_web/controllers/plugs/bert_parser.ex b/lib/logflare_web/controllers/plugs/bert_parser.ex
index 08f2c9c79..a727483f6 100644
--- a/lib/logflare_web/controllers/plugs/bert_parser.ex
+++ b/lib/logflare_web/controllers/plugs/bert_parser.ex
@@ -4,15 +4,15 @@ defmodule Plug.Parsers.BERT do
   """
   @behaviour Plug.Parsers
-  import Plug.Conn
-  @gzip_header {"content-encoding", "gzip"}
 
-  def init(_params) do
+  def init(opts) do
+    {body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
+    {body_reader, opts}
   end
 
-  def parse(conn, "application", "bert", _headers, _opts) do
+  def parse(conn, "application", "bert", _headers, {{mod, fun, args}, _opts}) do
     conn
-    |> read_body()
+    |> then(&apply(mod, fun, [&1 | args]))
     |> decode()
   end
 
@@ -26,14 +26,7 @@ defmodule Plug.Parsers.BERT do
   end
 
   def decode({:ok, body, conn}) do
-    body =
-      if @gzip_header in conn.req_headers do
-        body |> :zlib.gunzip() |> Bertex.safe_decode()
-      else
-        body |> Bertex.safe_decode()
-      end
-
-    {:ok, body, conn}
+    {:ok, Bertex.safe_decode(body), conn}
   rescue
     e ->
       reraise Plug.Parsers.ParseError, [exception: e], __STACKTRACE__
diff --git a/lib/logflare_web/controllers/plugs/compressed_body_reader.ex b/lib/logflare_web/controllers/plugs/compressed_body_reader.ex
new file mode 100644
index 000000000..e62cf5df3
--- /dev/null
+++ b/lib/logflare_web/controllers/plugs/compressed_body_reader.ex
@@ -0,0 +1,11 @@
+defmodule Plug.Parsers.CompressedBodyReader do
+  def read_body(conn, opts \\ []) do
+    content_encoding = Plug.Conn.get_req_header(conn, "content-encoding")
+
+    with {:ok, body, conn} <- Plug.Conn.read_body(conn, opts),
+         do: {:ok, try_decompress(body, content_encoding), conn}
+  end
+
+  defp try_decompress(data, []), do: data
+  defp try_decompress(data, ["gzip"]), do: :zlib.gunzip(data)
+end
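Note: the reader above is wired into the custom parsers through the same `:body_reader` MFA convention that `Plug.Parsers` itself uses; `init/1` pops the `{mod, fun, args}` tuple and `parse/5` prepends the conn to the stored argument list via `then/2`. A minimal sketch of what the call reduces to with the default reader (illustrative only, not part of the patch):

    # Default popped in init/1 when no :body_reader option is given.
    {mod, fun, args} = {Plug.Conn, :read_body, []}

    conn
    |> then(&apply(mod, fun, [&1 | args]))
    # equivalent to Plug.Conn.read_body(conn); returns
    # {:ok, body, conn} | {:more, partial_body, conn} | {:error, reason}
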
diff --git a/lib/logflare_web/controllers/plugs/ndjson_parser.ex b/lib/logflare_web/controllers/plugs/ndjson_parser.ex
index 47fd9afc8..953db98bf 100644
--- a/lib/logflare_web/controllers/plugs/ndjson_parser.ex
+++ b/lib/logflare_web/controllers/plugs/ndjson_parser.ex
@@ -5,15 +5,15 @@ defmodule Plug.Parsers.NDJSON do
   require Logger
   @behaviour Plug.Parsers
-  import Plug.Conn
-  @gzip_header {"content-encoding", "gzip"}
 
-  def init(_params) do
+  def init(opts) do
+    {body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
+    {body_reader, opts}
   end
 
-  def parse(conn, "application", "x-ndjson", _headers, _opts) do
+  def parse(conn, "application", "x-ndjson", _headers, {{mod, fun, args}, _opts}) do
     conn
-    |> read_body()
+    |> then(&apply(mod, fun, [&1 | args]))
     |> decode()
   end
 
@@ -27,12 +27,7 @@ defmodule Plug.Parsers.NDJSON do
   end
 
   def decode({:ok, body, conn}) do
-    body =
-      if @gzip_header in conn.req_headers do
-        body |> :zlib.gunzip() |> String.split("\n", trim: true)
-      else
-        body |> String.split("\n", trim: true)
-      end
+    body = body |> String.split("\n", trim: true)
 
     batch =
       for line <- body do
diff --git a/lib/logflare_web/controllers/plugs/syslog_parser.ex b/lib/logflare_web/controllers/plugs/syslog_parser.ex
index 401fa191e..f2eb10b9d 100644
--- a/lib/logflare_web/controllers/plugs/syslog_parser.ex
+++ b/lib/logflare_web/controllers/plugs/syslog_parser.ex
@@ -5,17 +5,18 @@ defmodule Plug.Parsers.SYSLOG do
   require Logger
   @behaviour Plug.Parsers
-  import Plug.Conn
-  @gzip_header {"content-encoding", "gzip"}
+
   alias Logflare.Logs.SyslogParser
   alias Logflare.Logs.SyslogMessage
 
-  def init(_params) do
+  def init(opts) do
+    {body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
+    {body_reader, opts}
   end
 
-  def parse(conn, "application", "logplex-1", _headers, _opts) do
+  def parse(conn, "application", "logplex-1", _headers, {{mod, fun, args}, _opts}) do
     conn
-    |> read_body()
+    |> then(&apply(mod, fun, [&1 | args]))
     |> decode()
   end
 
@@ -29,12 +30,7 @@ defmodule Plug.Parsers.SYSLOG do
   end
 
   def decode({:ok, body, conn}) do
-    body =
-      if @gzip_header in conn.req_headers do
-        body |> :zlib.gunzip() |> String.split("\n", trim: true)
-      else
-        body |> String.split("\n", trim: true)
-      end
+    body = body |> String.split("\n", trim: true)
 
     opts =
       case conn.request_path do
diff --git a/lib/logflare_web/router.ex b/lib/logflare_web/router.ex
index 6a25579b7..852f23018 100644
--- a/lib/logflare_web/router.ex
+++ b/lib/logflare_web/router.ex
@@ -55,7 +55,8 @@ defmodule LogflareWeb.Router do
     plug(Plug.Parsers,
       parsers: [:json, :bert, :syslog, :ndjson],
-      json_decoder: Jason
+      json_decoder: Jason,
+      body_reader: {Plug.Parsers.CompressedBodyReader, :read_body, []}
     )
 
     plug(:accepts, ["json", "bert"])
diff --git a/mix.exs b/mix.exs
index ea015e949..18d6ebf67 100644
--- a/mix.exs
+++ b/mix.exs
@@ -130,6 +130,7 @@ defmodule Logflare.Mixfile do
       # Test
       {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
       {:mimic, "~> 1.0", only: :test},
+      {:stream_data, "~> 0.6.0", only: [:test]},
 
       # Pagination
       {:scrivener_ecto, "~> 2.2"},
diff --git a/mix.lock b/mix.lock
index d2da80ccc..1b18c1f40 100644
--- a/mix.lock
+++ b/mix.lock
@@ -119,6 +119,7 @@
   "sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"},
   "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
   "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
+  "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"},
   "stripity_stripe": {:hex, :stripity_stripe, "2.9.0", "241f49bb653a2bf1b94744264c885d4e29331fbffa3334db35500cac3af05ca9", [:mix], [{:hackney, "~> 1.15", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:uri_query, "~> 0.1.2", [hex: :uri_query, repo: "hexpm", optional: false]}], "hexpm", "22702f51b949a4897922a5b622fdb4651c081d8ca82e209359a693e34a50cb50"},
   "swoosh": {:hex, :swoosh, "0.25.6", "5a7db75f0c206c65d31c9da056234fe98e29835668b04b7a2bf7839e626da88f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "5d97f8f183e50d812b37d4d27db3e393996a335004dee6477c0aff5195d5f52b"},
   "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
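For reference, a minimal sketch of what the `body_reader` wiring above enables end to end; the route path and payload are illustrative assumptions, not taken from the repo:

    # Initialise the parser stack the same way the router does, reduced to :json.
    opts =
      Plug.Parsers.init(
        parsers: [:json],
        json_decoder: Jason,
        body_reader: {Plug.Parsers.CompressedBodyReader, :read_body, []}
      )

    # A gzipped JSON payload is now decompressed before decoding.
    conn =
      Plug.Test.conn("POST", "/logs", :zlib.gzip(~s({"message":"hello"})))
      |> Plug.Conn.put_req_header("content-type", "application/json")
      |> Plug.Conn.put_req_header("content-encoding", "gzip")
      |> Plug.Parsers.call(opts)

    # conn.body_params => %{"message" => "hello"}
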
false]}], "hexpm", "22702f51b949a4897922a5b622fdb4651c081d8ca82e209359a693e34a50cb50"}, "swoosh": {:hex, :swoosh, "0.25.6", "5a7db75f0c206c65d31c9da056234fe98e29835668b04b7a2bf7839e626da88f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "5d97f8f183e50d812b37d4d27db3e393996a335004dee6477c0aff5195d5f52b"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, diff --git a/test/logflare_web/plugs/compressed_body_reader_test.exs b/test/logflare_web/plugs/compressed_body_reader_test.exs new file mode 100644 index 000000000..d986da214 --- /dev/null +++ b/test/logflare_web/plugs/compressed_body_reader_test.exs @@ -0,0 +1,33 @@ +defmodule LogflareWeb.Plugs.CompressedBodyReaderTest do + use ExUnit.Case, async: true + use ExUnitProperties + + @subject Plug.Parsers.CompressedBodyReader + + doctest @subject + + def conn(body, headers \\ []) do + conn = Plug.Test.conn("POST", "/example", body) + + Enum.reduce(headers, conn, fn {key, value}, conn -> + Plug.Conn.put_req_header(conn, key, value) + end) + end + + property "with no `content-encoding` header data is passed through as is" do + check all(data <- binary()) do + assert {:ok, read, _} = @subject.read_body(conn(data)) + assert read == data + end + end + + property "with `content-encoding: gzip` header data is passed through as is" do + check all(data <- binary()) do + compressed = :zlib.gzip(data) + conn = conn(compressed, [{"content-encoding", "gzip"}]) + + assert {:ok, read, _} = @subject.read_body(conn) + assert read == data + end + end +end diff --git a/test/logflare_web/plugs/ndjson_parser_test.exs b/test/logflare_web/plugs/ndjson_parser_test.exs index 5f5dfdc34..39f269464 100644 --- a/test/logflare_web/plugs/ndjson_parser_test.exs +++ b/test/logflare_web/plugs/ndjson_parser_test.exs @@ -1,20 +1,11 @@ defmodule Plugs.Parsers.NDJSONTest do @moduledoc false use LogflareWeb.ConnCase + alias Plug.Parsers.NDJSON alias Logflare.TestUtils describe "Plugs.Parsers.NDJSON" do - test "decodes a ndjson log batch post request", %{conn: conn} do - conn = Plug.Conn.put_req_header(conn, "content-encoding", "gzip") - - body = TestUtils.cloudflare_log_push_body(decoded: false) - - data = TestUtils.cloudflare_log_push_body(decoded: true) - - assert NDJSON.decode({:ok, body, conn}) == {:ok, data, conn} - end - test "decodes a non-gzipped ndjson log batch post request", %{conn: conn} do body = TestUtils.cloudflare_log_push_body(decoded: false) |> :zlib.gunzip()