diff --git a/handler.go b/handler.go index f16a1db..eddb06b 100644 --- a/handler.go +++ b/handler.go @@ -16,7 +16,9 @@ import ( ) const ( - FrameSize = 1024 * 10 + FrameSize = 1024 * 10 + HeaderLength = 20 + MaxPayloadLength = 0xffff ) var ( @@ -231,6 +233,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } +const () + func readPacketWithHeader(reader io.Reader) (io.Reader, error) { r, w := io.Pipe() @@ -240,7 +244,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { var payload []byte for { - buf := make([]byte, 20+0xffff) + buf := make([]byte, HeaderLength+MaxPayloadLength) n, err := reader.Read(buf) if err != nil { w.CloseWithError(err) @@ -250,81 +254,61 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { payload = append(payload, buf[:n]...) length += n - if length > 20 { - // timestamp(64), sequence number(64), length(32) - h := payload[0:20] - p := payload[20:length] + // ヘッダー分のデータが揃っていないので、次の読み込みへ + if length < HeaderLength { + continue + } - payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + // timestamp(64), sequence number(64), length(32) + h := payload[:HeaderLength] + p := payload[HeaderLength:] - if length == (20 + payloadLength) { - if _, err := w.Write(p); err != nil { - w.CloseWithError(err) - return - } - payload = []byte{} - length = 0 - continue - } + payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) + + // payload が足りないので、次の読み込みへ + if length < (HeaderLength + payloadLength) { + continue + } - // payload が足りないのでさらに読み込む - if length < (20 + payloadLength) { - // 前の payload へ追加して次へ - payload = append(payload, p...) - continue + if _, err := w.Write(p[:payloadLength]); err != nil { + w.CloseWithError(err) + return + } + + payload = p[payloadLength:] + length = len(payload) + + // 全てのデータを書き込んだ場合は次の読み込みへ + if length == 0 { + continue + } + + // 次の frame が含まれている場合 + for { + // ヘッダー分のデータが揃っていないので、次の読み込みへ + if length < HeaderLength { + break } - // 次の frame が含まれている場合 - if length > (20 + payloadLength) { - if _, err := w.Write(p[:payloadLength]); err != nil { - w.CloseWithError(err) - return - } - // 次の payload 処理へ - payload = p[payloadLength:] - length = len(payload) - - // 次の payload がすでにある場合の処理 - for { - if length > 20 { - h = payload[0:20] - p = payload[20:length] - - payloadLength = int(binary.BigEndian.Uint32(h[16:20])) - - // すでに次の payload が全てある場合 - if length == (20 + payloadLength) { - if _, err := w.Write(p); err != nil { - w.CloseWithError(err) - return - } - payload = []byte{} - length = 0 - continue - } - - if length > (20 + payloadLength) { - if _, err := w.Write(p[:payloadLength]); err != nil { - w.CloseWithError(err) - return - } - - // 次の payload 処理へ - payload = p[payloadLength:] - length = len(payload) - continue - } - } else { - // payload が足りないので、次の読み込みへ - break - } - } + h = payload[:HeaderLength] + p = payload[HeaderLength:] - continue + payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) + + // payload が足りないので、次の読み込みへ + if length < (HeaderLength + payloadLength) { + break } - } else { - // ヘッダー分に足りなければ次の読み込みへ - continue + + // データが足りているので payloadLength まで書き込む + if _, err := w.Write(p[:payloadLength]); err != nil { + w.CloseWithError(err) + return + } + + // 残りの処理へ + payload = p[payloadLength:] + length = len(payload) } } }() diff --git a/handler_test.go b/handler_test.go index b743291..b7192ad 100644 --- a/handler_test.go +++ b/handler_test.go @@ -29,15 +29,15 @@ func (e *ErrReadCloser) Close() error { } func TestOpusPacketReader(t *testing.T) { + c := Config{ + AudioStreamingHeader: false, + } t.Run("success", func(t *testing.T) { d := time.Duration(100) * time.Millisecond r := readDumpFile(t, "testdata/000.jsonl", 0) defer r.Close() - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, r) for { @@ -51,41 +51,12 @@ func TestOpusPacketReader(t *testing.T) { } }) - t.Run("audio streaming header", func(t *testing.T) { - d := time.Duration(100) * time.Millisecond - r := readDumpFile(t, "testdata/header.jsonl", 0) - defer r.Close() - - c := Config{ - AudioStreamingHeader: true, - } - reader := NewOpusReader(c, d, r) - - for { - buf := make([]byte, FrameSize) - _, err := reader.Read(buf) - if err != nil { - assert.ErrorIs(t, err, io.EOF) - break - } - // seqNum - assert.Equal(t, buf[8:16], []byte{0, 0, 0, 0, 0, 0, 0, 0}) - // length - assert.Equal(t, buf[16:20], []byte{0, 0, 0, 3}) - assert.Equal(t, buf[20:23], []byte{252, 255, 254}) - - } - }) - t.Run("read error", func(t *testing.T) { d := time.Duration(100) * time.Millisecond errPacketRead := errors.New("packet read error") r := NewErrReadCloser(errPacketRead) - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, &r) for { @@ -104,9 +75,6 @@ func TestOpusPacketReader(t *testing.T) { r := readDumpFile(t, "testdata/dump.jsonl", 0) r.Close() - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, r) for { @@ -127,9 +95,6 @@ func TestOpusPacketReader(t *testing.T) { r.Close() }() - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, r) for { @@ -143,3 +108,206 @@ func TestOpusPacketReader(t *testing.T) { }) } + +func TestReadPacketWithHeader(t *testing.T) { + testCaces := []struct { + Name string + Data [][]byte + Expect [][]byte + }{ + { + Name: "success", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + }, + { + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + { + Name: "multiple data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + }, + { + 252, 255, 254, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + }, + { + 0, 0, 0, 3, + 252, 255, 254, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + }, + { + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + }, + { + 0, 0, 0, 3, + 252, 255, 254, + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + }, + { + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + } + + for _, tc := range testCaces { + t.Run(tc.Name, func(t *testing.T) { + reader, writer := io.Pipe() + defer reader.Close() + + go func() { + defer writer.Close() + for _, data := range tc.Data { + _, err := writer.Write(data) + if err != nil { + if assert.ErrorIs(t, err, io.EOF) { + break + } + t.Error(t, err) + return + } + } + }() + + r, err := readPacketWithHeader(reader) + assert.NoError(t, err) + + i := 0 + for { + buf := make([]byte, HeaderLength+MaxPayloadLength) + n, err := r.Read(buf) + if err != nil { + if assert.ErrorIs(t, err, io.EOF) { + break + } + t.Error(t, err) + return + } + + assert.Equal(t, tc.Expect[i], buf[:n]) + + i += 1 + } + + }) + } +}