Skip to content

Commit

Permalink
fix: fix some implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
HeyJavaBean committed Nov 26, 2024
1 parent 629c729 commit e450f27
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 51 deletions.
103 changes: 52 additions & 51 deletions apache_adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,26 @@ import (
)

// AdaptRead receive a kitex binary protocol and read it by given function.
func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error {
func AdaptRead(p, iprot interface{}) error {
// for now, we use fastCodec to adapt apache codec.
// the struct should have the function 'FastRead'
fastStruct, ok := p.(fastReader)
if !ok {
return fmt.Errorf("no codec implementation avaliable")

Check warning on line 33 in apache_adaptor/adaptor.go

View workflow job for this annotation

GitHub Actions / compliant

"avaliable" should be "available".
}

var br bufiox.Reader
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader
if bp, ok := iprot.(bufioxReaderWriter); ok {
br = bp.GetBufioxReader()
} else {
// if iprot is from kitex version lower than v0.12.0, use reflection to get reader
// in kitex v0.10.0, reader is from the field 'br' which is a bufiox.Reader
// in kitex under v0.10.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer)
// if iprot is from kitex lower than v0.12.0, use reflection to get reader
// 1. in kitex v0.11.0, reader is from the field 'br' which is a bufiox.Reader
// eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44
// 2. in kitex under v0.11.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer)
// eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54
// in apache thrift v0.13.0, reader is from the field 'trans' which implements the interface io.ReadWriter
// eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33
fieldNames := []string{"br", "trans"}
for _, fn := range fieldNames {
reader, exist, err := getUnexportField(iprot, fn)
Expand All @@ -46,9 +57,11 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error
br = r
case byteBuffer:
// if reader is from byteBuffer, Read() function is not always available
// so use an adaptor to implement Read() by Next() and ReadableLen()
rd := next2Reader(r)
br = bufiox.NewDefaultReader(rd)
// so use an adaptor to implement Read() by Next() and ReadableLen()
br = bufiox.NewDefaultReader(byteBuffer2ReadWriter(r))
case io.ReadWriter:
// if reader is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol
br = bufiox.NewDefaultReader(r)
default:
return fmt.Errorf("reader not ok")
}
Expand All @@ -59,24 +72,40 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error
if br == nil {
return fmt.Errorf("no available field for reader")
}

// read data from iprot
buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT)
if err != nil {
return err
}
_, err = readFunc(buf)

// unmarshal the data into struct
_, err = fastStruct.FastRead(buf)
return err
}

// AdaptWrite receive a kitex binary protocol and write it by given function.
func AdaptWrite(oprot interface{}, writeFunc func() []byte) error {
func AdaptWrite(p, oprot interface{}) error {
// for now, we use fastCodec, the struct should have the function 'FastWrite'
// but in old kitex_gen, the arguments of FastWrite is not from the same package.
// so we use reflection to handle this situation.
fastStruct, err := toFastCodec(p)
if err != nil {
return fmt.Errorf("no codec implementation avaliable:%s", err)

Check warning on line 94 in apache_adaptor/adaptor.go

View workflow job for this annotation

GitHub Actions / compliant

"avaliable" should be "available".
}

var bw bufiox.Writer
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer
if bp, ok := oprot.(bufioxReaderWriter); ok {
bw = bp.GetBufioxWriter()
} else {
// if iprot is from kitex version lower than v0.12.0, use reflection to get writer
// in kitex v0.10.0, writer is from the field 'bw' which is a bufiox.Writer
// in kitex under v0.10.0, writer is from the field 'trans' which implements the interface io.Writer
// if iprot is from kitex lower than v0.12.0, use reflection to get writer
// 1. in kitex v0.11.0, writer is from the field 'bw' which is a bufiox.Writer
// eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44
// 2. in kitex under v0.11.0, writer is from the field 'trans' which is kitex buffer (mostly NetpollByteBuffer)
// eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54
// in apache thrift v0.13.0, writer is from the field 'trans' which implements the interface io.ReadWriter
// eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33
fieldNames := []string{"bw", "trans"}
for _, fn := range fieldNames {
writer, exist, err := getUnexportField(oprot, fn)
Expand All @@ -87,7 +116,12 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error {
switch w := writer.(type) {
case bufiox.Writer:
bw = w
case io.Writer:
case byteBuffer:
// if writer is from byteBuffer, Write() function is not always available
// so use an adaptor to implement Write() by Malloc()
bw = bufiox.NewDefaultWriter(byteBuffer2ReadWriter(w))
case io.ReadWriter:
// if writer is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol
bw = bufiox.NewDefaultWriter(w)
default:
return fmt.Errorf("writer type not ok")
Expand All @@ -99,8 +133,11 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error {
if bw == nil {
return fmt.Errorf("no available field for writer")
}
buf := writeFunc()
_, err := bw.WriteBinary(buf)

// use fast codec
buf := make([]byte, fastStruct.BLength())
buf = buf[:fastStruct.FastWriteNocopy(buf, nil)]
_, err = bw.WriteBinary(buf)
if err != nil {
return err
}
Expand All @@ -124,39 +161,3 @@ type bufioxReaderWriter interface {
GetBufioxReader() bufiox.Reader
GetBufioxWriter() bufiox.Writer
}

// byteBuffer
type byteBuffer interface {
// Next reads the next n bytes sequentially and returns the original buffer.
Next(n int) (p []byte, err error)

// ReadableLen returns the total length of readable buffer.
// Return: -1 means unreadable.
ReadableLen() (n int)
}

// nextReader is an adaptor that implement Read() by Next() and ReadableLen()
type nextReader struct {
nx byteBuffer
}

// Read reads data from the nextReader's internal buffer into p.
func (nr nextReader) Read(p []byte) (n int, err error) {
readable := nr.nx.ReadableLen()
if readable == -1 {
return 0, err
}
if readable > len(p) {
readable = len(p)
}
data, err := nr.nx.Next(readable)
if err != nil {
return -1, err
}
copy(p, data)
return readable, nil
}

func next2Reader(n byteBuffer) io.Reader {
return &nextReader{nx: n}
}
66 changes: 66 additions & 0 deletions apache_adaptor/byte_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apache_adaptor

import "io"

// byteBuffer
type byteBuffer interface {
// Next reads the next n bytes sequentially and returns the original buffer.
Next(n int) (p []byte, err error)

// ReadableLen returns the total length of readable buffer.
// Return: -1 means unreadable.
ReadableLen() (n int)

// Malloc n bytes sequentially in the writer buffer.
Malloc(n int) (buf []byte, err error)
}

// byteBufferWrapper is an adaptor that implement Read() by Next() and ReadableLen() and implement Write() by Malloc()
type byteBufferWrapper struct {
b byteBuffer
}

func byteBuffer2ReadWriter(n byteBuffer) io.ReadWriter {
return &byteBufferWrapper{b: n}
}

// Read reads data from the byteBufferWrapper's internal buffer into p.
func (bw byteBufferWrapper) Read(p []byte) (n int, err error) {
readable := bw.b.ReadableLen()
if readable == -1 {
return 0, err
}
if readable > len(p) {
readable = len(p)
}
data, err := bw.b.Next(readable)
if err != nil {
return -1, err
}
copy(p, data)
return readable, nil
}

// Write writes data from the byteBufferWrapper's internal buffer into p.
func (bw byteBufferWrapper) Write(p []byte) (n int, err error) {
data, err := bw.b.Malloc(len(p))
if err != nil {
return -1, err
}
copy(data, p)
return len(data), nil
}
93 changes: 93 additions & 0 deletions apache_adaptor/struct_codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apache_adaptor

import (
"fmt"
"reflect"

"github.com/cloudwego/gopkg/protocol/thrift"
)

type fastReader interface {
FastRead(buf []byte) (int, error)
}

const OldFastWriteMethod = "FastWriteNocopy"

func toFastCodec(p interface{}) (thrift.FastCodec, error) {
// if struct is from kitex_gen which is generated higher than v0.10.0,just assert gopkg thrift.FastCodec
if fast, ok := p.(thrift.FastCodec); ok {
return fast, nil
}
// if struct is lower than v0.10.0,the second argument 'bw' from FastWriterNocopy is from kitex package
// it's not good to import an old kitex dependency, so we have to use reflection
fast, ok := p.(interface {
BLength() int
FastRead(buf []byte) (int, error)
})
if !ok {
return nil, fmt.Errorf("no BLength method for struct")
}

method := reflect.ValueOf(p).MethodByName(OldFastWriteMethod)

if !method.IsValid() {
return nil, fmt.Errorf("method not found or not exported: %s", OldFastWriteMethod)
}

if method.Type().NumIn() != 2 {
return nil, fmt.Errorf("args num is not ok")
}

if method.Type().NumOut() != 1 {
return nil, fmt.Errorf("resp num is not ok")
}

if method.Type().Out(0) != reflect.TypeOf(0) {
return nil, fmt.Errorf("return type is not int")
}

if method.Type().In(0) != reflect.TypeOf([]byte{}) {
return nil, fmt.Errorf("input type 1st is not []byte")
}

return &oldFastCodec{
p: fast,
method: method,
}, nil
}

type oldFastCodec struct {
p interface {
BLength() int
FastRead(buf []byte) (int, error)
}
method reflect.Value
}

func (c *oldFastCodec) BLength() int {
return c.p.BLength()
}

func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int {
method := c.method
out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.New(method.Type().In(1)).Elem()})
return out[0].Interface().(int)
}

func (c *oldFastCodec) FastRead(buf []byte) (int, error) {
return c.p.FastRead(buf)
}

0 comments on commit e450f27

Please sign in to comment.