Skip to content

Commit

Permalink
rename variables & functions
Browse files Browse the repository at this point in the history
  • Loading branch information
nkitlabs committed May 17, 2024
1 parent 482d19e commit 0f0f568
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 73 deletions.
26 changes: 13 additions & 13 deletions examples/requester/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,29 +118,29 @@ func requestOracleData(
s.FailedRequestsCh(), senderCh, parser.IntoSenderTaskHandler, retryHandler, delayHandler,
)
resolveMw := middleware.New(
s.SuccessRequestsCh(), watcherCh, parser.IntoRequestWatcherTaskHandler, resolveHandler,
s.SuccessfulRequestsCh(), watcherCh, parser.IntoRequestWatcherTaskHandler, resolveHandler,
)

// start
go s.Start()
go w.Start()
go retryMw.Run()
go resolveMw.Run()
go retryMw.Start()
go resolveMw.Start()

senderCh <- sender.NewTask(1, msg)
for {
time.Sleep(10 * time.Second)
if len(w.FailedRequestCh()) != 0 || len(w.SuccessfulRequestCh()) != 0 {
if len(w.FailedRequestsCh()) != 0 || len(w.SuccessfulRequestsCh()) != 0 {
break
}
}

if len(w.FailedRequestCh()) != 0 {
failResp := <-w.FailedRequestCh()
if len(w.FailedRequestsCh()) != 0 {
failResp := <-w.FailedRequestsCh()
return client.OracleResult{}, failResp
}

resp := <-w.SuccessfulRequestCh()
resp := <-w.SuccessfulRequestsCh()
return resp.OracleResult, nil
}

Expand All @@ -159,28 +159,28 @@ func getSigningResult(
delayHandler := delay.NewHandler[signing.FailResponse, signing.Task](3 * time.Second)

retryMw := middleware.New(
w.FailedRequestCh(), watcherCh, parser.IntoSigningWatcherTaskHandler, retryHandler, delayHandler,
w.FailedRequestsCh(), watcherCh, parser.IntoSigningWatcherTaskHandler, retryHandler, delayHandler,
)

// start
go w.Start()
go retryMw.Run()
go retryMw.Start()

// new task to query tss signing result.
watcherCh <- signing.NewTask(2, signingID)
for {
time.Sleep(10 * time.Second)
if len(w.FailedRequestCh()) != 0 || len(w.SuccessfulRequestCh()) != 0 {
if len(w.FailedRequestsCh()) != 0 || len(w.SuccessfulRequestsCh()) != 0 {
break
}
}

if len(w.FailedRequestCh()) != 0 {
failResp := <-w.FailedRequestCh()
if len(w.FailedRequestsCh()) != 0 {
failResp := <-w.FailedRequestsCh()
return client.SigningResult{}, failResp
}

resp := <-w.SuccessfulRequestCh()
resp := <-w.SuccessfulRequestsCh()
return resp.SigningResult, nil
}

Expand Down
2 changes: 1 addition & 1 deletion requester/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func New[T, U any](
return &Middleware[T, U]{inCh: inCh, outCh: outCh, chain: handlerChain[0]}
}

func (m *Middleware[T, U]) Run() {
func (m *Middleware[T, U]) Start() {
// Note: Add non-blocking middleware (eg. success middleware
// that we just want to log/save to DB but don't want to wait to finish)
for {
Expand Down
18 changes: 9 additions & 9 deletions requester/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Sender struct {
// Channel
requestQueueCh chan Task
successfulRequestsCh chan SuccessResponse
failedRequestCh chan FailResponse
failedRequestsCh chan FailResponse
}

func NewSender(
Expand Down Expand Up @@ -58,16 +58,16 @@ func NewSender(
pollingDelay: pollingDelay,
requestQueueCh: requestQueueCh,
successfulRequestsCh: make(chan SuccessResponse, successChBufferSize),
failedRequestCh: make(chan FailResponse, failureChBufferSize),
failedRequestsCh: make(chan FailResponse, failureChBufferSize),
}, nil
}

func (s *Sender) SuccessRequestsCh() <-chan SuccessResponse {
func (s *Sender) SuccessfulRequestsCh() <-chan SuccessResponse {
return s.successfulRequestsCh
}

func (s *Sender) FailedRequestsCh() <-chan FailResponse {
return s.failedRequestCh
return s.failedRequestsCh
}

func (s *Sender) Start() {
Expand Down Expand Up @@ -101,15 +101,15 @@ func (s *Sender) request(task Task, key keyring.Record) {
// Handle error
if err != nil {
s.logger.Error("Sender", "failed to broadcast task ID(%d) with error: %s", task.ID(), err.Error())
s.failedRequestCh <- FailResponse{task, sdk.TxResponse{}, types.ErrBroadcastFailed.Wrapf(err.Error())}
s.failedRequestsCh <- FailResponse{task, sdk.TxResponse{}, types.ErrBroadcastFailed.Wrapf(err.Error())}
return
} else if resp != nil && resp.Code != 0 {
s.logger.Error("Sender", "failed to broadcast task ID(%d) with code %d", task.ID(), resp.Code)
s.failedRequestCh <- FailResponse{task, *resp, types.ErrBroadcastFailed}
s.failedRequestsCh <- FailResponse{task, *resp, types.ErrBroadcastFailed}
return
} else if resp == nil {
s.logger.Error("Sender", "failed to broadcast task ID(%d) no response", task.ID())
s.failedRequestCh <- FailResponse{task, sdk.TxResponse{}, types.ErrUnknown}
s.failedRequestsCh <- FailResponse{task, sdk.TxResponse{}, types.ErrUnknown}
return
}

Expand All @@ -127,7 +127,7 @@ func (s *Sender) request(task Task, key keyring.Record) {

if resp.Code != 0 {
s.logger.Warning("Sender", "task ID(%d) failed with code %d", task.ID(), resp.Code)
s.failedRequestCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf(resp.RawLog)}
s.failedRequestsCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf(resp.RawLog)}
return
}

Expand All @@ -136,5 +136,5 @@ func (s *Sender) request(task Task, key keyring.Record) {
return
}
s.logger.Error("Sender", "task ID(%d) has timed out", task.ID())
s.failedRequestCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf("timed out")}
s.failedRequestsCh <- FailResponse{task, *resp, types.ErrBroadcastFailed.Wrapf("timed out")}
}
6 changes: 3 additions & 3 deletions requester/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestSenderWithSuccess(t *testing.T) {
timeout := time.After(10 * time.Second)
for {
select {
case <-s.SuccessRequestsCh():
case <-s.SuccessfulRequestsCh():
return
case <-s.FailedRequestsCh():
t.Errorf("expected a successful response")
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestSenderWithFailure(t *testing.T) {
timeout := time.After(10 * time.Second)
for {
select {
case <-s.SuccessRequestsCh():
case <-s.SuccessfulRequestsCh():
t.Errorf("expected a failed response")
case <-s.FailedRequestsCh():
return
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestSenderWithClientError(t *testing.T) {
timeout := time.After(10 * time.Second)
for {
select {
case <-s.SuccessRequestsCh():
case <-s.SuccessfulRequestsCh():
t.Errorf("expected a failed response")
case <-s.FailedRequestsCh():
return
Expand Down
34 changes: 17 additions & 17 deletions requester/watcher/request/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ type Watcher struct {
pollingDelay time.Duration

// Channel
watchQueueCh <-chan Task
successfulRequestCh chan SuccessResponse
failedRequestCh chan FailResponse
watchQueueCh <-chan Task
successfulRequestsCh chan SuccessResponse
failedRequestsCh chan FailResponse
}

func NewWatcher(
Expand All @@ -34,22 +34,22 @@ func NewWatcher(
failureChBufferSize int,
) *Watcher {
return &Watcher{
client: client,
logger: logger,
timeout: timeout,
pollingDelay: pollingDelay,
watchQueueCh: watchQueueCh,
successfulRequestCh: make(chan SuccessResponse, successChBufferSize),
failedRequestCh: make(chan FailResponse, failureChBufferSize),
client: client,
logger: logger,
timeout: timeout,
pollingDelay: pollingDelay,
watchQueueCh: watchQueueCh,
successfulRequestsCh: make(chan SuccessResponse, successChBufferSize),
failedRequestsCh: make(chan FailResponse, failureChBufferSize),
}
}

func (w *Watcher) SuccessfulRequestCh() <-chan SuccessResponse {
return w.successfulRequestCh
func (w *Watcher) SuccessfulRequestsCh() <-chan SuccessResponse {
return w.successfulRequestsCh
}

func (w *Watcher) FailedRequestCh() <-chan FailResponse {
return w.failedRequestCh
func (w *Watcher) FailedRequestsCh() <-chan FailResponse {
return w.failedRequestsCh
}

func (w *Watcher) Start() {
Expand All @@ -76,17 +76,17 @@ func (w *Watcher) watch(task Task) {
// Assume all results can be marshalled
b, _ := json.Marshal(res)
w.logger.Info("Watcher", "task ID(%d) has been resolved with result: %s", task.ID(), string(b))
w.successfulRequestCh <- SuccessResponse{task, *res}
w.successfulRequestsCh <- SuccessResponse{task, *res}
return
default:
// Assume all results can be marshalled
b, _ := json.Marshal(res)
w.logger.Info("Watcher", "task ID(%d) has failed with result: %s", task.ID(), string(b))
wrappedErr := types.ErrUnknown.Wrapf("request ID %d failed with unknown reason: %s", task.RequestID, err)
w.failedRequestCh <- FailResponse{task, *res, wrappedErr}
w.failedRequestsCh <- FailResponse{task, *res, wrappedErr}
return
}
}

w.failedRequestCh <- FailResponse{task, client.OracleResult{}, types.ErrTimedOut}
w.failedRequestsCh <- FailResponse{task, client.OracleResult{}, types.ErrTimedOut}
}
12 changes: 6 additions & 6 deletions requester/watcher/request/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func TestWatcher(t *testing.T) {

for {
select {
case <-w.SuccessfulRequestCh():
case <-w.SuccessfulRequestsCh():
return
case <-w.FailedRequestCh():
case <-w.FailedRequestsCh():
t.Errorf("expected success, not failure")
return
case <-timeout:
Expand Down Expand Up @@ -126,10 +126,10 @@ func TestWatcherWithResolveFailure(t *testing.T) {

for {
select {
case <-w.SuccessfulRequestCh():
case <-w.SuccessfulRequestsCh():
t.Errorf("expected failure, not success")
return
case <-w.FailedRequestCh():
case <-w.FailedRequestsCh():
return
case <-timeout:
t.Errorf("timed out")
Expand Down Expand Up @@ -171,10 +171,10 @@ func TestWatcherWithTimeout(t *testing.T) {

for {
select {
case <-w.SuccessfulRequestCh():
case <-w.SuccessfulRequestsCh():
t.Errorf("expected failure due to timeout")
return
case <-w.FailedRequestCh():
case <-w.FailedRequestsCh():
return
case <-timeout:
t.Errorf("timed out")
Expand Down
36 changes: 18 additions & 18 deletions requester/watcher/signing/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ type Watcher struct {
pollingDelay time.Duration

// Channel
watchQueueCh <-chan Task
successfulRequestCh chan SuccessResponse
failedRequestCh chan FailResponse
watchQueueCh <-chan Task
successfulRequestsCh chan SuccessResponse
failedRequestsCh chan FailResponse
}

func NewWatcher(
Expand All @@ -34,22 +34,22 @@ func NewWatcher(
failureChBufferSize int,
) *Watcher {
return &Watcher{
client: client,
logger: logger,
timeout: timeout,
pollingDelay: pollingDelay,
watchQueueCh: watchQueueCh,
successfulRequestCh: make(chan SuccessResponse, successChBufferSize),
failedRequestCh: make(chan FailResponse, failureChBufferSize),
client: client,
logger: logger,
timeout: timeout,
pollingDelay: pollingDelay,
watchQueueCh: watchQueueCh,
successfulRequestsCh: make(chan SuccessResponse, successChBufferSize),
failedRequestsCh: make(chan FailResponse, failureChBufferSize),
}
}

func (w *Watcher) SuccessfulRequestCh() <-chan SuccessResponse {
return w.successfulRequestCh
func (w *Watcher) SuccessfulRequestsCh() <-chan SuccessResponse {
return w.successfulRequestsCh
}

func (w *Watcher) FailedRequestCh() <-chan FailResponse {
return w.failedRequestCh
func (w *Watcher) FailedRequestsCh() <-chan FailResponse {
return w.failedRequestsCh
}

func (w *Watcher) Start() {
Expand All @@ -60,7 +60,7 @@ func (w *Watcher) Start() {

func (w *Watcher) watch(task Task) {
if task.SigningID == 0 {
w.failedRequestCh <- FailResponse{task, client.SigningResult{}, types.ErrUnknown.Wrapf("signing ID %d is invalid", task.SigningID)}
w.failedRequestsCh <- FailResponse{task, client.SigningResult{}, types.ErrUnknown.Wrapf("signing ID %d is invalid", task.SigningID)}
return
}

Expand All @@ -87,17 +87,17 @@ func (w *Watcher) watch(task Task) {
// Assume all results can be marshalled
b, _ := json.Marshal(res)
w.logger.Info("Watcher", "task ID(%d) has been resolved with result: %s", task.ID(), string(b))
w.successfulRequestCh <- SuccessResponse{task, *res}
w.successfulRequestsCh <- SuccessResponse{task, *res}
return
default:
// Assume all results can be marshalled
b, _ := json.Marshal(res)
w.logger.Info("Watcher", "task ID(%d) has failed with result: %s", task.ID(), string(b))
wrappedErr := types.ErrUnknown.Wrapf("signign ID %d failed with unknown reason: %s", task.SigningID, err)
w.failedRequestCh <- FailResponse{task, *res, wrappedErr}
w.failedRequestsCh <- FailResponse{task, *res, wrappedErr}
return
}
}

w.failedRequestCh <- FailResponse{task, client.SigningResult{}, types.ErrTimedOut}
w.failedRequestsCh <- FailResponse{task, client.SigningResult{}, types.ErrTimedOut}
}
12 changes: 6 additions & 6 deletions requester/watcher/signing/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func TestWatcherSuccess(t *testing.T) {

for {
select {
case <-w.SuccessfulRequestCh():
case <-w.SuccessfulRequestsCh():
return
case <-w.FailedRequestCh():
case <-w.FailedRequestsCh():
t.Errorf("expected success, not failure")
return
case <-timeout:
Expand Down Expand Up @@ -162,10 +162,10 @@ func TestWatcherWithResolveFailure(t *testing.T) {

for {
select {
case <-w.SuccessfulRequestCh():
case <-w.SuccessfulRequestsCh():
t.Errorf("expected failure, not success")
return
case <-w.FailedRequestCh():
case <-w.FailedRequestsCh():
return
case <-timeout:
t.Errorf("timed out")
Expand Down Expand Up @@ -198,10 +198,10 @@ func TestWatcherWithTimeout(t *testing.T) {

for {
select {
case <-w.SuccessfulRequestCh():
case <-w.SuccessfulRequestsCh():
t.Errorf("expected failure due to timeout")
return
case <-w.FailedRequestCh():
case <-w.FailedRequestsCh():
return
case <-timeout:
t.Errorf("client not being stopped before timed out")
Expand Down

0 comments on commit 0f0f568

Please sign in to comment.