From c0c32892b47f82bcfa44fc8c4c18bd7b2ef1fa95 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Mon, 14 Sep 2020 13:22:39 +0530 Subject: [PATCH 1/7] Prevent crash due to connection failures --- pkg/queue/beanstalk.go | 5 +++++ pkg/queue/sqs.go | 16 +++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index a1e74495..e3543741 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -92,6 +92,11 @@ func getBeanstalkConn(queueURI string) (*beanstalk.Conn, error) { if err != nil { return nil, errors.New("dial-error: " + err.Error()) } + + if conn == nil { + return nil, fmt.Errorf("Connection nil for: %s\n", queueURI) + } + return conn, nil } diff --git a/pkg/queue/sqs.go b/pkg/queue/sqs.go index 6b96472e..e14c81f0 100644 --- a/pkg/queue/sqs.go +++ b/pkg/queue/sqs.go @@ -87,8 +87,13 @@ func (s *SQS) getSQSClient(queueURI string) *sqs.SQS { return s.sqsClientPool[getRegion(queueURI)] } -func (s *SQS) getCWClient(queueURI string) *cloudwatch.CloudWatch { - return s.cwClientPool[getRegion(queueURI)] +func (s *SQS) getCWClient(queueURI string) (*cloudwatch.CloudWatch, error) { + client, ok := s.cwClientPool[getRegion(queueURI)] + if !ok { + return nil, fmt.Errorf("Client not found for queue: %s\n", queueURI) + } + + return client, nil } func (s *SQS) longPollReceiveMessage(queueURI string) (int32, error) { @@ -187,7 +192,12 @@ func (s *SQS) getNumberOfMessagesReceived(queueURI string) (float64, error) { }, } - result, err := s.getCWClient(queueURI).GetMetricData(&cloudwatch.GetMetricDataInput{ + cwClient, err := s.getCWClient(queueURI) + if err != nil { + return 0.0, err + } + + result, err := cwClient.GetMetricData(&cloudwatch.GetMetricDataInput{ EndTime: &endTime, StartTime: &startTime, MetricDataQueries: []*cloudwatch.MetricDataQuery{query}, From 1b67cec70cb68dcf21b050a5283589841633ed70 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Mon, 14 Sep 2020 13:27:19 +0530 Subject: 
[PATCH 2/7] Syntax fix --- pkg/queue/beanstalk.go | 1 + pkg/queue/sqs.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index e3543741..9b4d4101 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -2,6 +2,7 @@ package queue import ( "errors" + "fmt" "io" "net/url" "path" diff --git a/pkg/queue/sqs.go b/pkg/queue/sqs.go index e14c81f0..1bcd2819 100644 --- a/pkg/queue/sqs.go +++ b/pkg/queue/sqs.go @@ -325,7 +325,12 @@ func (s *SQS) getAverageNumberOfMessagesSent(queueURI string) (float64, error) { }, } - result, err := s.getCWClient(queueURI).GetMetricData(&cloudwatch.GetMetricDataInput{ + cwClient, err := s.getCWClient(queueURI) + if err != nil { + return 0.0, err + } + + result, err := cwClient.GetMetricData(&cloudwatch.GetMetricDataInput{ EndTime: &endTime, StartTime: &startTime, MetricDataQueries: []*cloudwatch.MetricDataQuery{query}, From a9b7acbd08906f8bf61f214723afd3ebd69041b7 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Thu, 17 Sep 2020 14:28:47 +0000 Subject: [PATCH 3/7] Check if nil tubeset when connection is empty --- hack/format.sh | 2 +- pkg/queue/beanstalk.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/hack/format.sh b/hack/format.sh index 48c64954..df0626d2 100755 --- a/hack/format.sh +++ b/hack/format.sh @@ -1,7 +1,7 @@ #!/bin/sh set -o errexit set -o nounset -set -o pipefail +#set -o pipefail find . 
-type f | grep .go$ | grep -v vendor | xargs -I {} gofmt -w {} diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index 9b4d4101..1072fe0e 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -196,8 +196,17 @@ func (c *beanstalkClient) put( func (c *beanstalkClient) doLongPoll( longPollInterval int64) (bool, uint64, error) { + var id uint64 + var err error + tubeSet := beanstalk.NewTubeSet(c.conn, path.Base(c.queueURI)) - id, _, err := tubeSet.Reserve( + if tubeSet == nil { + err = c.reestablishConn() + if err != nil { + return false, id, err + } + } + id, _, err = tubeSet.Reserve( time.Duration(longPollInterval) * time.Second) if err == nil { return true, id, nil From 8c6f841e895f15ff35df7454577ff76bd003bf20 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Tue, 22 Sep 2020 07:24:22 +0530 Subject: [PATCH 4/7] Reestablish connection on every failure Failures can happen due to many types of connection failures. This time it was a broken pipe. Better to retry on every failure till I gather the list of all failures. 
--- pkg/queue/beanstalk.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index 1072fe0e..3b089f33 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -61,6 +61,7 @@ type BeanstalkClientInterface interface { put(body []byte, pri uint32, delay, t time.Duration) (id uint64, err error) getStats() (int32, int32, int32, error) longPollReceiveMessage(longPollInterval int64) (int32, int32, error) + reestablishConn() error } type beanstalkClient struct { @@ -318,6 +319,16 @@ func (b *Beanstalk) waitForShortPollInterval() { time.Sleep(b.shortPollInterval) } +func (b *Beanstalk) reestablishConn(queueURI string) { + client, err := b.getClient(queueURI) + if err != nil { + klog.Error(err) + } + + err = client.reestablishConn() + klog.Error(err) +} + func (b *Beanstalk) GetName() string { return b.name } @@ -335,6 +346,7 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) { if err != nil { klog.Errorf("Unable to perform request long polling %q, %v.", queueSpec.name, err) + b.reestablishConn(queueSpec.uri) return } @@ -360,6 +372,7 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) { if err != nil { klog.Errorf("Unable to get approximate messages in queue %q, %v.", queueSpec.name, err) + b.reestablishConn(queueSpec.uri) return } klog.V(3).Infof("%s: approxMessages=%d", queueSpec.name, approxMessages) @@ -387,6 +400,7 @@ func (b *Beanstalk) poll(key string, queueSpec QueueSpec) { if err != nil { klog.Errorf("Unable to fetch idle workers %q, %v.", queueSpec.name, err) + b.reestablishConn(queueSpec.uri) time.Sleep(100 * time.Millisecond) return } From 53a7813f1773da6025904e535b6b2c44c35f4d04 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Tue, 22 Sep 2020 07:36:03 +0530 Subject: [PATCH 5/7] Updated mock for reestablishconn --- pkg/queue/beanstalk_mock.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/queue/beanstalk_mock.go b/pkg/queue/beanstalk_mock.go 
index f8b79386..e0284616 100644 --- a/pkg/queue/beanstalk_mock.go +++ b/pkg/queue/beanstalk_mock.go @@ -84,3 +84,17 @@ func (mr *MockBeanstalkClientInterfaceMockRecorder) put(arg0, arg1, arg2, arg3 i mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "put", reflect.TypeOf((*MockBeanstalkClientInterface)(nil).put), arg0, arg1, arg2, arg3) } + +// reestablishConn mocks base method +func (m *MockBeanstalkClientInterface) reestablishConn() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "reestablishConn") + ret0, _ := ret[0].(error) + return ret0 +} + +// reestablishConn indicates an expected call of reestablishConn +func (mr *MockBeanstalkClientInterfaceMockRecorder) reestablishConn() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "reestablishConn", reflect.TypeOf((*MockBeanstalkClientInterface)(nil).reestablishConn)) +} From d345b7d655722a0577c847b984c450918017be21 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Tue, 22 Sep 2020 03:09:00 +0000 Subject: [PATCH 6/7] Client is nil due to conn then err --- pkg/queue/beanstalk.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index 3b089f33..c4c6e300 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -264,6 +264,9 @@ func (b *Beanstalk) getClient( if err != nil { return nil, err } + if client == nil { + return nil, fmt.Errorf("Not able to make client for: %s\n", queueURI) + } b.clientPool.Store(queueURI, client) return client.(BeanstalkClientInterface), nil From e20ba0aeec5c1211650c32d243edaedecbd65b54 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Tue, 22 Sep 2020 09:18:18 +0000 Subject: [PATCH 7/7] Return if err in getting client --- pkg/queue/beanstalk.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/queue/beanstalk.go b/pkg/queue/beanstalk.go index c4c6e300..b8cc9150 100644 --- a/pkg/queue/beanstalk.go +++ b/pkg/queue/beanstalk.go @@ -325,7 +325,8 @@ 
func (b *Beanstalk) waitForShortPollInterval() { func (b *Beanstalk) reestablishConn(queueURI string) { client, err := b.getClient(queueURI) if err != nil { - klog.Error(err) + klog.Errorf("Could not reestablish conn, err:%v\n", err) + return } err = client.reestablishConn()