Skip to content

Commit

Permalink
ft: DataDog adaptor
Browse files Browse the repository at this point in the history
This introduces wrapper adaptor over Webhook adaptor to send logs to
DataDog HTTP log ingest endpoint.
  • Loading branch information
hauleth committed Apr 10, 2024
1 parent f7d4073 commit a688a23
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/logflare/backends/adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Logflare.Backends.Adaptor do
@spec get_adaptor(Backend.t()) :: module()
def get_adaptor(%Backend{type: type}) do
case type do
:datadog -> __MODULE__.DatadogAdaptor
:webhook -> __MODULE__.WebhookAdaptor
:postgres -> __MODULE__.PostgresAdaptor
:bigquery -> __MODULE__.BigQueryAdaptor
Expand Down
192 changes: 192 additions & 0 deletions lib/logflare/backends/adaptor/datadog_adaptor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
defmodule Logflare.Backends.Adaptor.DatadogAdaptor do
@moduledoc """
Wrapper module for `Logflare.Backends.Adaptor.WebhookAdaptor` to provide API
for DataDog logs ingestion endpoint.
"""

use TypedStruct

defmodule Client do
use GenServer

require Logger

# - `queue` - list of elements in queue
# - `size` - amount of elements in queue (cached to not use `length/1` all
# the time)
# - `threshold` - maximal amount of items in queue before sending
# - `timeout` - time to wait before sending all messages in queue
# - `timer` - reference to timer used
defstruct queue: [],
size: 0,
threshold: 100,
timeout: 1000,
timer: nil,
url: nil,
method: :post,
headers: []

def start_link(opts), do: GenServer.start_link(__MODULE__, Map.new(opts))

def ingest(_pid, []), do: :ok
def ingest(pid, logs), do: GenServer.cast(pid, {:events, logs})

@impl GenServer
def init(opts) do
{:ok,
%__MODULE__{
threshold: opts[:threshold] || 100,
timeout: opts[:timeout] || 1000,
method: opts[:method] || :post,
url: Map.fetch!(opts, :url),
headers: Enum.to_list(opts[:headers] || [])
}}
end

@impl GenServer
def handle_cast({:events, events}, %__MODULE__{} = state) do
len = length(events)

new_state = %__MODULE__{
state
| queue: events ++ state.queue,
size: len + state.size,
timer: start_timer(state.timer, state.timeout)
}

if new_state.size >= new_state.threshold do
{:noreply, new_state, {:continue, :send}}
else
{:noreply, new_state}
end
end

@impl GenServer
def handle_info(:timeout, state) do
{:noreply, state, {:continue, :send}}
end

@impl GenServer
def handle_continue(:send, state) do
send_logs(state)
{:noreply, %__MODULE__{state |
queue: [],
size: 0,
timer: cancel_timer(state.timer)}}
end

defp start_timer(nil, timeout) do
{:ok, ref} = :timer.send_after(timeout, :timeout)

ref
end

defp start_timer(ref, _timeout), do: ref

defp cancel_timer(nil), do: nil
defp cancel_timer(ref) do
:timer.cancel(ref)
nil
end

defp send_logs(%__MODULE__{} = state) do
middlewares = [
Tesla.Middleware.Telemetry,
Tesla.Middleware.JSON
]

client = Tesla.client(middlewares)

Task.start(fn ->
case Tesla.request(client,
url: state.url,
method: state.method,
headers: state.headers,
body: state.queue
) do
{:ok, %Tesla.Env{status: status}} when status in 200..299 ->
:ok

{:ok, %Tesla.Env{} = resp} ->
Logger.error(resp, report_cb: &__MODULE__.__report_cb__/2)
:error
end
end)
end

@doc false
def __report_cb__(%Tesla.Env{} = report, _opts) do
"#{report.method} #{report.url} request failed with HTTP #{report.status} - #{inspect(report.body)}"
end

def __report_cb__(report, _opts) do
inspect(report)
end
end

@behaviour Logflare.Backends.Adaptor

@impl Logflare.Backends.Adaptor
def start_link({_source, backend}) do
Client.start_link(
url: "https://http-intake.logs.datadoghq.com/api/v2/logs",
headers: [{"dd-api-key", backend.config.api_key}]
)
end

@impl Logflare.Backends.Adaptor
def ingest(pid, log_events, _opts) do
new_events =
Enum.map(log_events, &translate_event/1)

Client.ingest(pid, new_events)
end

@impl Logflare.Backends.Adaptor
def execute_query(_ident, _query), do: {:error, :not_implemented}

@impl Logflare.Backends.Adaptor
def cast_config(params) do
{%{}, %{api_key: :string}}
|> Ecto.Changeset.cast(params, [:api_key])
end

@impl Logflare.Backends.Adaptor
def validate_config(changeset) do
import Ecto.Changeset

changeset
|> validate_required([:api_key])
end

defp translate_event(%Logflare.LogEvent{} = le) do
%{
message: le.body.event_message,
ddsource: "logflare",
ddtags: build_tags(le.body),
service: le.source.name
}
end

defp build_tags(body) do
body
|> Map.drop([:event_message])
|> Enum.map_join(",", fn {key, value} ->
do_build(value, key)
end)
end

defp do_build(map, parent) when is_map(map) do
for {key, value} <- map,
sub <- do_build(value, "#{parent}.#{key}"),
do: sub
end

defp do_build(list, parent) when is_list(list) do
for {value, key} <- Enum.with_index(list),
sub <- do_build(value, "#{parent}.#{key}"),
do: sub
end

defp do_build(other, key), do: ["#{key}:#{other}"]
end
2 changes: 1 addition & 1 deletion lib/logflare/backends/backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Logflare.Backends.Backend do
alias Logflare.Source
alias Logflare.User

@adaptor_types [:bigquery, :webhook, :postgres]
@adaptor_types [:bigquery, :webhook, :postgres, :datadog]

typed_schema "backends" do
field(:name, :string)
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ defmodule Logflare.Mixfile do
{:mix_test_watch, "~> 1.0", only: [:dev, :test], runtime: false},
{:mimic, "~> 1.0", only: :test},
{:stream_data, "~> 0.6.0", only: [:dev, :test]},
{:test_server, "~> 0.1.0", only: [:test]},

# Pagination
{:scrivener_ecto, "~> 2.2"},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"},
"telemetry_poller": {:hex, :telemetry_poller, "0.5.1", "21071cc2e536810bac5628b935521ff3e28f0303e770951158c73eaaa01e962a", [:rebar3], [{:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4cab72069210bc6e7a080cec9afffad1b33370149ed5d379b81c7c5f0c663fd4"},
"tesla": {:hex, :tesla, "1.8.0", "d511a4f5c5e42538d97eef7c40ec4f3e44effdc5068206f42ed859e09e51d1fd", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, ">= 1.0.0", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.2", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "10501f360cd926a309501287470372af1a6e1cbed0f43949203a4c13300bc79f"},
"test_server": {:hex, :test_server, "0.1.15", "41768c856cef1d2ad5a91416b7938c5f645b55d1ca4bcebda74b2d877e7cbed9", [:mix], [{:bandit, ">= 0.7.6", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 2.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:x509, "~> 0.6", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "03c02c51a3105cc137dc1a73f81d4311e02785df18392e0070fbb831c71ce610"},
"thousand_island": {:hex, :thousand_island, "1.2.0", "4f548ae771ab5f96bc7e199f9824c0c2ce6d365f8c93f5f64dbbb33988e484bf", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "521671fea179672addb6af46455fc2a77be1edda4c0ed351633e0ef37a4b3584"},
"timber_logfmt": {:git, "https://github.com/Logflare/logfmt-elixir.git", "9766367ccb3014b47b0a621a489261011dcea769", []},
"timex": {:hex, :timex, "3.7.11", "bb95cb4eb1d06e27346325de506bcc6c30f9c6dea40d1ebe390b262fad1862d1", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.20", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 1.1", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "8b9024f7efbabaf9bd7aa04f65cf8dcd7c9818ca5737677c7b76acbc6a94d1aa"},
Expand All @@ -144,4 +145,5 @@
"warpath": {:hex, :warpath, "0.5.0", "906ca2efe1bbd07f2dafc2159897e2cb3068e567210c81c2d9858addc172df50", [:mix], [{:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "a9b4f518bfce91818fd52bc8e7656259844167436f14c0c1a01a6ca7e061f671"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"},
"x509": {:hex, :x509, "0.8.8", "aaf5e58b19a36a8e2c5c5cff0ad30f64eef5d9225f0fd98fb07912ee23f7aba3", [:mix], [], "hexpm", "ccc3bff61406e5bb6a63f06d549f3dba3a1bbb456d84517efaaa210d8a33750f"},
}
113 changes: 113 additions & 0 deletions test/logflare/backends/adaptor/datadog_adaptor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule Logflare.Backends.Adaptor.DatadogAdaptorTest do
use Logflare.DataCase, async: false

alias Logflare.Backends.Adaptor

@subject Logflare.Backends.Adaptor.DatadogAdaptor

doctest @subject

describe "cast and validate" do
test "API key is required" do
changeset = Adaptor.cast_and_validate_config(@subject, %{})

refute changeset.valid?

changeset =
Adaptor.cast_and_validate_config(@subject, %{
"api_key" => "foobarbaz"
})

assert changeset.valid?
end
end

describe "ingest" do
setup do
this = self()

TestServer.add("/",
via: :post,
match: fn conn ->
{:ok, body, conn} = Plug.Conn.read_body(conn)
events = Jason.decode(body)

send(this, events)

Plug.Conn.send_resp(conn, 200, "")
end
)

pid =
start_supervised!({
@subject.Client,
timeout: 100, threshold: 3, url: TestServer.url(), headers: [{"foo", "bar"}]
})

{:ok, pid: pid}
end

test "after timeout it sends messages", %{pid: pid} do
events = [
%Logflare.LogEvent{
body: %{event_message: "hello world"},
source: %{name: "sample"}
}
]

@subject.ingest(pid, events, [])

assert_receive {:ok, resp}, 200
assert length(resp) == 1
end

test "after sent if there is more events than threshold", %{pid: pid} do
events = List.duplicate(
%Logflare.LogEvent{
body: %{event_message: "hello world"},
source: %{name: "sample"}
}, 5
)

@subject.ingest(pid, events, [])

assert_receive {:ok, resp}, 200
assert length(resp) == 5
end

test "after sent if there is more events than threshold after second ingestion", %{pid: pid} do
events = List.duplicate(
%Logflare.LogEvent{
body: %{event_message: "hello world"},
source: %{name: "sample"}
}, 2
)

@subject.ingest(pid, events, [])
@subject.ingest(pid, events, [])

assert_receive {:ok, resp}, 200
assert length(resp) == 4
end

test "metadata other than `event_message` is stored in `ddtags`", %{pid: pid} do
events = [
%Logflare.LogEvent{
body: %{
event_message: "hello world",
foo: "foo",
bar: %{baz: 4}
},
source: %{name: "sample"}
}
]

@subject.ingest(pid, events, [])

assert_receive {:ok, [event]}, 200
assert %{"ddtags" => tags} = event
assert tags =~ ~r/foo:foo/
assert tags =~ ~r/bar\.baz:4/
end
end
end

0 comments on commit a688a23

Please sign in to comment.