Skip to content

Commit

Permalink
Merge pull request #304 from go-faster/chore/use-bytebufferpool
Browse files Browse the repository at this point in the history
chore(prometheusremotewriter): use `bytebufferpool` to pool byte buffers
  • Loading branch information
ernado authored Jan 21, 2024
2 parents b411a2a + 6855177 commit 55aa8bc
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 50 deletions.
33 changes: 0 additions & 33 deletions internal/otelreceiver/prometheusremotewritereceiver/buffer.go

This file was deleted.

32 changes: 32 additions & 0 deletions internal/otelreceiver/prometheusremotewritereceiver/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package prometheusremotewritereceiver

import (
"bytes"
"sync"

"github.com/go-faster/oteldb/internal/prompb"
)

var writeRequestPool sync.Pool

func putWriteRequest(wr *prompb.WriteRequest) {
wr.Reset()
writeRequestPool.Put(wr)
}

func getWriteRequest() *prompb.WriteRequest {
v := writeRequestPool.Get()
if v == nil {
return &prompb.WriteRequest{}
}
return v.(*prompb.WriteRequest)
}

type closerReader struct {
data []byte
bytes.Reader
}

func (c *closerReader) Close() error {
return nil
}
19 changes: 2 additions & 17 deletions internal/otelreceiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (rec *Receiver) Start(_ context.Context, host component.Host) error {
}

func snappyDecoder(body io.ReadCloser) (io.ReadCloser, error) {
compressed := getBuf()
defer putBuf(compressed)
compressed := bytebufferpool.Get()
defer bytebufferpool.Put(compressed)

if _, err := io.Copy(compressed, body); err != nil {
return nil, err
Expand Down Expand Up @@ -137,21 +137,6 @@ func decodeRequest(r io.Reader, bb *bytebufferpool.ByteBuffer, rw *prompb.WriteR
return rw.Unmarshal(bb.B)
}

func getWriteRequest() *prompb.WriteRequest {
v := writeRequestPool.Get()
if v == nil {
return &prompb.WriteRequest{}
}
return v.(*prompb.WriteRequest)
}

func putWriteRequest(wr *prompb.WriteRequest) {
wr.Reset()
writeRequestPool.Put(wr)
}

var writeRequestPool sync.Pool

func (rec *Receiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := rec.obsrecv.StartMetricsOp(r.Context())

Expand Down

0 comments on commit 55aa8bc

Please sign in to comment.