Skip to content

Commit

Permalink
refactor(thrift): new skipdecoder impl
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaost committed Nov 29, 2024
1 parent 3ed84c3 commit acc9c91
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 164 deletions.
264 changes: 151 additions & 113 deletions protocol/thrift/skipdecoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
package thrift

import (
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/bytedance/gopkg/lang/mcache"
"github.com/cloudwego/gopkg/bufiox"
)

const defaultSkipDecoderSize = 4096

var poolSkipDecoder = sync.Pool{
New: func() interface{} {
return &SkipDecoder{}
Expand All @@ -37,143 +34,184 @@ var poolSkipDecoder = sync.Pool{
type SkipDecoder struct {
r bufiox.Reader

// for storing Next(ttype) buffer
nextBuf []byte

// for reusing buffer
pendingBuf [][]byte
rn int
}

// NewSkipDecoder ... call Release if no longer use
// NewSkipDecoder ...
//
// call Release if no longer use
func NewSkipDecoder(r bufiox.Reader) *SkipDecoder {
p := poolSkipDecoder.Get().(*SkipDecoder)
p.r = r
return p
}

// Release releases the peekAck decoder, callers cannot use the returned data of Next after calling Release.
// Release puts SkipDecoder back to pool and reuse it next time.
//
// DO NOT USE SkipDecoder after calling Release.
func (p *SkipDecoder) Release() {
if cap(p.nextBuf) > 0 {
mcache.Free(p.nextBuf)
}
*p = SkipDecoder{}
poolSkipDecoder.Put(p)
}

// Next skips a specific type and returns its bytes.
// Callers cannot use the returned data after calling Release.
//
// The returned buf is directly from bufiox.Reader with the same lifecycle.
func (p *SkipDecoder) Next(t TType) (buf []byte, err error) {
p.nextBuf = mcache.Malloc(0, defaultSkipDecoderSize)
if err = p.skip(t, defaultRecursionDepth); err != nil {
p.rn = 0
if err = NewSkipDecoderTpl(p).Skip(t, defaultRecursionDepth); err != nil {
return
}
var offset int
for _, b := range p.pendingBuf {
offset += copy(p.nextBuf[offset:], b[offset:])
mcache.Free(b)
buf, err = p.r.Next(p.rn)
return
}

// SkipN implements SkipDecoderIFace
func (p *SkipDecoder) SkipN(n int) (buf []byte, err error) {
// old version netpoll may have performance issue when using Peek
// see: https://github.com/cloudwego/netpoll/pull/335
if buf, err = p.r.Peek(p.rn + n); err == nil {
buf = buf[p.rn:]
p.rn += n
}
p.pendingBuf = nil
buf = p.nextBuf
return
}

func (p *SkipDecoder) skip(t TType, maxdepth int) error {
if maxdepth == 0 {
return errDepthLimitExceeded
// BytesSkipDecoder ...
type BytesSkipDecoder struct {
n int
b []byte
}

var poolBytesSkipDecoder = sync.Pool{
New: func() interface{} {
return &BytesSkipDecoder{}
},
}

// NewBytesSkipDecoder ...
//
// call Release if no longer use
func NewBytesSkipDecoder(b []byte) *BytesSkipDecoder {
p := poolBytesSkipDecoder.Get().(*BytesSkipDecoder)
p.Reset(b)
return p
}

// Release puts BytesSkipDecoder back to pool and reuse it next time.
//
// DO NOT USE BytesSkipDecoder after calling Release.
func (p *BytesSkipDecoder) Release() {
p.Reset(nil)
poolBytesSkipDecoder.Put(p)
}

// Reset ...
func (p *BytesSkipDecoder) Reset(b []byte) {
p.n = 0
p.b = b
}

// Next skips a specific type and returns its bytes.
//
// The returned buf refers to the input []byte without copy
func (p *BytesSkipDecoder) Next(t TType) (b []byte, err error) {
if err = NewSkipDecoderTpl(p).Skip(t, defaultRecursionDepth); err != nil {
return
}
if sz := typeToSize[t]; sz > 0 {
_, err := p.next(int(sz))
return err
b = p.b[:p.n]
p.b = p.b[p.n:]
p.n = 0
return
}

// SkipN implements SkipDecoderIFace
func (p *BytesSkipDecoder) SkipN(n int) ([]byte, error) {
if len(p.b) >= p.n+n {
p.n += n
return p.b[p.n-n : p.n], nil
}
switch t {
case STRING:
b, err := p.next(4)
if err != nil {
return err
}
sz := int(binary.BigEndian.Uint32(b))
if sz < 0 {
return errNegativeSize
}
if _, err := p.next(sz); err != nil {
return err
}
case STRUCT:
for {
b, err := p.next(1) // TType
if err != nil {
return err
}
tp := TType(b[0])
if tp == STOP {
break
}
if _, err := p.next(2); err != nil { // Field ID
return err
}
if err := p.skip(tp, maxdepth-1); err != nil {
return err
}
}
case MAP:
b, err := p.next(6) // 1 byte key TType, 1 byte value TType, 4 bytes Len
if err != nil {
return err
}
kt, vt, sz := TType(b[0]), TType(b[1]), int32(binary.BigEndian.Uint32(b[2:]))
if sz < 0 {
return errNegativeSize
}
ksz, vsz := int(typeToSize[kt]), int(typeToSize[vt])
if ksz > 0 && vsz > 0 {
_, err := p.next(int(sz) * (ksz + vsz))
return err
}
for i := int32(0); i < sz; i++ {
if err := p.skip(kt, maxdepth-1); err != nil {
return err
}
if err := p.skip(vt, maxdepth-1); err != nil {
return err
}
}
case SET, LIST:
b, err := p.next(5) // 1 byte value type, 4 bytes Len
if err != nil {
return err
}
vt, sz := TType(b[0]), int32(binary.BigEndian.Uint32(b[1:]))
if sz < 0 {
return errNegativeSize
}
if vsz := typeToSize[vt]; vsz > 0 {
_, err := p.next(int(sz) * int(vsz))
return err
}
for i := int32(0); i < sz; i++ {
if err := p.skip(vt, maxdepth-1); err != nil {
return err
}
}
default:
return NewProtocolException(INVALID_DATA, fmt.Sprintf("unknown data type %d", t))
return nil, io.EOF
}

// ReaderSkipDecoder ...
type ReaderSkipDecoder struct {
r io.Reader

n int // bytes read, n <= len(b)
b []byte
}

var poolReaderSkipDecoder = sync.Pool{
New: func() interface{} {
return &ReaderSkipDecoder{}
},
}

// NewReaderSkipDecoder creates a ReaderSkipDecoder from pool
//
// call Release if no longer use
func NewReaderSkipDecoder(r io.Reader) *ReaderSkipDecoder {
p := poolReaderSkipDecoder.Get().(*ReaderSkipDecoder)
p.Reset(r)
return p
}

// Release puts ReaderSkipDecoder back to pool and reuse it next time.
//
// DO NOT USE ReaderSkipDecoder after calling Release.
func (p *ReaderSkipDecoder) Release() {
// no need to free p.b
// will make use of p.b without reallcation
p.Reset(nil)
poolReaderSkipDecoder.Put(p)
}

// Reset ...
func (p *ReaderSkipDecoder) Reset(r io.Reader) {
p.r = r
p.n = 0
}

// Grow grows the underlying buffer to fit n bytes
func (p *ReaderSkipDecoder) Grow(n int) {
if len(p.b)-p.n >= n {
return
}
return nil
p.growSlow(n)
}

func (p *ReaderSkipDecoder) growSlow(n int) {
// mcache will take care of the size of newb
newb := mcache.Malloc(p.n + n)
copy(newb, p.b[:p.n])
mcache.Free(p.b)
p.b = newb
}

func (p *SkipDecoder) next(n int) (buf []byte, err error) {
if buf, err = p.r.Next(n); err != nil {
// Next skips a specific type and returns its bytes.
//
// The returned []byte is vaild before the next `Next` call or `Release`
func (p *ReaderSkipDecoder) Next(t TType) (b []byte, err error) {
p.n = 0
if err = NewSkipDecoderTpl(p).Skip(t, defaultRecursionDepth); err != nil {
return
}
if cap(p.nextBuf)-len(p.nextBuf) < n {
var ncap int
for ncap = cap(p.nextBuf) * 2; ncap-len(p.nextBuf) < n; ncap *= 2 {
}
nbs := mcache.Malloc(ncap, ncap)
p.pendingBuf = append(p.pendingBuf, p.nextBuf)
p.nextBuf = nbs[:len(p.nextBuf)]
return p.b[:p.n], nil
}

// SkipN implements SkipDecoderIFace
func (p *ReaderSkipDecoder) SkipN(n int) (buf []byte, err error) {
p.Grow(n)
buf = p.b[p.n : p.n+n]
for i := 0; i < n && err == nil; { // io.ReadFull(buf)
var nn int
nn, err = p.r.Read(buf[i:])
i += nn
}
if err != nil {
return
}
cn := copy(p.nextBuf[len(p.nextBuf):cap(p.nextBuf)], buf)
p.nextBuf = p.nextBuf[:len(p.nextBuf)+cn]
p.n += n
return
}
Loading

0 comments on commit acc9c91

Please sign in to comment.