# NOTE(review): this unit arrived as a whitespace-mangled git patch. Besides the
# module below, the patch carried these one-line hunks, kept here in comment
# form so nothing is lost:
#   * config/test.exs — route the webhook client through Tesla's test mock:
#       config :tesla, Logflare.Backends.Adaptor.WebhookAdaptor.Client,
#         adapter: Tesla.Mock
#   * lib/logflare/backends/adaptor.ex — register the new adaptor in
#     get_adaptor/1:  `:datadog -> __MODULE__.DatadogAdaptor`
#   * lib/logflare/backends/backend.ex — extend the accepted backend types:
#       @adaptor_types [:bigquery, :webhook, :postgres, :datadog]

defmodule Logflare.Backends.Adaptor.DatadogAdaptor do
  @moduledoc """
  Wrapper around `Logflare.Backends.Adaptor.WebhookAdaptor` that targets the
  Datadog logs intake endpoint (`/api/v2/logs`).

  The only user-supplied configuration is the Datadog API key; the intake URL
  and the `dd-api-key` request header are filled in by `start_link/1` before
  delegating to the webhook adaptor.
  """

  use TypedStruct

  alias Logflare.Backends.Adaptor.WebhookAdaptor

  typedstruct enforce: true do
    # Datadog API key, sent as the `dd-api-key` request header.
    field(:api_key, String.t())
  end

  @behaviour Logflare.Backends.Adaptor

  # Fixed Datadog intake URL (US site). Hoisted to an attribute so it is
  # defined in exactly one place.
  @intake_url "https://http-intake.logs.datadoghq.com/api/v2/logs"

  # Child spec so the adaptor can be started directly under a supervisor.
  def child_spec(arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [arg]}
    }
  end

  @impl Logflare.Backends.Adaptor
  def start_link({source, backend}) do
    # Rewrite the backend config into the shape WebhookAdaptor expects:
    # the fixed intake URL plus the API-key header.
    backend = %{
      backend
      | config: %{
          url: @intake_url,
          headers: %{"dd-api-key" => backend.config.api_key}
        }
    }

    WebhookAdaptor.start_link({source, backend})
  end

  @impl Logflare.Backends.Adaptor
  def ingest(pid, log_events, opts) do
    # Translate each event into the Datadog log-entry shape, then delegate
    # delivery to the underlying webhook adaptor.
    new_events = Enum.map(log_events, &translate_event/1)

    WebhookAdaptor.ingest(pid, new_events, opts)
  end

  @impl Logflare.Backends.Adaptor
  def execute_query(_ident, _query), do: {:error, :not_implemented}

  @impl Logflare.Backends.Adaptor
  def cast_config(params) do
    {%{}, %{api_key: :string}}
    |> Ecto.Changeset.cast(params, [:api_key])
  end

  @impl Logflare.Backends.Adaptor
  def validate_config(changeset) do
    import Ecto.Changeset

    changeset
    |> validate_required([:api_key])
  end

  # Maps a Logflare event onto Datadog's log-entry schema: the whole event body
  # is JSON-encoded into `message`, `ddsource` is fixed, and `service` is the
  # source name.
  # NOTE(review): assumes `le.source` is preloaded — confirm all callers
  # guarantee that, otherwise this raises on the `.name` access.
  defp translate_event(%Logflare.LogEvent{} = le) do
    %Logflare.LogEvent{
      le
      | body: %{
          message: Jason.encode!(le.body),
          ddsource: "logflare",
          service: le.source.name
        }
    }
  end
end
# NOTE(review): this unit arrived as a whitespace-mangled git patch. It also
# carried the remainder of a mix.exs hunk adding
#   {:test_server, "~> 0.1.0", only: [:test]}
# These tests stub HTTP with Tesla.Mock, not test_server — that new dependency
# looks unused; confirm it is needed before merging.

defmodule Logflare.Backends.Adaptor.DatadogAdaptorTest do
  use Logflare.DataCase, async: false

  alias Logflare.Backends.Adaptor

  @subject Logflare.Backends.Adaptor.DatadogAdaptor

  doctest @subject

  describe "cast and validate" do
    test "API key is required" do
      changeset = Adaptor.cast_and_validate_config(@subject, %{})

      refute changeset.valid?

      changeset =
        Adaptor.cast_and_validate_config(@subject, %{
          "api_key" => "foobarbaz"
        })

      assert changeset.valid?
    end
  end

  describe "logs ingestion" do
    setup do
      user = insert(:user)
      source = insert(:source, user: user)

      backend =
        insert(:backend, type: :datadog, sources: [source], config: %{api_key: "foo-bar"})

      pid = start_supervised!({@subject, {source, backend}})
      # NOTE(review): fixed sleep to let the adaptor finish booting — prefer a
      # readiness check if one becomes available; sleeps make suites slow/flaky.
      :timer.sleep(500)
      [pid: pid, backend: backend, source: source]
    end

    test "sent logs are delivered", %{pid: pid, source: source, backend: backend} do
      ref = expect_intake_request()

      le = build(:log_event, source: source)

      assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
      assert_receive {^ref, _body}, 2000
    end

    test "service field is set to source name", %{pid: pid, source: source, backend: backend} do
      ref = expect_intake_request()

      le = build(:log_event, source: source)

      assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
      assert_receive {^ref, [log_entry]}, 2000
      assert log_entry["service"] == source.name
    end

    test "message is JSON encoded log event", %{pid: pid, source: source, backend: backend} do
      ref = expect_intake_request()

      le = build(:log_event, source: source)

      assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
      assert_receive {^ref, [log_entry]}, 2000
      assert Jason.decode!(log_entry["message"]) == le.body
    end
  end

  # Installs a global Tesla mock that acks every request with 200 and sends
  # `{ref, decoded_json_body}` back to the calling test process. Returns the
  # ref so the test can `assert_receive` on it. Must be called from the test
  # process itself (captures `self()`).
  defp expect_intake_request do
    this = self()
    ref = make_ref()

    Tesla.Mock.mock_global(fn req ->
      send(this, {ref, Jason.decode!(req.body)})
      %Tesla.Env{status: 200, body: ""}
    end)

    ref
  end
end