From 554974a28d4ea7e9e938e5069afdd0c8c2a6bbda Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sat, 16 Sep 2023 18:16:37 +0100 Subject: [PATCH] kgo: add Buffered{Fetch,Produce}Bytes This slows down fetching a little bit. If it is egregious, we can fix the perf by tracking the size buffered when processing the fetch itself, and then adding a new field to batch-untrack the size. That's left as an exercise for a person that cares. This is now done since the prior commit introduces buffered produce bytes, and we may as well add it while fetching for both-sides consistency, and we may as well expose it. --- pkg/kgo/consumer.go | 8 ++++++++ pkg/kgo/producer.go | 7 +++++++ pkg/kgo/source.go | 9 ++++++++- 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 0f835469..e1d3c062 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -162,6 +162,7 @@ func (o Offset) At(at int64) Offset { type consumer struct { bufferedRecords atomicI64 + bufferedBytes atomicI64 cl *Client @@ -285,6 +286,13 @@ func (cl *Client) BufferedFetchRecords() int64 { return cl.consumer.bufferedRecords.Load() } +// BufferedFetchBytes returns the number of bytes currently buffered from +// fetching within the client. This is the sum of all keys, values, and header +// keys/values. See the related [Client.BufferedFetchRecords] for more information. +func (cl *Client) BufferedFetchBytes() int64 { + return cl.consumer.bufferedBytes.Load() +} + type usedCursors map[*cursor]struct{} func (u *usedCursors) use(c *cursor) { diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index c91fa42d..5730e9f1 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -88,6 +88,13 @@ func (cl *Client) BufferedProduceRecords() int64 { return cl.producer.bufferedRecords.Load() } +// BufferedProduceBytes returns the number of bytes currently buffered for +// producing within the client. This is the sum of all keys, values, and header +// keys/values. 
See the related [Client.BufferedProduceRecords] for more information. +func (cl *Client) BufferedProduceBytes() int64 { + return cl.producer.bufferedBytes.Load() +} + type unknownTopicProduces struct { buffered []promisedRec wait chan error // retryable errors diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 9b5f115c..9db56dcd 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -323,16 +323,23 @@ func (s *source) hook(f *Fetch, buffered, polled bool) { }) var nrecs int + var nbytes int64 for i := range f.Topics { t := &f.Topics[i] for j := range t.Partitions { - nrecs += len(t.Partitions[j].Records) + p := &t.Partitions[j] + nrecs += len(p.Records) + for k := range p.Records { + nbytes += p.Records[k].userSize() + } } } if buffered { s.cl.consumer.bufferedRecords.Add(int64(nrecs)) + s.cl.consumer.bufferedBytes.Add(nbytes) } else { s.cl.consumer.bufferedRecords.Add(-int64(nrecs)) + s.cl.consumer.bufferedBytes.Add(-nbytes) } }