diff --git a/examples/requester/example.go b/examples/requester/example.go index 0a28888..2dbc36a 100644 --- a/examples/requester/example.go +++ b/examples/requester/example.go @@ -118,29 +118,29 @@ func requestOracleData( s.FailedRequestsCh(), senderCh, parser.IntoSenderTaskHandler, retryHandler, delayHandler, ) resolveMw := middleware.New( - s.SuccessRequestsCh(), watcherCh, parser.IntoRequestWatcherTaskHandler, resolveHandler, + s.SuccessfulRequestsCh(), watcherCh, parser.IntoRequestWatcherTaskHandler, resolveHandler, ) // start go s.Start() go w.Start() - go retryMw.Run() - go resolveMw.Run() + go retryMw.Start() + go resolveMw.Start() senderCh <- sender.NewTask(1, msg) for { time.Sleep(10 * time.Second) - if len(w.FailedRequestCh()) != 0 || len(w.SuccessfulRequestCh()) != 0 { + if len(w.FailedRequestsCh()) != 0 || len(w.SuccessfulRequestsCh()) != 0 { break } } - if len(w.FailedRequestCh()) != 0 { - failResp := <-w.FailedRequestCh() + if len(w.FailedRequestsCh()) != 0 { + failResp := <-w.FailedRequestsCh() return client.OracleResult{}, failResp } - resp := <-w.SuccessfulRequestCh() + resp := <-w.SuccessfulRequestsCh() return resp.OracleResult, nil } @@ -159,28 +159,28 @@ func getSigningResult( delayHandler := delay.NewHandler[signing.FailResponse, signing.Task](3 * time.Second) retryMw := middleware.New( - w.FailedRequestCh(), watcherCh, parser.IntoSigningWatcherTaskHandler, retryHandler, delayHandler, + w.FailedRequestsCh(), watcherCh, parser.IntoSigningWatcherTaskHandler, retryHandler, delayHandler, ) // start go w.Start() - go retryMw.Run() + go retryMw.Start() // new task to query tss signing result. watcherCh <- signing.NewTask(2, signingID) for { time.Sleep(10 * time.Second) - if len(w.FailedRequestCh()) != 0 || len(w.SuccessfulRequestCh()) != 0 { + if len(w.FailedRequestsCh()) != 0 || len(w.SuccessfulRequestsCh()) != 0 { break } } - if len(w.FailedRequestCh()) != 0 { - failResp := <-w.FailedRequestCh() + if len(w.FailedRequestsCh()) != 0 { + failResp := <-w.FailedRequestsCh() return client.SigningResult{}, failResp } - resp := <-w.SuccessfulRequestCh() + resp := <-w.SuccessfulRequestsCh() return resp.SigningResult, nil } diff --git a/requester/middleware/middleware.go b/requester/middleware/middleware.go index 4643a43..74a94f1 100644 --- a/requester/middleware/middleware.go +++ b/requester/middleware/middleware.go @@ -28,7 +28,7 @@ func New[T, U any]( return &Middleware[T, U]{inCh: inCh, outCh: outCh, chain: handlerChain[0]} } -func (m *Middleware[T, U]) Run() { +func (m *Middleware[T, U]) Start() { // Note: Add non-blocking middleware (eg. success middleware // that we just want to log/save to DB but don't want to wait to finish) for { diff --git a/requester/sender/sender.go b/requester/sender/sender.go index 9157d5c..3369cfc 100644 --- a/requester/sender/sender.go +++ b/requester/sender/sender.go @@ -25,7 +25,7 @@ type Sender struct { // Channel requestQueueCh chan Task successfulRequestsCh chan SuccessResponse - failedRequestCh chan FailResponse + failedRequestsCh chan FailResponse } func NewSender( @@ -58,16 +58,16 @@ func NewSender( pollingDelay: pollingDelay, requestQueueCh: requestQueueCh, successfulRequestsCh: make(chan SuccessResponse, successChBufferSize), - failedRequestCh: make(chan FailResponse, failureChBufferSize), + failedRequestsCh: make(chan FailResponse, failureChBufferSize), }, nil } -func (s *Sender) SuccessRequestsCh() <-chan SuccessResponse { +func (s *Sender) SuccessfulRequestsCh() <-chan SuccessResponse { return s.successfulRequestsCh } func (s *Sender) FailedRequestsCh() <-chan FailResponse { - return s.failedRequestCh + return s.failedRequestsCh } func (s *Sender) Start() { @@ -101,15 +101,15 @@ func (s *Sender) request(task Task, key keyring.Record) { // Handle error if err != nil { s.logger.Error("Sender", "failed to broadcast task ID(%d) with error: %s", task.ID(), err.Error()) - s.failedRequestCh <- FailResponse{task, sdk.TxResponse{}, types.ErrBroadcastFailed.Wrapf(err.Error())} + s.failedRequestsCh <- FailResponse{task, sdk.TxResponse{}, types.ErrBroadcastFailed.Wrapf(err.Error())} return } else if resp != nil && resp.Code != 0 { s.logger.Error("Sender", "failed to broadcast task ID(%d) with code %d", task.ID(), resp.Code) - s.failedRequestCh <- FailResponse{task, *resp, types.ErrBroadcastFailed} + s.failedRequestsCh <- FailResponse{task, *resp, types.ErrBroadcastFailed} return } else if resp == nil { s.logger.Error("Sender", "failed to broadcast task ID(%d) no response", task.ID()) - s.failedRequestCh <- FailResponse{task, sdk.TxResponse{}, types.ErrUnknown} + s.failedRequestsCh <- FailResponse{task, sdk.TxResponse{}, types.ErrUnknown} return } @@ -127,7 +127,7 @@ func (s *Sender) request(task Task, key keyring.Record) { if resp.Code != 0 { s.logger.Warning("Sender", "task ID(%d) failed with code %d", task.ID(), resp.Code) - s.failedRequestCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf(resp.RawLog)} + s.failedRequestsCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf(resp.RawLog)} return } @@ -136,5 +136,5 @@ func (s *Sender) request(task Task, key keyring.Record) { return } s.logger.Error("Sender", "task ID(%d) has timed out", task.ID()) - s.failedRequestCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf("timed out")} + s.failedRequestsCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf("timed out")} } diff --git a/requester/sender/sender_test.go b/requester/sender/sender_test.go index 3b03f12..1b84555 100644 --- a/requester/sender/sender_test.go +++ b/requester/sender/sender_test.go @@ -84,7 +84,7 @@ func TestSenderWithSuccess(t *testing.T) { timeout := time.After(10 * time.Second) for { select { - case <-s.SuccessRequestsCh(): + case <-s.SuccessfulRequestsCh(): return case <-s.FailedRequestsCh(): t.Errorf("expected a successful response") @@ -139,7 +139,7 @@ func TestSenderWithFailure(t *testing.T) { timeout := time.After(10 * time.Second) for { select { - case <-s.SuccessRequestsCh(): + case <-s.SuccessfulRequestsCh(): t.Errorf("expected a failed response") case <-s.FailedRequestsCh(): return @@ -179,7 +179,7 @@ func TestSenderWithClientError(t *testing.T) { timeout := time.After(10 * time.Second) for { select { - case <-s.SuccessRequestsCh(): + case <-s.SuccessfulRequestsCh(): t.Errorf("expected a failed response") case <-s.FailedRequestsCh(): return diff --git a/requester/watcher/request/watcher.go b/requester/watcher/request/watcher.go index da6c4f5..57f9c56 100644 --- a/requester/watcher/request/watcher.go +++ b/requester/watcher/request/watcher.go @@ -19,9 +19,9 @@ type Watcher struct { pollingDelay time.Duration // Channel - watchQueueCh <-chan Task - successfulRequestCh chan SuccessResponse - failedRequestCh chan FailResponse + watchQueueCh <-chan Task + successfulRequestsCh chan SuccessResponse + failedRequestsCh chan FailResponse } func NewWatcher( @@ -34,22 +34,22 @@ func NewWatcher( failureChBufferSize int, ) *Watcher { return &Watcher{ - client: client, - logger: logger, - timeout: timeout, - pollingDelay: pollingDelay, - watchQueueCh: watchQueueCh, - successfulRequestCh: make(chan SuccessResponse, successChBufferSize), - failedRequestCh: make(chan FailResponse, failureChBufferSize), + client: client, + logger: logger, + timeout: timeout, + pollingDelay: pollingDelay, + watchQueueCh: watchQueueCh, + successfulRequestsCh: make(chan SuccessResponse, successChBufferSize), + failedRequestsCh: make(chan FailResponse, failureChBufferSize), } } -func (w *Watcher) SuccessfulRequestCh() <-chan SuccessResponse { - return w.successfulRequestCh +func (w *Watcher) SuccessfulRequestsCh() <-chan SuccessResponse { + return w.successfulRequestsCh } -func (w *Watcher) FailedRequestCh() <-chan FailResponse { - return w.failedRequestCh +func (w *Watcher) FailedRequestsCh() <-chan FailResponse { + return w.failedRequestsCh } func (w *Watcher) Start() { @@ -76,17 +76,17 @@ func (w *Watcher) watch(task Task) { // Assume all results can be marshalled b, _ := json.Marshal(res) w.logger.Info("Watcher", "task ID(%d) has been resolved with result: %s", task.ID(), string(b)) - w.successfulRequestCh <- SuccessResponse{task, *res} + w.successfulRequestsCh <- SuccessResponse{task, *res} return default: // Assume all results can be marshalled b, _ := json.Marshal(res) w.logger.Info("Watcher", "task ID(%d) has failed with result: %s", task.ID(), string(b)) wrappedErr := types.ErrUnknown.Wrapf("request ID %d failed with unknown reason: %s", task.RequestID, err) - w.failedRequestCh <- FailResponse{task, *res, wrappedErr} + w.failedRequestsCh <- FailResponse{task, *res, wrappedErr} return } } - w.failedRequestCh <- FailResponse{task, client.OracleResult{}, types.ErrTimedOut} + w.failedRequestsCh <- FailResponse{task, client.OracleResult{}, types.ErrTimedOut} } diff --git a/requester/watcher/request/watcher_test.go b/requester/watcher/request/watcher_test.go index a1e6f1b..f190705 100644 --- a/requester/watcher/request/watcher_test.go +++ b/requester/watcher/request/watcher_test.go @@ -64,9 +64,9 @@ func TestWatcher(t *testing.T) { for { select { - case <-w.SuccessfulRequestCh(): + case <-w.SuccessfulRequestsCh(): return - case <-w.FailedRequestCh(): + case <-w.FailedRequestsCh(): t.Errorf("expected success, not failure") return case <-timeout: @@ -126,10 +126,10 @@ func TestWatcherWithResolveFailure(t *testing.T) { for { select { - case <-w.SuccessfulRequestCh(): + case <-w.SuccessfulRequestsCh(): t.Errorf("expected failure, not success") return - case <-w.FailedRequestCh(): + case <-w.FailedRequestsCh(): return case <-timeout: t.Errorf("timed out") @@ -171,10 +171,10 @@ func TestWatcherWithTimeout(t *testing.T) { for { select { - case <-w.SuccessfulRequestCh(): + case <-w.SuccessfulRequestsCh(): t.Errorf("expected failure due to timeout") return - case <-w.FailedRequestCh(): + case <-w.FailedRequestsCh(): return case <-timeout: t.Errorf("timed out") diff --git a/requester/watcher/signing/watcher.go b/requester/watcher/signing/watcher.go index 397e44f..df67d1f 100644 --- a/requester/watcher/signing/watcher.go +++ b/requester/watcher/signing/watcher.go @@ -19,9 +19,9 @@ type Watcher struct { pollingDelay time.Duration // Channel - watchQueueCh <-chan Task - successfulRequestCh chan SuccessResponse - failedRequestCh chan FailResponse + watchQueueCh <-chan Task + successfulRequestsCh chan SuccessResponse + failedRequestsCh chan FailResponse } func NewWatcher( @@ -34,22 +34,22 @@ func NewWatcher( failureChBufferSize int, ) *Watcher { return &Watcher{ - client: client, - logger: logger, - timeout: timeout, - pollingDelay: pollingDelay, - watchQueueCh: watchQueueCh, - successfulRequestCh: make(chan SuccessResponse, successChBufferSize), - failedRequestCh: make(chan FailResponse, failureChBufferSize), + client: client, + logger: logger, + timeout: timeout, + pollingDelay: pollingDelay, + watchQueueCh: watchQueueCh, + successfulRequestsCh: make(chan SuccessResponse, successChBufferSize), + failedRequestsCh: make(chan FailResponse, failureChBufferSize), } } -func (w *Watcher) SuccessfulRequestCh() <-chan SuccessResponse { - return w.successfulRequestCh +func (w *Watcher) SuccessfulRequestsCh() <-chan SuccessResponse { + return w.successfulRequestsCh } -func (w *Watcher) FailedRequestCh() <-chan FailResponse { - return w.failedRequestCh +func (w *Watcher) FailedRequestsCh() <-chan FailResponse { + return w.failedRequestsCh } func (w *Watcher) Start() { @@ -60,7 +60,7 @@ func (w *Watcher) Start() { func (w *Watcher) watch(task Task) { if task.SigningID == 0 { - w.failedRequestCh <- FailResponse{task, client.SigningResult{}, types.ErrUnknown.Wrapf("signing ID %d is invalid", task.SigningID)} + w.failedRequestsCh <- FailResponse{task, client.SigningResult{}, types.ErrUnknown.Wrapf("signing ID %d is invalid", task.SigningID)} return } @@ -87,17 +87,17 @@ func (w *Watcher) watch(task Task) { // Assume all results can be marshalled b, _ := json.Marshal(res) w.logger.Info("Watcher", "task ID(%d) has been resolved with result: %s", task.ID(), string(b)) - w.successfulRequestCh <- SuccessResponse{task, *res} + w.successfulRequestsCh <- SuccessResponse{task, *res} return default: // Assume all results can be marshalled b, _ := json.Marshal(res) w.logger.Info("Watcher", "task ID(%d) has failed with result: %s", task.ID(), string(b)) wrappedErr := types.ErrUnknown.Wrapf("signign ID %d failed with unknown reason: %s", task.SigningID, err) - w.failedRequestCh <- FailResponse{task, *res, wrappedErr} + w.failedRequestsCh <- FailResponse{task, *res, wrappedErr} return } } - w.failedRequestCh <- FailResponse{task, client.SigningResult{}, types.ErrTimedOut} + w.failedRequestsCh <- FailResponse{task, client.SigningResult{}, types.ErrTimedOut} } diff --git a/requester/watcher/signing/watcher_test.go b/requester/watcher/signing/watcher_test.go index 3216559..057349d 100644 --- a/requester/watcher/signing/watcher_test.go +++ b/requester/watcher/signing/watcher_test.go @@ -67,9 +67,9 @@ func TestWatcherSuccess(t *testing.T) { for { select { - case <-w.SuccessfulRequestCh(): + case <-w.SuccessfulRequestsCh(): return - case <-w.FailedRequestCh(): + case <-w.FailedRequestsCh(): t.Errorf("expected success, not failure") return case <-timeout: @@ -162,10 +162,10 @@ func TestWatcherWithResolveFailure(t *testing.T) { for { select { - case <-w.SuccessfulRequestCh(): + case <-w.SuccessfulRequestsCh(): t.Errorf("expected failure, not success") return - case <-w.FailedRequestCh(): + case <-w.FailedRequestsCh(): return case <-timeout: t.Errorf("timed out") @@ -198,10 +198,10 @@ func TestWatcherWithTimeout(t *testing.T) { for { select { - case <-w.SuccessfulRequestCh(): + case <-w.SuccessfulRequestsCh(): t.Errorf("expected failure due to timeout") return - case <-w.FailedRequestCh(): + case <-w.FailedRequestsCh(): return case <-timeout: t.Errorf("client not being stopped before timed out")