Skip to content

Commit

Permalink
ft: DataDog adaptor
Browse files Browse the repository at this point in the history
This introduces wrapper adaptor over Webhook adaptor to send logs to
DataDog HTTP log ingest endpoint.
  • Loading branch information
hauleth committed Apr 23, 2024
1 parent 9dccee6 commit 2255879
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 6 deletions.
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,5 @@ config :logger,
],
level: :error
]

config :tesla, Logflare.Backends.Adaptor.WebhookAdaptor.Client, adapter: Tesla.Mock
1 change: 1 addition & 0 deletions lib/logflare/backends/adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Logflare.Backends.Adaptor do
@spec get_adaptor(Backend.t()) :: module()
def get_adaptor(%Backend{type: type}) do
case type do
:datadog -> __MODULE__.DatadogAdaptor
:webhook -> __MODULE__.WebhookAdaptor
:postgres -> __MODULE__.PostgresAdaptor
:bigquery -> __MODULE__.BigQueryAdaptor
Expand Down
74 changes: 74 additions & 0 deletions lib/logflare/backends/adaptor/datadog_adaptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
defmodule Logflare.Backends.Adaptor.DatadogAdaptor do
@moduledoc """
Wrapper module for `Logflare.Backends.Adaptor.WebhookAdaptor` to provide API
for DataDog logs ingestion endpoint.
"""

use TypedStruct

alias Logflare.Backends.Adaptor.WebhookAdaptor

typedstruct enforce: true do
field(:api_key, String.t())
end

@behaviour Logflare.Backends.Adaptor

def child_spec(arg) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [arg]}
}
end

@impl Logflare.Backends.Adaptor
def start_link({source, backend}) do
backend = %{
backend
| config: %{
url: "https://http-intake.logs.datadoghq.com/api/v2/logs",
headers: %{
"dd-api-key" => backend.config.api_key
}
}
}

WebhookAdaptor.start_link({source, backend})
end

@impl Logflare.Backends.Adaptor
def ingest(pid, log_events, opts) do
new_events =
Enum.map(log_events, &translate_event/1)

WebhookAdaptor.ingest(pid, new_events, opts)
end

@impl Logflare.Backends.Adaptor
def execute_query(_ident, _query), do: {:error, :not_implemented}

@impl Logflare.Backends.Adaptor
def cast_config(params) do
{%{}, %{api_key: :string}}
|> Ecto.Changeset.cast(params, [:api_key])
end

@impl Logflare.Backends.Adaptor
def validate_config(changeset) do
import Ecto.Changeset

changeset
|> validate_required([:api_key])
end

defp translate_event(%Logflare.LogEvent{} = le) do
%Logflare.LogEvent{
le
| body: %{
message: Jason.encode!(le.body),
ddsource: "logflare",
service: le.source.name
}
}
end
end
2 changes: 1 addition & 1 deletion lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Logflare.Backends.Backend do
alias Logflare.Source
alias Logflare.User

@adaptor_types [:bigquery, :webhook, :postgres]
@adaptor_types [:bigquery, :webhook, :postgres, :datadog]

typed_schema "backends" do
field(:name, :string)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ defmodule Logflare.Mixfile do
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.0", only: :test},
{:stream_data, "~> 0.6.0", only: [:dev, :test]},
{:test_server, "~> 0.1.0", only: [:test]},

# Pagination
{:scrivener_ecto, "~> 2.2"},
Expand Down
12 changes: 7 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
%{
"atomic_map": {:hex, :atomic_map, "0.9.3", "3c7f1302e0590164732d08ca999708efbb2cd768abf2911cf140280ce2dc499d", [:mix], [], "hexpm", "c237babf301bd2435bd85b96cffc973022b4cbb7721537059ee0dd3bb74938d2"},
"bandit": {:hex, :bandit, "1.1.0", "1414e65916229d4ee0914f6d4e7f8ec16c6f2d90e01ad5174d89e90baa577625", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4891fb2f48a83445da70a4e949f649a9b4032310f1f640f4a8a372bc91cece18"},
"bandit": {:hex, :bandit, "1.4.2", "a1475c8dcbffd1f43002797f99487a64c8444753ff2b282b52409e279488e1f5", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "3db8bacea631bd926cc62ccad58edfee4252d1b4c5cccbbad9825df2722b884f"},
"benchee": {:hex, :benchee, "1.3.0", "f64e3b64ad3563fa9838146ddefb2d2f94cf5b473bdfd63f5ca4d0657bf96694", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "34f4294068c11b2bd2ebf2c59aac9c7da26ffa0068afdf3419f1b176e16c5f81"},
"benchee_async": {:hex, :benchee_async, "0.1.2", "38e686bd9cdf7ae8767a08c0df6932b625b166fd97267d0437c715f9ff424bc1", [:mix], [], "hexpm", "73b0bd95173f86c61d41570adaeb151efe70c0cc273eec6501d391b35e3599dd"},
"bertex": {:hex, :bertex, "1.3.0", "0ad0df9159b5110d9d2b6654f72fbf42a54884ef43b6b651e6224c0af30ba3cb", [:mix], [], "hexpm", "0a5d5e478bb5764b7b7bae37cae1ca491200e58b089df121a2fe1c223d8ee57a"},
Expand All @@ -15,9 +15,9 @@
"configcat": {:hex, :configcat, "2.0.1", "cffd7e6ba7a4c41e1e6bbb706379192a1be7cd848bb6b098d4ed054b13c18f9d", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:httpoison, "~> 1.7", [hex: :httpoison, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "3e4a239a613d2acbcee7103a6a426c4ae52882ae65bf48cdb5c1247877b65112"},
"contex": {:hex, :contex, "0.3.0", "d390713efee604702600ba801a481bcb8534a9af43e118b29d9d37fe4495fcba", [:mix], [{:nimble_strftime, "~> 0.1.0", [hex: :nimble_strftime, repo: "hexpm", optional: false]}], "hexpm", "3fa7535cc3b265691a4eabc2707fe8622aa60a2565145a14da9aebd613817652"},
"cors_plug": {:hex, :cors_plug, "2.0.3", "316f806d10316e6d10f09473f19052d20ba0a0ce2a1d910ddf57d663dac402ae", [:mix], [{:plug, "~> 1.8", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "ee4ae1418e6ce117fc42c2ba3e6cbdca4e95ecd2fe59a05ec6884ca16d469aea"},
"cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"},
"cowboy": {:hex, :cowboy, "2.12.0", "f276d521a1ff88b2b9b4c54d0e753da6c66dd7be6c9fca3d9418b561828a3731", [:make, :rebar3], [{:cowlib, "2.13.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "8a7abe6d183372ceb21caa2709bec928ab2b72e18a3911aa1771639bef82651e"},
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
"cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"},
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"crontab": {:hex, :crontab, "1.1.13", "3bad04f050b9f7f1c237809e42223999c150656a6b2afbbfef597d56df2144c5", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm", "d67441bec989640e3afb94e123f45a2bc42d76e02988c9613885dc3d01cf7085"},
"db_connection": {:hex, :db_connection, "2.6.0", "77d835c472b5b67fc4f29556dee74bf511bbafecdcaf98c27d27fa5918152086", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c2f992d15725e721ec7fbc1189d4ecdb8afef76648c746a8e1cad35e3b8a35f3"},
Expand Down Expand Up @@ -102,7 +102,7 @@
"phoenix_view": {:hex, :phoenix_view, "2.0.3", "4d32c4817fce933693741deeb99ef1392619f942633dde834a5163124813aad3", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}], "hexpm", "cd34049af41be2c627df99cd4eaa71fc52a328c0c3d8e7d4aa28f880c30e7f64"},
"plug": {:hex, :plug, "1.15.3", "712976f504418f6dff0a3e554c40d705a9bcf89a7ccef92fc6a5ef8f16a30a97", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "cc4365a3c010a56af402e0809208873d113e9c38c401cabd88027ef4f5c01fd2"},
"plug_caisson": {:hex, :plug_caisson, "0.2.1", "aa6a45a4f0e674459b8881d742cc0e8c7d5d0e008a29fe84dc10ab95d6fcfa74", [:mix], [{:brotli, "~> 0.3.2", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:plug, "~> 1.15", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "661887bca916122c31717842fa6496c5a4d92c22b5dbef6bd6973d28188939cc"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.2", "753611b23b29231fb916b0cdd96028084b12aff57bfd7b71781bd04b1dbeb5c9", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "951ed2433df22f4c97b85fdb145d4cee561f36b74854d64c06d896d7cd2921a7"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"},
"plug_crypto": {:hex, :plug_crypto, "1.2.5", "918772575e48e81e455818229bf719d4ab4181fcbf7f85b68a35620f78d89ced", [:mix], [], "hexpm", "26549a1d6345e2172eb1c233866756ae44a9609bd33ee6f99147ab3fd87fd842"},
"poison": {:hex, :poison, "5.0.0", "d2b54589ab4157bbb82ec2050757779bfed724463a544b6e20d79855a9e43b24", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "11dc6117c501b80c62a7594f941d043982a1bd05a1184280c0d9166eb4d8d3fc"},
"postgrex": {:hex, :postgrex, "0.17.3", "c92cda8de2033a7585dae8c61b1d420a1a1322421df84da9a82a6764580c503d", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "946cf46935a4fdca7a81448be76ba3503cff082df42c6ec1ff16a4bdfbfb098d"},
Expand All @@ -127,7 +127,8 @@
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"},
"telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"},
"tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"},
"thousand_island": {:hex, :thousand_island, "1.2.0", "4f548ae771ab5f96bc7e199f9824c0c2ce6d365f8c93f5f64dbbb33988e484bf", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "521671fea179672addb6af46455fc2a77be1edda4c0ed351633e0ef37a4b3584"},
"test_server": {:hex, :test_server, "0.1.16", "403d6cebaa7ad1d08c0ca9475af48836c45ae0f64cda9f4e88b0f17310c5c452", [:mix], [{:bandit, ">= 1.4.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 2.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:x509, "~> 0.6", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "f27cfc858d494e4df413a962fbe4d7aa3daf1a3824307f0b046bb8cec6ac8e42"},
"thousand_island": {:hex, :thousand_island, "1.3.5", "6022b6338f1635b3d32406ff98d68b843ba73b3aa95cfc27154223244f3a6ca5", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2be6954916fdfe4756af3239fb6b6d75d0b8063b5df03ba76fd8a4c87849e180"},
"timber_logfmt": {:git, "https://github.com/Logflare/logfmt-elixir.git", "9766367ccb3014b47b0a621a489261011dcea769", []},
"timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
Expand All @@ -144,4 +145,5 @@
"warpath": {:hex, :warpath, "0.5.0", "906ca2efe1bbd07f2dafc2159897e2cb3068e567210c81c2d9858addc172df50", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "a9b4f518bfce91818fd52bc8e7656259844167436f14c0c1a01a6ca7e061f671"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
"x509": {:hex, :x509, "0.8.8", "aaf5e58b19a36a8e2c5c5cff0ad30f64eef5d9225f0fd98fb07912ee23f7aba3", [:mix], [], "hexpm", "ccc3bff61406e5bb6a63f06d549f3dba3a1bbb456d84517efaaa210d8a33750f"},
}
85 changes: 85 additions & 0 deletions test/logflare/backends/adaptor/datadog_adaptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
defmodule Logflare.Backends.Adaptor.DatadogAdaptorTest do
use Logflare.DataCase, async: false

alias Logflare.Backends.Adaptor

@subject Logflare.Backends.Adaptor.DatadogAdaptor

doctest @subject

describe "cast and validate" do
test "API key is required" do
changeset = Adaptor.cast_and_validate_config(@subject, %{})

refute changeset.valid?

changeset =
Adaptor.cast_and_validate_config(@subject, %{
"api_key" => "foobarbaz"
})

assert changeset.valid?
end
end

describe "logs ingestion" do
setup do
user = insert(:user)
source = insert(:source, user: user)

backend =
insert(:backend, type: :datadog, sources: [source], config: %{api_key: "foo-bar"})

pid = start_supervised!({@subject, {source, backend}})
:timer.sleep(500)
[pid: pid, backend: backend, source: source]
end

test "sent logs are delivered", %{pid: pid, source: source, backend: backend} do
this = self()
ref = make_ref()

Tesla.Mock.mock_global(fn _req ->
send(this, ref)
%Tesla.Env{status: 200, body: ""}
end)

le = build(:log_event, source: source)

assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
assert_receive ^ref, 2000
end

test "service field is set to source name", %{pid: pid, source: source, backend: backend} do
this = self()
ref = make_ref()

Tesla.Mock.mock_global(fn req ->
send(this, {ref, Jason.decode!(req.body)})
%Tesla.Env{status: 200, body: ""}
end)

le = build(:log_event, source: source)

assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
assert_receive {^ref, [log_entry]}, 2000
assert log_entry["service"] == source.name
end

test "message is JSON encoded log event", %{pid: pid, source: source, backend: backend} do
this = self()
ref = make_ref()

Tesla.Mock.mock_global(fn req ->
send(this, {ref, Jason.decode!(req.body)})
%Tesla.Env{status: 200, body: ""}
end)

le = build(:log_event, source: source)

assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
assert_receive {^ref, [log_entry]}, 2000
assert Jason.decode!(log_entry["message"]) == le.body
end
end
end

0 comments on commit 2255879

Please sign in to comment.