From cf7eda26042a879f89f946e41fbae48212692482 Mon Sep 17 00:00:00 2001 From: xiezhengyao Date: Thu, 15 Aug 2024 16:43:59 +0800 Subject: [PATCH] optimize: skip decoder --- protocol/thrift/skipdecoder.go | 64 +++++++++++++++------------------- protocol/thrift/skipreader.go | 6 ---- 2 files changed, 29 insertions(+), 41 deletions(-) diff --git a/protocol/thrift/skipdecoder.go b/protocol/thrift/skipdecoder.go index 3e5f1c4..15375c3 100644 --- a/protocol/thrift/skipdecoder.go +++ b/protocol/thrift/skipdecoder.go @@ -19,8 +19,9 @@ package thrift import ( "encoding/binary" "fmt" - "io" "sync" + + "github.com/cloudwego/gopkg/bufiox" ) var poolSkipDecoder = sync.Pool{ @@ -31,53 +32,46 @@ var poolSkipDecoder = sync.Pool{ // SkipDecoder scans the underlying io.Reader and returns the bytes of a type type SkipDecoder struct { - p skipReaderIface + p bufiox.Reader + offset int } // NewSkipDecoder ... call Release if no longer use -func NewSkipDecoder(r io.Reader) *SkipDecoder { +func NewSkipDecoder(r bufiox.Reader) *SkipDecoder { p := poolSkipDecoder.Get().(*SkipDecoder) p.Reset(r) return p } // Reset ... -func (p *SkipDecoder) Reset(r io.Reader) { - // fast path without returning to pool if remote.ByteBuffer && *skipByteBuffer - if buf, ok := r.(remoteByteBuffer); ok { - if p.p != nil { - r, ok := p.p.(*skipByteBuffer) - if ok { - r.Reset(buf) - return - } - p.p.Release() - } - p.p = newSkipByteBuffer(buf) - return - } - - // not remote.ByteBuffer - - if p.p != nil { - p.p.Release() - } - p.p = newSkipReader(r) +func (p *SkipDecoder) Reset(r bufiox.Reader) { + p.p = r + p.offset = 0 } // Release ... func (p *SkipDecoder) Release() { - p.p.Release() + p.p.Release(nil) p.p = nil + p.offset = 0 poolSkipDecoder.Put(p) } +func (p *SkipDecoder) Peek(n int) (buf []byte, err error) { + buf, err = p.p.Peek(n) + if err != nil { + return + } + p.offset += n + return +} + // Next skips a specific type and returns its bytes func (p *SkipDecoder) Next(t TType) (buf []byte, err error) { if err := p.skip(t, defaultRecursionDepth); err != nil { return nil, err } - return p.p.Bytes() + return p.p.Next(p.offset) } func (p *SkipDecoder) skip(t TType, maxdepth int) error { @@ -85,12 +79,12 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { return errDepthLimitExceeded } if sz := typeToSize[t]; sz > 0 { - _, err := p.p.Next(int(sz)) + _, err := p.Peek(int(sz)) return err } switch t { case STRING: - b, err := p.p.Next(4) + b, err := p.Peek(4) if err != nil { return err } @@ -98,12 +92,12 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { if sz < 0 { return errNegativeSize } - if _, err := p.p.Next(sz); err != nil { + if _, err := p.Peek(sz); err != nil { return err } case STRUCT: for { - b, err := p.p.Next(1) // TType + b, err := p.Peek(1) // TType if err != nil { return err } @@ -111,7 +105,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { if tp == STOP { break } - if _, err := p.p.Next(2); err != nil { // Field ID + if _, err := p.Peek(2); err != nil { // Field ID return err } if err := p.skip(tp, maxdepth-1); err != nil { @@ -119,7 +113,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { } } case MAP: - b, err := p.p.Next(6) // 1 byte key TType, 1 byte value TType, 4 bytes Len + b, err := p.Peek(6) // 1 byte key TType, 1 byte value TType, 4 bytes Len if err != nil { return err } @@ -129,7 +123,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { } ksz, vsz := int(typeToSize[kt]), int(typeToSize[vt]) if ksz > 0 && vsz > 0 { - _, err := p.p.Next(int(sz) * (ksz + vsz)) + _, err := p.Peek(int(sz) * (ksz + vsz)) return err } for i := int32(0); i < sz; i++ { @@ -141,7 +135,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { } } case SET, LIST: - b, err := p.p.Next(5) // 1 byte value type, 4 bytes Len + b, err := p.Peek(5) // 1 byte value type, 4 bytes Len if err != nil { return err } @@ -150,7 +144,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error { return errNegativeSize } if vsz := typeToSize[vt]; vsz > 0 { - _, err := p.p.Next(int(sz) * int(vsz)) + _, err := p.Peek(int(sz) * int(vsz)) return err } for i := int32(0); i < sz; i++ { diff --git a/protocol/thrift/skipreader.go b/protocol/thrift/skipreader.go index eb04580..6f9754d 100644 --- a/protocol/thrift/skipreader.go +++ b/protocol/thrift/skipreader.go @@ -23,12 +23,6 @@ import ( // this file contains readers for SkipDecoder -type skipReaderIface interface { - Next(n int) (buf []byte, err error) - Bytes() (buf []byte, err error) - Release() -} - var poolSkipReader = sync.Pool{ New: func() interface{} { return &skipReader{b: make([]byte, 1024)}