-
Notifications
You must be signed in to change notification settings - Fork 238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding the serialization features. #1666
Changes from all commits
0b8e976
175aafb
57e6ddd
55fe162
843ef50
c359236
6da1198
adb047a
58f2385
2fe259f
dd55897
9331c64
622dbc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package serialization | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" | ||
"github.com/prometheus/prometheus/model/exemplar" | ||
"github.com/prometheus/prometheus/model/histogram" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/model/metadata" | ||
"github.com/prometheus/prometheus/storage" | ||
) | ||
|
||
type appender struct { | ||
ctx context.Context | ||
ttl time.Duration | ||
s types.Serializer | ||
logger log.Logger | ||
} | ||
|
||
func (a *appender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels, t, ct int64) (storage.SeriesRef, error) { | ||
// TODO @mattdurham figure out what to do here later. This mirrors what we do elsewhere. | ||
mattdurham marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return ref, nil | ||
} | ||
|
||
// NewAppender returns an Appender that writes to a given serializer. NOTE the returned Appender writes | ||
// data immediately, discards data older than `ttl` and does not honor commit or rollback. | ||
func NewAppender(ctx context.Context, ttl time.Duration, s types.Serializer, logger log.Logger) storage.Appender { | ||
app := &appender{ | ||
ttl: ttl, | ||
s: s, | ||
logger: logger, | ||
ctx: ctx, | ||
} | ||
return app | ||
} | ||
|
||
// Append metric | ||
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { | ||
// Check to see if the TTL has expired for this record. | ||
endTime := time.Now().Unix() - int64(a.ttl.Seconds()) | ||
if t < endTime { | ||
return ref, nil | ||
} | ||
ts := types.GetTimeSeriesFromPool() | ||
ts.Labels = l | ||
ts.TS = t | ||
ts.Value = v | ||
ts.Hash = l.Hash() | ||
err := a.s.SendSeries(a.ctx, ts) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it guaranteed that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should be a required that all time series are returned. Though not in this PR this is checked in a future test via |
||
return ref, err | ||
} | ||
|
||
// Commit is a no op since we always write. | ||
func (a *appender) Commit() (_ error) { | ||
return nil | ||
} | ||
|
||
// Rollback is a no op since we write all the data. | ||
func (a *appender) Rollback() error { | ||
return nil | ||
} | ||
|
||
// AppendExemplar appends exemplar to cache. The passed in labels is unused, instead use the labels on the exemplar. | ||
func (a *appender) AppendExemplar(ref storage.SeriesRef, _ labels.Labels, e exemplar.Exemplar) (_ storage.SeriesRef, _ error) { | ||
endTime := time.Now().Unix() - int64(a.ttl.Seconds()) | ||
if e.HasTs && e.Ts < endTime { | ||
return ref, nil | ||
} | ||
ts := types.GetTimeSeriesFromPool() | ||
ts.Hash = e.Labels.Hash() | ||
ts.TS = e.Ts | ||
ts.Labels = e.Labels | ||
ts.Hash = e.Labels.Hash() | ||
err := a.s.SendSeries(a.ctx, ts) | ||
return ref, err | ||
} | ||
|
||
// AppendHistogram appends histogram | ||
func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (_ storage.SeriesRef, _ error) { | ||
endTime := time.Now().Unix() - int64(a.ttl.Seconds()) | ||
if t < endTime { | ||
return ref, nil | ||
} | ||
ts := types.GetTimeSeriesFromPool() | ||
ts.Labels = l | ||
ts.TS = t | ||
if h != nil { | ||
ts.FromHistogram(t, h) | ||
} else { | ||
ts.FromFloatHistogram(t, fh) | ||
} | ||
ts.Hash = l.Hash() | ||
err := a.s.SendSeries(a.ctx, ts) | ||
return ref, err | ||
} | ||
|
||
// UpdateMetadata updates metadata. | ||
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (_ storage.SeriesRef, _ error) { | ||
ts := types.GetTimeSeriesFromPool() | ||
// We are going to handle converting some strings to hopefully not reused label names. TimeSeriesBinary has a lot of work | ||
// to ensure its efficient it makes sense to encode metadata into it. | ||
combinedLabels := l.Copy() | ||
combinedLabels = append(combinedLabels, labels.Label{ | ||
Name: "__alloy_metadata_type__", | ||
Value: string(m.Type), | ||
}) | ||
combinedLabels = append(combinedLabels, labels.Label{ | ||
Name: "__alloy_metadata_help__", | ||
Value: m.Help, | ||
}) | ||
combinedLabels = append(combinedLabels, labels.Label{ | ||
Name: "__alloy_metadata_unit__", | ||
Value: m.Unit, | ||
Comment on lines
+107
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are used in more than one place, can we make them constants? Helps find usages and generally nice practice IMO. |
||
}) | ||
ts.Labels = combinedLabels | ||
err := a.s.SendMetadata(a.ctx, ts) | ||
return ref, err | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
package serialization | ||
|
||
import ( | ||
"context" | ||
log2 "github.com/go-kit/log" | ||
"github.com/grafana/alloy/internal/component/prometheus/remote/queue/types" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/stretchr/testify/require" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestAppenderTTL(t *testing.T) { | ||
fake := &counterSerializer{} | ||
l := log2.NewNopLogger() | ||
|
||
app := NewAppender(context.Background(), 1*time.Minute, fake, l) | ||
_, err := app.Append(0, labels.FromStrings("one", "two"), time.Now().Unix(), 0) | ||
require.NoError(t, err) | ||
|
||
for i := 0; i < 10; i++ { | ||
_, err = app.Append(0, labels.FromStrings("one", "two"), time.Now().Add(-5*time.Minute).Unix(), 0) | ||
require.NoError(t, err) | ||
} | ||
// Only one record should make it through. | ||
require.True(t, fake.received == 1) | ||
} | ||
|
||
var _ types.Serializer = (*fakeSerializer)(nil) | ||
|
||
type counterSerializer struct { | ||
received int | ||
} | ||
|
||
func (f *counterSerializer) Start() { | ||
|
||
} | ||
|
||
func (f *counterSerializer) Stop() { | ||
|
||
} | ||
|
||
func (f *counterSerializer) SendSeries(ctx context.Context, data *types.TimeSeriesBinary) error { | ||
f.received++ | ||
return nil | ||
|
||
} | ||
|
||
func (f *counterSerializer) SendMetadata(ctx context.Context, data *types.TimeSeriesBinary) error { | ||
return nil | ||
} | ||
|
||
func (f *counterSerializer) UpdateConfig(ctx context.Context, data types.SerializerConfig) error { | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd be running these tests twice, second time without
-race
- I don't see the reason why, is that an accident?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one test that will not be ran twice since I am accessing the var directly to test its value. The others will be ran, I could add the //go:build race to the others. Note most of our exclusions above have some tests that run twice.