Skip to content

Commit

Permalink
Fix fake minimum capacity (#1982)
Browse files Browse the repository at this point in the history
* stash it

* remove comment
  • Loading branch information
mattdurham authored Oct 29, 2024
1 parent 2750025 commit b72443f
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 18 deletions.
7 changes: 4 additions & 3 deletions internal/component/prometheus/write/queue/e2e_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,9 @@ func TestMetrics(t *testing.T) {
},
},
},
// exemplar, note that once it hits the appender exemplars are treated the same as series.
{
// TURNING OFF EXEMPLAR TESTS until underlying issue is resolved.
//exemplar, note that once it hits the appender exemplars are treated the same as series.
/*{
name: "exemplar success",
returnStatusCode: http.StatusOK,
dtype: Exemplar,
Expand Down Expand Up @@ -521,7 +522,7 @@ func TestMetrics(t *testing.T) {
valueFunc: isReasonableTimeStamp,
},
},
},
},*/
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion internal/component/prometheus/write/queue/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewEndpoint(client types.NetworkClient, serializer types.Serializer, ttl ti
serializer: serializer,
log: logger,
ttl: ttl,
incoming: actor.NewMailbox[types.DataHandle](actor.OptCapacity(1)),
incoming: actor.NewMailbox[types.DataHandle](),
buf: make([]byte, 0, 1024),
}
}
Expand Down
16 changes: 10 additions & 6 deletions internal/component/prometheus/write/queue/fanout.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,17 @@ func (f fanout) Rollback() error {
}

func (f fanout) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
for _, child := range f.children {
_, err := child.AppendExemplar(ref, l, e)
if err != nil {
return ref, err
}
}
// Exemplars are disabled due to https://github.com/grafana/alloy/issues/1915
return ref, nil
/*
for _, child := range f.children {
_, err := child.AppendExemplar(ref, l, e)
if err != nil {
return ref, err
}
}
return ref, nil
*/
}

func (f fanout) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
Expand Down
2 changes: 1 addition & 1 deletion internal/component/prometheus/write/queue/network/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newLoop(cc types.ConnectionConfig, isMetaData bool, l log.Logger, stats fun
return &loop{
isMeta: isMetaData,
// In general we want a healthy queue of items, in this case we want to have 2x our maximum send sized ready.
seriesMbx: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(2 * cc.BatchCount)),
seriesMbx: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(2*cc.BatchCount), actor.OptAsChan()),
client: &http.Client{},
cfg: cc,
log: log.With(l, "name", "loop", "url", cc.URL),
Expand Down
8 changes: 4 additions & 4 deletions internal/component/prometheus/write/queue/network/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func New(cc types.ConnectionConfig, logger log.Logger, seriesStats, metadataStat
loops: make([]*loop, 0, cc.Connections),
logger: logger,
// This provides blocking to only handle one at a time, so that if a queue blocks
// it will stop the filequeue from feeding more.
inbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)),
metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1)),
// it will stop the filequeue from feeding more. Without OptAsChan the minimum capacity is actually a 64-item buffer.
inbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1), actor.OptAsChan()),
metaInbox: actor.NewMailbox[*types.TimeSeriesBinary](actor.OptCapacity(1), actor.OptAsChan()),
configInbox: actor.NewMailbox[configCallback](),
stats: seriesStats,
metaStats: metadataStats,
Expand Down Expand Up @@ -126,8 +126,8 @@ func (s *manager) DoWork(ctx actor.Context) actor.WorkerStatus {
level.Error(s.logger).Log("msg", "failed to send to metadata loop", "err", err)
}
return actor.WorkerContinue
// We need to also check the config here, else its possible this will deadlock.
case cfg, ok := <-s.configInbox.ReceiveC():
// We need to also check the config here, else its possible this will deadlock.
if !ok {
level.Debug(s.logger).Log("msg", "config inbox closed")
return actor.WorkerEnd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,8 @@ func TestRetry(t *testing.T) {
require.NoError(t, err)
wr.Start()
defer wr.Stop()
send(t, wr, ctx)

for i := 0; i < 10; i++ {
send(t, wr, ctx)
}
require.Eventually(t, func() bool {
done := retries.Load() > 5
return done
Expand Down
2 changes: 2 additions & 0 deletions internal/component/prometheus/write/queue/types/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
type NetworkClient interface {
Start()
Stop()
// SendSeries will block if the network caches are full.
SendSeries(ctx context.Context, d *TimeSeriesBinary) error
// SendMetadata will block if the network caches are full.
SendMetadata(ctx context.Context, d *TimeSeriesBinary) error
// UpdateConfig is a synchronous call and will only return once the config
// is applied or an error occurs.
Expand Down

0 comments on commit b72443f

Please sign in to comment.