diff --git a/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex b/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex index 79c498cea..ec5a052e1 100644 --- a/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex +++ b/lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex @@ -15,7 +15,9 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.Pipeline do Broadway.start_link(__MODULE__, name: adaptor_state.pipeline_name, producer: [ - module: {BufferProducer, [source: adaptor_state.source, backend: adaptor_state.backend]}, + module: + {BufferProducer, + [source_id: adaptor_state.source.id, backend_id: adaptor_state.backend.id]}, transformer: {__MODULE__, :transform, []}, concurrency: 1 ], diff --git a/lib/logflare/backends/adaptor/webhook_adaptor.ex b/lib/logflare/backends/adaptor/webhook_adaptor.ex index a4c35fe58..1aaef6693 100644 --- a/lib/logflare/backends/adaptor/webhook_adaptor.ex +++ b/lib/logflare/backends/adaptor/webhook_adaptor.ex @@ -105,8 +105,8 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do module: {BufferProducer, [ - backend: args.backend, - source: args.source + backend_id: args.backend.id, + source_id: args.source.id ]}, transformer: {__MODULE__, :transform, []}, concurrency: 1 diff --git a/lib/logflare/backends/buffer_producer.ex b/lib/logflare/backends/buffer_producer.ex index efec15432..34d6e50df 100644 --- a/lib/logflare/backends/buffer_producer.ex +++ b/lib/logflare/backends/buffer_producer.ex @@ -15,12 +15,14 @@ defmodule Logflare.Backends.BufferProducer do @impl GenStage def init(opts) do + source = Sources.Cache.get_by_id(opts[:source_id]) + state = %{ demand: 0, # TODO: broadcast by id instead. - source_id: opts[:source].id, - source_token: opts[:source].token, - backend_id: Map.get(opts[:backend] || %{}, :id), + source_id: opts[:source_id], + source_token: source.token, + backend_id: opts[:backend_id], # discard logging backoff last_discard_log_dt: nil, interval: Keyword.get(opts, :interval, @default_interval) diff --git a/lib/logflare/source/bigquery/pipeline.ex b/lib/logflare/source/bigquery/pipeline.ex index ba3901560..fece6ae25 100644 --- a/lib/logflare/source/bigquery/pipeline.ex +++ b/lib/logflare/source/bigquery/pipeline.ex @@ -45,8 +45,8 @@ defmodule Logflare.Source.BigQuery.Pipeline do module: {BufferProducer, [ - source: source, - backend: backend + source_id: source.id, + backend_id: backend.id ]}, transformer: {__MODULE__, :transform, []} ], diff --git a/test/logflare/backends/buffer_producer_test.exs b/test/logflare/backends/buffer_producer_test.exs index 7c0a4e96a..45df60edb 100644 --- a/test/logflare/backends/buffer_producer_test.exs +++ b/test/logflare/backends/buffer_producer_test.exs @@ -16,7 +16,10 @@ defmodule Logflare.Backends.BufferProducerTest do source = insert(:source, user: user) le = build(:log_event, source: source) - buffer_producer_pid = start_supervised!({BufferProducer, backend: nil, source: source}) + + buffer_producer_pid = + start_supervised!({BufferProducer, backend_id: nil, source_id: source.id}) + sid_bid_pid = {source.id, nil, buffer_producer_pid} :timer.sleep(100) :ok = IngestEventQueue.add_to_table(sid_bid_pid, [le]) @@ -37,7 +40,10 @@ defmodule Logflare.Backends.BufferProducerTest do startup_key = {source.id, nil, nil} IngestEventQueue.upsert_tid(startup_key) :ok = IngestEventQueue.add_to_table(startup_key, [le]) - buffer_producer_pid = start_supervised!({BufferProducer, backend: nil, source: source}) + + buffer_producer_pid = + start_supervised!({BufferProducer, backend_id: nil, source_id: source.id}) + sid_bid_pid = {source.id, nil, buffer_producer_pid} GenStage.stream([{buffer_producer_pid, max_demand: 1}]) @@ -53,7 +59,7 @@ defmodule Logflare.Backends.BufferProducerTest do source = insert(:source, user: user) pid = - start_supervised!({BufferProducer, backend: nil, source: source, buffer_size: 10}) + start_supervised!({BufferProducer, backend_id: nil, source_id: source.id, buffer_size: 10}) le = build(:log_event) items = for _ <- 1..100, do: le