From c67d5ee7ade409b589afe0d7822c52a09ee026cb Mon Sep 17 00:00:00 2001 From: Jehiah Czebotar Date: Fri, 27 Nov 2020 20:39:11 -0500 Subject: [PATCH] nsqd: add --topology-region --topology-zone --- apps/nsqd/options.go | 2 ++ nsqd/client_v2.go | 18 ++++++++++++++++-- nsqd/http.go | 4 ++++ nsqd/options.go | 2 ++ nsqd/protocol_v2.go | 4 ++++ nsqd/stats.go | 2 ++ 6 files changed, 30 insertions(+), 2 deletions(-) diff --git a/apps/nsqd/options.go b/apps/nsqd/options.go index ddc3c115f..fc43c5140 100644 --- a/apps/nsqd/options.go +++ b/apps/nsqd/options.go @@ -120,6 +120,8 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet { flagSet.Var(&lookupdTCPAddrs, "lookupd-tcp-address", "lookupd TCP address (may be given multiple times)") flagSet.Duration("http-client-connect-timeout", opts.HTTPClientConnectTimeout, "timeout for HTTP connect") flagSet.Duration("http-client-request-timeout", opts.HTTPClientRequestTimeout, "timeout for HTTP request") + flagSet.String("topology-region", opts.TopologyRegion, "A region represents a larger domain, made up of one or more zones") + flagSet.String("topology-zone", opts.TopologyZone, "A zone represents a logical failure domain") // diskqueue options flagSet.String("data-path", opts.DataPath, "path to store disk-backed messages") diff --git a/nsqd/client_v2.go b/nsqd/client_v2.go index f81b760e0..288ec0084 100644 --- a/nsqd/client_v2.go +++ b/nsqd/client_v2.go @@ -38,6 +38,8 @@ type identifyDataV2 struct { SampleRate int32 `json:"sample_rate"` UserAgent string `json:"user_agent"` MsgTimeout int `json:"msg_timeout"` + TopologyRegion string `json:"topology_region"` + TopologyZone string `json:"topology_zone"` } type identifyEvent struct { @@ -45,6 +47,8 @@ type identifyEvent struct { HeartbeatInterval time.Duration SampleRate int32 MsgTimeout time.Duration + TopologyRegion string + TopologyZone string } type clientV2 struct { @@ -88,8 +92,10 @@ type clientV2 struct { ReadyStateChan chan int ExitChan chan int - ClientID string - Hostname string + ClientID string + Hostname string + TopologyRegion string + TopologyZone string SampleRate int32 @@ -161,6 +167,8 @@ func (c *clientV2) Identify(data identifyDataV2) error { c.ClientID = data.ClientID c.Hostname = data.Hostname c.UserAgent = data.UserAgent + c.TopologyRegion = data.TopologyRegion + c.TopologyZone = data.TopologyZone c.metaLock.Unlock() err := c.SetHeartbeatInterval(data.HeartbeatInterval) @@ -188,6 +196,8 @@ func (c *clientV2) Identify(data identifyDataV2) error { HeartbeatInterval: c.HeartbeatInterval, SampleRate: c.SampleRate, MsgTimeout: c.MsgTimeout, + TopologyRegion: c.TopologyRegion, + TopologyZone: c.TopologyZone, } // update the client's message pump @@ -204,6 +214,8 @@ func (c *clientV2) Stats() ClientStats { clientID := c.ClientID hostname := c.Hostname userAgent := c.UserAgent + topologyZone := c.TopologyZone + topologyRegion := c.TopologyRegion var identity string var identityURL string if c.AuthState != nil { @@ -239,6 +251,8 @@ func (c *clientV2) Stats() ClientStats { AuthIdentity: identity, AuthIdentityURL: identityURL, PubCounts: pubCounts, + TopologyZone: topologyZone, + TopologyRegion: topologyRegion, } if stats.TLS { p := prettyConnectionState{c.tlsConn.ConnectionState()} diff --git a/nsqd/http.go b/nsqd/http.go index ffdec549d..bf550043f 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -132,6 +132,8 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou HTTPPort int `json:"http_port"` TCPPort int `json:"tcp_port"` StartTime int64 `json:"start_time"` + TopologyZone string `json:"topology_zone"` + TopologyRegion string `json:"topology_region"` }{ Version: version.Binary, BroadcastAddress: s.nsqd.getOpts().BroadcastAddress, @@ -139,6 +141,8 @@ func (s *httpServer) doInfo(w http.ResponseWriter, req *http.Request, ps httprou TCPPort: s.nsqd.RealTCPAddr().Port, HTTPPort: s.nsqd.RealHTTPAddr().Port, StartTime: s.nsqd.GetStartTime().Unix(), + TopologyZone: s.nsqd.getOpts().TopologyZone, + TopologyRegion: s.nsqd.getOpts().TopologyRegion, }, nil } diff --git a/nsqd/options.go b/nsqd/options.go index 20cf3ed3d..8ea384039 100644 --- a/nsqd/options.go +++ b/nsqd/options.go @@ -29,6 +29,8 @@ type Options struct { AuthHTTPAddresses []string `flag:"auth-http-address" cfg:"auth_http_addresses"` HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout" cfg:"http_client_connect_timeout"` HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout" cfg:"http_client_request_timeout"` + TopologyRegion string `flag:"topology-region"` + TopologyZone string `flag:"topology-zone"` // diskqueue options DataPath string `flag:"data-path"` diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c95aeba6d..64a48826b 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -423,6 +423,8 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) AuthRequired bool `json:"auth_required"` OutputBufferSize int `json:"output_buffer_size"` OutputBufferTimeout int64 `json:"output_buffer_timeout"` + TopologyRegion string `json:"topology_region"` + TopologyZone string `json:"topology_zone"` }{ MaxRdyCount: p.nsqd.getOpts().MaxRdyCount, Version: version.Binary, @@ -437,6 +439,8 @@ func (p *protocolV2) IDENTIFY(client *clientV2, params [][]byte) ([]byte, error) AuthRequired: p.nsqd.IsAuthEnabled(), OutputBufferSize: client.OutputBufferSize, OutputBufferTimeout: int64(client.OutputBufferTimeout / time.Millisecond), + TopologyRegion: p.nsqd.getOpts().TopologyRegion, + TopologyZone: p.nsqd.getOpts().TopologyZone, }) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_IDENTIFY_FAILED", "IDENTIFY failed "+err.Error()) diff --git a/nsqd/stats.go b/nsqd/stats.go index 351667814..96674ff27 100644 --- a/nsqd/stats.go +++ b/nsqd/stats.go @@ -99,6 +99,8 @@ type ClientStats struct { Authed bool `json:"authed,omitempty"` AuthIdentity string `json:"auth_identity,omitempty"` AuthIdentityURL string `json:"auth_identity_url,omitempty"` + TopologyZone string `json:"topology_zone"` + TopologyRegion string `json:"topology_region"` PubCounts []PubCount `json:"pub_counts,omitempty"`