From 347a1417ad9f2444ba7e11a88b3ac2e6feb709e4 Mon Sep 17 00:00:00 2001 From: Ivan Perez Date: Sat, 17 Nov 2018 22:57:50 -0200 Subject: [PATCH 1/5] add config opts for max pings out and ping interval to nats client --- server/conf.go | 3 +++ server/server.go | 14 ++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/server/conf.go b/server/conf.go index 0861a393..265b2db3 100644 --- a/server/conf.go +++ b/server/conf.go @@ -16,6 +16,7 @@ package server import ( "flag" "fmt" + "github.com/nats-io/go-nats" "reflect" "strconv" "strings" @@ -600,6 +601,8 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching") fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database") fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name") + fs.IntVar(&sopts.NatsClientMaxPingsOut, "nats_client_max_pings_out", defaultNatsClientMaxPingsOut, "Nats client max pings out before try to reconnect") + fs.UintVar(&sopts.NatsClientPingInterval, "nats_client_ping_interval", defaultNatsClientPingInterval, "Nats client ping interval in seconds") // First, we need to call NATS's ConfigureOptions() with above flag set. // It will be augmented with NATS specific flags and call fs.Parse(args) for us. diff --git a/server/server.go b/server/server.go index 32dd6ac6..9bd0fbe5 100644 --- a/server/server.go +++ b/server/server.go @@ -123,6 +123,9 @@ const ( // Interval at which server goes through list of subscriptions with // pending sent/ack operations that needs to be replicated. defaultLazyReplicationInterval = time.Second + + defaultNatsClientMaxPingsOut = 0 + defaultNatsClientPingInterval = 0 ) // Constant to indicate that sendMsgToSub() should check number of acks pending @@ -1119,6 +1122,8 @@ type Options struct { Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. SyslogName string // Optional name for the syslog (usueful on Windows when running several servers as a service) Clustering ClusteringOptions + NatsClientPingInterval uint + NatsClientMaxPingsOut int } // Clone returns a deep copy of the Options object. @@ -1281,6 +1286,15 @@ func (s *StanServer) createNatsClientConn(name string) (*nats.Conn, error) { return nil, err } } + + if s.opts.NatsClientPingInterval > 0 { + ncOpts.PingInterval = time.Duration(s.opts.NatsClientPingInterval) * time.Second + } + + if s.opts.NatsClientMaxPingsOut > 0 { + ncOpts.MaxPingsOut = s.opts.NatsClientMaxPingsOut + } + // Shorten the time we wait to try to reconnect. // Don't make it too often because it may exhaust the number of FDs. ncOpts.ReconnectWait = 250 * time.Millisecond From b3bec14c28bb1de52e508654de97fceff2ac3a3d Mon Sep 17 00:00:00 2001 From: Ivan Perez Date: Sat, 17 Nov 2018 23:08:59 -0200 Subject: [PATCH 2/5] remove unused import --- server/conf.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/conf.go b/server/conf.go index 265b2db3..d9e11267 100644 --- a/server/conf.go +++ b/server/conf.go @@ -16,7 +16,6 @@ package server import ( "flag" "fmt" - "github.com/nats-io/go-nats" "reflect" "strconv" "strings" From ef019a4c4462df7715cee025d31bc8e2cee95291 Mon Sep 17 00:00:00 2001 From: Ivan Perez Date: Sat, 17 Nov 2018 23:11:05 -0200 Subject: [PATCH 3/5] go fmt --- server/server.go | 56 ++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/server/server.go b/server/server.go index 9bd0fbe5..62a0e52b 100644 --- a/server/server.go +++ b/server/server.go @@ -124,7 +124,7 @@ const ( // pending sent/ack operations that needs to be replicated. defaultLazyReplicationInterval = time.Second - defaultNatsClientMaxPingsOut = 0 + defaultNatsClientMaxPingsOut = 0 defaultNatsClientPingInterval = 0 ) @@ -1096,34 +1096,34 @@ func (ss *subStore) LookupByAckInbox(ackInbox string) *subState { // Options for NATS Streaming Server type Options struct { - ID string - DiscoverPrefix string - StoreType string - FilestoreDir string - FileStoreOpts stores.FileStoreOptions - SQLStoreOpts stores.SQLStoreOptions - stores.StoreLimits // Store limits (MaxChannels, etc..) - EnableLogging bool // Enables logging - CustomLogger logger.Logger // Server will start with the provided logger - Trace bool // Verbose trace - Debug bool // Debug trace - HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) - Secure bool // Create a TLS enabled connection w/o server verification - ClientCert string // Client Certificate for TLS - ClientKey string // Client Key for TLS - ClientCA string // Client CAs for TLS - IOBatchSize int // Maximum number of messages collected from clients before starting their processing. - IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. - NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. - ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. - ClientHBTimeout time.Duration // How long server waits for a heartbeat response. - ClientHBFailCount int // Number of failed heartbeats before server closes client connection. - FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. - Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. - SyslogName string // Optional name for the syslog (usueful on Windows when running several servers as a service) - Clustering ClusteringOptions + ID string + DiscoverPrefix string + StoreType string + FilestoreDir string + FileStoreOpts stores.FileStoreOptions + SQLStoreOpts stores.SQLStoreOptions + stores.StoreLimits // Store limits (MaxChannels, etc..) + EnableLogging bool // Enables logging + CustomLogger logger.Logger // Server will start with the provided logger + Trace bool // Verbose trace + Debug bool // Debug trace + HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) + Secure bool // Create a TLS enabled connection w/o server verification + ClientCert string // Client Certificate for TLS + ClientKey string // Client Key for TLS + ClientCA string // Client CAs for TLS + IOBatchSize int // Maximum number of messages collected from clients before starting their processing. + IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. + NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. + ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. + ClientHBTimeout time.Duration // How long server waits for a heartbeat response. + ClientHBFailCount int // Number of failed heartbeats before server closes client connection. + FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. + Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. + SyslogName string // Optional name for the syslog (usueful on Windows when running several servers as a service) + Clustering ClusteringOptions NatsClientPingInterval uint - NatsClientMaxPingsOut int + NatsClientMaxPingsOut int } // Clone returns a deep copy of the Options object. From 69e31dbe77fb5fc04db165811e793ea50265b740 Mon Sep 17 00:00:00 2001 From: Ivan Perez Date: Sat, 17 Nov 2018 23:13:18 -0200 Subject: [PATCH 4/5] make nats client as NC a short name --- server/conf.go | 4 +-- server/server.go | 64 ++++++++++++++++++++++++------------------------ 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/server/conf.go b/server/conf.go index d9e11267..2b2d5e03 100644 --- a/server/conf.go +++ b/server/conf.go @@ -600,8 +600,8 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching") fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database") fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name") - fs.IntVar(&sopts.NatsClientMaxPingsOut, "nats_client_max_pings_out", defaultNatsClientMaxPingsOut, "Nats client max pings out before try to reconnect") - fs.UintVar(&sopts.NatsClientPingInterval, "nats_client_ping_interval", defaultNatsClientPingInterval, "Nats client ping interval in seconds") + fs.IntVar(&sopts.NCMaxPingsOut, "nats_client_max_pings_out", defaultNatsClientMaxPingsOut, "Nats client max pings out before try to reconnect") + fs.UintVar(&sopts.NCPingInterval, "nats_client_ping_interval", defaultNatsClientPingInterval, "Nats client ping interval in seconds") // First, we need to call NATS's ConfigureOptions() with above flag set. // It will be augmented with NATS specific flags and call fs.Parse(args) for us. diff --git a/server/server.go b/server/server.go index 62a0e52b..7aa68097 100644 --- a/server/server.go +++ b/server/server.go @@ -1096,34 +1096,34 @@ func (ss *subStore) LookupByAckInbox(ackInbox string) *subState { // Options for NATS Streaming Server type Options struct { - ID string - DiscoverPrefix string - StoreType string - FilestoreDir string - FileStoreOpts stores.FileStoreOptions - SQLStoreOpts stores.SQLStoreOptions - stores.StoreLimits // Store limits (MaxChannels, etc..) - EnableLogging bool // Enables logging - CustomLogger logger.Logger // Server will start with the provided logger - Trace bool // Verbose trace - Debug bool // Debug trace - HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) - Secure bool // Create a TLS enabled connection w/o server verification - ClientCert string // Client Certificate for TLS - ClientKey string // Client Key for TLS - ClientCA string // Client CAs for TLS - IOBatchSize int // Maximum number of messages collected from clients before starting their processing. - IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. - NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. - ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. - ClientHBTimeout time.Duration // How long server waits for a heartbeat response. - ClientHBFailCount int // Number of failed heartbeats before server closes client connection. - FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. - Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. - SyslogName string // Optional name for the syslog (usueful on Windows when running several servers as a service) - Clustering ClusteringOptions - NatsClientPingInterval uint - NatsClientMaxPingsOut int + ID string + DiscoverPrefix string + StoreType string + FilestoreDir string + FileStoreOpts stores.FileStoreOptions + SQLStoreOpts stores.SQLStoreOptions + stores.StoreLimits // Store limits (MaxChannels, etc..) + EnableLogging bool // Enables logging + CustomLogger logger.Logger // Server will start with the provided logger + Trace bool // Verbose trace + Debug bool // Debug trace + HandleSignals bool // Should the server setup a signal handler (for Ctrl+C, etc...) + Secure bool // Create a TLS enabled connection w/o server verification + ClientCert string // Client Certificate for TLS + ClientKey string // Client Key for TLS + ClientCA string // Client CAs for TLS + IOBatchSize int // Maximum number of messages collected from clients before starting their processing. + IOSleepTime int64 // Duration (in micro-seconds) the server waits for more message to fill up a batch. + NATSServerURL string // URL for external NATS Server to connect to. If empty, NATS Server is embedded. + ClientHBInterval time.Duration // Interval at which server sends heartbeat to a client. + ClientHBTimeout time.Duration // How long server waits for a heartbeat response. + ClientHBFailCount int // Number of failed heartbeats before server closes client connection. + FTGroupName string // Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. + Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. + SyslogName string // Optional name for the syslog (usueful on Windows when running several servers as a service) + Clustering ClusteringOptions + NCPingInterval uint + NCMaxPingsOut int } // Clone returns a deep copy of the Options object. @@ -1287,12 +1287,12 @@ func (s *StanServer) createNatsClientConn(name string) (*nats.Conn, error) { } } - if s.opts.NatsClientPingInterval > 0 { - ncOpts.PingInterval = time.Duration(s.opts.NatsClientPingInterval) * time.Second + if s.opts.NCPingInterval > 0 { + ncOpts.PingInterval = time.Duration(s.opts.NCPingInterval) * time.Second } - if s.opts.NatsClientMaxPingsOut > 0 { - ncOpts.MaxPingsOut = s.opts.NatsClientMaxPingsOut + if s.opts.NCMaxPingsOut > 0 { + ncOpts.MaxPingsOut = s.opts.NCMaxPingsOut } // Shorten the time we wait to try to reconnect. From ee4e7c72ba8a9a52265b43b3f55de496a8879c4e Mon Sep 17 00:00:00 2001 From: Ivan Perez Date: Mon, 19 Nov 2018 13:58:18 -0200 Subject: [PATCH 5/5] config file parsing, readme.md showing new configs --- README.md | 5 +++++ nats-streaming-server.go | 2 ++ server/conf.go | 14 ++++++++++++-- server/server.go | 4 ++-- 4 files changed, 21 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7547b9c7..3c35b670 100644 --- a/README.md +++ b/README.md @@ -1336,6 +1336,8 @@ Streaming Server Options: -hbf, --hb_fail_count Number of failed heartbeats before server closes the client connection --ft_group Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. -sl, --signal [=] Send signal to nats-streaming-server process (stop, quit, reopen) + -npi, --nc_ping_interval NATS Client ping interval in seconds. If not informed, NATS Streaming considers the NATS Client default value. + -npo, --nc_max_pings_out NATS Client max pings out. If not informed, NATS Streaming considers the NATS Client default value. Streaming Server Clustering Options: --clustered Run the server in a clustered configuration (default: false) @@ -1491,6 +1493,9 @@ In general the configuration parameters are the same as the command line argumen | hb_interval | Interval at which the server sends an heartbeat to a client | Duration | `hb_interval: "10s"` | | hb_timeout | How long the server waits for a heartbeat response from the client before considering it a failed heartbeat | Duration | `hb_timeout: "10s"` | | hb_fail_count | Count of failed heartbeats before server closes the client connection. The actual total wait is: (fail count + 1) * (hb interval + hb timeout) | Number | `hb_fail_count: 2` | +| nc_ping_interval | NATS Client ping interval in seconds. If not informed, NATS Streaming considers the NATS Client default value. | Number > 0 | `nc_ping_interval: 2` | +| nc_max_pings_out | NATS Client max pings out. If not informed, NATS Streaming considers the NATS Client default value. | Number > 0 | `nc_max_pings_out: 4` | + | ft_group | In Fault Tolerance mode, you can start a group of streaming servers with only one server being active while others are running in standby mode. This is the name of this FT group | String | `ft_group: "my_ft_group"` | | partitioning | If set to true, a list of channels must be defined in store_limits/channels section. This section then serves two purposes, overriding limits for a given channel or adding it to the partition | `true` or `false` | `partitioning: true` | | cluster | Cluster Configuration | Map: `cluster: { ... }` | **See details below** | diff --git a/nats-streaming-server.go b/nats-streaming-server.go index bee86e80..fee50c48 100644 --- a/nats-streaming-server.go +++ b/nats-streaming-server.go @@ -48,6 +48,8 @@ Streaming Server Options: -hbf, --hb_fail_count Number of failed heartbeats before server closes the client connection --ft_group Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore. -sl, --signal [=] Send signal to nats-streaming-server process (stop, quit, reopen) + -npi, --nc_ping_interval NATS Client ping interval in seconds. If not informed, NATS Streaming considers the NATS Client default value. + -npo, --nc_max_pings_out NATS Client max pings out. If not informed, NATS Streaming considers the NATS Client default value. Streaming Server Clustering Options: --clustered Run the server in a clustered configuration (default: false) diff --git a/server/conf.go b/server/conf.go index 2b2d5e03..28e4019b 100644 --- a/server/conf.go +++ b/server/conf.go @@ -154,6 +154,16 @@ func ProcessConfigFile(configFile string, opts *Options) error { return err } opts.SyslogName = v.(string) + case "npo", "nc_max_pings_out", "nats_client_max_pings_out": + if err := checkType(k, reflect.Int, v); err != nil { + return err + } + opts.NCMaxPingsOut = v.(int) + case "npi", "nc_ping_interval", "nats_client_ping_interval": + if err := checkType(k, reflect.Uint, v); err != nil { + return err + } + opts.NCPingInterval = v.(uint) } } return nil @@ -600,8 +610,8 @@ func ConfigureOptions(fs *flag.FlagSet, args []string, printVersion, printHelp, fs.BoolVar(&sopts.SQLStoreOpts.NoCaching, "sql_no_caching", defSQLOpts.NoCaching, "Enable/Disable caching") fs.IntVar(&sopts.SQLStoreOpts.MaxOpenConns, "sql_max_open_conns", defSQLOpts.MaxOpenConns, "Max opened connections to the database") fs.StringVar(&sopts.SyslogName, "syslog_name", "", "Syslog Name") - fs.IntVar(&sopts.NCMaxPingsOut, "nats_client_max_pings_out", defaultNatsClientMaxPingsOut, "Nats client max pings out before try to reconnect") - fs.UintVar(&sopts.NCPingInterval, "nats_client_ping_interval", defaultNatsClientPingInterval, "Nats client ping interval in seconds") + fs.IntVar(&sopts.NCMaxPingsOut, "nc_max_pings_out", defaultNatsClientMaxPingsOut, "NATS Client max pings out before try to reconnect") + fs.UintVar(&sopts.NCPingInterval, "nc_ping_interval", defaultNatsClientPingInterval, "NATS Client ping interval in seconds") // First, we need to call NATS's ConfigureOptions() with above flag set. // It will be augmented with NATS specific flags and call fs.Parse(args) for us. diff --git a/server/server.go b/server/server.go index 7aa68097..b386c9ac 100644 --- a/server/server.go +++ b/server/server.go @@ -1122,8 +1122,8 @@ type Options struct { Partitioning bool // Specify if server only accepts messages/subscriptions on channels defined in StoreLimits. SyslogName string // Optional name for the syslog (usueful on Windows when running several servers as a service) Clustering ClusteringOptions - NCPingInterval uint - NCMaxPingsOut int + NCPingInterval uint //Optional config that change value Ping interval of nats client. This config is defined in seconds. Default value is 2 minutes. + NCMaxPingsOut int //Optional config that change value Max pings out of nats client. Default value is 2. } // Clone returns a deep copy of the Options object.