diff --git a/README.md b/README.md index 7713b1c..5e4e724 100644 --- a/README.md +++ b/README.md @@ -43,16 +43,12 @@ However, it does need to know how to connect to both Asterisk and NATS. Binary releases are available on the [releases page](https://github.com/CyCoreSystems/ari-proxy/releases). -You can also install the server manually. It is not (yet) go-gettable, but we -use [dep](https://github.com/golang/dep) for dependency management. +You can also install the server manually: ``` - dep ensure - go install + go install github.com/CyCoreSystems/ari-proxy/v5 ``` -You may need to explicitly install dependencies for this to work - ## Client library `ari-proxy` uses semantic versioning and standard Go modules. To use it in your diff --git a/client/client.go b/client/client.go index 8e3239a..35e399d 100644 --- a/client/client.go +++ b/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" "os" + "sync" "time" "github.com/CyCoreSystems/ari-proxy/v5/client/bus" @@ -27,7 +28,7 @@ import ( var ClosureGracePeriod = 10 * time.Second // DefaultRequestTimeout is the default timeout for a NATS request. (Note: Answer() takes longer than 250ms on average) -const DefaultRequestTimeout = 500 * time.Millisecond +var DefaultRequestTimeout = 500 * time.Millisecond // DefaultInputBufferLength is the default size of the event buffer for events // coming in from NATS @@ -572,39 +573,62 @@ func (c *Client) makeRequests(class string, req *proxy.Request) (responses []*pr } } +type limitedResponseForwarder struct { + closed bool + count int + expected int + fwdChan chan *proxy.Response + + mu sync.Mutex +} + +func (f *limitedResponseForwarder) Forward(o *proxy.Response) { + f.mu.Lock() + defer f.mu.Unlock() + + f.count++ + + if f.closed { + return + } + + // always send up reply, so we can track errors. + select { + case f.fwdChan <- o: + default: + } + + if f.count >= f.expected { + f.closed = true + close(f.fwdChan) + } +} + // TODO: simplify -// nolint: gocyclo func (c *Client) makeBroadcastRequestReturnFirstGoodResponse(class string, req *proxy.Request) (*proxy.Response, error) { if req == nil { return nil, errors.New("empty request") } + if req.Key == nil { req.Key = ari.NewKey("", "") } - expected := len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge)) reply := rid.New("rp") - replyChan := make(chan *proxy.Response) - var responseCount int - replySub, err := c.core.nc.Subscribe(reply, func(o *proxy.Response) { - responseCount++ - - // always send up reply, so we can track errors. - replyChan <- o + rf := &limitedResponseForwarder{ + expected: len(c.core.cluster.Matching(req.Key.Node, req.Key.App, c.core.clusterMaxAge)), + fwdChan: make(chan *proxy.Response), + } - if responseCount >= expected { - close(replyChan) - } - }) + replySub, err := c.core.nc.Subscribe(reply, rf.Forward) if err != nil { return nil, errors.Wrap(err, "failed to subscribe to data responses") } defer replySub.Unsubscribe() // nolint: errcheck // Make an all-call for the entity data - err = c.core.nc.PublishRequest(c.subject(class, req), reply, req) - if err != nil { + if err = c.core.nc.PublishRequest(c.subject(class, req), reply, req); err != nil { return nil, errors.Wrap(err, "failed to make request for data") } @@ -616,18 +640,19 @@ func (c *Client) makeBroadcastRequestReturnFirstGoodResponse(class string, req * if err == nil { err = errors.New("timeout") } + return nil, err - case resp, more := <-replyChan: + case resp, more := <-rf.fwdChan: if !more { if err == nil { err = errors.New("no data") } + return nil, err } if resp != nil { - err = resp.Err() // store the error for later return - if err == nil { // No error means to return the current value - return resp, nil + if err = resp.Err(); err == nil { // store the error for later return + return resp, nil // No error means to return the current value } } }