Skip to content

Commit

Permalink
Propagate logger to buildWriteRequest to log dropped data
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis committed Dec 14, 2023
1 parent 0b1c94e commit 0ecfa07
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 34 deletions.
2 changes: 1 addition & 1 deletion storage/remote/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
}

func TestDecodeWriteRequest(t *testing.T) {
buf, _, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

actual, err := DecodeWriteRequest(bytes.NewReader(buf))
Expand Down
28 changes: 22 additions & 6 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met

func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples.
req, _, _, err := buildWriteRequest(nil, metadata, pBuf, nil, nil)
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1552,7 +1552,7 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, lowest, err := buildWriteRequest(samples, nil, pBuf, *buf, nil)
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, *buf, nil)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
Expand All @@ -1572,6 +1572,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildWriteRequest(
s.qm.logger,
samples,
nil,
pBuf,
Expand Down Expand Up @@ -1689,14 +1690,24 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}

func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries) {
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int

keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
continue
}

Expand Down Expand Up @@ -1728,11 +1739,16 @@ func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeri
}

timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}

func buildWriteRequest(timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) {
highest, lowest, timeSeries := buildTimeSeries(timeSeries, filter)
func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)

if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}

req := &prompb.WriteRequest{
Timeseries: timeSeries,
Expand Down
43 changes: 24 additions & 19 deletions storage/remote/queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,12 +1401,13 @@ func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {

func TestBuildTimeSeries(t *testing.T) {
testCases := []struct {
name string
ts []prompb.TimeSeries
filter func(ts prompb.TimeSeries) bool
lowestTs int64
highestTs int64
responseLen int
name string
ts []prompb.TimeSeries
filter func(ts prompb.TimeSeries) bool
lowestTs int64
highestTs int64
droppedSamples int
responseLen int
}{
{
name: "No filter applied",
Expand Down Expand Up @@ -1477,10 +1478,11 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
droppedSamples: 2,
},
{
name: "Filter applied, samples out of order",
Expand Down Expand Up @@ -1518,10 +1520,11 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567892, ts) },
responseLen: 2,
lowestTs: 1234567892,
highestTs: 1234567893,
droppedSamples: 2,
},
{
name: "Filter applied, samples not consecutive",
Expand Down Expand Up @@ -1559,21 +1562,23 @@ func TestBuildTimeSeries(t *testing.T) {
},
},
},
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
responseLen: 2,
lowestTs: 1234567895,
highestTs: 1234567897,
filter: func(ts prompb.TimeSeries) bool { return filterTsLimit(1234567895, ts) },
responseLen: 2,
lowestTs: 1234567895,
highestTs: 1234567897,
droppedSamples: 2,
},
}

// Run the test cases
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
highest, lowest, result := buildTimeSeries(tc.ts, tc.filter)
highest, lowest, result, droppedSamples, _, _ := buildTimeSeries(tc.ts, tc.filter)
require.NotNil(t, result)
require.Len(t, result, tc.responseLen)
require.Equal(t, tc.highestTs, highest)
require.Equal(t, tc.lowestTs, lowest)
require.Equal(t, tc.droppedSamples, droppedSamples)
})
}
}
16 changes: 8 additions & 8 deletions storage/remote/write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
)

func TestRemoteWriteHandler(t *testing.T) {
buf, _, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestRemoteWriteHandler(t *testing.T) {
}

func TestOutOfOrderSample(t *testing.T) {
buf, _, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Samples: []prompb.Sample{{Value: 1, Timestamp: 0}},
}}, nil, nil, nil, nil)
Expand All @@ -109,7 +109,7 @@ func TestOutOfOrderSample(t *testing.T) {
// don't fail on ingestion errors since the exemplar storage is
// still experimental.
func TestOutOfOrderExemplar(t *testing.T) {
buf, _, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}},
}}, nil, nil, nil, nil)
Expand All @@ -132,7 +132,7 @@ func TestOutOfOrderExemplar(t *testing.T) {
}

func TestOutOfOrderHistogram(t *testing.T) {
buf, _, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}},
Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat(nil))},
}}, nil, nil, nil, nil)
Expand All @@ -158,7 +158,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
reqs := []*http.Request{}
for i := 0; i < b.N; i++ {
num := strings.Repeat(strconv.Itoa(i), 16)
buf, _, _, err := buildWriteRequest([]prompb.TimeSeries{{
buf, _, _, err := buildWriteRequest(nil, []prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
Expand All @@ -182,7 +182,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) {
}

func TestCommitErr(t *testing.T) {
buf, _, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand Down Expand Up @@ -219,7 +219,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {

handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head())

buf, _, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err)

req, err := http.NewRequest("", "", bytes.NewReader(buf))
Expand All @@ -232,7 +232,7 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) {

var bufRequests [][]byte
for i := 0; i < 100; i++ {
buf, _, _, err = buildWriteRequest(genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil)
buf, _, _, err = buildWriteRequest(nil, genSeriesWithSample(1000, int64(80+i)*time.Minute.Milliseconds()), nil, nil, nil, nil)
require.NoError(b, err)
bufRequests = append(bufRequests, buf)
}
Expand Down

0 comments on commit 0ecfa07

Please sign in to comment.