Skip to content

Commit

Permalink
feat: support ttheader streaming codec (#23)
Browse files Browse the repository at this point in the history
* feat: support ttheader streaming codec

* perf: reuse buffer struct

* Revert "perf: reuse buffer struct"

This reverts commit cb71232.
  • Loading branch information
joway authored Nov 20, 2024
1 parent a218fe6 commit b8f4557
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 0 deletions.
3 changes: 3 additions & 0 deletions protocol/ttheader/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func Decode(ctx context.Context, in bufiox.Reader) (param DecodeParam, err error
if headerInfo, err = in.Next(int(headerInfoSize)); err != nil {
return
}
param.ProtocolID = ProtocolID(headerInfo[0])
if err = checkProtocolID(headerInfo[0]); err != nil {
return
}
Expand Down Expand Up @@ -245,6 +246,8 @@ func checkProtocolID(protoID uint8) error {
case uint8(ProtocolIDKitexProtobuf):
case uint8(ProtocolIDThriftCompactV2):
// just for compatibility
case uint8(ProtocolIDThriftStruct):
case uint8(ProtocolIDProtobufStruct):
default:
return fmt.Errorf("unsupported ProtocolID[%d]", protoID)
}
Expand Down
11 changes: 11 additions & 0 deletions protocol/ttheader/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const (
type HeaderFlags uint16

const (
HeaderFlagsStreaming HeaderFlags = 0b0000_0000_0000_0010
HeaderFlagSupportOutOfOrder HeaderFlags = 0x01
HeaderFlagDuplexReverse HeaderFlags = 0x08
HeaderFlagSASL HeaderFlags = 0x10
Expand All @@ -90,6 +91,8 @@ const (
ProtocolIDThriftCompact ProtocolID = 0x02 // Kitex not support
ProtocolIDThriftCompactV2 ProtocolID = 0x03 // Kitex not support
ProtocolIDKitexProtobuf ProtocolID = 0x04
ProtocolIDThriftStruct ProtocolID = 0x10 // TTHeader Streaming: only thrift struct encoded, no magic
ProtocolIDProtobufStruct ProtocolID = 0x11 // TTHeader Streaming: only protobuf struct encoded, no magic
ProtocolIDDefault = ProtocolIDThriftBinary
)

Expand All @@ -102,6 +105,14 @@ const (
InfoIDACLToken InfoIDType = 0x11
)

const (
FrameTypeMeta = "1"
FrameTypeHeader = "2"
FrameTypeData = "3"
FrameTypeTrailer = "4"
FrameTypeInvalid = ""
)

// EncodeParam is used to set up params to encode ttheader.
type EncodeParam struct {
// Flags is used to set up header flags, default is 0.
Expand Down
29 changes: 29 additions & 0 deletions protocol/ttheader/encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,35 @@ func TestEncode(t *testing.T) {
checkParamEqual(t, encodeParam, decodeParam, headerLen, 0)
}

func TestEncodeStreamingFrame(t *testing.T) {
seqId++
encodeParam := EncodeParam{
Flags: HeaderFlagsStreaming,
SeqID: seqId,
ProtocolID: ProtocolIDThriftStruct,
IntInfo: map[uint16]string{
ToService: "to.service",
ToCluster: "to.cluster",
ToMethod: "method",
LogID: "xxxxxxxxx",
},
StrInfo: map[string]string{
HeaderIDLServiceName: "a.b.c",
HeaderTransToIDC: "to_idc",
},
}
buf, err := EncodeToBytes(context.Background(), encodeParam)
if err != nil {
t.Fatalf("encode to bytes failed, %s", err.Error())
}
binary.BigEndian.PutUint32(buf, uint32(len(buf)-4))
decodeParam, err := DecodeFromBytes(context.Background(), buf)
if err != nil {
t.Fatalf("encode to bytes failed, %s", err.Error())
}
checkParamEqual(t, encodeParam, decodeParam, len(buf), 0)
}

func checkParamEqual(t *testing.T, encodeParam EncodeParam, decodeParam DecodeParam, headerLen, payloadLen int) {
if decodeParam.Flags != encodeParam.Flags {
t.Fatalf("encode to bytes failed, flags not equal")
Expand Down
2 changes: 2 additions & 0 deletions protocol/ttheader/metakey.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
HTTPContentType
RawRingHashKey
LBType
ClusterShardID
FrameType // ttheader streaming frame type
)

// string keys of ttheader transport
Expand Down
8 changes: 8 additions & 0 deletions protocol/ttheader/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,11 @@ func WriteString2BLen(val string, out bufiox.Writer) (int, error) {
}
return n + 2, nil
}

func IsStreaming(bytes []byte) bool {
if len(bytes) < 8 {
return false
}
return binary.BigEndian.Uint16(bytes[Size32:]) == uint16(TTHeaderMagic>>16) &&
binary.BigEndian.Uint16(bytes[Size32+Size16:])&uint16(HeaderFlagsStreaming) != 0
}

0 comments on commit b8f4557

Please sign in to comment.