Skip to content

Commit

Permalink
perf: reduce broadway memory footprint
Browse files · Browse the repository at this point in the history
  • Branch information
Ziinc committed Dec 11, 2024
1 parent f3ccdbc commit 38fc282
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 11 deletions.
4 changes: 3 additions & 1 deletion lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.Pipeline do
Broadway.start_link(__MODULE__,
name: adaptor_state.pipeline_name,
producer: [
module: {BufferProducer, [source: adaptor_state.source, backend: adaptor_state.backend]},
module:
{BufferProducer,
[source_id: adaptor_state.source.id, backend_id: adaptor_state.backend.id]},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
module:
{BufferProducer,
[
backend: args.backend,
source: args.source
backend_id: args.backend.id,
source_id: args.source.id
]},
transformer: {__MODULE__, :transform, []},
concurrency: 1
Expand Down
8 changes: 5 additions & 3 deletions lib/logflare/backends/buffer_producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ defmodule Logflare.Backends.BufferProducer do

@impl GenStage
def init(opts) do
source = Sources.Cache.get_by_id(opts[:source_id])

state = %{
demand: 0,
# TODO: broadcast by id instead.
source_id: opts[:source].id,
source_token: opts[:source].token,
backend_id: Map.get(opts[:backend] || %{}, :id),
source_id: opts[:source_id],
source_token: source.token,
backend_id: opts[:backend_id],
# discard logging backoff
last_discard_log_dt: nil,
interval: Keyword.get(opts, :interval, @default_interval)
Expand Down
4 changes: 2 additions & 2 deletions lib/logflare/source/bigquery/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ defmodule Logflare.Source.BigQuery.Pipeline do
module:
{BufferProducer,
[
source: source,
backend: backend
source_id: source.id,
backend_id: backend.id
]},
transformer: {__MODULE__, :transform, []}
],
Expand Down
12 changes: 9 additions & 3 deletions test/logflare/backends/buffer_producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ defmodule Logflare.Backends.BufferProducerTest do
source = insert(:source, user: user)

le = build(:log_event, source: source)
buffer_producer_pid = start_supervised!({BufferProducer, backend: nil, source: source})

buffer_producer_pid =
start_supervised!({BufferProducer, backend_id: nil, source_id: source.id})

sid_bid_pid = {source.id, nil, buffer_producer_pid}
:timer.sleep(100)
:ok = IngestEventQueue.add_to_table(sid_bid_pid, [le])
Expand All @@ -37,7 +40,10 @@ defmodule Logflare.Backends.BufferProducerTest do
startup_key = {source.id, nil, nil}
IngestEventQueue.upsert_tid(startup_key)
:ok = IngestEventQueue.add_to_table(startup_key, [le])
buffer_producer_pid = start_supervised!({BufferProducer, backend: nil, source: source})

buffer_producer_pid =
start_supervised!({BufferProducer, backend_id: nil, source_id: source.id})

sid_bid_pid = {source.id, nil, buffer_producer_pid}

GenStage.stream([{buffer_producer_pid, max_demand: 1}])
Expand All @@ -53,7 +59,7 @@ defmodule Logflare.Backends.BufferProducerTest do
source = insert(:source, user: user)

pid =
start_supervised!({BufferProducer, backend: nil, source: source, buffer_size: 10})
start_supervised!({BufferProducer, backend_id: nil, source_id: source.id, buffer_size: 10})

le = build(:log_event)
items = for _ <- 1..100, do: le
Expand Down

0 comments on commit 38fc282

Please sign in to comment.