Skip to content

Commit

Permalink
ft: support ingesting gzipped data for logs
Browse files Browse the repository at this point in the history
Close #1699
  • Loading branch information
hauleth committed Oct 3, 2023
1 parent 4991fc9 commit 569987a
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 46 deletions.
19 changes: 6 additions & 13 deletions lib/logflare_web/controllers/plugs/bert_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ defmodule Plug.Parsers.BERT do
"""

@behaviour Plug.Parsers
import Plug.Conn
@gzip_header {"content-encoding", "gzip"}

def init(_params) do
def init(opts) do
{body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
{body_reader, opts}
end

def parse(conn, "application", "bert", _headers, _opts) do
def parse(conn, "application", "bert", _headers, {{mod, fun, args}, _opts}) do
conn
|> read_body()
|> then(&apply(mod, fun, [&1 | args]))
|> decode()
end

Expand All @@ -26,14 +26,7 @@ defmodule Plug.Parsers.BERT do
end

def decode({:ok, body, conn}) do
body =
if @gzip_header in conn.req_headers do
body |> :zlib.gunzip() |> Bertex.safe_decode()
else
body |> Bertex.safe_decode()
end

{:ok, body, conn}
{:ok, Bertex.safe_decode(body), conn}
rescue
e ->
reraise Plug.Parsers.ParseError, [exception: e], __STACKTRACE__
Expand Down
33 changes: 33 additions & 0 deletions lib/logflare_web/controllers/plugs/compressed_body_reader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Plug.Parsers.CompressedBodyReader do
def read_body(conn, opts \\ []) do
content_encoding = Plug.Conn.get_req_header(conn, "content-encoding")

with {:ok, body, conn} <- Plug.Conn.read_body(conn, opts) do
case try_decompress(body, content_encoding) do
{:ok, data} -> {:ok, data, conn}
{:more, data} -> {:more, data, conn}
{:error, _} = error -> error
end
end
end

defp try_decompress(data, []), do: {:ok, data}
defp try_decompress(data, ["gzip"]), do: safe_gunzip(data)

defp safe_gunzip(data) do
z = :zlib.open()
try do
:zlib.inflateInit(z, 31)
result = :zlib.safeInflate(z, data)
:zlib.inflateEnd(z)

result
after
:zlib.close(z)
else
{:finished, data} -> {:ok, IO.iodata_to_binary(data)}
{:continue, data} -> {:more, IO.iodata_to_binary(data)}
{:need_dictionary, _, _} -> {:error, :not_supported}
end
end
end
17 changes: 6 additions & 11 deletions lib/logflare_web/controllers/plugs/ndjson_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ defmodule Plug.Parsers.NDJSON do
require Logger

@behaviour Plug.Parsers
import Plug.Conn
@gzip_header {"content-encoding", "gzip"}

def init(_params) do
def init(opts) do
{body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
{body_reader, opts}
end

def parse(conn, "application", "x-ndjson", _headers, _opts) do
def parse(conn, "application", "x-ndjson", _headers, {{mod, fun, args}, _opts}) do
conn
|> read_body()
|> then(&apply(mod, fun, [&1 | args]))
|> decode()
end

Expand All @@ -27,12 +27,7 @@ defmodule Plug.Parsers.NDJSON do
end

def decode({:ok, body, conn}) do
body =
if @gzip_header in conn.req_headers do
body |> :zlib.gunzip() |> String.split("\n", trim: true)
else
body |> String.split("\n", trim: true)
end
body = body |> String.split("\n", trim: true)

batch =
for line <- body do
Expand Down
18 changes: 7 additions & 11 deletions lib/logflare_web/controllers/plugs/syslog_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ defmodule Plug.Parsers.SYSLOG do
require Logger

@behaviour Plug.Parsers
import Plug.Conn
@gzip_header {"content-encoding", "gzip"}

alias Logflare.Logs.SyslogParser
alias Logflare.Logs.SyslogMessage

def init(_params) do
def init(opts) do
{body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
{body_reader, opts}
end

def parse(conn, "application", "logplex-1", _headers, _opts) do
def parse(conn, "application", "logplex-1", _headers, {{mod, fun, args}, _opts}) do
conn
|> read_body()
|> then(&apply(mod, fun, [&1 | args]))
|> decode()
end

Expand All @@ -29,12 +30,7 @@ defmodule Plug.Parsers.SYSLOG do
end

def decode({:ok, body, conn}) do
body =
if @gzip_header in conn.req_headers do
body |> :zlib.gunzip() |> String.split("\n", trim: true)
else
body |> String.split("\n", trim: true)
end
body = body |> String.split("\n", trim: true)

opts =
case conn.request_path do
Expand Down
3 changes: 2 additions & 1 deletion lib/logflare_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ defmodule LogflareWeb.Router do

plug(Plug.Parsers,
parsers: [:json, :bert, :syslog, :ndjson],
json_decoder: Jason
json_decoder: Jason,
body_reader: {Plug.Parsers.CompressedBodyReader, :read_body, []}
)

plug(:accepts, ["json", "bert"])
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ defmodule Logflare.Mixfile do
# Test
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.0", only: :test},
{:stream_data, "~> 0.6.0", only: [:test]},

# Pagination
{:scrivener_ecto, "~> 2.2"},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
"sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
"stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"},
"stripity_stripe": {:hex, :stripity_stripe, "2.9.0", "241f49bb653a2bf1b94744264c885d4e29331fbffa3334db35500cac3af05ca9", [:mix], [{:hackney, "~> 1.15", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:uri_query, "~> 0.1.2", [hex: :uri_query, repo: "hexpm", optional: false]}], "hexpm", "22702f51b949a4897922a5b622fdb4651c081d8ca82e209359a693e34a50cb50"},
"swoosh": {:hex, :swoosh, "0.25.6", "5a7db75f0c206c65d31c9da056234fe98e29835668b04b7a2bf7839e626da88f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "5d97f8f183e50d812b37d4d27db3e393996a335004dee6477c0aff5195d5f52b"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
Expand Down
33 changes: 33 additions & 0 deletions test/logflare_web/plugs/compressed_body_reader_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule LogflareWeb.Plugs.CompressedBodyReaderTest do
use ExUnit.Case, async: true
use ExUnitProperties

@subject Plug.Parsers.CompressedBodyReader

doctest @subject

def conn(body, headers \\ []) do
conn = Plug.Test.conn("POST", "/example", body)

Enum.reduce(headers, conn, fn {key, value}, conn ->
Plug.Conn.put_req_header(conn, key, value)
end)
end

property "with no `content-encoding` header data is passed through as is" do
check all(data <- binary()) do
assert {:ok, read, _} = @subject.read_body(conn(data))
assert read == data
end
end

property "with `content-encoding: gzip` header data is passed through as is" do
check all(data <- binary()) do
compressed = :zlib.gzip(data)
conn = conn(compressed, [{"content-encoding", "gzip"}])

assert {:ok, read, _} = @subject.read_body(conn)
assert read == data
end
end
end
11 changes: 1 addition & 10 deletions test/logflare_web/plugs/ndjson_parser_test.exs
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
defmodule Plugs.Parsers.NDJSONTest do
@moduledoc false
use LogflareWeb.ConnCase

alias Plug.Parsers.NDJSON
alias Logflare.TestUtils

describe "Plugs.Parsers.NDJSON" do
test "decodes a ndjson log batch post request", %{conn: conn} do
conn = Plug.Conn.put_req_header(conn, "content-encoding", "gzip")

body = TestUtils.cloudflare_log_push_body(decoded: false)

data = TestUtils.cloudflare_log_push_body(decoded: true)

assert NDJSON.decode({:ok, body, conn}) == {:ok, data, conn}
end

test "decodes a non-gzipped ndjson log batch post request", %{conn: conn} do
body = TestUtils.cloudflare_log_push_body(decoded: false) |> :zlib.gunzip()

Expand Down

0 comments on commit 569987a

Please sign in to comment.