From 59461b2d7170cc2d9f732d1e2d2c08825554f61c Mon Sep 17 00:00:00 2001 From: TzeYiing Date: Sat, 14 Dec 2024 02:32:35 +0800 Subject: [PATCH] perf: remove expensive table traversal for pending counts --- lib/logflare/backends.ex | 43 +++++++---- .../backends/adaptor/bigquery_adaptor.ex | 4 +- lib/logflare/backends/ingest_event_queue.ex | 8 +-- .../ingest_event_queue/broadcast_worker.ex | 4 +- .../ingest_event_queue/queue_janitor.ex | 14 ++-- lib/logflare/logs/browser_report.ex | 2 +- lib/logflare/pubsub_rates/cache.ex | 4 +- .../backends/buffer_producer_test.exs | 4 +- .../backends/ingest_events_queue_test.exs | 32 ++++----- test/logflare/backends_test.exs | 20 +++++- .../plugs/buffer_limiter_test.exs | 48 ++++++++++++- test/profiling/buffer_limiter.exs | 72 +++++++++++++++++++ 12 files changed, 203 insertions(+), 52 deletions(-) create mode 100644 test/profiling/buffer_limiter.exs diff --git a/lib/logflare/backends.ex b/lib/logflare/backends.ex index ca1462f0a..184e5b2cb 100644 --- a/lib/logflare/backends.ex +++ b/lib/logflare/backends.ex @@ -24,16 +24,13 @@ defmodule Logflare.Backends do defdelegate child_spec(arg), to: __MODULE__.Supervisor - @max_pending_buffer_len 100_000 + @max_pending_buffer_len_per_queue 50_000 @doc """ - Retrieves the hardcoded max pending buffer length. + Retrieves the hardcoded max pending buffer length of an individual queue """ - @spec max_buffer_len() :: non_neg_integer() - def max_buffer_len(), do: @max_pending_buffer_len - - @spec max_ingest_queue_len() :: non_neg_integer() - def max_ingest_queue_len(), do: 10_000 + @spec max_buffer_queue_len() :: non_neg_integer() + def max_buffer_queue_len(), do: @max_pending_buffer_len_per_queue @doc """ Lists `Backend`s for a given source. 
@@ -474,7 +471,16 @@ defmodule Logflare.Backends do do: cached_local_pending_buffer_full?(id) def cached_local_pending_buffer_full?(source_id) when is_integer(source_id) do - cached_local_pending_buffer_len(source_id) > @max_pending_buffer_len + PubSubRates.Cache.get_local_buffer(source_id, nil) + |> Map.get(:queues, []) + |> case do + [] -> + false + + queues -> + queues + |> Enum.all?(fn {_key, v} -> v > @max_pending_buffer_len_per_queue end) + end end @doc """ @@ -485,24 +491,35 @@ defmodule Logflare.Backends do nil | integer() ) :: integer() + @deprecated "call `Logflare.Backends.cache_local_buffer_lens/2` instead." def get_and_cache_local_pending_buffer_len(source_id, backend_id \\ nil) when is_integer(source_id) do - len = IngestEventQueue.count_pending({source_id, backend_id}) + len = IngestEventQueue.total_pending({source_id, backend_id}) payload = %{Node.self() => %{len: len}} PubSubRates.Cache.cache_buffers(source_id, backend_id, payload) len end + @doc """ + Caches total buffer len. Includes ingested events that are awaiting cleanup. 
+ """ + def cache_local_buffer_lens(source_id, backend_id \\ nil) do + queues = IngestEventQueue.list_counts({source_id, backend_id}) + + len = for({_k, v} <- queues, do: v) |> Enum.sum() + + stats = %{len: len, queues: queues} + payload = %{Node.self() => stats} + PubSubRates.Cache.cache_buffers(source_id, backend_id, payload) + {:ok, stats} + end + @doc """ Get local pending buffer len of a source/backend combination """ @spec cached_local_pending_buffer_len(Source.t(), Backend.t() | nil) :: non_neg_integer() def cached_local_pending_buffer_len(source_id, backend_id \\ nil) when is_integer(source_id) do PubSubRates.Cache.get_local_buffer(source_id, backend_id) - |> case do - %{len: len} -> len - other -> other - end end @doc """ diff --git a/lib/logflare/backends/adaptor/bigquery_adaptor.ex b/lib/logflare/backends/adaptor/bigquery_adaptor.ex index 8f26f863c..232885e9b 100644 --- a/lib/logflare/backends/adaptor/bigquery_adaptor.ex +++ b/lib/logflare/backends/adaptor/bigquery_adaptor.ex @@ -55,7 +55,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do resolve_count: fn state -> source = Sources.refresh_source_metrics_for_ingest(source) - lens = IngestEventQueue.list_pending_counts({source.id, backend.id}) + lens = IngestEventQueue.list_counts({source.id, backend.id}) handle_resolve_count(state, lens, source.metrics.avg) end @@ -78,7 +78,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do """ def handle_resolve_count(state, lens, avg_rate) do - max_len = Backends.max_ingest_queue_len() + max_len = Backends.max_buffer_queue_len() startup_size = Enum.find_value(lens, 0, fn diff --git a/lib/logflare/backends/ingest_event_queue.ex b/lib/logflare/backends/ingest_event_queue.ex index c2a004609..de80483ea 100644 --- a/lib/logflare/backends/ingest_event_queue.ex +++ b/lib/logflare/backends/ingest_event_queue.ex @@ -236,7 +236,7 @@ defmodule Logflare.Backends.IngestEventQueue do fn objs, acc -> items = for {sid_bid_pid, _tid} <- objs, - count = 
count_pending(sid_bid_pid), + count = total_pending(sid_bid_pid), is_integer(count) do {sid_bid_pid, count} end @@ -271,15 +271,15 @@ defmodule Logflare.Backends.IngestEventQueue do @doc """ Counts pending items from a given table """ - @spec count_pending(source_backend_pid()) :: integer() | {:error, :not_initialized} - def count_pending({_, _} = sid_bid) do + @spec total_pending(source_backend_pid()) :: integer() | {:error, :not_initialized} + def total_pending({_, _} = sid_bid) do # iterate over each matching source-backend queue and sum the totals for {_sid_bid_pid, count} <- list_pending_counts(sid_bid), reduce: 0 do acc -> acc + count end end - def count_pending({_sid, _bid, _pid} = sid_bid_pid) do + def total_pending({_sid, _bid, _pid} = sid_bid_pid) do ms = Ex2ms.fun do {_event_id, :pending, _event} -> true diff --git a/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex b/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex index 45a0e5ba5..a8dd10ace 100644 --- a/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex +++ b/lib/logflare/backends/ingest_event_queue/broadcast_worker.ex @@ -62,9 +62,9 @@ defmodule Logflare.Backends.IngestEventQueue.BroadcastWorker do end defp global_broadcast_producer_buffer_len({source_id, backend_id}) do - len = Backends.get_and_cache_local_pending_buffer_len(source_id, backend_id) + {:ok, stats} = Backends.cache_local_buffer_lens(source_id, backend_id) - local_buffer = %{Node.self() => %{len: len}} + local_buffer = %{Node.self() => stats} PubSubRates.global_broadcast_rate({"buffers", source_id, backend_id, local_buffer}) end diff --git a/lib/logflare/backends/ingest_event_queue/queue_janitor.ex b/lib/logflare/backends/ingest_event_queue/queue_janitor.ex index 9814cb103..2e65ecb4c 100644 --- a/lib/logflare/backends/ingest_event_queue/queue_janitor.ex +++ b/lib/logflare/backends/ingest_event_queue/queue_janitor.ex @@ -14,7 +14,7 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do require 
Logger @default_interval 1_000 @default_remainder 100 - @default_max Logflare.Backends.max_buffer_len() + @default_max Logflare.Backends.max_buffer_queue_len() @default_purge_ratio 0.1 def start_link(opts) do @@ -53,18 +53,18 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do sid_bid = {state.source_id, state.backend_id} # safety measure, drop all if still exceed for {_sid, _bid, ref} = sid_bid_pid <- IngestEventQueue.list_queues(sid_bid), - pending_size = IngestEventQueue.count_pending(sid_bid_pid), - is_integer(pending_size) do - if pending_size > state.remainder do + size = IngestEventQueue.get_table_size(sid_bid_pid), + is_integer(size) do + if size > state.remainder do IngestEventQueue.truncate_table(sid_bid_pid, :ingested, 0) else IngestEventQueue.truncate_table(sid_bid_pid, :ingested, state.remainder) end - pending_size = IngestEventQueue.count_pending(sid_bid_pid) + size = IngestEventQueue.total_pending(sid_bid_pid) - if pending_size > state.max and ref != nil do - to_drop = round(state.purge_ratio * pending_size) + if size > state.max and ref != nil do + to_drop = round(state.purge_ratio * size) IngestEventQueue.drop(sid_bid_pid, :pending, to_drop) Logger.warning( diff --git a/lib/logflare/logs/browser_report.ex b/lib/logflare/logs/browser_report.ex index 8c36a156d..fa7d737aa 100644 --- a/lib/logflare/logs/browser_report.ex +++ b/lib/logflare/logs/browser_report.ex @@ -16,6 +16,6 @@ defmodule Logflare.Logs.BrowserReport do end def message(params) do - Jason.encode!(params) + Jason.encode_to_iodata!(params) end end diff --git a/lib/logflare/pubsub_rates/cache.ex b/lib/logflare/pubsub_rates/cache.ex index 888093b39..1af80b12b 100644 --- a/lib/logflare/pubsub_rates/cache.ex +++ b/lib/logflare/pubsub_rates/cache.ex @@ -89,8 +89,8 @@ defmodule Logflare.PubSubRates.Cache do def get_local_buffer(source_id, backend_id) do Cachex.get(__MODULE__, {source_id, backend_id, "buffers"}) |> case do - {:ok, val} when val != nil -> Map.get(val, Node.self(), 0) 
- _ -> 0 + {:ok, val} when val != nil -> Map.get(val, Node.self(), %{len: 0, queues: []}) + _ -> %{len: 0, queues: []} end end diff --git a/test/logflare/backends/buffer_producer_test.exs b/test/logflare/backends/buffer_producer_test.exs index 45df60edb..8ae96a2a4 100644 --- a/test/logflare/backends/buffer_producer_test.exs +++ b/test/logflare/backends/buffer_producer_test.exs @@ -27,7 +27,7 @@ defmodule Logflare.Backends.BufferProducerTest do GenStage.stream([{buffer_producer_pid, max_demand: 1}]) |> Enum.take(1) - assert IngestEventQueue.count_pending(sid_bid_pid) == 0 + assert IngestEventQueue.total_pending(sid_bid_pid) == 0 # marked as :ingested assert IngestEventQueue.get_table_size(sid_bid_pid) == 1 end @@ -49,7 +49,7 @@ defmodule Logflare.Backends.BufferProducerTest do GenStage.stream([{buffer_producer_pid, max_demand: 1}]) |> Enum.take(1) - assert IngestEventQueue.count_pending(sid_bid_pid) == 0 + assert IngestEventQueue.total_pending(sid_bid_pid) == 0 # marked as :ingested assert IngestEventQueue.get_table_size(sid_bid_pid) == 1 end diff --git a/test/logflare/backends/ingest_events_queue_test.exs b/test/logflare/backends/ingest_events_queue_test.exs index 4574a0224..0afb0e042 100644 --- a/test/logflare/backends/ingest_events_queue_test.exs +++ b/test/logflare/backends/ingest_events_queue_test.exs @@ -127,9 +127,9 @@ defmodule Logflare.Backends.IngestEventQueueTest do test "adding to startup queue", %{queue: {sid, bid, _} = queue} do le = build(:log_event, message: "123") assert :ok = IngestEventQueue.add_to_table(queue, [le]) - assert IngestEventQueue.count_pending(queue) == 1 - assert IngestEventQueue.count_pending({sid, bid}) == 1 - assert IngestEventQueue.count_pending({sid, bid, nil}) == 1 + assert IngestEventQueue.total_pending(queue) == 1 + assert IngestEventQueue.total_pending({sid, bid}) == 1 + assert IngestEventQueue.total_pending({sid, bid, nil}) == 1 end test "move/1 moves all events from one queue to target queue", %{queue: {sid, bid, _} = 
queue} do @@ -137,13 +137,13 @@ defmodule Logflare.Backends.IngestEventQueueTest do le = build(:log_event, message: "123") IngestEventQueue.upsert_tid(target) assert :ok = IngestEventQueue.add_to_table(queue, [le]) - assert IngestEventQueue.count_pending(queue) == 1 - assert IngestEventQueue.count_pending(target) == 0 + assert IngestEventQueue.total_pending(queue) == 1 + assert IngestEventQueue.total_pending(target) == 0 assert {:ok, 1} = IngestEventQueue.move(queue, target) - assert IngestEventQueue.count_pending(queue) == 0 - assert IngestEventQueue.count_pending(target) == 1 - assert IngestEventQueue.count_pending({sid, bid}) == 1 - assert IngestEventQueue.count_pending({sid, bid, nil}) == 0 + assert IngestEventQueue.total_pending(queue) == 0 + assert IngestEventQueue.total_pending(target) == 1 + assert IngestEventQueue.total_pending({sid, bid}) == 1 + assert IngestEventQueue.total_pending({sid, bid, nil}) == 0 end end @@ -162,15 +162,15 @@ defmodule Logflare.Backends.IngestEventQueueTest do assert IngestEventQueue.get_table_size(sbp) == 1 # can take pending items assert {:ok, [_]} = IngestEventQueue.take_pending(sbp, 5) - assert IngestEventQueue.count_pending(sbp) == 1 + assert IngestEventQueue.total_pending(sbp) == 1 # set to ingested assert {:ok, 1} = IngestEventQueue.mark_ingested(sbp, [le]) - assert IngestEventQueue.count_pending(sbp) == 0 + assert IngestEventQueue.total_pending(sbp) == 0 # truncate to n items assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 1) assert IngestEventQueue.get_table_size(sbp) == 1 assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 0) - assert IngestEventQueue.count_pending(sbp) == 0 + assert IngestEventQueue.total_pending(sbp) == 0 end test "drop n items from a queue", %{source_backend_pid: sbp} do @@ -207,7 +207,7 @@ defmodule Logflare.Backends.IngestEventQueueTest do assert {:ok, _} = IngestEventQueue.mark_ingested(sbp, batch) assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 50) assert 
IngestEventQueue.get_table_size(sbp) == 50 - assert IngestEventQueue.count_pending(sbp) == 0 + assert IngestEventQueue.total_pending(sbp) == 0 assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 0) assert IngestEventQueue.get_table_size(sbp) == 0 end @@ -221,9 +221,9 @@ defmodule Logflare.Backends.IngestEventQueueTest do # add as pending assert :ok = IngestEventQueue.add_to_table(sbp, batch) assert :ok = IngestEventQueue.truncate_table(sbp, :pending, 50) - assert IngestEventQueue.count_pending(sbp) == 50 + assert IngestEventQueue.total_pending(sbp) == 50 assert :ok = IngestEventQueue.truncate_table(sbp, :pending, 0) - assert IngestEventQueue.count_pending(sbp) == 0 + assert IngestEventQueue.total_pending(sbp) == 0 end end @@ -270,7 +270,7 @@ defmodule Logflare.Backends.IngestEventQueueTest do :timer.sleep(550) assert IngestEventQueue.get_table_size(table) == 0 - assert IngestEventQueue.count_pending(table) == 0 + assert IngestEventQueue.total_pending(table) == 0 end test "QueueJanitor purges if exceeds max" do diff --git a/test/logflare/backends_test.exs b/test/logflare/backends_test.exs index e124af127..11b45bd5a 100644 --- a/test/logflare/backends_test.exs +++ b/test/logflare/backends_test.exs @@ -19,7 +19,6 @@ defmodule Logflare.BackendsTest do alias Logflare.Rules alias Logflare.Backends.IngestEventQueue alias Logflare.Backends.SourceSupWorker - alias Logflare.LogEvent setup do start_supervised!(AllLogsLogged) @@ -265,6 +264,25 @@ defmodule Logflare.BackendsTest do :timer.sleep(1_500) assert Backends.get_and_cache_local_pending_buffer_len(source.id) == 0 end + + test "cache_local_buffer_lens/1 will cache all queue information", %{ + source: %{id: source_id} = source + } do + assert {:ok, + %{ + len: 0, + queues: [_, _] + }} = Backends.cache_local_buffer_lens(source_id) + + events = for _n <- 1..5, do: build(:log_event, source: source, some: "event") + assert {:ok, 5} = Backends.ingest_logs(events, source) + + assert {:ok, + %{ + len: 5, + queues: 
[_, _] + }} = Backends.cache_local_buffer_lens(source_id) + end end describe "ingest filters" do diff --git a/test/logflare_web/plugs/buffer_limiter_test.exs b/test/logflare_web/plugs/buffer_limiter_test.exs index da58f0a2d..3bd93fd82 100644 --- a/test/logflare_web/plugs/buffer_limiter_test.exs +++ b/test/logflare_web/plugs/buffer_limiter_test.exs @@ -6,6 +6,7 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do alias Logflare.Backends setup do + insert(:plan) conn = build_conn(:post, "/api/logs", %{"message" => "some text"}) source = insert(:source, user: insert(:user)) table_key = {source.id, nil, self()} @@ -24,7 +25,7 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do end # get and cache the value - Backends.get_and_cache_local_pending_buffer_len(source.id, nil) + Backends.cache_local_buffer_lens(source.id, nil) conn = conn @@ -35,6 +36,49 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do assert conn.status == 429 end + test "bug: buffer limiting is based on all queues", %{ + conn: conn, + source: source, + table_key: table_key + } do + other_table_key = {source.id, nil, make_ref()} + IngestEventQueue.upsert_tid(other_table_key) + + for _ <- 1..25_100 do + le = build(:log_event) + IngestEventQueue.add_to_table(table_key, [le]) + IngestEventQueue.add_to_table(other_table_key, [le]) + end + + # get and cache the value + Backends.cache_local_buffer_lens(source.id, nil) + + conn = + conn + |> assign(:source, source) + |> BufferLimiter.call(%{}) + + assert conn.halted == false + + for _ <- 1..25_100 do + le = build(:log_event) + IngestEventQueue.add_to_table(table_key, [le]) + IngestEventQueue.add_to_table(other_table_key, [le]) + end + + # get and cache the value + Backends.cache_local_buffer_lens(source.id, nil) + + conn = + conn + |> recycle() + |> assign(:source, source) + |> BufferLimiter.call(%{}) + + assert conn.halted == true + assert conn.status == 429 + end + test "200 if most events are ingested", %{conn: conn, source: source, table_key: table_key} do 
for _ <- 1..8_000 do le = build(:log_event) @@ -43,7 +87,7 @@ end # get and cache the value - Backends.get_and_cache_local_pending_buffer_len(source.id, nil) + Backends.cache_local_buffer_lens(source.id, nil) conn = conn diff --git a/test/profiling/buffer_limiter.exs b/test/profiling/buffer_limiter.exs new file mode 100644 index 000000000..56d80f8bc --- /dev/null +++ b/test/profiling/buffer_limiter.exs @@ -0,0 +1,72 @@ +alias Logflare.Sources +alias Logflare.Users +require Phoenix.ConnTest +Mimic.copy(Broadway) +Mimic.copy(Logflare.Backends) +Mimic.copy(Logflare.Logs) +Mimic.copy(Logflare.Partners) +alias Logflare.Backends.IngestEventQueue + +Mimic.stub(Logflare.Backends, :ingest_logs, fn _, _ -> :ok end) +Mimic.stub(Logflare.Logs, :ingest_logs, fn _, _ -> :ok end) +# Mimic.stub(Broadway, :push_messages, fn _, _ -> :ok end) +_ver = System.argv() |> Enum.at(0) + +source = Sources.get(:"9f37d86e-e4fa-4ef2-a47e-e8d4ac1fceba") + +# v2_source = Sources.get(:"94d07aab-30f5-460e-8871-eb85f4674e35") + +# user = Users.get(v1_source.user_id) + +Benchee.run( + %{ + "list_pending_counts" => fn _input -> + IngestEventQueue.list_pending_counts({source.id, nil}) + end, + "cached_local_pending_buffer_len" => fn _input -> + Logflare.Backends.cached_local_pending_buffer_len(source.id) + end, + "get_buffers" => fn _input -> + Logflare.PubSubRates.Cache.get_buffers(source.id, nil) + end, + "cache_local_buffer_lens" => fn _ -> + Logflare.Backends.cache_local_buffer_lens(source.id) + end + }, + before_scenario: fn prev -> + key1 = {source.id, nil, make_ref()} + key2 = {source.id, nil, make_ref()} + IngestEventQueue.upsert_tid(key1) + IngestEventQueue.upsert_tid(key2) + + events = + for _ <- 1..10_000 do + Logflare.Factory.build(:log_event) + end + + IngestEventQueue.add_to_table(key1, events) + IngestEventQueue.add_to_table(key2, events) + + prev + end, + inputs: %{ + "v1" => source # "v2" => v2_source }, + time: 4, + memory_time: 0 )
+ +# 2024-12-14 addition of cache_local_buffer_lens/1 +# ##### With input v1 ##### +# Name ips average deviation median 99th % +# get_buffers 2247.46 K 0.44 μs ±5589.04% 0.38 μs 0.58 μs +# cached_local_pending_buffer_len 264.75 K 3.78 μs ±429.42% 3.50 μs 5.42 μs +# cache_local_buffer_lens 148.17 K 6.75 μs ±184.04% 6.25 μs 12.21 μs +# list_pending_counts 1.07 K 933.85 μs ±22.27% 887.48 μs 1523.25 μs + +# Comparison: +# get_buffers 2247.46 K +# cached_local_pending_buffer_len 264.75 K - 8.49x slower +3.33 μs +# cache_local_buffer_lens 148.17 K - 15.17x slower +6.30 μs +# list_pending_counts 1.07 K - 2098.80x slower +933.41 μs