From 9bb033ad2744a82e38a8a835730137432411509f Mon Sep 17 00:00:00 2001 From: HeyJavaBean Date: Thu, 28 Nov 2024 19:44:54 +0800 Subject: [PATCH] fix: fix buf reader --- protocol/thrift/apache/adaptor/adaptor.go | 24 +++++++++++++++++------ protocol/thrift/skipdecoder.go | 1 + 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/protocol/thrift/apache/adaptor/adaptor.go b/protocol/thrift/apache/adaptor/adaptor.go index 2f52aed..ae02315 100644 --- a/protocol/thrift/apache/adaptor/adaptor.go +++ b/protocol/thrift/apache/adaptor/adaptor.go @@ -33,6 +33,7 @@ func AdaptRead(p, iprot interface{}) error { return fmt.Errorf("no codec implementation available") } + var rd io.Reader var br bufiox.Reader // if iprot is from kitex v0.12.0+, use interface assert to get bufiox reader if bp, ok := iprot.(bufioxReaderWriter); ok { @@ -58,10 +59,10 @@ func AdaptRead(p, iprot interface{}) error { case byteBuffer: // if reader is from byteBuffer, Read() function is not always available // so use an adaptor to implement Read() by Next() and ReadableLen() - br = bufiox.NewDefaultReader(byteBuffer2ReadWriter(r)) + rd = byteBuffer2ReadWriter(r) case io.ReadWriter: - // if reader is not byteBuffer but is io.ReadWriter, it suppose to be apache thrift binary protocol - br = bufiox.NewDefaultReader(r) + // if reader is not byteBuffer but is io.ReadWriter, it supposes to be apache thrift binary protocol + rd = r default: return fmt.Errorf("reader not ok") } @@ -69,18 +70,29 @@ func AdaptRead(p, iprot interface{}) error { } } } - if br == nil { + if rd == nil && br == nil { return fmt.Errorf("no available field for reader") } - // read data from iprot - buf, err := thrift.NewSkipDecoder(br).Next(thrift.STRUCT) + var sd *thrift.SkipDecoder + if br != nil { + sd = thrift.NewSkipDecoder(br) + } else { + // if there's no bufiox.Reader, do not wrap a new bufiox.Reader, or some data will remain in the buffer + // directly read from io.Reader + sd = thrift.NewSkipDecoderWithIOReader(rd) + } + + buf, err := sd.Next(thrift.STRUCT) if err != nil { return err } + sd.Release() + // unmarshal the data into struct _, err = fastStruct.FastRead(buf) + return err } diff --git a/protocol/thrift/skipdecoder.go b/protocol/thrift/skipdecoder.go index 00c806a..c41dda8 100644 --- a/protocol/thrift/skipdecoder.go +++ b/protocol/thrift/skipdecoder.go @@ -165,6 +165,7 @@ func (p *SkipDecoder) next(n int) (buf []byte, err error) { if buf, err = p.r.Next(n); err != nil { return } + if cap(p.nextBuf)-len(p.nextBuf) < n { var ncap int for ncap = cap(p.nextBuf) * 2; ncap-len(p.nextBuf) < n; ncap *= 2 {