Skip to content

Commit

Permalink
Merge pull request #1404 from Logflare/feat/alerting
Browse files Browse the repository at this point in the history
feat: Logflare Alerts v0
  • Loading branch information
Ziinc authored Sep 26, 2023
2 parents cebe3e3 + d83d1fd commit 6201fbe
Show file tree
Hide file tree
Showing 32 changed files with 1,234 additions and 21 deletions.
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,6 @@ config :logflare, Logflare.Cluster.Utils, min_cluster_size: 1

config :grpc, start_server: true

# On startup the Citrine scheduler runs this MFA once, recreating a
# Citrine.Job for every persisted AlertQuery (see Logflare.Alerting.init_alert_jobs/0).
config :logflare, Logflare.AlertsScheduler, init_task: {Logflare.Alerting, :init_alert_jobs, []}

import_config "#{Mix.env()}.exs"
242 changes: 242 additions & 0 deletions lib/logflare/alerting.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
defmodule Logflare.Alerting do
  @moduledoc """
  The Alerting context.

  Manages CRUD for `AlertQuery` records and keeps the Citrine scheduler in
  sync with them: each `AlertQuery` maps 1-1 to a `Citrine.Job` (sharing the
  same id) that periodically runs the alert and dispatches webhook and/or
  Slack notifications.
  """

  import Ecto.Query, warn: false
  alias Logflare.Repo

  require Logger
  alias Logflare.Backends.Adaptor.WebhookAdaptor
  alias Logflare.Backends.Adaptor.SlackAdaptor
  alias Logflare.Alerting.AlertQuery
  alias Logflare.User

  @doc """
  Returns the list of alert_queries owned by the given user.

  ## Examples

      iex> list_alert_queries(user)
      [%AlertQuery{}, ...]
  """
  @spec list_alert_queries(User.t()) :: [AlertQuery.t()]
  def list_alert_queries(%User{id: user_id}) do
    from(q in AlertQuery, where: q.user_id == ^user_id)
    |> Repo.all()
  end

  @doc """
  Gets a single alert_query.

  Raises `Ecto.NoResultsError` if the Alert query does not exist.

  ## Examples

      iex> get_alert_query!(123)
      %AlertQuery{}

      iex> get_alert_query!(456)
      ** (Ecto.NoResultsError)
  """
  @spec get_alert_query!(term()) :: AlertQuery.t()
  def get_alert_query!(id), do: Repo.get!(AlertQuery, id)

  @doc """
  Gets a single alert_query by the given column-value pairs.

  Returns `nil` when no row matches.
  """
  @spec get_alert_query_by(keyword()) :: AlertQuery.t() | nil
  def get_alert_query_by(kw) do
    Repo.get_by(AlertQuery, kw)
  end

  @doc """
  Creates an alert_query owned by the given user.

  ## Examples

      iex> create_alert_query(user, %{field: value})
      {:ok, %AlertQuery{}}

      iex> create_alert_query(user, %{field: bad_value})
      {:error, %Ecto.Changeset{}}
  """
  @spec create_alert_query(User.t(), map()) ::
          {:ok, AlertQuery.t()} | {:error, Ecto.Changeset.t()}
  def create_alert_query(%User{} = user, attrs \\ %{}) do
    user
    |> Ecto.build_assoc(:alert_queries)
    # preload :user so AlertQuery.changeset/2 can derive the source mapping
    |> Repo.preload(:user)
    |> AlertQuery.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Updates an alert_query.

  ## Examples

      iex> update_alert_query(alert_query, %{field: new_value})
      {:ok, %AlertQuery{}}

      iex> update_alert_query(alert_query, %{field: bad_value})
      {:error, %Ecto.Changeset{}}
  """
  @spec update_alert_query(AlertQuery.t(), map()) ::
          {:ok, AlertQuery.t()} | {:error, Ecto.Changeset.t()}
  def update_alert_query(%AlertQuery{} = alert_query, attrs) do
    alert_query
    |> Repo.preload(:user)
    |> AlertQuery.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes an alert_query.

  Note: this only removes the database row; any scheduled Citrine job must be
  removed separately via `delete_alert_job/1`.

  ## Examples

      iex> delete_alert_query(alert_query)
      {:ok, %AlertQuery{}}

      iex> delete_alert_query(alert_query)
      {:error, %Ecto.Changeset{}}
  """
  @spec delete_alert_query(AlertQuery.t()) ::
          {:ok, AlertQuery.t()} | {:error, Ecto.Changeset.t()}
  def delete_alert_query(%AlertQuery{} = alert_query) do
    Repo.delete(alert_query)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking alert_query changes.

  ## Examples

      iex> change_alert_query(alert_query)
      %Ecto.Changeset{data: %AlertQuery{}}
  """
  @spec change_alert_query(AlertQuery.t(), map()) :: Ecto.Changeset.t()
  def change_alert_query(%AlertQuery{} = alert_query, attrs \\ %{}) do
    AlertQuery.changeset(alert_query, attrs)
  end

  @doc """
  Retrieves a Citrine.Job based on AlertQuery.

  Citrine.Job shares the same id as AlertQuery, resulting in a 1-1
  relationship. Returns `nil` when no job is registered for the id.
  """
  # spec fix: the nil branch below means callers can receive nil
  @spec get_alert_job(AlertQuery.t() | term()) :: Citrine.Job.t() | nil
  def get_alert_job(%AlertQuery{id: id}), do: get_alert_job(id)

  def get_alert_job(id) do
    case Logflare.AlertsScheduler.get_job(id) do
      {_pid, job} -> job
      nil -> nil
    end
  end

  @doc """
  Updates or creates a new Citrine.Job based on a given AlertQuery.

  The job's id mirrors the AlertQuery id, and its task invokes
  `run_alert/1` on the stored struct.
  """
  @spec upsert_alert_job(AlertQuery.t()) :: {:ok, Citrine.Job.t()}
  def upsert_alert_job(%AlertQuery{} = alert_query) do
    Logflare.AlertsScheduler.put_job(%Citrine.Job{
      id: alert_query.id,
      schedule: alert_query.cron,
      # standard 5-field cron, not the extended 6-field (seconds) syntax
      extended_syntax: false,
      task: {:run_alert, [alert_query]}
    })

    {:ok, get_alert_job(alert_query)}
  end

  @doc """
  Initializes and ensures that all alert jobs are created.

  Only creates jobs that are not yet registered; existing jobs are left
  untouched.

  TODO: batching instead of loading whole table.
  """
  @spec init_alert_jobs() :: :ok
  def init_alert_jobs do
    AlertQuery
    |> Repo.all()
    |> Stream.each(fn alert_query ->
      if get_alert_job(alert_query) == nil do
        upsert_alert_job(alert_query)
      end
    end)
    |> Stream.run()

    :ok
  end

  @doc """
  Performs the check lifecycle of an AlertQuery.

  Sends notifications if the necessary configurations are set. If no results
  are returned from the query execution, no alert is sent.
  """
  # spec fix: the else-branch passes non-{:ok, _} results (e.g. error tuples)
  # straight through to the caller, so the return is not always :ok
  @spec run_alert(AlertQuery.t()) :: :ok | {:error, term()}
  def run_alert(%AlertQuery{} = alert_query) do
    # :user is required by execute_alert_query/1
    alert_query = alert_query |> Repo.preload([:user])

    # only notify when the query produced at least one row
    with {:ok, [_ | _] = results} <- execute_alert_query(alert_query) do
      if alert_query.webhook_notification_url do
        WebhookAdaptor.Client.send(alert_query.webhook_notification_url, %{
          "result" => results
        })
      end

      if alert_query.slack_hook_url do
        SlackAdaptor.send_message(alert_query.slack_hook_url, results)
      end

      :ok
    else
      {:ok, []} ->
        :ok

      other ->
        other
    end
  end

  @doc """
  Deletes an AlertQuery's Citrine.Job from the scheduler.

  noop if already deleted.

  ### Examples

      iex> delete_alert_job(%AlertQuery{})
      :ok

      iex> delete_alert_job(alert_query.id)
      :ok
  """
  @spec delete_alert_job(AlertQuery.t() | number()) :: :ok
  def delete_alert_job(%AlertQuery{id: id}), do: delete_alert_job(id)

  def delete_alert_job(alert_id) do
    Logflare.AlertsScheduler.delete_job(alert_id)
  end

  @doc """
  Executes an AlertQuery and returns its results.

  Requires `:user` key to be preloaded.

  ### Examples

      iex> execute_alert_query(alert_query)
      {:ok, [%{"user_id" => "my-user-id"}]}
  """
  # spec fix: a failing transform or BQ query falls through the `with`
  # unchanged, so error tuples are part of the contract
  @spec execute_alert_query(AlertQuery.t()) :: {:ok, [map()]} | {:error, term()}
  def execute_alert_query(%AlertQuery{user: %User{}} = alert_query) do
    Logger.info("Executing AlertQuery | #{alert_query.name} | #{alert_query.id}")

    with {:ok, transformed_query} <-
           Logflare.Sql.transform(:bq_sql, alert_query.query, alert_query.user_id),
         {:ok, %{rows: rows}} <-
           Logflare.BqRepo.query_with_sql_and_params(
             alert_query.user,
             # fall back to the service-wide GCP project when the user has none
             alert_query.user.bigquery_project_id || env_project_id(),
             transformed_query,
             [],
             parameterMode: "NAMED",
             maxResults: 1000,
             location: alert_query.user.bigquery_dataset_location
           ) do
      {:ok, rows}
    end
  end

  # helper to get the google project id via env.
  defp env_project_id, do: Application.get_env(:logflare, Logflare.Google)[:project_id]
end
63 changes: 63 additions & 0 deletions lib/logflare/alerting/alert_query.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule Logflare.Alerting.AlertQuery do
  @moduledoc false
  use TypedEctoSchema
  import Ecto.Changeset
  alias Logflare.Endpoints.Query

  # JSON representation intentionally omits :source_mapping and :user
  @derive {Jason.Encoder,
           only: [
             :id,
             :token,
             :cron,
             :name,
             :description,
             :language,
             :query,
             :webhook_notification_url,
             :slack_hook_url
           ]}
  typed_schema "alert_queries" do
    field :name, :string
    field :description, :string
    field(:language, Ecto.Enum, values: [:bq_sql, :pg_sql, :lql], default: :bq_sql)
    field :query, :string
    # cron schedule string, standard 5-field syntax
    field :cron, :string
    # maps source names referenced in :query to source tokens
    field :source_mapping, :map
    field :token, Ecto.UUID, autogenerate: true
    field :slack_hook_url, :string
    field :webhook_notification_url, :string
    belongs_to :user, Logflare.User

    timestamps()
  end

  @doc false
  @spec changeset(t() | Ecto.Changeset.t(), map()) :: Ecto.Changeset.t()
  def changeset(alert_query, attrs) do
    alert_query
    |> cast(attrs, [
      :name,
      :description,
      :language,
      :query,
      :cron,
      :slack_hook_url,
      :webhook_notification_url
    ])
    |> validate_required([:name, :query, :cron, :language])
    |> validate_change(:cron, fn :cron, cron ->
      # Valid when the cron expression parses and its first two upcoming run
      # dates are at least 15 minutes apart (diff is negative because the
      # first date precedes the second).
      with {:ok, expr} <- Crontab.CronExpression.Parser.parse(cron),
           [first, second] <- Crontab.Scheduler.get_next_run_dates(expr) |> Enum.take(2),
           true <- NaiveDateTime.diff(first, second, :minute) <= -15 do
        []
      else
        false -> [cron: "can only trigger up to 15 minute intervals"]
        {:error, msg} -> [cron: msg]
        # robustness fix: fewer than two upcoming run dates previously fell
        # through the `with` unmatched and raised WithClauseError; surface it
        # as a validation error instead
        _ -> [cron: "invalid cron expression"]
      end
    end)

    # this source mapping logic is for any generic changeset
    # we implement the same columns for now,
    # can consider migrating to separate table in future.
    |> Query.update_source_mapping()
  end
end
3 changes: 3 additions & 0 deletions lib/logflare/alerts_scheduler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule Logflare.AlertsScheduler do
  @moduledoc """
  Citrine-based cron scheduler process for alert jobs.

  Pulls its configuration from the `:logflare` OTP app environment (see
  `config/config.exs`, where `:init_task` points at
  `Logflare.Alerting.init_alert_jobs/0`). Started under the application
  supervision tree; jobs are managed via `Logflare.Alerting`.
  """
  use Citrine.Scheduler, otp_app: :logflare
end
5 changes: 4 additions & 1 deletion lib/logflare/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ defmodule Logflare.Application do
{DynamicSupervisor,
strategy: :one_for_one, name: Logflare.Backends.Adaptor.PostgresAdaptor.PgRepoSupervisor},
{Registry, name: Logflare.Backends.SourceRegistry, keys: :unique},
{Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate}
{Registry, name: Logflare.Backends.SourceDispatcher, keys: :duplicate},

# citrine scheduler for alerts
Logflare.AlertsScheduler
] ++ conditional_children() ++ common_children()
end

Expand Down
Loading

0 comments on commit 6201fbe

Please sign in to comment.