Skip to content

Commit

Permalink
Setup ability to compute cpu metric
Browse files Browse the repository at this point in the history
  • Loading branch information
zacksiri committed Oct 23, 2024
1 parent 14231b9 commit 37deffc
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 43 deletions.
9 changes: 8 additions & 1 deletion lib/uplink/monitors/instance.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
defmodule Uplink.Monitors.Instance do
alias Uplink.Clients.LXD

defstruct [:name, :data]
defstruct [:name, :timestamp, :data]

def metrics do
LXD.list_instances(recursion: 2)
|> Enum.map(fn instance ->
%__MODULE__{
name: instance.name,
data: instance,
timestamp: DateTime.utc_now()
}
end)

# members
# |> Enum.map(fn member ->
Expand Down
57 changes: 54 additions & 3 deletions lib/uplink/monitors/instance/metric.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,64 @@
defimpl Uplink.Monitors.Metric, for: Uplink.Monitors.Instance do
alias Uplink.Monitors.Instance

def memory(%Instance{name: node_name, data: data}) do
def memory(%Instance{name: node_name, timestamp: timestamp, data: data}) do
%{
"@timestamp" => DateTime.utc_now(),
"memory" =>
%{"usage" => memory_usage, "total" => memory_total} = memory_data
} = data.state

pct = percentage(memory_data)

%{
"@timestamp" => timestamp,
"host" => %{
"name" => node_name,
"containerized" => false
"containerized" => data.type == "container"
},
"container.id" => node_name,
"system" => %{
"memory" => %{
"actual" => %{
"used" => %{
"bytes" => memory_usage,
"pct" => pct
}
},
"total" => memory_total,
"used" => %{
"bytes" => memory_usage,
"pct" => pct
}
}
}
}
end

def cpu(%Instance{} = instance, nil), do: nil

def cpu(
%Instance{name: node_name, timestamp: timestamp, data: data} = instance,
%{
timestamp: previous_cpu_metric_timestamp,
data: previous_cpu_metric_data
}
) do
%{
"@timestamp" => timestamp,
"host" => %{
"name" => node_name,
"containerized" => data.type == "container"
},
"container.id" => node_name,
"system" => %{
"cpu" => %{
"cores" => 1
}
}
}
end

defp percentage(%{"total" => total, "usage" => usage_bytes}) do
if usage_bytes > 0 and total > 0, do: usage_bytes / total, else: 0.0
end
end
4 changes: 2 additions & 2 deletions lib/uplink/monitors/metric.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defprotocol Uplink.Monitors.Metric do
@spec cpu(struct) :: {:ok, map()} | {:error, String.t()}
def cpu(data)
@spec cpu(struct, map) :: {:ok, map()} | {:error, String.t()}
def cpu(data, previous_cpu_metric)

@spec memory(struct) :: {:ok, map()} | {:error, String.t()}
def memory(data)
Expand Down
21 changes: 0 additions & 21 deletions lib/uplink/monitors/observer.ex

This file was deleted.

29 changes: 27 additions & 2 deletions lib/uplink/monitors/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,44 @@ defmodule Uplink.Monitors.Pipeline do
use Broadway

alias Broadway.Message
alias Uplink.Monitors.Metric

def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {Uplink.Monitors.Producer, [poll_interval: :timer.seconds(15)]},
cncurrency: 1
concurrency: 1
],
processors: [
default: [
concurrency: 1
concurrency: 3,
max_demand: 10
]
],
batchers: [
default: [
concurrency: 3,
batch_size: 10
]
]
)
end

def handle_message(_, %Message{data: data} = message, _) do
%{metric: instance_metric, previous_cpu_metric: previous_cpu_metric} = data

memory = Metric.memory(instance_metric)
cpu = Metric.cpu(instance_metric, previous_cpu_metric)

data = %{memory: memory, cpu: cpu}

Message.put_data(message, data)
end

def handle_batch(_, messages, _batch_info, _context) do
IO.inspect(messages)

messages
end
end
73 changes: 59 additions & 14 deletions lib/uplink/monitors/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Uplink.Monitors.Producer do
use GenStage
@behaviour Broadway.Producer

alias Uplink.Metrics
alias Uplink.Monitors

@doc false
def start_link(opts) do
Expand All @@ -12,37 +12,74 @@ defmodule Uplink.Monitors.Producer do
@impl true
def init(opts) do
state = %{
client: LXD.client(),
demand: 0,
poll_interval: Keyword.get(opts, :poll_interval, 15_000)
poll_interval: Keyword.get(opts, :poll_interval, 15_000),
last_fetched_timestamp: nil,
previous_cpu_metrics: []
}

{:producer, state}
end

@impl true
def handle_demand(demand, state) when demand <= 0, do: {:noreply, [], state}

def handle_demand(demand, state) do
{messages, state} = load_metrics(demand, state)
{:noreply, messages, state}
if ready_to_fetch?(state) do
{messages, state} = load_metrics(demand, state)
{:noreply, messages, state}
else
Process.send_after(self(), :poll, state.poll_interval)
{:noreply, [], state}
end
end

@impl true
def handle_info(:poll, state) do
{messages, state} = load_metrics(0, state)
Process.send_after(self(), :poll, state.poll_interval)
{:noreply, messages, state}
end

defp load_metrics(demand, state) when demand <= 0, do: {[], state}

defp load_metrics(demand, state) do
messages =
Metrics.get_instances_metrics()
|> transform_metrics()
demand = demand + state.demand

metrics = Monitors.get_instances_metrics()

messages = transform_metrics(metrics, state.previous_cpu_metrics)

current_demand = demand - length(messages)

{messages, %{state | demand: demand - Enum.count(messages)}}
fetch_timestamp = DateTime.to_unix(DateTime.utc_now(), :millisecond)

previous_cpu_metrics =
Enum.map(metrics, fn instance ->
%{
name: instance.data.name,
timestamp: fetch_timestamp,
data: Map.get(instance.data.state, "cpu")
}
end)

state =
state
|> Map.put(:demand, current_demand)
|> Map.put(:last_fetched_timestamp, fetch_timestamp)
|> Map.put(:previous_cpu_metrics, previous_cpu_metrics)

{messages, state}
end

defp transform_metrics(metrics) do
Enum.map(metrics, &transform_message/1)
defp transform_metrics(metrics, previous_cpu_metrics) do
metrics
|> Enum.map(fn metric ->
previous_cpu_metric =
Enum.find(previous_cpu_metrics, fn previous_cpu_metric ->
previous_cpu_metric.name == metric.data.name
end)

%{metric: metric, previous_cpu_metric: previous_cpu_metric}
end)
|> Enum.map(&transform_message/1)
end

defp transform_message(message) do
Expand All @@ -51,4 +88,12 @@ defmodule Uplink.Monitors.Producer do
acknowledger: Broadway.NoopAcknowledger.init()
}
end

defp ready_to_fetch?(state) do
now = DateTime.to_unix(DateTime.utc_now(), :millisecond)
last_fetched_timestamp = state[:last_fetched_timestamp]

is_nil(last_fetched_timestamp) ||
now - last_fetched_timestamp > state.poll_interval
end
end

0 comments on commit 37deffc

Please sign in to comment.