diff --git a/lib/logflare/source/data.ex b/lib/logflare/source/data.ex index a788a6d77..31482ff06 100644 --- a/lib/logflare/source/data.ex +++ b/lib/logflare/source/data.ex @@ -5,12 +5,6 @@ defmodule Logflare.Source.Data do alias Logflare.Source.RateCounterServer alias Logflare.Source.BigQuery.Schema alias Logflare.Backends - alias Logflare.Sources - - def get_logs(source_id) when is_atom(source_id) do - source = Sources.Cache.get_by_id(source_id) - Backends.list_recent_logs(source) - end @spec get_log_count(atom, String.t()) :: non_neg_integer() def get_log_count(token, _bigquery_project_id) do diff --git a/lib/logflare/source/email_notification_server.ex b/lib/logflare/source/email_notification_server.ex index 5afd9be07..1bab775a3 100644 --- a/lib/logflare/source/email_notification_server.ex +++ b/lib/logflare/source/email_notification_server.ex @@ -9,7 +9,6 @@ defmodule Logflare.Source.EmailNotificationServer do alias Logflare.AccountEmail alias Logflare.Mailer alias Logflare.Backends - alias Logflare.Utils.Tasks def start_link(args) do source = Keyword.get(args, :source) @@ -41,9 +40,7 @@ defmodule Logflare.Source.EmailNotificationServer do user = Users.Cache.get_by(id: source.user_id) if source.notifications.user_email_notifications do - Tasks.start_child(fn -> - AccountEmail.source_notification(user, rate, source) |> Mailer.deliver() - end) + AccountEmail.source_notification(user, rate, source) |> Mailer.deliver() end stranger_emails = source.notifications.other_email_notifications @@ -52,10 +49,8 @@ defmodule Logflare.Source.EmailNotificationServer do other_emails = String.split(stranger_emails, ",") for email <- other_emails do - Tasks.start_child(fn -> - AccountEmail.source_notification_for_others(String.trim(email), rate, source) - |> Mailer.deliver() - end) + AccountEmail.source_notification_for_others(String.trim(email), rate, source) + |> Mailer.deliver() end end @@ -64,9 +59,7 @@ defmodule Logflare.Source.EmailNotificationServer do team_user = TeamUsers.Cache.get_team_user(x) if team_user do - Tasks.start_child(fn -> - AccountEmail.source_notification(team_user, rate, source) |> Mailer.deliver() - end) + AccountEmail.source_notification(team_user, rate, source) |> Mailer.deliver() end end) end diff --git a/lib/logflare/source/recent_logs_server.ex b/lib/logflare/source/recent_logs_server.ex index 33cde2d49..4aa98ca67 100644 --- a/lib/logflare/source/recent_logs_server.ex +++ b/lib/logflare/source/recent_logs_server.ex @@ -89,20 +89,23 @@ defmodule Logflare.Source.RecentLogsServer do }} end - def handle_info(:touch, %{source_id: source_id, source_token: source_token} = state) do - source_id - |> Sources.Cache.get_by_id() - |> Backends.list_recent_logs() + def handle_info(:touch, %{source_id: source_id} = state) do + source = + source_id + |> Sources.Cache.get_by_id() + + Backends.list_recent_logs_local(source) |> Enum.reverse() |> case do [] -> :noop - [log_event | _] -> + [_ | _] = events -> now = NaiveDateTime.utc_now() + latest_ts = Enum.map(events, & &1.ingested_at) |> Enum.max(NaiveDateTime) - if NaiveDateTime.diff(now, log_event.ingested_at, :millisecond) < @touch_timer do - Sources.Cache.get_by(token: source_token) + if NaiveDateTime.diff(now, latest_ts, :millisecond) < @touch_timer do + source |> Sources.update_source(%{log_events_updated_at: DateTime.utc_now()}) end end diff --git a/lib/logflare/source/slack_hook_server/slack_hook_server.ex b/lib/logflare/source/slack_hook_server/slack_hook_server.ex index e2f145746..3d06f6f98 100644 --- a/lib/logflare/source/slack_hook_server/slack_hook_server.ex +++ b/lib/logflare/source/slack_hook_server/slack_hook_server.ex @@ -14,7 +14,7 @@ defmodule Logflare.Source.SlackHookServer do end def test_post(source) do - recent_events = Backends.list_recent_logs(source) + recent_events = Backends.list_recent_logs_local(source) __MODULE__.Client.new() |> __MODULE__.Client.post(source, source.metrics.rate, recent_events) @@ -43,10 +43,12 @@ defmodule Logflare.Source.SlackHookServer do case rate > 0 do true -> if source.slack_hook_url do - recent_events = Backends.list_recent_logs(source) + recent_events = Backends.list_recent_logs_local(source) - __MODULE__.Client.new() - |> __MODULE__.Client.post(source, rate, recent_events) + if length(recent_events) > 0 do + __MODULE__.Client.new() + |> __MODULE__.Client.post(source, rate, recent_events) + end end check_rate(state.notifications_every) diff --git a/lib/logflare/source/text_notification_server.ex b/lib/logflare/source/text_notification_server.ex index a7002e4d8..6dd3c0097 100644 --- a/lib/logflare/source/text_notification_server.ex +++ b/lib/logflare/source/text_notification_server.ex @@ -11,7 +11,6 @@ defmodule Logflare.Source.TextNotificationServer do alias LogflareWeb.Router.Helpers, as: Routes alias LogflareWeb.Endpoint alias Logflare.Backends - alias Logflare.Utils.Tasks @twilio_phone "+16026006731" @@ -46,27 +45,23 @@ defmodule Logflare.Source.TextNotificationServer do check_rate(state.notifications_every) source = Sources.Cache.get_by_id(state.source_token) - user = Users.Cache.get_by(id: source.user_id) - source_link = Routes.source_url(Endpoint, :show, source.id) - body = "#{source.name} has #{rate} new event(s). See: #{source_link} " if source.notifications.user_text_notifications == true do - Tasks.start_child(fn -> - ExTwilio.Message.create(to: user.phone, from: @twilio_phone, body: body) - end) - end + user = Users.Cache.get_by(id: source.user_id) + source_link = Routes.source_url(Endpoint, :show, source.id) + body = "#{source.name} has #{rate} new event(s). See: #{source_link} " + + ExTwilio.Message.create(to: user.phone, from: @twilio_phone, body: body) - if source.notifications.team_user_ids_for_sms do - Enum.each(source.notifications.team_user_ids_for_sms, fn x -> - team_user = TeamUsers.Cache.get_team_user(x) - body = "#{source.name} has #{rate} new event(s). See: #{source_link} " + if source.notifications.team_user_ids_for_sms do + Enum.each(source.notifications.team_user_ids_for_sms, fn x -> + team_user = TeamUsers.Cache.get_team_user(x) - if team_user do - Tasks.start_child(fn -> + if team_user do ExTwilio.Message.create(to: team_user.phone, from: @twilio_phone, body: body) - end) - end - end) + end + end) + end end {:noreply, %{state | inserts_since_boot: current_inserts}} diff --git a/lib/logflare/source/webhook_notification_server/webhook_notification_server.ex b/lib/logflare/source/webhook_notification_server/webhook_notification_server.ex index c532d9f24..88febd158 100644 --- a/lib/logflare/source/webhook_notification_server/webhook_notification_server.ex +++ b/lib/logflare/source/webhook_notification_server/webhook_notification_server.ex @@ -14,7 +14,7 @@ defmodule Logflare.Source.WebhookNotificationServer do end def test_post(source) do - recent_events = Backends.list_recent_logs(source) + recent_events = Backends.list_recent_logs_local(source) uri = source.webhook_notification_url post(uri, source, 0, recent_events) @@ -42,7 +42,7 @@ defmodule Logflare.Source.WebhookNotificationServer do case rate > 0 do true -> if uri = source.webhook_notification_url do - recent_events = Backends.list_recent_logs(source) + recent_events = Backends.list_recent_logs_local(source) post(uri, source, rate, recent_events) end