Skip to content

Commit

Permalink
perf: reduce tasks spawning (#2284)
Browse files Browse the repository at this point in the history
* perf: reduce max buffer length

* chore: bump max buffer len to 10k

* perf: reduce the number of tasks being spawned

* perf: remove global list_Recent_logs from RLS

* perf: revert touch timer

* chore: fix compilation warnings
  • Loading branch information
Ziinc authored Dec 18, 2024
1 parent 3b6d45e commit 8925185
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 47 deletions.
6 changes: 0 additions & 6 deletions lib/logflare/source/data.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ defmodule Logflare.Source.Data do
alias Logflare.Source.RateCounterServer
alias Logflare.Source.BigQuery.Schema
alias Logflare.Backends
alias Logflare.Sources

def get_logs(source_id) when is_atom(source_id) do
source = Sources.Cache.get_by_id(source_id)
Backends.list_recent_logs(source)
end

@spec get_log_count(atom, String.t()) :: non_neg_integer()
def get_log_count(token, _bigquery_project_id) do
Expand Down
15 changes: 4 additions & 11 deletions lib/logflare/source/email_notification_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ defmodule Logflare.Source.EmailNotificationServer do
alias Logflare.AccountEmail
alias Logflare.Mailer
alias Logflare.Backends
alias Logflare.Utils.Tasks

def start_link(args) do
source = Keyword.get(args, :source)
Expand Down Expand Up @@ -41,9 +40,7 @@ defmodule Logflare.Source.EmailNotificationServer do
user = Users.Cache.get_by(id: source.user_id)

if source.notifications.user_email_notifications do
Tasks.start_child(fn ->
AccountEmail.source_notification(user, rate, source) |> Mailer.deliver()
end)
AccountEmail.source_notification(user, rate, source) |> Mailer.deliver()
end

stranger_emails = source.notifications.other_email_notifications
Expand All @@ -52,10 +49,8 @@ defmodule Logflare.Source.EmailNotificationServer do
other_emails = String.split(stranger_emails, ",")

for email <- other_emails do
Tasks.start_child(fn ->
AccountEmail.source_notification_for_others(String.trim(email), rate, source)
|> Mailer.deliver()
end)
AccountEmail.source_notification_for_others(String.trim(email), rate, source)
|> Mailer.deliver()
end
end

Expand All @@ -64,9 +59,7 @@ defmodule Logflare.Source.EmailNotificationServer do
team_user = TeamUsers.Cache.get_team_user(x)

if team_user do
Tasks.start_child(fn ->
AccountEmail.source_notification(team_user, rate, source) |> Mailer.deliver()
end)
AccountEmail.source_notification(team_user, rate, source) |> Mailer.deliver()
end
end)
end
Expand Down
17 changes: 10 additions & 7 deletions lib/logflare/source/recent_logs_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,23 @@ defmodule Logflare.Source.RecentLogsServer do
}}
end

def handle_info(:touch, %{source_id: source_id, source_token: source_token} = state) do
source_id
|> Sources.Cache.get_by_id()
|> Backends.list_recent_logs()
def handle_info(:touch, %{source_id: source_id} = state) do
source =
source_id
|> Sources.Cache.get_by_id()

Backends.list_recent_logs_local(source)
|> Enum.reverse()
|> case do
[] ->
:noop

[log_event | _] ->
[_ | _] = events ->
now = NaiveDateTime.utc_now()
latest_ts = Enum.map(events, & &1.ingested_at) |> Enum.max(NaiveDateTime)

if NaiveDateTime.diff(now, log_event.ingested_at, :millisecond) < @touch_timer do
Sources.Cache.get_by(token: source_token)
if NaiveDateTime.diff(now, latest_ts, :millisecond) < @touch_timer do
source
|> Sources.update_source(%{log_events_updated_at: DateTime.utc_now()})
end
end
Expand Down
10 changes: 6 additions & 4 deletions lib/logflare/source/slack_hook_server/slack_hook_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Logflare.Source.SlackHookServer do
end

def test_post(source) do
recent_events = Backends.list_recent_logs(source)
recent_events = Backends.list_recent_logs_local(source)

__MODULE__.Client.new()
|> __MODULE__.Client.post(source, source.metrics.rate, recent_events)
Expand Down Expand Up @@ -43,10 +43,12 @@ defmodule Logflare.Source.SlackHookServer do
case rate > 0 do
true ->
if source.slack_hook_url do
recent_events = Backends.list_recent_logs(source)
recent_events = Backends.list_recent_logs_local(source)

__MODULE__.Client.new()
|> __MODULE__.Client.post(source, rate, recent_events)
if length(recent_events) > 0 do
__MODULE__.Client.new()
|> __MODULE__.Client.post(source, rate, recent_events)
end
end

check_rate(state.notifications_every)
Expand Down
29 changes: 12 additions & 17 deletions lib/logflare/source/text_notification_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule Logflare.Source.TextNotificationServer do
alias LogflareWeb.Router.Helpers, as: Routes
alias LogflareWeb.Endpoint
alias Logflare.Backends
alias Logflare.Utils.Tasks

@twilio_phone "+16026006731"

Expand Down Expand Up @@ -46,27 +45,23 @@ defmodule Logflare.Source.TextNotificationServer do
check_rate(state.notifications_every)

source = Sources.Cache.get_by_id(state.source_token)
user = Users.Cache.get_by(id: source.user_id)
source_link = Routes.source_url(Endpoint, :show, source.id)
body = "#{source.name} has #{rate} new event(s). See: #{source_link} "

if source.notifications.user_text_notifications == true do
Tasks.start_child(fn ->
ExTwilio.Message.create(to: user.phone, from: @twilio_phone, body: body)
end)
end
user = Users.Cache.get_by(id: source.user_id)
source_link = Routes.source_url(Endpoint, :show, source.id)
body = "#{source.name} has #{rate} new event(s). See: #{source_link} "

ExTwilio.Message.create(to: user.phone, from: @twilio_phone, body: body)

if source.notifications.team_user_ids_for_sms do
Enum.each(source.notifications.team_user_ids_for_sms, fn x ->
team_user = TeamUsers.Cache.get_team_user(x)
body = "#{source.name} has #{rate} new event(s). See: #{source_link} "
if source.notifications.team_user_ids_for_sms do
Enum.each(source.notifications.team_user_ids_for_sms, fn x ->
team_user = TeamUsers.Cache.get_team_user(x)

if team_user do
Tasks.start_child(fn ->
if team_user do
ExTwilio.Message.create(to: team_user.phone, from: @twilio_phone, body: body)
end)
end
end)
end
end)
end
end

{:noreply, %{state | inserts_since_boot: current_inserts}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Logflare.Source.WebhookNotificationServer do
end

def test_post(source) do
recent_events = Backends.list_recent_logs(source)
recent_events = Backends.list_recent_logs_local(source)
uri = source.webhook_notification_url

post(uri, source, 0, recent_events)
Expand Down Expand Up @@ -42,7 +42,7 @@ defmodule Logflare.Source.WebhookNotificationServer do
case rate > 0 do
true ->
if uri = source.webhook_notification_url do
recent_events = Backends.list_recent_logs(source)
recent_events = Backends.list_recent_logs_local(source)

post(uri, source, rate, recent_events)
end
Expand Down

0 comments on commit 8925185

Please sign in to comment.