From d31915efb198f361c12d2b6f6b753c7489e2f9fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Niemier?= Date: Fri, 6 Oct 2023 00:10:29 +0200 Subject: [PATCH] ft: support ingesting gzipped data for logs Close #1699 --- .../controllers/plugs/bert_parser.ex | 19 ++++------- .../plugs/compressed_body_reader.ex | 34 +++++++++++++++++++ .../controllers/plugs/ndjson_parser.ex | 17 ++++------ .../controllers/plugs/syslog_parser.ex | 18 ++++------ lib/logflare_web/router.ex | 3 +- mix.exs | 1 + mix.lock | 3 +- .../plugs/compressed_body_reader_test.exs | 33 ++++++++++++++++++ .../logflare_web/plugs/ndjson_parser_test.exs | 11 +----- 9 files changed, 91 insertions(+), 48 deletions(-) create mode 100644 lib/logflare_web/controllers/plugs/compressed_body_reader.ex create mode 100644 test/logflare_web/plugs/compressed_body_reader_test.exs diff --git a/lib/logflare_web/controllers/plugs/bert_parser.ex b/lib/logflare_web/controllers/plugs/bert_parser.ex index 08f2c9c79..a727483f6 100644 --- a/lib/logflare_web/controllers/plugs/bert_parser.ex +++ b/lib/logflare_web/controllers/plugs/bert_parser.ex @@ -4,15 +4,15 @@ defmodule Plug.Parsers.BERT do """ @behaviour Plug.Parsers - import Plug.Conn - @gzip_header {"content-encoding", "gzip"} - def init(_params) do + def init(opts) do + {body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []}) + {body_reader, opts} end - def parse(conn, "application", "bert", _headers, _opts) do + def parse(conn, "application", "bert", _headers, {{mod, fun, args}, _opts}) do conn - |> read_body() + |> then(&apply(mod, fun, [&1 | args])) |> decode() end @@ -26,14 +26,7 @@ defmodule Plug.Parsers.BERT do end def decode({:ok, body, conn}) do - body = - if @gzip_header in conn.req_headers do - body |> :zlib.gunzip() |> Bertex.safe_decode() - else - body |> Bertex.safe_decode() - end - - {:ok, body, conn} + {:ok, Bertex.safe_decode(body), conn} rescue e -> reraise Plug.Parsers.ParseError, [exception: e], __STACKTRACE__ diff --git a/lib/logflare_web/controllers/plugs/compressed_body_reader.ex b/lib/logflare_web/controllers/plugs/compressed_body_reader.ex new file mode 100644 index 000000000..61c525fa0 --- /dev/null +++ b/lib/logflare_web/controllers/plugs/compressed_body_reader.ex @@ -0,0 +1,34 @@ +defmodule LogflareWeb.Plugs.CompressedBodyReader do + def read_body(conn, opts \\ []) do + content_encoding = Plug.Conn.get_req_header(conn, "content-encoding") + + with {:ok, body, conn} <- Plug.Conn.read_body(conn, opts) do + case try_decompress(body, content_encoding) do + {:ok, data} -> {:ok, data, conn} + {:more, data} -> {:more, data, conn} + {:error, _} = error -> error + end + end + end + + defp try_decompress(data, []), do: {:ok, data} + defp try_decompress(data, ["gzip"]), do: safe_gunzip(data) + + defp safe_gunzip(data) do + z = :zlib.open() + + try do + :zlib.inflateInit(z, 31) + result = :zlib.safeInflate(z, data) + :zlib.inflateEnd(z) + + result + after + :zlib.close(z) + else + {:finished, data} -> {:ok, IO.iodata_to_binary(data)} + {:continue, data} -> {:more, IO.iodata_to_binary(data)} + {:need_dictionary, _, _} -> {:error, :not_supported} + end + end +end diff --git a/lib/logflare_web/controllers/plugs/ndjson_parser.ex b/lib/logflare_web/controllers/plugs/ndjson_parser.ex index 47fd9afc8..953db98bf 100644 --- a/lib/logflare_web/controllers/plugs/ndjson_parser.ex +++ b/lib/logflare_web/controllers/plugs/ndjson_parser.ex @@ -5,15 +5,15 @@ defmodule Plug.Parsers.NDJSON do require Logger @behaviour Plug.Parsers - import Plug.Conn - @gzip_header {"content-encoding", "gzip"} - def init(_params) do + def init(opts) do + {body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []}) + {body_reader, opts} end - def parse(conn, "application", "x-ndjson", _headers, _opts) do + def parse(conn, "application", "x-ndjson", _headers, {{mod, fun, args}, _opts}) do conn - |> read_body() + |> then(&apply(mod, fun, [&1 | args])) |> decode() end @@ -27,12 +27,7 @@ defmodule Plug.Parsers.NDJSON do end def decode({:ok, body, conn}) do - body = - if @gzip_header in conn.req_headers do - body |> :zlib.gunzip() |> String.split("\n", trim: true) - else - body |> String.split("\n", trim: true) - end + body = body |> String.split("\n", trim: true) batch = for line <- body do diff --git a/lib/logflare_web/controllers/plugs/syslog_parser.ex b/lib/logflare_web/controllers/plugs/syslog_parser.ex index 401fa191e..f2eb10b9d 100644 --- a/lib/logflare_web/controllers/plugs/syslog_parser.ex +++ b/lib/logflare_web/controllers/plugs/syslog_parser.ex @@ -5,17 +5,18 @@ defmodule Plug.Parsers.SYSLOG do require Logger @behaviour Plug.Parsers - import Plug.Conn - @gzip_header {"content-encoding", "gzip"} + alias Logflare.Logs.SyslogParser alias Logflare.Logs.SyslogMessage - def init(_params) do + def init(opts) do + {body_reader, opts} = Keyword.pop(opts, :body_reader, {Plug.Conn, :read_body, []}) + {body_reader, opts} end - def parse(conn, "application", "logplex-1", _headers, _opts) do + def parse(conn, "application", "logplex-1", _headers, {{mod, fun, args}, _opts}) do conn - |> read_body() + |> then(&apply(mod, fun, [&1 | args])) |> decode() end @@ -29,12 +30,7 @@ defmodule Plug.Parsers.SYSLOG do end def decode({:ok, body, conn}) do - body = - if @gzip_header in conn.req_headers do - body |> :zlib.gunzip() |> String.split("\n", trim: true) - else - body |> String.split("\n", trim: true) - end + body = body |> String.split("\n", trim: true) opts = case conn.request_path do diff --git a/lib/logflare_web/router.ex b/lib/logflare_web/router.ex index 6a25579b7..4e8042c18 100644 --- a/lib/logflare_web/router.ex +++ b/lib/logflare_web/router.ex @@ -55,7 +55,8 @@ defmodule LogflareWeb.Router do plug(Plug.Parsers, parsers: [:json, :bert, :syslog, :ndjson], - json_decoder: Jason + json_decoder: Jason, + body_reader: {LogflareWeb.Plugs.CompressedBodyReader, :read_body, []} ) plug(:accepts, ["json", "bert"]) diff --git a/mix.exs b/mix.exs index 779418bef..c86a4d405 100644 --- a/mix.exs +++ b/mix.exs @@ -130,6 +130,7 @@ defmodule Logflare.Mixfile do # Test {:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false}, {:mimic, "~> 1.0", only: :test}, + {:stream_data, "~> 0.6.0", only: [:test]}, # Pagination {:scrivener_ecto, "~> 2.2"}, diff --git a/mix.lock b/mix.lock index 392eb93ed..0f2bd2af8 100644 --- a/mix.lock +++ b/mix.lock @@ -31,7 +31,6 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "eternal": {:hex, :eternal, "1.2.2", "d1641c86368de99375b98d183042dd6c2b234262b8d08dfd72b9eeaafc2a1abd", [:mix], [], "hexpm", "2c9fe32b9c3726703ba5e1d43a1d255a4f3f2d8f8f9bc19f094c7cb1a7a9e782"}, "ets": {:hex, :ets, "0.8.1", "8ff9bcda5682b98493f8878fc9dbd990e48d566cba8cce59f7c2a78130da29ea", [:mix], [], "hexpm", "6be41b50adb5bc5c43626f25ea2d0af1f4a242fb3fad8d53f0c67c20b78915cc"}, - "etso": {:hex, :etso, "1.1.0", "ddbf5417522ecc5f9544a5daeb67fc5f7509a5edb7f65add85a530dc35f80ec5", [], [{:ecto, "~> 3.8.3", [hex: :ecto, repo: "hexpm", optional: false]}], "hexpm", "aa74f6bd76fb444aaa94554c668d637eedd6d71c0a9887ef973437ebe6645368"}, "ex2ms": {:hex, :ex2ms, "1.6.1", "66d472eb14da43087c156e0396bac3cc7176b4f24590a251db53f84e9a0f5f72", [:mix], [], "hexpm", "a7192899d84af03823a8ec2f306fa858cbcce2c2e7fd0f1c49e05168fb9c740e"}, "ex_machina": {:hex, :ex_machina, "2.7.0", "b792cc3127fd0680fecdb6299235b4727a4944a09ff0fa904cc639272cd92dc7", [:mix], [{:ecto, "~> 2.2 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_sql, "~> 3.0", [hex: :ecto_sql, repo: "hexpm", optional: true]}], "hexpm", "419aa7a39bde11894c87a615c4ecaa52d8f107bbdd81d810465186f783245bf8"}, "ex_oauth2_provider": {:git, "https://github.com/aristamd/ex_oauth2_provider.git", "8df4b62acfe858e918be38202948b6b4db142dc6", []}, @@ -71,7 +70,6 @@ "logflare_logger_backend": {:hex, :logflare_logger_backend, "0.11.4", "3a5df94e764b7c8ee4bd7b875a480a34a27807128d8459aa59ea63b2b38bddc7", [:mix], [{:bertex, "~> 1.3", [hex: :bertex, repo: "hexpm", optional: false]}, {:logflare_api_client, "~> 0.3.5", [hex: :logflare_api_client, repo: "hexpm", optional: false]}, {:logflare_etso, "~> 1.1.2", [hex: :logflare_etso, repo: "hexpm", optional: false]}, {:typed_struct, "~> 0.3.0", [hex: :typed_struct, repo: "hexpm", optional: false]}], "hexpm", "00998d81b3c481ad93d2bf25e66d1ddb1a01ad77d994e2c1a7638c6da94755c5"}, "lqueue": {:hex, :lqueue, "1.2.0", "9cde07d8595a012e61fce9c5b751cb464aff030b0944f915086994b017e694f4", [:mix], [], "hexpm", "cdb983cfc609dff425ae237b722e101f85c763ef1f2bbe363e1f1385f370285d"}, "map_keys": {:hex, :map_keys, "0.1.0", "6941966acd7460542318630e81744a26f0d9fd944dac9d1afcb5339d374dc2ca", [:mix], [{:atomic_map, "~> 0.9.2", [hex: :atomic_map, repo: "hexpm", optional: false]}, {:key_tools, "~> 0.4.0", [hex: :key_tools, repo: "hexpm", optional: false]}, {:proper_case, "~> 1.0", [hex: :proper_case, repo: "hexpm", optional: false]}], "hexpm", "edcdfa2cc828324fbbed94e42564c5978af63bf0bb551e58b5dd473e00d34e37"}, - "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "1.6.0", "dabde576a497cef4bbdd60aceee8160e02a6c89250d6c0b29e56c0dfb00db3d2", [:mix], [], "hexpm", "31a1a8613f8321143dde1dafc36006a17d28d02bdfecb9e95a880fa7aabd19a7"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, @@ -119,6 +117,7 @@ "sobelow": {:hex, :sobelow, "0.12.2", "45f4d500e09f95fdb5a7b94c2838d6b26625828751d9f1127174055a78542cf5", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "2f0b617dce551db651145662b84c8da4f158e7abe049a76daaaae2282df01c5d"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, + "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"}, "stripity_stripe": {:hex, :stripity_stripe, "2.9.0", "241f49bb653a2bf1b94744264c885d4e29331fbffa3334db35500cac3af05ca9", [:mix], [{:hackney, "~> 1.15", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:uri_query, "~> 0.1.2", [hex: :uri_query, repo: "hexpm", optional: false]}], "hexpm", "22702f51b949a4897922a5b622fdb4651c081d8ca82e209359a693e34a50cb50"}, "swoosh": {:hex, :swoosh, "0.25.6", "5a7db75f0c206c65d31c9da056234fe98e29835668b04b7a2bf7839e626da88f", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm", "5d97f8f183e50d812b37d4d27db3e393996a335004dee6477c0aff5195d5f52b"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, diff --git a/test/logflare_web/plugs/compressed_body_reader_test.exs b/test/logflare_web/plugs/compressed_body_reader_test.exs new file mode 100644 index 000000000..9c6bf5158 --- /dev/null +++ b/test/logflare_web/plugs/compressed_body_reader_test.exs @@ -0,0 +1,33 @@ +defmodule LogflareWeb.Plugs.CompressedBodyReaderTest do + use ExUnit.Case, async: true + use ExUnitProperties + + @subject LogflareWeb.Plugs.CompressedBodyReader + + doctest @subject + + def conn(body, headers \\ []) do + conn = Plug.Test.conn("POST", "/example", body) + + Enum.reduce(headers, conn, fn {key, value}, conn -> + Plug.Conn.put_req_header(conn, key, value) + end) + end + + property "with no `content-encoding` header data is passed through as is" do + check all(data <- binary()) do + assert {:ok, read, _} = @subject.read_body(conn(data)) + assert read == data + end + end + + property "with `content-encoding: gzip` header data is passed through as is" do + check all(data <- binary()) do + compressed = :zlib.gzip(data) + conn = conn(compressed, [{"content-encoding", "gzip"}]) + + assert {:ok, read, _} = @subject.read_body(conn) + assert read == data + end + end +end diff --git a/test/logflare_web/plugs/ndjson_parser_test.exs b/test/logflare_web/plugs/ndjson_parser_test.exs index 5f5dfdc34..39f269464 100644 --- a/test/logflare_web/plugs/ndjson_parser_test.exs +++ b/test/logflare_web/plugs/ndjson_parser_test.exs @@ -1,20 +1,11 @@ defmodule Plugs.Parsers.NDJSONTest do @moduledoc false use LogflareWeb.ConnCase + alias Plug.Parsers.NDJSON alias Logflare.TestUtils describe "Plugs.Parsers.NDJSON" do - test "decodes a ndjson log batch post request", %{conn: conn} do - conn = Plug.Conn.put_req_header(conn, "content-encoding", "gzip") - - body = TestUtils.cloudflare_log_push_body(decoded: false) - - data = TestUtils.cloudflare_log_push_body(decoded: true) - - assert NDJSON.decode({:ok, body, conn}) == {:ok, data, conn} - end - test "decodes a non-gzipped ndjson log batch post request", %{conn: conn} do body = TestUtils.cloudflare_log_push_body(decoded: false) |> :zlib.gunzip()