Skip to content

Commit

Permalink
review comments: simplify messagePump
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Nov 23, 2020
1 parent f59bbd5 commit f390edc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 61 deletions.
14 changes: 5 additions & 9 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ type Channel struct {

sync.RWMutex

topicName string
name string
topologyRegion string
topologyZone string
ctx *context
topicName string
name string
ctx *context

backend BackendQueue

Expand Down Expand Up @@ -81,18 +79,16 @@ func NewChannel(topicName string, channelName string, ctx *context,
c := &Channel{
topicName: topicName,
name: channelName,
topologyRegion: ctx.nsqd.getOpts().TopologyRegion,
topologyZone: ctx.nsqd.getOpts().TopologyZone,
memoryMsgChan: nil,
clients: make(map[int64]Consumer),
deleteCallback: deleteCallback,
ctx: ctx,
}

if c.topologyRegion != "" {
if ctx.nsqd.getOpts().TopologyRegion != "" {
c.regionLocalMsgChan = make(chan *Message, 0)
}
if c.topologyZone != "" {
if ctx.nsqd.getOpts().TopologyZone != "" {
c.zoneLocalMsgChan = make(chan *Message, 0)
}

Expand Down
78 changes: 26 additions & 52 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
// with >1 clients having >1 RDY counts
var flusherChan <-chan time.Time
var sampleRate int32
var topologyRegion, topologyZone string
var regionLocal, zoneLocal bool

subEventChan := client.SubEventChan
identifyEventChan := client.IdentifyEventChan
Expand All @@ -233,6 +233,8 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
close(startedChan)

for {
var b []byte
var msg *Message
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
Expand All @@ -252,10 +254,10 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
if subChannel.topologyZone == topologyZone && topologyZone != "" {
if zoneLocal {
zoneMsgChan = subChannel.zoneLocalMsgChan
}
if subChannel.topologyRegion == topologyRegion && topologyRegion != "" {
if regionLocal {
regionMsgChan = subChannel.regionLocalMsgChan
}
backendMsgChan = subChannel.backend.ReadChan()
Expand All @@ -264,10 +266,10 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
// we're buffered (if there isn't any more data we should flush)...
// select on the flusher ticker channel, too
memoryMsgChan = subChannel.memoryMsgChan
if subChannel.topologyZone == topologyZone && topologyZone != "" {
if zoneLocal {
zoneMsgChan = subChannel.zoneLocalMsgChan
}
if subChannel.topologyRegion == topologyRegion && topologyRegion != "" {
if regionLocal {
regionMsgChan = subChannel.regionLocalMsgChan
}
backendMsgChan = subChannel.backend.ReadChan()
Expand Down Expand Up @@ -311,74 +313,46 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
}

msgTimeout = identifyData.MsgTimeout
topologyRegion = identifyData.TopologyRegion
topologyZone = identifyData.TopologyZone
if identifyData.TopologyZone == p.ctx.nsqd.getOpts().TopologyZone {
zoneLocal = true
}
if identifyData.TopologyRegion == p.ctx.nsqd.getOpts().TopologyRegion {
regionLocal = true
}
case <-heartbeatChan:
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}
case b := <-backendMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}

msg, err := decodeMessage(b)
case b = <-backendMsgChan:
// decodeMessage then handle 'msg'
case msg = <-zoneMsgChan:
case msg = <-regionMsgChan:
case msg = <-memoryMsgChan:
case <-client.ExitChan:
goto exit
}
if len(b) != 0 {
msg, err = decodeMessage(b)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
msg.Attempts++

subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-zoneMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++

subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-regionMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++

subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case msg := <-memoryMsgChan:
}
if msg != nil {
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++

subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}

}

exit:
Expand Down

0 comments on commit f390edc

Please sign in to comment.