Skip to content

Commit

Permalink
chore: add comment
Browse files Browse the repository at this point in the history
  • Loading branch information
HeyJavaBean committed Nov 22, 2024
1 parent 5c89f38 commit 629c729
Showing 1 changed file with 56 additions and 39 deletions.
95 changes: 56 additions & 39 deletions apache_adaptor/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,16 @@ import (
"github.com/cloudwego/gopkg/protocol/thrift"
)

type bufioxReaderWriter interface {
GetBufioxReader() bufiox.Reader
GetBufioxWriter() bufiox.Writer
}

type ByteBuffer interface {
// Next reads the next n bytes sequentially and returns the original buffer.
Next(n int) (p []byte, err error)

// ReadableLen returns the total length of readable buffer.
// Return: -1 means unreadable.
ReadableLen() (n int)
}

type nextReader struct {
nx ByteBuffer
}

func (nr nextReader) Read(p []byte) (n int, err error) {
readable := nr.nx.ReadableLen()
if readable == -1 {
return 0, err
}
if readable > len(p) {
readable = len(p)
}
data, err := nr.nx.Next(readable)
if err != nil {
return -1, err
}
copy(p, data)
return readable, nil
}

func next2Reader(n ByteBuffer) io.Reader {
return &nextReader{nx: n}
}

// AdaptRead receive a kitex binary protocol and read it by given function.
func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error {
var br bufiox.Reader
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader
if bp, ok := iprot.(bufioxReaderWriter); ok {
br = bp.GetBufioxReader()
} else {
// if iprot is from kitex version lower than v0.12.0, use reflection to get reader
// in kitex v0.10.0, reader is from the field 'br' which is a bufiox.Reader
// in kitex under v0.10.0, reader is from the field 'trans' which is kitex byteBuffer (mostly NetpollByteBuffer)
fieldNames := []string{"br", "trans"}
for _, fn := range fieldNames {
reader, exist, err := getUnexportField(iprot, fn)
Expand All @@ -77,7 +44,9 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error
switch r := reader.(type) {
case bufiox.Reader:
br = r
case ByteBuffer:
case byteBuffer:
// if reader is from byteBuffer, Read() function is not always available
// so use an adaptor to implement Read() by Next() and ReadableLen()
rd := next2Reader(r)
br = bufiox.NewDefaultReader(rd)
default:
Expand All @@ -98,11 +67,16 @@ func AdaptRead(iprot interface{}, readFunc func(buf []byte) (int, error)) error
return err
}

// AdaptWrite receive a kitex binary protocol and write it by given function.
func AdaptWrite(oprot interface{}, writeFunc func() []byte) error {
var bw bufiox.Writer
// if iprot is from kitex v0.12.0+, use interface assert to get bufiox writer
if bp, ok := oprot.(bufioxReaderWriter); ok {
bw = bp.GetBufioxWriter()
} else {
// if iprot is from kitex version lower than v0.12.0, use reflection to get writer
// in kitex v0.10.0, writer is from the field 'bw' which is a bufiox.Writer
// in kitex under v0.10.0, writer is from the field 'trans' which implements the interface io.Writer
fieldNames := []string{"bw", "trans"}
for _, fn := range fieldNames {
writer, exist, err := getUnexportField(oprot, fn)
Expand Down Expand Up @@ -133,6 +107,7 @@ func AdaptWrite(oprot interface{}, writeFunc func() []byte) error {
return bw.Flush()
}

// getUnexportField retrieves the value of an unexported struct field.
func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bool, error error) {
if reflect.TypeOf(p).Kind() != reflect.Ptr {
return nil, false, fmt.Errorf("%s is not a ptr", p)
Expand All @@ -143,3 +118,45 @@ func getUnexportField(p interface{}, fieldName string) (value interface{}, ok bo
}
return nil, false, nil
}

// bufioxReaderWriter
type bufioxReaderWriter interface {
GetBufioxReader() bufiox.Reader
GetBufioxWriter() bufiox.Writer
}

// byteBuffer
type byteBuffer interface {
// Next reads the next n bytes sequentially and returns the original buffer.
Next(n int) (p []byte, err error)

// ReadableLen returns the total length of readable buffer.
// Return: -1 means unreadable.
ReadableLen() (n int)
}

// nextReader is an adaptor that implement Read() by Next() and ReadableLen()
type nextReader struct {
nx byteBuffer
}

// Read reads data from the nextReader's internal buffer into p.
func (nr nextReader) Read(p []byte) (n int, err error) {
readable := nr.nx.ReadableLen()
if readable == -1 {
return 0, err
}
if readable > len(p) {
readable = len(p)
}
data, err := nr.nx.Next(readable)
if err != nil {
return -1, err
}
copy(p, data)
return readable, nil
}

func next2Reader(n byteBuffer) io.Reader {
return &nextReader{nx: n}
}

0 comments on commit 629c729

Please sign in to comment.