diff --git a/bufiox/defaultbuf.go b/bufiox/defaultbuf.go index 9e36ff4..ad2c8e8 100644 --- a/bufiox/defaultbuf.go +++ b/bufiox/defaultbuf.go @@ -174,6 +174,10 @@ func (r *DefaultReader) ReadLen() (n int) { return r.ri } +func (r *DefaultReader) ReadableLen() (n int) { + return len(r.buf) - r.ri +} + func (r *DefaultReader) ReadBinary(bs []byte) (m int, err error) { m = r.acquire(len(bs)) copy(bs, r.buf[r.ri:r.ri+m]) diff --git a/protocol/thrift/skipdecoder.go b/protocol/thrift/skipdecoder.go index b6eabd6..fc92fd9 100644 --- a/protocol/thrift/skipdecoder.go +++ b/protocol/thrift/skipdecoder.go @@ -25,6 +25,11 @@ import ( "github.com/cloudwego/gopkg/bufiox" ) +// for batch getting data to reduce the number of calls to Next +type withReadableLen interface { + ReadableLen() (n int) +} + const defaultSkipDecoderSize = 4096 var poolSkipDecoder = sync.Pool{ @@ -35,7 +40,7 @@ var poolSkipDecoder = sync.Pool{ // SkipDecoder scans the underlying io.Reader and returns the bytes of a type type SkipDecoder struct { - r bufiox.Reader + sr skipReader bs []byte toRelease toReleaseBuffer @@ -44,7 +49,10 @@ type SkipDecoder struct { // NewSkipDecoder ... call Release if no longer use func NewSkipDecoder(r bufiox.Reader) *SkipDecoder { p := poolSkipDecoder.Get().(*SkipDecoder) - p.r = r + if wrl, ok := r.(withReadableLen); ok { + p.sr.wrl = wrl + } + p.sr.r = r return p } @@ -56,9 +64,21 @@ func (p *SkipDecoder) Release() { } func (p *SkipDecoder) next(n int) (buf []byte, err error) { - buf, err = p.r.Next(n) - if err != nil { - return + { + // manually inline the code of next() to reduce the overhead of function call. + sr := &p.sr + if sr.wrl == nil { + if buf, err = sr.r.Next(n); err != nil { + return + } + } else if len(sr.buf)-sr.ri > n { + buf = sr.buf[sr.ri : sr.ri+n] + sr.ri += n + } else { + if buf, err = sr.nextSlow(n); err != nil { + return + } + } } if cap(p.bs)-len(p.bs) < n { var ncap int @@ -80,6 +100,9 @@ func (p *SkipDecoder) Next(t TType) (buf []byte, err error) { if err = p.skip(t, defaultRecursionDepth); err != nil { return } + if err = p.sr.skip(); err != nil { + return + } buf = p.bs p.toRelease.append(p.bs) return @@ -169,6 +192,62 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { return nil } +type skipReader struct { + r bufiox.Reader + wrl withReadableLen + + buf []byte + ri int +} + +/* +func (sr *skipReader) next(n int) (buf []byte, err error) { + if sr.wrl == nil { + if buf, err = sr.r.Next(n); err != nil { + return + } + } else if len(sr.buf)-sr.ri > n { + buf = sr.buf[sr.ri : sr.ri+n] + sr.ri += n + } else { + if buf, err = sr.nextSlow(n); err != nil { + return + } + } + return +} +*/ + +func (sr *skipReader) nextSlow(n int) (buf []byte, err error) { + if err = sr.skip(); err != nil { + return + } + rl := sr.wrl.ReadableLen() + if rl > n { + sr.buf, err = sr.r.Peek(sr.wrl.ReadableLen()) + if err != nil { + return + } + sr.ri = 0 + buf = sr.buf[sr.ri : sr.ri+n] + sr.ri += n + return + } else { + return sr.r.Next(n) + } +} + +func (sr *skipReader) skip() (err error) { + if sr.ri > 0 { + err = sr.r.Skip(sr.ri) + if err != nil { + return + } + } + sr.buf, sr.ri = nil, 0 + return nil +} + type toReleaseBuffer struct { idx int bs [16][]byte diff --git a/protocol/thrift/skipdecoder_test.go b/protocol/thrift/skipdecoder_test.go index e748a38..6e3eafa 100644 --- a/protocol/thrift/skipdecoder_test.go +++ b/protocol/thrift/skipdecoder_test.go @@ -17,6 +17,7 @@ package thrift import ( + "bytes" "strings" "testing" @@ -155,3 +156,80 @@ func TestSkipDecoder(t *testing.T) { require.Error(t, err) } } + +var mockString = make([]byte, 5000) + +func BenchmarkSkipDecoder(b *testing.B) { + // prepare data + bs := make([]byte, 0, 1024) + + // BOOL, fid=1 + bs = Binary.AppendFieldBegin(bs, BOOL, 1) + bs = Binary.AppendBool(bs, true) + + // BYTE, fid=2 + bs = Binary.AppendFieldBegin(bs, BYTE, 2) + bs = Binary.AppendByte(bs, 2) + + // I16, fid=3 + bs = Binary.AppendFieldBegin(bs, I16, 3) + bs = Binary.AppendI16(bs, 3) + + // I32, fid=4 + bs = Binary.AppendFieldBegin(bs, I32, 4) + bs = Binary.AppendI32(bs, 4) + + // I64, fid=5 + bs = Binary.AppendFieldBegin(bs, I64, 5) + bs = Binary.AppendI64(bs, 5) + + // DOUBLE, fid=6 + bs = Binary.AppendFieldBegin(bs, DOUBLE, 6) + bs = Binary.AppendDouble(bs, 6) + + // STRING, fid=7 + bs = Binary.AppendFieldBegin(bs, STRING, 7) + bs = Binary.AppendString(bs, string(mockString)) + + // MAP, fid=8 + bs = Binary.AppendFieldBegin(bs, MAP, 8) + bs = Binary.AppendMapBegin(bs, DOUBLE, DOUBLE, 1) + bs = Binary.AppendDouble(bs, 8.1) + bs = Binary.AppendDouble(bs, 8.2) + + // SET, fid=9 + bs = Binary.AppendFieldBegin(bs, SET, 9) + bs = Binary.AppendSetBegin(bs, I64, 1) + bs = Binary.AppendI64(bs, 9) + + // LIST, fid=10 + bs = Binary.AppendFieldBegin(bs, LIST, 10) + bs = Binary.AppendListBegin(bs, I64, 1) + bs = Binary.AppendI64(bs, 10) + + // STRUCT with 1 field I64, fid=11,1 + bs = Binary.AppendFieldBegin(bs, STRUCT, 11) + bs = Binary.AppendFieldBegin(bs, I64, 1) + bs = Binary.AppendI64(bs, 11) + bs = Binary.AppendFieldStop(bs) + + // Finish struct + bs = Binary.AppendFieldStop(bs) + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + bufReader := bufiox.NewBytesReader(bs) + sr := NewSkipDecoder(bufReader) + buf, err := sr.Next(STRUCT) + if err != nil { + b.Fatal(err) + } + if !bytes.Equal(buf, bs) { + b.Fatal("bytes not equal") + } + sr.Release() + bufReader.Release(nil) + } + }) +}