-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpublisher.go
76 lines (66 loc) · 2.01 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package asyncp
import (
"context"
"github.com/geniusrabbit/notificationcenter/v2"
)
// Publisher writing interface
type Publisher = notificationcenter.Publisher
// DefaultRetranslateCount shows amount of event repeating in the pipeline
const DefaultRetranslateCount = 30
// Retranslator of the event to the stream
func Retranslator(repeatMaxCount int, pubs ...Publisher) Task {
if repeatMaxCount <= 0 {
repeatMaxCount = DefaultRetranslateCount
}
return FuncTask(func(ctx context.Context, event Event, responseWriter ResponseWriter) error {
if _, repeats := event.Counters(); repeats > repeatMaxCount {
return responseWriter.WriteResonse(event.WithError(ErrSkipEvent))
}
for _, pub := range pubs {
if err := pub.Publish(ctx, event); err != nil {
return err
}
}
return responseWriter.WriteResonse(event)
})
}
// Repeater send same event to the same set of pipelines
func Repeater(repeatMaxCount ...int) Task {
maxRepears := DefaultRetranslateCount
if len(repeatMaxCount) > 0 && repeatMaxCount[0] > 0 {
maxRepears = repeatMaxCount[0]
}
return FuncTask(func(ctx context.Context, event Event, responseWriter ResponseWriter) error {
if event.Name() == "" {
return nil
}
if _, repeats := event.Counters(); repeats > maxRepears {
return responseWriter.WriteResonse(event.WithError(ErrSkipEvent))
}
return responseWriter.RepeatWithResponse(event)
})
}
type publisherEventWrapper struct {
name string
pub Publisher
mux *TaskMux
}
// PublisherEventWrapper with fixed event name
func PublisherEventWrapper(eventName string, publisher Publisher) Publisher {
return &publisherEventWrapper{
name: eventName,
pub: publisher,
}
}
func (wr *publisherEventWrapper) SetMux(mux *TaskMux) {
wr.mux = mux
}
func (wr *publisherEventWrapper) Publish(ctx context.Context, messages ...any) error {
events := make([]any, 0, len(messages))
for _, msg := range messages {
event := WithPayload(wr.name, msg)
event.SetMux(wr.mux)
events = append(events, event)
}
return wr.pub.Publish(ctx, events...)
}