From b7ee34d07c0e5a4e4bdd1f9a7145866ea7771380 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 11 Nov 2024 20:24:22 +0900 Subject: [PATCH 1/5] =?UTF-8?q?=E4=B8=8D=E8=A6=81=E3=81=AA=E3=81=9F?= =?UTF-8?q?=E3=82=81=E5=89=8A=E9=99=A4=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/handler.go b/handler.go index f16a1db..282f47b 100644 --- a/handler.go +++ b/handler.go @@ -267,13 +267,6 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { continue } - // payload が足りないのでさらに読み込む - if length < (20 + payloadLength) { - // 前の payload へ追加して次へ - payload = append(payload, p...) - continue - } - // 次の frame が含まれている場合 if length > (20 + payloadLength) { if _, err := w.Write(p[:payloadLength]); err != nil { @@ -319,8 +312,6 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { break } } - - continue } } else { // ヘッダー分に足りなければ次の読み込みへ From 35ffc8264528048bf143f0df0825ced885ba5395 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 11 Nov 2024 20:28:44 +0900 Subject: [PATCH 2/5] =?UTF-8?q?=E3=82=B3=E3=82=B9=E3=83=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/handler.go b/handler.go index 282f47b..a7967e6 100644 --- a/handler.go +++ b/handler.go @@ -231,6 +231,10 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } +const ( + HeaderLength = 20 +) + func readPacketWithHeader(reader io.Reader) (io.Reader, error) { r, w := io.Pipe() @@ -240,7 +244,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { var payload []byte for { - buf := make([]byte, 20+0xffff) + buf := make([]byte, HeaderLength+0xffff) n, err := reader.Read(buf) if err != nil { w.CloseWithError(err) @@ -252,12 +256,12 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { if length > 20 { // timestamp(64), sequence number(64), length(32) - h := payload[0:20] - p := payload[20:length] + h := payload[:HeaderLength] + p := payload[HeaderLength:] - payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) - if length == (20 + payloadLength) { + if length == (HeaderLength + payloadLength) { if _, err := w.Write(p); err != nil { w.CloseWithError(err) return @@ -268,7 +272,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { } // 次の frame が含まれている場合 - if length > (20 + payloadLength) { + if length > (HeaderLength + payloadLength) { if _, err := w.Write(p[:payloadLength]); err != nil { w.CloseWithError(err) return @@ -279,14 +283,14 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { // 次の payload がすでにある場合の処理 for { - if length > 20 { - h = payload[0:20] - p = payload[20:length] + if length > HeaderLength { + h = payload[:HeaderLength] + p = payload[HeaderLength:] - payloadLength = int(binary.BigEndian.Uint32(h[16:20])) + payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) // すでに次の payload が全てある場合 - if length == (20 + payloadLength) { + if length == (HeaderLength + payloadLength) { if _, err := w.Write(p); err != nil { w.CloseWithError(err) return @@ -296,7 +300,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { continue } - if length > (20 + payloadLength) { + if length > (HeaderLength + payloadLength) { if _, err := w.Write(p[:payloadLength]); err != nil { w.CloseWithError(err) return From 6ff65aaa203d59e5273324d28ca4d47370ee2544 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 12 Nov 2024 10:33:29 +0900 Subject: [PATCH 3/5] =?UTF-8?q?=E5=87=A6=E7=90=86=E3=82=92=E3=81=BE?= =?UTF-8?q?=E3=81=A8=E3=82=81=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 109 ++++++++++++++++++++++++----------------------------- 1 file changed, 49 insertions(+), 60 deletions(-) diff --git a/handler.go b/handler.go index a7967e6..1313724 100644 --- a/handler.go +++ b/handler.go @@ -254,72 +254,61 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { payload = append(payload, buf[:n]...) length += n - if length > 20 { - // timestamp(64), sequence number(64), length(32) - h := payload[:HeaderLength] - p := payload[HeaderLength:] + // ヘッダー分のデータが揃っていないので、次の読み込みへ + if length < HeaderLength { + continue + } + + // timestamp(64), sequence number(64), length(32) + h := payload[:HeaderLength] + p := payload[HeaderLength:] + + payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) + + // payload が足りないので、次の読み込みへ + if length < (HeaderLength + payloadLength) { + continue + } + + if _, err := w.Write(p[:payloadLength]); err != nil { + w.CloseWithError(err) + return + } + + payload = p[payloadLength:] + length = len(payload) + + // 全てのデータを書き込んだ場合は次の読み込みへ + if length == 0 { + continue + } + + // 次の frame が含まれている場合 + for { + // ヘッダー分のデータが揃っていないので、次の読み込みへ + if length < HeaderLength { + break + } + + h = payload[:HeaderLength] + p = payload[HeaderLength:] payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) - if length == (HeaderLength + payloadLength) { - if _, err := w.Write(p); err != nil { - w.CloseWithError(err) - return - } - payload = []byte{} - length = 0 - continue + // payload が足りないので、次の読み込みへ + if length < (HeaderLength + payloadLength) { + break } - // 次の frame が含まれている場合 - if length > (HeaderLength + payloadLength) { - if _, err := w.Write(p[:payloadLength]); err != nil { - w.CloseWithError(err) - return - } - // 次の payload 処理へ - payload = p[payloadLength:] - length = len(payload) - - // 次の payload がすでにある場合の処理 - for { - if length > HeaderLength { - h = payload[:HeaderLength] - p = payload[HeaderLength:] - - payloadLength = int(binary.BigEndian.Uint32(h[16:HeaderLength])) - - // すでに次の payload が全てある場合 - if length == (HeaderLength + payloadLength) { - if _, err := w.Write(p); err != nil { - w.CloseWithError(err) - return - } - payload = []byte{} - length = 0 - continue - } - - if length > (HeaderLength + payloadLength) { - if _, err := w.Write(p[:payloadLength]); err != nil { - w.CloseWithError(err) - return - } - - // 次の payload 処理へ - payload = p[payloadLength:] - length = len(payload) - continue - } - } else { - // payload が足りないので、次の読み込みへ - break - } - } + // データが足りているので payloadLength まで書き込む + if _, err := w.Write(p[:payloadLength]); err != nil { + w.CloseWithError(err) + return } - } else { - // ヘッダー分に足りなければ次の読み込みへ - continue + + // 残りの処理へ + payload = p[payloadLength:] + length = len(payload) } } }() From 0f31a8e5af5392077548b47da614db9c81f59003 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 12 Nov 2024 10:59:01 +0900 Subject: [PATCH 4/5] =?UTF-8?q?=E5=AE=9A=E6=95=B0=E3=81=AB=E3=81=BE?= =?UTF-8?q?=E3=81=A8=E3=82=81=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/handler.go b/handler.go index 1313724..eddb06b 100644 --- a/handler.go +++ b/handler.go @@ -16,7 +16,9 @@ import ( ) const ( - FrameSize = 1024 * 10 + FrameSize = 1024 * 10 + HeaderLength = 20 + MaxPayloadLength = 0xffff ) var ( @@ -231,9 +233,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } -const ( - HeaderLength = 20 -) +const () func readPacketWithHeader(reader io.Reader) (io.Reader, error) { r, w := io.Pipe() @@ -244,7 +244,7 @@ func readPacketWithHeader(reader io.Reader) (io.Reader, error) { var payload []byte for { - buf := make([]byte, HeaderLength+0xffff) + buf := make([]byte, HeaderLength+MaxPayloadLength) n, err := reader.Read(buf) if err != nil { w.CloseWithError(err) From 19a56bff3d3ce7ea5569b8d03aa69a2aee26e538 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 12 Nov 2024 12:41:45 +0900 Subject: [PATCH 5/5] =?UTF-8?q?=E5=8F=97=E4=BF=A1=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=81=9F=E3=83=87=E3=83=BC=E3=82=BF=E9=95=B7=E3=81=94=E3=81=A8?= =?UTF-8?q?=E3=81=AE=E3=83=86=E3=82=B9=E3=83=88=E3=82=92=E8=BF=BD=E5=8A=A0?= =?UTF-8?q?=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler_test.go | 244 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 206 insertions(+), 38 deletions(-) diff --git a/handler_test.go b/handler_test.go index b743291..b7192ad 100644 --- a/handler_test.go +++ b/handler_test.go @@ -29,15 +29,15 @@ func (e *ErrReadCloser) Close() error { } func TestOpusPacketReader(t *testing.T) { + c := Config{ + AudioStreamingHeader: false, + } t.Run("success", func(t *testing.T) { d := time.Duration(100) * time.Millisecond r := readDumpFile(t, "testdata/000.jsonl", 0) defer r.Close() - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, r) for { @@ -51,41 +51,12 @@ func TestOpusPacketReader(t *testing.T) { } }) - t.Run("audio streaming header", func(t *testing.T) { - d := time.Duration(100) * time.Millisecond - r := readDumpFile(t, "testdata/header.jsonl", 0) - defer r.Close() - - c := Config{ - AudioStreamingHeader: true, - } - reader := NewOpusReader(c, d, r) - - for { - buf := make([]byte, FrameSize) - _, err := reader.Read(buf) - if err != nil { - assert.ErrorIs(t, err, io.EOF) - break - } - // seqNum - assert.Equal(t, buf[8:16], []byte{0, 0, 0, 0, 0, 0, 0, 0}) - // length - assert.Equal(t, buf[16:20], []byte{0, 0, 0, 3}) - assert.Equal(t, buf[20:23], []byte{252, 255, 254}) - - } - }) - t.Run("read error", func(t *testing.T) { d := time.Duration(100) * time.Millisecond errPacketRead := errors.New("packet read error") r := NewErrReadCloser(errPacketRead) - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, &r) for { @@ -104,9 +75,6 @@ func TestOpusPacketReader(t *testing.T) { r := readDumpFile(t, "testdata/dump.jsonl", 0) r.Close() - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, r) for { @@ -127,9 +95,6 @@ func TestOpusPacketReader(t *testing.T) { r.Close() }() - c := Config{ - AudioStreamingHeader: false, - } reader := NewOpusReader(c, d, r) for { @@ -143,3 +108,206 @@ func TestOpusPacketReader(t *testing.T) { }) } + +func TestReadPacketWithHeader(t *testing.T) { + testCaces := []struct { + Name string + Data [][]byte + Expect [][]byte + }{ + { + Name: "success", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + }, + { + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + { + Name: "multiple data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + }, + { + 252, 255, 254, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + }, + { + 0, 0, 0, 3, + 252, 255, 254, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + }, + { + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + }, + { + 0, 0, 0, 3, + 252, 255, 254, + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + { + Name: "split data", + Data: [][]byte{ + { + 0, 5, 236, 96, 167, 215, 194, 192, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + 252, 255, 254, + 0, 5, 236, 96, 167, 215, 194, 193, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 3, + }, + { + 252, 255, 255, + }, + }, + Expect: [][]byte{ + { + 252, 255, 254, + }, + { + 252, 255, 255, + }, + }, + }, + } + + for _, tc := range testCaces { + t.Run(tc.Name, func(t *testing.T) { + reader, writer := io.Pipe() + defer reader.Close() + + go func() { + defer writer.Close() + for _, data := range tc.Data { + _, err := writer.Write(data) + if err != nil { + if assert.ErrorIs(t, err, io.EOF) { + break + } + t.Error(t, err) + return + } + } + }() + + r, err := readPacketWithHeader(reader) + assert.NoError(t, err) + + i := 0 + for { + buf := make([]byte, HeaderLength+MaxPayloadLength) + n, err := r.Read(buf) + if err != nil { + if assert.ErrorIs(t, err, io.EOF) { + break + } + t.Error(t, err) + return + } + + assert.Equal(t, tc.Expect[i], buf[:n]) + + i += 1 + } + + }) + } +}