From f390edc5cda49d71f660e433c12d8995601f0630 Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Sun, 22 Nov 2020 23:34:35 -0500 Subject: [PATCH] review comments: simplify messagePump --- nsqd/channel.go | 14 +++----- nsqd/protocol_v2.go | 78 +++++++++++++++------------------------------ 2 files changed, 31 insertions(+), 61 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index f1f09ad09..8f9b2b73e 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -41,11 +41,9 @@ type Channel struct { sync.RWMutex - topicName string - name string - topologyRegion string - topologyZone string - ctx *context + topicName string + name string + ctx *context backend BackendQueue @@ -81,18 +79,16 @@ func NewChannel(topicName string, channelName string, ctx *context, c := &Channel{ topicName: topicName, name: channelName, - topologyRegion: ctx.nsqd.getOpts().TopologyRegion, - topologyZone: ctx.nsqd.getOpts().TopologyZone, memoryMsgChan: nil, clients: make(map[int64]Consumer), deleteCallback: deleteCallback, ctx: ctx, } - if c.topologyRegion != "" { + if ctx.nsqd.getOpts().TopologyRegion != "" { c.regionLocalMsgChan = make(chan *Message, 0) } - if c.topologyZone != "" { + if ctx.nsqd.getOpts().TopologyZone != "" { c.zoneLocalMsgChan = make(chan *Message, 0) } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 39c4e0992..bf10b1034 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -211,7 +211,7 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // with >1 clients having >1 RDY counts var flusherChan <-chan time.Time var sampleRate int32 - var topologyRegion, topologyZone string + var regionLocal, zoneLocal bool subEventChan := client.SubEventChan identifyEventChan := client.IdentifyEventChan @@ -233,6 +233,8 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { close(startedChan) for { + var b []byte + var msg *Message if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil @@ -252,10 +254,10 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // last iteration we flushed... // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan - if subChannel.topologyZone == topologyZone && topologyZone != "" { + if zoneLocal { zoneMsgChan = subChannel.zoneLocalMsgChan } - if subChannel.topologyRegion == topologyRegion && topologyRegion != "" { + if regionLocal { regionMsgChan = subChannel.regionLocalMsgChan } backendMsgChan = subChannel.backend.ReadChan() @@ -264,10 +266,10 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // we're buffered (if there isn't any more data we should flush)... // select on the flusher ticker channel, too memoryMsgChan = subChannel.memoryMsgChan - if subChannel.topologyZone == topologyZone && topologyZone != "" { + if zoneLocal { zoneMsgChan = subChannel.zoneLocalMsgChan } - if subChannel.topologyRegion == topologyRegion && topologyRegion != "" { + if regionLocal { regionMsgChan = subChannel.regionLocalMsgChan } backendMsgChan = subChannel.backend.ReadChan() @@ -311,64 +313,37 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { } msgTimeout = identifyData.MsgTimeout - topologyRegion = identifyData.TopologyRegion - topologyZone = identifyData.TopologyZone + if identifyData.TopologyZone == p.ctx.nsqd.getOpts().TopologyZone { + zoneLocal = true + } + if identifyData.TopologyRegion == p.ctx.nsqd.getOpts().TopologyRegion { + regionLocal = true + } case <-heartbeatChan: err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } - case b := <-backendMsgChan: - if sampleRate > 0 && rand.Int31n(100) > sampleRate { - continue - } - - msg, err := decodeMessage(b) + case b = <-backendMsgChan: + // decodeMessage then handle 'msg' + case msg = <-zoneMsgChan: + case msg = <-regionMsgChan: + case msg = <-memoryMsgChan: + case <-client.ExitChan: + goto exit + } + if len(b) != 0 { + msg, err = decodeMessage(b) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } - msg.Attempts++ - - subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) - client.SendingMessage() - err = p.SendMessage(client, msg) - if err != nil { - goto exit - } - flushed = false - case msg := <-zoneMsgChan: - if sampleRate > 0 && rand.Int31n(100) > sampleRate { - continue - } - msg.Attempts++ - - subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) - client.SendingMessage() - err = p.SendMessage(client, msg) - if err != nil { - goto exit - } - flushed = false - case msg := <-regionMsgChan: - if sampleRate > 0 && rand.Int31n(100) > sampleRate { - continue - } - msg.Attempts++ - - subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) - client.SendingMessage() - err = p.SendMessage(client, msg) - if err != nil { - goto exit - } - flushed = false - case msg := <-memoryMsgChan: + } + if msg != nil { if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ - subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) @@ -376,9 +351,8 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { goto exit } flushed = false - case <-client.ExitChan: - goto exit } + } exit: