diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go index 0f835469..e1d3c062 100644 --- a/pkg/kgo/consumer.go +++ b/pkg/kgo/consumer.go @@ -162,6 +162,7 @@ func (o Offset) At(at int64) Offset { type consumer struct { bufferedRecords atomicI64 + bufferedBytes atomicI64 cl *Client @@ -285,6 +286,13 @@ func (cl *Client) BufferedFetchRecords() int64 { return cl.consumer.bufferedRecords.Load() } +// BufferedFetchBytes returns the number of bytes currently buffered from +// fetching within the client. This is the sum of all keys, values, and header +// keys/values. See the related [BufferedFetchRecords] for more information. +func (cl *Client) BufferedFetchBytes() int64 { + return cl.consumer.bufferedBytes.Load() +} + type usedCursors map[*cursor]struct{} func (u *usedCursors) use(c *cursor) { diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go index e57fb796..b578f113 100644 --- a/pkg/kgo/producer.go +++ b/pkg/kgo/producer.go @@ -88,6 +88,13 @@ func (cl *Client) BufferedProduceRecords() int64 { return cl.producer.bufferedRecords.Load() } +// BufferedProduceBytes returns the number of bytes currently buffered for +// producing within the client. This is the sum of all keys, values, and header +// keys/values. See the related [BufferedProduceRecords] for more information. +func (cl *Client) BufferedProduceBytes() int64 { + return cl.producer.bufferedBytes.Load() +} + type unknownTopicProduces struct { buffered []promisedRec wait chan error // retryable errors diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 9b5f115c..9db56dcd 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -323,16 +323,23 @@ func (s *source) hook(f *Fetch, buffered, polled bool) { }) var nrecs int + var nbytes int64 for i := range f.Topics { t := &f.Topics[i] for j := range t.Partitions { - nrecs += len(t.Partitions[j].Records) + p := &t.Partitions[j] + nrecs += len(p.Records) + for k := range p.Records { + nbytes += p.Records[k].userSize() + } } } if buffered { s.cl.consumer.bufferedRecords.Add(int64(nrecs)) + s.cl.consumer.bufferedBytes.Add(nbytes) } else { s.cl.consumer.bufferedRecords.Add(-int64(nrecs)) + s.cl.consumer.bufferedBytes.Add(-nbytes) } }