From 128463cac05eb4cde325aa9ec5de6bf6779cf453 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 16 Nov 2021 17:39:12 -0700 Subject: [PATCH 1/2] [FIXED] Clustering: possible wrong pending_count and is_stalled on followers If a queue member receives a message and does not ack it within the AckWait interval, this message may be redelivered to a different member. If the original member then acknowledge the message, the server finds it in the other member and remove it from the pending list. This would cause the second member to show a wrong pending_count and possibly is_stalled values in monitoring of the followers. Signed-off-by: Ivan Kozlovic --- server/clustering_test.go | 82 +++++++++++++++++++++++++++++++++++++++ server/server.go | 5 ++- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/server/clustering_test.go b/server/clustering_test.go index 0d2711f4..29984ba8 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -8596,6 +8596,88 @@ func TestClusteringQueueRedelivery(t *testing.T) { waitForAcks(t, s2, clientName, 2, 0) } +func TestClusteringQueueRedeliveryPendingAndStalled(t *testing.T) { + cleanupDatastore(t) + defer cleanupDatastore(t) + cleanupRaftLog(t) + defer cleanupRaftLog(t) + + // For this test, use a central NATS server. + ns := natsdTest.RunDefaultServer() + defer ns.Shutdown() + + // Configure first server + s1sOpts := getTestDefaultOptsForClustering("a", true) + s1 := runServerWithOpts(t, s1sOpts, nil) + defer s1.Shutdown() + + // Configure second server. + s2sOpts := getTestDefaultOptsForClustering("b", false) + s2 := runServerWithOpts(t, s2sOpts, nil) + defer s2.Shutdown() + + servers := []*StanServer{s1, s2} + getLeader(t, 10*time.Second, servers...) + + sc := NewDefaultConnection(t) + defer sc.Close() + + ch := make(chan bool, 1) + if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) { + if m.Redelivered { + m.Ack() + return + } + // Wait for more than AckWait, then ack + time.Sleep(150 * time.Millisecond) + m.Ack() + ch <- true + }, stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(100)), stan.MaxInflight(3)); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + // Create second queue member that does not ack. + if _, err := sc.QueueSubscribe("foo", "bar", func(m *stan.Msg) {}, + stan.SetManualAckMode(), stan.AckWait(ackWaitInMs(500)), stan.MaxInflight(3)); err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + + for count := 0; count < 5; { + if err := sc.Publish("foo", []byte("msg")); err != nil { + t.Fatalf("Error on publish: %v", err) + } + select { + case <-ch: + count++ + case <-time.After(time.Second): + // Try another message + } + } + + // Make sure that state is replicated + time.Sleep(testLazyReplicationInterval * 2) + + // Ensure that the pending map and stalled are 0 and false + // on all servers for all subs. + waitFor(t, 2*time.Second, 50*time.Millisecond, func() error { + for _, s := range servers { + subs := s.clients.getSubs(clientName) + for _, sub := range subs { + var err error + sub.RLock() + if len(sub.acksPending) != 0 || sub.stalled { + err = fmt.Errorf("Invalid values: node=%s - acksPending=%v - stalled=%v", + s.opts.Clustering.NodeID, sub.acksPending, sub.stalled) + } + sub.RUnlock() + if err != nil { + return err + } + } + } + return nil + }) +} + func TestClusteringQueueRedeliverySentAndAck(t *testing.T) { // Set this to something very large so we can manually cause the flush. lazyReplicationInterval = time.Hour diff --git a/server/server.go b/server/server.go index a8b7102b..793c8a15 100644 --- a/server/server.go +++ b/server/server.go @@ -47,7 +47,7 @@ import ( // Server defaults. const ( // VERSION is the current version for the NATS Streaming server. - VERSION = "0.23.1" + VERSION = "0.23.2-beta01" DefaultClusterID = "test-cluster" DefaultDiscoverPrefix = "_STAN.discover" @@ -5495,6 +5495,9 @@ func (s *StanServer) processAck(c *channel, sub *subState, sequence uint64, from qsub.Lock() _, found := qsub.acksPending[sequence] if found { + if s.isClustered { + s.collectSentOrAck(qsub, replicateAck, sequence) + } delete(qsub.acksPending, sequence) persistAck(qsub) } From 78d6066e0d43992e52063cdc21f084246b7d42f8 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 17 Nov 2021 13:00:29 -0700 Subject: [PATCH 2/2] Update stan.go to v0.10.2 to avoid tests failures A change in v0.10.1 caused failed Close() to keep the underlying NATS connection opened, which would cause some tests to fail because the NATS connection would reconnect to subsequent tests, etc.. Signed-off-by: Ivan Kozlovic --- go.mod | 2 +- go.sum | 4 +-- vendor/github.com/nats-io/stan.go/README.md | 2 +- .../github.com/nats-io/stan.go/go_tests.mod | 2 +- .../github.com/nats-io/stan.go/go_tests.sum | 16 ++++----- vendor/github.com/nats-io/stan.go/stan.go | 34 +++++++++++++++++-- vendor/modules.txt | 2 +- 7 files changed, 45 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index 1f2ce139..e95c1b1b 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/nats-io/nats-server/v2 v2.6.4 github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 github.com/nats-io/nuid v1.0.1 - github.com/nats-io/stan.go v0.10.1 + github.com/nats-io/stan.go v0.10.2 github.com/prometheus/procfs v0.7.3 go.etcd.io/bbolt v1.3.6 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 diff --git a/go.sum b/go.sum index 67817aeb..03e23b37 100644 --- a/go.sum +++ b/go.sum @@ -69,8 +69,8 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nats-io/stan.go v0.10.1 h1:a0PPS12JHAC2xA2OaQ3aB9/wi49MnvxBTstgMlgwoFU= -github.com/nats-io/stan.go v0.10.1/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0= +github.com/nats-io/stan.go v0.10.2 h1:gQLd05LhzmhFkHm3/qP/klYHfM/hys45GyHa1Uly/kI= +github.com/nats-io/stan.go v0.10.2/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/vendor/github.com/nats-io/stan.go/README.md b/vendor/github.com/nats-io/stan.go/README.md index ab636220..1c275e01 100644 --- a/vendor/github.com/nats-io/stan.go/README.md +++ b/vendor/github.com/nats-io/stan.go/README.md @@ -30,7 +30,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/stan.go/@latest -go get github.com/nats-io/stan.go/@v0.10.1 +go get github.com/nats-io/stan.go/@v0.10.2 ``` ## Important things to know about reconnections. diff --git a/vendor/github.com/nats-io/stan.go/go_tests.mod b/vendor/github.com/nats-io/stan.go/go_tests.mod index 5184e69e..7ccfcb68 100644 --- a/vendor/github.com/nats-io/stan.go/go_tests.mod +++ b/vendor/github.com/nats-io/stan.go/go_tests.mod @@ -5,7 +5,7 @@ go 1.14 require ( github.com/gogo/protobuf v1.3.2 github.com/nats-io/nats-server/v2 v2.6.4 - github.com/nats-io/nats-streaming-server v0.23.0 + github.com/nats-io/nats-streaming-server v0.23.1 github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 github.com/nats-io/nuid v1.0.1 ) diff --git a/vendor/github.com/nats-io/stan.go/go_tests.sum b/vendor/github.com/nats-io/stan.go/go_tests.sum index 15d8a79b..aedf9b6a 100644 --- a/vendor/github.com/nats-io/stan.go/go_tests.sum +++ b/vendor/github.com/nats-io/stan.go/go_tests.sum @@ -42,14 +42,14 @@ github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCS github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= -github.com/hashicorp/raft v1.3.1 h1:zDT8ke8y2aP4wf9zPTB2uSIeavJ3Hx/ceY4jxI2JxuY= -github.com/hashicorp/raft v1.3.1/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= +github.com/hashicorp/raft v1.3.2 h1:j2tqHqFnDdWCepLxzuo3b6WzS2krIweBrvEoqBbWMTo= +github.com/hashicorp/raft v1.3.2/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.4 h1:0zhec2I8zGnjWcKyLl6i3gPqKANCCn5e9xmviEEeX6s= github.com/klauspost/compress v1.13.4/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg= -github.com/lib/pq v1.10.3/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk= +github.com/lib/pq v1.10.4/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -60,12 +60,10 @@ github.com/minio/highwayhash v1.0.1 h1:dZ6IIu8Z14VlC0VpfKofAhCy74wu/Qb5gcn52yWoz github.com/minio/highwayhash v1.0.1/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.1.0 h1:1UbfD5g1xTdWmSeRV8bh/7u+utTiBsRtWhLl1PixZp4= github.com/nats-io/jwt/v2 v2.1.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.6.2/go.mod h1:CNi6dJQ5H+vWqaoWKjCGtqBt7ai/xOTLiocUqhK6ews= github.com/nats-io/nats-server/v2 v2.6.4 h1:WjR1ylV/5Urth88K8U78wEEnWFYEJ9DNM0Q5DTlTx0g= github.com/nats-io/nats-server/v2 v2.6.4/go.mod h1:LlMieumxNUnCloOTVFv7Wog0YnasScxARUMXVXv9/+M= -github.com/nats-io/nats-streaming-server v0.23.0 h1:80I3U3osEpLUtrvX2ao46aqq4AurMk+hBQC8wUJPJ0g= -github.com/nats-io/nats-streaming-server v0.23.0/go.mod h1:1asNNRpUKbgwoPqRLEWbJE65uqmWjG1YN/Xlo3WgkTY= -github.com/nats-io/nats.go v1.11.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-streaming-server v0.23.1 h1:GFnCbMW/aO7ijtzHThcAFOwLqf7m4Q0/o5eB9dEYUz4= +github.com/nats-io/nats-streaming-server v0.23.1/go.mod h1:0z0UagX1ehTG/YiTTtHcgyPTUGJ7HIBseJLHGb66Z6U= github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483 h1:GMx3ZOcMEVM5qnUItQ4eJyQ6ycwmIEB/VC/UxvdevE0= github.com/nats-io/nats.go v1.13.1-0.20211018182449-f2416a8b1483/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= @@ -73,7 +71,7 @@ github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/nats-io/stan.go v0.10.0/go.mod h1:0jEuBXKauB1HHJswHM/lx05K48TJ1Yxj6VIfM4k+aB4= +github.com/nats-io/stan.go v0.10.1/go.mod h1:vo2ax8K2IxaR3JtEMLZRFKIdoK/3o1/PKueapB7ezX0= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/vendor/github.com/nats-io/stan.go/stan.go b/vendor/github.com/nats-io/stan.go/stan.go index f8c67c10..237e2abe 100644 --- a/vendor/github.com/nats-io/stan.go/stan.go +++ b/vendor/github.com/nats-io/stan.go/stan.go @@ -26,7 +26,7 @@ import ( ) // Version is the NATS Streaming Go Client version -const Version = "0.10.1" +const Version = "0.10.2" const ( // DefaultNatsURL is the default URL the client connects to @@ -179,6 +179,21 @@ type Options struct { // ConnectionLostCB specifies the handler to be invoked when the connection // is permanently lost. ConnectionLostCB ConnectionLostHandler + + // AllowCloseRetry specifies that a failed connection Close() can be retried. + // + // By default, after the first call to Close(), the underlying NATS connection + // is closed (when owned by the library), regardless if the library gets a + // response from the server or not, and calling Close() again is a no-op. + // With AllowCloseRetry set to true, if the library fails to get a response + // from the close protocol, calling Close() again is possible and the library + // will try to resend the protocol. It means that the underlying NATS connection + // won't be closed until the library successfully gets a response from the server. + // This behavior can have side effects in that the underlying NATS connection + // may stay open (or reconnect) when otherwise it would have been closed after + // calling Close(). So AllowCloseRetry is disabled by default to maintain + // expected default behavior in regard with the underlying NATS connection state. + AllowCloseRetry bool } // GetDefaultOptions returns default configuration options for the client. @@ -308,6 +323,15 @@ func SetConnectionLostHandler(handler ConnectionLostHandler) Option { } } +// AllowCloseRetry is an Option that allows a failed connection close to be retried. +// See option AllowCloseRetry for more information. +func AllowCloseRetry(allow bool) Option { + return func(o *Options) error { + o.AllowCloseRetry = allow + return nil + } +} + // A conn represents a bare connection to a stan cluster. type conn struct { sync.RWMutex @@ -682,6 +706,12 @@ func (sc *conn) Close() error { if !sc.closed { sc.closed = true sc.cleanupOnClose(ErrConnectionClosed) + if !sc.opts.AllowCloseRetry { + sc.fullyClosed = true + if sc.ncOwned { + defer sc.nc.Close() + } + } } req := &pb.CloseRequest{ClientID: sc.clientID} @@ -700,7 +730,7 @@ func (sc *conn) Close() error { } // As long as we got a valid response, we consider the connection fully closed. sc.fullyClosed = true - if sc.ncOwned { + if sc.ncOwned && sc.opts.AllowCloseRetry { sc.nc.Close() } if cr.Error != "" { diff --git a/vendor/modules.txt b/vendor/modules.txt index c1ef6baa..34ae407a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -57,7 +57,7 @@ github.com/nats-io/nkeys # github.com/nats-io/nuid v1.0.1 ## explicit github.com/nats-io/nuid -# github.com/nats-io/stan.go v0.10.1 +# github.com/nats-io/stan.go v0.10.2 ## explicit github.com/nats-io/stan.go github.com/nats-io/stan.go/pb