diff --git a/Makefile b/Makefile
index 3badbe3a5d..be8ec31531 100644
--- a/Makefile
+++ b/Makefile
@@ -141,7 +141,7 @@ lint: alloylint
 # final command runs tests for all other submodules.
 test:
 	$(GO_ENV) go test $(GO_FLAGS) -race $(shell go list ./... | grep -v /integration-tests/)
-	$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization
+	$(GO_ENV) go test $(GO_FLAGS) ./internal/static/integrations/node_exporter ./internal/static/logs ./internal/component/otelcol/processor/tail_sampling ./internal/component/loki/source/file ./internal/component/loki/source/docker ./internal/component/prometheus/remote/queue/serialization ./internal/component/prometheus/remote/queue/network
 	$(GO_ENV) find . -name go.mod -not -path "./go.mod" -execdir go test -race ./... \;
 
 test-packages:
diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md
index 6549abff99..e96eb008ab 100644
--- a/docs/sources/reference/compatibility/_index.md
+++ b/docs/sources/reference/compatibility/_index.md
@@ -175,6 +175,7 @@ The following components, grouped by namespace, _export_ Prometheus `MetricsRece
 {{< collapse title="prometheus" >}}
 - [prometheus.relabel](../components/prometheus/prometheus.relabel)
 - [prometheus.remote_write](../components/prometheus/prometheus.remote_write)
+- [prometheus.write.queue](../components/prometheus/prometheus.write.queue)
 {{< /collapse >}}
diff --git a/docs/sources/reference/components/prometheus/prometheus.write.queue.md b/docs/sources/reference/components/prometheus/prometheus.write.queue.md
new file mode 100644
index 0000000000..43c9cb560d
--- /dev/null
+++ b/docs/sources/reference/components/prometheus/prometheus.write.queue.md
@@ -0,0 +1,280 @@
+---
+canonical: https://grafana.com/docs/alloy/latest/reference/components/prometheus/prometheus.write.queue/
+description: Learn about prometheus.write.queue
+title: prometheus.write.queue
+---
+
+Experimental
+
+# prometheus.write.queue
+
+`prometheus.write.queue` collects metrics sent from other components into a
+Write-Ahead Log (WAL) and forwards them over the network to a series of
+user-supplied endpoints. Metrics are sent over the network using the
+[Prometheus Remote Write protocol][remote_write-spec].
+
+You can specify multiple `prometheus.write.queue` components by giving them different labels.
+
+You should consider everything here extremely experimental and highly subject to change.
+
+[remote_write-spec]: https://prometheus.io/docs/specs/remote_write_spec/
+
+## Usage
+
+```alloy
+prometheus.write.queue "LABEL" {
+  endpoint "default" {
+    url = REMOTE_WRITE_URL
+
+    ...
+  }
+
+  ...
+}
+```
+
+## Arguments
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`ttl` | `duration` | How long samples can be queued before they are discarded. | `2h` | no
+
+## Blocks
+
+The following blocks are supported inside the definition of
+`prometheus.write.queue`:
+
+Hierarchy | Block | Description | Required
+--------- | ----- | ----------- | --------
+persistence | [persistence][] | Configuration for persistence. | no
+endpoint | [endpoint][] | Location to send metrics to. | no
+endpoint > basic_auth | [basic_auth][] | Configure basic_auth for authenticating to the endpoint. | no
+
+The `>` symbol indicates deeper levels of nesting. For example, `endpoint >
+basic_auth` refers to a `basic_auth` block defined inside an
+`endpoint` block.
+
+[endpoint]: #endpoint-block
+[basic_auth]: #basic_auth-block
+[persistence]: #persistence-block
+
+### persistence block
+
+The `persistence` block describes how often and at what limits to write to disk. Persistence settings
+are shared across all `endpoint` blocks.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`max_signals_to_batch` | `uint` | The maximum number of signals before they are batched to disk. | `10000` | no
+`batch_interval` | `duration` | How often to batch signals to disk if `max_signals_to_batch` is not reached. | `5s` | no
+
+### endpoint block
+
+The `endpoint` block describes a single location to send metrics to. Multiple
+`endpoint` blocks can be provided to send metrics to multiple locations. Each
+`endpoint` has its own WAL folder.
+
+The following arguments are supported:
+
+Name | Type | Description | Default | Required
+---- | ---- | ----------- | ------- | --------
+`url` | `string` | Full URL to send metrics to. | | yes
+`write_timeout` | `duration` | Timeout for requests made to the URL. | `"30s"` | no
+`retry_backoff` | `duration` | How long to wait between retries. | `1s` | no
+`max_retry_attempts` | `uint` | Maximum number of retries before dropping the batch. | `0` | no
+`batch_count` | `uint` | The number of series to batch together before sending. | `1000` | no
+`flush_interval` | `duration` | How long to wait before sending if `batch_count` is not reached. | `1s` | no
+`parallelism` | `uint` | How many parallel batches to write. | `10` | no
+`external_labels` | `map(string)` | Labels to add to metrics sent over the network. | | no
+
+### basic_auth block
+
+{{< docs/shared lookup="reference/components/basic-auth-block.md" source="alloy" version="" >}}
+
+## Exported fields
+
+The following fields are exported and can be referenced by other components:
+
+Name | Type | Description
+---- | ---- | -----------
+`receiver` | `MetricsReceiver` | A value that other components can use to send metrics to.
+
+## Component health
+
+`prometheus.write.queue` is only reported as unhealthy if given an invalid
+configuration. In those cases, exported fields are kept at their last healthy
+values.
+
+## Debug information
+
+`prometheus.write.queue` does not expose any component-specific debug
+information.
+
+## Debug metrics
+
+The following metrics are provided for backward compatibility with the equivalent `prometheus.remote_write` metrics.
+They generally behave the same, but there are likely edge cases where they differ.
+
+* `prometheus_remote_write_wal_storage_created_series_total` (counter): Total number of created
+  series appended to the WAL.
+* `prometheus_remote_write_wal_storage_removed_series_total` (counter): Total number of series
+  removed from the WAL.
+* `prometheus_remote_write_wal_samples_appended_total` (counter): Total number of samples
+  appended to the WAL.
+* `prometheus_remote_write_wal_exemplars_appended_total` (counter): Total number of exemplars
+  appended to the WAL.
+* `prometheus_remote_storage_samples_total` (counter): Total number of samples
+  sent to remote storage.
+* `prometheus_remote_storage_exemplars_total` (counter): Total number of + exemplars sent to remote storage. +* `prometheus_remote_storage_metadata_total` (counter): Total number of + metadata entries sent to remote storage. +* `prometheus_remote_storage_samples_failed_total` (counter): Total number of + samples that failed to send to remote storage due to non-recoverable errors. +* `prometheus_remote_storage_exemplars_failed_total` (counter): Total number of + exemplars that failed to send to remote storage due to non-recoverable errors. +* `prometheus_remote_storage_metadata_failed_total` (counter): Total number of + metadata entries that failed to send to remote storage due to + non-recoverable errors. +* `prometheus_remote_storage_samples_retries_total` (counter): Total number of + samples that failed to send to remote storage but were retried due to + recoverable errors. +* `prometheus_remote_storage_exemplars_retried_total` (counter): Total number of + exemplars that failed to send to remote storage but were retried due to + recoverable errors. +* `prometheus_remote_storage_metadata_retried_total` (counter): Total number of + metadata entries that failed to send to remote storage but were retried due + to recoverable errors. +* `prometheus_remote_storage_samples_dropped_total` (counter): Total number of + samples which were dropped after being read from the WAL before being sent to + remote_write because of an unknown reference ID. +* `prometheus_remote_storage_exemplars_dropped_total` (counter): Total number + of exemplars that were dropped after being read from the WAL before being + sent to remote_write because of an unknown reference ID. +* `prometheus_remote_storage_enqueue_retries_total` (counter): Total number of + times enqueue has failed because a shard's queue was full. +* `prometheus_remote_storage_sent_batch_duration_seconds` (histogram): Duration + of send calls to remote storage. +* `prometheus_remote_storage_queue_highest_sent_timestamp_seconds` (gauge): + Unix timestamp of the latest WAL sample successfully sent by a queue. +* `prometheus_remote_storage_samples_pending` (gauge): The number of samples + pending in shards to be sent to remote storage. +* `prometheus_remote_storage_exemplars_pending` (gauge): The number of + exemplars pending in shards to be sent to remote storage. +* `prometheus_remote_storage_samples_in_total` (counter): Samples read into + remote storage. +* `prometheus_remote_storage_exemplars_in_total` (counter): Exemplars read into + remote storage. + +Metrics that are new to `prometheus.write.queue`. These are highly subject to change. + +* `alloy_queue_series_serializer_incoming_signals` (counter): Total number of series written to serialization. +* `alloy_queue_metadata_serializer_incoming_signals` (counter): Total number of metadata written to serialization. +* `alloy_queue_series_serializer_incoming_timestamp_seconds` (gauge): Highest timestamp of incoming series. +* `alloy_queue_series_serializer_errors` (gauge): Number of errors for series written to serializer. +* `alloy_queue_metadata_serializer_errors` (gauge): Number of errors for metadata written to serializer. +* `alloy_queue_series_network_timestamp_seconds` (gauge): Highest timestamp written to an endpoint. +* `alloy_queue_series_network_sent` (counter): Number of series sent successfully. +* `alloy_queue_metadata_network_sent` (counter): Number of metadata sent successfully. +* `alloy_queue_network_series_failed` (counter): Number of series failed. 
+* `alloy_queue_network_metadata_failed` (counter): Number of metadata failed.
+* `alloy_queue_network_series_retried` (counter): Number of series retried due to network issues.
+* `alloy_queue_network_metadata_retried` (counter): Number of metadata retried due to network issues.
+* `alloy_queue_network_series_retried_429` (counter): Number of series retried due to status code 429.
+* `alloy_queue_network_metadata_retried_429` (counter): Number of metadata retried due to status code 429.
+* `alloy_queue_network_series_retried_5xx` (counter): Number of series retried due to status code 5xx.
+* `alloy_queue_network_metadata_retried_5xx` (counter): Number of metadata retried due to status code 5xx.
+* `alloy_queue_network_series_network_duration_seconds` (histogram): Duration writing series to the endpoint.
+* `alloy_queue_network_metadata_network_duration_seconds` (histogram): Duration writing metadata to the endpoint.
+* `alloy_queue_network_series_network_errors` (counter): Number of errors writing series to the network.
+* `alloy_queue_network_metadata_network_errors` (counter): Number of errors writing metadata to the network.
+
+## Examples
+
+The following example shows you how to create a `prometheus.write.queue` component that sends metrics to a Mimir instance.
+
+### Send metrics to a local Mimir instance
+
+You can create a `prometheus.write.queue` component that sends your metrics to a local Mimir instance:
+
+```alloy
+prometheus.write.queue "staging" {
+  // Send metrics to a locally running Mimir.
+  endpoint "mimir" {
+    url = "http://mimir:9009/api/v1/push"
+
+    basic_auth {
+      username = "example-user"
+      password = "example-password"
+    }
+  }
+}
+
+// Configure a prometheus.scrape component to send metrics to
+// the prometheus.write.queue component.
+prometheus.scrape "demo" {
+  targets = [
+    // Collect metrics from the default HTTP listen address.
+    {"__address__" = "127.0.0.1:12345"},
+  ]
+  forward_to = [prometheus.write.queue.staging.receiver]
+}
+
+```
+
+## Technical details
+
+`prometheus.write.queue` uses [snappy][] for compression.
+`prometheus.write.queue` sends native histograms by default.
+Any labels that start with `__` are removed before being sent to the endpoint.
+
+### Data retention
+
+Data is written to disk in blocks using [snappy][] compression. These blocks are read on startup and resent if they are still within the TTL.
+Any data that has not been written to disk, or that is in the network queues, is lost if {{< param "PRODUCT_NAME" >}} is restarted.
+
+### Retries
+
+`prometheus.write.queue` retries sending data if any of the following errors or HTTP status codes are returned:
+
+ * Network errors.
+ * HTTP 429 errors.
+ * HTTP 5XX errors.
+
+`prometheus.write.queue` doesn't retry sending data for any other unsuccessful status code.
+
+### Memory
+
+`prometheus.write.queue` is meant to be memory efficient.
+You can adjust `max_signals_to_batch`, `parallelism`, and `batch_count` to control how much memory is used.
+A higher `max_signals_to_batch` allows for more efficient disk compression.
+A higher `parallelism` allows more parallel writes, and a higher `batch_count` sends more data in each request.
+This can allow greater throughput at the cost of more memory on both {{< param "PRODUCT_NAME" >}} and the endpoint.
+The defaults are suitable for most common usages.
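+
+The following sketch is an illustrative way to reason about those settings.
+The `estimateInFlightSeries` helper and its formula are assumptions for rough,
+back-of-the-envelope sizing only, not code taken from the component:
+
+```go
+package main
+
+import "fmt"
+
+// estimateInFlightSeries returns a rough upper bound on the number of series
+// held in memory at once: for every endpoint, one serializer batch plus
+// batch_count series in each of its parallel network loops.
+func estimateInFlightSeries(maxSignalsToBatch, batchCount, parallelism, endpoints int) int {
+	return endpoints * (maxSignalsToBatch + batchCount*parallelism)
+}
+
+func main() {
+	// Documented defaults: 10000 signals per serializer batch, 1000 series
+	// per network batch, 10 parallel loops, and a single endpoint.
+	fmt.Println(estimateInFlightSeries(10000, 1000, 10, 1)) // 20000
+}
+```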
+ + + +## Compatible components + +`prometheus.write.queue` has exports that can be consumed by the following components: + +- Components that consume [Prometheus `MetricsReceiver`](../../../compatibility/#prometheus-metricsreceiver-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + + +[snappy]: https://en.wikipedia.org/wiki/Snappy_(compression) +[Stop]: ../../../../set-up/run/ +[run]: ../../../cli/run/ diff --git a/internal/component/all/all.go b/internal/component/all/all.go index bfdde5c5b3..a51ca2a3e4 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -81,10 +81,10 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes _ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch _ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative - _ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval _ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery _ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter _ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs + _ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval _ "github.com/grafana/alloy/internal/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes _ "github.com/grafana/alloy/internal/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter _ "github.com/grafana/alloy/internal/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler @@ -134,6 +134,7 @@ import ( _ "github.com/grafana/alloy/internal/component/prometheus/operator/servicemonitors" // Import prometheus.operator.servicemonitors _ "github.com/grafana/alloy/internal/component/prometheus/receive_http" // Import prometheus.receive_http _ "github.com/grafana/alloy/internal/component/prometheus/relabel" // Import prometheus.relabel + _ "github.com/grafana/alloy/internal/component/prometheus/remote/queue" // Import prometheus.write.queue _ "github.com/grafana/alloy/internal/component/prometheus/remotewrite" // Import prometheus.remote_write _ "github.com/grafana/alloy/internal/component/prometheus/scrape" // Import prometheus.scrape _ "github.com/grafana/alloy/internal/component/pyroscope/ebpf" // Import pyroscope.ebpf diff --git a/internal/component/prometheus/remote/queue/README.md b/internal/component/prometheus/remote/queue/README.md index ab5e2f7cbd..1c40f50ca5 100644 --- a/internal/component/prometheus/remote/queue/README.md +++ b/internal/component/prometheus/remote/queue/README.md @@ -4,9 +4,9 @@ ## Overview -The `prometheus.remote.queue` goals are to set reliable and repeatable memory and cpu usage based on the number of incoming and outgoing series. There are four broad parts to the system. +The `prometheus.write.queue` goals are to set reliable and repeatable memory and cpu usage based on the number of incoming and outgoing series. 
There are four broad parts to the system. -1. The `prometheus.remote.queue` component itself. This handles the lifecycle of the Alloy system. +1. The `prometheus.write.queue` component itself. This handles the lifecycle of the Alloy system. 2. The `serialization` converts an array of series into a serializable format. This is handled via [msgp]() library. 3. The `filequeue` is where the buffers are written to. This has a series of files that are committed to disk and then are read. 4. The `network` handles sending data. The data is sharded by the label hash across any number of loops that send data. diff --git a/internal/component/prometheus/remote/queue/component.go b/internal/component/prometheus/remote/queue/component.go new file mode 100644 index 0000000000..10a13b163c --- /dev/null +++ b/internal/component/prometheus/remote/queue/component.go @@ -0,0 +1,164 @@ +package queue + +import ( + "context" + "path/filepath" + "reflect" + "sync" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/filequeue" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/network" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/serialization" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/grafana/alloy/internal/featuregate" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/storage" +) + +func init() { + component.Register(component.Registration{ + Name: "prometheus.write.queue", + Args: Arguments{}, + Exports: Exports{}, + Stability: featuregate.StabilityExperimental, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return NewComponent(opts, args.(Arguments)) + }, + }) +} + +func NewComponent(opts component.Options, args Arguments) (*Queue, error) { + s := &Queue{ + opts: opts, + args: args, + log: opts.Logger, + endpoints: map[string]*endpoint{}, + } + + err := s.createEndpoints() + if err != nil { + return nil, err + } + // This needs to be started before we export the onstatechange so that it can accept + // signals. + for _, ep := range s.endpoints { + ep.Start() + } + s.opts.OnStateChange(Exports{Receiver: s}) + + return s, nil +} + +// Queue is a queue based WAL used to send data to a remote_write endpoint. Queue supports replaying +// and TTLs. +type Queue struct { + mut sync.RWMutex + args Arguments + opts component.Options + log log.Logger + endpoints map[string]*endpoint +} + +// Run starts the component, blocking until ctx is canceled or the component +// suffers a fatal error. Run is guaranteed to be called exactly once per +// Component. +func (s *Queue) Run(ctx context.Context) error { + defer func() { + s.mut.Lock() + defer s.mut.Unlock() + + for _, ep := range s.endpoints { + ep.Stop() + } + }() + + <-ctx.Done() + return nil +} + +// Update provides a new Config to the component. The type of newConfig will +// always match the struct type which the component registers. +// +// Update will be called concurrently with Run. The component must be able to +// gracefully handle updating its config while still running. +// +// An error may be returned if the provided config is invalid. +func (s *Queue) Update(args component.Arguments) error { + s.mut.Lock() + defer s.mut.Unlock() + + newArgs := args.(Arguments) + sync.OnceFunc(func() { + s.opts.OnStateChange(Exports{Receiver: s}) + }) + // If they are the same do nothing. 
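+	// reflect.DeepEqual serves as a cheap dirty check: any difference in the
+	// arguments falls through to the full endpoint teardown and rebuild below.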
+ if reflect.DeepEqual(newArgs, s.args) { + return nil + } + s.args = newArgs + // TODO @mattdurham need to cycle through the endpoints figuring out what changed instead of this global stop and start. + // This will cause data in the endpoints and their children to be lost. + if len(s.endpoints) > 0 { + for _, ep := range s.endpoints { + ep.Stop() + } + s.endpoints = map[string]*endpoint{} + } + err := s.createEndpoints() + if err != nil { + return err + } + for _, ep := range s.endpoints { + ep.Start() + } + return nil +} + +func (s *Queue) createEndpoints() error { + // @mattdurham not in love with this code. + for _, ep := range s.args.Endpoints { + reg := prometheus.WrapRegistererWith(prometheus.Labels{"endpoint": ep.Name}, s.opts.Registerer) + stats := types.NewStats("alloy", "queue_series", reg) + stats.SeriesBackwardsCompatibility(reg) + meta := types.NewStats("alloy", "queue_metadata", reg) + meta.MetaBackwardsCompatibility(reg) + cfg := ep.ToNativeType() + client, err := network.New(cfg, s.log, stats.UpdateNetwork, meta.UpdateNetwork) + if err != nil { + return err + } + end := NewEndpoint(client, nil, s.args.TTL, s.opts.Logger) + fq, err := filequeue.NewQueue(filepath.Join(s.opts.DataPath, ep.Name, "wal"), func(ctx context.Context, dh types.DataHandle) { + _ = end.incoming.Send(ctx, dh) + }, s.opts.Logger) + if err != nil { + return err + } + serial, err := serialization.NewSerializer(types.SerializerConfig{ + MaxSignalsInBatch: uint32(s.args.Serialization.MaxSignalsToBatch), + FlushFrequency: s.args.Serialization.BatchInterval, + }, fq, stats.UpdateSerializer, s.opts.Logger) + if err != nil { + return err + } + end.serializer = serial + s.endpoints[ep.Name] = end + } + return nil +} + +// Appender returns a new appender for the storage. The implementation +// can choose whether or not to use the context, for deadlines or to check +// for errors. +func (c *Queue) Appender(ctx context.Context) storage.Appender { + c.mut.RLock() + defer c.mut.RUnlock() + + children := make([]storage.Appender, 0) + for _, ep := range c.endpoints { + children = append(children, serialization.NewAppender(ctx, c.args.TTL, ep.serializer, c.opts.Logger)) + } + return &fanout{children: children} +} diff --git a/internal/component/prometheus/remote/queue/e2e_bench_test.go b/internal/component/prometheus/remote/queue/e2e_bench_test.go new file mode 100644 index 0000000000..e405a2aa50 --- /dev/null +++ b/internal/component/prometheus/remote/queue/e2e_bench_test.go @@ -0,0 +1,126 @@ +package queue + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/alloy/internal/component" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" +) + +func BenchmarkE2E(b *testing.B) { + // Around 120k ops if you look at profile roughly 20k are actual implementation with the rest being benchmark + // setup. 
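+	// Each benchmark case supplies a maker that appends one signal per iteration;
+	// the stub HTTP server above accepts and discards whatever it receives.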
+ type e2eTest struct { + name string + maker func(index int, app storage.Appender) + tester func(samples []prompb.TimeSeries) + } + tests := []e2eTest{ + { + // This should be ~1200 allocs an op + name: "normal", + maker: func(index int, app storage.Appender) { + ts, v, lbls := makeSeries(index) + _, _ = app.Append(0, lbls, ts, v) + }, + tester: func(samples []prompb.TimeSeries) { + b.Helper() + for _, s := range samples { + require.True(b, len(s.Samples) == 1) + } + }, + }, + } + for _, test := range tests { + b.Run(test.name, func(t *testing.B) { + runBenchmark(t, test.maker, test.tester) + }) + } +} + +func runBenchmark(t *testing.B, add func(index int, appendable storage.Appender), _ func(samples []prompb.TimeSeries)) { + t.ReportAllocs() + l := log.NewNopLogger() + done := make(chan struct{}) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + expCh := make(chan Exports, 1) + c, err := newComponentBenchmark(t, l, srv.URL, expCh) + require.NoError(t, err) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + runErr := c.Run(ctx) + require.NoError(t, runErr) + }() + // Wait for export to spin up. + exp := <-expCh + + index := 0 + app := exp.Receiver.Appender(ctx) + + for i := 0; i < t.N; i++ { + index++ + add(index, app) + } + require.NoError(t, app.Commit()) + + tm := time.NewTimer(10 * time.Second) + select { + case <-done: + case <-tm.C: + } + cancel() +} + +func newComponentBenchmark(t *testing.B, l log.Logger, url string, exp chan Exports) (*Queue, error) { + return NewComponent(component.Options{ + ID: "test", + Logger: l, + DataPath: t.TempDir(), + OnStateChange: func(e component.Exports) { + exp <- e.(Exports) + }, + Registerer: fakeRegistry{}, + Tracer: nil, + }, Arguments{ + TTL: 2 * time.Hour, + Serialization: Serialization{ + MaxSignalsToBatch: 100_000, + BatchInterval: 1 * time.Second, + }, + Endpoints: []EndpointConfig{{ + Name: "test", + URL: url, + Timeout: 10 * time.Second, + RetryBackoff: 1 * time.Second, + MaxRetryAttempts: 0, + BatchCount: 50, + FlushInterval: 1 * time.Second, + Parallelism: 1, + }}, + }) +} + +var _ prometheus.Registerer = (*fakeRegistry)(nil) + +type fakeRegistry struct{} + +func (f fakeRegistry) Register(collector prometheus.Collector) error { + return nil +} + +func (f fakeRegistry) MustRegister(collector ...prometheus.Collector) { +} + +func (f fakeRegistry) Unregister(collector prometheus.Collector) bool { + return true +} diff --git a/internal/component/prometheus/remote/queue/e2e_stats_test.go b/internal/component/prometheus/remote/queue/e2e_stats_test.go new file mode 100644 index 0000000000..81cd26c75d --- /dev/null +++ b/internal/component/prometheus/remote/queue/e2e_stats_test.go @@ -0,0 +1,689 @@ +package queue + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +const remoteSamples = "prometheus_remote_storage_samples_total" +const remoteHistograms = "prometheus_remote_storage_histograms_total" +const remoteMetadata = "prometheus_remote_storage_metadata_total" + +const sentBytes = "prometheus_remote_storage_sent_bytes_total" +const sentMetadataBytes = "prometheus_remote_storage_metadata_bytes_total" + +const outTimestamp = "prometheus_remote_storage_queue_highest_sent_timestamp_seconds" +const inTimestamp = 
"prometheus_remote_storage_highest_timestamp_in_seconds" + +const failedSample = "prometheus_remote_storage_samples_failed_total" +const failedHistogram = "prometheus_remote_storage_histograms_failed_total" +const failedMetadata = "prometheus_remote_storage_metadata_failed_total" + +const retriedSamples = "prometheus_remote_storage_samples_retried_total" +const retriedHistogram = "prometheus_remote_storage_histograms_retried_total" +const retriedMetadata = "prometheus_remote_storage_metadata_retried_total" + +const prometheusDuration = "prometheus_remote_storage_queue_duration_seconds" + +const serializerIncoming = "alloy_queue_series_serializer_incoming_signals" +const alloySent = "alloy_queue_series_network_sent" +const alloySerializerIncoming = "alloy_queue_series_serializer_incoming_timestamp_seconds" +const alloyNetworkDuration = "alloy_queue_series_network_duration_seconds" +const alloyFailures = "alloy_queue_series_network_failed" +const alloyRetries = "alloy_queue_series_network_retried" +const alloy429 = "alloy_queue_series_network_retried_429" + +const alloyMetadataDuration = "alloy_queue_metadata_network_duration_seconds" +const alloyMetadataSent = "alloy_queue_metadata_network_sent" +const alloyMetadataFailed = "alloy_queue_metadata_network_failed" +const alloyMetadataRetried429 = "alloy_queue_metadata_network_retried_429" +const alloyMetadataRetried = "alloy_queue_metadata_network_retried" + +const alloyNetworkTimestamp = "alloy_queue_series_network_timestamp_seconds" + +// TestMetadata is the large end to end testing for the queue based wal, specifically for metadata. +func TestMetadata(t *testing.T) { + // Check assumes you are checking for any value that is not 0. + // The test at the end will see if there are any values that were not 0. + tests := []statsTest{ + // Metadata Tests + { + name: "metadata success", + returnStatusCode: http.StatusOK, + dtype: Metadata, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: remoteMetadata, + value: 10, + }, + { + name: sentMetadataBytes, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataSent, + value: 10, + }, + }, + }, + { + name: "metadata failure", + returnStatusCode: http.StatusBadRequest, + dtype: Metadata, + checks: []check{ + { + name: alloyMetadataFailed, + value: 10, + }, + { + name: serializerIncoming, + value: 10, + }, + { + name: failedMetadata, + value: 10, + }, + { + name: alloyMetadataDuration, + valueFunc: greaterThenZero, + }, + }, + }, + { + name: "metadata retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Metadata, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: retriedMetadata, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataRetried, + valueFunc: greaterThenZero, + }, + { + name: alloyMetadataRetried429, + valueFunc: greaterThenZero, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runE2eStats(t, test) + }) + } + +} + +// TestMetrics is the large end to end testing for the queue based wal. +func TestMetrics(t *testing.T) { + // Check assumes you are checking for any value that is not 0. + // The test at the end will see if there are any values that were not 0. 
+ tests := []statsTest{ + // Sample Tests + { + name: "sample success", + returnStatusCode: http.StatusOK, + dtype: Sample, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: remoteSamples, + value: 10, + }, + { + name: alloySent, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: sentBytes, + valueFunc: greaterThenZero, + }, + { + name: outTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: alloyNetworkTimestamp, + valueFunc: greaterThenZero, + }, + }, + }, + { + name: "sample failure", + returnStatusCode: http.StatusBadRequest, + dtype: Sample, + checks: []check{ + { + name: alloyFailures, + value: 10, + }, + { + name: serializerIncoming, + value: 10, + }, + { + name: failedSample, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "sample retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Sample, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: retriedSamples, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyRetries, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloy429, + // This will be more than 10 since it retries in a loop. 
+ valueFunc: greaterThenZero, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + // histograms + { + name: "histogram success", + returnStatusCode: http.StatusOK, + dtype: Histogram, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: remoteHistograms, + value: 10, + }, + { + name: alloySent, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: sentBytes, + valueFunc: greaterThenZero, + }, + { + name: outTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: alloyNetworkTimestamp, + valueFunc: greaterThenZero, + }, + }, + }, + { + name: "histogram failure", + returnStatusCode: http.StatusBadRequest, + dtype: Histogram, + checks: []check{ + { + name: alloyFailures, + value: 10, + }, + { + name: serializerIncoming, + value: 10, + }, + { + name: failedHistogram, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "histogram retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Histogram, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: retriedHistogram, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyRetries, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloy429, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + // exemplar, note that once it hits the appender exemplars are treated the same as series. 
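+		// These cases therefore assert against the sample metrics
+		// (samples sent/failed/retried) rather than exemplar-specific ones.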
+ { + name: "exemplar success", + returnStatusCode: http.StatusOK, + dtype: Exemplar, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: remoteSamples, + value: 10, + }, + { + name: alloySent, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: sentBytes, + valueFunc: greaterThenZero, + }, + { + name: outTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + { + name: alloyNetworkTimestamp, + valueFunc: greaterThenZero, + }, + }, + }, + { + name: "exemplar failure", + returnStatusCode: http.StatusBadRequest, + dtype: Exemplar, + checks: []check{ + { + name: alloyFailures, + value: 10, + }, + { + name: serializerIncoming, + value: 10, + }, + { + name: failedSample, + value: 10, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + { + name: "exemplar retry", + returnStatusCode: http.StatusTooManyRequests, + dtype: Exemplar, + checks: []check{ + { + name: serializerIncoming, + value: 10, + }, + { + name: retriedSamples, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloyRetries, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: alloy429, + // This will be more than 10 since it retries in a loop. + valueFunc: greaterThenZero, + }, + { + name: prometheusDuration, + valueFunc: greaterThenZero, + }, + { + name: alloyNetworkDuration, + valueFunc: greaterThenZero, + }, + { + name: alloySerializerIncoming, + valueFunc: isReasonableTimeStamp, + }, + { + name: inTimestamp, + valueFunc: isReasonableTimeStamp, + }, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + runE2eStats(t, test) + }) + } + +} + +func greaterThenZero(v float64) bool { + return v > 0 +} + +func isReasonableTimeStamp(v float64) bool { + if v < 0 { + return false + } + unixTime := time.Unix(int64(v), 0) + + return time.Since(unixTime) < 10*time.Second +} + +type dataType int + +const ( + Sample dataType = iota + Histogram + Exemplar + Metadata +) + +type check struct { + name string + value float64 + valueFunc func(v float64) bool +} +type statsTest struct { + name string + returnStatusCode int + // Only check for non zero values, once all checks are ran it will automatically ensure all remaining metrics are 0. + checks []check + dtype dataType +} + +func runE2eStats(t *testing.T, test statsTest) { + l := util.TestAlloyLogger(t) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(test.returnStatusCode) + })) + expCh := make(chan Exports, 1) + + reg := prometheus.NewRegistry() + c, err := newComponent(t, l, srv.URL, expCh, reg) + require.NoError(t, err) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + runErr := c.Run(ctx) + require.NoError(t, runErr) + }() + // Wait for export to spin up. 
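+	// newComponent wires OnStateChange to push Exports onto expCh, so receiving
+	// here yields the receiver once the component has started.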
+ exp := <-expCh + + index := 0 + + go func() { + app := exp.Receiver.Appender(ctx) + for j := 0; j < 10; j++ { + index++ + switch test.dtype { + case Sample: + ts, v, lbls := makeSeries(index) + _, errApp := app.Append(0, lbls, ts, v) + require.NoError(t, errApp) + case Histogram: + ts, lbls, h := makeHistogram(index) + _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) + require.NoError(t, errApp) + case Exemplar: + ex := makeExemplar(index) + _, errApp := app.AppendExemplar(0, nil, ex) + require.NoError(t, errApp) + case Metadata: + md, lbls := makeMetadata(index) + _, errApp := app.UpdateMetadata(0, lbls, md) + require.NoError(t, errApp) + default: + require.True(t, false) + } + } + require.NoError(t, app.Commit()) + }() + tm := time.NewTimer(8 * time.Second) + <-tm.C + cancel() + + require.Eventually(t, func() bool { + dtos, gatherErr := reg.Gather() + require.NoError(t, gatherErr) + // Check if we have some valid metrics. + for _, d := range dtos { + if getValue(d) > 0 { + return true + } + } + return false + }, 10*time.Second, 1*time.Second) + metrics := make(map[string]float64) + dtos, err := reg.Gather() + require.NoError(t, err) + // Get the value of metrics. + for _, d := range dtos { + metrics[*d.Name] = getValue(d) + } + + // Check for the metrics that matter. + for _, valChk := range test.checks { + // These check functions will return the list of metrics with the one checked for deleted. + // Ideally at the end we should only be left with metrics with a value of zero.s + if valChk.valueFunc != nil { + metrics = checkValueCondition(t, valChk.name, valChk.valueFunc, metrics) + } else { + metrics = checkValue(t, valChk.name, valChk.value, metrics) + } + } + // all other metrics should be zero. + for k, v := range metrics { + require.Zerof(t, v, "%s should be zero", k) + } +} + +func getValue(d *dto.MetricFamily) float64 { + switch *d.Type { + case dto.MetricType_COUNTER: + return d.Metric[0].Counter.GetValue() + case dto.MetricType_GAUGE: + return d.Metric[0].Gauge.GetValue() + case dto.MetricType_SUMMARY: + return d.Metric[0].Summary.GetSampleSum() + case dto.MetricType_UNTYPED: + return d.Metric[0].Untyped.GetValue() + case dto.MetricType_HISTOGRAM: + return d.Metric[0].Histogram.GetSampleSum() + case dto.MetricType_GAUGE_HISTOGRAM: + return d.Metric[0].Histogram.GetSampleSum() + default: + panic("unknown type " + d.Type.String()) + } +} + +func checkValue(t *testing.T, name string, value float64, metrics map[string]float64) map[string]float64 { + v, ok := metrics[name] + require.Truef(t, ok, "invalid metric name %s", name) + require.Equalf(t, value, v, "%s should be %f", name, value) + delete(metrics, name) + return metrics +} + +func checkValueCondition(t *testing.T, name string, chk func(float64) bool, metrics map[string]float64) map[string]float64 { + v, ok := metrics[name] + require.Truef(t, ok, "invalid metric name %s", name) + require.Truef(t, chk(v), "false test for metric name %s", name) + delete(metrics, name) + return metrics +} diff --git a/internal/component/prometheus/remote/queue/e2e_test.go b/internal/component/prometheus/remote/queue/e2e_test.go new file mode 100644 index 0000000000..aca9ca29ba --- /dev/null +++ b/internal/component/prometheus/remote/queue/e2e_test.go @@ -0,0 +1,424 @@ +package queue + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "sync" + "testing" + "time" + + "github.com/golang/snappy" + "github.com/grafana/alloy/internal/component" + 
"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/grafana/alloy/internal/runtime/logging" + "github.com/grafana/alloy/internal/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" +) + +func TestE2E(t *testing.T) { + type e2eTest struct { + name string + maker func(index int, app storage.Appender) (float64, labels.Labels) + tester func(samples *safeSlice[prompb.TimeSeries]) + testMeta func(samples *safeSlice[prompb.MetricMetadata]) + } + tests := []e2eTest{ + { + name: "normal", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + ts, v, lbls := makeSeries(index) + _, errApp := app.Append(0, lbls, ts, v) + require.NoError(t, errApp) + return v, lbls + }, + tester: func(samples *safeSlice[prompb.TimeSeries]) { + t.Helper() + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) + require.True(t, len(s.Samples) == 1) + require.True(t, s.Samples[0].Timestamp > 0) + require.True(t, s.Samples[0].Value > 0) + require.True(t, len(s.Labels) == 1) + require.Truef(t, s.Labels[0].Name == fmt.Sprintf("name_%d", int(s.Samples[0].Value)), "%d name %s", int(s.Samples[0].Value), s.Labels[0].Name) + require.True(t, s.Labels[0].Value == fmt.Sprintf("value_%d", int(s.Samples[0].Value))) + } + }, + }, + { + name: "metadata", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + meta, lbls := makeMetadata(index) + _, errApp := app.UpdateMetadata(0, lbls, meta) + require.NoError(t, errApp) + return 0, lbls + }, + testMeta: func(samples *safeSlice[prompb.MetricMetadata]) { + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) + require.True(t, s.GetUnit() == "seconds") + require.True(t, s.Help == "metadata help") + require.True(t, s.Unit == "seconds") + require.True(t, s.Type == prompb.MetricMetadata_COUNTER) + require.True(t, strings.HasPrefix(s.MetricFamilyName, "name_")) + } + }, + }, + + { + name: "histogram", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + ts, lbls, h := makeHistogram(index) + _, errApp := app.AppendHistogram(0, lbls, ts, h, nil) + require.NoError(t, errApp) + return h.Sum, lbls + }, + tester: func(samples *safeSlice[prompb.TimeSeries]) { + t.Helper() + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) + require.True(t, len(s.Samples) == 1) + require.True(t, s.Samples[0].Timestamp > 0) + require.True(t, s.Samples[0].Value == 0) + require.True(t, len(s.Labels) == 1) + histSame(t, hist(int(s.Histograms[0].Sum)), s.Histograms[0]) + } + }, + }, + { + name: "float histogram", + maker: func(index int, app storage.Appender) (float64, labels.Labels) { + ts, lbls, h := makeFloatHistogram(index) + _, errApp := app.AppendHistogram(0, lbls, ts, nil, h) + require.NoError(t, errApp) + return h.Sum, lbls + }, + tester: func(samples *safeSlice[prompb.TimeSeries]) { + t.Helper() + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) + require.True(t, len(s.Samples) == 1) + require.True(t, s.Samples[0].Timestamp > 0) + require.True(t, s.Samples[0].Value == 0) + require.True(t, len(s.Labels) == 1) + histFloatSame(t, histFloat(int(s.Histograms[0].Sum)), s.Histograms[0]) + } + }, + }, + } + for _, test := range tests 
{ + t.Run(test.name, func(t *testing.T) { + runTest(t, test.maker, test.tester, test.testMeta) + }) + } +} + +const ( + iterations = 10 + items = 10_000 +) + +func runTest(t *testing.T, add func(index int, appendable storage.Appender) (float64, labels.Labels), test func(samples *safeSlice[prompb.TimeSeries]), metaTest func(meta *safeSlice[prompb.MetricMetadata])) { + l := util.TestAlloyLogger(t) + done := make(chan struct{}) + var series atomic.Int32 + var meta atomic.Int32 + samples := newSafeSlice[prompb.TimeSeries]() + metaSamples := newSafeSlice[prompb.MetricMetadata]() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + newSamples, newMetadata := handlePost(t, w, r) + series.Add(int32(len(newSamples))) + meta.Add(int32(len(newMetadata))) + samples.AddSlice(newSamples) + metaSamples.AddSlice(newMetadata) + if series.Load() == iterations*items { + done <- struct{}{} + } + if meta.Load() == iterations*items { + done <- struct{}{} + } + })) + expCh := make(chan Exports, 1) + c, err := newComponent(t, l, srv.URL, expCh, prometheus.NewRegistry()) + require.NoError(t, err) + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + go func() { + runErr := c.Run(ctx) + require.NoError(t, runErr) + }() + // Wait for export to spin up. + exp := <-expCh + + index := atomic.NewInt64(0) + results := &safeMap{ + results: make(map[float64]labels.Labels), + } + + for i := 0; i < iterations; i++ { + go func() { + app := exp.Receiver.Appender(ctx) + for j := 0; j < items; j++ { + val := index.Add(1) + v, lbl := add(int(val), app) + results.Add(v, lbl) + } + require.NoError(t, app.Commit()) + }() + } + // This is a weird use case to handle eventually. + // With race turned on this can take a long time. + tm := time.NewTimer(20 * time.Second) + select { + case <-done: + case <-tm.C: + require.Truef(t, false, "failed to collect signals in the appropriate time") + } + cancel() + + for i := 0; i < samples.Len(); i++ { + s := samples.Get(i) + if len(s.Histograms) == 1 { + lbls, ok := results.Get(s.Histograms[0].Sum) + require.True(t, ok) + for i, sLbl := range s.Labels { + require.True(t, lbls[i].Name == sLbl.Name) + require.True(t, lbls[i].Value == sLbl.Value) + } + } else { + lbls, ok := results.Get(s.Samples[0].Value) + require.True(t, ok) + for i, sLbl := range s.Labels { + require.True(t, lbls[i].Name == sLbl.Name) + require.True(t, lbls[i].Value == sLbl.Value) + } + } + } + if test != nil { + test(samples) + } else { + metaTest(metaSamples) + } + require.Eventuallyf(t, func() bool { + return types.OutStandingTimeSeriesBinary.Load() == 0 + }, 2*time.Second, 100*time.Millisecond, "there are %d time series not collected", types.OutStandingTimeSeriesBinary.Load()) +} + +func handlePost(t *testing.T, _ http.ResponseWriter, r *http.Request) ([]prompb.TimeSeries, []prompb.MetricMetadata) { + defer r.Body.Close() + data, err := io.ReadAll(r.Body) + require.NoError(t, err) + + data, err = snappy.Decode(nil, data) + require.NoError(t, err) + + var req prompb.WriteRequest + err = req.Unmarshal(data) + require.NoError(t, err) + return req.GetTimeseries(), req.Metadata +} + +func makeSeries(index int) (int64, float64, labels.Labels) { + return time.Now().UTC().Unix(), float64(index), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)) +} + +func makeMetadata(index int) (metadata.Metadata, labels.Labels) { + return metadata.Metadata{ + Type: "counter", + Unit: "seconds", + Help: "metadata help", + }, labels.FromStrings("__name__", 
fmt.Sprintf("name_%d", index)) +} + +func makeHistogram(index int) (int64, labels.Labels, *histogram.Histogram) { + return time.Now().UTC().Unix(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), hist(index) +} + +func makeExemplar(index int) exemplar.Exemplar { + return exemplar.Exemplar{ + Labels: labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), + Ts: time.Now().Unix(), + HasTs: true, + Value: float64(index), + } +} + +func hist(i int) *histogram.Histogram { + return &histogram.Histogram{ + CounterResetHint: 1, + Schema: 2, + ZeroThreshold: 3, + ZeroCount: 4, + Count: 5, + Sum: float64(i), + PositiveSpans: []histogram.Span{ + { + Offset: 1, + Length: 2, + }, + }, + NegativeSpans: []histogram.Span{ + { + Offset: 3, + Length: 4, + }, + }, + PositiveBuckets: []int64{1, 2, 3}, + NegativeBuckets: []int64{1, 2, 3}, + } +} + +func histSame(t *testing.T, h *histogram.Histogram, pb prompb.Histogram) { + require.True(t, h.Sum == pb.Sum) + require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountInt).ZeroCountInt) + require.True(t, h.Schema == pb.Schema) + require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountInt).CountInt) + require.True(t, h.ZeroThreshold == pb.ZeroThreshold) + require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) + require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveDeltas)) + require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeDeltas)) + histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) + histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) +} + +func histSpanSame(t *testing.T, h []histogram.Span, pb []prompb.BucketSpan) { + require.True(t, len(h) == len(pb)) + for i := range h { + require.True(t, h[i].Length == pb[i].Length) + require.True(t, h[i].Offset == pb[i].Offset) + } +} + +func makeFloatHistogram(index int) (int64, labels.Labels, *histogram.FloatHistogram) { + return time.Now().UTC().Unix(), labels.FromStrings(fmt.Sprintf("name_%d", index), fmt.Sprintf("value_%d", index)), histFloat(index) +} + +func histFloat(i int) *histogram.FloatHistogram { + return &histogram.FloatHistogram{ + CounterResetHint: 1, + Schema: 2, + ZeroThreshold: 3, + ZeroCount: 4, + Count: 5, + Sum: float64(i), + PositiveSpans: []histogram.Span{ + { + Offset: 1, + Length: 2, + }, + }, + NegativeSpans: []histogram.Span{ + { + Offset: 3, + Length: 4, + }, + }, + PositiveBuckets: []float64{1.1, 2.2, 3.3}, + NegativeBuckets: []float64{1.2, 2.3, 3.4}, + } +} + +func histFloatSame(t *testing.T, h *histogram.FloatHistogram, pb prompb.Histogram) { + require.True(t, h.Sum == pb.Sum) + require.True(t, h.ZeroCount == pb.ZeroCount.(*prompb.Histogram_ZeroCountFloat).ZeroCountFloat) + require.True(t, h.Schema == pb.Schema) + require.True(t, h.Count == pb.Count.(*prompb.Histogram_CountFloat).CountFloat) + require.True(t, h.ZeroThreshold == pb.ZeroThreshold) + require.True(t, int32(h.CounterResetHint) == int32(pb.ResetHint)) + require.True(t, reflect.DeepEqual(h.PositiveBuckets, pb.PositiveCounts)) + require.True(t, reflect.DeepEqual(h.NegativeBuckets, pb.NegativeCounts)) + histSpanSame(t, h.PositiveSpans, pb.PositiveSpans) + histSpanSame(t, h.NegativeSpans, pb.NegativeSpans) +} + +func newComponent(t *testing.T, l *logging.Logger, url string, exp chan Exports, reg prometheus.Registerer) (*Queue, error) { + return NewComponent(component.Options{ + ID: "test", + Logger: l, + DataPath: t.TempDir(), + OnStateChange: func(e component.Exports) { + exp <- e.(Exports) + }, + Registerer: reg, + 
Tracer: nil, + }, Arguments{ + TTL: 2 * time.Hour, + Serialization: Serialization{ + MaxSignalsToBatch: 10_000, + BatchInterval: 1 * time.Second, + }, + Endpoints: []EndpointConfig{{ + Name: "test", + URL: url, + Timeout: 20 * time.Second, + RetryBackoff: 5 * time.Second, + MaxRetryAttempts: 1, + BatchCount: 50, + FlushInterval: 1 * time.Second, + Parallelism: 1, + }}, + }) +} + +func newSafeSlice[T any]() *safeSlice[T] { + return &safeSlice[T]{slice: make([]T, 0)} +} + +type safeSlice[T any] struct { + slice []T + mut sync.Mutex +} + +func (s *safeSlice[T]) Add(v T) { + s.mut.Lock() + defer s.mut.Unlock() + s.slice = append(s.slice, v) +} + +func (s *safeSlice[T]) AddSlice(v []T) { + s.mut.Lock() + defer s.mut.Unlock() + s.slice = append(s.slice, v...) +} + +func (s *safeSlice[T]) Len() int { + s.mut.Lock() + defer s.mut.Unlock() + return len(s.slice) +} + +func (s *safeSlice[T]) Get(i int) T { + s.mut.Lock() + defer s.mut.Unlock() + return s.slice[i] +} + +type safeMap struct { + mut sync.Mutex + results map[float64]labels.Labels +} + +func (s *safeMap) Add(v float64, ls labels.Labels) { + s.mut.Lock() + defer s.mut.Unlock() + s.results[v] = ls +} + +func (s *safeMap) Get(v float64) (labels.Labels, bool) { + s.mut.Lock() + defer s.mut.Unlock() + res, ok := s.results[v] + return res, ok +} diff --git a/internal/component/prometheus/remote/queue/endpoint.go b/internal/component/prometheus/remote/queue/endpoint.go new file mode 100644 index 0000000000..223dbafd3e --- /dev/null +++ b/internal/component/prometheus/remote/queue/endpoint.go @@ -0,0 +1,133 @@ +package queue + +import ( + "context" + "strconv" + "time" + + snappy "github.com/eapache/go-xerial-snappy" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/vladopajic/go-actor/actor" +) + +var _ actor.Worker = (*endpoint)(nil) + +// endpoint handles communication between the serializer, filequeue and network. +type endpoint struct { + network types.NetworkClient + serializer types.Serializer + log log.Logger + ttl time.Duration + incoming actor.Mailbox[types.DataHandle] + buf []byte + self actor.Actor +} + +func NewEndpoint(client types.NetworkClient, serializer types.Serializer, ttl time.Duration, logger log.Logger) *endpoint { + return &endpoint{ + network: client, + serializer: serializer, + log: logger, + ttl: ttl, + incoming: actor.NewMailbox[types.DataHandle](actor.OptCapacity(1)), + buf: make([]byte, 0, 1024), + } +} + +func (ep *endpoint) Start() { + ep.self = actor.Combine(actor.New(ep), ep.incoming).Build() + ep.self.Start() + ep.serializer.Start() + ep.network.Start() +} + +func (ep *endpoint) Stop() { + // Stop in order of data flow. This prevents errors around stopped mailboxes that can pop up. 
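+	// The serializer stops first so nothing new reaches the filequeue, then the
+	// network client, and finally this endpoint's own actor and incoming mailbox.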
+ ep.serializer.Stop() + ep.network.Stop() + ep.self.Stop() +} + +func (ep *endpoint) DoWork(ctx actor.Context) actor.WorkerStatus { + select { + case <-ctx.Done(): + return actor.WorkerEnd + case file, ok := <-ep.incoming.ReceiveC(): + if !ok { + return actor.WorkerEnd + } + meta, buf, err := file.Pop() + if err != nil { + level.Error(ep.log).Log("msg", "unable to get file contents", "name", file.Name, "err", err) + return actor.WorkerContinue + } + ep.deserializeAndSend(ctx, meta, buf) + return actor.WorkerContinue + } +} + +func (ep *endpoint) deserializeAndSend(ctx context.Context, meta map[string]string, buf []byte) { + var err error + ep.buf, err = snappy.DecodeInto(ep.buf, buf) + if err != nil { + level.Debug(ep.log).Log("msg", "error snappy decoding", "err", err) + return + } + // The version of each file is in the metadata. Right now there is only one version + // supported but in the future the ability to support more. Along with different + // compression. + version, ok := meta["version"] + if !ok { + level.Error(ep.log).Log("msg", "version not found for deserialization") + return + } + if version != types.AlloyFileVersion { + level.Error(ep.log).Log("msg", "invalid version found for deserialization", "version", version) + return + } + // Grab the amounts of each type and we can go ahead and alloc the space. + seriesCount, _ := strconv.Atoi(meta["series_count"]) + metaCount, _ := strconv.Atoi(meta["meta_count"]) + stringsCount, _ := strconv.Atoi(meta["strings_count"]) + sg := &types.SeriesGroup{ + Series: make([]*types.TimeSeriesBinary, seriesCount), + Metadata: make([]*types.TimeSeriesBinary, metaCount), + Strings: make([]string, stringsCount), + } + // Prefill our series with items from the pool to limit allocs. + for i := 0; i < seriesCount; i++ { + sg.Series[i] = types.GetTimeSeriesFromPool() + } + for i := 0; i < metaCount; i++ { + sg.Metadata[i] = types.GetTimeSeriesFromPool() + } + sg, ep.buf, err = types.DeserializeToSeriesGroup(sg, ep.buf) + if err != nil { + level.Debug(ep.log).Log("msg", "error deserializing", "err", err) + return + } + + for _, series := range sg.Series { + // One last chance to check the TTL. Writing to the filequeue will check it but + // in a situation where the network is down and writing backs up we dont want to send + // data that will get rejected. + seriesAge := time.Since(time.Unix(series.TS, 0)) + if seriesAge > ep.ttl { + // TODO @mattdurham add metric here for ttl expired. 
+ continue + } + sendErr := ep.network.SendSeries(ctx, series) + if sendErr != nil { + level.Error(ep.log).Log("msg", "error sending to write client", "err", sendErr) + } + } + + for _, md := range sg.Metadata { + sendErr := ep.network.SendMetadata(ctx, md) + if sendErr != nil { + level.Error(ep.log).Log("msg", "error sending metadata to write client", "err", sendErr) + } + } +} diff --git a/internal/component/prometheus/remote/queue/fanout.go b/internal/component/prometheus/remote/queue/fanout.go new file mode 100644 index 0000000000..09a7fb97ed --- /dev/null +++ b/internal/component/prometheus/remote/queue/fanout.go @@ -0,0 +1,85 @@ +package queue + +import ( + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" + "github.com/prometheus/prometheus/storage" +) + +var _ storage.Appender = (*fanout)(nil) + +type fanout struct { + children []storage.Appender +} + +func (f fanout) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.Append(ref, l, t, v) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) Commit() error { + for _, child := range f.children { + err := child.Commit() + if err != nil { + return err + } + } + return nil +} + +func (f fanout) Rollback() error { + for _, child := range f.children { + err := child.Rollback() + if err != nil { + return err + } + } + return nil +} + +func (f fanout) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.AppendExemplar(ref, l, e) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.AppendHistogram(ref, l, t, h, fh) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.UpdateMetadata(ref, l, m) + if err != nil { + return ref, err + } + } + return ref, nil +} + +func (f fanout) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { + for _, child := range f.children { + _, err := child.AppendCTZeroSample(ref, l, t, ct) + if err != nil { + return ref, err + } + } + return ref, nil +} diff --git a/internal/component/prometheus/remote/queue/network/loop.go b/internal/component/prometheus/remote/queue/network/loop.go index cbe94f2e41..e09c92a034 100644 --- a/internal/component/prometheus/remote/queue/network/loop.go +++ b/internal/component/prometheus/remote/queue/network/loop.go @@ -92,7 +92,7 @@ func (l *loop) DoWork(ctx actor.Context) actor.WorkerStatus { if len(l.series) == 0 { return actor.WorkerContinue } - if time.Since(l.lastSend) > l.cfg.FlushFrequency { + if time.Since(l.lastSend) > l.cfg.FlushInterval { l.trySend(ctx) } return actor.WorkerContinue @@ -108,7 +108,7 @@ func (l *loop) DoWork(ctx actor.Context) actor.WorkerStatus { } } -// trySend is the core functionality for sending data to a endpoint. It will attempt retries as defined in MaxRetryBackoffAttempts. 
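The new fanout appender forwards each call to every child appender and returns on the first error, so children later in the slice do not receive that sample once one fails. A toy illustration of those semantics using a stand-in interface, not the Prometheus `storage.Appender`:

```go
package main

import (
	"errors"
	"fmt"
)

// appender is a toy stand-in, just enough to show the fan-out semantics:
// every child sees each call, and the first error short-circuits the rest.
type appender interface {
	Append(t int64, v float64) error
}

type fanout struct{ children []appender }

func (f fanout) Append(t int64, v float64) error {
	for _, child := range f.children {
		if err := child.Append(t, v); err != nil {
			return err
		}
	}
	return nil
}

type ok struct{ n int }

func (o *ok) Append(int64, float64) error { o.n++; return nil }

type failing struct{}

func (failing) Append(int64, float64) error { return errors.New("append failed") }

func main() {
	healthy := &ok{}
	f := fanout{children: []appender{&ok{}, failing{}, healthy}}
	err := f.Append(0, 1.0)
	fmt.Println(err)       // append failed
	fmt.Println(healthy.n) // 0: children after the failing one were skipped
}
```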
+// trySend is the core functionality for sending data to a endpoint. It will attempt retries as defined in MaxRetryAttempts. func (l *loop) trySend(ctx context.Context) { attempts := 0 for { @@ -130,7 +130,7 @@ func (l *loop) trySend(ctx context.Context) { return } attempts++ - if attempts > int(l.cfg.MaxRetryBackoffAttempts) && l.cfg.MaxRetryBackoffAttempts > 0 { + if attempts > int(l.cfg.MaxRetryAttempts) && l.cfg.MaxRetryAttempts > 0 { level.Debug(l.log).Log("msg", "max retry attempts reached", "attempts", attempts) l.sendingCleanup() return @@ -195,7 +195,9 @@ func (l *loop) send(ctx context.Context, retryCount int) sendResult { httpReq.Header.Set("Content-Type", "application/x-protobuf") httpReq.Header.Set("User-Agent", l.cfg.UserAgent) httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") - httpReq.SetBasicAuth(l.cfg.Username, l.cfg.Password) + if l.cfg.BasicAuth != nil { + httpReq.SetBasicAuth(l.cfg.BasicAuth.Username, l.cfg.BasicAuth.Password) + } if retryCount > 0 { httpReq.Header.Set("Retry-Attempt", strconv.Itoa(retryCount)) @@ -310,19 +312,16 @@ func createWriteRequest(wr *prompb.WriteRequest, series []*types.TimeSeriesBinar } func createWriteRequestMetadata(l log.Logger, wr *prompb.WriteRequest, series []*types.TimeSeriesBinary, data *proto.Buffer) ([]byte, error) { - if cap(wr.Metadata) < len(series) { - wr.Metadata = make([]prompb.MetricMetadata, len(series)) - } else { - wr.Metadata = wr.Metadata[:len(series)] - } - - for i, ts := range series { + // Metadata is rarely sent so having this being less than optimal is fine. + wr.Metadata = make([]prompb.MetricMetadata, 0) + for _, ts := range series { mt, valid := toMetadata(ts) + // TODO @mattdurham somewhere there is a bug where metadata with no labels are being passed through. if !valid { level.Error(l).Log("msg", "invalid metadata was found", "labels", ts.Labels.String()) continue } - wr.Metadata[i] = mt + wr.Metadata = append(wr.Metadata, mt) } data.Reset() err := data.Marshal(wr) diff --git a/internal/component/prometheus/remote/queue/network/manager.go b/internal/component/prometheus/remote/queue/network/manager.go index 19e4f5b961..277db5335a 100644 --- a/internal/component/prometheus/remote/queue/network/manager.go +++ b/internal/component/prometheus/remote/queue/network/manager.go @@ -2,11 +2,9 @@ package network import ( "context" - - "github.com/grafana/alloy/internal/runtime/logging/level" - "github.com/go-kit/log" "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/vladopajic/go-actor/actor" ) @@ -17,13 +15,19 @@ type manager struct { logger log.Logger inbox actor.Mailbox[*types.TimeSeriesBinary] metaInbox actor.Mailbox[*types.TimeSeriesBinary] - configInbox actor.Mailbox[types.ConnectionConfig] + configInbox actor.Mailbox[configCallback] self actor.Actor cfg types.ConnectionConfig stats func(types.NetworkStats) metaStats func(types.NetworkStats) } +// configCallback allows actors to notify via `done` channel when they're done processing the config `cc`. Useful when synchronous processing is required. +type configCallback struct { + cc types.ConnectionConfig + done chan struct{} +} + var _ types.NetworkClient = (*manager)(nil) var _ actor.Worker = (*manager)(nil) @@ -36,13 +40,14 @@ func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStat // it will stop the filequeue from feeding more. 
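The rename from `MaxRetryBackoffAttempts` to `MaxRetryAttempts` keeps the existing retry rule: a value of zero means retry indefinitely, otherwise the batch is dropped once the attempt count exceeds the limit. A simplified sketch of that rule with a fixed backoff and a hypothetical helper name:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retry sketches the bounded-retry rule used by the send loop: maxAttempts of
// 0 means retry forever, otherwise give up once the attempt counter passes
// the limit. The real loop uses a configurable backoff between attempts.
func retry(maxAttempts uint, backoff time.Duration, send func() error) error {
	attempts := 0
	for {
		err := send()
		if err == nil {
			return nil
		}
		attempts++
		if maxAttempts > 0 && attempts > int(maxAttempts) {
			return fmt.Errorf("max retry attempts reached after %d tries: %w", attempts, err)
		}
		time.Sleep(backoff)
	}
}

func main() {
	calls := 0
	err := retry(1, 10*time.Millisecond, func() error {
		calls++
		return errors.New("recoverable")
	})
	fmt.Println(err, calls) // gives up after 2 calls: the first attempt plus one retry
}
```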
inbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)), metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)), - configInbox: actor.NewMailbox[types.ConnectionConfig](), + configInbox: actor.NewMailbox[configCallback](), stats: seriesStats, + metaStats: metadataStats, cfg: cc, } // start kicks off a number of concurrent connections. - for i := uint64(0); i < s.cfg.Connections; i++ { + for i := uint(0); i < s.cfg.Connections; i++ { l := newLoop(cc, false, logger, seriesStats) l.self = actor.New(l) s.loops = append(s.loops, l) @@ -71,7 +76,17 @@ func (s *manager) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary } func (s *manager) UpdateConfig(ctx context.Context, cc types.ConnectionConfig) error { - return s.configInbox.Send(ctx, cc) + done := make(chan struct{}) + defer close(done) + err := s.configInbox.Send(ctx, configCallback{ + cc: cc, + done: done, + }) + if err != nil { + return err + } + <-done + return nil } func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus { @@ -79,24 +94,31 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus { select { case cfg, ok := <-s.configInbox.ReceiveC(): if !ok { + level.Debug(s.logger).Log("msg", "config inbox closed") return actor.WorkerEnd } - s.updateConfig(cfg) + s.updateConfig(cfg.cc) + // Notify the caller we have applied the config. + cfg.done <- struct{}{} return actor.WorkerContinue default: } + + // main work queue. select { case <-ctx.Done(): s.Stop() return actor.WorkerEnd case ts, ok := <-s.inbox.ReceiveC(): if !ok { + level.Debug(s.logger).Log("msg", "series inbox closed") return actor.WorkerEnd } s.queue(ctx, ts) return actor.WorkerContinue case ts, ok := <-s.metaInbox.ReceiveC(): if !ok { + level.Debug(s.logger).Log("msg", "meta inbox closed") return actor.WorkerEnd } err := s.metadata.seriesMbx.Send(ctx, ts) @@ -104,6 +126,16 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus { level.Error(s.logger).Log("msg", "failed to send to metadata loop", "err", err) } return actor.WorkerContinue + // We need to also check the config here, else its possible this will deadlock. + case cfg, ok := <-s.configInbox.ReceiveC(): + if !ok { + level.Debug(s.logger).Log("msg", "config inbox closed") + return actor.WorkerEnd + } + s.updateConfig(cfg.cc) + // Notify the caller we have applied the config. + cfg.done <- struct{}{} + return actor.WorkerContinue } } @@ -120,17 +152,17 @@ func (s *manager) updateConfig(cc types.ConnectionConfig) { level.Debug(s.logger).Log("msg", "dropping all series in loops and creating queue due to config change") s.stopLoops() s.loops = make([]*loop, 0, s.cfg.Connections) - var i uint64 - for ; i < s.cfg.Connections; i++ { + for i := uint(0); i < s.cfg.Connections; i++ { l := newLoop(cc, false, s.logger, s.stats) l.self = actor.New(l) - s.loops = append(s.loops, l) } s.metadata = newLoop(cc, true, s.logger, s.metaStats) s.metadata.self = actor.New(s.metadata) + level.Debug(s.logger).Log("msg", "starting loops") s.startLoops() + level.Debug(s.logger).Log("msg", "loops started") } func (s *manager) Stop() { @@ -158,7 +190,7 @@ func (s *manager) startLoops() { // Queue adds anything thats not metadata to the queue. func (s *manager) queue(ctx context.Context, ts *types.TimeSeriesBinary) { // Based on a hash which is the label hash add to the queue. - queueNum := ts.Hash % s.cfg.Connections + queueNum := ts.Hash % uint64(s.cfg.Connections) // This will block if the queue is full. 
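`UpdateConfig` is now synchronous: the caller posts the new config together with a `done` channel and blocks until the worker acknowledges it, and the worker checks the config inbox in both selects so a blocked caller cannot deadlock the main work loop. A standalone sketch of that request/acknowledge pattern, using plain channels instead of the actor mailboxes:

```go
package main

import (
	"context"
	"fmt"
)

// configRequest mirrors the configCallback idea: the new config plus a done
// channel the worker signals once the change has been applied.
type configRequest struct {
	cfg  string
	done chan struct{}
}

func worker(ctx context.Context, inbox <-chan configRequest) {
	current := "initial"
	for {
		select {
		case <-ctx.Done():
			return
		case req := <-inbox:
			current = req.cfg // apply the config...
			fmt.Println("applied:", current)
			req.done <- struct{}{} // ...then unblock the caller
		}
	}
}

func updateConfig(ctx context.Context, inbox chan<- configRequest, cfg string) error {
	done := make(chan struct{})
	defer close(done)
	select {
	case inbox <- configRequest{cfg: cfg, done: done}:
	case <-ctx.Done():
		return ctx.Err()
	}
	<-done // synchronous: only return once the worker has applied the config
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	inbox := make(chan configRequest)
	go worker(ctx, inbox)
	_ = updateConfig(ctx, inbox, "new endpoint settings")
}
```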
err := s.loops[queueNum].seriesMbx.Send(ctx, ts) if err != nil { diff --git a/internal/component/prometheus/remote/queue/network/manager_test.go b/internal/component/prometheus/remote/queue/network/manager_test.go index 47001fda9f..46eef2ea71 100644 --- a/internal/component/prometheus/remote/queue/network/manager_test.go +++ b/internal/component/prometheus/remote/queue/network/manager_test.go @@ -2,6 +2,7 @@ package network import ( "context" + "github.com/grafana/alloy/internal/util" "io" "math/rand" "net/http" @@ -33,11 +34,11 @@ func TestSending(t *testing.T) { defer cncl() cc := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 1 * time.Second, - BatchCount: 10, - FlushFrequency: 1 * time.Second, - Connections: 4, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 10, + FlushInterval: 1 * time.Second, + Connections: 4, } logger := log.NewNopLogger() @@ -64,41 +65,41 @@ func TestUpdatingConfig(t *testing.T) { })) defer svr.Close() - ctx := context.Background() - ctx, cncl := context.WithCancel(ctx) - defer cncl() cc := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 1 * time.Second, - BatchCount: 10, - FlushFrequency: 1 * time.Second, - Connections: 4, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 10, + FlushInterval: 5 * time.Second, + Connections: 1, } - logger := log.NewNopLogger() + logger := util.TestAlloyLogger(t) + wr, err := New(cc, logger, func(s types.NetworkStats) {}, func(s types.NetworkStats) {}) + require.NoError(t, err) wr.Start() defer wr.Stop() cc2 := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 5 * time.Second, - BatchCount: 100, - FlushFrequency: 1 * time.Second, - Connections: 4, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 20, + FlushInterval: 5 * time.Second, + Connections: 1, } - - err = wr.UpdateConfig(context.Background(), cc2) + ctx := context.Background() + err = wr.UpdateConfig(ctx, cc2) require.NoError(t, err) - for i := 0; i < 1_000; i++ { + time.Sleep(1 * time.Second) + for i := 0; i < 100; i++ { send(t, wr, ctx) } require.Eventuallyf(t, func() bool { - return recordsFound.Load() == 1_000 - }, 10*time.Second, 1*time.Second, "record count should be 1000 but is %d", recordsFound.Load()) + return recordsFound.Load() == 100 + }, 20*time.Second, 1*time.Second, "record count should be 100 but is %d", recordsFound.Load()) - require.Truef(t, lastBatchSize.Load() == 100, "batch_count should be 100 but is %d", lastBatchSize.Load()) + require.Truef(t, lastBatchSize.Load() == 20, "batch_count should be 20 but is %d", lastBatchSize.Load()) } func TestRetry(t *testing.T) { @@ -121,12 +122,12 @@ func TestRetry(t *testing.T) { defer cncl() cc := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 1 * time.Second, - BatchCount: 1, - FlushFrequency: 1 * time.Second, - RetryBackoff: 100 * time.Millisecond, - Connections: 1, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushInterval: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + Connections: 1, } logger := log.NewNopLogger() @@ -158,13 +159,13 @@ func TestRetryBounded(t *testing.T) { defer cncl() cc := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 1 * time.Second, - BatchCount: 1, - FlushFrequency: 1 * time.Second, - RetryBackoff: 100 * time.Millisecond, - MaxRetryBackoffAttempts: 1, - Connections: 1, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushInterval: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + MaxRetryAttempts: 1, + Connections: 1, } logger := log.NewNopLogger() @@ -196,13 +197,13 @@ func TestRecoverable(t 
*testing.T) { defer cncl() cc := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 1 * time.Second, - BatchCount: 1, - FlushFrequency: 1 * time.Second, - RetryBackoff: 100 * time.Millisecond, - MaxRetryBackoffAttempts: 1, - Connections: 1, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushInterval: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + MaxRetryAttempts: 1, + Connections: 1, } logger := log.NewNopLogger() @@ -237,13 +238,13 @@ func TestNonRecoverable(t *testing.T) { defer cncl() cc := types.ConnectionConfig{ - URL: svr.URL, - Timeout: 1 * time.Second, - BatchCount: 1, - FlushFrequency: 1 * time.Second, - RetryBackoff: 100 * time.Millisecond, - MaxRetryBackoffAttempts: 1, - Connections: 1, + URL: svr.URL, + Timeout: 1 * time.Second, + BatchCount: 1, + FlushInterval: 1 * time.Second, + RetryBackoff: 100 * time.Millisecond, + MaxRetryAttempts: 1, + Connections: 1, } logger := log.NewNopLogger() diff --git a/internal/component/prometheus/remote/queue/serialization/appender.go b/internal/component/prometheus/remote/queue/serialization/appender.go index e2cdd56272..03f8b27a70 100644 --- a/internal/component/prometheus/remote/queue/serialization/appender.go +++ b/internal/component/prometheus/remote/queue/serialization/appender.go @@ -2,6 +2,7 @@ package serialization import ( "context" + "fmt" "time" "github.com/go-kit/log" @@ -99,10 +100,13 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int // UpdateMetadata updates metadata. func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (_ storage.SeriesRef, _ error) { + if !l.Has("__name__") { + return ref, fmt.Errorf("missing __name__ label for metadata") + } ts := types.GetTimeSeriesFromPool() // We are going to handle converting some strings to hopefully not reused label names. TimeSeriesBinary has a lot of work // to ensure its efficient it makes sense to encode metadata into it. - combinedLabels := l.Copy() + combinedLabels := labels.EmptyLabels() combinedLabels = append(combinedLabels, labels.Label{ Name: types.MetaType, Value: string(m.Type), @@ -115,6 +119,11 @@ func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m meta Name: types.MetaUnit, Value: m.Unit, }) + // We ONLY want __name__ from labels + combinedLabels = append(combinedLabels, labels.Label{ + Name: "__name__", + Value: l.Get("__name__"), + }) ts.Labels = combinedLabels err := a.s.SendMetadata(a.ctx, ts) return ref, err diff --git a/internal/component/prometheus/remote/queue/serialization/serializer.go b/internal/component/prometheus/remote/queue/serialization/serializer.go index 56307163a0..95a89cca59 100644 --- a/internal/component/prometheus/remote/queue/serialization/serializer.go +++ b/internal/component/prometheus/remote/queue/serialization/serializer.go @@ -2,6 +2,7 @@ package serialization import ( "context" + "fmt" "strconv" "time" @@ -10,6 +11,7 @@ import ( "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/vladopajic/go-actor/actor" + "go.uber.org/atomic" ) // serializer collects data from multiple appenders in-memory and will periodically flush the data to file.Storage. 
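`UpdateMetadata` now rejects series without a `__name__` label and builds a fresh label set containing only the metric name plus the metadata type, help, and unit, instead of copying every incoming label. A sketch of that shape; the real label names come from constants in the queue's `types` package (`types.MetaType` and friends), so the literal names below are placeholders:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
)

// metadataLabels builds the reduced label set sent for metadata: the metadata
// fields plus only the __name__ label from the incoming series. Placeholder
// label names are used where the real code uses package constants.
func metadataLabels(l labels.Labels, m metadata.Metadata) (labels.Labels, error) {
	if !l.Has("__name__") {
		return labels.EmptyLabels(), fmt.Errorf("missing __name__ label for metadata")
	}
	out := labels.EmptyLabels()
	out = append(out,
		labels.Label{Name: "__alloy_metadata_type__", Value: string(m.Type)},
		labels.Label{Name: "__alloy_metadata_help__", Value: m.Help},
		labels.Label{Name: "__alloy_metadata_unit__", Value: m.Unit},
		labels.Label{Name: "__name__", Value: l.Get("__name__")},
	)
	return out, nil
}

func main() {
	ls := labels.FromStrings("__name__", "http_requests_total", "job", "demo")
	out, err := metadataLabels(ls, metadata.Metadata{Type: "counter", Help: "Total requests."})
	fmt.Println(out, err) // the "job" label is intentionally dropped
}
```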
@@ -30,6 +32,7 @@ type serializer struct { meta []*types.TimeSeriesBinary msgpBuffer []byte stats func(stats types.SerializerStats) + stopped *atomic.Bool } func NewSerializer(cfg types.SerializerConfig, q types.FileStorage, stats func(stats types.SerializerStats), l log.Logger) (types.Serializer, error) { @@ -46,6 +49,7 @@ func NewSerializer(cfg types.SerializerConfig, q types.FileStorage, stats func(s msgpBuffer: make([]byte, 0), lastFlush: time.Now(), stats: stats, + stopped: atomic.NewBool(false), } return s, nil @@ -58,19 +62,29 @@ func (s *serializer) Start() { } func (s *serializer) Stop() { + s.stopped.Store(true) s.queue.Stop() s.self.Stop() } func (s *serializer) SendSeries(ctx context.Context, data *types.TimeSeriesBinary) error { + if s.stopped.Load() { + return fmt.Errorf("serializer is stopped") + } return s.inbox.Send(ctx, data) } func (s *serializer) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary) error { + if s.stopped.Load() { + return fmt.Errorf("serializer is stopped") + } return s.metaInbox.Send(ctx, data) } func (s *serializer) UpdateConfig(ctx context.Context, cfg types.SerializerConfig) error { + if s.stopped.Load() { + return fmt.Errorf("serializer is stopped") + } return s.cfgInbox.Send(ctx, cfg) } @@ -150,7 +164,7 @@ func (s *serializer) flushToDisk(ctx actor.Context) error { types.PutTimeSeriesSliceIntoPool(s.series) types.PutTimeSeriesSliceIntoPool(s.meta) s.series = s.series[:0] - s.meta = s.series[:0] + s.meta = s.meta[:0] }() // This maps strings to index position in a slice. This is doing to reduce the file size of the data. @@ -178,7 +192,7 @@ func (s *serializer) flushToDisk(ctx actor.Context) error { out := snappy.Encode(buf) meta := map[string]string{ // product.signal_type.schema.version - "version": "alloy.metrics.queue.v1", + "version": types.AlloyFileVersion, "compression": "snappy", "series_count": strconv.Itoa(len(group.Series)), "meta_count": strconv.Itoa(len(group.Metadata)), @@ -197,7 +211,6 @@ func (s *serializer) storeStats(err error) { for _, ts := range s.series { if ts.TS > newestTS { newestTS = ts.TS - } } s.stats(types.SerializerStats{ diff --git a/internal/component/prometheus/remote/queue/types.go b/internal/component/prometheus/remote/queue/types.go new file mode 100644 index 0000000000..097e8f0aaf --- /dev/null +++ b/internal/component/prometheus/remote/queue/types.go @@ -0,0 +1,119 @@ +package queue + +import ( + "fmt" + "time" + + "github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" + "github.com/grafana/alloy/syntax/alloytypes" + "github.com/prometheus/common/version" + "github.com/prometheus/prometheus/storage" +) + +func defaultArgs() Arguments { + return Arguments{ + TTL: 2 * time.Hour, + Serialization: Serialization{ + MaxSignalsToBatch: 10_000, + BatchInterval: 5 * time.Second, + }, + } +} + +type Arguments struct { + // TTL is how old a series can be. + TTL time.Duration `alloy:"ttl,attr,optional"` + Serialization Serialization `alloy:"serialization,block,optional"` + Endpoints []EndpointConfig `alloy:"endpoint,block"` +} + +type Serialization struct { + // The batch size to persist to the file queue. + MaxSignalsToBatch int `alloy:"max_signals_to_batch,attr,optional"` + // How often to flush to the file queue if BatchSize isn't met. 
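The serializer now keeps a stopped flag and fails fast from `SendSeries`, `SendMetadata`, and `UpdateConfig` after `Stop`, instead of blocking on a mailbox that nobody drains. A minimal sketch of that guard; the real code uses `go.uber.org/atomic`, but the standard library's `sync/atomic.Bool` behaves the same for this purpose:

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// sender sketches the stop guard: once Stop has been called, Send fails fast
// rather than writing to an inbox with no consumer.
type sender struct {
	stopped atomic.Bool
	inbox   chan string
}

func (s *sender) Send(ctx context.Context, v string) error {
	if s.stopped.Load() {
		return fmt.Errorf("serializer is stopped")
	}
	select {
	case s.inbox <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (s *sender) Stop() { s.stopped.Store(true) }

func main() {
	s := &sender{inbox: make(chan string, 1)}
	fmt.Println(s.Send(context.Background(), "sample")) // <nil>
	s.Stop()
	fmt.Println(s.Send(context.Background(), "sample")) // serializer is stopped
}
```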
+ BatchInterval time.Duration `alloy:"batch_interval,attr,optional"` +} + +type Exports struct { + Receiver storage.Appendable `alloy:"receiver,attr"` +} + +// SetToDefault sets the default +func (rc *Arguments) SetToDefault() { + *rc = defaultArgs() +} + +func defaultEndpointConfig() EndpointConfig { + return EndpointConfig{ + Timeout: 30 * time.Second, + RetryBackoff: 1 * time.Second, + MaxRetryAttempts: 0, + BatchCount: 1_000, + FlushInterval: 1 * time.Second, + Parallelism: 4, + } +} + +func (cc *EndpointConfig) SetToDefault() { + *cc = defaultEndpointConfig() +} + +func (r *Arguments) Validate() error { + for _, conn := range r.Endpoints { + if conn.BatchCount <= 0 { + return fmt.Errorf("batch_count must be greater than 0") + } + if conn.FlushInterval < 1*time.Second { + return fmt.Errorf("flush_interval must be greater or equal to 1s, the internal timers resolution is 1s") + } + } + + return nil +} + +// EndpointConfig is the alloy specific version of ConnectionConfig. +type EndpointConfig struct { + Name string `alloy:",label"` + URL string `alloy:"url,attr"` + BasicAuth *BasicAuth `alloy:"basic_auth,block,optional"` + Timeout time.Duration `alloy:"write_timeout,attr,optional"` + // How long to wait between retries. + RetryBackoff time.Duration `alloy:"retry_backoff,attr,optional"` + // Maximum number of retries. + MaxRetryAttempts uint `alloy:"max_retry_attempts,attr,optional"` + // How many series to write at a time. + BatchCount int `alloy:"batch_count,attr,optional"` + // How long to wait before sending regardless of batch count. + FlushInterval time.Duration `alloy:"flush_interval,attr,optional"` + // How many concurrent queues to have. + Parallelism uint `alloy:"parallelism,attr,optional"` + ExternalLabels map[string]string `alloy:"external_labels,attr,optional"` +} + +var UserAgent = fmt.Sprintf("Alloy/%s", version.Version) + +func (cc EndpointConfig) ToNativeType() types.ConnectionConfig { + tcc := types.ConnectionConfig{ + URL: cc.URL, + UserAgent: UserAgent, + Timeout: cc.Timeout, + RetryBackoff: cc.RetryBackoff, + MaxRetryAttempts: cc.MaxRetryAttempts, + BatchCount: cc.BatchCount, + FlushInterval: cc.FlushInterval, + ExternalLabels: cc.ExternalLabels, + Connections: cc.Parallelism, + } + if cc.BasicAuth != nil { + tcc.BasicAuth = &types.BasicAuth{ + Username: cc.BasicAuth.Username, + Password: string(cc.BasicAuth.Password), + } + } + return tcc +} + +type BasicAuth struct { + Username string `alloy:"username,attr,optional"` + Password alloytypes.Secret `alloy:"password,attr,optional"` +} diff --git a/internal/component/prometheus/remote/queue/types/network.go b/internal/component/prometheus/remote/queue/types/network.go index 024f8a3122..c36ea930c4 100644 --- a/internal/component/prometheus/remote/queue/types/network.go +++ b/internal/component/prometheus/remote/queue/types/network.go @@ -11,20 +11,26 @@ type NetworkClient interface { Stop() SendSeries(ctx context.Context, d *TimeSeriesBinary) error SendMetadata(ctx context.Context, d *TimeSeriesBinary) error + // UpdateConfig is a synchronous call and will only return once the config + // is applied or an error occurs. 
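`Validate` enforces two per-endpoint constraints: `batch_count` must be positive, and `flush_interval` cannot be below one second because the internal timers only tick at one-second resolution. A small sketch of those checks with a hypothetical helper name:

```go
package main

import (
	"fmt"
	"time"
)

// validateEndpoint mirrors the per-endpoint constraints enforced by Validate.
func validateEndpoint(batchCount int, flushInterval time.Duration) error {
	if batchCount <= 0 {
		return fmt.Errorf("batch_count must be greater than 0")
	}
	if flushInterval < time.Second {
		return fmt.Errorf("flush_interval must be greater or equal to 1s, the internal timers resolution is 1s")
	}
	return nil
}

func main() {
	fmt.Println(validateEndpoint(1000, time.Second))          // <nil>
	fmt.Println(validateEndpoint(0, time.Second))             // batch_count error
	fmt.Println(validateEndpoint(1000, 250*time.Millisecond)) // flush_interval error
}
```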
UpdateConfig(ctx context.Context, cfg ConnectionConfig) error } type ConnectionConfig struct { - URL string - Username string - Password string - UserAgent string - Timeout time.Duration - RetryBackoff time.Duration - MaxRetryBackoffAttempts time.Duration - BatchCount int - FlushFrequency time.Duration - ExternalLabels map[string]string - Connections uint64 + URL string + BasicAuth *BasicAuth + UserAgent string + Timeout time.Duration + RetryBackoff time.Duration + MaxRetryAttempts uint + BatchCount int + FlushInterval time.Duration + ExternalLabels map[string]string + Connections uint +} + +type BasicAuth struct { + Username string + Password string } func (cc ConnectionConfig) Equals(bb ConnectionConfig) bool { diff --git a/internal/component/prometheus/remote/queue/types/serializer.go b/internal/component/prometheus/remote/queue/types/serializer.go index 6919f666f4..d0041242cc 100644 --- a/internal/component/prometheus/remote/queue/types/serializer.go +++ b/internal/component/prometheus/remote/queue/types/serializer.go @@ -5,6 +5,8 @@ import ( "time" ) +const AlloyFileVersion = "alloy.metrics.queue.v1" + type SerializerConfig struct { // MaxSignalsInBatch controls what the max batch size is. MaxSignalsInBatch uint32 diff --git a/internal/component/prometheus/remote/queue/types/stats.go b/internal/component/prometheus/remote/queue/types/stats.go index 4107d8089f..732b6255aa 100644 --- a/internal/component/prometheus/remote/queue/types/stats.go +++ b/internal/component/prometheus/remote/queue/types/stats.go @@ -6,6 +6,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +// TODO @mattdurham separate this into more manageable chunks, and likely 3 stats series: series, metadata and new ones. + type SerializerStats struct { SeriesStored int MetadataStored int @@ -24,10 +26,10 @@ type PrometheusStats struct { NetworkErrors prometheus.Counter NetworkNewestOutTimeStampSeconds prometheus.Gauge - // Filequeue Stats - FilequeueInSeries prometheus.Counter - FilequeueNewestInTimeStampSeconds prometheus.Gauge - FilequeueErrors prometheus.Counter + // Serializer Stats + SerializerInSeries prometheus.Counter + SerializerNewestInTimeStampSeconds prometheus.Gauge + SerializerErrors prometheus.Counter // Backwards compatibility metrics SamplesTotal prometheus.Counter @@ -55,20 +57,20 @@ type PrometheusStats struct { func NewStats(namespace, subsystem string, registry prometheus.Registerer) *PrometheusStats { s := &PrometheusStats{ - FilequeueInSeries: prometheus.NewCounter(prometheus.CounterOpts{ + SerializerInSeries: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "filequeue_incoming", + Name: "serializer_incoming_signals", }), - FilequeueNewestInTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ + SerializerNewestInTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "filequeue_incoming_timestamp_seconds", + Name: "serializer_incoming_timestamp_seconds", }), - FilequeueErrors: prometheus.NewGauge(prometheus.GaugeOpts{ + SerializerErrors: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "filequeue_errors", + Name: "serializer_errors", }), NetworkNewestOutTimeStampSeconds: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, @@ -175,28 +177,33 @@ func NewStats(namespace, subsystem string, registry prometheus.Registerer) *Prom s.NetworkSeriesSent, s.NetworkErrors, s.NetworkNewestOutTimeStampSeconds, - s.FilequeueInSeries, - 
s.FilequeueErrors, - s.FilequeueNewestInTimeStampSeconds, + s.SerializerInSeries, + s.SerializerErrors, + s.SerializerNewestInTimeStampSeconds, ) return s } -func (s *PrometheusStats) BackwardsCompatibility(registry prometheus.Registerer) { +func (s *PrometheusStats) SeriesBackwardsCompatibility(registry prometheus.Registerer) { registry.MustRegister( s.RemoteStorageDuration, s.RemoteStorageInTimestamp, s.RemoteStorageOutTimestamp, s.SamplesTotal, s.HistogramsTotal, - s.MetadataTotal, s.FailedSamplesTotal, s.FailedHistogramsTotal, - s.FailedMetadataTotal, s.RetriedSamplesTotal, s.RetriedHistogramsTotal, - s.RetriedMetadataTotal, s.SentBytesTotal, + ) +} + +func (s *PrometheusStats) MetaBackwardsCompatibility(registry prometheus.Registerer) { + registry.MustRegister( + s.MetadataTotal, + s.FailedMetadataTotal, + s.RetriedMetadataTotal, s.MetadataBytesTotal, ) } @@ -212,6 +219,7 @@ func (s *PrometheusStats) UpdateNetwork(stats NetworkStats) { // The newest timestamp is no always sent. if stats.NewestTimestamp != 0 { s.RemoteStorageOutTimestamp.Set(float64(stats.NewestTimestamp)) + s.NetworkNewestOutTimeStampSeconds.Set(float64(stats.NewestTimestamp)) } s.SamplesTotal.Add(float64(stats.Series.SeriesSent)) @@ -230,13 +238,15 @@ func (s *PrometheusStats) UpdateNetwork(stats NetworkStats) { s.SentBytesTotal.Add(float64(stats.SeriesBytes)) } -func (s *PrometheusStats) UpdateFileQueue(stats SerializerStats) { - s.FilequeueInSeries.Add(float64(stats.SeriesStored)) - s.FilequeueErrors.Add(float64(stats.Errors)) +func (s *PrometheusStats) UpdateSerializer(stats SerializerStats) { + s.SerializerInSeries.Add(float64(stats.SeriesStored)) + s.SerializerInSeries.Add(float64(stats.MetadataStored)) + s.SerializerErrors.Add(float64(stats.Errors)) if stats.NewestTimestamp != 0 { - s.FilequeueNewestInTimeStampSeconds.Set(float64(stats.NewestTimestamp)) + s.SerializerNewestInTimeStampSeconds.Set(float64(stats.NewestTimestamp)) s.RemoteStorageInTimestamp.Set(float64(stats.NewestTimestamp)) } + } type NetworkStats struct { diff --git a/internal/component/prometheus/remote/queue/types/storage_test.go b/internal/component/prometheus/remote/queue/types/storage_test.go new file mode 100644 index 0000000000..f994427792 --- /dev/null +++ b/internal/component/prometheus/remote/queue/types/storage_test.go @@ -0,0 +1,26 @@ +package types + +import ( + "testing" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestStorage(t *testing.T) { + ts := GetTimeSeriesFromPool() + ts.Labels = labels.FromStrings("one", "two") + ts.LabelsValues = make([]uint32, 1) + ts.LabelsNames = make([]uint32, 1) + ts.LabelsValues[0] = 1 + ts.LabelsNames[0] = 2 + + PutTimeSeriesIntoPool(ts) + ts = GetTimeSeriesFromPool() + defer PutTimeSeriesIntoPool(ts) + require.Len(t, ts.Labels, 0) + require.True(t, cap(ts.LabelsValues) == 1) + require.True(t, cap(ts.LabelsNames) == 1) + require.Len(t, ts.LabelsValues, 0) + require.Len(t, ts.LabelsNames, 0) +}
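The storage test asserts the pooling contract for `TimeSeriesBinary`: an item returned to the pool comes back with zero-length label slices but the old capacity, so refilling it avoids fresh allocations. A toy version of that reset-and-reuse pattern with a stand-in struct; note that `sync.Pool` gives no guarantee the same object comes back, which the real pool helpers rely on only as an optimization:

```go
package main

import (
	"fmt"
	"sync"
)

// series is a toy stand-in for TimeSeriesBinary, enough to show the pattern:
// put resets slices to zero length but keeps their capacity for reuse.
type series struct {
	labelNames  []uint32
	labelValues []uint32
}

var pool = sync.Pool{New: func() any { return &series{} }}

func put(s *series) {
	s.labelNames = s.labelNames[:0]
	s.labelValues = s.labelValues[:0]
	pool.Put(s)
}

func get() *series { return pool.Get().(*series) }

func main() {
	s := get()
	s.labelNames = append(s.labelNames, 1, 2, 3)
	s.labelValues = append(s.labelValues, 4, 5, 6)
	put(s)

	reused := get() // sync.Pool may or may not hand back the same object
	fmt.Println(len(reused.labelNames), cap(reused.labelNames))   // 0 3 when the item is reused
	fmt.Println(len(reused.labelValues), cap(reused.labelValues)) // 0 3 when the item is reused
}
```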