Skip to content

Commit

Permalink
Setup monitoring pipeline for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Oct 25, 2024
1 parent 9b7058e commit f7cfd77
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 30 deletions.
4 changes: 4 additions & 0 deletions lib/uplink/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ defmodule Uplink.Application do
{Cluster.Supervisor, [topologies, [name: Uplink.ClusterSupervisor]]},
{Task.Supervisor, name: Uplink.TaskSupervisor},
{Plug.Cowboy, plug: Uplink.Internal, scheme: :http, port: internal_port},
{Uplink.Pipelines.Context, [name: :metrics, monitors: []]},
{Pogo.DynamicSupervisor,
[name: Uplink.PipelineSupervisor, scope: :uplink]},
{Uplink.Monitors, []},
{
Plug.Cowboy,
plug: Uplink.Router,
Expand Down
19 changes: 11 additions & 8 deletions lib/uplink/metrics/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ defmodule Uplink.Metrics.Pipeline do

alias Broadway.Message

alias Uplink.Pipelines

alias Uplink.Metrics
alias Uplink.Metrics.Document

def start_link(opts) do
monitors = Keyword.fetch!(opts, :monitors)
require Logger

def start_link(_opts \\ []) do
Broadway.start_link(__MODULE__,
name: {:global, __MODULE__},
context: %{
monitors: monitors
},
name: __MODULE__,
context: :metrics,
producer: [
module: {Uplink.Metrics.Producer, [poll_interval: :timer.seconds(15)]},
concurrency: 1
Expand All @@ -33,6 +33,8 @@ defmodule Uplink.Metrics.Pipeline do
)
|> case do
{:ok, pid} ->
Logger.info("[Uplink.Metrics.Pipeline] Started...")

{:ok, pid}

{:error, {:already_started, pid}} ->
Expand Down Expand Up @@ -63,10 +65,11 @@ defmodule Uplink.Metrics.Pipeline do

def handle_batch(_, messages, _batch_info, context) do
documents = to_ndjson(messages)
monitors = Pipelines.get_monitors(context)

context.monitors
monitors
|> Enum.map(fn monitor ->
Metrics.push!(context.monitor, documents)
Metrics.push!(monitor, documents)
end)

messages
Expand Down
50 changes: 37 additions & 13 deletions lib/uplink/monitors.ex
Original file line number Diff line number Diff line change
@@ -1,33 +1,57 @@
defmodule Uplink.Monitors do
use Task

alias Uplink.Cache
alias Uplink.Pipelines
alias Uplink.Clients.Instellar

require Logger

@pipeline_modules %{
metrics: Uplink.Metrics.Pipeline
}

def start_link(options) do
Task.start_link(__MODULE__, :run, options)
Task.start_link(__MODULE__, :run, [options])
end

def run(options) do
def run(_options) do
Instellar.list_monitors()
|> case do
{:ok, %{body: monitors}} ->
state = maybe_start_pipeline(monitors)
{:ok, monitors} ->
start_pipeline(monitors, :metrics)

error ->
{:error, error}
end
end

defp maybe_start_pipeline(monitors) do
Cache.transaction([keys: [:monitors]], fn ->
started_monitors = Cache.get(:monitors)
defp start_pipeline(monitors, context) do
Logger.info("[Uplink.Monitors] Starting pipeline...")

started_metrics_monitor_ids =
Pipelines.get_monitors(context)
|> Enum.map(fn monitor ->
monitor["attributes"]["id"]
end)

not_started_monitors =
Enum.filter(monitors, fn monitor ->
monitor["attributes"]["id"] not in started_metrics_monitor_ids
end)

grouped_monitors =
Enum.group_by(not_started_monitors, fn monitor ->
monitor["attributes"]["type"]
end)

context_monitors = Map.get(grouped_monitors, "#{context}")

if Enum.count(context_monitors) > 0 do
Pipelines.append_monitors(context, context_monitors)
end

module = Map.fetch!(@pipeline_modules, context)

not_started_monitors =
Enum.filter(monitors, fn monitor ->
monitor["attributes"]["id"] not in started_monitors
end)
end)
Pipelines.start(module)
end
end
23 changes: 14 additions & 9 deletions lib/uplink/pipelines.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
defmodule Uplink.Pipelines do
use DynamicSupervisor
defdelegate get_monitors(context),
to: __MODULE__.Context,
as: :get

def start_link(options) do
DynamicSupervisor.start_link(__MODULE__, options, name: __MODULE__)
end
defdelegate append_monitors(context, monitors),
to: __MODULE__.Context,
as: :append

def start_metrics(monitors) do
spec = {Uplink.Metrics.Pipeline, monitors: monitors}
def start(module) do
spec = %{
id: module,
start: {module, :start_link, []}
}

DynamicSupervisor.start_child(__MODULE__, spec)
Pogo.DynamicSupervisor.start_child(Uplink.PipelineSupervisor, spec)
end

def init(_options) do
DynamicSupervisor.init(strategy: :one_for_one)
def list do
Pogo.DynamicSupervisor.which_children(Uplink.PipelineSupervisor, :global)
end
end
32 changes: 32 additions & 0 deletions lib/uplink/pipelines/context.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Uplink.Pipelines.Context do
use Agent

require Logger

def start_link(options) do
monitors = Keyword.get(options, :monitors, [])
name = Keyword.fetch!(options, :name)

Agent.start_link(fn -> monitors end, name: {:global, name})
|> case do
{:ok, pid} ->
Logger.info("[Uplink.Pipelines.Context] started #{inspect(name)}")

{:ok, pid}

{:error, {:already_started, pid}} ->
Process.link(pid)
{:ok, pid}
end
end

def get(pid_or_name) do
Agent.get({:global, pid_or_name}, fn monitors -> monitors end)
end

def append(pid_or_name, new_monitors) do
Agent.get_and_update({:global, pid_or_name}, fn monitors ->
{monitors, monitors ++ new_monitors}
end)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ defmodule Uplink.MixProject do

# Clustering
{:libcluster, "~> 3.0"},
{:pogo, "~> 0.3.0"},

# One time password
{:pot, "~> 1.0.2"},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"lexdee": {:hex, :lexdee, "2.4.3", "3049ee1df9ce9478208231bf8e3b1f63c98d2d71d7bfe6491a5b926446cdfe41", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.5", [hex: :mint, repo: "hexpm", optional: false]}, {:mint_web_socket, "~> 1.0.2", [hex: :mint_web_socket, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7.0", [hex: :tesla, repo: "hexpm", optional: false]}, {:x509, "~> 0.8.1", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "5464fecaaca8cdb773a412738c8bbb288928a21054f8acdbf184515fff628992"},
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
"libring": {:hex, :libring, "1.6.0", "d5dca4bcb1765f862ab59f175b403e356dec493f565670e0bacc4b35e109ce0d", [:mix], [], "hexpm", "5e91ece396af4bce99953d49ee0b02f698cd38326d93cd068361038167484319"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mime": {:hex, :mime, "2.0.6", "8f18486773d9b15f95f4f4f1e39b710045fa1de891fada4516559967276e4dc2", [:mix], [], "hexpm", "c9945363a6b26d747389aac3643f8e0e09d30499a138ad64fe8fd1d13d9b153e"},
"mimerl": {:hex, :mimerl, "1.3.0", "d0cd9fc04b9061f82490f6581e0128379830e78535e017f7780f37fea7545726", [:rebar3], [], "hexpm", "a1e15a50d1887217de95f0b9b0793e32853f7c258a5cd227650889b38839fe9d"},
Expand All @@ -42,6 +43,7 @@
"plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"},
"plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"pogo": {:hex, :pogo, "0.3.0", "4983ae7c52735af088fb3733c17482ca801975bb1f15c32c2c6f08086b1ac47e", [:mix], [{:libring, "~> 1.6.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm", "a810511e242538e59369efca5aa8bc315dc80ae641b667252ea7930a6dc0ce1e"},
"postgrex": {:hex, :postgrex, "0.17.5", "0483d054938a8dc069b21bdd636bf56c487404c241ce6c319c1f43588246b281", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "50b8b11afbb2c4095a3ba675b4f055c416d0f3d7de6633a595fc131a828a67eb"},
"pot": {:hex, :pot, "1.0.2", "13abb849139fdc04ab8154986abbcb63bdee5de6ed2ba7e1713527e33df923dd", [:rebar3], [], "hexpm", "78fe127f5a4f5f919d6ea5a2a671827bd53eb9d37e5b4128c0ad3df99856c2e0"},
"prometheus_parser": {:hex, :prometheus_parser, "0.1.10", "d1657b308506261b17f111429b38c427d7e4699b9b77601113ccec658c8cb7f9", [:mix], [{:nimble_parsec, "~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "3db190840fc9634a8b1a478cbaebd6ef6118d8e4970c71c80a8d11cd24613640"},
Expand Down

0 comments on commit f7cfd77

Please sign in to comment.