From 59863baa4c96850d7e7d710e8c6a5d6fd7fbe93f Mon Sep 17 00:00:00 2001 From: xiezhengyao Date: Mon, 19 Aug 2024 17:41:42 +0800 Subject: [PATCH] perf: use ReadableLen to reduce the number of calls to Next --- bufiox/defaultbuf.go | 4 ++ protocol/thrift/skipdecoder.go | 84 +++++++++++++++++++++++++++-- protocol/thrift/skipdecoder_test.go | 78 +++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 5 deletions(-) diff --git a/bufiox/defaultbuf.go b/bufiox/defaultbuf.go index 9e36ff4..ad2c8e8 100644 --- a/bufiox/defaultbuf.go +++ b/bufiox/defaultbuf.go @@ -174,6 +174,10 @@ func (r *DefaultReader) ReadLen() (n int) { return r.ri } +func (r *DefaultReader) ReadableLen() (n int) { + return len(r.buf) - r.ri +} + func (r *DefaultReader) ReadBinary(bs []byte) (m int, err error) { m = r.acquire(len(bs)) copy(bs, r.buf[r.ri:r.ri+m]) diff --git a/protocol/thrift/skipdecoder.go b/protocol/thrift/skipdecoder.go index b6eabd6..e82c55c 100644 --- a/protocol/thrift/skipdecoder.go +++ b/protocol/thrift/skipdecoder.go @@ -25,6 +25,11 @@ import ( "github.com/cloudwego/gopkg/bufiox" ) +// for batch getting data to reduce the number of calls to Next +type withReadableLen interface { + ReadableLen() (n int) +} + const defaultSkipDecoderSize = 4096 var poolSkipDecoder = sync.Pool{ @@ -35,7 +40,7 @@ var poolSkipDecoder = sync.Pool{ // SkipDecoder scans the underlying io.Reader and returns the bytes of a type type SkipDecoder struct { - r bufiox.Reader + sr skipReader bs []byte toRelease toReleaseBuffer @@ -44,7 +49,10 @@ type SkipDecoder struct { // NewSkipDecoder ... call Release if no longer use func NewSkipDecoder(r bufiox.Reader) *SkipDecoder { p := poolSkipDecoder.Get().(*SkipDecoder) - p.r = r + if wrl, ok := r.(withReadableLen); ok { + p.sr.wrl = wrl + } + p.sr.r = r return p } @@ -56,9 +64,21 @@ func (p *SkipDecoder) Release() { } func (p *SkipDecoder) next(n int) (buf []byte, err error) { - buf, err = p.r.Next(n) - if err != nil { - return + { + // manually inline the code of next() to reduce the overhead of function call. + sr := &p.sr + if sr.wrl == nil { + if buf, err = sr.r.Next(n); err != nil { + return + } + } else if len(sr.buf)-sr.ri > n { + buf = sr.buf[sr.ri : sr.ri+n] + sr.ri += n + } else { + if buf, err = sr.nextSlow(n); err != nil { + return + } + } } if cap(p.bs)-len(p.bs) < n { var ncap int @@ -80,6 +100,9 @@ func (p *SkipDecoder) Next(t TType) (buf []byte, err error) { if err = p.skip(t, defaultRecursionDepth); err != nil { return } + if err = p.sr.skip(); err != nil { + return + } buf = p.bs p.toRelease.append(p.bs) return @@ -169,6 +192,57 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { return nil } +type skipReader struct { + r bufiox.Reader + wrl withReadableLen + + buf []byte + ri int +} + +func (r *skipReader) next(n int) (buf []byte, err error) { + if r.wrl == nil { + return r.r.Next(n) + } + if len(r.buf)-r.ri > n { + buf = r.buf[r.ri : r.ri+n] + r.ri += n + return + } + // slow path + return r.nextSlow(n) +} + +func (r *skipReader) nextSlow(n int) (buf []byte, err error) { + if err = r.skip(); err != nil { + return + } + rl := r.wrl.ReadableLen() + if rl > n { + r.buf, err = r.r.Peek(r.wrl.ReadableLen()) + if err != nil { + return + } + r.ri = 0 + buf = r.buf[r.ri : r.ri+n] + r.ri += n + return + } else { + return r.r.Next(n) + } +} + +func (r *skipReader) skip() (err error) { + if r.ri > 0 { + err = r.r.Skip(r.ri) + if err != nil { + return + } + } + r.buf, r.ri = nil, 0 + return nil +} + type toReleaseBuffer struct { idx int bs [16][]byte diff --git a/protocol/thrift/skipdecoder_test.go b/protocol/thrift/skipdecoder_test.go index e748a38..6e3eafa 100644 --- a/protocol/thrift/skipdecoder_test.go +++ b/protocol/thrift/skipdecoder_test.go @@ -17,6 +17,7 @@ package thrift import ( + "bytes" "strings" "testing" @@ -155,3 +156,80 @@ func TestSkipDecoder(t *testing.T) { require.Error(t, err) } } + +var mockString = make([]byte, 5000) + +func BenchmarkSkipDecoder(b *testing.B) { + // prepare data + bs := make([]byte, 0, 1024) + + // BOOL, fid=1 + bs = Binary.AppendFieldBegin(bs, BOOL, 1) + bs = Binary.AppendBool(bs, true) + + // BYTE, fid=2 + bs = Binary.AppendFieldBegin(bs, BYTE, 2) + bs = Binary.AppendByte(bs, 2) + + // I16, fid=3 + bs = Binary.AppendFieldBegin(bs, I16, 3) + bs = Binary.AppendI16(bs, 3) + + // I32, fid=4 + bs = Binary.AppendFieldBegin(bs, I32, 4) + bs = Binary.AppendI32(bs, 4) + + // I64, fid=5 + bs = Binary.AppendFieldBegin(bs, I64, 5) + bs = Binary.AppendI64(bs, 5) + + // DOUBLE, fid=6 + bs = Binary.AppendFieldBegin(bs, DOUBLE, 6) + bs = Binary.AppendDouble(bs, 6) + + // STRING, fid=7 + bs = Binary.AppendFieldBegin(bs, STRING, 7) + bs = Binary.AppendString(bs, string(mockString)) + + // MAP, fid=8 + bs = Binary.AppendFieldBegin(bs, MAP, 8) + bs = Binary.AppendMapBegin(bs, DOUBLE, DOUBLE, 1) + bs = Binary.AppendDouble(bs, 8.1) + bs = Binary.AppendDouble(bs, 8.2) + + // SET, fid=9 + bs = Binary.AppendFieldBegin(bs, SET, 9) + bs = Binary.AppendSetBegin(bs, I64, 1) + bs = Binary.AppendI64(bs, 9) + + // LIST, fid=10 + bs = Binary.AppendFieldBegin(bs, LIST, 10) + bs = Binary.AppendListBegin(bs, I64, 1) + bs = Binary.AppendI64(bs, 10) + + // STRUCT with 1 field I64, fid=11,1 + bs = Binary.AppendFieldBegin(bs, STRUCT, 11) + bs = Binary.AppendFieldBegin(bs, I64, 1) + bs = Binary.AppendI64(bs, 11) + bs = Binary.AppendFieldStop(bs) + + // Finish struct + bs = Binary.AppendFieldStop(bs) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + bufReader := bufiox.NewBytesReader(bs) + sr := NewSkipDecoder(bufReader) + buf, err := sr.Next(STRUCT) + if err != nil { + b.Fatal(err) + } + if !bytes.Equal(buf, bs) { + b.Fatal("bytes not equal") + } + sr.Release() + bufReader.Release(nil) + } + }) +}