Skip to content

Commit

Permalink
refactor: thrift codec uses bufiox interface for encoding and decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Aug 19, 2024
1 parent 3a9525a commit 87c3e91
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 585 deletions.
120 changes: 0 additions & 120 deletions protocol/thrift/binarywriter.go

This file was deleted.

121 changes: 49 additions & 72 deletions protocol/thrift/binaryreader.go → protocol/thrift/bufferreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,90 +19,72 @@ package thrift
import (
"encoding/binary"
"fmt"
"io"
"math"
"sync"
)

// BinaryReader represents a reader for binary protocol
type BinaryReader struct {
r nextIface
d discardIface
"github.com/bytedance/gopkg/lang/dirtmake"
"github.com/cloudwego/gopkg/bufiox"
"github.com/cloudwego/gopkg/internal/hack"
)

rn int64
// BufferReader represents a reader for binary protocol
type BufferReader struct {
r bufiox.Reader
}

var poolBinaryReader = sync.Pool{
var poolBufferReader = sync.Pool{
New: func() interface{} {
return &BinaryReader{}
return &BufferReader{}
},
}

// NewBinaryReader ... call Release if no longer use for reusing
func NewBinaryReader(r io.Reader) *BinaryReader {
ret := poolBinaryReader.Get().(*BinaryReader)
ret.reset()
if nextr, ok := r.(nextIface); ok {
ret.r = nextr
} else {
nextr := newNextReader(r)
ret.r = nextr
ret.d = nextr
}
// NewBufferReader ... call Release if no longer use for reusing
func NewBufferReader(r bufiox.Reader) *BufferReader {
ret := poolBufferReader.Get().(*BufferReader)
ret.r = r
return ret
}

// Release ...
func (r *BinaryReader) Release() {
nextr, ok := r.r.(*nextReader)
if ok {
nextr.Release()
}
r.reset()
poolBinaryReader.Put(r)
}

func (r *BinaryReader) reset() {
func (r *BufferReader) Release() {
r.r.Release(nil)
r.r = nil
r.d = nil
r.rn = 0
poolBufferReader.Put(r)
}

func (r *BinaryReader) next(n int) (b []byte, err error) {
func (r *BufferReader) next(n int) (b []byte, err error) {
b, err = r.r.Next(n)
if err != nil {
err = NewProtocolExceptionWithErr(err)
}
r.rn += int64(len(b))
return
}

func (r *BinaryReader) skipn(n int) (err error) {
func (r *BufferReader) readBinary(bs []byte) (n int, err error) {
n, err = r.r.ReadBinary(bs)
if err != nil {
err = NewProtocolExceptionWithErr(err)
}
return
}

func (r *BufferReader) skipn(n int) (err error) {
if n < 0 {
return errNegativeSize
}
if r.d != nil {
var sz int
sz, err = r.d.Discard(n)
r.rn += int64(sz)
} else {
var b []byte
b, err = r.r.Next(n)
r.rn += int64(len(b))
}
if err != nil {
if err = r.r.Skip(n); err != nil {
return NewProtocolExceptionWithErr(err)
}
return nil
}

// Readn returns total bytes read from underlying reader
func (r *BinaryReader) Readn() int64 {
return r.rn
func (r *BufferReader) Readn() int64 {
return int64(r.r.ReadLen())
}

// ReadBool ...
func (r *BinaryReader) ReadBool() (v bool, err error) {
func (r *BufferReader) ReadBool() (v bool, err error) {
b, err := r.next(1)
if err != nil {
return false, err
Expand All @@ -112,7 +94,7 @@ func (r *BinaryReader) ReadBool() (v bool, err error) {
}

// ReadByte ...
func (r *BinaryReader) ReadByte() (v int8, err error) {
func (r *BufferReader) ReadByte() (v int8, err error) {
b, err := r.next(1)
if err != nil {
return 0, err
Expand All @@ -122,7 +104,7 @@ func (r *BinaryReader) ReadByte() (v int8, err error) {
}

// ReadI16 ...
func (r *BinaryReader) ReadI16() (v int16, err error) {
func (r *BufferReader) ReadI16() (v int16, err error) {
b, err := r.next(2)
if err != nil {
return 0, err
Expand All @@ -132,7 +114,7 @@ func (r *BinaryReader) ReadI16() (v int16, err error) {
}

// ReadI32 ...
func (r *BinaryReader) ReadI32() (v int32, err error) {
func (r *BufferReader) ReadI32() (v int32, err error) {
b, err := r.next(4)
if err != nil {
return 0, err
Expand All @@ -142,7 +124,7 @@ func (r *BinaryReader) ReadI32() (v int32, err error) {
}

// ReadI64 ...
func (r *BinaryReader) ReadI64() (v int64, err error) {
func (r *BufferReader) ReadI64() (v int64, err error) {
b, err := r.next(8)
if err != nil {
return 0, err
Expand All @@ -152,7 +134,7 @@ func (r *BinaryReader) ReadI64() (v int64, err error) {
}

// ReadDouble ...
func (r *BinaryReader) ReadDouble() (v float64, err error) {
func (r *BufferReader) ReadDouble() (v float64, err error) {
b, err := r.next(8)
if err != nil {
return 0, err
Expand All @@ -162,32 +144,27 @@ func (r *BinaryReader) ReadDouble() (v float64, err error) {
}

// ReadBinary ...
func (r *BinaryReader) ReadBinary() (b []byte, err error) {
func (r *BufferReader) ReadBinary() (b []byte, err error) {
sz, err := r.ReadI32()
if err != nil {
return nil, err
}
b, err = r.next(int(sz))
b = []byte(string(b)) // copy. use span cache?
b = dirtmake.Bytes(int(sz), int(sz))
_, err = r.readBinary(b)
return
}

// ReadString ...
func (r *BinaryReader) ReadString() (s string, err error) {
sz, err := r.ReadI32()
func (r *BufferReader) ReadString() (s string, err error) {
b, err := r.ReadBinary()
if err != nil {
return "", err
}
b, err := r.next(int(sz))
if err != nil {
return "", err
}
s = string(b) // copy. use span cache?
return
return hack.ByteSliceToString(b), nil
}

// ReadMessageBegin ...
func (r *BinaryReader) ReadMessageBegin() (name string, typeID TMessageType, seq int32, err error) {
func (r *BufferReader) ReadMessageBegin() (name string, typeID TMessageType, seq int32, err error) {
var header int32
header, err = r.ReadI32()
if err != nil {
Expand Down Expand Up @@ -215,7 +192,7 @@ func (r *BinaryReader) ReadMessageBegin() (name string, typeID TMessageType, seq
}

// ReadFieldBegin ...
func (r *BinaryReader) ReadFieldBegin() (typeID TType, id int16, err error) {
func (r *BufferReader) ReadFieldBegin() (typeID TType, id int16, err error) {
b, err := r.next(1)
if err != nil {
return 0, 0, err
Expand All @@ -233,7 +210,7 @@ func (r *BinaryReader) ReadFieldBegin() (typeID TType, id int16, err error) {
}

// ReadMapBegin ...
func (r *BinaryReader) ReadMapBegin() (kt, vt TType, size int, err error) {
func (r *BufferReader) ReadMapBegin() (kt, vt TType, size int, err error) {
b, err := r.next(6)
if err != nil {
return 0, 0, 0, err
Expand All @@ -243,7 +220,7 @@ func (r *BinaryReader) ReadMapBegin() (kt, vt TType, size int, err error) {
}

// ReadListBegin ...
func (r *BinaryReader) ReadListBegin() (et TType, size int, err error) {
func (r *BufferReader) ReadListBegin() (et TType, size int, err error) {
b, err := r.next(5)
if err != nil {
return 0, 0, err
Expand All @@ -253,7 +230,7 @@ func (r *BinaryReader) ReadListBegin() (et TType, size int, err error) {
}

// ReadSetBegin ...
func (r *BinaryReader) ReadSetBegin() (et TType, size int, err error) {
func (r *BufferReader) ReadSetBegin() (et TType, size int, err error) {
b, err := r.next(5)
if err != nil {
return 0, 0, err
Expand All @@ -263,19 +240,19 @@ func (r *BinaryReader) ReadSetBegin() (et TType, size int, err error) {
}

// Skip ...
func (r *BinaryReader) Skip(t TType) error {
func (r *BufferReader) Skip(t TType) error {
return r.skipType(t, defaultRecursionDepth)
}

func (r *BinaryReader) skipstr() error {
func (r *BufferReader) skipstr() error {
n, err := r.ReadI32()
if err != nil {
return err
}
return r.skipn(int(n))
}

func (r *BinaryReader) skipType(t TType, maxdepth int) error {
func (r *BufferReader) skipType(t TType, maxdepth int) error {
if maxdepth == 0 {
return errDepthLimitExceeded
}
Expand Down
Loading

0 comments on commit 87c3e91

Please sign in to comment.