ft: DataDog adaptor
This introduces a wrapper adaptor over the Webhook adaptor that sends logs to
the DataDog HTTP log ingest endpoint.
hauleth committed Apr 28, 2024
1 parent 4cf5ada commit 5afab89
Showing 6 changed files with 164 additions and 1 deletion.
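
For context before the diffs: the new adaptor rewrites each log event and hands it to the Webhook adaptor, which POSTs a JSON array to DataDog's v2 intake URL with the API key in the `dd-api-key` header. Below is a rough standalone sketch of that request shape (not part of the commit; the URL and header name are taken from the adaptor introduced here, while the payload values and the plain Tesla client are illustrative only).

# Hypothetical, for illustration only: one log entry shaped the way
# translate_event/1 in the new adaptor shapes it, sent directly with Tesla.
payload = [
  %{
    message: Jason.encode!(%{"event_message" => "hello world"}),
    ddsource: "logflare",
    service: "my-source"
  }
]

client =
  Tesla.client([
    {Tesla.Middleware.BaseUrl, "https://http-intake.logs.datadoghq.com"},
    {Tesla.Middleware.Headers, [{"dd-api-key", "<datadog-api-key>"}]},
    Tesla.Middleware.JSON
  ])

# DataDog acknowledges accepted batches with a 2xx response.
{:ok, %Tesla.Env{status: _status}} = Tesla.post(client, "/api/v2/logs", payload)
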
3 changes: 3 additions & 0 deletions config/test.exs
@@ -49,3 +49,6 @@ config :logger,
],
level: :error
]

config :tesla, Logflare.Backends.Adaptor.WebhookAdaptor.Client,
adapter: Tesla.Mock
1 change: 1 addition & 0 deletions lib/logflare/backends/adaptor.ex
@@ -29,6 +29,7 @@ defmodule Logflare.Backends.Adaptor do
@spec get_adaptor(Backend.t()) :: module()
def get_adaptor(%Backend{type: type}) do
case type do
:datadog -> __MODULE__.DatadogAdaptor
:webhook -> __MODULE__.WebhookAdaptor
:postgres -> __MODULE__.PostgresAdaptor
:bigquery -> __MODULE__.BigQueryAdaptor
72 changes: 72 additions & 0 deletions lib/logflare/backends/adaptor/datadog_adaptor.ex
@@ -0,0 +1,72 @@
defmodule Logflare.Backends.Adaptor.DatadogAdaptor do
  @moduledoc """
  Wrapper around `Logflare.Backends.Adaptor.WebhookAdaptor` that sends log
  events to the DataDog logs ingestion endpoint.
  """

  use TypedStruct

  alias Logflare.Backends.Adaptor.WebhookAdaptor

  typedstruct enforce: true do
    field(:api_key, String.t())
  end

  @behaviour Logflare.Backends.Adaptor

  def child_spec(arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [arg]}
    }
  end

  @impl Logflare.Backends.Adaptor
  def start_link({source, backend}) do
    # Reuse the webhook pipeline, pointing it at the DataDog intake URL and
    # passing the API key via the `dd-api-key` header.
    backend = %{backend |
      config: %{
        url: "https://http-intake.logs.datadoghq.com/api/v2/logs",
        headers: %{
          "dd-api-key" => backend.config.api_key
        }
      }
    }

    WebhookAdaptor.start_link({source, backend})
  end

  @impl Logflare.Backends.Adaptor
  def ingest(pid, log_events, opts) do
    new_events =
      Enum.map(log_events, &translate_event/1)

    WebhookAdaptor.ingest(pid, new_events, opts)
  end

  @impl Logflare.Backends.Adaptor
  def execute_query(_ident, _query), do: {:error, :not_implemented}

  @impl Logflare.Backends.Adaptor
  def cast_config(params) do
    {%{}, %{api_key: :string}}
    |> Ecto.Changeset.cast(params, [:api_key])
  end

  @impl Logflare.Backends.Adaptor
  def validate_config(changeset) do
    import Ecto.Changeset

    changeset
    |> validate_required([:api_key])
  end

  # Reshape the event body into the fields DataDog expects: the original body
  # as a JSON-encoded `message`, plus `ddsource` and `service` metadata.
  defp translate_event(%Logflare.LogEvent{} = le) do
    %Logflare.LogEvent{le |
      body: %{
        message: Jason.encode!(le.body),
        ddsource: "logflare",
        service: le.source.name
      }
    }
  end
end
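
As a usage note (not part of the diff): config casting and validation for this adaptor go through the shared `Logflare.Backends.Adaptor` behaviour. A minimal sketch, using the `cast_and_validate_config/2` helper exercised by the tests below and a made-up key value:

alias Logflare.Backends.Adaptor
alias Logflare.Backends.Adaptor.DatadogAdaptor

# Missing API key: the changeset is invalid.
invalid = Adaptor.cast_and_validate_config(DatadogAdaptor, %{})
false = invalid.valid?

# Any non-empty string passes; start_link/1 later injects it into the
# dd-api-key header of every request.
valid = Adaptor.cast_and_validate_config(DatadogAdaptor, %{"api_key" => "dd-example-key"})
true = valid.valid?
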
2 changes: 1 addition & 1 deletion lib/logflare/backends/backend.ex
@@ -9,7 +9,7 @@ defmodule Logflare.Backends.Backend do
alias Logflare.Source
alias Logflare.User

@adaptor_types [:bigquery, :webhook, :postgres]
@adaptor_types [:bigquery, :webhook, :postgres, :datadog]

typed_schema "backends" do
field(:name, :string)
1 change: 1 addition & 0 deletions mix.exs
@@ -128,6 +128,7 @@ defmodule Logflare.Mixfile do
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.0", only: :test},
{:stream_data, "~> 0.6.0", only: [:dev, :test]},
{:test_server, "~> 0.1.0", only: [:test]},

# Pagination
{:scrivener_ecto, "~> 2.2"},
86 changes: 86 additions & 0 deletions test/logflare/backends/adaptor/datadog_adaptor_test.exs
@@ -0,0 +1,86 @@
defmodule Logflare.Backends.Adaptor.DatadogAdaptorTest do
  use Logflare.DataCase, async: false

  alias Logflare.Backends.Adaptor

  @subject Logflare.Backends.Adaptor.DatadogAdaptor

  doctest @subject

  describe "cast and validate" do
    test "API key is required" do
      changeset = Adaptor.cast_and_validate_config(@subject, %{})

      refute changeset.valid?

      changeset =
        Adaptor.cast_and_validate_config(@subject, %{
          "api_key" => "foobarbaz"
        })

      assert changeset.valid?
    end
  end

  describe "logs ingestion" do
    setup do
      user = insert(:user)
      source = insert(:source, user: user)

      backend =
        insert(:backend, type: :datadog, sources: [source], config: %{api_key: "foo-bar"})

      pid = start_supervised!({@subject, {source, backend}})
      :timer.sleep(500)
      [pid: pid, backend: backend, source: source]
    end

    test "sent logs are delivered", %{pid: pid, source: source, backend: backend} do
      this = self()
      ref = make_ref()

      Tesla.Mock.mock_global(fn _req ->
        send(this, ref)
        %Tesla.Env{status: 200, body: ""}
      end)

      le = build(:log_event, source: source)

      assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
      assert_receive ^ref, 2000
    end

    test "service field is set to source name", %{pid: pid, source: source, backend: backend} do
      this = self()
      ref = make_ref()

      Tesla.Mock.mock_global(fn req ->
        send(this, {ref, Jason.decode!(req.body)})
        %Tesla.Env{status: 200, body: ""}
      end)

      le = build(:log_event, source: source)

      assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
      assert_receive {^ref, [log_entry]}, 2000
      assert log_entry["service"] == source.name
    end

    test "message is JSON encoded log event", %{pid: pid, source: source, backend: backend} do
      this = self()
      ref = make_ref()

      Tesla.Mock.mock_global(fn req ->
        send(this, {ref, Jason.decode!(req.body)})
        %Tesla.Env{status: 200, body: ""}
      end)

      le = build(:log_event, source: source)

      assert :ok == @subject.ingest(pid, [le], source_id: source.id, backend_id: backend.id)
      assert_receive {^ref, [log_entry]}, 2000
      assert Jason.decode!(log_entry["message"]) == le.body
    end
  end
end
