From e450f27b83197fda925da0032f22fc66e63c9524 Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Tue, 26 Nov 2024 20:45:14 +0800 Subject: [PATCH] fix: fix some implementations --- apache_adaptor/adaptor.go | 103 +++++++++++++++++---------------- apache_adaptor/byte_buffer.go | 66 +++++++++++++++++++++ apache_adaptor/struct_codec.go | 93 +++++++++++++++++++++++++++++ 3 files changed, 211 insertions(+), 51 deletions(-) create mode 100644 apache_adaptor/byte_buffer.go create mode 100644 apache_adaptor/struct_codec.go diff --git a/apache_adaptor/adaptor.go b/apache_adaptor/adaptor.go index 450670b..d6e07a4 100644 --- a/apache_adaptor/adaptor.go +++ b/apache_adaptor/adaptor.go @@ -25,15 +25,26 @@ import ( ) // AdaptRead receive a kitex binary protocol and read it by given function. -func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error { +func AdaptRead(p, iprot interface{}) error { + // for now, we use fastCodec to adapt apache codec. + // the struct should have the function 'FastRead' + fastStruct, ok := p.(fastReader) + if !ok { + return fmt.Errorf("no codec implementation avaliable") + } + var br bufiox.Reader // if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader if bp, ok := iprot.(bufioxReaderWriter); ok { br = bp.GetBufioxReader() } else { - // if iprot is from kitex version lower than v0.12.0, use reflection to get reader - // in kitex v0.10.0, reader is from the field 'br' which is a bufiox.Reader - // in kitex under v0.10.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer) + // if iprot is from kitex lower than v0.12.0, use reflection to get reader + // 1. in kitex v0.11.0, reader is from the field 'br' which is a bufiox.Reader + // eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44 + // 2. in kitex under v0.11.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer) + // eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54 + // in apache thrift v0.13.0, reader is from the field 'trans' which implements the interface io.ReadWriter + // eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33 fieldNames := []string{"br", "trans"} for _, fn := range fieldNames { reader, exist, err := getUnexportField(iprot, fn) @@ -46,9 +57,11 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error br = r case byteBuffer: // if reader is from byteBuffer, Read() function is not always available - // so use an adaptor to implement Read() by Next() and ReadableLen() - rd := next2Reader(r) - br = bufiox.NewDefaultReader(rd) + // so use an adaptor to implement Read() by Next() and ReadableLen() + br = bufiox.NewDefaultReader(byteBuffer2ReadWriter(r)) + case io.ReadWriter: + // if reader is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol + br = bufiox.NewDefaultReader(r) default: return fmt.Errorf("reader not ok") } @@ -59,24 +72,40 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error if br == nil { return fmt.Errorf("no available field for reader") } + + // read data from iprot buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT) if err != nil { return err } - _, err = readFunc(buf) + + // unmarshal the data into struct + _, err = fastStruct.FastRead(buf) return err } // AdaptWrite receive a kitex binary protocol and write it by given function. -func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { +func AdaptWrite(p, oprot interface{}) error { + // for now, we use fastCodec, the struct should have the function 'FastWrite' + // but in old kitex_gen, the arguments of FastWrite is not from the same package. + // so we use reflection to handle this situation. + fastStruct, err := toFastCodec(p) + if err != nil { + return fmt.Errorf("no codec implementation avaliable:%s", err) + } + var bw bufiox.Writer // if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer if bp, ok := oprot.(bufioxReaderWriter); ok { bw = bp.GetBufioxWriter() } else { - // if iprot is from kitex version lower than v0.12.0, use reflection to get writer - // in kitex v0.10.0, writer is from the field 'bw' which is a bufiox.Writer - // in kitex under v0.10.0, writer is from the field 'trans' which implements the interface io.Writer + // if iprot is from kitex lower than v0.12.0, use reflection to get writer + // 1. in kitex v0.11.0, writer is from the field 'bw' which is a bufiox.Writer + // eg: https://github.com/cloudwego/kitex/blob/v0.11.0/pkg/protocol/bthrift/apache/binary_protocol.go#L44 + // 2. in kitex under v0.11.0, writer is from the field 'trans' which is kitex buffer (mostly NetpollByteBuffer) + // eg: https://github.com/cloudwego/kitex/blob/v0.5.2/pkg/remote/codec/thrift/binary_protocol.go#L54 + // in apache thrift v0.13.0, writer is from the field 'trans' which implements the interface io.ReadWriter + // eg: https://github.com/apache/thrift/blob/v0.13.0/lib/go/thrift/binary_protocol.go#L33 fieldNames := []string{"bw", "trans"} for _, fn := range fieldNames { writer, exist, err := getUnexportField(oprot, fn) @@ -87,7 +116,12 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { switch w := writer.(type) { case bufiox.Writer: bw = w - case io.Writer: + case byteBuffer: + // if writer is from byteBuffer, Write() function is not always available + // so use an adaptor to implement Write() by Malloc() + bw = bufiox.NewDefaultWriter(byteBuffer2ReadWriter(w)) + case io.ReadWriter: + // if writer is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol bw = bufiox.NewDefaultWriter(w) default: return fmt.Errorf("writer type not ok") @@ -99,8 +133,11 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { if bw == nil { return fmt.Errorf("no available field for writer") } - buf := writeFunc() - _, err := bw.WriteBinary(buf) + + // use fast codec + buf := make([]byte, fastStruct.BLength()) + buf = buf[:fastStruct.FastWriteNocopy(buf, nil)] + _, err = bw.WriteBinary(buf) if err != nil { return err } @@ -124,39 +161,3 @@ type bufioxReaderWriter interface { GetBufioxReader() bufiox.Reader GetBufioxWriter() bufiox.Writer } - -// byteBuffer -type byteBuffer interface { - // Next reads the next n bytes sequentially and returns the original buffer. - Next(n int) (p []byte, err error) - - // ReadableLen returns the total length of readable buffer. - // Return: -1 means unreadable. - ReadableLen() (n int) -} - -// nextReader is an adaptor that implement Read() by Next() and ReadableLen() -type nextReader struct { - nx byteBuffer -} - -// Read reads data from the nextReader's internal buffer into p. -func (nr nextReader) Read(p []byte) (n int, err error) { - readable := nr.nx.ReadableLen() - if readable == -1 { - return 0, err - } - if readable > len(p) { - readable = len(p) - } - data, err := nr.nx.Next(readable) - if err != nil { - return -1, err - } - copy(p, data) - return readable, nil -} - -func next2Reader(n byteBuffer) io.Reader { - return &nextReader{nx: n} -} diff --git a/apache_adaptor/byte_buffer.go b/apache_adaptor/byte_buffer.go new file mode 100644 index 0000000..4b8ef14 --- /dev/null +++ b/apache_adaptor/byte_buffer.go @@ -0,0 +1,66 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apache_adaptor + +import "io" + +// byteBuffer +type byteBuffer interface { + // Next reads the next n bytes sequentially and returns the original buffer. + Next(n int) (p []byte, err error) + + // ReadableLen returns the total length of readable buffer. + // Return: -1 means unreadable. + ReadableLen() (n int) + + // Malloc n bytes sequentially in the writer buffer. + Malloc(n int) (buf []byte, err error) +} + +// byteBufferWrapper is an adaptor that implement Read() by Next() and ReadableLen() and implement Write() by Malloc() +type byteBufferWrapper struct { + b byteBuffer +} + +func byteBuffer2ReadWriter(n byteBuffer) io.ReadWriter { + return &byteBufferWrapper{b: n} +} + +// Read reads data from the byteBufferWrapper's internal buffer into p. +func (bw byteBufferWrapper) Read(p []byte) (n int, err error) { + readable := bw.b.ReadableLen() + if readable == -1 { + return 0, err + } + if readable > len(p) { + readable = len(p) + } + data, err := bw.b.Next(readable) + if err != nil { + return -1, err + } + copy(p, data) + return readable, nil +} + +// Write writes data from the byteBufferWrapper's internal buffer into p. +func (bw byteBufferWrapper) Write(p []byte) (n int, err error) { + data, err := bw.b.Malloc(len(p)) + if err != nil { + return -1, err + } + copy(data, p) + return len(data), nil +} diff --git a/apache_adaptor/struct_codec.go b/apache_adaptor/struct_codec.go new file mode 100644 index 0000000..adb5c7c --- /dev/null +++ b/apache_adaptor/struct_codec.go @@ -0,0 +1,93 @@ +// Copyright 2024 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apache_adaptor + +import ( + "fmt" + "reflect" + + "github.com/cloudwego/gopkg/protocol/thrift" +) + +type fastReader interface { + FastRead(buf []byte) (int, error) +} + +const OldFastWriteMethod = "FastWriteNocopy" + +func toFastCodec(p interface{}) (thrift.FastCodec, error) { + // if struct is from kitex_gen which is generated higher than v0.10.0,just assert gopkg thrift.FastCodec + if fast, ok := p.(thrift.FastCodec); ok { + return fast, nil + } + // if struct is lower than v0.10.0,the second argument 'bw' from FastWriterNocopy is from kitex package + // it's not good to import an old kitex dependency, so we have to use reflection + fast, ok := p.(interface { + BLength() int + FastRead(buf []byte) (int, error) + }) + if !ok { + return nil, fmt.Errorf("no BLength method for struct") + } + + method := reflect.ValueOf(p).MethodByName(OldFastWriteMethod) + + if !method.IsValid() { + return nil, fmt.Errorf("method not found or not exported: %s", OldFastWriteMethod) + } + + if method.Type().NumIn() != 2 { + return nil, fmt.Errorf("args num is not ok") + } + + if method.Type().NumOut() != 1 { + return nil, fmt.Errorf("resp num is not ok") + } + + if method.Type().Out(0) != reflect.TypeOf(0) { + return nil, fmt.Errorf("return type is not int") + } + + if method.Type().In(0) != reflect.TypeOf([]byte{}) { + return nil, fmt.Errorf("input type 1st is not []byte") + } + + return &oldFastCodec{ + p: fast, + method: method, + }, nil +} + +type oldFastCodec struct { + p interface { + BLength() int + FastRead(buf []byte) (int, error) + } + method reflect.Value +} + +func (c *oldFastCodec) BLength() int { + return c.p.BLength() +} + +func (c *oldFastCodec) FastWriteNocopy(buf []byte, bw thrift.NocopyWriter) int { + method := c.method + out := method.Call([]reflect.Value{reflect.ValueOf(buf), reflect.New(method.Type().In(1)).Elem()}) + return out[0].Interface().(int) +} + +func (c *oldFastCodec) FastRead(buf []byte) (int, error) { + return c.p.FastRead(buf) +}