diff --git a/apache_adaptor/adaptor.go b/apache_adaptor/adaptor.go index 1ca629d..450670b 100644 --- a/apache_adaptor/adaptor.go +++ b/apache_adaptor/adaptor.go @@ -24,49 +24,16 @@ import ( "github.com/cloudwego/gopkg/protocol/thrift" ) -type bufioxReaderWriter interface { - GetBufioxReader() bufiox.Reader - GetBufioxWriter() bufiox.Writer -} - -type ByteBuffer interface { - // Next reads the next n bytes sequentially and returns the original buffer. - Next(n int) (p []byte, err error) - - // ReadableLen returns the total length of readable buffer. - // Return: -1 means unreadable. - ReadableLen() (n int) -} - -type nextReader struct { - nx ByteBuffer -} - -func (nr nextReader) Read(p []byte) (n int, err error) { - readable := nr.nx.ReadableLen() - if readable == -1 { - return 0, err - } - if readable > len(p) { - readable = len(p) - } - data, err := nr.nx.Next(readable) - if err != nil { - return -1, err - } - copy(p, data) - return readable, nil -} - -func next2Reader(n ByteBuffer) io.Reader { - return &nextReader{nx: n} -} - +// AdaptRead receive a kitex binary protocol and read it by given function. func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error { var br bufiox.Reader + // if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader if bp, ok := iprot.(bufioxReaderWriter); ok { br = bp.GetBufioxReader() } else { + // if iprot is from kitex version lower than v0.12.0, use reflection to get reader + // in kitex v0.10.0, reader is from the field 'br' which is a bufiox.Reader + // in kitex under v0.10.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer) fieldNames := []string{"br", "trans"} for _, fn := range fieldNames { reader, exist, err := getUnexportField(iprot, fn) @@ -77,7 +44,9 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error switch r := reader.(type) { case bufiox.Reader: br = r - case ByteBuffer: + case byteBuffer: + // if reader is from byteBuffer, Read() function is not always available + // so use an adaptor to implement Read() by Next() and ReadableLen() rd := next2Reader(r) br = bufiox.NewDefaultReader(rd) default: @@ -98,11 +67,16 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error return err } +// AdaptWrite receive a kitex binary protocol and write it by given function. func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { var bw bufiox.Writer + // if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer if bp, ok := oprot.(bufioxReaderWriter); ok { bw = bp.GetBufioxWriter() } else { + // if iprot is from kitex version lower than v0.12.0, use reflection to get writer + // in kitex v0.10.0, writer is from the field 'bw' which is a bufiox.Writer + // in kitex under v0.10.0, writer is from the field 'trans' which implements the interface io.Writer fieldNames := []string{"bw", "trans"} for _, fn := range fieldNames { writer, exist, err := getUnexportField(oprot, fn) @@ -133,6 +107,7 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error { return bw.Flush() } +// getUnexportField retrieves the value of an unexported struct field. func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bool, error error) { if reflect.TypeOf(p).Kind() != reflect.Ptr { return nil, false, fmt.Errorf("%s is not a ptr", p) @@ -143,3 +118,45 @@ func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bo } return nil, false, nil } + +// bufioxReaderWriter +type bufioxReaderWriter interface { + GetBufioxReader() bufiox.Reader + GetBufioxWriter() bufiox.Writer +} + +// byteBuffer +type byteBuffer interface { + // Next reads the next n bytes sequentially and returns the original buffer. + Next(n int) (p []byte, err error) + + // ReadableLen returns the total length of readable buffer. + // Return: -1 means unreadable. + ReadableLen() (n int) +} + +// nextReader is an adaptor that implement Read() by Next() and ReadableLen() +type nextReader struct { + nx byteBuffer +} + +// Read reads data from the nextReader's internal buffer into p. +func (nr nextReader) Read(p []byte) (n int, err error) { + readable := nr.nx.ReadableLen() + if readable == -1 { + return 0, err + } + if readable > len(p) { + readable = len(p) + } + data, err := nr.nx.Next(readable) + if err != nil { + return -1, err + } + copy(p, data) + return readable, nil +} + +func next2Reader(n byteBuffer) io.Reader { + return &nextReader{nx: n} +}