-
Notifications
You must be signed in to change notification settings - Fork 0
/
priority_test.go
134 lines (110 loc) · 4.26 KB
/
priority_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package emitter
import (
"sync"
"testing"
)
// TestPriorityOrdering checks if the Emitter calls listeners in the correct order of their priorities.
func TestPriorityOrdering(t *testing.T) {
em := NewMemoryEmitter()
var mu sync.Mutex // Mutex to protect access to callOrder slice.
topic := "test_priority_topic"
callOrder := make([]Priority, 0)
var wg sync.WaitGroup // WaitGroup to wait for listeners to finish.
// Helper function to subscribe to the emitter with synchronization.
subscribeWithPriority := func(priority Priority) {
wg.Add(1) // Increment the WaitGroup counter.
em.On(topic, func(e Event) error {
defer wg.Done() // Decrement the counter when the function completes.
mu.Lock() // Lock the mutex to safely append to callOrder.
callOrder = append(callOrder, priority)
mu.Unlock() // Unlock the mutex after appending.
return nil
}, WithPriority(priority))
}
// Set up listeners with different priorities.
subscribeWithPriority(High)
subscribeWithPriority(Low)
subscribeWithPriority(Normal)
subscribeWithPriority(Lowest)
subscribeWithPriority(Highest)
// Emit an event to the topic
em.Emit(topic, "test_payload")
wg.Wait() // Wait for all listeners to process the event
// Verify the call order of listeners matches the expected priority order.
expectedOrder := []Priority{Highest, High, Normal, Low, Lowest}
mu.Lock() // Lock the mutex to safely read callOrder.
defer mu.Unlock()
if len(callOrder) != len(expectedOrder) {
t.Fatalf("Expected call order length to be %d, got %d", len(expectedOrder), len(callOrder))
}
for i, priority := range expectedOrder {
if callOrder[i] != priority {
t.Errorf("Expected priority %v at index %d, got %v", priority, i, callOrder[i])
}
}
}
// TestEmitSyncWithAbort tests the synchronous EmitSync method with a listener that aborts the event.
func TestEmitSyncWithAbort(t *testing.T) {
emitter := NewMemoryEmitter()
// Create three listeners with different priorities.
highPriorityListener := func(e Event) error {
// This listener has the lowest priority and should be called first.
return nil
}
abortingListener := func(e Event) error {
// This listener aborts the event processing.
e.SetAborted(true)
return nil
}
lowPriorityListener := func(e Event) error {
t.Error("The low priority listener should not be called after the event is aborted")
return nil
}
// Subscribe the listeners to the "testTopic".
emitter.On("testTopic", lowPriorityListener, WithPriority(Low)) // This should not be called.
emitter.On("testTopic", abortingListener, WithPriority(Normal)) // This listener will abort the event.
emitter.On("testTopic", highPriorityListener, WithPriority(High))
// Emit the event synchronously.
emitter.EmitSync("testTopic", "testPayload")
}
// TestEmitWithAbort tests the asynchronous Emit method with a listener that aborts the event.
func TestEmitWithAbort(t *testing.T) {
emitter := NewMemoryEmitter()
// Create three listeners with different priorities.
highPriorityListener := func(e Event) error {
// This listener has the highest priority and should be called first.
return nil
}
abortingListener := func(e Event) error {
// This listener aborts the event processing.
e.SetAborted(true)
return nil
}
lowPriorityListenerCalled := false
lowPriorityListener := func(e Event) error {
// This flag should remain false if the event processing is correctly aborted.
lowPriorityListenerCalled = true
return nil
}
// Subscribe the listeners to the "testTopic".
emitter.On("testTopic", lowPriorityListener, WithPriority(Low)) // This should not be called.
emitter.On("testTopic", abortingListener, WithPriority(Normal)) // This listener will abort the event.
emitter.On("testTopic", highPriorityListener, WithPriority(High))
// Emit the event asynchronously.
errChan := emitter.Emit("testTopic", "testPayload")
// Wait for all errors to be collected.
var emitErrors []error
for err := range errChan {
if err != nil {
emitErrors = append(emitErrors, err)
}
}
// Check that the low priority listener was not called.
if lowPriorityListenerCalled {
t.Error("The low priority listener should not have been called")
}
// Check that there were no errors during emission.
if len(emitErrors) != 0 {
t.Errorf("Emit() resulted in errors: %v", emitErrors)
}
}