From 7df82469def0b52557dfc248a7e7adb267b921fa Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 11:51:27 +0900 Subject: [PATCH 01/15] =?UTF-8?q?SuzuError=20=E3=81=A7=E3=83=AA=E3=83=88?= =?UTF-8?q?=E3=83=A9=E3=82=A4=E5=AF=BE=E8=B1=A1=E3=81=AE=E3=82=A8=E3=83=A9?= =?UTF-8?q?=E3=83=BC=E3=82=92=E5=88=A4=E5=88=A5=E3=81=A7=E3=81=8D=E3=82=8B?= =?UTF-8?q?=E3=82=88=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- errors.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/errors.go b/errors.go index 34a3551..ce7ecd8 100644 --- a/errors.go +++ b/errors.go @@ -3,8 +3,13 @@ package suzu type SuzuError struct { Code int Message string + Retry bool } func (e *SuzuError) Error() string { return e.Message } + +func (e *SuzuError) IsRetry() bool { + return e.Retry +} From 855e002751f03dad1e8a60328cca63bbf691e0b0 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 11:55:50 +0900 Subject: [PATCH 02/15] =?UTF-8?q?=E6=8E=A5=E7=B6=9A=E6=99=82=E3=81=AB?= =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4=E5=AF=BE=E8=B1=A1=E3=81=AE?= =?UTF-8?q?=E3=82=A8=E3=83=A9=E3=83=BC=E3=81=8C=E8=BF=94=E3=81=A3=E3=81=A6?= =?UTF-8?q?=E3=81=8D=E3=81=9F=E5=A0=B4=E5=90=88=E3=81=AF=E3=83=AA=E3=83=88?= =?UTF-8?q?=E3=83=A9=E3=82=A4=E3=81=95=E3=81=9B=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/handler.go b/handler.go index c348a42..e40f8c4 100644 --- a/handler.go +++ b/handler.go @@ -143,6 +143,22 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Send() if err, ok := err.(*SuzuError); ok { + if *s.config.Retry { + if err.IsRetry() { + retryCount += 1 + + zlog.Debug(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Int("retry_count", retryCount). + Send() + + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする + continue + } + } + // SuzuError の場合はその Status Code を返す return c.NoContent(err.Code) } From 6b3b8cefefc7c52e5f324448e1672098a85e2e0c Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 11:56:48 +0900 Subject: [PATCH 03/15] =?UTF-8?q?status=20code=20=E3=81=8C=20429=20?= =?UTF-8?q?=E3=81=AE=E5=A0=B4=E5=90=88=E3=81=AF=E3=83=AA=E3=83=88=E3=83=A9?= =?UTF-8?q?=E3=82=A4=E5=AF=BE=E8=B1=A1=E3=81=AE=E3=82=A8=E3=83=A9=E3=83=BC?= =?UTF-8?q?=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/amazon_transcribe.go b/amazon_transcribe.go index 0b0e4da..a14c804 100644 --- a/amazon_transcribe.go +++ b/amazon_transcribe.go @@ -99,9 +99,16 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe if reqErr, ok := err.(awserr.RequestFailure); ok { code := reqErr.StatusCode() message := reqErr.Message() + + var retry bool + if code == 429 { + retry = true + } + return nil, &SuzuError{ Code: code, Message: message, + Retry: retry, } } return nil, err From a42c993a78485a9e27e4b5a09b7a2bf05e02e3ac Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 12:29:07 +0900 Subject: [PATCH 04/15] =?UTF-8?q?retry=20=E8=A8=AD=E5=AE=9A=E3=82=92?= =?UTF-8?q?=E5=89=8A=E9=99=A4=E3=81=97=E3=80=81max=5Fretry=20=E8=A8=AD?= =?UTF-8?q?=E5=AE=9A=E3=82=92=E8=BF=BD=E5=8A=A0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 12 +++++++----- config_example.ini | 4 ++-- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/config.go b/config.go index 4fee353..54062df 100644 --- a/config.go +++ b/config.go @@ -27,6 +27,9 @@ const ( // 100ms DefaultTimeToWaitForOpusPacketMs = 100 + + // リトライ無し + DefaultMaxRetry = 0 ) type Config struct { @@ -46,7 +49,7 @@ type Config struct { HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"` HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"` - Retry *bool `ini:"retry"` + MaxRetry *int `ini:"max_retry"` ExporterHTTPS bool `ini:"exporter_https"` ExporterListenAddr string `ini:"exporter_listen_addr"` @@ -160,10 +163,9 @@ func setDefaultsConfig(config *Config) { config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs } - // 未指定の場合は true - if config.Retry == nil { - defaultRetry := true - config.Retry = &defaultRetry + // 未指定の場合のリトライ回数は 0 + if config.MaxRetry == nil { + *config.MaxRetry = DefaultMaxRetry } } func validateConfig(config *Config) error { diff --git a/config_example.ini b/config_example.ini index be8bf5b..fa51d2f 100644 --- a/config_example.ini +++ b/config_example.ini @@ -44,8 +44,8 @@ audio_channel_count = 1 # 受信した音声データの保存先ファイルです dump_file = ./dump.jsonl -# サーバからの切断時に再接続を試みます -retry = true +# サーバからの切断時またはハンドラー個別で指定した条件でのリトライ回数を指定します +max_retry = 0 # aws の場合は IsPartial が false, gcp の場合は IsFinal が true の場合の最終的な結果のみを返す指定 final_result_only = true From bc8fd1950166c644b409c023945d8d269f822d08 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 14:29:44 +0900 Subject: [PATCH 05/15] =?UTF-8?q?max=5Fretry=20=E5=9B=9E=E6=95=B0=E3=81=BE?= =?UTF-8?q?=E3=81=A7=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4=E3=81=99=E3=82=8B?= =?UTF-8?q?=E3=82=88=E3=81=86=E3=81=AB=E5=A4=89=E6=9B=B4=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 2 +- handler.go | 9 ++++++--- speech_to_text_handler.go | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 32214b2..aa9bd79 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -163,7 +163,7 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) Send() // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if !*at.Config.Retry { + if *at.Config.MaxRetry < 1 { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/handler.go b/handler.go index e40f8c4..532de55 100644 --- a/handler.go +++ b/handler.go @@ -143,8 +143,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Send() if err, ok := err.(*SuzuError); ok { - if *s.config.Retry { - if err.IsRetry() { + if err.IsRetry() { + if *s.config.MaxRetry > retryCount { retryCount += 1 zlog.Debug(). @@ -184,7 +184,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() return err } else if errors.Is(err, ErrServerDisconnected) { - if *s.config.Retry { + if *s.config.MaxRetry > retryCount { // サーバから切断されたが再度接続できる可能性があるため、接続を試みる retryCount += 1 @@ -215,6 +215,9 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte return err } + // 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする + retryCount = 0 + // メッセージが空でない場合はクライアントに結果を送信する if n > 0 { if _, err := c.Response().Write(buf[:n]); err != nil { diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index e49dea0..8aec81a 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -142,7 +142,7 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io Send() // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if !*stt.Config.Retry { + if *stt.Config.MaxRetry < 1 { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). From 02065106e77a1b740623d0e3fc4a9f2d28697248 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 15:17:19 +0900 Subject: [PATCH 06/15] =?UTF-8?q?max=5Fretry=20=E3=81=AE=E6=8C=87=E5=AE=9A?= =?UTF-8?q?=E3=82=92=E4=BF=AE=E6=AD=A3=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index 54062df..da725df 100644 --- a/config.go +++ b/config.go @@ -165,7 +165,8 @@ func setDefaultsConfig(config *Config) { // 未指定の場合のリトライ回数は 0 if config.MaxRetry == nil { - *config.MaxRetry = DefaultMaxRetry + maxRetry := DefaultMaxRetry + config.MaxRetry = &maxRetry } } func validateConfig(config *Config) error { @@ -215,4 +216,5 @@ func ShowConfig(config *Config) { zlog.Info().Str("exporter_listen_addr", config.ExporterListenAddr).Msg("CONF") zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF") + zlog.Info().Int("max_retry", *config.MaxRetry).Msg("CONF") } From f99b71fe43b7592dc005f37581f2a61b91c005e8 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 16:01:31 +0900 Subject: [PATCH 07/15] =?UTF-8?q?max=5Fretry=20=E3=81=8C=E6=9C=AA=E6=8C=87?= =?UTF-8?q?=E5=AE=9A=E3=81=AE=E5=A0=B4=E5=90=88=E3=81=AF=200=20=E5=9B=9E?= =?UTF-8?q?=E3=81=A7=E5=9B=BA=E5=AE=9A=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 2 +- config.go | 14 +++----------- handler.go | 4 ++-- speech_to_text_handler.go | 2 +- 4 files changed, 7 insertions(+), 15 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index aa9bd79..db1d67a 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -163,7 +163,7 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) Send() // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if *at.Config.MaxRetry < 1 { + if at.Config.MaxRetry < 1 { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/config.go b/config.go index da725df..cd06cac 100644 --- a/config.go +++ b/config.go @@ -27,9 +27,6 @@ const ( // 100ms DefaultTimeToWaitForOpusPacketMs = 100 - - // リトライ無し - DefaultMaxRetry = 0 ) type Config struct { @@ -49,7 +46,7 @@ type Config struct { HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"` HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"` - MaxRetry *int `ini:"max_retry"` + MaxRetry int `ini:"max_retry"` ExporterHTTPS bool `ini:"exporter_https"` ExporterListenAddr string `ini:"exporter_listen_addr"` @@ -162,13 +159,8 @@ func setDefaultsConfig(config *Config) { if config.TimeToWaitForOpusPacketMs == 0 { config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs } - - // 未指定の場合のリトライ回数は 0 - if config.MaxRetry == nil { - maxRetry := DefaultMaxRetry - config.MaxRetry = &maxRetry - } } + func validateConfig(config *Config) error { var err error // アドレスとして正しいことを確認する @@ -216,5 +208,5 @@ func ShowConfig(config *Config) { zlog.Info().Str("exporter_listen_addr", config.ExporterListenAddr).Msg("CONF") zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF") - zlog.Info().Int("max_retry", *config.MaxRetry).Msg("CONF") + zlog.Info().Int("max_retry", config.MaxRetry).Msg("CONF") } diff --git a/handler.go b/handler.go index 532de55..6ef58c3 100644 --- a/handler.go +++ b/handler.go @@ -144,7 +144,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() if err, ok := err.(*SuzuError); ok { if err.IsRetry() { - if *s.config.MaxRetry > retryCount { + if s.config.MaxRetry > retryCount { retryCount += 1 zlog.Debug(). @@ -184,7 +184,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() return err } else if errors.Is(err, ErrServerDisconnected) { - if *s.config.MaxRetry > retryCount { + if s.config.MaxRetry > retryCount { // サーバから切断されたが再度接続できる可能性があるため、接続を試みる retryCount += 1 diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 8aec81a..5037a01 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -142,7 +142,7 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io Send() // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if *stt.Config.MaxRetry < 1 { + if stt.Config.MaxRetry < 1 { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). From 1d23e3c0deaf4aae9baa17c264b7bbbe43bc1414 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 16:20:32 +0900 Subject: [PATCH 08/15] =?UTF-8?q?status=20code=20=E3=82=92=E5=AE=9A?= =?UTF-8?q?=E6=95=B0=E3=81=AB=E7=BD=AE=E3=81=8D=E6=8F=9B=E3=81=88=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amazon_transcribe.go b/amazon_transcribe.go index a14c804..ce61cbc 100644 --- a/amazon_transcribe.go +++ b/amazon_transcribe.go @@ -101,7 +101,7 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe message := reqErr.Message() var retry bool - if code == 429 { + if code == http.StatusTooManyRequests { retry = true } From 3e62eaf722e206f589511a4d63f987e8ee4c4aec Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Mon, 4 Mar 2024 17:13:23 +0900 Subject: [PATCH 09/15] =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4?= =?UTF-8?q?=E9=96=93=E9=9A=94=E3=82=92=E6=8C=87=E5=AE=9A=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.go | 11 ++++++++++- config_example.ini | 2 ++ handler.go | 6 ++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/config.go b/config.go index cd06cac..45e47e8 100644 --- a/config.go +++ b/config.go @@ -27,6 +27,9 @@ const ( // 100ms DefaultTimeToWaitForOpusPacketMs = 100 + + // リトライ間隔 100ms + DefaultRetryIntervalMs = 100 ) type Config struct { @@ -46,7 +49,8 @@ type Config struct { HTTP2MaxReadFrameSize uint32 `ini:"http2_max_read_frame_size"` HTTP2IdleTimeout uint32 `ini:"http2_idle_timeout"` - MaxRetry int `ini:"max_retry"` + MaxRetry int `ini:"max_retry"` + RetryIntervalMs int `ini:"retry_interval_ms"` ExporterHTTPS bool `ini:"exporter_https"` ExporterListenAddr string `ini:"exporter_listen_addr"` @@ -159,6 +163,10 @@ func setDefaultsConfig(config *Config) { if config.TimeToWaitForOpusPacketMs == 0 { config.TimeToWaitForOpusPacketMs = DefaultTimeToWaitForOpusPacketMs } + + if config.RetryIntervalMs == 0 { + config.RetryIntervalMs = DefaultRetryIntervalMs + } } func validateConfig(config *Config) error { @@ -209,4 +217,5 @@ func ShowConfig(config *Config) { zlog.Info().Int("exporter_listen_port", config.ExporterListenPort).Msg("CONF") zlog.Info().Int("max_retry", config.MaxRetry).Msg("CONF") + zlog.Info().Int("retry_interval_ms", config.RetryIntervalMs).Msg("CONF") } diff --git a/config_example.ini b/config_example.ini index fa51d2f..90ee00d 100644 --- a/config_example.ini +++ b/config_example.ini @@ -46,6 +46,8 @@ dump_file = ./dump.jsonl # サーバからの切断時またはハンドラー個別で指定した条件でのリトライ回数を指定します max_retry = 0 +# リトライ間隔(ミリ秒)です +retry_interval_ms = 100 # aws の場合は IsPartial が false, gcp の場合は IsFinal が true の場合の最終的な結果のみを返す指定 final_result_only = true diff --git a/handler.go b/handler.go index 6ef58c3..e2bd6d0 100644 --- a/handler.go +++ b/handler.go @@ -154,6 +154,9 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Int("retry_count", retryCount). Send() + // 連続のリトライを避けるために少し待つ + time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする continue } @@ -194,6 +197,9 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Str("connection_id", h.SoraConnectionID). Int("retry_count", retryCount). Send() + + // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する + break } else { // サーバから切断されたが再接続させない設定の場合 From a2f93ab0aba0b65659fb8ffc7ee610419a4acd04 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 5 Mar 2024 11:55:24 +0900 Subject: [PATCH 10/15] =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4?= =?UTF-8?q?=E5=9B=9E=E6=95=B0=E3=81=AE=E5=87=BA=E5=8A=9B=E3=82=92=E3=81=BE?= =?UTF-8?q?=E3=81=A8=E3=82=81=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- handler.go | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/handler.go b/handler.go index e2bd6d0..e7fe51e 100644 --- a/handler.go +++ b/handler.go @@ -133,6 +133,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte zlog.Info(). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). + Int("retry_count", retryCount). Msg("NEW-REQUEST") reader, err := serviceHandler.Handle(ctx, r) @@ -147,13 +148,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.MaxRetry > retryCount { retryCount += 1 - zlog.Debug(). - Err(err). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Int("retry_count", retryCount). - Send() - // 連続のリトライを避けるために少し待つ time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) @@ -191,13 +185,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte // サーバから切断されたが再度接続できる可能性があるため、接続を試みる retryCount += 1 - zlog.Debug(). - Err(err). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Int("retry_count", retryCount). - Send() - // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する break From 8ac6c9a89e7954462a8945a2084409b8ba8e9cea Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Tue, 5 Mar 2024 17:10:11 +0900 Subject: [PATCH 11/15] =?UTF-8?q?=E8=A8=AD=E5=AE=9A=E3=81=97=E3=81=9F?= =?UTF-8?q?=E3=83=AA=E3=83=88=E3=83=A9=E3=82=A4=E5=9B=9E=E6=95=B0=E3=82=92?= =?UTF-8?q?=E8=B6=85=E3=81=88=E3=81=9F=E5=A0=B4=E5=90=88=E3=81=AB=20type:?= =?UTF-8?q?=20error=20=E3=82=92=E3=82=AF=E3=83=A9=E3=82=A4=E3=82=A2?= =?UTF-8?q?=E3=83=B3=E3=83=88=E3=81=AB=E9=80=81=E3=82=8C=E3=82=8B=E3=82=88?= =?UTF-8?q?=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 27 +++++++++++++++++++++++++-- handler.go | 36 +++++++++++++++++++----------------- packet_dump_handler.go | 21 +++++++++++++++++++++ service_handler.go | 3 +++ speech_to_text_handler.go | 25 +++++++++++++++++++++++-- test_handler.go | 21 +++++++++++++++++++++ test_handler_test.go | 18 ++---------------- 7 files changed, 114 insertions(+), 37 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index db1d67a..f4d3512 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io" + "sync" "github.com/aws/aws-sdk-go/service/transcribestreamingservice" zlog "github.com/rs/zerolog/log" @@ -22,6 +23,8 @@ type AmazonTranscribeHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -34,6 +37,7 @@ func NewAmazonTranscribeHandler(config Config, channelID, connectionID string, s SampleRate: sampleRate, ChannelCount: channelCount, LanguageCode: languageCode, + RetryCount: 0, OnResultFunc: onResultFunc.(func(context.Context, io.WriteCloser, string, string, string, any) error), } } @@ -67,6 +71,24 @@ func (ar *AwsResult) SetMessage(message string) *AwsResult { return ar } +func (h *AmazonTranscribeHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *AmazonTranscribeHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *AmazonTranscribeHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) @@ -160,10 +182,11 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) Err(err). Str("channel_id", h.ChannelID). Str("connection_id", h.ConnectionID). + Int("retry_count", h.GetRetryCount()). Send() - // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if at.Config.MaxRetry < 1 { + // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する + if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/handler.go b/handler.go index e7fe51e..cec4245 100644 --- a/handler.go +++ b/handler.go @@ -125,15 +125,13 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte return echo.NewHTTPError(http.StatusInternalServerError) } - retryCount := 0 - // サーバへの接続・結果の送信処理 // サーバへの再接続が期待できる限りは、再接続を試みる for { zlog.Info(). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). - Int("retry_count", retryCount). + Int("retry_count", serviceHandler.GetRetryCount()). Msg("NEW-REQUEST") reader, err := serviceHandler.Handle(ctx, r) @@ -145,8 +143,8 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() if err, ok := err.(*SuzuError); ok { if err.IsRetry() { - if s.config.MaxRetry > retryCount { - retryCount += 1 + if s.config.MaxRetry > serviceHandler.GetRetryCount() { + serviceHandler.UpdateRetryCount() // 連続のリトライを避けるために少し待つ time.Sleep(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) @@ -155,11 +153,9 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte continue } } - // SuzuError の場合はその Status Code を返す return c.NoContent(err.Code) } - // SuzuError 以外の場合は 500 を返す return echo.NewHTTPError(http.StatusInternalServerError, err) } @@ -181,35 +177,41 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte Send() return err } else if errors.Is(err, ErrServerDisconnected) { - if s.config.MaxRetry > retryCount { + if s.config.MaxRetry < 1 { + // サーバから切断されたが再接続させない設定の場合 + zlog.Error(). + Err(err). + Str("channel_id", h.SoraChannelID). + Str("connection_id", h.SoraConnectionID). + Send() + return err + } + + if s.config.MaxRetry > serviceHandler.GetRetryCount() { // サーバから切断されたが再度接続できる可能性があるため、接続を試みる - retryCount += 1 + + serviceHandler.UpdateRetryCount() // TODO: 必要な場合は連続のリトライを避けるために少し待つ処理を追加する break } else { - // サーバから切断されたが再接続させない設定の場合 zlog.Error(). Err(err). Str("channel_id", h.SoraChannelID). Str("connection_id", h.SoraConnectionID). Send() - return err + // max_retry を超えた場合は終了 + return c.NoContent(http.StatusOK) } } - zlog.Error(). - Err(err). - Str("channel_id", h.SoraChannelID). - Str("connection_id", h.SoraConnectionID). - Send() // サーバから切断されたが再度の接続が期待できない場合 return err } // 1 度でも接続結果を受け取れた場合はリトライ回数をリセットする - retryCount = 0 + serviceHandler.ResetRetryCount() // メッセージが空でない場合はクライアントに結果を送信する if n > 0 { diff --git a/packet_dump_handler.go b/packet_dump_handler.go index 3d8b1a7..1ed7e94 100644 --- a/packet_dump_handler.go +++ b/packet_dump_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io" "os" + "sync" "time" ) @@ -20,6 +21,8 @@ type PacketDumpHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -46,6 +49,24 @@ type PacketDumpResult struct { Payload []byte `json:"payload"` } +func (h *PacketDumpHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *PacketDumpHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *PacketDumpHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { c := h.Config filename := c.DumpFile diff --git a/service_handler.go b/service_handler.go index 0064f9f..0a8b722 100644 --- a/service_handler.go +++ b/service_handler.go @@ -16,6 +16,9 @@ var ( type serviceHandlerInterface interface { Handle(context.Context, io.Reader) (*io.PipeReader, error) + UpdateRetryCount() int + GetRetryCount() int + ResetRetryCount() int } type newServiceHandlerFunc func(Config, string, string, uint32, uint16, string, any) serviceHandlerInterface diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 5037a01..5c85fd6 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "strings" + "sync" zlog "github.com/rs/zerolog/log" @@ -25,6 +26,8 @@ type SpeechToTextHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -70,6 +73,24 @@ func (gr *GcpResult) SetMessage(message string) *GcpResult { return gr } +func (h *SpeechToTextHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *SpeechToTextHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *SpeechToTextHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) @@ -141,8 +162,8 @@ func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io Int32("code", status.GetCode()). Send() - // リトライしない設定の場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する - if stt.Config.MaxRetry < 1 { + // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する + if (stt.Config.MaxRetry < 1) || (stt.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { zlog.Error(). Err(err). diff --git a/test_handler.go b/test_handler.go index a8c3912..e264439 100644 --- a/test_handler.go +++ b/test_handler.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "sync" zlog "github.com/rs/zerolog/log" ) @@ -21,6 +22,8 @@ type TestHandler struct { SampleRate uint32 ChannelCount uint16 LanguageCode string + RetryCount int + mu sync.Mutex OnResultFunc func(context.Context, io.WriteCloser, string, string, string, any) error } @@ -52,6 +55,24 @@ func NewTestResult(channelID, message string) TestResult { } } +func (h *TestHandler) UpdateRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount++ + return h.RetryCount +} + +func (h *TestHandler) GetRetryCount() int { + return h.RetryCount +} + +func (h *TestHandler) ResetRetryCount() int { + defer h.mu.Unlock() + h.mu.Lock() + h.RetryCount = 0 + return h.RetryCount +} + func (h *TestHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { r, w := io.Pipe() diff --git a/test_handler_test.go b/test_handler_test.go index 57541ff..506a6c4 100644 --- a/test_handler_test.go +++ b/test_handler_test.go @@ -243,17 +243,6 @@ func TestSpeechHandler(t *testing.T) { }) t.Run("packet read error", func(t *testing.T) { - logger := log.Logger - defer func() { - log.Logger = logger - }() - - pr, pw, err := os.Pipe() - if err != nil { - t.Fatal(err) - } - log.Logger = zerolog.New(pw).With().Caller().Timestamp().Logger() - r := iotest.ErrReader(errors.New("packet read error")) e := echo.New() @@ -271,14 +260,11 @@ func TestSpeechHandler(t *testing.T) { assert.Equal(t, "packet read error", err.Error()) } - pw.Close() - - var buf bytes.Buffer - n, err := buf.ReadFrom(pr) + line, err := rec.Body.ReadBytes([]byte("\n")[0]) if err != nil { t.Fatal(err) } - assert.Contains(t, buf.String()[:n], "packet read error") + assert.Contains(t, string(line), "packet read error") }) t.Run("silent packet", func(t *testing.T) { From 0a74e459af819b23b349d7f863884f1afd27e48b Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 6 Mar 2024 12:54:00 +0900 Subject: [PATCH 12/15] =?UTF-8?q?=E5=85=A8=E3=81=A6=E3=81=AE=E3=82=A8?= =?UTF-8?q?=E3=83=A9=E3=83=BC=E3=82=92=E3=83=AD=E3=82=B0=E3=81=AB=E5=87=BA?= =?UTF-8?q?=E3=81=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index f4d3512..dd3717d 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -175,16 +175,16 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) } if err := stream.Err(); err != nil { + zlog.Error(). + Err(err). + Str("channel_id", h.ChannelID). + Str("connection_id", h.ConnectionID). + Int("retry_count", h.GetRetryCount()). + Send() + // 復帰が不可能なエラー以外は再接続を試みる switch err.(type) { case *transcribestreamingservice.LimitExceededException: - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Int("retry_count", h.GetRetryCount()). - Send() - // リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) { if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil { From d1290d1de1e1a325dade7e142a2047f6bda8aee3 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 6 Mar 2024 15:39:59 +0900 Subject: [PATCH 13/15] =?UTF-8?q?=E5=A4=89=E6=9B=B4=E5=B1=A5=E6=AD=B4?= =?UTF-8?q?=E3=82=92=E6=9B=B4=E6=96=B0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index a75760a..b7ca29c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,20 @@ ## develop +- [CHANGE] retry 設定を削除し、リトライ回数を指定する max_retry 設定を追加する + - リトライしない場合は、max_retry を設定ファイルから削除するか、または、max_retry = 0 を設定する + - @Hexa +- [ADD] サービス接続時のエラーによるリトライまでの時間間隔を指定する retry_interval_ms 設定(ミリ秒間隔)を追加する + - @Hexa +- [ADD] サービス接続時の特定のエラー発生時に、リトライする仕組みを追加する + - @Hexa +- [ADD] ハンドラーにリトライ回数を管理するメソッドを追加する + - @Hexa +- [CHANGE] aws への接続時に HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する + - @Hexa +- [CHANGE] aws、または、gcp への接続後にリトライ回数が max_retry を超えた場合は、{"type": "error", "reason": string} をクライアントへ送信する + - @Hexa + ## 2024.1.0 From cb6b140d50984c2541061727234f46b2f5d93e8e Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 6 Mar 2024 15:57:04 +0900 Subject: [PATCH 14/15] =?UTF-8?q?=E3=83=87=E3=83=95=E3=82=A9=E3=83=AB?= =?UTF-8?q?=E3=83=88=E5=80=A4=E3=82=92=E8=BF=BD=E8=A8=98=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index b7ca29c..ae10303 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,8 +13,10 @@ - [CHANGE] retry 設定を削除し、リトライ回数を指定する max_retry 設定を追加する - リトライしない場合は、max_retry を設定ファイルから削除するか、または、max_retry = 0 を設定する + - デフォルト値: 0 (リトライ無し) - @Hexa - [ADD] サービス接続時のエラーによるリトライまでの時間間隔を指定する retry_interval_ms 設定(ミリ秒間隔)を追加する + - デフォルト値: 100 (100 ms) - @Hexa - [ADD] サービス接続時の特定のエラー発生時に、リトライする仕組みを追加する - @Hexa From fa9448e0f8b8fd5f660db3540c4fde34f5e8a375 Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Wed, 6 Mar 2024 15:58:21 +0900 Subject: [PATCH 15/15] =?UTF-8?q?=E7=90=86=E7=94=B1=E3=82=92=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index ae10303..8a7175e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,7 +22,7 @@ - @Hexa - [ADD] ハンドラーにリトライ回数を管理するメソッドを追加する - @Hexa -- [CHANGE] aws への接続時に HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する +- [CHANGE] aws への接続時に、時間をおいて再接続できる可能性がある HTTP ステータスコードが 429 の応答の場合は、指定されたリトライ設定に応じて、再接続を試みるように変更する - @Hexa - [CHANGE] aws、または、gcp への接続後にリトライ回数が max_retry を超えた場合は、{"type": "error", "reason": string} をクライアントへ送信する - @Hexa