From 685517736637dedc4b3660390ded756742bf3a41 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Fri, 19 Jan 2024 13:49:12 +0300 Subject: [PATCH] chore(prometheusremotewriter): use `bytebufferpool` to pool byte buffers --- .../prometheusremotewritereceiver/buffer.go | 33 ------------------- .../prometheusremotewritereceiver/pool.go | 32 ++++++++++++++++++ .../prometheusremotewritereceiver/receiver.go | 19 ++--------- 3 files changed, 34 insertions(+), 50 deletions(-) delete mode 100644 internal/otelreceiver/prometheusremotewritereceiver/buffer.go create mode 100644 internal/otelreceiver/prometheusremotewritereceiver/pool.go diff --git a/internal/otelreceiver/prometheusremotewritereceiver/buffer.go b/internal/otelreceiver/prometheusremotewritereceiver/buffer.go deleted file mode 100644 index fe1dbb8f..00000000 --- a/internal/otelreceiver/prometheusremotewritereceiver/buffer.go +++ /dev/null @@ -1,33 +0,0 @@ -package prometheusremotewritereceiver - -import ( - "bytes" - "sync" -) - -var bufPool = sync.Pool{ - New: func() any { - var buf bytes.Buffer - buf.Grow(32 * 1024) - return &buf - }, -} - -func getBuf() *bytes.Buffer { - buf := bufPool.Get() - return buf.(*bytes.Buffer) -} - -func putBuf(buf *bytes.Buffer) { - buf.Reset() - bufPool.Put(buf) -} - -type closerReader struct { - data []byte - bytes.Reader -} - -func (c *closerReader) Close() error { - return nil -} diff --git a/internal/otelreceiver/prometheusremotewritereceiver/pool.go b/internal/otelreceiver/prometheusremotewritereceiver/pool.go new file mode 100644 index 00000000..3d20795e --- /dev/null +++ b/internal/otelreceiver/prometheusremotewritereceiver/pool.go @@ -0,0 +1,32 @@ +package prometheusremotewritereceiver + +import ( + "bytes" + "sync" + + "github.com/go-faster/oteldb/internal/prompb" +) + +var writeRequestPool sync.Pool + +func putWriteRequest(wr *prompb.WriteRequest) { + wr.Reset() + writeRequestPool.Put(wr) +} + +func getWriteRequest() *prompb.WriteRequest { + v := writeRequestPool.Get() + if v == nil { + return &prompb.WriteRequest{} + } + return v.(*prompb.WriteRequest) +} + +type closerReader struct { + data []byte + bytes.Reader +} + +func (c *closerReader) Close() error { + return nil +} diff --git a/internal/otelreceiver/prometheusremotewritereceiver/receiver.go b/internal/otelreceiver/prometheusremotewritereceiver/receiver.go index 053d625a..ec21fdbf 100644 --- a/internal/otelreceiver/prometheusremotewritereceiver/receiver.go +++ b/internal/otelreceiver/prometheusremotewritereceiver/receiver.go @@ -106,8 +106,8 @@ func (rec *Receiver) Start(_ context.Context, host component.Host) error { } func snappyDecoder(body io.ReadCloser) (io.ReadCloser, error) { - compressed := getBuf() - defer putBuf(compressed) + compressed := bytebufferpool.Get() + defer bytebufferpool.Put(compressed) if _, err := io.Copy(compressed, body); err != nil { return nil, err @@ -137,21 +137,6 @@ func decodeRequest(r io.Reader, bb *bytebufferpool.ByteBuffer, rw *prompb.WriteR return rw.Unmarshal(bb.B) } -func getWriteRequest() *prompb.WriteRequest { - v := writeRequestPool.Get() - if v == nil { - return &prompb.WriteRequest{} - } - return v.(*prompb.WriteRequest) -} - -func putWriteRequest(wr *prompb.WriteRequest) { - wr.Reset() - writeRequestPool.Put(wr) -} - -var writeRequestPool sync.Pool - func (rec *Receiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := rec.obsrecv.StartMetricsOp(r.Context())