perf: add schema sampling based on local ingestion rates on high ingestion volume #2106

Merged · 2 commits · Jun 18, 2024
6 changes: 6 additions & 0 deletions docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
@@ -101,6 +101,12 @@ metadata: {
}
```

+:::note
+At high ingestion volumes, Logflare samples incoming events for schema updates instead of checking every event. The sample rate decreases as the ingestion rate increases, and ingestion rates are measured on each individual server performing the ingestion rather than across the cluster.
+
+From 1,000 to 5,000 events per second, the sample rate is 0.1. Above 5,000 events per second, it is 0.01.
+:::
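
As a rough illustration of the thresholds above (the `SamplingSketch` module below is hypothetical and not part of Logflare), the rate-to-probability mapping can be sketched as:

```elixir
# Minimal sketch mirroring the documented thresholds: full schema checking up to
# 1,000 events/sec, 10% sampling up to 5,000 events/sec, 1% sampling above that.
defmodule SamplingSketch do
  def probability(average_rate) when average_rate > 5000, do: 0.01
  def probability(average_rate) when average_rate > 1000, do: 0.1
  def probability(_average_rate), do: 1.0

  # Returns true when this event should go through the schema check.
  def sample?(average_rate), do: :rand.uniform() <= probability(average_rate)
end
```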

### Key Transformation

When logging objects, your object keys are transformed automatically to comply with the respective backend in use. For example, BigQuery requires that column names contain only letters (a-z, A-Z), numbers (0-9), or underscores (\_), and that they start with a letter or underscore. This is handled automatically for you when ingesting data.
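
For illustration only, a transformation following these BigQuery naming rules could look like the sketch below; the `KeySanitizerSketch` module is hypothetical, and Logflare's internal rules may differ.

```elixir
defmodule KeySanitizerSketch do
  # Replace characters outside [a-zA-Z0-9_] with underscores, then prefix keys
  # that start with a digit so the result begins with a letter or underscore.
  def sanitize(key) when is_binary(key) do
    key
    |> String.replace(~r/[^a-zA-Z0-9_]/, "_")
    |> then(fn
      <<c, _::binary>> = k when c in ?0..?9 -> "_" <> k
      k -> k
    end)
  end
end

# KeySanitizerSketch.sanitize("user-id") #=> "user_id"
# KeySanitizerSketch.sanitize("9lives")  #=> "_9lives"
```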
29 changes: 29 additions & 0 deletions lib/logflare/pubsub_rates/cache.ex
@@ -76,6 +76,35 @@ defmodule Logflare.PubSubRates.Cache do
    Cachex.get(__MODULE__, {source_id, "rates"})
  end

+  def get_local_rates(source_id) when is_atom(source_id) do
+    node = Node.self()
+
+    default = %{
+      average_rate: 0,
+      last_rate: 0,
+      max_rate: 0,
+      limiter_metrics: %{average: 0, duration: @default_bucket_width, sum: 0}
+    }
+
+    case get_rates(source_id) do
+      {:ok, nil} ->
+        default
+
+      {:ok, rates} ->
+        Map.get(rates, node, default)
+
+      {:error, _} = err ->
+        Logger.error("Error when getting pubsub cluster rates: #{inspect(err)}")
+
+        %{
+          average_rate: -1,
+          last_rate: -1,
+          max_rate: -1,
+          limiter_metrics: %{average: 100_000, duration: @default_bucket_width, sum: 6_000_000}
+        }
+    end
+  end

  def get_cluster_rates(source_id) when is_atom(source_id) do
    case get_rates(source_id) do
      {:ok, nil} ->
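A hedged usage sketch of the new `get_local_rates/1`; the source token is a placeholder, and the commented result shows the default shape defined above:

```elixir
alias Logflare.PubSubRates

# :my_source_token is a placeholder; real source tokens are per-source atoms.
rates = PubSubRates.Cache.get_local_rates(:my_source_token)

# When no rates have been recorded yet, the defaults are returned:
# %{
#   average_rate: 0,
#   last_rate: 0,
#   max_rate: 0,
#   limiter_metrics: %{average: 0, duration: <bucket width>, sum: 0}
# }
rates.average_rate
```
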
18 changes: 15 additions & 3 deletions lib/logflare/source/bigquery/pipeline.ex
@@ -19,6 +19,7 @@ defmodule Logflare.Source.BigQuery.Pipeline do
  alias Logflare.Source.Supervisor
  alias Logflare.Sources
  alias Logflare.Users
+  alias Logflare.PubSubRates

  # each batch should at most be 5MB
  # BQ max is 10MB
@@ -189,9 +190,20 @@ defmodule Logflare.Source.BigQuery.Pipeline do
    # then this makes BigQuery check the payloads for new fields. In the response we'll get a list of events that didn't validate.
    # Send those events through the pipeline again, but run them through our schema process this time. Do all
    # these things a max of like 5 times and after that send them to the rejected pile.
-    :ok =
-      Backends.via_source(source, {Schema, Map.get(context, :backend_id)})
-      |> Schema.update(log_event)

+    # random sample if local ingest rate is above a certain level
+    probability =
+      case PubSubRates.Cache.get_local_rates(source.token) do
+        %{average_rate: avg} when avg > 5000 -> 0.01
+        %{average_rate: avg} when avg > 1000 -> 0.1
+        _ -> 1
+      end
+
+    if :rand.uniform() <= probability do
+      :ok =
+        Backends.via_source(source, {Schema, Map.get(context, :backend_id)})
+        |> Schema.update(log_event)
+    end

    log_event
  end