forked from fiatjaf/khatru
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listener.go
146 lines (128 loc) · 4.49 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package khatru
import (
"context"
"errors"
"slices"
"github.com/nbd-wtf/go-nostr"
)
// ErrSubscriptionClosedByClient is the cancellation cause handed to a
// subscription's cancel function when removeListenerId removes that subscription.
var ErrSubscriptionClosedByClient = errors.New("subscription closed by client")
// listenerSpec is the per-client bookkeeping record for a single listener.
// Relay.clients maps each WebSocket to a slice of these; each record locates its
// listener as subrelay.listeners[index].
type listenerSpec struct {
id string // kept here so we can easily match against it in removeListenerId
// cancel is invoked with ErrSubscriptionClosedByClient by removeListenerId
cancel context.CancelCauseFunc
// index is the position of the corresponding listener inside subrelay.listeners;
// the remove* functions rewrite it whenever a swap-delete moves that listener
index int
subrelay *Relay // this is important when we're dealing with routing, otherwise it will be always the same
}
// listener is one subscription filter bound to a client connection. Relay.listeners
// holds these, and notifyListeners scans them to find the ones matching an event.
type listener struct {
id string // duplicated here so we can easily send it on notifyListeners
// filter is what notifyListeners matches incoming events against
filter nostr.Filter
// ws is the connection matching events are written to
ws *WebSocket
}
// GetListeningFilters returns a freshly allocated slice containing the filter of
// every listener currently registered on this relay, in the same order as
// rl.listeners. The result is never nil, even when there are no listeners.
func (rl *Relay) GetListeningFilters() []nostr.Filter {
	count := len(rl.listeners)
	filters := make([]nostr.Filter, count)
	for i := range rl.listeners {
		filters[i] = rl.listeners[i].filter
	}
	return filters
}
// addListener registers filter as a new listener for subscription id on ws.
//
// It may be called several times for the same id and ws; every call adds an
// independent listener. The listener itself is appended to subrelay.listeners,
// while the bookkeeping spec (including cancel and the listener's index) is
// appended to rl.clients[ws]. If ws is not present in rl.clients nothing is
// registered.
func (rl *Relay) addListener(
	ws *WebSocket,
	id string,
	subrelay *Relay,
	filter nostr.Filter,
	cancel context.CancelCauseFunc,
) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()

	existing, known := rl.clients[ws]
	if !known {
		// only expected when the client has disconnected very rapidly
		return
	}

	// the new listener will land at the current end of the subrelay's slice
	position := len(subrelay.listeners)

	rl.clients[ws] = append(existing, listenerSpec{
		id:       id,
		cancel:   cancel,
		subrelay: subrelay,
		index:    position,
	})
	subrelay.listeners = append(subrelay.listeners, listener{
		ws:     ws,
		id:     id,
		filter: filter,
	})
}
// removeListenerId removes every listener that ws registered under subscription
// id and cancels each one's context with ErrSubscriptionClosedByClient.
//
// Both the client's spec slice and the owning subrelay's listeners slice are
// shrunk with swap-delete (the last element is moved into the vacated slot), so
// neither slice keeps its ordering.
func (rl *Relay) removeListenerId(ws *WebSocket, id string) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()

	specs, known := rl.clients[ws]
	if !known {
		return
	}

	// walk backwards: the element swapped into slot s always comes from a
	// position that has already been examined
	for s := len(specs) - 1; s >= 0; s-- {
		spec := specs[s]
		if spec.id != id {
			continue
		}

		spec.cancel(ErrSubscriptionClosedByClient)

		// swap-delete the spec from this client's list
		tail := len(specs) - 1
		specs[s] = specs[tail]
		specs = specs[:tail]
		rl.clients[ws] = specs

		// swap-delete the listener; each spec may live in a different subrelay
		// (== rl in normal cases, but different when this came from a route)
		srl := spec.subrelay
		last := len(srl.listeners) - 1
		if spec.index != last {
			// the last listener isn't being removed, it is relocated into the gap
			relocated := srl.listeners[last]
			srl.listeners[spec.index] = relocated

			// the spec that tracks the relocated listener must now point at
			// its new position in srl.listeners
			ownerSpecs := rl.clients[relocated.ws]
			at := slices.IndexFunc(ownerSpecs, func(ls listenerSpec) bool {
				return ls.index == last && ls.subrelay == srl
			})
			ownerSpecs[at].index = spec.index
			rl.clients[relocated.ws] = ownerSpecs
		}
		srl.listeners = srl.listeners[:last] // finally drop the trailing slot
	}
}
// removeClientAndListeners forgets ws entirely: every listener it registered is
// swap-deleted from the subrelay that holds it, and then the ws entry is removed
// from rl.clients. Calling it for a ws that is not registered is a no-op.
//
// Contexts are not cancelled here since they inherit from the main connection
// context.
func (rl *Relay) removeClientAndListeners(ws *WebSocket) {
	rl.clientsMutex.Lock()
	defer rl.clientsMutex.Unlock()

	// a missing key yields a nil slice, so the loop simply doesn't run
	for s, spec := range rl.clients[ws] {
		srl := spec.subrelay
		last := len(srl.listeners) - 1

		if spec.index != last {
			// the last listener isn't being removed, it is relocated into the gap
			relocated := srl.listeners[last]
			srl.listeners[spec.index] = relocated

			// mark the spec being removed with index -1 so that it cannot be
			// mistaken for the relocated listener's spec in the search below
			rl.clients[ws][s].index = -1

			// the spec that tracks the relocated listener must now point at
			// its new position in srl.listeners
			ownerSpecs := rl.clients[relocated.ws]
			at := slices.IndexFunc(ownerSpecs, func(ls listenerSpec) bool {
				return ls.index == last && ls.subrelay == srl
			})
			ownerSpecs[at].index = spec.index
			rl.clients[relocated.ws] = ownerSpecs
		}
		srl.listeners = srl.listeners[:last] // finally drop the trailing slot
	}

	// all of this client's specs go away together with the map entry
	delete(rl.clients, ws)
}
// notifyListeners sends event, wrapped in an EVENT envelope carrying the
// listener's subscription id, to every listener on this relay whose filter
// matches it.
//
// Before each delivery the rl.PreventBroadcast hooks are called with the
// listener's connection and the event. If any hook returns true, only that
// listener is skipped and the remaining listeners are still processed. A veto
// for one connection must not abort the whole broadcast, otherwise every
// listener positioned later in rl.listeners would silently miss the event.
func (rl *Relay) notifyListeners(event *nostr.Event) {
nextListener:
	for _, l := range rl.listeners {
		if !l.filter.Matches(event) {
			continue
		}

		for _, prevent := range rl.PreventBroadcast {
			if prevent(l.ws, event) {
				// vetoed for this connection only; move on to the next listener
				continue nextListener
			}
		}

		// best-effort delivery: the result of WriteJSON, if any, is not inspected
		l.ws.WriteJSON(nostr.EventEnvelope{SubscriptionID: &l.id, Event: *event})
	}
}