diff --git a/client/bus/bus.go b/client/bus/bus.go index cbab820..1faa5c5 100644 --- a/client/bus/bus.go +++ b/client/bus/bus.go @@ -5,13 +5,16 @@ import ( "sync" "github.com/CyCoreSystems/ari/v5" - "github.com/CyCoreSystems/ari/v5/stdbus" "github.com/inconshreveable/log15" "github.com/pkg/errors" "github.com/nats-io/nats.go" ) +// subscriptionEventBufferSize defines the number of events that each +// subscription will queue before accepting more events. +var subscriptionEventBufferSize = 100 + // busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus type busWrapper struct { subject string @@ -20,7 +23,23 @@ type busWrapper struct { sub *nats.Subscription - bus ari.Bus + subs []*subscription // The list of subscriptions + + rwMux sync.RWMutex + + closed bool +} + +// A Subscription is a wrapped channel for receiving +// events from the ARI event bus. +type subscription struct { + key *ari.Key + b *busWrapper // reference to the event bus + events []string // list of events to listen for + + mu sync.RWMutex + closed bool // channel closure protection flag + C chan ari.Event // channel for sending events to the subscriber } func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) { @@ -29,7 +48,7 @@ func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*bus w := &busWrapper{ subject: subject, log: log, - bus: stdbus.New(), + subs: []*subscription{}, } w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) { @@ -43,20 +62,128 @@ func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*bus } func (w *busWrapper) receive(o *nats.Msg) { + var matched bool + e, err := ari.DecodeEvent(o.Data) if err != nil { w.log.Error("failed to convert received message to ari.Event", "error", err) return } - w.bus.Send(e) + w.rwMux.RLock() + // Disseminate the message to the subscribers + for _, s := range w.subs { + matched = false + for _, k := range e.Keys() { + if matched { + break + } + + if s.key.Match(k) { + matched = true + + for _, topic := range s.events { + if topic == e.GetType() || topic == ari.Events.All { + s.mu.RLock() + if !s.closed { + select { + case s.C <- e: + default: // never block + } + } + s.mu.RUnlock() + } + } + } + } + } + w.rwMux.RUnlock() +} + +// Subscribe returns a subscription to the given list +// of event types +func (w *busWrapper) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription { + s := &subscription{ + key: key, + b: w, + events: eTypes, + C: make(chan ari.Event, subscriptionEventBufferSize), + } + w.add(s) + + return s +} + +// add appends a new subscription to the bus +func (w *busWrapper) add(s *subscription) { + w.rwMux.Lock() + w.subs = append(w.subs, s) + w.rwMux.Unlock() +} + +// remove deletes the given subscription from the bus +func (w *busWrapper) remove(s *subscription) { + w.rwMux.Lock() + for i, si := range w.subs { + if s == si { + // Subs are pointers, so we have to explicitly remove them + // to prevent memory leaks + w.subs[i] = w.subs[len(w.subs)-1] // replace the current with the end + w.subs[len(w.subs)-1] = nil // remove the end + w.subs = w.subs[:len(w.subs)-1] // lop off the end + + break + } + } + w.rwMux.Unlock() } func (w *busWrapper) Close() { + if w.closed { + return + } if err := w.sub.Unsubscribe(); err != nil { w.log.Error("failed to unsubscribe when closing NATS subscription:", err) } - w.bus.Close() + w.closed = true + for _, s := range w.subs { + s.Cancel() + } +} + +// Events returns the events channel +func (s *subscription) Events() <-chan ari.Event { + return s.C +} + +// Cancel cancels the subscription and removes it from +// the event bus. +func (s *subscription) Cancel() { + if s == nil { + return + } + s.mu.Lock() + + if s.closed { + s.mu.Unlock() + return + } + + s.closed = true + + s.mu.Unlock() + + // Remove the subscription from the bus + if s.b != nil { + s.b.remove(s) + } + + // Close the subscription's deliver channel + if s.C != nil { + close(s.C) + s.C = nil + } + s = nil } // Bus provides an ari.Bus interface to NATS @@ -171,5 +298,5 @@ func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription { } b.mu.Unlock() - return w.bus.Subscribe(key, n...) + return w.Subscribe(key, n...) } diff --git a/client/client.go b/client/client.go index d8629ab..f1ca18b 100644 --- a/client/client.go +++ b/client/client.go @@ -229,15 +229,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) { opt(c) } - // Create the core bus - c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) - // Start the core, if it is not already started err := c.core.Start() if err != nil { return nil, errors.Wrap(err, "failed to start core") } + // Create the core bus + c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log) + // Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists) c.bus = c.core.bus.SubBus()