Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

リトライ処理の改善 #192

Merged
merged 22 commits into from
Dec 3, 2024
Merged
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c0035de
JSON ではなくエラーを返す
Hexa Nov 20, 2024
d94ffaa
- リトライ時に ctx を cancel する
Hexa Nov 21, 2024
b8a17ec
エラーを返すようにする
Hexa Nov 22, 2024
da9251a
サービスへの接続が成功した時点でリトライ回数をリセットする
Hexa Nov 22, 2024
084539d
元のエラーをクライアントに送信する
Hexa Nov 22, 2024
0681e4e
リトライ待ちの間にクライアントから切断された場合に、リトライ待ちでブロックされないようにする
Hexa Nov 26, 2024
0b02fef
パケット破棄処理を関数化する
Hexa Nov 26, 2024
eea6b1f
context, cancel を分ける
Hexa Nov 26, 2024
8ae84ed
メソッドに渡す context を修正する
Hexa Nov 26, 2024
853817a
コスメ
Hexa Nov 27, 2024
fe3b43f
- Read 時の処理を channel に置き換えて処理を中断できるようにする
Hexa Nov 28, 2024
d321c2d
opus 取得から ogg への変換処理を分離する
Hexa Nov 29, 2024
a834f61
すぐに EOF にならないようにファイル読み込み完了後に待たせる
Hexa Nov 29, 2024
e250093
channel でエラーを伝搬させる
Hexa Nov 29, 2024
c6c26fb
テストの silent packet 送信までの時間を伸ばす
Hexa Nov 29, 2024
491e173
ヘッダをすぐに返す
Hexa Nov 29, 2024
42bf69a
不要なため削除する
Hexa Nov 29, 2024
9e08fa1
変更履歴を更新する
Hexa Nov 29, 2024
cb10a6c
コスメ
Hexa Dec 2, 2024
5e553b6
sleep を削除する
Hexa Dec 2, 2024
206e9e2
コメントを修正する
Hexa Dec 2, 2024
d49c28f
ログレベルを変更する
Hexa Dec 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -11,6 +11,13 @@

## develop

- [FIX] サービスへの接続が成功してもリトライカウントがリセットされない不具合を修正する
- @Hexa
- [FIX] 解析結果だけでなくエラーメッセージの送信時にもリトライカウントをリセットしていたため、リトライ処理によってカウントがリセットされていた不具合を修正する
- @Hexa
- [FIX] リトライ待ち時にクライアントから切断しようとすると、リトライ待ちで処理がブロックされているため切断までに時間がかかる不具合を修正する
- @Hexa

### misc

## 2024.6.0
5 changes: 4 additions & 1 deletion amazon_transcribe.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/transcribestreamingservice"
zlog "github.com/rs/zerolog/log"
)

type AmazonTranscribe struct {
@@ -89,7 +90,7 @@ func NewAmazonTranscribeClient(config Config) *transcribestreamingservice.Transc
return transcribestreamingservice.New(sess, cfg)
}

func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
func (at *AmazonTranscribe) Start(ctx context.Context, r io.ReadCloser) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
config := at.Config
client := NewAmazonTranscribeClient(config)
input := NewStartStreamTranscriptionInput(at)
@@ -117,9 +118,11 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe
stream := resp.GetStream()

go func() {
defer r.Close()
defer stream.Close()

if err := transcribestreamingservice.StreamAudioFromReader(ctx, stream, FrameSize, r); err != nil {
zlog.Error().Err(err).Send()
return
}
}()
46 changes: 7 additions & 39 deletions amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
@@ -95,31 +95,19 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan opusChannel) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

oggReader, oggWriter := io.Pipe()
go func() {
defer oggWriter.Close()
if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil {
if !errors.Is(err, io.EOF) {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}

oggWriter.CloseWithError(err)
return
}
}()
packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)

stream, err := at.Start(ctx, oggReader)
stream, err := at.Start(ctx, packetReader)
if err != nil {
return nil, err
}

// リクエストが成功した時点でリトライカウントをリセットする
h.ResetRetryCount()

r, w := io.Pipe()

go func() {
@@ -195,33 +183,13 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader)
switch err.(type) {
case *transcribestreamingservice.LimitExceededException,
*transcribestreamingservice.InternalFailureException:
// リトライしない設定の場合、または、max_retry を超えた場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if (at.Config.MaxRetry < 1) || (at.Config.MaxRetry <= h.GetRetryCount()) {
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}
}

err = ErrServerDisconnected
err = errors.Join(err, ErrServerDisconnected)
default:
// 再接続を想定している以外のエラーの場合はクライアントにエラーを返し、再度接続するかはクライアント側で判断する
if err := encoder.Encode(NewSuzuErrorResponse(err)); err != nil {
zlog.Error().
Err(err).
Str("channel_id", h.ChannelID).
Str("connection_id", h.ConnectionID).
Send()
}
}

w.CloseWithError(err)
return
}

w.Close()
}()

Loading