Skip to content

Commit

Permalink
fix: fix close channel
Browse files Browse the repository at this point in the history
  • Loading branch information
ajinkya-verloop committed Oct 16, 2023
1 parent b83c269 commit 1abbb11
Showing 1 changed file with 30 additions and 9 deletions.
39 changes: 30 additions & 9 deletions hedwig.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,14 @@ type Settings struct {

type Hedwig struct {
sync.Mutex
wg *sync.WaitGroup
Settings *Settings
Error error
conn *amqp.Connection
channels map[string]*amqp.Channel
consumeTags map[string]bool
closedChan chan *amqp.Error
wg *sync.WaitGroup
Settings *Settings
Error error
conn *amqp.Connection
channels map[string]*amqp.Channel
consumeTags map[string]bool
closedChan chan *amqp.Error
customCloseChan chan *amqp.Error
}

func (h *Hedwig) AddQueue(qSetting *QueueSetting, qName string) error {
Expand Down Expand Up @@ -142,10 +143,10 @@ func (h *Hedwig) PublishWithHeaders(key string, body []byte, headers map[string]
// Only way to resolve this to restart the service to reconnect.

// We manually check for error while publishing and if we get an error which says connection has been closed, we
// notify on closedChan so that hedwig reconnects to RMQ
// notify on customCloseChan so that hedwig reconnects to RMQ
if errors.Is(err, amqp.ErrClosed) {
logrus.WithError(err).Error("Publish failed, reconnecting")
h.closedChan <- amqp.ErrClosed
h.customCloseChan <- amqp.ErrClosed
}
return err
}
Expand Down Expand Up @@ -302,6 +303,7 @@ func (h *Hedwig) connect() (err error) {
return
}
h.closedChan = make(chan *amqp.Error)
h.customCloseChan = make(chan *amqp.Error)

h.conn.NotifyClose(h.closedChan)
go func() {
Expand All @@ -323,5 +325,24 @@ func (h *Hedwig) connect() (err error) {

}()

go func() {
closeErr, ok := <-h.customCloseChan
if !ok {
logrus.Warning("customCloseChan is closed")
return
}
logrus.WithError(closeErr).Error("Recieved a connection closed event")
h.Lock()
defer h.Unlock()
h.conn = nil
h.channels = make(map[string]*amqp.Channel)
h.consumeTags = make(map[string]bool)
h.wg = &sync.WaitGroup{}
if h.Error == nil {
h.Error = closeErr
}

}()

return
}

0 comments on commit 1abbb11

Please sign in to comment.