From 7b73297cad1d6044df48242107206d098a9b35a4 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 22 Feb 2021 11:42:00 -0700 Subject: [PATCH] [FIXED] Fail faster subscription requests for invalid clients When the server detects that a client is not responding to heartbeats, it closes it. However, this was done with a barrier that ensures that all pending protocols on all internal subscriptions are first processed. In the case of this close (as opposed to processing a client close request), this is not welcome because it may cause the server to process lots of subscription requests that were sent by that client - maybe in response to timeouts for a server already having issues handling the load. The server will now forcefully close the client (without the use of the barrier) and the detection of an invalid client is done sooner in the handling of the subscription request. Also, the internal subscription for client subscription requests has been reverted to be limited (possibly dropping requests). Signed-off-by: Ivan Kozlovic --- server/server.go | 55 ++++++++++++++++----------- server/server_req_test.go | 16 ++++---- server/server_sub_test.go | 78 ++++++++++++++++++++++++++++++++++++++- server/server_test.go | 18 ++++++++- 4 files changed, 136 insertions(+), 31 deletions(-) diff --git a/server/server.go b/server/server.go index 8b32a6ec..99aadce3 100644 --- a/server/server.go +++ b/server/server.go @@ -2778,7 +2778,10 @@ func (s *StanServer) initInternalSubs(createPub bool) error { } } // Receive subscription requests from clients. - s.subSub, err = s.createSub(s.info.Subscribe, s.processSubscriptionRequest, "subscribe request") + // Don't make this subscription unlimited because we would rather drop + // subscriptions requests than adding to an already possibly overloaded + // server. 
+ s.subSub, err = s.createSubWithUnlimited(s.info.Subscribe, s.processSubscriptionRequest, "subscribe request", false) if err != nil { return err } @@ -2861,11 +2864,17 @@ func (s *StanServer) unsubscribeInternalSubs() { } func (s *StanServer) createSub(subj string, f nats.MsgHandler, errTxt string) (*nats.Subscription, error) { + return s.createSubWithUnlimited(subj, f, errTxt, true) +} + +func (s *StanServer) createSubWithUnlimited(subj string, f nats.MsgHandler, errTxt string, setUnlimited bool) (*nats.Subscription, error) { sub, err := s.nc.Subscribe(subj, f) if err != nil { return nil, fmt.Errorf("could not subscribe to %s subject: %v", errTxt, err) } - sub.SetPendingLimits(-1, -1) + if setUnlimited { + sub.SetPendingLimits(-1, -1) + } return sub, nil } @@ -3223,17 +3232,15 @@ func (s *StanServer) checkClientHealth(clientID string) { // close the client (connection). This locks the // client object internally so unlock here. client.Unlock() - s.barrier(func() { - // If clustered, thread operations through Raft. - if s.isClustered { - if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}); err != nil { - s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v", - clientID, err) - } - } else { - s.closeClient(clientID) + // If clustered, thread operations through Raft. + if s.isClustered { + if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil { + s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v", + clientID, err) } - }) + } else { + s.closeClient(clientID) + } return } } else { @@ -3305,7 +3312,7 @@ func (s *StanServer) processCloseRequest(m *nats.Msg) { var err error // If clustered, thread operations through Raft. 
if s.isClustered { - err = s.replicateConnClose(req) + err = s.replicateConnClose(req, true) } else { err = s.closeClient(req.ClientID) } @@ -3316,12 +3323,14 @@ func (s *StanServer) processCloseRequest(m *nats.Msg) { }) } -func (s *StanServer) replicateConnClose(req *pb.CloseRequest) error { - // Go through the list of subscriptions and possibly - // flush the pending replication of sent/ack. - subs := s.clients.getSubs(req.ClientID) - for _, sub := range subs { - s.endSubSentAndAckReplication(sub, false) +func (s *StanServer) replicateConnClose(req *pb.CloseRequest, flushSubAcks bool) error { + if flushSubAcks { + // Go through the list of subscriptions and possibly + // flush the pending replication of sent/ack. + subs := s.clients.getSubs(req.ClientID) + for _, sub := range subs { + s.endSubSentAndAckReplication(sub, false) + } } op := &spb.RaftOperation{ @@ -4873,7 +4882,7 @@ func (s *StanServer) replicateSub(c *channel, sr *pb.SubscriptionRequest, ackInb func (s *StanServer) addSubscription(ss *subStore, sub *subState) error { // Store in client if !s.clients.addSub(sub.ClientID, sub) { - return fmt.Errorf("can't find clientID: %v", sub.ClientID) + return ErrUnknownClient } // Store this subscription in subStore if err := ss.Store(sub); err != nil { @@ -4887,7 +4896,7 @@ func (s *StanServer) addSubscription(ss *subStore, sub *subState) error { func (s *StanServer) updateDurable(ss *subStore, sub *subState, clientID string) error { // Store in the client if !s.clients.addSub(clientID, sub) { - return fmt.Errorf("can't find clientID: %v", clientID) + return ErrUnknownClient } // Update this subscription in the store sub.Lock() @@ -5151,6 +5160,10 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) { s.sendSubscriptionResponseErr(m.Reply, ErrInvalidSubReq) return } + } else if !s.clients.isValid(sr.ClientID, nil) { + // If client is not known, fail the request. 
+ s.sendSubscriptionResponseErr(m.Reply, ErrUnknownClient) + return } var ( diff --git a/server/server_req_test.go b/server/server_req_test.go index dda6d51b..1a9cef74 100644 --- a/server/server_req_test.go +++ b/server/server_req_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2019 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -131,7 +131,7 @@ func TestInvalidSubRequest(t *testing.T) { defer nc.Close() // This test is very dependent on the validity tests performed - // in StanServer.processSubscriptionRequest(). Any cahnge there + // in StanServer.processSubscriptionRequest(). Any change there // may require changes here. // Create empty request @@ -201,20 +201,22 @@ func TestInvalidSubRequest(t *testing.T) { } // Test Queue Group DurableName + sc := NewDefaultConnection(t) + defer sc.Close() req.Subject = "foo" req.QGroup = "queue" req.DurableName = "dur:name" if err := sendInvalidSubRequest(s, nc, req, ErrInvalidDurName); err != nil { t.Fatalf("%v", err) } + sc.Close() // Reset those req.QGroup = "" req.DurableName = "" - // Now we should have an error that says that we can't find client ID - // (that is, client was not registered). - if err := sendInvalidSubRequest(s, nc, req, fmt.Errorf("can't find clientID: %v", clientName)); err != nil { + // Now we should have an error that says that we have an unknown client ID. 
+ if err := sendInvalidSubRequest(s, nc, req, ErrUnknownClient); err != nil { t.Fatalf("%v", err) } @@ -227,7 +229,7 @@ func TestInvalidSubRequest(t *testing.T) { } // Create a durable - sc := NewDefaultConnection(t) + sc = NewDefaultConnection(t) defer sc.Close() dur, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur")) if err != nil { @@ -245,7 +247,7 @@ func TestInvalidSubRequest(t *testing.T) { req.ClientID = clientName req.Subject = "foo" req.DurableName = "dur" - if err := sendInvalidSubRequest(s, nc, req, fmt.Errorf("can't find clientID: %v", clientName)); err != nil { + if err := sendInvalidSubRequest(s, nc, req, ErrUnknownClient); err != nil { t.Fatalf("%v", err) } } diff --git a/server/server_sub_test.go b/server/server_sub_test.go index a9f0bc97..50d77ce6 100644 --- a/server/server_sub_test.go +++ b/server/server_sub_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2019 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
// You may obtain a copy of the License at @@ -24,6 +24,7 @@ import ( "github.com/nats-io/nats-streaming-server/spb" "github.com/nats-io/nats-streaming-server/stores" "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" "github.com/nats-io/stan.go" "github.com/nats-io/stan.go/pb" ) @@ -1341,3 +1342,78 @@ func TestSubCloseByInbox(t *testing.T) { t.Fatalf("Should not be any subscription, got %v", subs) } } + +func TestSubRequestsFailedIfClientClosed(t *testing.T) { + sOpts := GetDefaultOptions() + sOpts.ID = clusterName + sOpts.ClientHBInterval = 15 * time.Millisecond + sOpts.ClientHBTimeout = 15 * time.Millisecond + sOpts.ClientHBFailCount = 1 + sOpts.StoreLimits.SubStoreLimits.MaxSubscriptions = 0 + nOpts := DefaultNatsServerOptions + s := runServerWithOpts(t, sOpts, &nOpts) + defer s.Shutdown() + + // Use a bare NATS connection to send incorrect requests + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + t.Fatalf("Unexpected error on connect: %v", err) + } + defer nc.Close() + + sub, err := nc.SubscribeSync("subreply") + if err != nil { + t.Fatalf("Error on subscribe: %v", err) + } + sub.SetPendingLimits(-1, -1) + + req := &pb.ConnectRequest{ClientID: clientName, HeartbeatInbox: "hbInbox"} + b, _ := req.Marshal() + resp, err := nc.Request(s.info.Discovery, b, time.Second) + if err != nil { + t.Fatalf("Unexpected error on publishing request: %v", err) + } + r := &pb.ConnectResponse{} + err = r.Unmarshal(resp.Data) + if err != nil { + t.Fatalf("Unexpected response object: %v", err) + } + if r.Error != "" { + t.Fatalf("Unexpected error: %v", r.Error) + } + + s.channels.Lock() + + for i := 0; i < 1000; i++ { + req := &pb.SubscriptionRequest{ + ClientID: clientName, + Subject: "foo", + Inbox: nuid.Next(), + MaxInFlight: 1, + AckWaitInSecs: 30, + } + b, _ := req.Marshal() + if err := nc.PublishRequest(s.info.Subscribe, sub.Subject, b); err != nil { + t.Fatalf("Error on request: %v", err) + } + } + + s.channels.Unlock() + + for { + msg, err := 
sub.NextMsg(250 * time.Millisecond) + if err != nil { + break + } + + rply := &pb.SubscriptionResponse{} + rply.Unmarshal(msg.Data) + if rply.Error == "" { + continue + } + if rply.Error != ErrUnknownClient.Error() { + t.Fatalf("Expected error %q, got %q", ErrUnknownClient, rply.Error) + } + break + } +} diff --git a/server/server_test.go b/server/server_test.go index 84ffaa62..43f76d89 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2019 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -1630,6 +1630,9 @@ func TestFileSliceMaxBytesCmdLine(t *testing.T) { } func TestInternalSubsLimits(t *testing.T) { + setPartitionsVarsForTest() + defer resetDefaultPartitionsVars() + cleanupDatastore(t) defer cleanupDatastore(t) cleanupRaftLog(t) @@ -1659,13 +1662,20 @@ func TestInternalSubsLimits(t *testing.T) { s := runServerWithOpts(t, o, nil) defer s.Shutdown() + switch test.name { + case "clustered": + getLeader(t, time.Second, s) + case "ft": + getFTActiveServer(t, s) + default: + } + s.mu.Lock() defer s.mu.Unlock() subs := []*nats.Subscription{ s.connectSub, s.pubSub, - s.subSub, s.subUnsubSub, s.subCloseSub, s.closeSub, @@ -1683,6 +1693,10 @@ func TestInternalSubsLimits(t *testing.T) { sub.Subject, err, count, sz) } } + // The subscription on "client subscription requests" should not be unlimited. + if count, sz, err := s.subSub.PendingLimits(); err != nil || count == -1 || sz == -1 { + t.Fatalf("The subSub subscription should not be unlimited: err=%v count=%v sz=%v", err, count, sz) + } }) } }