Skip to content

Commit

Permalink
refactor: thrift codec uses bufiox interface for encoding and decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
jayantxie committed Aug 26, 2024
1 parent b8e81cf commit 9b1ac58
Show file tree
Hide file tree
Showing 14 changed files with 451 additions and 615 deletions.
10 changes: 8 additions & 2 deletions bufiox/defaultbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ func (r *DefaultReader) ReadLen() (n int) {
return r.ri
}

func (r *DefaultReader) ReadableLen() (n int) {
return len(r.buf) - r.ri
}

func (r *DefaultReader) ReadBinary(bs []byte) (m int, err error) {
m = r.acquire(len(bs))
copy(bs, r.buf[r.ri:r.ri+m])
Expand All @@ -195,7 +199,7 @@ func (r *DefaultReader) Release(e error) error {
if len(r.buf)-r.ri == 0 {
// release buf
r.maxSizeStats.update(cap(r.buf))
if !r.bufReadOnly {
if !r.bufReadOnly && cap(r.buf) > 0 {
mcache.Free(r.buf)
}
r.buf = nil
Expand Down Expand Up @@ -348,7 +352,9 @@ func (w *DefaultWriter) Flush() (err error) {
}
w.maxSizeStats.update(cap(w.buf))
if !w.disableCache {
mcache.Free(w.buf)
if cap(w.buf) > 0 {
mcache.Free(w.buf)
}
if w.pendingBuf != nil {
for _, buf := range w.pendingBuf {
mcache.Free(buf)
Expand Down
19 changes: 10 additions & 9 deletions protocol/thrift/apache/apache.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ package apache

import (
"errors"
"io"

"github.com/cloudwego/gopkg/bufiox"
)

var (
fnCheckTStruct func(v interface{}) error

fnThriftRead func(rw io.ReadWriter, v interface{}) error
fnThriftWrite func(rw io.ReadWriter, v interface{}) error
fnThriftRead func(r bufiox.Reader, v interface{}) error
fnThriftWrite func(w bufiox.Writer, v interface{}) error
)

// RegisterCheckTStruct accepts `thrift.TStruct check` func and save it for later use.
Expand All @@ -53,12 +54,12 @@ func RegisterCheckTStruct(fn func(v interface{}) error) {
}

// RegisterThriftRead ...
func RegisterThriftRead(fn func(rw io.ReadWriter, v interface{}) error) {
func RegisterThriftRead(fn func(r bufiox.Reader, v interface{}) error) {
fnThriftRead = fn
}

// RegisterThriftWrite ...
func RegisterThriftWrite(fn func(rw io.ReadWriter, v interface{}) error) {
func RegisterThriftWrite(fn func(w bufiox.Writer, v interface{}) error) {
fnThriftWrite = fn
}

Expand All @@ -77,17 +78,17 @@ func CheckTStruct(v interface{}) error {
}

// ThriftRead ...
func ThriftRead(rw io.ReadWriter, v interface{}) error {
func ThriftRead(r bufiox.Reader, v interface{}) error {
if fnThriftRead == nil {
return errThriftReadNotRegistered
}
return fnThriftRead(rw, v)
return fnThriftRead(r, v)
}

// ThriftWrite ...
func ThriftWrite(rw io.ReadWriter, v interface{}) error {
func ThriftWrite(w bufiox.Writer, v interface{}) error {
if fnThriftWrite == nil {
return errThriftWriteNotRegistered
}
return fnThriftWrite(rw, v)
return fnThriftWrite(w, v)
}
43 changes: 28 additions & 15 deletions protocol/thrift/apache/apache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ package apache

import (
"bytes"
"encoding/json"
"errors"
"io"
"testing"

"github.com/cloudwego/gopkg/bufiox"
"github.com/stretchr/testify/require"
)

func TestThriftReadWrite(t *testing.T) {

v := &TestingWriteRead{Msg: "Hello"}

err := CheckTStruct(v)
Expand All @@ -37,41 +35,56 @@ func TestThriftReadWrite(t *testing.T) {
require.NoError(t, err)

buf := &bytes.Buffer{}
bw := bufiox.NewDefaultWriter(buf)

err = ThriftWrite(buf, v)
err = ThriftWrite(bw, v)
require.Same(t, err, errThriftWriteNotRegistered)

RegisterThriftWrite(callThriftWrite)
err = ThriftWrite(NewBufferTransport(buf), v) // calls v.Write
err = ThriftWrite(bw, v) // calls v.Write
require.NoError(t, err)
err = bw.Flush()
require.NoError(t, err)

p := &TestingWriteRead{}

err = ThriftRead(NewBufferTransport(buf), p)
br := bufiox.NewDefaultReader(buf)

err = ThriftRead(br, p)
require.Same(t, err, errThriftReadNotRegistered)

RegisterThriftRead(callThriftRead)
err = ThriftRead(NewBufferTransport(buf), p) // calls p.Read
err = ThriftRead(br, p) // calls p.Read
require.NoError(t, err)

require.Equal(t, v.Msg, p.Msg)
}

type TStruct interface { // simulate thrift.TStruct
Read(r io.Reader) error
Write(w io.Writer) error
Read(r bufiox.Reader) error
Write(w bufiox.Writer) error
}

type TestingWriteRead struct {
Msg string
}

func (t *TestingWriteRead) Read(r io.Reader) error {
return json.NewDecoder(r).Decode(t)
func (t *TestingWriteRead) Read(r bufiox.Reader) error {
b, err := r.Next(5)
if err != nil {
return err
}
t.Msg = string(b)
return nil
}

func (t *TestingWriteRead) Write(w io.Writer) error {
return json.NewEncoder(w).Encode(t)
func (t *TestingWriteRead) Write(w bufiox.Writer) error {
b, err := w.Malloc(5)
if err != nil {
return err
}
copy(b, t.Msg)
return nil
}

var errNotThriftTStruct = errors.New("errNotThriftTStruct")
Expand All @@ -84,15 +97,15 @@ func checkTStruct(v interface{}) error {
return nil
}

func callThriftRead(rw io.ReadWriter, v interface{}) error {
func callThriftRead(rw bufiox.Reader, v interface{}) error {
p, ok := v.(TStruct)
if !ok {
return errNotThriftTStruct
}
return p.Read(rw)
}

func callThriftWrite(rw io.ReadWriter, v interface{}) error {
func callThriftWrite(rw bufiox.Writer, v interface{}) error {
p, ok := v.(TStruct)
if !ok {
return errNotThriftTStruct
Expand Down
28 changes: 24 additions & 4 deletions protocol/thrift/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,20 @@ import (
"math"
"unsafe"

"github.com/bytedance/gopkg/lang/span"
"github.com/cloudwego/gopkg/internal/hack"
)

var (
spanCache = span.NewSpanCache(1024 * 1024)
spanCacheEnable bool = false
)

// SetSpanCache enable/disable binary protocol bytes/string allocator
func SetSpanCache(enable bool) {
spanCacheEnable = enable
}

var Binary BinaryProtocol

type BinaryProtocol struct{}
Expand Down Expand Up @@ -316,8 +327,12 @@ func (p BinaryProtocol) ReadBinary(buf []byte) (b []byte, l int, err error) {
if len(buf) < l {
return nil, 4, errReadBin
}
// TODO: use span
return []byte(string(buf[4:l])), l, nil
if spanCacheEnable {
b = spanCache.Copy(buf[4:l])
} else {
b = []byte(string(buf[4:l]))
}
return b, l, nil
}

func (p BinaryProtocol) ReadString(buf []byte) (s string, l int, err error) {
Expand All @@ -329,8 +344,13 @@ func (p BinaryProtocol) ReadString(buf []byte) (s string, l int, err error) {
if len(buf) < l {
return "", 4, errReadStr
}
// TODO: use span
return string(buf[4:l]), l, nil
if spanCacheEnable {
data := spanCache.Copy(buf[4:l])
s = hack.ByteSliceToString(data)
} else {
s = string(buf[4:l])
}
return s, l, nil
}

func (BinaryProtocol) ReadBool(buf []byte) (v bool, l int, err error) {
Expand Down
120 changes: 0 additions & 120 deletions protocol/thrift/binarywriter.go

This file was deleted.

Loading

0 comments on commit 9b1ac58

Please sign in to comment.