diff --git a/protocol/ttheader/decode.go b/protocol/ttheader/decode.go index 6942737..3930d89 100644 --- a/protocol/ttheader/decode.go +++ b/protocol/ttheader/decode.go @@ -94,6 +94,7 @@ func Decode(ctx context.Context, in bufiox.Reader) (param DecodeParam, err error if headerInfo, err = in.Next(int(headerInfoSize)); err != nil { return } + param.ProtocolID = ProtocolID(headerInfo[0]) if err = checkProtocolID(headerInfo[0]); err != nil { return } @@ -245,6 +246,8 @@ func checkProtocolID(protoID uint8) error { case uint8(ProtocolIDKitexProtobuf): case uint8(ProtocolIDThriftCompactV2): // just for compatibility + case uint8(ProtocolIDThriftStruct): + case uint8(ProtocolIDProtobufStruct): default: return fmt.Errorf("unsupported ProtocolID[%d]", protoID) } diff --git a/protocol/ttheader/encode.go b/protocol/ttheader/encode.go index 11613ee..70bcf5c 100644 --- a/protocol/ttheader/encode.go +++ b/protocol/ttheader/encode.go @@ -76,6 +76,7 @@ const ( type HeaderFlags uint16 const ( + HeaderFlagsStreaming HeaderFlags = 0b0000_0000_0000_0010 HeaderFlagSupportOutOfOrder HeaderFlags = 0x01 HeaderFlagDuplexReverse HeaderFlags = 0x08 HeaderFlagSASL HeaderFlags = 0x10 @@ -90,6 +91,8 @@ const ( ProtocolIDThriftCompact ProtocolID = 0x02 // Kitex not support ProtocolIDThriftCompactV2 ProtocolID = 0x03 // Kitex not support ProtocolIDKitexProtobuf ProtocolID = 0x04 + ProtocolIDThriftStruct ProtocolID = 0x10 // TTHeader Streaming: only thrift struct encoded, no magic + ProtocolIDProtobufStruct ProtocolID = 0x11 // TTHeader Streaming: only protobuf struct encoded, no magic ProtocolIDDefault = ProtocolIDThriftBinary ) @@ -102,6 +105,14 @@ const ( InfoIDACLToken InfoIDType = 0x11 ) +const ( + FrameTypeMeta = "1" + FrameTypeHeader = "2" + FrameTypeData = "3" + FrameTypeTrailer = "4" + FrameTypeInvalid = "" +) + // EncodeParam is used to set up params to encode ttheader. type EncodeParam struct { // Flags is used to set up header flags, default is 0. diff --git a/protocol/ttheader/encode_test.go b/protocol/ttheader/encode_test.go index 1461163..5507eeb 100644 --- a/protocol/ttheader/encode_test.go +++ b/protocol/ttheader/encode_test.go @@ -116,6 +116,35 @@ func TestEncode(t *testing.T) { checkParamEqual(t, encodeParam, decodeParam, headerLen, 0) } +func TestEncodeStreamingFrame(t *testing.T) { + seqId++ + encodeParam := EncodeParam{ + Flags: HeaderFlagsStreaming, + SeqID: seqId, + ProtocolID: ProtocolIDThriftStruct, + IntInfo: map[uint16]string{ + ToService: "to.service", + ToCluster: "to.cluster", + ToMethod: "method", + LogID: "xxxxxxxxx", + }, + StrInfo: map[string]string{ + HeaderIDLServiceName: "a.b.c", + HeaderTransToIDC: "to_idc", + }, + } + buf, err := EncodeToBytes(context.Background(), encodeParam) + if err != nil { + t.Fatalf("encode to bytes failed, %s", err.Error()) + } + binary.BigEndian.PutUint32(buf, uint32(len(buf)-4)) + decodeParam, err := DecodeFromBytes(context.Background(), buf) + if err != nil { + t.Fatalf("encode to bytes failed, %s", err.Error()) + } + checkParamEqual(t, encodeParam, decodeParam, len(buf), 0) +} + func checkParamEqual(t *testing.T, encodeParam EncodeParam, decodeParam DecodeParam, headerLen, payloadLen int) { if decodeParam.Flags != encodeParam.Flags { t.Fatalf("encode to bytes failed, flags not equal") diff --git a/protocol/ttheader/metakey.go b/protocol/ttheader/metakey.go index 8acbb41..2aa66c6 100644 --- a/protocol/ttheader/metakey.go +++ b/protocol/ttheader/metakey.go @@ -44,6 +44,8 @@ const ( HTTPContentType RawRingHashKey LBType + ClusterShardID + FrameType // ttheader streaming frame type ) // string keys of ttheader transport diff --git a/protocol/ttheader/utils.go b/protocol/ttheader/utils.go index 81fd9dc..b9ee004 100644 --- a/protocol/ttheader/utils.go +++ b/protocol/ttheader/utils.go @@ -128,3 +128,11 @@ func WriteString2BLen(val string, out bufiox.Writer) (int, error) { } return n + 2, nil } + +func IsStreaming(bytes []byte) bool { + if len(bytes) < 8 { + return false + } + return binary.BigEndian.Uint16(bytes[Size32:]) == uint16(TTHeaderMagic>>16) && + binary.BigEndian.Uint16(bytes[Size32+Size16:])&uint16(HeaderFlagsStreaming) != 0 +}