Skip to content

Commit

Permalink
feat: unsubscribe from subscription (#3557)
Browse files Browse the repository at this point in the history
* feat: unsubscribe from subscription

* fix: resolve TODO item in connection testing
  • Loading branch information
mathnogueira authored Jan 24, 2024
1 parent 1c22e62 commit 969efca
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 31 deletions.
2 changes: 1 addition & 1 deletion server/executor/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ type testSuiteRunGetter interface {
}

type subscriptor interface {
Subscribe(string, subscription.Subscriber)
Subscribe(string, subscription.Subscriber) subscription.Subscription
}

type queueConfigurer[T any] struct {
Expand Down
4 changes: 2 additions & 2 deletions server/executor/test_suite_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin

return nil
})
subscriptionManager.Subscribe(transactionRun.ResourceID(), sf)
subscription := subscriptionManager.Subscribe(transactionRun.ResourceID(), sf)

select {
case finalRun := <-done:
subscriptionManager.Unsubscribe(transactionRun.ResourceID(), sf.ID()) //cleanup to avoid race conditions
subscription.Unsubscribe()
assert(t, finalRun)
case <-time.After(10 * time.Second):
t.Log("timeout after 10 second")
Expand Down
6 changes: 5 additions & 1 deletion server/http/websocket/unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ func (e unsubscribeCommandExecutor) Execute(conn *websocket.Conn, message []byte
return
}

e.subscriptionManager.Unsubscribe(msg.Resource, msg.SubscriptionId)
subscription := e.subscriptionManager.GetSubscription(msg.Resource, msg.SubscriptionId)
err = subscription.Unsubscribe()
if err != nil {
conn.WriteJSON(ErrorMessage(fmt.Errorf("could not unsubscribe: %w", err)))
}

conn.WriteJSON(UnsubscribeSuccess())
}
28 changes: 23 additions & 5 deletions server/subscription/in_memory_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,46 @@ import (
)

type inMemoryManager struct {
subscriptions map[string][]Subscriber
subscribers map[string][]Subscriber
subscriptions *subscriptionStorage
mutex sync.Mutex
}

type inMemorySubscription struct {
unsubscribeFn func()
}

func (s *inMemorySubscription) Unsubscribe() error {
s.unsubscribeFn()
return nil
}

func (m *inMemoryManager) getSubscribers(resourceID string) []Subscriber {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.subscriptions[resourceID]
return m.subscribers[resourceID]
}

func (m *inMemoryManager) setSubscribers(resourceID string, subscribers []Subscriber) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.subscriptions[resourceID] = subscribers
m.subscribers[resourceID] = subscribers
}

func (m *inMemoryManager) Subscribe(resourceID string, subscriber Subscriber) {
func (m *inMemoryManager) Subscribe(resourceID string, subscriber Subscriber) Subscription {
subscribers := append(m.getSubscribers(resourceID), subscriber)
m.setSubscribers(resourceID, subscribers)

return &inMemorySubscription{
unsubscribeFn: func() { m.unsubscribe(resourceID, subscriber.ID()) },
}
}

func (m *inMemoryManager) GetSubscription(resourceID string, subscriptionID string) Subscription {
return m.subscriptions.Get(resourceID, subscriptionID)
}

func (m *inMemoryManager) Unsubscribe(resourceID string, subscriptionID string) {
func (m *inMemoryManager) unsubscribe(resourceID string, subscriptionID string) {
subscribers := m.getSubscribers(resourceID)

updated := make([]Subscriber, 0, len(subscribers)-1)
Expand Down
4 changes: 2 additions & 2 deletions server/subscription/in_memory_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func TestManagerUnsubscribe(t *testing.T) {
Content: "Test was deleted",
}

manager.Subscribe("test:1", subscriber)
subscription := manager.Subscribe("test:1", subscriber)
manager.PublishUpdate(message1)

assert.Equal(t, message1.Type, receivedMessage.Type)
assert.Equal(t, message1.Content, receivedMessage.Content)

manager.Unsubscribe("test:1", subscriber.ID())
subscription.Unsubscribe()
manager.PublishUpdate(message2)

assert.Equal(t, message1.Type, receivedMessage.Type, "subscriber should not be notified after unsubscribed")
Expand Down
41 changes: 37 additions & 4 deletions server/subscription/manager.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package subscription

import (
"fmt"
"sync"

"github.com/nats-io/nats.go"
)

type Manager interface {
Subscribe(resourceID string, subscriber Subscriber)
Unsubscribe(resourceID string, subscriptionID string)
Subscribe(resourceID string, subscriber Subscriber) Subscription
GetSubscription(resourceID string, subscriptionID string) Subscription

PublishUpdate(message Message)
Publish(resourceID string, message any)
}

type Subscription interface {
Unsubscribe() error
}

type optFn func(*options)

type options struct {
Expand All @@ -34,11 +39,39 @@ func NewManager(opts ...optFn) Manager {
}

if currentOptions.conn != nil {
return &natsManager{currentOptions.conn}
return &natsManager{
currentOptions.conn,
newSubscriptionStorage(),
}
}

return &inMemoryManager{
subscriptions: make(map[string][]Subscriber),
subscribers: make(map[string][]Subscriber),
subscriptions: newSubscriptionStorage(),
mutex: sync.Mutex{},
}
}

type subscriptionStorage struct {
subscriptions map[string]Subscription
}

func newSubscriptionStorage() *subscriptionStorage {
return &subscriptionStorage{
subscriptions: make(map[string]Subscription),
}
}

func (s *subscriptionStorage) Get(resourceID, subscriberID string) Subscription {
key := s.key(resourceID, subscriberID)
return s.subscriptions[key]
}

func (s *subscriptionStorage) Set(resourceID, subscriberID string, subscription Subscription) {
key := s.key(resourceID, subscriberID)
s.subscriptions[key] = subscription
}

func (s *subscriptionStorage) key(resourceID, subscriberID string) string {
return fmt.Sprintf("%s-%s", resourceID, subscriberID)
}
15 changes: 9 additions & 6 deletions server/subscription/nats_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
)

type natsManager struct {
conn *nats.Conn
conn *nats.Conn
subscriptions *subscriptionStorage
}

func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) {
_, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) {
func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) Subscription {
subscription, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) {
decoded, err := DecodeMessage(msg.Data)
if err != nil {
log.Printf("cannot unmarshall incoming nats message: %s", err.Error())
Expand All @@ -25,12 +26,14 @@ func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) {
})
if err != nil {
log.Printf("cannot subscribe to nats topic: %s", err.Error())
return
return nil
}

return subscription
}

func (m *natsManager) Unsubscribe(resourceID string, subscriptionID string) {
panic("nats unsubscribe not implemented")
func (m *natsManager) GetSubscription(resourceID string, subscriptionID string) Subscription {
return m.subscriptions.Get(resourceID, subscriptionID)
}

func (m *natsManager) PublishUpdate(message Message) {
Expand Down
7 changes: 0 additions & 7 deletions server/subscription/subscription.go

This file was deleted.

5 changes: 2 additions & 3 deletions server/testconnection/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ func (t *OTLPConnectionTester) GetSpanCount(ctx context.Context, opts ...GetSpan
return nil
})

t.subscriptionManager.Subscribe(topicName, subscriber)
// TODO: implement subscription
// defer t.subscriptionManager.Unsubscribe(topicName, subscriber.ID())
subscription := t.subscriptionManager.Subscribe(topicName, subscriber)
defer subscription.Unsubscribe()

t.subscriptionManager.Publish(GetSpanCountTopicName(WithTenantID(tenantID)), OTLPConnectionTestRequest{})

Expand Down

0 comments on commit 969efca

Please sign in to comment.