Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt to fix leak #34

Open
wants to merge 4 commits into
base: issue-26
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 133 additions & 6 deletions client/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"sync"

"github.com/CyCoreSystems/ari/v5"
"github.com/CyCoreSystems/ari/v5/stdbus"
"github.com/inconshreveable/log15"
"github.com/pkg/errors"

"github.com/nats-io/nats.go"
)

// subscriptionEventBufferSize defines the number of events that each
// subscription will queue before accepting more events.
var subscriptionEventBufferSize = 100

// busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus
type busWrapper struct {
subject string
Expand All @@ -20,7 +23,23 @@ type busWrapper struct {

sub *nats.Subscription

bus ari.Bus
subs []*subscription // The list of subscriptions

rwMux sync.RWMutex

closed bool
}

// A Subscription is a wrapped channel for receiving
// events from the ARI event bus.
type subscription struct {
key *ari.Key
b *busWrapper // reference to the event bus
events []string // list of events to listen for

mu sync.RWMutex
closed bool // channel closure protection flag
C chan ari.Event // channel for sending events to the subscriber
}

func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) {
Expand All @@ -29,7 +48,7 @@ func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*bus
w := &busWrapper{
subject: subject,
log: log,
bus: stdbus.New(),
subs: []*subscription{},
}

w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) {
Expand All @@ -43,20 +62,128 @@ func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*bus
}

func (w *busWrapper) receive(o *nats.Msg) {
var matched bool

e, err := ari.DecodeEvent(o.Data)
if err != nil {
w.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

w.bus.Send(e)
w.rwMux.RLock()
// Disseminate the message to the subscribers
for _, s := range w.subs {
matched = false
for _, k := range e.Keys() {
if matched {
break
}

if s.key.Match(k) {
matched = true

for _, topic := range s.events {
if topic == e.GetType() || topic == ari.Events.All {
s.mu.RLock()
if !s.closed {
select {
case s.C <- e:
default: // never block
}
}
s.mu.RUnlock()
}
}
}
}
}
w.rwMux.RUnlock()
}

// Subscribe returns a subscription to the given list
// of event types
func (w *busWrapper) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription {
s := &subscription{
key: key,
b: w,
events: eTypes,
C: make(chan ari.Event, subscriptionEventBufferSize),
}
w.add(s)

return s
}

// add appends a new subscription to the bus
func (w *busWrapper) add(s *subscription) {
w.rwMux.Lock()
w.subs = append(w.subs, s)
w.rwMux.Unlock()
}

// remove deletes the given subscription from the bus
func (w *busWrapper) remove(s *subscription) {
w.rwMux.Lock()
for i, si := range w.subs {
if s == si {
// Subs are pointers, so we have to explicitly remove them
// to prevent memory leaks
w.subs[i] = w.subs[len(w.subs)-1] // replace the current with the end
w.subs[len(w.subs)-1] = nil // remove the end
w.subs = w.subs[:len(w.subs)-1] // lop off the end

break
}
}
w.rwMux.Unlock()
}

func (w *busWrapper) Close() {
if w.closed {
return
}
if err := w.sub.Unsubscribe(); err != nil {
w.log.Error("failed to unsubscribe when closing NATS subscription:", err)
}
w.bus.Close()
w.closed = true
for _, s := range w.subs {
s.Cancel()
}
}

// Events returns the events channel
func (s *subscription) Events() <-chan ari.Event {
return s.C
}

// Cancel cancels the subscription and removes it from
// the event bus.
func (s *subscription) Cancel() {
if s == nil {
return
}
s.mu.Lock()

if s.closed {
s.mu.Unlock()
return
}

s.closed = true

s.mu.Unlock()

// Remove the subscription from the bus
if s.b != nil {
s.b.remove(s)
}

// Close the subscription's deliver channel
if s.C != nil {
close(s.C)
s.C = nil
}
s = nil
}

// Bus provides an ari.Bus interface to NATS
Expand Down Expand Up @@ -171,5 +298,5 @@ func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription {
}
b.mu.Unlock()

return w.bus.Subscribe(key, n...)
return w.Subscribe(key, n...)
}
6 changes: 3 additions & 3 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,15 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) {
opt(c)
}

// Create the core bus
c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)

// Start the core, if it is not already started
err := c.core.Start()
if err != nil {
return nil, errors.Wrap(err, "failed to start core")
}

// Create the core bus
c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)

// Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists)
c.bus = c.core.bus.SubBus()

Expand Down