diff --git a/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx b/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
index bb656d248..f0c50bcf8 100644
--- a/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
+++ b/docs/docs.logflare.com/docs/concepts/ingestion/index.mdx
@@ -101,6 +101,12 @@ metadata: {
 }
 ```
 
+:::note
+At high ingestion volumes, Logflare samples incoming events instead of checking each event for schema changes. The sample rate decreases as the ingestion rate increases. Ingestion rates are evaluated per individual Logflare server performing the ingestion, not across the cluster.
+
+From 1,000 to 5,000 events per second, the sample rate is 0.1. Above 5,000 events per second, the sample rate is 0.01.
+:::
+
 ### Key Transformation
 
 When logging object, your object keys will be transformed automatically to comply with the respective backend in use. For example, BigQuery column requirements require that names only contain letters (a-z, A-Z), numbers (0-9), or underscores (\_), and it must start with a letter or underscore. This will be automatically handled for you when ingesting data.
diff --git a/lib/logflare/pubsub_rates/cache.ex b/lib/logflare/pubsub_rates/cache.ex
index 7b5664129..907e4512d 100644
--- a/lib/logflare/pubsub_rates/cache.ex
+++ b/lib/logflare/pubsub_rates/cache.ex
@@ -76,6 +76,35 @@ defmodule Logflare.PubSubRates.Cache do
     Cachex.get(__MODULE__, {source_id, "rates"})
   end
 
+  def get_local_rates(source_id) when is_atom(source_id) do
+    node = Node.self()
+
+    default = %{
+      average_rate: 0,
+      last_rate: 0,
+      max_rate: 0,
+      limiter_metrics: %{average: 0, duration: @default_bucket_width, sum: 0}
+    }
+
+    case get_rates(source_id) do
+      {:ok, nil} ->
+        default
+
+      {:ok, rates} ->
+        Map.get(rates, node, default)
+
+      {:error, _} = err ->
+        Logger.error("Error when getting pubsub cluster rates: #{inspect(err)}")
+
+        %{
+          average_rate: -1,
+          last_rate: -1,
+          max_rate: -1,
+          limiter_metrics: %{average: 100_000, duration: @default_bucket_width, sum: 6_000_000}
+        }
+    end
+  end
+
   def get_cluster_rates(source_id) when is_atom(source_id) do
     case get_rates(source_id) do
       {:ok, nil} ->
diff --git a/lib/logflare/source/bigquery/pipeline.ex b/lib/logflare/source/bigquery/pipeline.ex
index 232a881be..13bc655a9 100644
--- a/lib/logflare/source/bigquery/pipeline.ex
+++ b/lib/logflare/source/bigquery/pipeline.ex
@@ -19,6 +19,7 @@ defmodule Logflare.Source.BigQuery.Pipeline do
   alias Logflare.Source.Supervisor
   alias Logflare.Sources
   alias Logflare.Users
+  alias Logflare.PubSubRates
 
   # each batch should at most be 5MB
   # BQ max is 10MB
@@ -189,9 +190,20 @@ defmodule Logflare.Source.BigQuery.Pipeline do
     # then this makes BigQuery check the payloads for new fields. In the response we'll get a list of events that didn't validate.
     # Send those events through the pipeline again, but run them through our schema process this time. Do all
     # these things a max of like 5 times and after that send them to the rejected pile.
-    :ok =
-      Backends.via_source(source, {Schema, Map.get(context, :backend_id)})
-      |> Schema.update(log_event)
+
+    # Randomly sample schema checks when the local ingest rate is above a certain level.
+    probability =
+      case PubSubRates.Cache.get_local_rates(source.token) do
+        %{average_rate: avg} when avg > 5000 -> 0.01
+        %{average_rate: avg} when avg > 1000 -> 0.1
+        _ -> 1
+      end
+
+    if :rand.uniform() <= probability do
+      :ok =
+        Backends.via_source(source, {Schema, Map.get(context, :backend_id)})
+        |> Schema.update(log_event)
+    end
 
     log_event
   end
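
For readers who want to reason about the sampling behaviour in isolation, below is a minimal sketch of the decision added in `pipeline.ex`, pulled out into a standalone function. The module name `SamplingSketch` and the injectable `rand_fun` argument are illustrative only and do not exist in the Logflare codebase; the thresholds mirror the ones in the diff and the documentation note above.

```elixir
# Minimal sketch, not part of Logflare: mirrors the sampling thresholds used in
# Logflare.Source.BigQuery.Pipeline to decide whether an event's schema check runs.
defmodule SamplingSketch do
  @doc """
  Returns true when the event should go through the schema check, given the
  local average ingest rate in events per second. `rand_fun` is injectable so
  the decision can be exercised deterministically.
  """
  def sample_schema_check?(average_rate, rand_fun \\ &:rand.uniform/0) do
    probability =
      cond do
        average_rate > 5000 -> 0.01
        average_rate > 1000 -> 0.1
        true -> 1.0
      end

    rand_fun.() <= probability
  end
end

# Example: with the random draw pinned to 0.05, a source at 2_500 events/s
# is sampled (0.05 <= 0.1) while one at 6_000 events/s is not (0.05 > 0.01).
IO.inspect(SamplingSketch.sample_schema_check?(2_500, fn -> 0.05 end)) # => true
IO.inspect(SamplingSketch.sample_schema_check?(6_000, fn -> 0.05 end)) # => false
```

Note that the decision is made against the node-local rate (`get_local_rates/1` looks up `Node.self()` in the cached cluster rates), so each server can throttle its own schema checks without any cross-node coordination.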