diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 0a56796228e..0d5a93e2048 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -187,6 +187,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { case "extra-scrape-metrics": c.scrape.ExtraMetrics = true level.Info(logger).Log("msg", "Experimental additional scrape metrics enabled") + case "metadata-storage": + c.scrape.EnableMetadataStorage = true + level.Info(logger).Log("msg", "Experimental in-memory metadata storage enabled") case "new-service-discovery-manager": c.enableNewSDManager = true level.Info(logger).Log("msg", "Experimental service discovery manager") @@ -431,7 +434,7 @@ func main() { a.Flag("scrape.discovery-reload-interval", "Interval used by scrape manager to throttle target groups updates."). Hidden().Default("5s").SetValue(&cfg.scrape.DiscoveryReloadInterval) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, metadata-storage. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). Default("").StringsVar(&cfg.featureList) a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)"). @@ -521,6 +524,12 @@ func main() { cfg.tsdb.OutOfOrderTimeWindow = cfgFile.StorageConfig.TSDBConfig.OutOfOrderTimeWindow } + for _, rwc := range cfgFile.RemoteWriteConfigs { + if rwc.SendWALMetadata && !cfg.scrape.EnableMetadataStorage { + level.Warn(logger).Log("msg", "the 'send_metadata' remote_write parameter must be set along the --enable-features=metadata-storage flag to take effect") + } + } + // Now that the validity of the config is established, set the config // success metrics accordingly, although the config isn't really loaded // yet. This will happen later (including setting these metrics again), diff --git a/config/config.go b/config/config.go index ddcca84dc78..d66e90c1fa8 100644 --- a/config/config.go +++ b/config/config.go @@ -1022,6 +1022,7 @@ type RemoteWriteConfig struct { Name string `yaml:"name,omitempty"` SendExemplars bool `yaml:"send_exemplars,omitempty"` SendNativeHistograms bool `yaml:"send_native_histograms,omitempty"` + SendWALMetadata bool `yaml:"send_metadata,omitempty"` // TODO(@tpaschalis) Adding an extra field to enable us to remove the `metadata_config` struct in the future. // We cannot do proper Go type embedding below as the parser will then parse // values arbitrarily into the overflow maps of further-down types. diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index eae64645379..9554f8f847a 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -52,7 +52,7 @@ The Prometheus monitoring server | --query.timeout | Maximum time a query may take before being aborted. Use with server mode only. | `2m` | | --query.max-concurrency | Maximum number of queries executed concurrently. Use with server mode only. | `20` | | --query.max-samples | Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return. Use with server mode only. | `50000000` | -| --enable-feature | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | +| --enable-feature | Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, promql-per-step-stats, promql-experimental-functions, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager, auto-gomaxprocs, no-default-scrape-port, native-histograms, otlp-write-receiver, metadata-storage. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details. | | | --remote-write-format | remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format) | `0` | | --log.level | Only log messages with the given severity or above. One of: [debug, info, warn, error] | `info` | | --log.format | Output format of log messages. One of: [logfmt, json] | `logfmt` | diff --git a/docs/feature_flags.md b/docs/feature_flags.md index bcf8309b5c9..78959058a60 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -185,7 +185,7 @@ This should **only** be applied to metrics that currently produce such labels. `--enable-feature=otlp-write-receiver` The OTLP receiver allows Prometheus to accept [OpenTelemetry](https://opentelemetry.io/) metrics writes. -Prometheus is best used as a Pull based system, and staleness, `up` metric, and other Pull enabled features +Prometheus is best used as a Pull based system, and staleness, `up` metric, and other Pull enabled features won't work when you push OTLP metrics. ## Experimental PromQL functions @@ -204,3 +204,12 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details). Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped. + +## Metadata Storage +`--enable-features=metadata-storage` + +When enabled, Prometheus will store metadata in-memory and keep track of +metadata changes as WAL records. This should be used along the remote write's +`send_metadata` parameter. +This new way of storing and sending metadata is mutually exclusive with the +`metadata_config.send` field and the current way of sending metadata. diff --git a/prompb/types.proto b/prompb/types.proto index effc626fc4d..499dd367af8 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -38,7 +38,6 @@ message MetricMetadata { string unit = 5; } - message Sample { double value = 1; // timestamp is in ms format, see model/timestamp/timestamp.go for diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 39016654222..b7bd2563f7a 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -33,6 +33,7 @@ import ( "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/write/v2" @@ -629,6 +630,13 @@ func exemplarProtoToExemplar(ep prompb.Exemplar) exemplar.Exemplar { } } +func minMetadataProtoToMetadata(mp writev2.Metadata, symbols []string) metadata.Metadata { + return metadata.Metadata{ + Type: metricTypeFromProtoEquivalent(mp.Type), + Unit: symbols[mp.UnitRef], // TODO: check for overflow + Help: symbols[mp.HelpRef], // TODO: check for overflow + } +} func minExemplarProtoToExemplar(ep writev2.Exemplar, symbols []string) exemplar.Exemplar { timestamp := ep.Timestamp @@ -925,6 +933,21 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M return prompb.MetricMetadata_MetricType(v) } +func metricTypeToProtoEquivalent(t textparse.MetricType) writev2.Metadata_MetricType { + mt := strings.ToUpper(string(t)) + v, ok := writev2.Metadata_MetricType_value[mt] + if !ok { + return writev2.Metadata_UNKNOWN + } + + return writev2.Metadata_MetricType(v) +} + +func metricTypeFromProtoEquivalent(t writev2.Metadata_MetricType) textparse.MetricType { + mt := strings.ToLower(t.String()) + return textparse.MetricType(mt) // TODO(@tpaschalis) a better way for this? +} + // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // snappy decompression. func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 800bf0eeecd..651514c7df6 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -92,6 +92,10 @@ var writeRequestMinimizedFixture = func() *writev2.WriteRequest { for _, s := range []string{ "f", "g", // 10, 11 "h", "i", // 12, 13 + "help text 1", //14 + "unit text 1", //15 + "help text 2", //16 + "unit text 2", //17 } { st.RefStr(s) } @@ -103,12 +107,14 @@ var writeRequestMinimizedFixture = func() *writev2.WriteRequest { Samples: []writev2.Sample{{Value: 1, Timestamp: 0}}, Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{10, 11}, Value: 1, Timestamp: 0}}, Histograms: []writev2.Histogram{HistogramToMinHistogramProto(0, &testHistogram), FloatHistogramToMinHistogramProto(1, testHistogram.ToFloat(nil))}, + Metadata: writev2.Metadata{Type: writev2.Metadata_COUNTER, HelpRef: 14, UnitRef: 15}, }, { LabelsRefs: labels, Samples: []writev2.Sample{{Value: 2, Timestamp: 1}}, Exemplars: []writev2.Exemplar{{LabelsRefs: []uint32{12, 13}, Value: 2, Timestamp: 1}}, Histograms: []writev2.Histogram{HistogramToMinHistogramProto(2, &testHistogram), FloatHistogramToMinHistogramProto(3, testHistogram.ToFloat(nil))}, + Metadata: writev2.Metadata{Type: writev2.Metadata_GAUGE, HelpRef: 16, UnitRef: 17}, }, }, Symbols: st.LabelsStrings(), diff --git a/storage/remote/metadata_watcher.go b/storage/remote/metadata_watcher.go index 21de565ed92..4761306cb7c 100644 --- a/storage/remote/metadata_watcher.go +++ b/storage/remote/metadata_watcher.go @@ -27,7 +27,7 @@ import ( // MetadataAppender is an interface used by the Metadata Watcher to send metadata, It is read from the scrape manager, on to somewhere else. type MetadataAppender interface { - AppendMetadata(context.Context, []scrape.MetricMetadata) + AppendWatcherMetadata(context.Context, []scrape.MetricMetadata) } // Watchable represents from where we fetch active targets for metadata. @@ -146,7 +146,7 @@ func (mw *MetadataWatcher) collect() { } // Blocks until the metadata is sent to the remote write endpoint or hardShutdownContext is expired. - mw.writer.AppendMetadata(mw.hardShutdownCtx, metadata) + mw.writer.AppendWatcherMetadata(mw.hardShutdownCtx, metadata) } func (mw *MetadataWatcher) ready() bool { diff --git a/storage/remote/metadata_watcher_test.go b/storage/remote/metadata_watcher_test.go index cd664bc8beb..e5c25c79947 100644 --- a/storage/remote/metadata_watcher_test.go +++ b/storage/remote/metadata_watcher_test.go @@ -58,7 +58,7 @@ type writeMetadataToMock struct { metadataAppended int } -func (mwtm *writeMetadataToMock) AppendMetadata(_ context.Context, m []scrape.MetricMetadata) { +func (mwtm *writeMetadataToMock) AppendWatcherMetadata(_ context.Context, m []scrape.MetricMetadata) { mwtm.metadataAppended += len(m) } diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 4bc9abb1f97..9bbe65ca781 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/prompb" writev2 "github.com/prometheus/prometheus/prompb/write/v2" @@ -409,6 +410,7 @@ type QueueManager struct { relabelConfigs []*relabel.Config sendExemplars bool sendNativeHistograms bool + sendMetadata bool watcher *wlog.Watcher metadataWatcher *MetadataWatcher // experimental feature, new remote write proto format @@ -417,9 +419,10 @@ type QueueManager struct { clientMtx sync.RWMutex storeClient WriteClient - seriesMtx sync.Mutex // Covers seriesLabels and droppedSeries. - seriesLabels map[chunks.HeadSeriesRef]labels.Labels - droppedSeries map[chunks.HeadSeriesRef]struct{} + seriesMtx sync.Mutex // Covers seriesLabels, seriesMetadata and droppedSeries. + seriesLabels map[chunks.HeadSeriesRef]labels.Labels + seriesMetadata map[chunks.HeadSeriesRef]*metadata.Metadata + droppedSeries map[chunks.HeadSeriesRef]struct{} seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first. seriesSegmentIndexes map[chunks.HeadSeriesRef]int @@ -460,6 +463,7 @@ func NewQueueManager( sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, + enableMetadataRemoteWrite bool, rwFormat RemoteWriteFormat, ) *QueueManager { if logger == nil { @@ -483,11 +487,13 @@ func NewQueueManager( storeClient: client, sendExemplars: enableExemplarRemoteWrite, sendNativeHistograms: enableNativeHistogramRemoteWrite, + sendMetadata: enableMetadataRemoteWrite, // TODO: we should eventually set the format via content negotiation, // so this field would be the desired format, maybe with a fallback? rwFormat: rwFormat, seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), + seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), droppedSeries: make(map[chunks.HeadSeriesRef]struct{}), @@ -505,7 +511,17 @@ func NewQueueManager( highestRecvTimestamp: highestRecvTimestamp, } - t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite) + t.watcher = wlog.NewWatcher(watcherMetrics, readerMetrics, logger, client.Name(), t, dir, enableExemplarRemoteWrite, enableNativeHistogramRemoteWrite, enableMetadataRemoteWrite) + + // The current MetadataWatcher implementation is mutually exclusive + // with the new approach, which stores metadata as WAL records and + // ships them alongside series. If both mechanisms are set, the new one + // takes precedence by implicitly disabling the older one. + if t.mcfg.Send && t.sendMetadata { + level.Warn(logger).Log("msg", "the 'send_metadata' and 'metadata_config.send' parameters are mutually exclusive; defaulting to use 'send_metadata'") + t.mcfg.Send = false + } + if t.mcfg.Send { t.metadataWatcher = NewMetadataWatcher(logger, sm, client.Name(), t, t.mcfg.SendInterval, flushDeadline) } @@ -514,8 +530,8 @@ func NewQueueManager( return t } -// AppendMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized. -func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) { +// AppendWatcherMetadata sends metadata to the remote storage. Metadata is sent in batches, but is not parallelized. +func (t *QueueManager) AppendWatcherMetadata(ctx context.Context, metadata []scrape.MetricMetadata) { mm := make([]prompb.MetricMetadata, 0, len(metadata)) for _, entry := range metadata { mm = append(mm, prompb.MetricMetadata{ @@ -605,6 +621,7 @@ outer: t.seriesMtx.Unlock() continue } + meta := t.seriesMetadata[s.Ref] t.seriesMtx.Unlock() // Start with a very small backoff. This should not be t.cfg.MinBackoff // as it can happen without errors, and we want to pickup work after @@ -619,6 +636,7 @@ outer: } if t.shards.enqueue(s.Ref, timeSeries{ seriesLabels: lbls, + metadata: meta, timestamp: s.T, value: s.V, sType: tSample, @@ -658,6 +676,7 @@ outer: t.seriesMtx.Unlock() continue } + meta := t.seriesMetadata[e.Ref] t.seriesMtx.Unlock() // This will only loop if the queues are being resharded. backoff := t.cfg.MinBackoff @@ -669,6 +688,7 @@ outer: } if t.shards.enqueue(e.Ref, timeSeries{ seriesLabels: lbls, + metadata: meta, timestamp: e.T, value: e.V, exemplarLabels: e.Labels, @@ -706,6 +726,7 @@ outer: t.seriesMtx.Unlock() continue } + meta := t.seriesMetadata[h.Ref] t.seriesMtx.Unlock() backoff := model.Duration(5 * time.Millisecond) @@ -717,6 +738,7 @@ outer: } if t.shards.enqueue(h.Ref, timeSeries{ seriesLabels: lbls, + metadata: meta, timestamp: h.T, histogram: h.H, sType: tHistogram, @@ -753,6 +775,7 @@ outer: t.seriesMtx.Unlock() continue } + meta := t.seriesMetadata[h.Ref] t.seriesMtx.Unlock() backoff := model.Duration(5 * time.Millisecond) @@ -764,6 +787,7 @@ outer: } if t.shards.enqueue(h.Ref, timeSeries{ seriesLabels: lbls, + metadata: meta, timestamp: h.T, floatHistogram: h.FH, sType: tFloatHistogram, @@ -858,6 +882,24 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) { } } +// StoreMetadata keeps track of known series' metadata for lookups when sending samples to remote. +func (t *QueueManager) StoreMetadata(meta []record.RefMetadata) { + if !t.sendMetadata { + return + } + + t.seriesMtx.Lock() + defer t.seriesMtx.Unlock() + for _, m := range meta { + // TODO(@tpaschalis) Introduce and use an internMetadata method for the unit/help texts. + t.seriesMetadata[m.Ref] = &metadata.Metadata{ + Type: record.ToTextparseMetricType(m.Type), + Unit: m.Unit, + Help: m.Help, + } + } +} + // UpdateSeriesSegment updates the segment number held against the series, // so we can trim older ones in SeriesReset. func (t *QueueManager) UpdateSeriesSegment(series []record.RefSeries, index int) { @@ -883,6 +925,7 @@ func (t *QueueManager) SeriesReset(index int) { delete(t.seriesSegmentIndexes, k) t.releaseLabels(t.seriesLabels[k]) delete(t.seriesLabels, k) + delete(t.seriesMetadata, k) delete(t.droppedSeries, k) } } @@ -1107,6 +1150,7 @@ type shards struct { samplesDroppedOnHardShutdown atomic.Uint32 exemplarsDroppedOnHardShutdown atomic.Uint32 histogramsDroppedOnHardShutdown atomic.Uint32 + metadataDroppedOnHardShutdown atomic.Uint32 } // start the shards; must be called before any call to enqueue. @@ -1135,6 +1179,7 @@ func (s *shards) start(n int) { s.samplesDroppedOnHardShutdown.Store(0) s.exemplarsDroppedOnHardShutdown.Store(0) s.histogramsDroppedOnHardShutdown.Store(0) + s.metadataDroppedOnHardShutdown.Store(0) for i := 0; i < n; i++ { go s.runShard(hardShutdownCtx, i, newQueues[i]) } @@ -1225,6 +1270,7 @@ type timeSeries struct { value float64 histogram *histogram.Histogram floatHistogram *histogram.FloatHistogram + metadata *metadata.Metadata timestamp int64 exemplarLabels labels.Labels // The type of series: sample, exemplar, or histogram. @@ -1366,6 +1412,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { pBufRaw []byte buf []byte ) + // TODO(@tpaschalis) Should we also raise the max if we have WAL metadata? if s.qm.sendExemplars { max += int(float64(max) * 0.1) } @@ -1425,7 +1472,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { n := nPendingSamples + nPendingExemplars + nPendingHistograms s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) case MinStrings: - nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms) + nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms, s.qm.sendMetadata) n := nPendingSamples + nPendingExemplars + nPendingHistograms s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) symbolTable.clear() @@ -1447,7 +1494,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) case MinStrings: - nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms) + nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesStr(&symbolTable, batch, pendingMinStrData, s.qm.sendExemplars, s.qm.sendNativeHistograms, s.qm.sendMetadata) n := nPendingSamples + nPendingExemplars + nPendingHistograms s.sendMinStrSamples(ctx, pendingMinStrData[:n], symbolTable.LabelsStrings(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) symbolTable.clear() @@ -1474,6 +1521,7 @@ func populateTimeSeries(batch []timeSeries, pendingData []prompb.TimeSeries, sen // retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll // stop reading from the queue. This makes it safe to reference pendingSamples by index. pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) + switch d.sType { case tSample: pendingData[nPending].Samples = append(pendingData[nPending].Samples, prompb.Sample{ @@ -1605,7 +1653,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, rawReq []byte, samp return err } -func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms bool) (int, int, int) { +func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeries, pendingData []writev2.TimeSeries, sendExemplars, sendNativeHistograms, sendMetadata bool) (int, int, int) { var nPendingSamples, nPendingExemplars, nPendingHistograms int for nPending, d := range batch { pendingData[nPending].Samples = pendingData[nPending].Samples[:0] @@ -1622,6 +1670,13 @@ func populateMinimizedTimeSeriesStr(symbolTable *rwSymbolTable, batch []timeSeri // pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels) pendingData[nPending].LabelsRefs = labelsToUint32SliceStr(d.seriesLabels, symbolTable, pendingData[nPending].LabelsRefs) + if sendMetadata && d.metadata != nil { + pendingData[nPending].Metadata = writev2.Metadata{ + Type: metricTypeToProtoEquivalent(d.metadata.Type), + HelpRef: symbolTable.RefStr(d.metadata.Help), + UnitRef: symbolTable.RefStr(d.metadata.Unit), + } + } switch d.sType { case tSample: pendingData[nPending].Samples = append(pendingData[nPending].Samples, writev2.Sample{ diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 7aca8550046..d6dd0f71f28 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -96,6 +96,7 @@ func TestSampleDelivery(t *testing.T) { writeConfig.QueueConfig = queueConfig writeConfig.SendExemplars = true writeConfig.SendNativeHistograms = true + writeConfig.SendWALMetadata = true conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -112,6 +113,7 @@ func TestSampleDelivery(t *testing.T) { var ( series []record.RefSeries + metadata []record.RefMetadata samples []record.RefSample exemplars []record.RefExemplar histograms []record.RefHistogramSample @@ -121,6 +123,7 @@ func TestSampleDelivery(t *testing.T) { // Generates same series in both cases. if tc.samples { samples, series = createTimeseries(n, n) + metadata = createSeriesMetadata(series) } if tc.exemplars { exemplars, series = createExemplars(n, n) @@ -144,6 +147,7 @@ func TestSampleDelivery(t *testing.T) { qm.SetClient(c) qm.StoreSeries(series, 0) + qm.StoreMetadata(metadata) // Send first half of data. c.expectSamples(samples[:len(samples)/2], series) @@ -179,7 +183,7 @@ func TestMetadataDelivery(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, 0) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, 0) m.Start() defer m.Stop() @@ -194,7 +198,7 @@ func TestMetadataDelivery(t *testing.T) { }) } - m.AppendMetadata(context.Background(), metadata) + m.AppendWatcherMetadata(context.Background(), metadata) require.Len(t, c.receivedMetadata, numMetadata) // One more write than the rounded qoutient should be performed in order to get samples that didn't @@ -204,6 +208,47 @@ func TestMetadataDelivery(t *testing.T) { require.Equal(t, c.receivedMetadata[metadata[len(metadata)-1].Metric][0].MetricFamilyName, metadata[len(metadata)-1].Metric) } +func TestWALMetadataDelivery(t *testing.T) { + dir := t.TempDir() + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) + defer s.Close() + + cfg := config.DefaultQueueConfig + cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) + cfg.MaxShards = 1 + + writeConfig := baseRemoteWriteConfig("http://test-storage.com") + writeConfig.QueueConfig = cfg + + conf := &config.Config{ + GlobalConfig: config.DefaultGlobalConfig, + RemoteWriteConfigs: []*config.RemoteWriteConfig{ + writeConfig, + }, + } + + num := 3 + _, series := createTimeseries(0, num) + metadata := createSeriesMetadata(series) + + require.NoError(t, s.ApplyConfig(conf)) + hash, err := toHash(writeConfig) + require.NoError(t, err) + qm := s.rws.queues[hash] + qm.sendMetadata = true + + c := NewTestWriteClient(Base1) + qm.SetClient(c) + + qm.StoreSeries(series, 0) + qm.StoreMetadata(metadata) + + require.Equal(t, num, len(qm.seriesLabels)) + require.Equal(t, num, len(qm.seriesMetadata)) + + c.waitForExpectedData(t) +} + func TestSampleDeliveryTimeout(t *testing.T) { for _, rwFormat := range []RemoteWriteFormat{Base1, MinStrings} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { @@ -220,7 +265,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -266,7 +311,7 @@ func TestSampleDeliveryOrder(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.StoreSeries(series, 0) m.Start() @@ -288,7 +333,7 @@ func TestShutdown(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, Base1) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -326,17 +371,22 @@ func TestSeriesReset(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, true, Base1) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} + metadata := []record.RefMetadata{} for j := 0; j < numSeries; j++ { series = append(series, record.RefSeries{Ref: chunks.HeadSeriesRef((i * 100) + j), Labels: labels.FromStrings("a", "a")}) + metadata = append(metadata, record.RefMetadata{Ref: chunks.HeadSeriesRef((i * 100) + j), Type: 0, Unit: "unit text", Help: "help text"}) } m.StoreSeries(series, i) + m.StoreMetadata(metadata) } require.Len(t, m.seriesLabels, numSegments*numSeries) + require.Len(t, m.seriesMetadata, numSegments*numSeries) m.SeriesReset(2) require.Len(t, m.seriesLabels, numSegments*numSeries/2) + require.Len(t, m.seriesMetadata, numSegments*numSeries/2) } func TestReshard(t *testing.T) { @@ -357,7 +407,7 @@ func TestReshard(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.StoreSeries(series, 0) m.Start() @@ -397,7 +447,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.Start() h.Unlock() h.Lock() @@ -436,7 +486,7 @@ func TestReshardPartialBatch(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.StoreSeries(series, 0) m.Start() @@ -485,7 +535,7 @@ func TestQueueFilledDeadlock(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -516,7 +566,7 @@ func TestReleaseNoninternedString(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient(rwFormat) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, rwFormat) m.Start() defer m.Stop() @@ -566,7 +616,7 @@ func TestShouldReshard(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") // todo: test with new proto type(s) client := NewTestWriteClient(Base1) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, Base1) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -682,6 +732,20 @@ func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record. return histograms, nil, series } +func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata { + metas := make([]record.RefMetadata, len(series)) + + for _, s := range series { + metas = append(metas, record.RefMetadata{ + Ref: s.Ref, + Type: uint8(record.Counter), + Unit: "unit text", + Help: "help text", + }) + } + return metas +} + func getSeriesNameFromRef(r record.RefSeries) string { return r.Labels.Get("__name__") } @@ -865,7 +929,6 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error { } else { c.receivedHistograms[seriesName] = append(c.receivedHistograms[seriesName], hist) } - } } if c.withWaitGroup { @@ -964,7 +1027,7 @@ func BenchmarkSampleSend(b *testing.B) { metrics := newQueueManagerMetrics(nil, "", "") // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, Base1) m.StoreSeries(series, 0) // These should be received by the client. @@ -1011,7 +1074,7 @@ func BenchmarkStartup(b *testing.B) { // todo: test with new proto type(s) m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, false, Base1) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -1095,7 +1158,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, Base1) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -1173,7 +1236,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, false, Base1) for _, tc := range []struct { name string @@ -1513,14 +1576,14 @@ func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { // Warmup buffers for i := 0; i < 10; i++ { - populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true) + populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true, true) buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff) } b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { totalSize := 0 for j := 0; j < b.N; j++ { - populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true) + populateMinimizedTimeSeriesStr(&symbolTable, tc.batch, seriesBuff, true, true, true) b.ResetTimer() req, _, err := buildMinimizedWriteRequestStr(seriesBuff, symbolTable.LabelsStrings(), &pBuf, &buff) if err != nil { diff --git a/storage/remote/write.go b/storage/remote/write.go index d220ea05f8f..4bbb7c729de 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -210,6 +210,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rws.scraper, rwConf.SendExemplars, rwConf.SendNativeHistograms, + rwConf.SendWALMetadata, rws.rwFormat, ) // Keep track of which queues are new so we know which to start. diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 59a07f6751b..a7758b36828 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -364,6 +364,13 @@ func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteReques if err != nil { return err } + + m := minMetadataProtoToMetadata(ts.Metadata, req.Symbols) + _, err = app.UpdateMetadata(0, ls, m) + if err != nil { + level.Debug(h.logger).Log("msg", "error while updating metadata from remote write", "err", err) + } + } if outOfOrderExemplarErrs > 0 { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 6c378410177..9070dbe77b9 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -105,32 +105,39 @@ func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { i := 0 j := 0 k := 0 + l := 0 // the reduced write request is equivalent to the write request fixture. // we can use it for - for _, ts := range writeRequestFixture.Timeseries { - ls := labelProtosToLabels(ts.Labels) + for _, ts := range writeRequestMinimizedFixture.Timeseries { + ls := Uint32StrRefToLabels(writeRequestMinimizedFixture.Symbols, ts.LabelsRefs) for _, s := range ts.Samples { require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) i++ } for _, e := range ts.Exemplars { - exemplarLabels := labelProtosToLabels(e.Labels) + exemplarLabels := Uint32StrRefToLabels(writeRequestMinimizedFixture.Symbols, e.LabelsRefs) require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) j++ } for _, hp := range ts.Histograms { if hp.IsFloatHistogram() { - fh := FloatHistogramProtoToFloatHistogram(hp) + fh := FloatMinHistogramProtoToFloatHistogram(hp) require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) } else { - h := HistogramProtoToHistogram(hp) + h := MinHistogramProtoToHistogram(hp) require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) } k++ } + + m := ts.Metadata + unit := writeRequestMinimizedFixture.Symbols[m.UnitRef] + help := writeRequestMinimizedFixture.Symbols[m.HelpRef] + require.Equal(t, mockMetadata{ls, int32(m.Type), unit, help}, appendable.metadata[l]) + l++ } } @@ -325,6 +332,7 @@ type mockAppendable struct { exemplars []mockExemplar latestHistogram int64 histograms []mockHistogram + metadata []mockMetadata commitErr error } @@ -348,6 +356,13 @@ type mockHistogram struct { fh *histogram.FloatHistogram } +type mockMetadata struct { + l labels.Labels + mtype int32 + unit string + help string +} + func (m *mockAppendable) Appender(_ context.Context) storage.Appender { return m } @@ -390,9 +405,8 @@ func (m *mockAppendable) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t return 0, nil } -func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ metadata.Metadata) (storage.SeriesRef, error) { - // TODO: Wire metadata in a mockAppendable field when we get around to handling metadata in remote_write. - // UpdateMetadata is no-op for remote write (where mockAppendable is being used to test) for now. +func (m *mockAppendable) UpdateMetadata(_ storage.SeriesRef, l labels.Labels, mp metadata.Metadata) (storage.SeriesRef, error) { + m.metadata = append(m.metadata, mockMetadata{l, int32(metricTypeToProtoEquivalent(mp.Type)), mp.Unit, mp.Help}) return 0, nil } diff --git a/tsdb/wlog/watcher.go b/tsdb/wlog/watcher.go index 1c76e388779..be46555347c 100644 --- a/tsdb/wlog/watcher.go +++ b/tsdb/wlog/watcher.go @@ -56,6 +56,7 @@ type WriteTo interface { AppendHistograms([]record.RefHistogramSample) bool AppendFloatHistograms([]record.RefFloatHistogramSample) bool StoreSeries([]record.RefSeries, int) + StoreMetadata([]record.RefMetadata) // Next two methods are intended for garbage-collection: first we call // UpdateSeriesSegment on all current series @@ -87,6 +88,7 @@ type Watcher struct { lastCheckpoint string sendExemplars bool sendHistograms bool + sendMetadata bool metrics *WatcherMetrics readerMetrics *LiveReaderMetrics @@ -169,7 +171,7 @@ func NewWatcherMetrics(reg prometheus.Registerer) *WatcherMetrics { } // NewWatcher creates a new WAL watcher for a given WriteTo. -func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms bool) *Watcher { +func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logger log.Logger, name string, writer WriteTo, dir string, sendExemplars, sendHistograms, sendMetadata bool) *Watcher { if logger == nil { logger = log.NewNopLogger() } @@ -182,6 +184,7 @@ func NewWatcher(metrics *WatcherMetrics, readerMetrics *LiveReaderMetrics, logge name: name, sendExemplars: sendExemplars, sendHistograms: sendHistograms, + sendMetadata: sendMetadata, readNotify: make(chan struct{}), quit: make(chan struct{}), @@ -540,6 +543,7 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { histogramsToSend []record.RefHistogramSample floatHistograms []record.RefFloatHistogramSample floatHistogramsToSend []record.RefFloatHistogramSample + metadata []record.RefMetadata ) for r.Next() && !isClosed(w.quit) { rec := r.Record() @@ -651,6 +655,17 @@ func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error { w.writer.AppendFloatHistograms(floatHistogramsToSend) floatHistogramsToSend = floatHistogramsToSend[:0] } + + case record.Metadata: + if !w.sendMetadata || !tail { + break + } + meta, err := dec.Metadata(rec, metadata[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.writer.StoreMetadata(meta) case record.Tombstones: default: diff --git a/tsdb/wlog/watcher_test.go b/tsdb/wlog/watcher_test.go index b30dce91a36..e25656de8c9 100644 --- a/tsdb/wlog/watcher_test.go +++ b/tsdb/wlog/watcher_test.go @@ -84,6 +84,8 @@ func (wtm *writeToMock) StoreSeries(series []record.RefSeries, index int) { wtm.UpdateSeriesSegment(series, index) } +func (wtm *writeToMock) StoreMetadata(_ []record.RefMetadata) { /* no-op */ } + func (wtm *writeToMock) UpdateSeriesSegment(series []record.RefSeries, index int) { wtm.seriesLock.Lock() defer wtm.seriesLock.Unlock() @@ -210,7 +212,7 @@ func TestTailSamples(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, true, true, true) watcher.SetStartTime(now) // Set the Watcher's metrics so they're not nil pointers. @@ -295,7 +297,7 @@ func TestReadToEndNoCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) go watcher.Start() expected := seriesCount @@ -384,7 +386,7 @@ func TestReadToEndWithCheckpoint(t *testing.T) { require.NoError(t, err) readTimeout = time.Second wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) go watcher.Start() expected := seriesCount * 2 @@ -455,7 +457,7 @@ func TestReadCheckpoint(t *testing.T) { require.NoError(t, err) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) go watcher.Start() expectedSeries := seriesCount @@ -524,7 +526,7 @@ func TestReadCheckpointMultipleSegments(t *testing.T) { } wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) watcher.MaxSegment = -1 // Set the Watcher's metrics so they're not nil pointers. @@ -597,7 +599,7 @@ func TestCheckpointSeriesReset(t *testing.T) { readTimeout = time.Second wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) watcher.MaxSegment = -1 go watcher.Start() @@ -676,7 +678,7 @@ func TestRun_StartupTime(t *testing.T) { require.NoError(t, w.Close()) wt := newWriteToMock() - watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false) + watcher := NewWatcher(wMetrics, nil, nil, "", wt, dir, false, false, false) watcher.MaxSegment = segments watcher.setMetrics()