Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement an apache adaptor #29

Merged
merged 12 commits into from
Dec 13, 2024
177 changes: 177 additions & 0 deletions protocol/thrift/apache/adaptor/adaptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adaptor

import (
"fmt"
"io"
"reflect"
"unsafe"

"github.com/cloudwego/gopkg/bufiox"
"github.com/cloudwego/gopkg/protocol/thrift"
)

// AdaptRead receive a kitex binary protocol and read it by given function.
func AdaptRead(p, iprot interface{}) (err error) {
// for now, we use fastCodec to adapt apache codec.
// the struct should have the function 'FastRead'
fastStruct, ok := p.(fastReader)
if !ok {
return fmt.Errorf("no codec implementation available")
}

var rd io.Reader
var br bufiox.Reader
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader
if bp, ok := iprot.(bufioxReaderWriter); ok {
br = bp.GetBufioxReader()
} else {
// if iprot is from kitex lower than v0.12.0, use reflection to get reader
// 1. in kitex v0.11.0, reader is from the field 'br' which is a bufiox.Reader
// eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44
// 2. in kitex under v0.11.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer)
// eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54
// in apache thrift v0.13.0, reader is from the field 'trans' which implements the interface io.ReadWriter
// eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33
fieldNames := []string{"br", "trans"}
for _, fn := range fieldNames {
reader, exist, err := getUnexportField(iprot, fn)
if err != nil {
return err
}
if exist {
switch r := reader.(type) {
case bufiox.Reader:
br = r
case byteBuffer:
// if reader is from byteBuffer, Read() function is not always available
// so use an adaptor to implement Read() by Next() and ReadableLen()
rd = byteBuffer2ReadWriter(r)
case io.ReadWriter:
// if reader is not byteBuffer but is io.ReadWriter, it supposes to be apache thrift binary protocol
rd = r
default:
return fmt.Errorf("reader not ok")
}
break
}
}
}

var buf []byte
if br != nil {
sd := thrift.NewSkipDecoder(br)
buf, err = sd.Next(thrift.STRUCT)
sd.Release()
} else if rd != nil {
sd := thrift.NewReaderSkipDecoder(rd)
buf, err = sd.Next(thrift.STRUCT)
sd.Release()
} else {
return fmt.Errorf("no available field for reader")
}

if err != nil {
return err
}

// unmarshal the data into struct
_, err = fastStruct.FastRead(buf)

return err
}

// AdaptWrite receive a kitex binary protocol and write it by given function.
func AdaptWrite(p, oprot interface{}) error {
// for now, we use fastCodec, the struct should have the function 'FastWrite'
// but in old kitex_gen, the arguments of FastWrite is not from the same package.
// so we use reflection to handle this situation.
fastStruct, err := toFastCodec(p)
if err != nil {
return fmt.Errorf("no codec implementation available:%s", err)
}

var bw bufiox.Writer
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer
if bp, ok := oprot.(bufioxReaderWriter); ok {
bw = bp.GetBufioxWriter()
} else {
// if iprot is from kitex lower than v0.12.0, use reflection to get writer
// 1. in kitex v0.11.0, writer is from the field 'bw' which is a bufiox.Writer
// eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44
// 2. in kitex under v0.11.0, writer is from the field 'trans' which is kitex buffer (mostly NetpollByteBuffer)
// eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54
// in apache thrift v0.13.0, writer is from the field 'trans' which implements the interface io.ReadWriter
// eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33
fieldNames := []string{"bw", "trans"}
for _, fn := range fieldNames {
writer, exist, err := getUnexportField(oprot, fn)
if err != nil {
return err
}
if exist {
switch w := writer.(type) {
case bufiox.Writer:
bw = w
case byteBuffer:
// if writer is from byteBuffer, Write() function is not always available
// so use an adaptor to implement Write() by Malloc()
bw = bufiox.NewDefaultWriter(byteBuffer2ReadWriter(w))
case io.ReadWriter:
// if writer is not byteBuffer but is io.ReadWriter, it supposes to be apache thrift binary protocol
bw = bufiox.NewDefaultWriter(w)
default:
return fmt.Errorf("writer type not ok")
}
break
}
}
}
if bw == nil {
return fmt.Errorf("no available field for writer")
}

// use fast codec
buf := make([]byte, fastStruct.BLength())
n := fastStruct.FastWriteNocopy(buf, nil)
if n < 0 {
return fmt.Errorf("failed to fast write")
}
buf = buf[:n]
_, err = bw.WriteBinary(buf)
if err != nil {
return err
}
return bw.Flush()
}

// getUnexportField retrieves the value of an unexported struct field.
func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bool, error error) {
if reflect.TypeOf(p).Kind() != reflect.Ptr {
return nil, false, fmt.Errorf("%s is not a ptr", p)
}
field := reflect.ValueOf(p).Elem().FieldByName(fieldName)
if field.IsValid() {
return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem().Interface(), true, nil
}
return nil, false, nil
}

// bufioxReaderWriter
type bufioxReaderWriter interface {
GetBufioxReader() bufiox.Reader
GetBufioxWriter() bufiox.Writer
}
72 changes: 72 additions & 0 deletions protocol/thrift/apache/adaptor/adaptor_apache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 CloudWeGo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adaptor_test

import (
"bytes"
"io"
"testing"
)

// TestApacheProtocol
func TestApacheProtocol(t *testing.T) {
// Apache Protocol implement the interface TProtocol (https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/protocol.go#L33)
// The implementation classes of this interface include many protocols.
// Such as TBinaryProtocol, TCompactProtocol, THeaderProtocol, and TJsonProtocol.
// However, for users of cloudwego/kitex, only TBinaryProtocol and TCompactProtocol are used
// So we only support to adapt these two protocols.

// apache binary protocol with old kitex struct
testAdaptor(t, oldKitexGen, mockTBinaryProtocol())

// apache binary protocol with new kitex struct
testAdaptor(t, newKitexGen, mockTBinaryProtocol())

// apache compact protocol with old kitex struct
testAdaptor(t, oldKitexGen, mockTCompactProtocol())

// apache compact protocol with new kitex struct
testAdaptor(t, newKitexGen, mockTCompactProtocol())
}

// tBinaryProtocol
// https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33
type tBinaryProtocol struct {
trans tRichTransport
}

func mockTBinaryProtocol() *tBinaryProtocol {
return &tBinaryProtocol{
trans: bytes.NewBuffer(nil),
}
}

// tCompactProtocol
// https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/compact_protocol.go#L88
type tCompactProtocol struct {
trans tRichTransport
}

func mockTCompactProtocol() *tCompactProtocol {
return &tCompactProtocol{
trans: bytes.NewBuffer(nil),
}
}

// https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/trans.go
type tRichTransport interface {
io.ReadWriter
// another interfaces are not used in apache adaptor, just ignore them.
}
Loading
Loading