-
Notifications
You must be signed in to change notification settings - Fork 134
/
Copy pathpublish_flow_block.go
43 lines (38 loc) · 1.43 KB
/
publish_flow_block.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package rabbitmq
import (
amqp "github.com/rabbitmq/amqp091-go"
)
func (publisher *Publisher) startNotifyFlowHandler() {
notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool))
publisher.disablePublishDueToFlowMu.Lock()
publisher.disablePublishDueToFlow = false
publisher.disablePublishDueToFlowMu.Unlock()
for ok := range notifyFlowChan {
publisher.disablePublishDueToFlowMu.Lock()
if ok {
publisher.options.Logger.Warnf("pausing publishing due to flow request from server")
publisher.disablePublishDueToFlow = true
} else {
publisher.disablePublishDueToFlow = false
publisher.options.Logger.Warnf("resuming publishing due to flow request from server")
}
publisher.disablePublishDueToFlowMu.Unlock()
}
}
func (publisher *Publisher) startNotifyBlockedHandler() {
blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking))
publisher.disablePublishDueToBlockedMu.Lock()
publisher.disablePublishDueToBlocked = false
publisher.disablePublishDueToBlockedMu.Unlock()
for b := range blockings {
publisher.disablePublishDueToBlockedMu.Lock()
if b.Active {
publisher.options.Logger.Warnf("pausing publishing due to TCP blocking from server")
publisher.disablePublishDueToBlocked = true
} else {
publisher.disablePublishDueToBlocked = false
publisher.options.Logger.Warnf("resuming publishing due to TCP blocking from server")
}
publisher.disablePublishDueToBlockedMu.Unlock()
}
}