From 969efca407f14e0fc256d69fef3d7def705157e6 Mon Sep 17 00:00:00 2001 From: Matheus Nogueira Date: Wed, 24 Jan 2024 18:08:10 -0300 Subject: [PATCH] feat: unsubscribe from subscription (#3557) * feat: unsubscribe from subscription * fix: resolve TODO item in connection testing --- server/executor/queue.go | 2 +- server/executor/test_suite_runner_test.go | 4 +- server/http/websocket/unsubscribe.go | 6 ++- server/subscription/in_memory_manager.go | 28 ++++++++++--- server/subscription/in_memory_manager_test.go | 4 +- server/subscription/manager.go | 41 +++++++++++++++++-- server/subscription/nats_manager.go | 15 ++++--- server/subscription/subscription.go | 7 ---- server/testconnection/otlp.go | 5 +-- 9 files changed, 81 insertions(+), 31 deletions(-) delete mode 100644 server/subscription/subscription.go diff --git a/server/executor/queue.go b/server/executor/queue.go index fe3e354725..7fb7dc0146 100644 --- a/server/executor/queue.go +++ b/server/executor/queue.go @@ -179,7 +179,7 @@ type testSuiteRunGetter interface { } type subscriptor interface { - Subscribe(string, subscription.Subscriber) + Subscribe(string, subscription.Subscriber) subscription.Subscription } type queueConfigurer[T any] struct { diff --git a/server/executor/test_suite_runner_test.go b/server/executor/test_suite_runner_test.go index 7f45c2d7b5..fb9b3c24ce 100644 --- a/server/executor/test_suite_runner_test.go +++ b/server/executor/test_suite_runner_test.go @@ -192,11 +192,11 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin return nil }) - subscriptionManager.Subscribe(transactionRun.ResourceID(), sf) + subscription := subscriptionManager.Subscribe(transactionRun.ResourceID(), sf) select { case finalRun := <-done: - subscriptionManager.Unsubscribe(transactionRun.ResourceID(), sf.ID()) //cleanup to avoid race conditions + subscription.Unsubscribe() assert(t, finalRun) case <-time.After(10 * time.Second): t.Log("timeout after 10 second") diff --git a/server/http/websocket/unsubscribe.go b/server/http/websocket/unsubscribe.go index 430c609e67..151a7b1d60 100644 --- a/server/http/websocket/unsubscribe.go +++ b/server/http/websocket/unsubscribe.go @@ -36,7 +36,11 @@ func (e unsubscribeCommandExecutor) Execute(conn *websocket.Conn, message []byte return } - e.subscriptionManager.Unsubscribe(msg.Resource, msg.SubscriptionId) + subscription := e.subscriptionManager.GetSubscription(msg.Resource, msg.SubscriptionId) + err = subscription.Unsubscribe() + if err != nil { + conn.WriteJSON(ErrorMessage(fmt.Errorf("could not unsubscribe: %w", err))) + } conn.WriteJSON(UnsubscribeSuccess()) } diff --git a/server/subscription/in_memory_manager.go b/server/subscription/in_memory_manager.go index d29b30f464..f935a4b358 100644 --- a/server/subscription/in_memory_manager.go +++ b/server/subscription/in_memory_manager.go @@ -6,28 +6,46 @@ import ( ) type inMemoryManager struct { - subscriptions map[string][]Subscriber + subscribers map[string][]Subscriber + subscriptions *subscriptionStorage mutex sync.Mutex } +type inMemorySubscription struct { + unsubscribeFn func() +} + +func (s *inMemorySubscription) Unsubscribe() error { + s.unsubscribeFn() + return nil +} + func (m *inMemoryManager) getSubscribers(resourceID string) []Subscriber { m.mutex.Lock() defer m.mutex.Unlock() - return m.subscriptions[resourceID] + return m.subscribers[resourceID] } func (m *inMemoryManager) setSubscribers(resourceID string, subscribers []Subscriber) { m.mutex.Lock() defer m.mutex.Unlock() - m.subscriptions[resourceID] = subscribers + m.subscribers[resourceID] = subscribers } -func (m *inMemoryManager) Subscribe(resourceID string, subscriber Subscriber) { +func (m *inMemoryManager) Subscribe(resourceID string, subscriber Subscriber) Subscription { subscribers := append(m.getSubscribers(resourceID), subscriber) m.setSubscribers(resourceID, subscribers) + + return &inMemorySubscription{ + unsubscribeFn: func() { m.unsubscribe(resourceID, subscriber.ID()) }, + } +} + +func (m *inMemoryManager) GetSubscription(resourceID string, subscriptionID string) Subscription { + return m.subscriptions.Get(resourceID, subscriptionID) } -func (m *inMemoryManager) Unsubscribe(resourceID string, subscriptionID string) { +func (m *inMemoryManager) unsubscribe(resourceID string, subscriptionID string) { subscribers := m.getSubscribers(resourceID) updated := make([]Subscriber, 0, len(subscribers)-1) diff --git a/server/subscription/in_memory_manager_test.go b/server/subscription/in_memory_manager_test.go index bb9fbd2196..2d3d063025 100644 --- a/server/subscription/in_memory_manager_test.go +++ b/server/subscription/in_memory_manager_test.go @@ -101,13 +101,13 @@ func TestManagerUnsubscribe(t *testing.T) { Content: "Test was deleted", } - manager.Subscribe("test:1", subscriber) + subscription := manager.Subscribe("test:1", subscriber) manager.PublishUpdate(message1) assert.Equal(t, message1.Type, receivedMessage.Type) assert.Equal(t, message1.Content, receivedMessage.Content) - manager.Unsubscribe("test:1", subscriber.ID()) + subscription.Unsubscribe() manager.PublishUpdate(message2) assert.Equal(t, message1.Type, receivedMessage.Type, "subscriber should not be notified after unsubscribed") diff --git a/server/subscription/manager.go b/server/subscription/manager.go index ac24c75408..5e3f68b043 100644 --- a/server/subscription/manager.go +++ b/server/subscription/manager.go @@ -1,19 +1,24 @@ package subscription import ( + "fmt" "sync" "github.com/nats-io/nats.go" ) type Manager interface { - Subscribe(resourceID string, subscriber Subscriber) - Unsubscribe(resourceID string, subscriptionID string) + Subscribe(resourceID string, subscriber Subscriber) Subscription + GetSubscription(resourceID string, subscriptionID string) Subscription PublishUpdate(message Message) Publish(resourceID string, message any) } +type Subscription interface { + Unsubscribe() error +} + type optFn func(*options) type options struct { @@ -34,11 +39,39 @@ func NewManager(opts ...optFn) Manager { } if currentOptions.conn != nil { - return &natsManager{currentOptions.conn} + return &natsManager{ + currentOptions.conn, + newSubscriptionStorage(), + } } return &inMemoryManager{ - subscriptions: make(map[string][]Subscriber), + subscribers: make(map[string][]Subscriber), + subscriptions: newSubscriptionStorage(), mutex: sync.Mutex{}, } } + +type subscriptionStorage struct { + subscriptions map[string]Subscription +} + +func newSubscriptionStorage() *subscriptionStorage { + return &subscriptionStorage{ + subscriptions: make(map[string]Subscription), + } +} + +func (s *subscriptionStorage) Get(resourceID, subscriberID string) Subscription { + key := s.key(resourceID, subscriberID) + return s.subscriptions[key] +} + +func (s *subscriptionStorage) Set(resourceID, subscriberID string, subscription Subscription) { + key := s.key(resourceID, subscriberID) + s.subscriptions[key] = subscription +} + +func (s *subscriptionStorage) key(resourceID, subscriberID string) string { + return fmt.Sprintf("%s-%s", resourceID, subscriberID) +} diff --git a/server/subscription/nats_manager.go b/server/subscription/nats_manager.go index c41c6da814..b0cf7385cb 100644 --- a/server/subscription/nats_manager.go +++ b/server/subscription/nats_manager.go @@ -7,11 +7,12 @@ import ( ) type natsManager struct { - conn *nats.Conn + conn *nats.Conn + subscriptions *subscriptionStorage } -func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) { - _, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) { +func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) Subscription { + subscription, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) { decoded, err := DecodeMessage(msg.Data) if err != nil { log.Printf("cannot unmarshall incoming nats message: %s", err.Error()) @@ -25,12 +26,14 @@ func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) { }) if err != nil { log.Printf("cannot subscribe to nats topic: %s", err.Error()) - return + return nil } + + return subscription } -func (m *natsManager) Unsubscribe(resourceID string, subscriptionID string) { - panic("nats unsubscribe not implemented") +func (m *natsManager) GetSubscription(resourceID string, subscriptionID string) Subscription { + return m.subscriptions.Get(resourceID, subscriptionID) } func (m *natsManager) PublishUpdate(message Message) { diff --git a/server/subscription/subscription.go b/server/subscription/subscription.go deleted file mode 100644 index e2279e5c04..0000000000 --- a/server/subscription/subscription.go +++ /dev/null @@ -1,7 +0,0 @@ -package subscription - -// Subscription represents a group of subscribers that are waiting for updates of a specific resource -type Subscription struct { - resourceID string - subscribers []Subscriber -} diff --git a/server/testconnection/otlp.go b/server/testconnection/otlp.go index 5a95b8dd7b..6595694d2a 100644 --- a/server/testconnection/otlp.go +++ b/server/testconnection/otlp.go @@ -105,9 +105,8 @@ func (t *OTLPConnectionTester) GetSpanCount(ctx context.Context, opts ...GetSpan return nil }) - t.subscriptionManager.Subscribe(topicName, subscriber) - // TODO: implement subscription - // defer t.subscriptionManager.Unsubscribe(topicName, subscriber.ID()) + subscription := t.subscriptionManager.Subscribe(topicName, subscriber) + defer subscription.Unsubscribe() t.subscriptionManager.Publish(GetSpanCountTopicName(WithTenantID(tenantID)), OTLPConnectionTestRequest{})