Skip to content

Commit

Permalink
Safety checks for multi-reply-first
Browse files Browse the repository at this point in the history
Add safety wrapper to multi-response handler

Signed-off-by: Seán C McCord <[email protected]>
  • Loading branch information
Ulexus committed Nov 14, 2020
1 parent 9c11fb3 commit 30dca11
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 26 deletions.
8 changes: 2 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,12 @@ However, it does need to know how to connect to both Asterisk and NATS.

Binary releases are available on the [releases page](https://github.com/CyCoreSystems/ari-proxy/releases).

You can also install the server manually. It is not (yet) go-gettable, but we
use [dep](https://github.com/golang/dep) for dependency management.
You can also install the server manually:

```
dep ensure
go install
go install github.com/CyCoreSystems/ari-proxy/v5
```

You may need to explicitly install dependencies for this to work

## Client library

`ari-proxy` uses semantic versioning and standard Go modules. To use it in your
Expand Down
65 changes: 45 additions & 20 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"context"
"os"
"sync"
"time"

"github.com/CyCoreSystems/ari-proxy/v5/client/bus"
Expand All @@ -27,7 +28,7 @@ import (
var ClosureGracePeriod = 10 * time.Second

// DefaultRequestTimeout is the default timeout for a NATS request. (Note: Answer() takes longer than 250ms on average)
const DefaultRequestTimeout = 500 * time.Millisecond
var DefaultRequestTimeout = 500 * time.Millisecond

// DefaultInputBufferLength is the default size of the event buffer for events
// coming in from NATS
Expand Down Expand Up @@ -572,39 +573,62 @@ func (c *Client) makeRequests(class string, req *proxy.Request) (responses []*pr
}
}

type limitedResponseForwarder struct {
closed bool
count int
expected int
fwdChan chan *proxy.Response

mu sync.Mutex
}

func (f *limitedResponseForwarder) Forward(o *proxy.Response) {
f.mu.Lock()
defer f.mu.Unlock()

f.count++

if f.closed {
return
}

// always send up reply, so we can track errors.
select {
case f.fwdChan <- o:
default:
}

if f.count >= f.expected {
f.closed = true
close(f.fwdChan)
}
}

// TODO: simplify
// nolint: gocyclo
func (c *Client) makeBroadcastRequestReturnFirstGoodResponse(class string, req *proxy.Request) (*proxy.Response, error) {
if req == nil {
return nil, errors.New("empty request")
}

if req.Key == nil {
req.Key = ari.NewKey("", "")
}

expected := len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge))
reply := rid.New("rp")
replyChan := make(chan *proxy.Response)

var responseCount int
replySub, err := c.core.nc.Subscribe(reply, func(o *proxy.Response) {
responseCount++

// always send up reply, so we can track errors.
replyChan <- o
rf := &limitedResponseForwarder{
expected: len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge)),
fwdChan: make(chan *proxy.Response),
}

if responseCount >= expected {
close(replyChan)
}
})
replySub, err := c.core.nc.Subscribe(reply, rf.Forward)
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe to data responses")
}
defer replySub.Unsubscribe() // nolint: errcheck

// Make an all-call for the entity data
err = c.core.nc.PublishRequest(c.subject(class, req), reply, req)
if err != nil {
if err = c.core.nc.PublishRequest(c.subject(class, req), reply, req); err != nil {
return nil, errors.Wrap(err, "failed to make request for data")
}

Expand All @@ -616,18 +640,19 @@ func (c *Client) makeBroadcastRequestReturnFirstGoodResponse(class string, req *
if err == nil {
err = errors.New("timeout")
}

return nil, err
case resp, more := <-replyChan:
case resp, more := <-rf.fwdChan:
if !more {
if err == nil {
err = errors.New("no data")
}

return nil, err
}
if resp != nil {
err = resp.Err() // store the error for later return
if err == nil { // No error means to return the current value
return resp, nil
if err = resp.Err(); err == nil { // store the error for later return
return resp, nil // No error means to return the current value
}
}
}
Expand Down

0 comments on commit 30dca11

Please sign in to comment.