Skip to content

Commit

Permalink
perf: use ReadableLen to reduce the number of calls to Next
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Aug 19, 2024
1 parent 87c3e91 commit 59863ba
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 5 deletions.
4 changes: 4 additions & 0 deletions bufiox/defaultbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,10 @@ func (r *DefaultReader) ReadLen() (n int) {
return r.ri
}

func (r *DefaultReader) ReadableLen() (n int) {
return len(r.buf) - r.ri
}

func (r *DefaultReader) ReadBinary(bs []byte) (m int, err error) {
m = r.acquire(len(bs))
copy(bs, r.buf[r.ri:r.ri+m])
Expand Down
84 changes: 79 additions & 5 deletions protocol/thrift/skipdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ import (
"github.com/cloudwego/gopkg/bufiox"
)

// for batch getting data to reduce the number of calls to Next
type withReadableLen interface {
ReadableLen() (n int)
}

const defaultSkipDecoderSize = 4096

var poolSkipDecoder = sync.Pool{
Expand All @@ -35,7 +40,7 @@ var poolSkipDecoder = sync.Pool{

// SkipDecoder scans the underlying io.Reader and returns the bytes of a type
type SkipDecoder struct {
r bufiox.Reader
sr skipReader
bs []byte

toRelease toReleaseBuffer
Expand All @@ -44,7 +49,10 @@ type SkipDecoder struct {
// NewSkipDecoder ... call Release if no longer use
func NewSkipDecoder(r bufiox.Reader) *SkipDecoder {
p := poolSkipDecoder.Get().(*SkipDecoder)
p.r = r
if wrl, ok := r.(withReadableLen); ok {
p.sr.wrl = wrl
}
p.sr.r = r
return p
}

Expand All @@ -56,9 +64,21 @@ func (p *SkipDecoder) Release() {
}

func (p *SkipDecoder) next(n int) (buf []byte, err error) {
buf, err = p.r.Next(n)
if err != nil {
return
{
// manually inline the code of next() to reduce the overhead of function call.
sr := &p.sr
if sr.wrl == nil {
if buf, err = sr.r.Next(n); err != nil {
return
}
} else if len(sr.buf)-sr.ri > n {
buf = sr.buf[sr.ri : sr.ri+n]
sr.ri += n
} else {
if buf, err = sr.nextSlow(n); err != nil {
return
}
}
}
if cap(p.bs)-len(p.bs) < n {
var ncap int
Expand All @@ -80,6 +100,9 @@ func (p *SkipDecoder) Next(t TType) (buf []byte, err error) {
if err = p.skip(t, defaultRecursionDepth); err != nil {
return
}
if err = p.sr.skip(); err != nil {
return
}
buf = p.bs
p.toRelease.append(p.bs)
return
Expand Down Expand Up @@ -169,6 +192,57 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error {
return nil
}

type skipReader struct {
r bufiox.Reader
wrl withReadableLen

buf []byte
ri int
}

func (r *skipReader) next(n int) (buf []byte, err error) {

Check failure on line 203 in protocol/thrift/skipdecoder.go

View workflow job for this annotation

GitHub Actions / lint

func `(*skipReader).next` is unused (unused)
if r.wrl == nil {
return r.r.Next(n)
}
if len(r.buf)-r.ri > n {
buf = r.buf[r.ri : r.ri+n]
r.ri += n
return
}
// slow path
return r.nextSlow(n)
}

func (r *skipReader) nextSlow(n int) (buf []byte, err error) {
if err = r.skip(); err != nil {
return
}
rl := r.wrl.ReadableLen()
if rl > n {
r.buf, err = r.r.Peek(r.wrl.ReadableLen())
if err != nil {
return
}
r.ri = 0
buf = r.buf[r.ri : r.ri+n]
r.ri += n
return
} else {
return r.r.Next(n)
}
}

func (r *skipReader) skip() (err error) {
if r.ri > 0 {
err = r.r.Skip(r.ri)
if err != nil {
return
}
}
r.buf, r.ri = nil, 0
return nil
}

type toReleaseBuffer struct {
idx int
bs [16][]byte
Expand Down
78 changes: 78 additions & 0 deletions protocol/thrift/skipdecoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package thrift

import (
"bytes"
"strings"
"testing"

Expand Down Expand Up @@ -155,3 +156,80 @@ func TestSkipDecoder(t *testing.T) {
require.Error(t, err)
}
}

var mockString = make([]byte, 5000)

func BenchmarkSkipDecoder(b *testing.B) {
// prepare data
bs := make([]byte, 0, 1024)

// BOOL, fid=1
bs = Binary.AppendFieldBegin(bs, BOOL, 1)
bs = Binary.AppendBool(bs, true)

// BYTE, fid=2
bs = Binary.AppendFieldBegin(bs, BYTE, 2)
bs = Binary.AppendByte(bs, 2)

// I16, fid=3
bs = Binary.AppendFieldBegin(bs, I16, 3)
bs = Binary.AppendI16(bs, 3)

// I32, fid=4
bs = Binary.AppendFieldBegin(bs, I32, 4)
bs = Binary.AppendI32(bs, 4)

// I64, fid=5
bs = Binary.AppendFieldBegin(bs, I64, 5)
bs = Binary.AppendI64(bs, 5)

// DOUBLE, fid=6
bs = Binary.AppendFieldBegin(bs, DOUBLE, 6)
bs = Binary.AppendDouble(bs, 6)

// STRING, fid=7
bs = Binary.AppendFieldBegin(bs, STRING, 7)
bs = Binary.AppendString(bs, string(mockString))

// MAP, fid=8
bs = Binary.AppendFieldBegin(bs, MAP, 8)
bs = Binary.AppendMapBegin(bs, DOUBLE, DOUBLE, 1)
bs = Binary.AppendDouble(bs, 8.1)
bs = Binary.AppendDouble(bs, 8.2)

// SET, fid=9
bs = Binary.AppendFieldBegin(bs, SET, 9)
bs = Binary.AppendSetBegin(bs, I64, 1)
bs = Binary.AppendI64(bs, 9)

// LIST, fid=10
bs = Binary.AppendFieldBegin(bs, LIST, 10)
bs = Binary.AppendListBegin(bs, I64, 1)
bs = Binary.AppendI64(bs, 10)

// STRUCT with 1 field I64, fid=11,1
bs = Binary.AppendFieldBegin(bs, STRUCT, 11)
bs = Binary.AppendFieldBegin(bs, I64, 1)
bs = Binary.AppendI64(bs, 11)
bs = Binary.AppendFieldStop(bs)

// Finish struct
bs = Binary.AppendFieldStop(bs)

b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
bufReader := bufiox.NewBytesReader(bs)
sr := NewSkipDecoder(bufReader)
buf, err := sr.Next(STRUCT)
if err != nil {
b.Fatal(err)
}
if !bytes.Equal(buf, bs) {
b.Fatal("bytes not equal")
}
sr.Release()
bufReader.Release(nil)
}
})
}

0 comments on commit 59863ba

Please sign in to comment.