Skip to content

Commit

Permalink
Merge pull request #1715 from hauleth/ft/compressed-data-ingestion
Browse files Browse the repository at this point in the history
ft: support ingesting gzipped data for logs
  • Loading branch information
Ziinc authored Oct 10, 2023
2 parents 9bc5d5c + d31915e commit 29c35a4
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 48 deletions.
19 changes: 6 additions & 13 deletions lib/logflare_web/controllers/plugs/bert_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ defmodule Plug.Parsers.BERT do
"""

@behaviour Plug.Parsers
import Plug.Conn
@gzip_header {"content-encoding", "gzip"}

def init(_params) do
def init(opts) do
{body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
{body_reader, opts}
end

def parse(conn, "application", "bert", _headers, _opts) do
def parse(conn, "application", "bert", _headers, {{mod, fun, args}, _opts}) do
conn
|> read_body()
|> then(&apply(mod, fun, [&1 | args]))
|> decode()
end

Expand All @@ -26,14 +26,7 @@ defmodule Plug.Parsers.BERT do
end

def decode({:ok, body, conn}) do
body =
if @gzip_header in conn.req_headers do
body |> :zlib.gunzip() |> Bertex.safe_decode()
else
body |> Bertex.safe_decode()
end

{:ok, body, conn}
{:ok, Bertex.safe_decode(body), conn}
rescue
e ->
reraise Plug.Parsers.ParseError, [exception: e], __STACKTRACE__
Expand Down
34 changes: 34 additions & 0 deletions lib/logflare_web/controllers/plugs/compressed_body_reader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule LogflareWeb.Plugs.CompressedBodyReader do
def read_body(conn, opts \\ []) do
content_encoding = Plug.Conn.get_req_header(conn, "content-encoding")

with {:ok, body, conn} <- Plug.Conn.read_body(conn, opts) do
case try_decompress(body, content_encoding) do
{:ok, data} -> {:ok, data, conn}
{:more, data} -> {:more, data, conn}
{:error, _} = error -> error
end
end
end

defp try_decompress(data, []), do: {:ok, data}
defp try_decompress(data, ["gzip"]), do: safe_gunzip(data)

defp safe_gunzip(data) do
z = :zlib.open()

try do
:zlib.inflateInit(z, 31)
result = :zlib.safeInflate(z, data)
:zlib.inflateEnd(z)

result
after
:zlib.close(z)
else
{:finished, data} -> {:ok, IO.iodata_to_binary(data)}
{:continue, data} -> {:more, IO.iodata_to_binary(data)}
{:need_dictionary, _, _} -> {:error, :not_supported}
end
end
end
17 changes: 6 additions & 11 deletions lib/logflare_web/controllers/plugs/ndjson_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ defmodule Plug.Parsers.NDJSON do
require Logger

@behaviour Plug.Parsers
import Plug.Conn
@gzip_header {"content-encoding", "gzip"}

def init(_params) do
def init(opts) do
{body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
{body_reader, opts}
end

def parse(conn, "application", "x-ndjson", _headers, _opts) do
def parse(conn, "application", "x-ndjson", _headers, {{mod, fun, args}, _opts}) do
conn
|> read_body()
|> then(&apply(mod, fun, [&1 | args]))
|> decode()
end

Expand All @@ -27,12 +27,7 @@ defmodule Plug.Parsers.NDJSON do
end

def decode({:ok, body, conn}) do
body =
if @gzip_header in conn.req_headers do
body |> :zlib.gunzip() |> String.split("\n", trim: true)
else
body |> String.split("\n", trim: true)
end
body = body |> String.split("\n", trim: true)

batch =
for line <- body do
Expand Down
18 changes: 7 additions & 11 deletions lib/logflare_web/controllers/plugs/syslog_parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ defmodule Plug.Parsers.SYSLOG do
require Logger

@behaviour Plug.Parsers
import Plug.Conn
@gzip_header {"content-encoding", "gzip"}

alias Logflare.Logs.SyslogParser
alias Logflare.Logs.SyslogMessage

def init(_params) do
def init(opts) do
{body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []})
{body_reader, opts}
end

def parse(conn, "application", "logplex-1", _headers, _opts) do
def parse(conn, "application", "logplex-1", _headers, {{mod, fun, args}, _opts}) do
conn
|> read_body()
|> then(&apply(mod, fun, [&1 | args]))
|> decode()
end

Expand All @@ -29,12 +30,7 @@ defmodule Plug.Parsers.SYSLOG do
end

def decode({:ok, body, conn}) do
body =
if @gzip_header in conn.req_headers do
body |> :zlib.gunzip() |> String.split("\n", trim: true)
else
body |> String.split("\n", trim: true)
end
body = body |> String.split("\n", trim: true)

opts =
case conn.request_path do
Expand Down
3 changes: 2 additions & 1 deletion lib/logflare_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ defmodule LogflareWeb.Router do

plug(Plug.Parsers,
parsers: [:json, :bert, :syslog, :ndjson],
json_decoder: Jason
json_decoder: Jason,
body_reader: {LogflareWeb.Plugs.CompressedBodyReader, :read_body, []}
)

plug(:accepts, ["json", "bert"])
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ defmodule Logflare.Mixfile do
# Test
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.0", only: :test},
{:stream_data, "~> 0.6.0", only: [:test]},

# Pagination
{:scrivener_ecto, "~> 2.2"},
Expand Down
3 changes: 1 addition & 2 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"eternal": {:hex, :eternal, "1.2.2", "d1641c86368de99375b98d183042dd6c2b234262b8d08dfd72b9eeaafc2a1abd", [:mix], [], "hexpm", "2c9fe32b9c3726703ba5e1d43a1d255a4f3f2d8f8f9bc19f094c7cb1a7a9e782"},
"ets": {:hex, :ets, "0.8.1", "8ff9bcda5682b98493f8878fc9dbd990e48d566cba8cce59f7c2a78130da29ea", [:mix], [], "hexpm", "6be41b50adb5bc5c43626f25ea2d0af1f4a242fb3fad8d53f0c67c20b78915cc"},
"etso": {:hex, :etso, "1.1.0", "ddbf5417522ecc5f9544a5daeb67fc5f7509a5edb7f65add85a530dc35f80ec5", [], [{:ecto, "~> 3.8.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "aa74f6bd76fb444aaa94554c668d637eedd6d71c0a9887ef973437ebe6645368"},
"ex2ms": {:hex, :ex2ms, "1.6.1", "66d472eb14da43087c156e0396bac3cc7176b4f24590a251db53f84e9a0f5f72", [:mix], [], "hexpm", "a7192899d84af03823a8ec2f306fa858cbcce2c2e7fd0f1c49e05168fb9c740e"},
"ex_machina": {:hex, :ex_machina, "2.7.0", "b792cc3127fd0680fecdb6299235b4727a4944a09ff0fa904cc639272cd92dc7", [:mix], [{:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}], "hexpm", "419aa7a39bde11894c87a615c4ecaa52d8f107bbdd81d810465186f783245bf8"},
"ex_oauth2_provider": {:git, "https://github.com/aristamd/ex_oauth2_provider.git", "8df4b62acfe858e918be38202948b6b4db142dc6", []},
Expand Down Expand Up @@ -71,7 +70,6 @@
"logflare_logger_backend": {:hex, :logflare_logger_backend, "0.11.4", "3a5df94e764b7c8ee4bd7b875a480a34a27807128d8459aa59ea63b2b38bddc7", [:mix], [{:bertex, "~> 1.3", [hex: :bertex, repo: "hexpm", optional: false]}, {:logflare_api_client, "~> 0.3.5", [hex: :logflare_api_client, repo: "hexpm", optional: false]}, {:logflare_etso, "~> 1.1.2", [hex: :logflare_etso, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "00998d81b3c481ad93d2bf25e66d1ddb1a01ad77d994e2c1a7638c6da94755c5"},
"lqueue": {:hex, :lqueue, "1.2.0", "9cde07d8595a012e61fce9c5b751cb464aff030b0944f915086994b017e694f4", [:mix], [], "hexpm", "cdb983cfc609dff425ae237b722e101f85c763ef1f2bbe363e1f1385f370285d"},
"map_keys": {:hex, :map_keys, "0.1.0", "6941966acd7460542318630e81744a26f0d9fd944dac9d1afcb5339d374dc2ca", [:mix], [{:atomic_map, "~> 0.9.2", [hex: :atomic_map, repo: "hexpm", optional: false]}, {:key_tools, "~> 0.4.0", [hex: :key_tools, repo: "hexpm", optional: false]}, {:proper_case, "~> 1.0", [hex: :proper_case, repo: "hexpm", optional: false]}], "hexpm", "edcdfa2cc828324fbbed94e42564c5978af63bf0bb551e58b5dd473e00d34e37"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
Expand Down Expand Up @@ -119,6 +117,7 @@
"sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
"stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"},
"stripity_stripe": {:hex, :stripity_stripe, "2.9.0", "241f49bb653a2bf1b94744264c885d4e29331fbffa3334db35500cac3af05ca9", [:mix], [{:hackney, "~> 1.15", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:uri_query, "~> 0.1.2", [hex: :uri_query, repo: "hexpm", optional: false]}], "hexpm", "22702f51b949a4897922a5b622fdb4651c081d8ca82e209359a693e34a50cb50"},
"swoosh": {:hex, :swoosh, "0.25.6", "5a7db75f0c206c65d31c9da056234fe98e29835668b04b7a2bf7839e626da88f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "5d97f8f183e50d812b37d4d27db3e393996a335004dee6477c0aff5195d5f52b"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
Expand Down
33 changes: 33 additions & 0 deletions test/logflare_web/plugs/compressed_body_reader_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule LogflareWeb.Plugs.CompressedBodyReaderTest do
use ExUnit.Case, async: true
use ExUnitProperties

@subject LogflareWeb.Plugs.CompressedBodyReader

doctest @subject

def conn(body, headers \\ []) do
conn = Plug.Test.conn("POST", "/example", body)

Enum.reduce(headers, conn, fn {key, value}, conn ->
Plug.Conn.put_req_header(conn, key, value)
end)
end

property "with no `content-encoding` header data is passed through as is" do
check all(data <- binary()) do
assert {:ok, read, _} = @subject.read_body(conn(data))
assert read == data
end
end

property "with `content-encoding: gzip` header data is passed through as is" do
check all(data <- binary()) do
compressed = :zlib.gzip(data)
conn = conn(compressed, [{"content-encoding", "gzip"}])

assert {:ok, read, _} = @subject.read_body(conn)
assert read == data
end
end
end
11 changes: 1 addition & 10 deletions test/logflare_web/plugs/ndjson_parser_test.exs
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
defmodule Plugs.Parsers.NDJSONTest do
@moduledoc false
use LogflareWeb.ConnCase

alias Plug.Parsers.NDJSON
alias Logflare.TestUtils

describe "Plugs.Parsers.NDJSON" do
test "decodes a ndjson log batch post request", %{conn: conn} do
conn = Plug.Conn.put_req_header(conn, "content-encoding", "gzip")

body = TestUtils.cloudflare_log_push_body(decoded: false)

data = TestUtils.cloudflare_log_push_body(decoded: true)

assert NDJSON.decode({:ok, body, conn}) == {:ok, data, conn}
end

test "decodes a non-gzipped ndjson log batch post request", %{conn: conn} do
body = TestUtils.cloudflare_log_push_body(decoded: false) |> :zlib.gunzip()

Expand Down

0 comments on commit 29c35a4

Please sign in to comment.