Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf+fix: remove expensive table traversal for pending counts, fix buffer limiter #2282

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ defmodule Logflare.Backends do

defdelegate child_spec(arg), to: __MODULE__.Supervisor

# Per-queue cap on pending events; each source/backend pair may own several
# queues, so the effective total buffer is a multiple of this value.
@max_pending_buffer_len_per_queue 50_000

@doc """
Retrieves the hardcoded max pending buffer length of an individual queue
"""
@spec max_buffer_queue_len() :: non_neg_integer()
def max_buffer_queue_len(), do: @max_pending_buffer_len_per_queue

@doc """
Lists `Backend`s for a given source.
Expand Down Expand Up @@ -474,7 +471,16 @@ defmodule Logflare.Backends do
do: cached_local_pending_buffer_full?(id)

def cached_local_pending_buffer_full?(source_id) when is_integer(source_id) do
  # Read the cached per-queue stats rather than traversing ETS tables.
  # The buffer is considered full only when EVERY queue exceeds the
  # per-queue cap — a single queue with headroom keeps ingestion open.
  PubSubRates.Cache.get_local_buffer(source_id, nil)
  |> Map.get(:queues, [])
  |> case do
    [] ->
      # No cached queue information yet — treat the buffer as not full.
      false

    queues ->
      Enum.all?(queues, fn {_key, len} -> len > @max_pending_buffer_len_per_queue end)
  end
end

@doc """
Expand All @@ -485,24 +491,35 @@ defmodule Logflare.Backends do
nil | integer()
) ::
integer()
@deprecated "call `Logflare.Backends.cache_local_buffer_lens/2` instead."
def get_and_cache_local_pending_buffer_len(source_id, backend_id \\ nil)
    when is_integer(source_id) do
  # Sum pending events across all queues for the source/backend pair and
  # cache the total under this node's key in the cluster-wide buffer cache.
  len = IngestEventQueue.total_pending({source_id, backend_id})
  payload = %{Node.self() => %{len: len}}
  PubSubRates.Cache.cache_buffers(source_id, backend_id, payload)
  len
end

@doc """
Caches total buffer len. Includes ingested events that are awaiting cleanup.
"""
def cache_local_buffer_lens(source_id, backend_id \\ nil) do
  queues = IngestEventQueue.list_counts({source_id, backend_id})

  # Fold the per-queue counts into a single total for this source/backend.
  total = Enum.reduce(queues, 0, fn {_key, count}, acc -> acc + count end)

  stats = %{len: total, queues: queues}
  PubSubRates.Cache.cache_buffers(source_id, backend_id, %{Node.self() => stats})
  {:ok, stats}
end

@doc """
Get local pending buffer len of a source/backend combination
"""
# NOTE: the function receives integer ids (see the is_integer/1 guard),
# not Source/Backend structs — the spec now reflects that.
@spec cached_local_pending_buffer_len(non_neg_integer(), non_neg_integer() | nil) ::
        non_neg_integer()
def cached_local_pending_buffer_len(source_id, backend_id \\ nil) when is_integer(source_id) do
  # Cached stats are a map like %{len: n, queues: [...]}; extract the total.
  case PubSubRates.Cache.get_local_buffer(source_id, backend_id) do
    %{len: len} -> len
    other -> other
  end
end

@doc """
Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/backends/adaptor/bigquery_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
resolve_count: fn state ->
source = Sources.refresh_source_metrics_for_ingest(source)

lens = IngestEventQueue.list_pending_counts({source.id, backend.id})
lens = IngestEventQueue.list_counts({source.id, backend.id})

handle_resolve_count(state, lens, source.metrics.avg)
end
Expand All @@ -78,7 +78,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do

"""
def handle_resolve_count(state, lens, avg_rate) do
max_len = Backends.max_ingest_queue_len()
max_len = Backends.max_buffer_queue_len()

startup_size =
Enum.find_value(lens, 0, fn
Expand Down
8 changes: 4 additions & 4 deletions lib/logflare/backends/ingest_event_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ defmodule Logflare.Backends.IngestEventQueue do
fn objs, acc ->
items =
for {sid_bid_pid, _tid} <- objs,
count = count_pending(sid_bid_pid),
count = total_pending(sid_bid_pid),
is_integer(count) do
{sid_bid_pid, count}
end
Expand Down Expand Up @@ -271,15 +271,15 @@ defmodule Logflare.Backends.IngestEventQueue do
@doc """
Counts pending items from a given table
"""
@spec total_pending(source_backend_pid()) :: integer() | {:error, :not_initialized}
def total_pending({_, _} = sid_bid) do
  # iterate over each matching source-backend queue and sum the totals
  for {_sid_bid_pid, count} <- list_pending_counts(sid_bid), reduce: 0 do
    acc -> acc + count
  end
end

def count_pending({_sid, _bid, _pid} = sid_bid_pid) do
def total_pending({_sid, _bid, _pid} = sid_bid_pid) do
ms =
Ex2ms.fun do
{_event_id, :pending, _event} -> true
Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/backends/ingest_event_queue/broadcast_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ defmodule Logflare.Backends.IngestEventQueue.BroadcastWorker do
end

defp global_broadcast_producer_buffer_len({source_id, backend_id}) do
  # Cache the per-queue stats locally, then broadcast the same stats map
  # (len + queues) to the rest of the cluster under this node's key.
  {:ok, stats} = Backends.cache_local_buffer_lens(source_id, backend_id)

  local_buffer = %{Node.self() => stats}
  PubSubRates.global_broadcast_rate({"buffers", source_id, backend_id, local_buffer})
end

Expand Down
14 changes: 7 additions & 7 deletions lib/logflare/backends/ingest_event_queue/queue_janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
require Logger
@default_interval 1_000
@default_remainder 100
# Per-queue pending-buffer cap; purge kicks in above this threshold.
@default_max Logflare.Backends.max_buffer_queue_len()
@default_purge_ratio 0.1

def start_link(opts) do
Expand Down Expand Up @@ -53,18 +53,18 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
sid_bid = {state.source_id, state.backend_id}
# safety measure, drop all if still exceed
for {_sid, _bid, ref} = sid_bid_pid <- IngestEventQueue.list_queues(sid_bid),
pending_size = IngestEventQueue.count_pending(sid_bid_pid),
is_integer(pending_size) do
if pending_size > state.remainder do
size = IngestEventQueue.get_table_size(sid_bid_pid),
is_integer(size) do
if size > state.remainder do
IngestEventQueue.truncate_table(sid_bid_pid, :ingested, 0)
else
IngestEventQueue.truncate_table(sid_bid_pid, :ingested, state.remainder)
end

pending_size = IngestEventQueue.count_pending(sid_bid_pid)
size = IngestEventQueue.total_pending(sid_bid_pid)

if pending_size > state.max and ref != nil do
to_drop = round(state.purge_ratio * pending_size)
if size > state.max and ref != nil do
to_drop = round(state.purge_ratio * size)
IngestEventQueue.drop(sid_bid_pid, :pending, to_drop)

Logger.warning(
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/logs/browser_report.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ defmodule Logflare.Logs.BrowserReport do
end

def message(params) do
  # Encode straight to iodata — avoids materializing an intermediate binary.
  Jason.encode_to_iodata!(params)
end
end
4 changes: 2 additions & 2 deletions lib/logflare/pubsub_rates/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ defmodule Logflare.PubSubRates.Cache do
def get_local_buffer(source_id, backend_id) do
  # Return this node's cached buffer stats. Defaults to empty stats so that
  # callers can always pattern-match on the :len and :queues keys.
  case Cachex.get(__MODULE__, {source_id, backend_id, "buffers"}) do
    {:ok, val} when val != nil -> Map.get(val, Node.self(), %{len: 0, queues: []})
    _ -> %{len: 0, queues: []}
  end
end

Expand Down
4 changes: 2 additions & 2 deletions test/logflare/backends/buffer_producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ defmodule Logflare.Backends.BufferProducerTest do
GenStage.stream([{buffer_producer_pid, max_demand: 1}])
|> Enum.take(1)

assert IngestEventQueue.count_pending(sid_bid_pid) == 0
assert IngestEventQueue.total_pending(sid_bid_pid) == 0
# marked as :ingested
assert IngestEventQueue.get_table_size(sid_bid_pid) == 1
end
Expand All @@ -49,7 +49,7 @@ defmodule Logflare.Backends.BufferProducerTest do
GenStage.stream([{buffer_producer_pid, max_demand: 1}])
|> Enum.take(1)

assert IngestEventQueue.count_pending(sid_bid_pid) == 0
assert IngestEventQueue.total_pending(sid_bid_pid) == 0
# marked as :ingested
assert IngestEventQueue.get_table_size(sid_bid_pid) == 1
end
Expand Down
32 changes: 16 additions & 16 deletions test/logflare/backends/ingest_events_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -127,23 +127,23 @@ defmodule Logflare.Backends.IngestEventQueueTest do
test "adding to startup queue", %{queue: {sid, bid, _} = queue} do
le = build(:log_event, message: "123")
assert :ok = IngestEventQueue.add_to_table(queue, [le])
assert IngestEventQueue.count_pending(queue) == 1
assert IngestEventQueue.count_pending({sid, bid}) == 1
assert IngestEventQueue.count_pending({sid, bid, nil}) == 1
assert IngestEventQueue.total_pending(queue) == 1
assert IngestEventQueue.total_pending({sid, bid}) == 1
assert IngestEventQueue.total_pending({sid, bid, nil}) == 1
end

test "move/1 moves all events from one queue to target queue", %{queue: {sid, bid, _} = queue} do
target = {sid, bid, self()}
le = build(:log_event, message: "123")
IngestEventQueue.upsert_tid(target)
assert :ok = IngestEventQueue.add_to_table(queue, [le])
assert IngestEventQueue.count_pending(queue) == 1
assert IngestEventQueue.count_pending(target) == 0
assert IngestEventQueue.total_pending(queue) == 1
assert IngestEventQueue.total_pending(target) == 0
assert {:ok, 1} = IngestEventQueue.move(queue, target)
assert IngestEventQueue.count_pending(queue) == 0
assert IngestEventQueue.count_pending(target) == 1
assert IngestEventQueue.count_pending({sid, bid}) == 1
assert IngestEventQueue.count_pending({sid, bid, nil}) == 0
assert IngestEventQueue.total_pending(queue) == 0
assert IngestEventQueue.total_pending(target) == 1
assert IngestEventQueue.total_pending({sid, bid}) == 1
assert IngestEventQueue.total_pending({sid, bid, nil}) == 0
end
end

Expand All @@ -162,15 +162,15 @@ defmodule Logflare.Backends.IngestEventQueueTest do
assert IngestEventQueue.get_table_size(sbp) == 1
# can take pending items
assert {:ok, [_]} = IngestEventQueue.take_pending(sbp, 5)
assert IngestEventQueue.count_pending(sbp) == 1
assert IngestEventQueue.total_pending(sbp) == 1
# set to ingested
assert {:ok, 1} = IngestEventQueue.mark_ingested(sbp, [le])
assert IngestEventQueue.count_pending(sbp) == 0
assert IngestEventQueue.total_pending(sbp) == 0
# truncate to n items
assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 1)
assert IngestEventQueue.get_table_size(sbp) == 1
assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 0)
assert IngestEventQueue.count_pending(sbp) == 0
assert IngestEventQueue.total_pending(sbp) == 0
end

test "drop n items from a queue", %{source_backend_pid: sbp} do
Expand Down Expand Up @@ -207,7 +207,7 @@ defmodule Logflare.Backends.IngestEventQueueTest do
assert {:ok, _} = IngestEventQueue.mark_ingested(sbp, batch)
assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 50)
assert IngestEventQueue.get_table_size(sbp) == 50
assert IngestEventQueue.count_pending(sbp) == 0
assert IngestEventQueue.total_pending(sbp) == 0
assert :ok = IngestEventQueue.truncate_table(sbp, :ingested, 0)
assert IngestEventQueue.get_table_size(sbp) == 0
end
Expand All @@ -221,9 +221,9 @@ defmodule Logflare.Backends.IngestEventQueueTest do
# add as pending
assert :ok = IngestEventQueue.add_to_table(sbp, batch)
assert :ok = IngestEventQueue.truncate_table(sbp, :pending, 50)
assert IngestEventQueue.count_pending(sbp) == 50
assert IngestEventQueue.total_pending(sbp) == 50
assert :ok = IngestEventQueue.truncate_table(sbp, :pending, 0)
assert IngestEventQueue.count_pending(sbp) == 0
assert IngestEventQueue.total_pending(sbp) == 0
end
end

Expand Down Expand Up @@ -270,7 +270,7 @@ defmodule Logflare.Backends.IngestEventQueueTest do

:timer.sleep(550)
assert IngestEventQueue.get_table_size(table) == 0
assert IngestEventQueue.count_pending(table) == 0
assert IngestEventQueue.total_pending(table) == 0
end

test "QueueJanitor purges if exceeds max" do
Expand Down
20 changes: 19 additions & 1 deletion test/logflare/backends_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
alias Logflare.Rules
alias Logflare.Backends.IngestEventQueue
alias Logflare.Backends.SourceSupWorker
alias Logflare.LogEvent

setup do
start_supervised!(AllLogsLogged)
Expand Down Expand Up @@ -219,7 +218,7 @@
{:ok, source: source}
end

test "correctly retains the 100 items", %{source: source} do

Check failure on line 221 in test/logflare/backends_test.exs

View workflow job for this annotation

GitHub Actions / Checks (Tests, mix do ecto.create, ecto.migrate, test)

test ingestion correctly retains the 100 items (Logflare.BackendsTest)
events = for _n <- 1..105, do: build(:log_event, source: source, some: "event")
assert {:ok, 105} = Backends.ingest_logs(events, source)
:timer.sleep(1500)
Expand Down Expand Up @@ -265,6 +264,25 @@
:timer.sleep(1_500)
assert Backends.get_and_cache_local_pending_buffer_len(source.id) == 0
end

# Test name corrected: the function under test is cache_local_buffer_lens/2,
# not cache_estimated_buffer_lens/1.
test "cache_local_buffer_lens/2 will cache all queue information", %{
  source: %{id: source_id} = source
} do
  # Before ingestion: total is 0 and both queues are reported.
  assert {:ok,
          %{
            len: 0,
            queues: [_, _]
          }} = Backends.cache_local_buffer_lens(source_id)

  events = for _n <- 1..5, do: build(:log_event, source: source, some: "event")
  assert {:ok, 5} = Backends.ingest_logs(events, source)

  # After ingesting 5 events, the cached total reflects them.
  assert {:ok,
          %{
            len: 5,
            queues: [_, _]
          }} = Backends.cache_local_buffer_lens(source_id)
end
end

describe "ingest filters" do
Expand Down
Loading
Loading