diff --git a/nsqd/channel.go b/nsqd/channel.go index f2452ee27..bbed70a00 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -316,22 +316,44 @@ func (c *Channel) PutMessage(m *Message) error { func (c *Channel) put(m *Message) error { if c.topologyAwareConsumption { + // Attempt zone local, region local and finally the memory channel + // we do this to ensure that we preferentially deliver messages based on toplogy + // + // Because messagePump is intermittently unavailable while writing a msg to a client + // we continue to have higher priority channels in the select loop, this means at each + // attempt a higher priority channel can still win select { case c.zoneLocalMsgChan <- m: return nil default: } select { + case c.zoneLocalMsgChan <- m: + return nil case c.regionLocalMsgChan <- m: return nil default: } + + select { + case c.zoneLocalMsgChan <- m: + return nil + case c.regionLocalMsgChan <- m: + return nil + case c.memoryMsgChan <- m: + return nil + default: + } + + } else { + + select { + case c.memoryMsgChan <- m: + return nil + default: + } } - select { - case c.memoryMsgChan <- m: - return nil - default: - } + err := writeMessageToBackend(m, c.backend) c.nsqd.SetHealth(err) if err != nil {