Skip to content

Commit

Permalink
optimize: skip decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Aug 15, 2024
1 parent 7a27596 commit cf7eda2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 41 deletions.
64 changes: 29 additions & 35 deletions protocol/thrift/skipdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package thrift
import (
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/cloudwego/gopkg/bufiox"
)

var poolSkipDecoder = sync.Pool{
Expand All @@ -31,95 +32,88 @@ var poolSkipDecoder = sync.Pool{

// SkipDecoder scans the underlying io.Reader and returns the bytes of a type
type SkipDecoder struct {
p skipReaderIface
p bufiox.Reader
offset int
}

// NewSkipDecoder ... call Release if no longer use
func NewSkipDecoder(r io.Reader) *SkipDecoder {
func NewSkipDecoder(r bufiox.Reader) *SkipDecoder {
p := poolSkipDecoder.Get().(*SkipDecoder)
p.Reset(r)
return p
}

// Reset ...
func (p *SkipDecoder) Reset(r io.Reader) {
// fast path without returning to pool if remote.ByteBuffer && *skipByteBuffer
if buf, ok := r.(remoteByteBuffer); ok {
if p.p != nil {
r, ok := p.p.(*skipByteBuffer)
if ok {
r.Reset(buf)
return
}
p.p.Release()
}
p.p = newSkipByteBuffer(buf)
return
}

// not remote.ByteBuffer

if p.p != nil {
p.p.Release()
}
p.p = newSkipReader(r)
func (p *SkipDecoder) Reset(r bufiox.Reader) {
p.p = r
p.offset = 0
}

// Release ...
func (p *SkipDecoder) Release() {
p.p.Release()
p.p.Release(nil)
p.p = nil
p.offset = 0
poolSkipDecoder.Put(p)
}

func (p *SkipDecoder) Peek(n int) (buf []byte, err error) {
buf, err = p.p.Peek(n)
if err != nil {
return
}
p.offset += n
return
}

// Next skips a specific type and returns its bytes
func (p *SkipDecoder) Next(t TType) (buf []byte, err error) {
if err := p.skip(t, defaultRecursionDepth); err != nil {
return nil, err
}
return p.p.Bytes()
return p.p.Next(p.offset)
}

func (p *SkipDecoder) skip(t TType, maxdepth int) error {
if maxdepth == 0 {
return errDepthLimitExceeded
}
if sz := typeToSize[t]; sz > 0 {
_, err := p.p.Next(int(sz))
_, err := p.Peek(int(sz))
return err
}
switch t {
case STRING:
b, err := p.p.Next(4)
b, err := p.Peek(4)
if err != nil {
return err
}
sz := int(binary.BigEndian.Uint32(b))
if sz < 0 {
return errNegativeSize
}
if _, err := p.p.Next(sz); err != nil {
if _, err := p.Peek(sz); err != nil {
return err
}
case STRUCT:
for {
b, err := p.p.Next(1) // TType
b, err := p.Peek(1) // TType
if err != nil {
return err
}
tp := TType(b[0])
if tp == STOP {
break
}
if _, err := p.p.Next(2); err != nil { // Field ID
if _, err := p.Peek(2); err != nil { // Field ID
return err
}
if err := p.skip(tp, maxdepth-1); err != nil {
return err
}
}
case MAP:
b, err := p.p.Next(6) // 1 byte key TType, 1 byte value TType, 4 bytes Len
b, err := p.Peek(6) // 1 byte key TType, 1 byte value TType, 4 bytes Len
if err != nil {
return err
}
Expand All @@ -129,7 +123,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error {
}
ksz, vsz := int(typeToSize[kt]), int(typeToSize[vt])
if ksz > 0 && vsz > 0 {
_, err := p.p.Next(int(sz) * (ksz + vsz))
_, err := p.Peek(int(sz) * (ksz + vsz))
return err
}
for i := int32(0); i < sz; i++ {
Expand All @@ -141,7 +135,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error {
}
}
case SET, LIST:
b, err := p.p.Next(5) // 1 byte value type, 4 bytes Len
b, err := p.Peek(5) // 1 byte value type, 4 bytes Len
if err != nil {
return err
}
Expand All @@ -150,7 +144,7 @@ func (p *SkipDecoder) skip(t TType, maxdepth int) error {
return errNegativeSize
}
if vsz := typeToSize[vt]; vsz > 0 {
_, err := p.p.Next(int(sz) * int(vsz))
_, err := p.Peek(int(sz) * int(vsz))
return err
}
for i := int32(0); i < sz; i++ {
Expand Down
6 changes: 0 additions & 6 deletions protocol/thrift/skipreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ import (

// this file contains readers for SkipDecoder

type skipReaderIface interface {
Next(n int) (buf []byte, err error)
Bytes() (buf []byte, err error)
Release()
}

var poolSkipReader = sync.Pool{
New: func() interface{} {
return &skipReader{b: make([]byte, 1024)}
Expand Down

0 comments on commit cf7eda2

Please sign in to comment.