Skip to content

Commit

Permalink
Revert "fix: BufferCounter removal and considation to BufferProducer"
Browse files Browse the repository at this point in the history
This reverts commit f88421f.
  • Loading branch information
Ziinc committed May 2, 2024
1 parent f88421f commit 3775dd3
Show file tree
Hide file tree
Showing 22 changed files with 565 additions and 248 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.6.6
1.6.5
12 changes: 0 additions & 12 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Logflare.Backends do
alias Logflare.Logs
alias Logflare.Logs.SourceRouting
alias Logflare.SystemMetrics
alias Logflare.PubSubRates
import Ecto.Query

@adaptor_mapping %{
Expand Down Expand Up @@ -380,17 +379,6 @@ defmodule Logflare.Backends do
end
end

@doc """
Retrieves cluster-wide buffer size stored in cache for a given backend/source combination.
"""
def buffer_len(%Source{} = source, backend \\ nil) do
if backend do
PubSubRates.Cache.get_cluster_buffers(source.token, backend.token)
else
PubSubRates.Cache.get_cluster_buffers(source.token)
end
end

@doc """
Lists the latest recent logs of all caches across the cluster.
Performs a check to ensure that the cache is started. If not started yet globally, it will start the cache locally.
Expand Down
22 changes: 17 additions & 5 deletions lib/logflare/backends/adaptor/bigquery_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
alias Logflare.Source.BigQuery.Pipeline
alias Logflare.Source.BigQuery.Schema
alias Logflare.Source.BigQuery.Pipeline
alias Logflare.Source.BigQuery.BufferCounter
alias Logflare.Users
alias Logflare.Billing
use Supervisor
Expand Down Expand Up @@ -34,11 +35,17 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do

with :ok <- Backends.register_backend_for_ingest_dispatch(source, backend) do
children = [
{BufferCounter,
[
source_id: source.id,
source_token: source.token,
backend_token: backend.token,
name: Backends.via_source(source, BufferCounter, backend.id)
]},
{Pipeline,
[
source: source,
backend_id: backend.id,
backend_token: backend.token,
bigquery_project_id: project_id,
bigquery_dataset_id: dataset_id,
name: Backends.via_source(source, Pipeline, backend.id)
Expand All @@ -63,20 +70,25 @@ defmodule Logflare.Backends.Adaptor.BigQueryAdaptor do
backend_id = Keyword.get(opts, :backend_id)
source = Sources.Cache.get_by_id(source_id)

buffer_counter_via = Backends.via_source(source, {BufferCounter, backend_id})

messages =
for le <- log_events,
do: %Broadway.Message{
data: le,
acknowledger: {__MODULE__, nil, nil}
acknowledger: {__MODULE__, buffer_counter_via, nil}
}

Backends.via_source(source, {Pipeline, backend_id})
|> Broadway.push_messages(messages)
with {:ok, _count} <- BufferCounter.inc(buffer_counter_via, Enum.count(messages)) do
Backends.via_source(source, {Pipeline, backend_id})
|> Broadway.push_messages(messages)
end

:ok
end

def ack(_via, _successful, _failed) do
def ack(via, successful, failed) do
BufferCounter.decr(via, Enum.count(successful) + Enum.count(failed))
:ok
end

Expand Down
6 changes: 1 addition & 5 deletions lib/logflare/backends/adaptor/postgres_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

import Ecto.Changeset

typedstruct do
typedstruct enforce: true do
field(:config, %{
url: String.t(),
schema: String.t(),
Expand All @@ -34,8 +34,6 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do

field(:source, Source.t())
field(:backend, Backend.t())
field(:backend_token, String.t())
field(:source_token, atom())
field(:pipeline_name, tuple())
end

Expand Down Expand Up @@ -183,8 +181,6 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor do
state = %__MODULE__{
config: backend.config,
backend: backend,
backend_token: if(backend, do: backend.token, else: nil),
source_token: source.token,
source: source,
pipeline_name: Backends.via_source(source, Pipeline, backend.id)
}
Expand Down
6 changes: 2 additions & 4 deletions lib/logflare/backends/adaptor/postgres_adaptor/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,14 @@ defmodule Logflare.Backends.Adaptor.PostgresAdaptor.Pipeline do

alias Broadway.Message
alias Logflare.Backends.Adaptor.PostgresAdaptor
alias Logflare.Backends.BufferProducer
alias Logflare.Buffers.BufferProducer

@spec start_link(PostgresAdaptor.t()) :: {:ok, pid()}
def start_link(adaptor_state) do
Broadway.start_link(__MODULE__,
name: adaptor_state.pipeline_name,
producer: [
module:
{BufferProducer,
[source_token: adaptor_state.source_token, backend_token: adaptor_state.backend_token]},
module: {BufferProducer, []},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
Expand Down
15 changes: 3 additions & 12 deletions lib/logflare/backends/adaptor/webhook_adaptor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do

@behaviour Logflare.Backends.Adaptor

typedstruct do
typedstruct enforce: true do
field(:config, %{
url: String.t(),
headers: map()
})

field(:backend, Backend.t())
field(:pipeline_name, tuple())
field(:backend_token, String.t())
field(:source_token, atom())
end

# API
Expand Down Expand Up @@ -66,8 +64,6 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
state = %__MODULE__{
config: backend.config,
backend: backend,
backend_token: if(backend, do: backend.token, else: nil),
source_token: source.token,
pipeline_name: Backends.via_source(source, __MODULE__.Pipeline, backend.id)
}

Expand Down Expand Up @@ -96,7 +92,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
@moduledoc false
use Broadway
alias Broadway.Message
alias Logflare.Backends.BufferProducer
alias Logflare.Buffers.BufferProducer
alias Logflare.Backends.Adaptor.WebhookAdaptor
alias Logflare.Backends.Adaptor.WebhookAdaptor.Client

Expand All @@ -109,12 +105,7 @@ defmodule Logflare.Backends.Adaptor.WebhookAdaptor do
fullsweep_after: 100
],
producer: [
module:
{BufferProducer,
[
source_token: adaptor_state.source_token,
backend_token: adaptor_state.backend_token
]},
module: {BufferProducer, []},
transformer: {__MODULE__, :transform, []},
concurrency: 1
],
Expand Down
115 changes: 0 additions & 115 deletions lib/logflare/backends/buffer_producer.ex

This file was deleted.

64 changes: 64 additions & 0 deletions lib/logflare/buffers/buffer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Logflare.Buffers.Buffer do
@moduledoc """
Defines a behaviour for a buffer.
"""

@doc """
Adds a list of payloads to the buffer.
"""
@callback add_many(identifier(), payloads :: [term()]) :: :ok

@doc """
Clears the buffer and removes all enqueued items.
"""
@callback clear(identifier()) :: :ok

@doc """
Returns the length of the buffer
"""
@callback length(identifier()) :: non_neg_integer()

@doc """
Returns multiple items from the buffer
"""
@callback pop_many(identifier(), non_neg_integer()) :: [term()]

@doc """
Adds payload to the buffer.
"""
@spec add(module(), identifier(), term()) :: :ok
def add(mod, ident, payload),
do: mod.add_many(ident, [payload])

@doc """
Adds a list of payloads to the buffer.
"""
@spec add_many(module(), identifier(), [term()]) :: :ok
def add_many(mod, ident, payloads) when is_list(payloads),
do: mod.add_many(ident, payloads)

@doc """
Clears the buffer and removes all enqueued items.
"""
@spec clear(module(), identifier()) :: :ok
def clear(mod, ident), do: mod.clear(ident)

@doc """
Returns the length of the buffer
"""
@spec length(module(), identifier()) :: non_neg_integer()
def length(mod, ident), do: mod.length(ident)

@doc """
Returns single item from the buffer
"""
@spec pop(module(), identifier()) :: term()
def pop(mod, ident), do: mod.pop_many(ident, 1)

@doc """
Returns multiple items from the buffer
"""
@spec pop_many(module(), identifier(), non_neg_integer()) :: [term()]
def pop_many(mod, ident, count) when is_integer(count) and count > 0,
do: mod.pop_many(ident, count)
end
33 changes: 33 additions & 0 deletions lib/logflare/buffers/buffer_producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Logflare.Buffers.BufferProducer do
@moduledoc """
A generic producer that acts as a producer but doesn't actually produce anything.
"""
use GenStage

def start_link(opts) do
GenStage.start_link(__MODULE__, opts)
end

def init(opts) do
state = Enum.into(opts, %{buffer_module: nil, buffer_pid: nil, demand: 0})
{:producer, state}
end

def handle_info(:resolve, state) do
{items, state} = resolve_demand(state)
{:noreply, items, state}
end

def handle_demand(demand, state) do
{items, state} = resolve_demand(state, demand)
{:noreply, items, state}
end

defp resolve_demand(
%{demand: prev_demand} = state,
new_demand \\ 0
) do
total_demand = prev_demand + new_demand
{[], %{state | demand: total_demand}}
end
end
Loading

0 comments on commit 3775dd3

Please sign in to comment.