From ffebdab40cc405c3abe6932590dc0417370e62e5 Mon Sep 17 00:00:00 2001
From: Joshua Williams <102987390+jwlv@users.noreply.github.com>
Date: Mon, 25 Nov 2024 12:05:24 -0800
Subject: [PATCH] Fix PCS/TRS resource leaks and scaling issues

CASMHMS-6299
---
 .version                               |    2 +-
 CHANGELOG.md                           |   19 +
 Makefile                               |   19 +-
 go.mod                                 |    2 +-
 go.sum                                 |   14 -
 pkg/trs_http_api/models.go             |   30 +-
 pkg/trs_http_api/models_test.go        |    9 +-
 pkg/trs_http_api/trshttp_interface.go  |   13 +-
 pkg/trs_http_api/trshttp_local.go      |  575 +++++++--
 pkg/trs_http_api/trshttp_local_test.go | 1639 +++++++++++++++++++++++-
 10 files changed, 2168 insertions(+), 154 deletions(-)

diff --git a/.version b/.version
index 3e3c2f1..4a36342 100644
--- a/.version
+++ b/.version
@@ -1 +1 @@
-2.1.1
+3.0.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7f64c73..e719ba4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,25 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [3.0.0] - 2024-11-25
+
+### Fixed
+
+- TRS now supports configuration of connection counts and timeouts by callers
+- TRS no longer closes all idle connections when HTTP requests or contexts time out
+- TRS no longer closes all idle connections when request retry limits are reached
+- Reworked several sections of code for clarity and reduced code duplication
+- Fixed bug where contexts were never being cancelled, which led to resource leaks
+- Fixed bug to prevent a 2nd request if the 1st request's context timed out or was canceled
+- Additional tracing added for debug purposes
+- Unit tests: Now run in verbose mode so failures are more easily analyzed
+- Unit tests: Enabled TRS logging from inside unit tests
+- Unit tests: Error signature changed to make identifying errors easier
+- Unit tests: Reworked some existing unit tests
+- Unit tests: Numerous unit tests added to test connection states
+- Update required version of Go to 1.23 to avoid
+  https://github.com/golang/go/issues/59017
+
 ## [2.1.1] - 2024-10-31
 
 ### Fixed
diff --git a/Makefile b/Makefile
index 86b7c5e..dda2e1f 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
 #
 # MIT License
 #
-# (C) Copyright 2022 Hewlett Packard Enterprise Development LP
+# (C) Copyright 2022,2024 Hewlett Packard Enterprise Development LP
 #
 # Permission is hereby granted, free of charge, to any person obtaining a
 # copy of this software and associated documentation files (the "Software"),
@@ -26,7 +26,22 @@ all: unittest integration
 .PHONY: unittest integration
 
 unittest:
-	go test ./pkg/trs_http_api/... -cover
+	# There are three different versions of Go in the unit test VM.
+	# Versions may change over time but can be discovered with:
+	#
+	#	ls /opt/hostedtoolcache/go
+	#
+	# Here's what's available as of Nov 20, 2024:
+	#
+	#/opt/hostedtoolcache/go/1.21.13/x64/bin/go test -v ./pkg/trs_http_api/... -cover -logLevel=2
+	#/opt/hostedtoolcache/go/1.22.9/x64/bin/go test -v ./pkg/trs_http_api/... -cover -logLevel=2
+	#/opt/hostedtoolcache/go/1.23.3/x64/bin/go test -v ./pkg/trs_http_api/... -cover -logLevel=2
+	#
+	# -logLevel values: 0=Panic, 1=Fatal, 2=Error, 3=Warn, 4=Info, 5=Debug, 6=Trace
+	#
+
+	go test -v ./pkg/trs_http_api/... 
-cover -logLevel=2 + # no -v -tags musl integration: diff --git a/go.mod b/go.mod index fe01845..bbb30d0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/Cray-HPE/hms-trs-app-api/v2 -go 1.17 +go 1.23 require ( github.com/Cray-HPE/hms-base/v2 v2.0.1 diff --git a/go.sum b/go.sum index af8c43f..b83b14c 100644 --- a/go.sum +++ b/go.sum @@ -7,7 +7,6 @@ github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFH github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= @@ -23,13 +22,8 @@ github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVH github.com/hashicorp/go-retryablehttp v0.5.4/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= -github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -42,14 +36,6 @@ github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8 github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/pkg/trs_http_api/models.go b/pkg/trs_http_api/models.go index d484887..f5ca9e0 100644 --- a/pkg/trs_http_api/models.go +++ b/pkg/trs_http_api/models.go @@ -1,6 +1,6 @@ // MIT License // -// (C) Copyright [2021] Hewlett Packard Enterprise Development LP +// (C) Copyright [2021,2024] Hewlett Packard Enterprise Development LP // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -27,12 +27,13 @@ import ( "context" "crypto/tls" "errors" - "github.com/google/uuid" "io/ioutil" "mime/multipart" "net/http" "net/url" "time" + + "github.com/google/uuid" ) type SerializedRequest struct { @@ -61,7 +62,7 @@ type HttpKafkaTx struct { Request SerializedRequest `json:",omitempty"` TimeStamp string `json:",omitempty"` // Time the request time.Now().String() Timeout time.Duration `json:",omitempty"` - RetryPolicy RetryPolicy + CPolicy ClientPolicy ServiceName string Ignore bool } @@ -77,14 +78,29 @@ type RetryPolicy struct { BackoffTimeout time.Duration } +type HttpTxPolicy struct { + Enabled bool // Enable or disable the policy + MaxIdleConns int + MaxIdleConnsPerHost int + IdleConnTimeout time.Duration + ResponseHeaderTimeout time.Duration + TLSHandshakeTimeout time.Duration + DisableKeepAlives bool +} + +type ClientPolicy struct { + Retry RetryPolicy + Tx HttpTxPolicy +} + type HttpTask struct { id uuid.UUID // message id, likely monotonically increasing ServiceName string //name of the service Request *http.Request TimeStamp string // Time the request was created/sent RFC3339Nano Err *error - Timeout time.Duration - RetryPolicy RetryPolicy + Timeout time.Duration // task's context timeout + CPolicy ClientPolicy Ignore bool context context.Context contextCancel context.CancelFunc @@ -143,7 +159,7 @@ func (ht HttpTask) ToHttpKafkaTx() (tx HttpKafkaTx) { //Fill the data tx.ID = ht.id tx.Timeout = ht.Timeout - tx.RetryPolicy = ht.RetryPolicy + tx.CPolicy = ht.CPolicy tx.TimeStamp = ht.TimeStamp tx.Request = ToSerializedRequest(*ht.Request) tx.ServiceName = ht.ServiceName @@ -156,7 +172,7 @@ func (tx HttpKafkaTx) ToHttpTask() (ht HttpTask) { //Fill the service data ht.id = tx.ID ht.ServiceName = tx.ServiceName - ht.RetryPolicy = tx.RetryPolicy + ht.CPolicy = tx.CPolicy ht.Timeout = tx.Timeout ht.TimeStamp = tx.TimeStamp ht.Ignore = tx.Ignore diff --git a/pkg/trs_http_api/models_test.go b/pkg/trs_http_api/models_test.go index 5d03143..b4d4114 100644 --- a/pkg/trs_http_api/models_test.go +++ b/pkg/trs_http_api/models_test.go @@ -1,6 +1,6 @@ // MIT License // -// (C) Copyright [2021] Hewlett Packard Enterprise Development LP +// (C) Copyright [2021,2024] Hewlett Packard Enterprise Development LP // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -23,11 +23,12 @@ package trs_http_api import ( - "github.com/google/uuid" - "github.com/stretchr/testify/suite" "net/http" "testing" "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/suite" ) type ModelsTS struct { @@ -43,7 +44,7 @@ func GenerateStockHttpTask() (ht HttpTask) { TimeStamp: time.Now().String(), Err: nil, Timeout: 0, - RetryPolicy: RetryPolicy{}, + CPolicy: 
ClientPolicy{}, context: nil, contextCancel: nil, } diff --git a/pkg/trs_http_api/trshttp_interface.go b/pkg/trs_http_api/trshttp_interface.go index 8c538ea..cfdcc83 100644 --- a/pkg/trs_http_api/trshttp_interface.go +++ b/pkg/trs_http_api/trshttp_interface.go @@ -1,6 +1,6 @@ // MIT License // -// (C) Copyright [2020-2022] Hewlett Packard Enterprise Development LP +// (C) Copyright [2020-2022,2024] Hewlett Packard Enterprise Development LP // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), @@ -26,13 +26,14 @@ import ( "context" "crypto/tls" "crypto/x509" + "sync" + "time" + + tkafka "github.com/Cray-HPE/hms-trs-kafkalib/v2/pkg/trs-kafkalib" "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/google/uuid" "github.com/hashicorp/go-retryablehttp" "github.com/sirupsen/logrus" - tkafka "github.com/Cray-HPE/hms-trs-kafkalib/v2/pkg/trs-kafkalib" - "sync" - "time" ) // Interface for TRS operations. There will be 2 sets of interface methods -- @@ -43,7 +44,7 @@ import ( // This is intentional -- using the local or remote variants of the interface // shouldn't make the code any different at all. If using the remote/worker // variant, the Kafka topics are calculated based on service name, sender id, -// and send/receive. This is all done under the covers, but will be +// and send/receive. This is all done under the covers, but will be // predictable enough for debugging. // // If for whatever reason an application wants a different return topic, it @@ -85,7 +86,7 @@ type TRSHTTPLocal struct { ctxCancelFunc context.CancelFunc CACertPool *x509.CertPool ClientCert tls.Certificate - clientMap map[RetryPolicy]*clientPack + clientMap map[ClientPolicy]*clientPack clientMutex sync.Mutex taskMap map[uuid.UUID]*taskChannelTuple taskMutex sync.Mutex diff --git a/pkg/trs_http_api/trshttp_local.go b/pkg/trs_http_api/trshttp_local.go index 7bc1c0a..d3f8e11 100644 --- a/pkg/trs_http_api/trshttp_local.go +++ b/pkg/trs_http_api/trshttp_local.go @@ -1,17 +1,17 @@ // MIT License -// +// // (C) Copyright [2020-2021,2024] Hewlett Packard Enterprise Development LP -// +// // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: -// +// // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. -// +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL @@ -20,7 +20,6 @@ // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR // OTHER DEALINGS IN THE SOFTWARE. 
- package trs_http_api import ( @@ -29,12 +28,16 @@ import ( "crypto/x509" "errors" "fmt" + "io" + "net/http" + "runtime" + "sync" + "time" + + base "github.com/Cray-HPE/hms-base/v2" "github.com/google/uuid" "github.com/hashicorp/go-retryablehttp" "github.com/sirupsen/logrus" - "net/http" - "time" - "github.com/Cray-HPE/hms-base/v2" ) const ( @@ -52,13 +55,13 @@ const ( // // ServiceName: Name of running service/application. // Return: Error string if something went wrong. - func (tloc *TRSHTTPLocal) Init(serviceName string, logger *logrus.Logger) error { if logger != nil { tloc.Logger = logger } else { tloc.Logger = logrus.New() } + //wrapperLogger = tloc.Logger // only uncomment if debugging wrapper issues tloc.ctx, tloc.ctxCancelFunc = context.WithCancel(context.Background()) @@ -69,10 +72,13 @@ func (tloc *TRSHTTPLocal) Init(serviceName string, logger *logrus.Logger) error } if tloc.clientMap == nil { tloc.clientMutex.Lock() - tloc.clientMap = make(map[RetryPolicy]*clientPack) + tloc.clientMap = make(map[ClientPolicy]*clientPack) tloc.clientMutex.Unlock() } tloc.svcName = serviceName + + tloc.Logger.Tracef("Init() successful") + return nil } @@ -105,6 +111,8 @@ func (tloc *TRSHTTPLocal) SetSecurity(inParams interface{}) error { } } + tloc.Logger.Tracef("SetSecurity() successful") + return nil } @@ -126,97 +134,498 @@ func (tloc *TRSHTTPLocal) CreateTaskList(source *HttpTask, numTasks int) []HttpT return createHTTPTaskArray(source, numTasks) } +// leveledLogrus implements the LeveledLogger interface in retryablehttp so +// we can control its log levels. We match TRS's log level as this is what +// TRS's caller wants to see. The code for this comes from the community as +// a recommended work around for the following issue: +// +// https://github.com/hashicorp/go-retryablehttp/issues/93 +// +// In the final version of the commit that added this capability, the +// decision was made NOT to pass our leveledLogrus down to retryablehttp. +// With the prior implementation, TRS was passing down a non-leveled +// logger set at Error level. With the leveled logger set at Error level +// it is actually much more chatty and logs every standard http request +// that fails for whatever reason. While this is helpful, and useful, +// it produces much more log data than previously. To prevent any +// topential log volume issues, the prior mechanism will be kept in place. +// The code needed for leveled logging will remain in place though in the +// event a new version of retryablehttp becomes less chatty at the Error +// level. + +type leveledLogrus struct { + *logrus.Logger +} + +func (l *leveledLogrus) fields(keysAndValues ...interface{}) map[string]interface{} { + fields := make(map[string]interface{}) + + for i := 0; i < len(keysAndValues) - 1; i += 2 { + fields[keysAndValues[i].(string)] = keysAndValues[i+1] + } + + return fields +} + +func (l *leveledLogrus) Error(msg string, keysAndValues ...interface{}) { + l.WithFields(l.fields(keysAndValues...)).Error(msg) +} +func (l *leveledLogrus) Warn(msg string, keysAndValues ...interface{}) { + l.WithFields(l.fields(keysAndValues...)).Warn(msg) +} +func (l *leveledLogrus) Info(msg string, keysAndValues ...interface{}) { + l.WithFields(l.fields(keysAndValues...)).Info(msg) +} +func (l *leveledLogrus) Debug(msg string, keysAndValues ...interface{}) { + l.WithFields(l.fields(keysAndValues...)).Debug(msg) +} + +// The retryablehttp module closes idle connections in an overly aggressive +// manner. 
If a single request experiences a timeout, all idle connections
+// are closed.  If a single request exceeds all retries, all idle
+// connections are closed.  The following RoundTrip wrapper helps us
+// wrap various retryablehttp and http interfaces to prevent this.
+
+type trsRoundTripper struct {
+	transport                             *http.Transport
+	closeIdleConnectionsFn                func()
+	skipCloseCount                        uint64
+	skipCloseMutex                        sync.Mutex
+	timeLastClosedOrReachedZeroCloseCount time.Time
+}
+
+// Our RoundTripper().  Just call the RoundTrip interface at the next level down.
+func (c *trsRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
+	return c.transport.RoundTrip(req)
+}
+
+// Our wrapper around the standard http.Client's CloseIdleConnections().
+// We only call the interface at the next level down if the skipCloseCount
+// counter is zero.  This counter is incremented by our CheckRetry
+// wrapper (further below) if it detects an http timeout, context
+// timeout, or retry limit exceeded for a request.
+//
+// There may be a hole in the logic whereby a request sets the skip
+// counter but is then killed for whatever reason before it can call into
+// this wrapper.  This would leave the counter at non-zero forever.  It
+// would be very unlikely to happen, but logically possible.  We guard
+// against this by resetting the counter to zero if it's been 2 hours since:
+//
+//	* The last call to c.closeIdleConnectionsFn()
+//	* The last time c.skipCloseCount reached zero
+//
+// Prior to this change, the TRS module would ALWAYS close all idle
+// connections after every single http timeout, context timeout, or retry
+// count exceeded condition.  So, if we close out all the connections
+// occasionally after a two hour period, it's not a big deal.
+//
+// WARNING! The Go runtime behavior surrounding connections has changed in
+//          more recent versions of Go.  Prior to version 1.23, if any
+//          request experiences a timeout, the Go runtime closes ALL idle
+//          connections.  There is nothing we can do about this in TRS,
+//          other than use a newer version of Go that doesn't exhibit this
+//          behavior.  The issue is documented here:
+//
+//          * https://github.com/golang/go/issues/59017
+//          * https://github.com/golang/go/commit/334ce510046ad30b1be466634cf313aad3040892
+//
+//          Will attempt to jump to Go 1.23 with this commit.
+
+//var wrapperLogger *logrus.Logger // only uncomment if debugging wrapper issues
+
+func (c *trsRoundTripper) CloseIdleConnections() {
+	// Skip closing idle connections if counter > 0
+
+	c.skipCloseMutex.Lock()
+	if c.skipCloseCount > 0 {
+		c.skipCloseCount--
+
+		//wrapperLogger.Errorf("CloseIdleConnections: skipCloseCount now %v", c.skipCloseCount)
+
+		if c.skipCloseCount == 0 {
+			// Mark the time the counter last reached zero
+			c.timeLastClosedOrReachedZeroCloseCount = time.Now()
+		}
+
+		if time.Since(c.timeLastClosedOrReachedZeroCloseCount) > (2 * time.Hour) {
+			// If it's been two hours since we last closed idle connections
+			// or since the counter last reached zero, reset the counter to
+			// zero and proceed to close idle connections
+
+			c.skipCloseCount = 0
+
+			//wrapperLogger.Errorf("CloseIdleConnections: skipCloseCount reset due to expired time")
+
+			// Time will be marked below when we close idle connections
+		} else {
+			c.skipCloseMutex.Unlock()
+
+			//wrapperLogger.Errorf("CloseIdleConnections: returning without closing idle connections")
+
+			return
+		}
+	}
+
+	// Continue holding the mutex until timeLastClosedOrReachedZeroCloseCount is updated
+
+	if c.closeIdleConnectionsFn == nil {
+		// Nothing to do so release the mutex
+		c.skipCloseMutex.Unlock()
+
+		//wrapperLogger.Errorf("CloseIdleConnections: no lower closeIdleConnectionsFn() to call")
+	} else {
+		// Mark the time of this call to close connections
+		c.timeLastClosedOrReachedZeroCloseCount = time.Now()
+
+		c.skipCloseMutex.Unlock()
+
+		//wrapperLogger.Errorf("CloseIdleConnections: calling lower closeIdleConnectionsFn()")
+
+		// Call next level down
+		c.closeIdleConnectionsFn()
+	}
+}
+
+// Custom request wrapper that includes a retry counter that we'll use to
+// determine whether or not to close idle connections
+type trsWrappedReq struct {
+	orig       *http.Request // request we're wrapping
+	retryMax   int           // retry max from request
+	retryCount int           // retry count for this request
+}
+
+// drainAndCloseBody is a simple helper function reused in many places
+
+func drainAndCloseBody(resp *http.Response) {
+	if resp != nil && resp.Body != nil {
+		_, _ = io.Copy(io.Discard, resp.Body) // Drain the body
+		resp.Body.Close()                     // Close the body
+		resp.Body = nil                       // Prevent double-handling
+	}
+}
+
+// Our wrapper around retryablehttp's CheckRetry().  If we detect an http
+// timeout, context timeout, or a retry limit exceeded for a request, then
+// we increment the skipCloseCount counter so that the next time our
+// CloseIdleConnections() wrapper is called, it skips calling the lower
+// level system version that actually closes idle connections.
+
+type retryKey string // avoids compiler warning
+var trsRetryCountKey retryKey = "trsRetryCount"
+
+func (c *trsRoundTripper) trsCheckRetry(ctx context.Context, resp *http.Response, err error) (bool, error) {
+	//wrapperLogger.Errorf("trsCheckRetry: err=%v errType=%T", err, err)
+
+	// Skip a retry for this request if it hit one of these specific timeouts
+
+	if err != nil {
+		c.skipCloseMutex.Lock()
+
+		// Skip retries for HTTPClient.Timeout
+		if err.Error() == "net/http: request canceled" {
+			c.skipCloseCount++
+
+			c.skipCloseMutex.Unlock()
+
+			//wrapperLogger.Errorf("trsCheckRetry: skipCloseCount now %v (http timeout)", c.skipCloseCount)
+
+			// retryablehttp will close any response bodies on retry skip + error
+
+			return false, err // skip it
+		}
+
+		// Context timeout set by TRS.  No request should retry.
+ if errors.Is(err, context.DeadlineExceeded) { + c.skipCloseCount++ + + c.skipCloseMutex.Unlock() + + //wrapperLogger.Errorf("trsCheckRetry: skipCloseCount now %v (ctx timeout)", c.skipCloseCount) + + // retryablehttp will close any response bodies on retry skip + error + + return false, err // skip it + } + + // Lower level HTTPClient.Timeout triggered timeouts + // + // Unsure if this is wise so I left it commented out. If these + // happen they don't happen very much so closing all idle + // connections when they do happen is not a big deal. + // + // if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + // c.skipCloseCount++ + // + // c.skipCloseMutex.Unlock() + // + // wrapperLogger.Errorf("trsCheckRetry: skipCloseCount now %v (ctx timeout)", c.skipCloseCount) + // + // // retryablehttp will close any response bodies on retry skip + error + // + // return false, err // skip it + // } + + c.skipCloseMutex.Unlock() + + //wrapperLogger.Errorf("trsCheckRetry: deferring to DefaultRetryPolicy() for this error") + } + + // If none of the above, delegate retry check to retryablehttp + shouldRetry, defaultRPErr := retryablehttp.DefaultRetryPolicy(ctx, resp, err) + + //wrapperLogger.Errorf("trsCheckRetry: DefaultRetryPolicy: shouldRetry=%v", shouldRetry) + + // Determine if we should override DefaultRetryPolicy()'s opinion + if shouldRetry { + // This is our own personal copy of the retry counter for this + // request. Let's increment it then compare to the retry limit + + trsWR := ctx.Value(trsRetryCountKey).(*trsWrappedReq) + trsWR.retryCount++ + + //wrapperLogger.Errorf("trsCheckRetry: retryCount now %v", trsWR.retryCount) + + // If the retry limit was reached we do not want to close all idle + // connections unnecessarily so imcrement skipCloseCount counter so + // that our CloseIdleConnections() wrapper skips the next one + + if trsWR.retryCount > trsWR.retryMax { + // If no error present, let's give the caller the underlying + // reason why retries were exhausted, if we can determine it + if err == nil { + if resp != nil { + err = fmt.Errorf("retries exhausted: last attempt received status %d (%s)", + resp.StatusCode, http.StatusText(resp.StatusCode)) + } else { + err = fmt.Errorf("retries exhausted") + } + } + + // Skip an idle connection close + c.skipCloseMutex.Lock() + c.skipCloseCount++ + c.skipCloseMutex.Unlock() + + //wrapperLogger.Errorf("trsCheckRetry: skipCloseCount now %v and err is %v", c.skipCloseCount, err) + + // retryablehttp will close any response bodies on retry skip + error + + return false, err + } + } + + return shouldRetry, defaultRPErr +} + +// Create and configure a new client transport for use with HTTP clients. 
+func createClient(task *HttpTask, tloc *TRSHTTPLocal, clientType string) (client *retryablehttp.Client) {
+	// Configure the base transport
+
+	tr := &http.Transport{}
+
+	if clientType == "insecure" {
+		tr.TLSClientConfig = &tls.Config{
+			InsecureSkipVerify: true,
+		}
+	} else { // secure
+		tr.TLSClientConfig = &tls.Config{
+			RootCAs: tloc.CACertPool,
+		}
+		tr.TLSClientConfig.BuildNameToCertificate()
+	}
+
+	// Configure base transport policies if requested
+	httpTxPolicy := task.CPolicy.Tx
+	if httpTxPolicy.Enabled {
+		tr.MaxIdleConns = httpTxPolicy.MaxIdleConns                   // if 0, no limit
+		tr.MaxIdleConnsPerHost = httpTxPolicy.MaxIdleConnsPerHost     // if 0, defaults to 2
+		tr.IdleConnTimeout = httpTxPolicy.IdleConnTimeout             // if 0, no timeout
+		tr.ResponseHeaderTimeout = httpTxPolicy.ResponseHeaderTimeout // if 0, no timeout
+		tr.TLSHandshakeTimeout = httpTxPolicy.TLSHandshakeTimeout     // if 0, no timeout
+		tr.DisableKeepAlives = httpTxPolicy.DisableKeepAlives         // if unset, defaults to false
+	}
+
+	// Wrap base transport with retryablehttp
+	retryabletr := &trsRoundTripper{
+		transport:                             tr,
+		closeIdleConnectionsFn:                tr.CloseIdleConnections,
+		timeLastClosedOrReachedZeroCloseCount: time.Now(),
+	}
+
+	// Create the retryablehttp client and start configuring it
+	client = retryablehttp.NewClient()
+
+	client.HTTPClient.Transport = retryabletr
+
+	// We could set a global http timeout for all users of the client but
+	// that's a bit inflexible.  Let's keep it at the default (unlimited)
+	// and use the user provided context timeout to limit the request.
+	// The context timeout they provide could be different from caller to
+	// caller as well.
+	//
+	//client.HTTPClient.Timeout = task.Timeout * 9 / 10
+
+	// Wrap retryablehttp's DefaultRetryPolicy() so we can override it
+	client.CheckRetry = retryabletr.trsCheckRetry
+
+	// Configure the retryablehttp client retry count
+	if (task.CPolicy.Retry.Retries >= 0) {
+		client.RetryMax = task.CPolicy.Retry.Retries
+	} else {
+		client.RetryMax = DFLT_RETRY_MAX
+	}
+
+	// Configure the retryablehttp client backoff timeout
+	if (task.CPolicy.Retry.BackoffTimeout > 0) {
+		client.RetryWaitMax = task.CPolicy.Retry.BackoffTimeout
+	} else {
+		client.RetryWaitMax = DFLT_BACKOFF_MAX * time.Second
+	}
+
+	// Log this client's requested configuration and actual configuration
+
+	tloc.Logger.Errorf("Created %s TRS client", clientType)
+	tloc.Logger.Errorf("    CPolicy request:          %v", task.CPolicy)
+	tloc.Logger.Errorf("    RetryMax:                 %v", client.RetryMax)
+	tloc.Logger.Errorf("    RetryWaitMax:             %v", client.RetryWaitMax)
+	tloc.Logger.Errorf("    task.Timeout:             %v", task.Timeout)
+	tloc.Logger.Errorf("    HTTPClient.Timeout:       %v", client.HTTPClient.Timeout)
+
+	if (httpTxPolicy.Enabled) {
+		tloc.Logger.Errorf("    tx.MaxIdleConns:          %v", tr.MaxIdleConns)
+		tloc.Logger.Errorf("    tx.MaxIdleConnsPerHost:   %v", tr.MaxIdleConnsPerHost)
+		tloc.Logger.Errorf("    tx.IdleConnTimeout:       %v", tr.IdleConnTimeout)
+		tloc.Logger.Errorf("    tx.ResponseHeaderTimeout: %v", tr.ResponseHeaderTimeout)
+		tloc.Logger.Errorf("    tx.TLSHandshakeTimeout:   %v", tr.TLSHandshakeTimeout)
+		tloc.Logger.Errorf("    tx.DisableKeepAlives:     %v", tr.DisableKeepAlives)
+	}
+
+	tloc.Logger.Errorf("    Client pair count:        %v", len(tloc.clientMap) + 1)
+	//tloc.Logger.Errorf("    Log level:                %v", tloc.Logger.GetLevel())
+	tloc.Logger.Errorf("    Log level:                %v", logrus.ErrorLevel)
+	tloc.Logger.Errorf("    Go version:               %v", runtime.Version())
+
+	return client
+}
+
+// Reference: https://pkg.go.dev/github.com/hashicorp/go-retryablehttp
+
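Since the ClientPolicy plumbing above is the new caller-facing surface of this change, a minimal caller-side sketch may help reviewers see how it is meant to be consumed. This is a hypothetical example, not code from the patch: the import path is assumed from go.mod, and the service name, target URL, task count, and policy values are placeholders.

package main

import (
	"io"
	"net/http"
	"time"

	trs "github.com/Cray-HPE/hms-trs-app-api/v2/pkg/trs_http_api" // assumed import path
	"github.com/sirupsen/logrus"
)

func main() {
	// Initialize a local TRS instance and tear it down when done
	tloc := &trs.TRSHTTPLocal{}
	tloc.Init("my-service", logrus.New())
	defer tloc.Cleanup()

	// Prototype task: per-task context timeout plus the new retry and
	// transport policies.  Field names come from models.go; the numbers
	// are illustrative only.
	req, _ := http.NewRequest("GET", "http://bmc.example.com/redfish/v1/", nil)
	proto := trs.HttpTask{
		Request: req,
		Timeout: 30 * time.Second,
		CPolicy: trs.ClientPolicy{
			Retry: trs.RetryPolicy{Retries: 3, BackoffTimeout: 5 * time.Second},
			Tx: trs.HttpTxPolicy{
				Enabled:             true,
				MaxIdleConns:        1000,
				MaxIdleConnsPerHost: 4,
				IdleConnTimeout:     90 * time.Second,
			},
		},
	}

	// Launch the task list and collect results from the returned channel
	taskList := tloc.CreateTaskList(&proto, 10)
	rchan, err := tloc.Launch(&taskList)
	if err != nil {
		panic(err)
	}

	for i := 0; i < len(taskList); i++ {
		task := <-rchan
		if task.Err != nil && *task.Err != nil {
			continue // handle the error as appropriate
		}
		if task.Request.Response != nil && task.Request.Response.Body != nil {
			// Drain and close the body so the connection stays reusable
			io.Copy(io.Discard, task.Request.Response.Body)
			task.Request.Response.Body.Close()
		}
	}

	// Close() cancels each task's context and releases remaining resources
	tloc.Close(&taskList)
}

Draining and then closing each response body keeps the underlying connection idle and reusable, which matches the connection-state notes documented in trshttp_local_test.go; the reworked Close() below also drains, closes, and cancels anything the caller missed.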
func ExecuteTask(tloc *TRSHTTPLocal, tct taskChannelTuple) { - //Find a client or make one! + + // Find a client to use, or make a new one! + var cpack *clientPack tloc.clientMutex.Lock() - if _, ok := tloc.clientMap[tct.task.RetryPolicy]; !ok { - //MAKE NEW CLIENT!!! - //Calculate backoff params. If caller didn't specify them, we get - //1 try and a 1 second wait. Not good. We'll use default minimums - //for both if caller doesn't. - rtMax := DFLT_RETRY_MAX - boffMax := DFLT_BACKOFF_MAX * time.Second - if (tct.task.RetryPolicy.Retries > 0) { - rtMax = tct.task.RetryPolicy.Retries - } - if (tct.task.RetryPolicy.BackoffTimeout > 0) { - boffMax = tct.task.RetryPolicy.BackoffTimeout - } + if _, ok := tloc.clientMap[tct.task.CPolicy]; !ok { + cpack = new(clientPack) + + cpack.insecure = createClient(tct.task, tloc, "insecure") + + // Do not use leveled logging for now. See explanation further + // up in the source code. Instead, use standard logger set at + // error level to avoid excessive logging + // + //httpLogger := logrus.New() + //httpLogger.SetLevel(tloc.Logger.GetLevel()) + //retryablehttpLogger := retryablehttp.LeveledLogger(&leveledLogrus{httpLogger}) + //cpack.insecure.Logger = retryablehttpLogger + httpLogger := logrus.New() httpLogger.SetLevel(logrus.ErrorLevel) - cpack = new(clientPack) - cpack.insecure = retryablehttp.NewClient() - tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},} - cpack.insecure.HTTPClient.Transport = tr cpack.insecure.Logger = httpLogger - cpack.insecure.RetryMax = rtMax - cpack.insecure.RetryWaitMax = boffMax + if (tloc.CACertPool != nil) { - cpack.secure = retryablehttp.NewClient() - tlsConfig := &tls.Config{RootCAs: tloc.CACertPool,} - tlsConfig.BuildNameToCertificate() - trs := &http.Transport{TLSClientConfig: tlsConfig,} - cpack.secure.HTTPClient.Transport = trs + cpack.secure = createClient(tct.task, tloc, "secure") + + //cpack.secure.Logger = retryablehttpLogger cpack.secure.Logger = httpLogger - cpack.secure.RetryMax = rtMax - cpack.secure.RetryWaitMax = boffMax } - tloc.clientMap[tct.task.RetryPolicy] = cpack + + tloc.clientMap[tct.task.CPolicy] = cpack } else { - cpack = tloc.clientMap[tct.task.RetryPolicy] + cpack = tloc.clientMap[tct.task.CPolicy] } tloc.clientMutex.Unlock() + // Found a client to use (or created a new one), now set up a request + + // First validate our task if ok, err := tct.task.Validate(); !ok { tloc.Logger.Errorf("Failed validation of request: %+v, err: %s", tct.task, err) tct.task.Err = &err + tct.taskListChannel <- tct.task + return } - tloc.Logger.Tracef("setting up context for request") - - //setup timeouts and context for request - tct.task.context, tct.task.contextCancel = context.WithTimeout(tloc.ctx, tct.task.Timeout) + // Add user agent header to the request base.SetHTTPUserAgent(tct.task.Request,tloc.svcName) + + // Wrap the request so trsCheckRetry() can keep its own retry count + // We'll attach this to the context further below so that it has access + // to it + trsWR := &trsWrappedReq{ + orig: tct.task.Request, // Core request + retryCount: 0, // Counter for CLIC() + retryMax: cpack.insecure.RetryMax, // CLIC() will need access to this + // same for both secure & insecure + } + + // Create child context with timeout and our own retry counter + + baseCtx, cancel := context.WithTimeout(tloc.ctx, tct.task.Timeout) + ctxWithValue := context.WithValue(baseCtx, trsRetryCountKey, trsWR) + + tct.task.context = ctxWithValue + tct.task.contextCancel = cancel + + // Create a retryablehttp 
request using the caller's request req, err := retryablehttp.FromRequest(tct.task.Request) - req = req.WithContext(tct.task.context) if err != nil { - tloc.Logger.Error(err) - tct.task.Request.Response = nil + tloc.Logger.Errorf("Failed wrapping request with retryablehttp: %v", err) tct.task.Err = &err + tct.taskListChannel <- tct.task + return } - if (tct.task.Err != nil) && (*tct.task.Err != nil) { - SendDelayedError(tct, tloc.Logger) - return + // Link retryablehttp's request context to the task's context + req.Request = req.Request.WithContext(tct.task.context) + + // Execute the request + var tmpError error + if (tct.task.forceInsecure || tloc.CACertPool == nil || cpack.secure == nil) { + tloc.Logger.Tracef("Using INSECURE client to send request") + tct.task.Request.Response, tmpError = cpack.insecure.Do(req) } else { - var tmpError error + tloc.Logger.Tracef("Using secure client to send request") + tct.task.Request.Response, tmpError = cpack.secure.Do(req) - if (tct.task.forceInsecure || (tloc.CACertPool == nil) || - (cpack.secure == nil)) { - tct.task.Request.Response, tmpError = cpack.insecure.Do(req) - } else { - tct.task.Request.Response, tmpError = cpack.secure.Do(req) + // Fall back to insecure only if the enclosing context was not + // cancelled or timed out. + if tmpError != nil && tct.task.context.Err() == nil { + // But first make sure we drain/close the body of the failed + // response, if there was one + drainAndCloseBody(tct.task.Request.Response) - //If the error is a TLS error, fall back to insecure and log it. - if (tmpError != nil) { - tloc.Logger.Warnf("TLS request failed, retrying without validation: %v", - tmpError) - tct.task.Request.Response, tmpError = cpack.insecure.Do(req) - } + tloc.Logger.Warnf("TLS request failed, retrying using INSECURE client (TLS failure: '%v')", tmpError) + tct.task.Request.Response, tmpError = cpack.insecure.Do(req) } + } - tct.task.Err = &tmpError - if (*tct.task.Err) != nil { - tloc.Logger.Tracef("Err: %s", (*tct.task.Err).Error()) - } - if tct.task.Request.Response != nil { - tloc.Logger.Tracef("Response: %d", tct.task.Request.Response.StatusCode) - } - tct.taskListChannel <- tct.task + tct.task.Err = &tmpError + if (*tct.task.Err) != nil { + tloc.Logger.Tracef("Request failed: %s", (*tct.task.Err).Error()) } + if tct.task.Request.Response != nil { + tloc.Logger.Tracef("Response: %d", tct.task.Request.Response.StatusCode) + } else { + tloc.Logger.Tracef("No response received") + } + + // TODO: Consider cancelling the context for this task here instead of + // leaving it up to the caller - Would have to read and close response + // here as well - return + tct.taskListChannel <- tct.task } // Launch an array of tasks. This is non-blocking. Use Check() to get @@ -276,6 +685,8 @@ func (tloc *TRSHTTPLocal) Launch(taskList *[]HttpTask) (chan *HttpTask, error) { go ExecuteTask(tloc, tct) } + tloc.Logger.Tracef("Launch() completed") + return taskListChannel, nil } @@ -323,6 +734,7 @@ func (tloc *TRSHTTPLocal) Cancel(taskList *[]HttpTask) { v.contextCancel() } } + tloc.Logger.Tracef("Cancel() completed") } // Close out a task list transaction. 
The frees up a small amount of resources @@ -333,15 +745,31 @@ func (tloc *TRSHTTPLocal) Cancel(taskList *[]HttpTask) { func (tloc *TRSHTTPLocal) Close(taskList *[]HttpTask) { for _, v := range *taskList { if (v.Ignore == false) { - if v.Request.Response != nil && v.Request.Response.Body != nil { - v.Request.Response.Body.Close() - } + // The caller should have closed the response body, but we'll also + // do it here to prevent resource leaks. Note that if that was + // the case, that connection was closed by the above cancel. + + drainAndCloseBody(v.Request.Response) + + // All tasks must be cancelled to prevent resource leaks. The + // caller may have called Cancel() to prematurely cancel the + // operation, but that's probably not a common thing so we will + // do it here. There is no harm in cancelling twice. We must + // do this before closing the response body. + // + // TODO: We may also want to consider having each go routine + // call cancel above in ExecuteTask() for themselves. Doing it + // like that might be less error prone. + + v.contextCancel() } tloc.taskMutex.Lock() delete(tloc.taskMap, v.id) tloc.taskMutex.Unlock() } + + tloc.Logger.Tracef("Close() completed") } // Clean up a local HTTP task system. @@ -376,5 +804,6 @@ func (tloc *TRSHTTPLocal) Cleanup() { tloc.taskMutex.Unlock() } + tloc.Logger.Tracef("Cleanup() completed") // this really just a big red button to STOP ALL? b/c im not clearing any memory } diff --git a/pkg/trs_http_api/trshttp_local_test.go b/pkg/trs_http_api/trshttp_local_test.go index 50d2279..42c018b 100644 --- a/pkg/trs_http_api/trshttp_local_test.go +++ b/pkg/trs_http_api/trshttp_local_test.go @@ -1,17 +1,17 @@ // MIT License -// +// // (C) Copyright [2021,2024] Hewlett Packard Enterprise Development LP -// +// // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: -// +// // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. -// +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL @@ -20,55 +20,107 @@ // ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR // OTHER DEALINGS IN THE SOFTWARE. - package trs_http_api import ( - "github.com/Cray-HPE/hms-base/v2" - "testing" - "time" + "bufio" + "bytes" + "encoding/pem" + "flag" + "io" + "log" + "net" "net/http" "net/http/httptest" + "os" + "os/exec" + "regexp" + "sort" + "strings" + "sync/atomic" + "testing" + "time" + + base "github.com/Cray-HPE/hms-base/v2" + "github.com/sirupsen/logrus" ) var svcName = "TestMe" +// Note for unit test and TRS loggers: Log level can be controlled by +// by modifying the -logLevel command line option in the Makefile + +var logLevel logrus.Level // use this for more than logrus + +// TestMain is not a test. It runs before all other tests so that it can +// do any necessary initialization. 
Here we pars the command line arges +// so we can apply any log level override for the unit test loggers +func TestMain(m *testing.M) { + var logLevelInt int + + flag.IntVar(&logLevelInt, "logLevel", int(logrus.ErrorLevel), + "set log level (0=Panic, 1=Fatal, 2=Error 3=Warn, 4=Info, 5=Debug, 6=Trace)") + flag.Parse() + + logLevel = logrus.Level(logLevelInt) + + log.Printf("logLevel set to %v", logLevel) + + // Run the tests + code := m.Run() + + // Exit + os.Exit(code) +} + +// Create a logger for trs_http_api (not unit tests) so we can see what's +// going on in TRS when we hit issues. +func createLogger() *logrus.Logger { + trsLogger := logrus.New() + + trsLogger.SetFormatter(&logrus.TextFormatter{ FullTimestamp: true, }) + trsLogger.SetLevel(logrus.Level(logLevel)) + trsLogger.SetReportCaller(true) + + return trsLogger +} + func TestInit(t *testing.T) { tloc := &TRSHTTPLocal{} - tloc.Init(svcName,nil) + tloc.Init(svcName, createLogger()) if (tloc.taskMap == nil) { - t.Errorf("Init() failed to create task map") + t.Errorf("=====> ERROR: Init() failed to create task map <=====") } if (tloc.clientMap == nil) { - t.Errorf("Init() failed to create client map") + t.Errorf("=====> ERROR: Init() failed to create client map <=====") } if (tloc.svcName != svcName) { - t.Errorf("Init() failed to set service name") + t.Errorf("=====> ERROR: Init() failed to set service name <=====") } } func TestCreateTaskList(t *testing.T) { tloc := &TRSHTTPLocal{} - tloc.Init(svcName,nil) + tloc.Init(svcName, createLogger()) req,_ := http.NewRequest("GET","http://www.example.com",nil) tproto := HttpTask{Request: req,} base.SetHTTPUserAgent(req,tloc.svcName) tList := tloc.CreateTaskList(&tproto,5) if (len(tList) != 5) { - t.Errorf("CreateTaskList() didn't create a correct array.") + t.Errorf("=====> ERROR: CreateTaskList() didn't create a correct array. <=====") } for _,tsk := range(tList) { if (tsk.Request == nil) { - t.Errorf("CreateTaskList() didn't create a proper Request.") + t.Errorf("=====> ERROR: CreateTaskList() didn't create a proper Request. <=====") } if (len(tsk.Request.Header) == 0) { - t.Errorf("CreateTaskList() didn't create a proper Request header.") + t.Errorf("=====> ERROR: CreateTaskList() didn't create a proper Request header. <=====") } vals,ok := tsk.Request.Header["User-Agent"] if (!ok) { - t.Errorf("CreateTaskList() didn't copy User-Agent header.") + t.Errorf("=====> ERROR: CreateTaskList() didn't copy User-Agent header. <=====") } found := false for _,vr := range(vals) { @@ -78,58 +130,219 @@ func TestCreateTaskList(t *testing.T) { } } if (!found) { - t.Errorf("CreateTaskList() didn't copy User-Agent header.") + t.Errorf("=====> ERROR: CreateTaskList() didn't copy User-Agent header. 
<=====") } } } +// Check header for "User-Agent" func hasUserAgentHeader(r *http.Request) bool { if (len(r.Header) == 0) { return false } _,ok := r.Header["User-Agent"] - if (!ok) { + + return ok +} + +// Check header for "Trs-Fail-All-Retries" +func hasTRSAlwaysRetryHeader(r *http.Request) bool { + if (len(r.Header) == 0) { + return false + } + + _,ok := r.Header["Trs-Fail-All-Retries"] + + if ok == true { + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("Received Trs-Fail-All-Retries header") + } + } + return ok +} + +// Check header for "Trs-Context-Timeout" +func hasTRSStallHeader(r *http.Request) bool { + if (len(r.Header) == 0) { return false } - return true + + _,ok := r.Header["Trs-Context-Timeout"] + + if ok == true { + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("Received Trs-Context-Timeout header") + } + } + + return ok } +// launchHandler is the general handler used by the unittest http servers +// to handle incoming requests. Depending on the request headers, it can: +// +// * Return a 503 error for a specific request once +// * Return a 503 error for a specific request multiple times so it burns +// through all its retries +// * Stall indefinitly until a unit test signals to return a successful +// response +// * Return a successful response immediately + +var handlerLogger *testing.T // Allows logging in the handlers +var handlerSleep int = 2 // time to sleep to simulate network/BMC delays +var retrySleep int = 0 // time to sleep before returning 503 for retry +var nRetries int32 = 0 // how many retries before returning success +var nCtxTimeouts int = 0 // how many context timeouts + func launchHandler(w http.ResponseWriter, req *http.Request) { - if (!hasUserAgentHeader(req)) { - w.Write([]byte(`{"Message":"No User-Agent Header"}`)) - w.WriteHeader(http.StatusInternalServerError) - return + if (logLevel >= logrus.TraceLevel) { + handlerLogger.Logf("launchHandler received an HTTP %v.%v request", + req.ProtoMajor, req.ProtoMinor) } - w.Header().Set("Content-Type","application/json") - w.WriteHeader(http.StatusOK) - w.Write([]byte(`{"Message":"OK"}`)) + + // Distinguish between limited retries that will succeed and retries + // that should continually fail and exceed their retry limit + singletonRetry := false + itHasTRSAlwaysRetryHeader := hasTRSAlwaysRetryHeader(req) + if !itHasTRSAlwaysRetryHeader { + singletonRetry = atomic.AddInt32(&nRetries, -1) >= 0 + } + + if singletonRetry || itHasTRSAlwaysRetryHeader { + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("launchHandler 503 running (sleep for %vs)...", retrySleep) + } + if singletonRetry { + // Only update for tasks not retrying forever + nRetries-- + } + + // Delay retry based on test requirement + time.Sleep(time.Duration(retrySleep) * time.Second) + + // Clear retrySleep so next retry is immediate - Yes there will be + // many requests doing the same thing but that's ok + retrySleep = 0 + + w.Header().Set("Content-Type","application/json") + w.Header().Set("Retry-After","1") + //w.Header().Set("Connection","keep-alive") + //w.Header().Set("Connection", "close") + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{"Message":"Service Unavailable"}`)) + + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("retryHandler returning Message Service Unavailable...") + } + } else if hasTRSStallHeader(req) { + stallHandler(w, req) + } else { + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("launchHandler running (sleep for %vs)...", handlerSleep) + } + + // 
Simulate network/BMC delays + time.Sleep(time.Duration(handlerSleep) * time.Second) + + if (!hasUserAgentHeader(req)) { + w.Write([]byte(`{"Message":"No User-Agent Header"}`)) + //w.Header().Set("Connection","keep-alive") + w.WriteHeader(http.StatusInternalServerError) + + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("launchHandler returning no User-Agent header...") + } + return + } + w.Header().Set("Content-Type","application/json") + //w.Header().Set("Connection","keep-alive") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"Message":"OK"}`)) + + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("launchHandler returning Message Ok...") + } + } + } +// stallHandler is handler used by the unittest http servers to simulate +// stalls in handling requests. It simply sits on a channel until the unit +// test in question signals it to return var stallCancel chan bool func stallHandler(w http.ResponseWriter, req *http.Request) { + // Wait for all connections to be established so output looks nice + time.Sleep(100 * time.Millisecond) + + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("stallHandler running (sleep for %vms)...", 100) + } + <-stallCancel + w.Header().Set("Content-Type","application/json") + //w.Header().Set("Connection","keep-alive") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"Message":"OK"}`)) + + if (logLevel >= logrus.DebugLevel) { + handlerLogger.Logf("stallHandler returning Message Ok...") + } } func TestLaunch(t *testing.T) { + testLaunch(t, 5, false, false) +} + +func TestSecureLaunch(t *testing.T) { + testLaunch(t, 1, true, false) +} + +func TestSecureLaunchBadCert(t *testing.T) { + // Despite cert being bad, TRS should retry using the insecure + // client and succeed + testLaunch(t, 1, true, true) +} + +func testLaunch(t *testing.T, numTasks int, testSecureLaunch bool, useBadCert bool) { tloc := &TRSHTTPLocal{} - tloc.Init(svcName,nil) + tloc.Init(svcName, createLogger()) + + var srv *httptest.Server + if (testSecureLaunch == true) { + srv = httptest.NewTLSServer(http.HandlerFunc(launchHandler)) + + secInfo := TRSHTTPLocalSecurity{CACertBundleData: string("BAD CERT")} + + if (useBadCert != true) { + secInfo = TRSHTTPLocalSecurity{CACertBundleData: + string(pem.EncodeToMemory( + &pem.Block{Type: "CERTIFICATE", Bytes: srv.Certificate().Raw}, + )),} + } - srv := httptest.NewServer(http.HandlerFunc(launchHandler)) + err := tloc.SetSecurity(secInfo) + if err != nil { + t.Errorf("=====> ERROR: tloc.SetSecurity() failed: %v <=====", err) + return + } + } else { + srv = httptest.NewServer(http.HandlerFunc(launchHandler)) + } defer srv.Close() + handlerLogger = t + req,_ := http.NewRequest("GET",srv.URL,nil) tproto := HttpTask{Request: req, Timeout: 8*time.Second,} - tList := tloc.CreateTaskList(&tproto,5) + tList := tloc.CreateTaskList(&tproto, numTasks) tch,err := tloc.Launch(&tList) if (err != nil) { - t.Errorf("Launch ERROR: %v",err) + t.Errorf("=====> ERROR: tloc.Launch failed: %v <=====",err) } nDone := 0 @@ -138,52 +351,62 @@ func TestLaunch(t *testing.T) { tdone := <-tch nDone ++ if (tdone == nil) { - t.Errorf("Launch chan returned nil ptr.") + t.Errorf("=====> ERROR: Launch chan returned nil ptr. <=====") } if (tdone.Request == nil) { - t.Errorf("Launch chan returned nil Request.") + t.Errorf("=====> ERROR: Launch chan returned nil Request. <=====") } else if (tdone.Request.Response == nil) { - t.Errorf("Launch chan returned nil Response.") + t.Errorf("=====> ERROR: Launch chan returned nil Response. 
<=====") } else { if (tdone.Request.Response.StatusCode != http.StatusOK) { - t.Errorf("Launch chan returned bad status: %v",tdone.Request.Response.StatusCode) + t.Errorf("=====> ERROR: Launch chan returned bad status: %v <=====",tdone.Request.Response.StatusCode) nErr ++ } if ((tdone.Err != nil) && ((*tdone.Err) != nil)) { - t.Errorf("Launch chan returned error: %v",*tdone.Err) + t.Errorf("=====> ERROR: Launch chan returned error: %v <=====",*tdone.Err) } } running, err := tloc.Check(&tList) if (err != nil) { - t.Errorf("ERROR with Check(): %v",err) + t.Errorf("=====> ERROR: tloc.Check() failed: %v <=====",err) } if (nDone == len(tList)) { if (running) { - t.Errorf("ERROR, Check() says still running, but all tasks returned.") + t.Errorf("=====> ERROR: tloc.Check() says still running, but all tasks returned. <=====") } break } } if (nErr != 0) { - t.Errorf("Got %d errors from Launch",nErr) + t.Errorf("=====> ERROR: Got %d errors from Launch <=====",nErr) } } func TestLaunchTimeout(t *testing.T) { tloc := &TRSHTTPLocal{} - tloc.Init(svcName,nil) + tloc.Init(svcName, createLogger()) srv := httptest.NewServer(http.HandlerFunc(stallHandler)) defer srv.Close() + handlerLogger = t + req,_ := http.NewRequest("GET",srv.URL,nil) - tproto := HttpTask{Request: req, Timeout: 3*time.Second, RetryPolicy: RetryPolicy{Retries: 1, BackoffTimeout: 1 * time.Second,},} + tproto := HttpTask{ + Request: req, + Timeout: 3*time.Second, + CPolicy: ClientPolicy{ + Retry: RetryPolicy{ + Retries: 1, + BackoffTimeout: 3 * time.Second}, + }, + } tList := tloc.CreateTaskList(&tproto,1) stallCancel = make(chan bool, 1) tch,err := tloc.Launch(&tList) if (err != nil) { - t.Errorf("Launch ERROR: %v",err) + t.Errorf("=====> ERROR: tloc.Launch() failed: %v <=====",err) } time.Sleep(100 * time.Millisecond) @@ -193,23 +416,1347 @@ func TestLaunchTimeout(t *testing.T) { tdone := <-tch nDone ++ if (tdone == nil) { - t.Errorf("Launch chan returned nil ptr.") + t.Errorf("=====> ERROR: Launch chan returned nil ptr. <=====") } stallCancel <- true running, err := tloc.Check(&tList) if (err != nil) { - t.Errorf("ERROR with Check(): %v",err) + t.Errorf("=====> ERROR: tloc.Check() failed: %v <=====",err) } if (nDone == len(tList)) { if (running) { - t.Errorf("ERROR, Check() says still running, but all tasks returned.") + t.Errorf("=====> ERROR: Check() says still running, but all tasks returned. <=====") } break } } if (nErr != 0) { - t.Errorf("Got %d errors from Launch",nErr) + t.Errorf("=====> ERROR: Got %d errors from Launch <=====",nErr) + } + close(stallCancel) +} + +/////////////////////////////////////////////////////////////////////////// +// +// Documenting treatment of connections due to handling of response bodies here +// as its as good of place as any to put this information. 
+// +// If response body is closed +// +// And body was drained before closure: +// +// * Go client connection state: open, idle, reusable +// * OS connection state: open, idle, reusable +// * Istio connection state: open, idle, reusable +// +// And body was NOT drained before closure +// +// * Go client connection state: closed +// * OS connection state: closed +// * Istio connection state: closed +// +// If response body is NOT closed +// +// And body was drained +// +// * Go client connection state: open, unusable (resource leak) (dirty) +// * OS connection state: open, unusable (resource leak) (dirty) +// * Istio connection state: open, unusable (resource leak) (dirty) +// +// * Marked "dirty" and could get cleaned up any time since the +// body was drained +// +// * If/when IdleConnTimeout is exceeded (by default is 0 which means +// no timeout in place), it will be closed and: +// +// * I couldn't find a definitive answer if a minimal +// resource leak (which would NOT include body data) would +// be permanent or not in the Go client +// * Prior OS resource leak should now be freed (not sure I believe) +// * Prior Istio resource leak should now be freed (not sure I believe) +// +// And body was NOT drained +// +// * Go client connection state: open, unusable (resource leak) +// * OS connection state: open, unusable (resource leak) +// * Istio connection state: open, unusable (resource leak) +// +// * If/when IdleConnTimeout is exceeded (by default is 0 which means +// no timeout in place), OR if/when a context times out or is cancelled, +// it will be closed and: +// +// * Go client resource leak (including body data) will remain +// * Prior OS resource leak should now be freed (not sure I believe) +// * Prior Istio resource leak should now be freed (not sure I believe) +// +/////////////////////////////////////////////////////////////////////////// + +/////////////////////////////////////////////////////////////////////////// +// +// WARNING! The Go runtime behavior surrounding connections has changed in +// more recent versions of Go. Prior to version 1.23, if any +// connection in the connection pool experiences a timeout, the +// Go runtime closes ALL idle connections. There is nothing we +// can do about this in TRS, other than use a newer version of Go +// that doesn't exhibit this (horrible) behavior. +// +// In our unit tests, we can change the version of Go used by +// specifying it in the Makefile. There are further instructions +// in the Makefile on how to do this. +// +/////////////////////////////////////////////////////////////////////////// + + +// CustomConnState is a hook into httptest http servers that the unit tests +// below spin up. It allows is to log changes to connection states. This +// is critical when debugging connection state issues + +func CustomConnState(conn net.Conn, state http.ConnState) { + if logLevel >= logrus.DebugLevel { + log.Printf("HTTP_SERVER %v Connection -> %v\t%v", + conn.LocalAddr(), state, conn.RemoteAddr()) } } +// testOpenConnections is called with an argument representing the number +// of connections in the ESTAB(LISTHED) that we hope to find at the current +// moment. It does this by executing the 'ss' tool in the unit test vm and +// using the output to count various connection states. 
+ +func testOpenConnections(t *testing.T, clientEstabExp int) { + // Set up 'ss' command and arguments + cmd := exec.Command( "ss", "--tcp", "--resolve", "--processes", "--all") + + // Execute the command + fullOutput, err := cmd.CombinedOutput() + if err != nil { + t.Errorf("=====> ERROR: error running ss utility: %v <=====", err) + return + } + + // srvrPorts - Map of every server port for trs_http_api + srvrPorts := map[string]bool{} + + // output - Map used to collect different buckets of output that we care about + output := map[string][]string{} + + // Use a scanner to parse through fullOutput line-by-line + scanner := bufio.NewScanner(bytes.NewReader(fullOutput)) + for scanner.Scan() { + line := scanner.Text() + + if strings.Contains(line, "Recv-Q") { + // Header line. Save it as it helps when reading output + output["header"] = append(output["header"], line) + continue + } else if strings.Contains(line, "LISTEN") { + // This is a server. LISTEN lines always comes first in the output. + // Ignore anything that isn't our test process + if !strings.Contains(line, "trs_http_api") { + output["ignoredListen"] = append(output["ignoredListen"], line) + continue + } + + // Grab the port so we can filter on it later + output["serverListen"] = append(output["serverListen"], line) + + re := regexp.MustCompile(`localhost:(\d+)`) + + match := re.FindStringSubmatch(line) + if len(match) > 1 { + srvrPorts[match[1]] = true + } else { + t.Errorf("=====> ERROR: Failed to find port in LISTEN line: %v <=====", line) + } + } else { + // This is a connection. We now need to distinguish client + // connections from server connections + re := regexp.MustCompile(`localhost:(\d+)\s+localhost:(\d+)`) + + match := re.FindStringSubmatch(line) + if len(match) > 2 { + srcPort := match[1] + dstPort := match[2] + + if _, exists := srvrPorts[srcPort]; exists { + // This is a server connection + output["serverOther"] = append(output["serverOther"], line) + } else { + // This is might be a client connection. Test to see + // if it targets one of our server ports + if _, exists := srvrPorts[dstPort]; exists { + // It's one of our client connections + if strings.Contains(line, "ESTAB") { + output["clientEstab"] = append(output["clientEstab"], line) + } else { + output["clientOther"] = append(output["clientOther"], line) + } + } else { + // An unrelated connection. Ignore + output["ignoredConn"] = append(output["ignoredConn"], line) + } + } + } else { + // An unrelated line of output. 
Ignore + output["ignoredMisc"] = append(output["ignoredMisc"], line) + } + } + } + + // This is the critical test for this routine + if (len(output["clientEstab"]) != clientEstabExp) { + t.Errorf("=====> ERROR: Expected %v ESTABLISH(ED) connections, but got %v <=====", + clientEstabExp, len(output["clientEstab"])) + if logLevel == logrus.TraceLevel { + t.Logf("Full 'ss' output:\n%s", fullOutput) + } + } + + // All other output as follows is for debug only + + if logLevel >= logrus.InfoLevel { + if len(output["header"]) > 0 { + t.Logf("") + for _,v := range(output["header"]) { + t.Log(v) + } + t.Logf("") + } + if len(output["clientEstab"]) > 0 { + sort.Strings(output["clientEstab"]) + + t.Logf("- Client ESTAB Connections: (%v)", len(output["clientEstab"])) + + if logLevel > logrus.InfoLevel { + t.Logf("") + for _,v := range(output["clientEstab"]) { + t.Log(v) + } + t.Logf("") + } + } + } + if logLevel >= logrus.DebugLevel { + if len(output["clientOther"]) > 0 { + sort.Strings(output["clientOther"]) + + t.Logf("- Client Other Connections: (%v)", len(output["clientOther"])) + + if logLevel > logrus.InfoLevel { + t.Logf("") + for _,v := range(output["clientOther"]) { + t.Log(v) + } + t.Logf("") + } + } + } + if logLevel >= logrus.InfoLevel { + if len(output["serverListen"]) > 0 { + sort.Strings(output["serverListen"]) + + t.Logf("- Server LISTEN Connections: (%v)", len(output["serverListen"])) + + if logLevel > logrus.InfoLevel { + t.Logf("") + for _,v := range(output["serverListen"]) { + t.Log(v) + } + t.Logf("") + } + } + } + if logLevel >= logrus.DebugLevel { + if len(output["serverOther"]) > 0 { + sort.Strings(output["serverOther"]) + + t.Logf("- Server Other Connections: (%v)", len(output["serverOther"])) + + if logLevel > logrus.InfoLevel { + t.Logf("") + for _,v := range(output["serverOther"]) { + t.Log(v) + } + t.Logf("") + } + } + } + if logLevel == logrus.TraceLevel { + if len(output["ignoredConn"]) > 0 { + sort.Strings(output["ignoredConn"]) + + t.Logf("- Ignored Connections: (%v)", len(output["ignoredConn"])) + t.Logf("") + for _,v := range(output["ignoredConn"]) { + t.Log(v) + } + t.Logf("") + } + if len(output["ignoredListen"]) > 0 { + sort.Strings(output["ignoredListen"]) + + t.Logf("- Ignored LISTEN Connections: (%v)", len(output["ignoredListen"])) + t.Logf("") + for _,v := range(output["ignoredListen"]) { + t.Log(v) + } + t.Logf("") + } + if len(output["ignoredMisc"]) > 0 { + sort.Strings(output["ignoredMisc"]) + + t.Logf("- Ignored Misc Output: (%v)", len(output["ignoredMisc"])) + t.Logf("") + for _,v := range(output["ignoredMisc"]) { + t.Log(v) + } + t.Logf("") + } + } +} + +// CustomReadCloser wraps an io.ReadCloser so we can track if response +// bodies get closed in the unit tests below. 
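As a tiny standalone illustration of that wrapper pattern, separate from the real CustomReadCloser defined just below, the snippet here wraps a body, drains and closes it, and then checks that the close happened; `trackedReadCloser` and the string payload are made up for the example:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// trackedReadCloser mirrors the idea: wrap an io.ReadCloser and remember
// whether Close() was ever called, so a test can assert on it later.
type trackedReadCloser struct {
	io.ReadCloser
	closed bool
}

func (t *trackedReadCloser) Close() error {
	t.closed = true
	return t.ReadCloser.Close()
}

func main() {
	body := &trackedReadCloser{ReadCloser: io.NopCloser(strings.NewReader("payload"))}

	_, _ = io.Copy(io.Discard, body) // drain, just like the tests do
	body.Close()

	fmt.Println("body closed:", body.closed) // prints: body closed: true
}
```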
+ +type CustomReadCloser struct { + io.ReadCloser + closed bool +} + +func (c *CustomReadCloser) Close() error { + c.closed = true + return c.ReadCloser.Close() +} + +func (c *CustomReadCloser) WasClosed() bool { + return c.closed +} + +// testConnsArg is the struct used to hold arguments to the lower layer +// connection tests so that that code can be reused for many tests + +type testConnsArg struct { + tListProto *HttpTask // Initialization to pass to tloc.CreateTaskList() + srvHandler func(http.ResponseWriter, *http.Request) // response handler to use + nTasks int // Number of tasks to create + maxIdleConnsPerHost int // Value of MaxIdleConnsPerHost + nSuccessRetries int32 // Number of retries to succeed + nFailRetries int // Number of retries to fail + nSkipDrainBody int // Number of response bodies to skip draining before closing + nSkipCloseBody int // Number of response bodies to skip closing + nCtxTimeouts int // Number of context timeouts + testIdleConnTimeout bool // Test idle connection timeout + runSecondTaskList bool // Run a second task list after the first with same server + openAtStart int // Expected number of ESTAB connections at beginning + openAfterLaunch int // Expected number of ESTAB connections after Launch() + openAfterTasksComplete int // Expected number of ESTAB connections after all tasks complete + openAfterBodyClose int // Expected number of ESTAB connections after closing response bodies + skipCancel bool // Skip cancel and go directly to Close() + openAfterCancel int // Expected number of ESTAB connections after cancelling tasks + openAfterClose int // Expected number of ESTAB connections after closing task list +} + +// logConnTestHeader prints header for each connection related test + +func logConnTestHeader(t *testing.T, a testConnsArg) { + t.Logf("") + t.Logf("--------------------------------------------------") + t.Logf("- Sub-Test Configuration -") + t.Logf("--------------------------------------------------") + t.Logf("") + + if logLevel < logrus.ErrorLevel { + return + } + + t.Logf(" nTasks: %v", a.nTasks) + t.Logf(" nSuccessRetries: %v", a.nSuccessRetries) + t.Logf(" nFailRetries: %v", a.nFailRetries) + t.Logf(" nSkipDrainBody: %v", a.nSkipDrainBody) + t.Logf(" nSkipCloseBody: %v", a.nSkipCloseBody) + t.Logf(" nCtxTimeouts: %v", a.nCtxTimeouts) + t.Logf("") + t.Logf(" testIdleConnTimeout: %v", a.testIdleConnTimeout) + t.Logf(" runSecondTaskList: %v", a.runSecondTaskList) + t.Logf("") + t.Logf(" Conns open after: start: %v", a.openAtStart) + t.Logf(" launch: %v", a.openAfterLaunch) + t.Logf(" tasksComplete: %v", a.openAfterTasksComplete) + t.Logf(" bodyClose: %v", a.openAfterBodyClose) + t.Logf(" cancel: %v (skip = %v)", a.openAfterCancel, a.skipCancel) + t.Logf(" close: %v", a.openAfterClose) + t.Logf("") + t.Logf(" rtPolicy: httpRetries: %v", a.tListProto.CPolicy.Retry.Retries) + t.Logf("") + + if a.tListProto.CPolicy.Tx.Enabled == true { + t.Logf("") + t.Logf(" txPolicy: MaxIdleConns: %v", a.tListProto.CPolicy.Tx.MaxIdleConns) + t.Logf(" MaxIdleConnsPerHost: %v", a.tListProto.CPolicy.Tx.MaxIdleConnsPerHost) + t.Logf(" IdleConnTimeout: %v", a.tListProto.CPolicy.Tx.IdleConnTimeout) + } + + t.Logf("") +} + +/////////////////////////////////////////////////////////////////////////// +// +// MANY CONNECTION TESTS BELOW ARE MARKED AS SKIP BECAUSE: +// +// * The unit test framework in GitHub only allows 10 minutes for ALL +// unit tests to complete. It would take hours to run them all. 
+//
+// * Connection and task counts at higher scale sometimes do not match
+//   what one would expect to see based on observations at smaller
+//   connection and task counts. That is not to say that things are not
+//   behaving correctly. It's more likely that at higher counts the
+//   underlying logic, and the timing of that logic, is not as simple
+//   for outside observers as it is at smaller counts. Trying to write
+//   tests to precisely match what's going on in the black box at the
+//   system level is nearly impossible. The counts that the unit tests
+//   do detect at these higher counts are within a small, reasonable
+//   deviation from what one would expect. For these reasons, the tests
+//   at higher counts are not enabled by default.
+//
+//   When changes to TRS are made in the future, developers should re-
+//   enable them and reconfirm that the resultant behavior is still within
+//   a reasonable deviation.
+//
+///////////////////////////////////////////////////////////////////////////
+
+// TestConnsWithNoHttpTxPolicy* tests cover use by TRS users that do NOT
+// configure the http transport. This would be the case for TRS users
+// that updated to the latest TRS without configuring the transport,
+// which is a newer feature of TRS.
+
+func TestConnsWithNoHttpTxPolicy_Idle(t *testing.T) {
+
+	t.Skip() /***************** COMMENT TO RUN TEST *****************/
+
+	nTasks := 2 // default MaxIdleConnsPerHost
+	nIssues := 1
+
+	testConnsWithNoHttpTxPolicy(t, nTasks, nIssues)
+}
+
+func TestConnsWithNoHttpTxPolicy_ModeratlyBusy(t *testing.T) {
+
+	//t.Skip() /***************** COMMENT TO RUN TEST *****************/
+
+	nTasks := 1000
+	nIssues := 1
+
+	testConnsWithNoHttpTxPolicy(t, nTasks, nIssues)
+}
+
+func TestConnsWithNoHttpTxPolicy_Busy(t *testing.T) {
+
+	t.Skip() /***************** COMMENT TO RUN TEST *****************/
+
+	nTasks := 4000
+	nIssues := 200
+
+	testConnsWithNoHttpTxPolicy(t, nTasks, nIssues)
+}
+
+func TestConnsWithNoHttpTxPolicy_VeryBusy(t *testing.T) {
+
+	t.Skip() /***************** COMMENT TO RUN TEST *****************/
+
+	nTasks := 8000
+	nIssues := 40
+
+	testConnsWithNoHttpTxPolicy(t, nTasks, nIssues)
+}
+
+// testConnsWithNoHttpTxPolicy() is the next call level down for the above
+// tests that don't configure an HttpTxPolicy.
+
+func testConnsWithNoHttpTxPolicy(t *testing.T, nTasks int, nIssues int) {
+	httpRetries := 3
+	pcsStatusTimeout := 30
+	ctxTimeout := time.Duration(pcsStatusTimeout) * time.Second
+	maxIdleConnsPerHost := 2 // default when not using HttpTxPolicy
+	//maxIdleConns := 100 // default when using HttpTxPolicy
+	//pcsTimeToNextStatusPoll := 30 // pmSampleInterval
+
+	// Default prototype to initialize each task in the task list with.
+	// Can be customized prior to each test.
+	defaultTListProto := &HttpTask{
+		Timeout: ctxTimeout,
+		CPolicy: ClientPolicy {
+			Retry: RetryPolicy{Retries: httpRetries},
+		},
+	}
+
+	// Initialize argument structure (will be modified each test)
+	a := testConnsArg{
+		maxIdleConnsPerHost: maxIdleConnsPerHost,
+		tListProto: defaultTListProto,
+		srvHandler: launchHandler, // always returns success
+	}
+
+	testConnsPrep(t, a, nTasks, nIssues)
+}
+
+// TestConnsWithHttpTxPolicy* tests cover use by TRS users that do configure
+// the http transport. We will use the defaults used by the PCS status
+// configuration for most of these.
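For contrast with the stdlib defaults described above, the HttpTxPolicy knobs the tests below exercise presumably map onto the standard `http.Transport` fields of the same names. The sketch here is an assumption about that mapping, not a statement about TRS internals; the 90 second figure is just the arithmetic used further down: (pcsStatusTimeout + pcsTimeToNextStatusPoll) * 1.5 = (30 + 30) * 1.5 = 90 seconds.

```go
package main

import (
	"net/http"
	"time"
)

// newPCSStyleTransport is illustrative only: it shows the http.Transport
// settings that the PCS-style HttpTxPolicy values in the tests below
// presumably translate to. The field names are real http.Transport fields;
// the mapping itself is an assumption.
func newPCSStyleTransport() *http.Transport {
	return &http.Transport{
		MaxIdleConns:        4000,             // total idle connections kept across all hosts
		MaxIdleConnsPerHost: 4,                // idle connections kept per host (BMC)
		IdleConnTimeout:     90 * time.Second, // (30s poll timeout + 30s poll interval) * 1.5
	}
}

func main() {
	client := &http.Client{Transport: newPCSStyleTransport()}
	_ = client // use with client.Do(...) as usual
}
```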
+ +func TestConnsWithHttpTxPolicy_PcsSmallIdle(t *testing.T) { + + //t.Skip() /***************** COMMENT TO RUN TEST *****************/ + + nTasks := 4 + nIssues := 4 + maxIdleConnsPerHost := 4 // PCS default when using HttpTxPolicy + maxIdleConns := 4000 // PCS default when using HttpTxPolicy + pcsStatusTimeout := 30 // PCS default + + testConnsWithHttpTxPolicy(t, nTasks, nIssues, maxIdleConnsPerHost, maxIdleConns, pcsStatusTimeout) +} + +func TestConnsWithHttpTxPolicy_PcsSmallModeratlyBusy(t *testing.T) { + + t.Skip() /***************** COMMENT TO RUN TEST *****************/ + + nTasks := 1000 + nIssues := 5 + maxIdleConnsPerHost := 4 // PCS default when using HttpTxPolicy + maxIdleConns := 4000 // PCS default when using HttpTxPolicy + pcsStatusTimeout := 30 // PCS default + + testConnsWithHttpTxPolicy(t, nTasks, nIssues, maxIdleConnsPerHost, maxIdleConns, pcsStatusTimeout) +} + +func TestConnsWithHttpTxPolicy_PcsSimulatedMedium(t *testing.T) { + + t.Skip() /***************** COMMENT TO RUN TEST *****************/ + + nTasks := 1000 + nIssues := 4 + maxIdleConnsPerHost := 1000 // Simulate more servers and larger connection pool + maxIdleConns := 4000 // PCS default when using HttpTxPolicy + pcsStatusTimeout := 30 // PCS default + + testConnsWithHttpTxPolicy(t, nTasks, nIssues, maxIdleConnsPerHost, maxIdleConns, pcsStatusTimeout) +} + +func TestConnsWithHttpTxPolicy_PcsSmallBusy(t *testing.T) { + + t.Skip() /***************** COMMENT TO RUN TEST *****************/ + + nTasks := 4000 + nIssues := 10 + maxIdleConnsPerHost := 1000 // Simulate more servers and larger connection pool + maxIdleConns := 4000 // PCS default when using HttpTxPolicy + pcsStatusTimeout := 30 // PCS default + + testConnsWithHttpTxPolicy(t, nTasks, nIssues, maxIdleConnsPerHost, maxIdleConns, pcsStatusTimeout) +} + +func TestConnsWithHttpTxPolicy_PcsLargeBusy(t *testing.T) { + + t.Skip() /***************** COMMENT TO RUN TEST *****************/ + + nTasks := 8000 + nIssues := 1000 + maxIdleConnsPerHost := 8000 // We're only using one Host server so pretend + maxIdleConns := 8000 // 8000 requests / 4 per host = 2000 BMCs + pcsStatusTimeout := 60 // Increase for pitiful unit test vm + + testConnsWithHttpTxPolicy(t, nTasks, nIssues, maxIdleConnsPerHost, maxIdleConns, pcsStatusTimeout) +} + +func TestConnsWithHttpTxPolicy_PcsHugeBusy(t *testing.T) { + + t.Skip() /***************** COMMENT TO RUN TEST *****************/ + + nTasks := 24000 // TRS can handle larger but unit test vm can't + nIssues := 2 + maxIdleConnsPerHost := 24000 // We're only using one Host server so pretend + maxIdleConns := 24000 // 24000 requests / 4 per host = 6000 BMCs + pcsStatusTimeout := 60 // Larger so we have more time to sleep to wait + + testConnsWithHttpTxPolicy(t, nTasks, nIssues, maxIdleConnsPerHost, maxIdleConns, pcsStatusTimeout) +} + +// testConnsWithHttpTxPolicy() is the next call level down for the above +// tests that do configure the http transport + +func testConnsWithHttpTxPolicy(t *testing.T, nTasks int, nIssues int, + maxIdleConnsPerHost int, maxIdleConns int, + pcsStatusTimeout int) { + httpRetries := 3 + ctxTimeout := time.Duration(pcsStatusTimeout) * time.Second + pcsTimeToNextStatusPoll := 30 // pmSampleInterval + + // idleConnTimeout is the time after which idle connections are closed. + // In PCS we want them to stay open between polling intervals so they + // can be reused for the next poll. 
Thus, we set it to the worst case + // time it takes for one poll (pcsStatusTimeout) plus the time until + // the next poll (pcsStatusPollInterval). We add an additional 50% to + // this for a buffer (ie. multiply by 150%). + idleConnTimeout := time.Duration( + (pcsStatusTimeout + pcsTimeToNextStatusPoll) * 15 / 10) * time.Second + + // Default prototype to initialize each task in the task list with + // Can customize prior to each test + defaultTListProto := &HttpTask{ + Timeout: ctxTimeout, + + CPolicy: ClientPolicy { + Retry: + RetryPolicy { + Retries: httpRetries, + }, + Tx: + HttpTxPolicy { + Enabled: true, + MaxIdleConns: maxIdleConns, + MaxIdleConnsPerHost: maxIdleConnsPerHost, + IdleConnTimeout: idleConnTimeout, + // ResponseHeaderTimeout: responseHeaderTimeout, + // TLSHandshakeTimeout: tLSHandshakeTimeout, + // DisableKeepAlives: DisableKeepAlives, + }, + }, + } + + // Initialize argument structure (will be modified each test) + a := testConnsArg{ + maxIdleConnsPerHost: maxIdleConnsPerHost, + tListProto: defaultTListProto, + srvHandler: launchHandler, // always returns success + } + + testConnsPrep(t, a, nTasks, nIssues) +} + +// testConnsPrep is the next call level down for ALL conection related +// unit tests. It configures each of the sub-tests to send down to yet +// another call level + +func testConnsPrep(t *testing.T, a testConnsArg, nTasks int, nIssues int) { + + t.Logf("") + t.Logf("============================================================") + t.Logf("= Test Configuration ===") + t.Logf("============================================================") + t.Logf("") + t.Logf("nTasks = %v", nTasks) + t.Logf("nIssues = %v", nIssues) + t.Logf("Retries = %v", a.tListProto.CPolicy.Retry.Retries) + t.Logf("ctxTimeout = %v", a.tListProto.Timeout) + + if a.tListProto.CPolicy.Tx.Enabled { + t.Logf("idleConnTimeout = %v", a.tListProto.CPolicy.Tx.IdleConnTimeout) + t.Logf("MaxIdleConns = %v", a.tListProto.CPolicy.Tx.MaxIdleConns) + t.Logf("MaxIdleConnsPerHost = %v", a.tListProto.CPolicy.Tx.MaxIdleConnsPerHost) + } else { + t.Logf("idleConnTimeout = 0 (default = unlimited)") + t.Logf("MaxIdleConnsPerHost = 2 (default)") + t.Logf("MaxIdleConns = 100 (default)") + } + + t.Logf("") + + /////////////////////////////////////////////////////// + // All successes + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = 0 + a.nSkipDrainBody = 0 + a.nSkipCloseBody = 0 + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + a.openAfterTasksComplete = a.openAfterLaunch + a.openAfterBodyClose = a.maxIdleConnsPerHost + a.openAfterCancel = a.maxIdleConnsPerHost + a.openAfterClose = a.maxIdleConnsPerHost + + a.testIdleConnTimeout = true // Let's test IdleConnTimeout + + testConns(t, a) + + a.testIdleConnTimeout = false // Reset to default + + /////////////////////////////////////////////////////// + // Successful retries + + a.nTasks = nTasks + a.nSuccessRetries = int32(nIssues) + a.nFailRetries = 0 + a.nSkipDrainBody = 0 + a.nSkipCloseBody = 0 + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + a.openAfterTasksComplete = a.openAfterLaunch + a.openAfterBodyClose = a.maxIdleConnsPerHost + a.openAfterCancel = a.maxIdleConnsPerHost + a.openAfterClose = a.maxIdleConnsPerHost + + testConns(t, a) + + /////////////////////////////////////////////////////// + // Failed retries due to retries exceeded that complete before the + // successful reception of a response + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = nIssues + a.nSkipDrainBody 
= 0 + a.nSkipCloseBody = 0 + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + + // Connections for failed retries will be subject to maxIdleConnsPerHost + + openAfter := a.nFailRetries + if openAfter > a.maxIdleConnsPerHost { + openAfter = a.maxIdleConnsPerHost + } + + openAfter = openAfter + (nTasks - nIssues) + + a.openAfterLaunch = openAfter + a.openAfterTasksComplete = a.openAfterLaunch + a.openAfterBodyClose = a.maxIdleConnsPerHost + a.openAfterCancel = a.maxIdleConnsPerHost + a.openAfterClose = a.maxIdleConnsPerHost + + retrySleep = 0 // 0 seconds so retries complete first + + if a.nTasks < 1000 && a.nFailRetries < 10 { + handlerSleep = 5 // slow down the others + } else { + handlerSleep = 20 // slow down the others + } + + testConns(t, a) + + handlerSleep = 2 // Set back to default + retrySleep = 0 // Set back to default + + /////////////////////////////////////////////////////// + // Failed retries due to retries exceeded that completes after the + // successful reception of a response + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = nIssues + a.nSkipDrainBody = 0 + a.nSkipCloseBody = 0 + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + a.openAfterTasksComplete = a.maxIdleConnsPerHost // successful tasks closed bodies already + a.openAfterBodyClose = a.maxIdleConnsPerHost + a.openAfterCancel = a.maxIdleConnsPerHost + a.openAfterClose = a.maxIdleConnsPerHost + + // Slow down the retries so that the completing tasks finish up first. + + if a.nTasks <= 1000 { + retrySleep = 5 + } else if a.nTasks <= 4000 { + retrySleep = 10 + } else { + retrySleep = 20 + } + + testConns(t, a) + + retrySleep = 0 // Set back to default + + /////////////////////////////////////////////////////// + // Body Drain: skip + // Body Close: yes + // + // Even though we close the body, if it was not drained first the + // connection gets closed + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = 0 + a.nSkipDrainBody = nIssues + a.nSkipCloseBody = 0 + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + a.openAfterTasksComplete = a.openAfterLaunch + + // This test closes drained bodies so account for that along with the + // max number of connections allowed per host + + openAfter = a.nTasks - a.nSkipDrainBody + if openAfter > a.maxIdleConnsPerHost { + openAfter = a.maxIdleConnsPerHost + } + + a.openAfterBodyClose = openAfter + a.openAfterCancel = openAfter + a.openAfterClose = openAfter + + testConns(t, a) + + /////////////////////////////////////////////////////// + // Body Drain: yes + // Body Close: skip + // + // Connection stays open if the body is never closed and is not reusable + // for any other requests (unless body is later drained/closed) + // + // However, because it was marked "dirty" it can get cleaned up by + // the system at any time. 
I've seen that using 'ss' in the tests + // can force this to happen immediately after tasks complete if the + // number of open connections is greater than maxIdleConnsPerHost + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = 0 + a.nSkipDrainBody = 0 + a.nSkipCloseBody = nIssues + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + a.openAfterTasksComplete = a.openAfterLaunch + a.openAfterBodyClose = a.maxIdleConnsPerHost + a.openAfterCancel = a.maxIdleConnsPerHost + a.openAfterClose = a.maxIdleConnsPerHost + + testConns(t, a) + + /////////////////////////////////////////////////////// + // Body Drain: skip + // Body Close: skip + // + // The connection stays open if the body is never closed but what is + // different in this case is that if the body was also NOT drained, + // then the connection gets closed if the context times out or is + // cancelled (or IdleConnTimeout is reached). + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = 0 + a.nSkipDrainBody = nIssues + a.nSkipCloseBody = nIssues + a.nCtxTimeouts = 0 + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + a.openAfterTasksComplete = a.openAfterLaunch + + // Truncate the good connections down to MaxIdleConnsPerHost + + openAfter = a.openAfterTasksComplete - a.nSkipDrainBody + if openAfter > a.maxIdleConnsPerHost { + openAfter = a.maxIdleConnsPerHost + } + openAfter = openAfter + a.nSkipDrainBody + + a.openAfterBodyClose = openAfter + + openAfter = a.openAfterBodyClose - a.nSkipDrainBody + if openAfter > a.maxIdleConnsPerHost { + openAfter = a.maxIdleConnsPerHost + } + + a.openAfterCancel = openAfter + a.openAfterClose = openAfter + + testConns(t, a) + + /////////////////////////////////////////////////////// + // Timeouts. There is no http timeout set, so the only timeout that + // will trigger will be the context timeout. + + a.nTasks = nTasks + a.nSuccessRetries = 0 + a.nFailRetries = 0 + a.nSkipDrainBody = 0 + a.nSkipCloseBody = 0 + a.nCtxTimeouts = nIssues + + a.openAtStart = 0 + a.openAfterLaunch = a.nTasks + + // Timed out connections will close but we also need to account for + // the max idle connections allowed per host + + openAfter = a.openAfterLaunch - a.nCtxTimeouts + if openAfter > a.maxIdleConnsPerHost { + openAfter = a.maxIdleConnsPerHost + } + + a.openAfterTasksComplete = openAfter + a.openAfterBodyClose = openAfter + a.openAfterCancel = openAfter + a.openAfterClose = openAfter + + // We run a second task list through the client to verify that the prior + // task list, which had issues, has no effect on subsequent task list + // execution + + a.runSecondTaskList = true + + testConns(t, a) + + a.runSecondTaskList = false +} + +const sleepTimeToStabilizeConns = 250 * time.Millisecond + +// testConns runs tloc.Init() to set up a task list service. It starts +// the http server and creates the retryablehttp clients. It can issue +// multiple sets of task list requests to one final lower layer if the +// upper layers request it. +// +// WARNING: testConns()/runTaskList() is not capable of testing retries and +// timeouts within the same call. Please use different tests to +// test each + +func testConns(t *testing.T, a testConnsArg) { + logConnTestHeader(t, a) + + // Initialize the task system + tloc := &TRSHTTPLocal{} + tloc.Init(svcName, createLogger()) + + // Copy logger into global namespace so that the http server handlers + // can use it + handlerLogger = t + + // Create http server. We should probably make an addition later to + // create multiple servers. 
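+	// The ConnState hook attached a few lines below reports the standard
+	// http.ConnState transitions (StateNew, StateActive, StateIdle,
+	// StateHijacked, StateClosed), which is what lets us correlate the
+	// 'ss' connection counts with what the Go client is actually doing.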
+	srv := httptest.NewServer(http.HandlerFunc(a.srvHandler))
+
+	// Configure server to log changes to connection states
+	srv.Config.ConnState = CustomConnState
+
+	// Run the primary task list test
+	runTaskList(t, tloc, a, srv)
+
+	// If a second task list is requested, run it
+	if (a.runSecondTaskList) {
+		// Overwrite prior args. Any second task list run should always
+		// succeed at everything, as we're testing to see if any problems
+		// caused by the prior run can affect a future run. The only valid
+		// impact we should see are open connections from the prior run,
+		// which is actually a good thing!
+
+		// a.nTasks stays the same
+		// a.testIdleConnTimeout stays the same
+
+		a.nSkipDrainBody = 0 // We want no issues
+		a.nSkipCloseBody = 0 // We want no issues
+		a.nSuccessRetries = 0 // We want no issues
+		a.nCtxTimeouts = 0 // We want no issues
+		a.nFailRetries = 0 // We want no issues
+
+		// If we tested that exceeding IdleConnTimeout closes all connections
+		// during the first task list run, we should have no open connections
+		// at the start of this run
+
+		if (a.testIdleConnTimeout) {
+			// They should have all timed out and closed
+			a.openAtStart = 0
+		} else {
+			// What was open at the end of the last run should still be open
+			a.openAtStart = a.openAfterClose
+		}
+
+		// Carry forward the same number of tasks. Since there will be no
+		// issues or errors in the second run, open connections should be
+		// truncated down to MaxIdleConnsPerHost after bodies get closed
+
+		a.openAfterLaunch = a.nTasks
+		a.openAfterTasksComplete = a.openAfterLaunch
+		a.openAfterBodyClose = a.maxIdleConnsPerHost
+		a.openAfterCancel = a.maxIdleConnsPerHost
+		a.openAfterClose = a.maxIdleConnsPerHost
+
+		t.Logf("===================> RUNNING SECOND TASK LIST <===================")
+
+		runTaskList(t, tloc, a, srv)
+	}
+
+	t.Logf("Calling tloc.Cleanup to clean up task system")
+	tloc.Cleanup()
+
+	// Cleaning up the task list system should close all connections. Verify
+
+	time.Sleep(sleepTimeToStabilizeConns)
+	t.Logf("Testing connections after task list cleaned up (0)")
+	testOpenConnections(t, 0)
+
+	t.Logf("Closing the server")
+	srv.Close()
+}
+
+// runTaskList runs a task list from CreateTaskList() through to Close().
+// It assumes a server is already running.
+
+func runTaskList(t *testing.T, tloc *TRSHTTPLocal, a testConnsArg, srv *httptest.Server) {
+	// Verify correct number of open connections at start
+
+	t.Logf("Testing connections at start (%v)", a.openAtStart)
+	testOpenConnections(t, a.openAtStart)
+
+	// Create an http request
+
+	req, err := http.NewRequest(http.MethodGet, srv.URL, nil)
+	if err != nil {
+		t.Fatalf("=====> ERROR: Failed to create request: %v <=====", err)
+	}
+
+	// Set any necessary headers
+
+	req.Header.Set("Accept", "*/*")
+	//req.Header.Set("Connection","keep-alive")
+
+	// Create the task list
+
+	a.tListProto.Request = req
+	t.Logf("Calling tloc.CreateTaskList() to create %v tasks for URL %v", a.nTasks, srv.URL)
+	tList := tloc.CreateTaskList(a.tListProto, a.nTasks)
+
+	// Configure any requested retries and put at start of task list
+
+	nRetries = a.nSuccessRetries // this signals the handler
+
+	for i := 0; i < a.nFailRetries; i++ {
+		// Let handler know if it should fail all retries for this request
+		// as opposed to just failing it once
+
+		tList[i].Request.Header.Set("Trs-Fail-All-Retries", "true")
+
+		// TODO: Could put nRetries into header to reduce complexity
+
+		if (logLevel == logrus.DebugLevel) {
+			t.Logf("Set request header %v for task %v",
+				tList[i].Request.Header, tList[i].GetID())
+		}
+	}
+
+	// Configure any requested timeouts and put at end of task list
+
+	nCtxTimeouts = a.nCtxTimeouts // this signals the handler
+
+	for i := len(tList) - 1; i > len(tList) - 1 - a.nCtxTimeouts; i-- {
+		// This header is what identifies this request to the handler
+
+		tList[i].Request.Header.Set("Trs-Context-Timeout", "true")
+
+		// TODO: Could put nCtxTimeouts into header to reduce complexity
+
+		if (logLevel == logrus.DebugLevel) {
+			t.Logf("Set request header %v for task %v",
+				tList[i].Request.Header, tList[i].GetID())
+		}
+
+		// Create a channel which will allow us to later signal the stalled
+		// server handlers to return the response
+
+		stallCancel = make(chan bool, a.nCtxTimeouts * 2)
+	}
+
+	// Launch the task list
+
+	t.Logf("Calling tloc.Launch() to launch all tasks")
+	taskListChannel, err := tloc.Launch(&tList)
+	if (err != nil) {
+		t.Errorf("=====> ERROR: tloc.Launch() failed: %v <=====", err)
+	}
+
+	// Pause so that all tasks can be started and open their connections
+
+	if a.nTasks <= 1000 {
+		time.Sleep(2 * time.Second)
+	} else if a.nTasks <= 4000 {
+		time.Sleep(10 * time.Second)
+	} else if a.nTasks <= 8000 {
+		time.Sleep(20 * time.Second)
+	} else {
+		time.Sleep(30 * time.Second)
+	}
+
+	// All tasks should now be running and all connections should be in
+	// the ESTAB(LISHED) state
+
+	testOpenConnections(t, a.openAfterLaunch)
+
+	// If asked, here we attempt to close response bodies for tasks that have
+	// already completed, prior to tasks that will fail retries. This is
+	// only done if we were asked to test retries. 
We do it to verify that + // the completed tasks had their connections closed + + tasksToWaitFor := a.nTasks + if a.nFailRetries > 0 && retrySleep > 0 { + t.Logf("Waiting for %v non-retry tasks to complete", a.nTasks - a.nFailRetries) + + nWaitedFor := 0 + for i := 0; i < (a.nTasks - a.nFailRetries); i++ { + <-taskListChannel + tasksToWaitFor-- + nWaitedFor++ + } + + t.Logf("Draining/closing non-retry response bodies early before retry failures") + + for _, tsk := range(tList) { + if tsk.Request.Response != nil && tsk.Request.Response.Body != nil { + _, _ = io.Copy(io.Discard, tsk.Request.Response.Body) + + tsk.Request.Response.Body.Close() + + if logLevel == logrus.TraceLevel { + // Response headers can be helpful for debug + t.Logf("Response headers: %s", tsk.Request.Response.Header) + } + } + } + + // Wait for underlying system to perform actions on connections + time.Sleep(sleepTimeToStabilizeConns) + + oConns := nWaitedFor + if nWaitedFor > a.maxIdleConnsPerHost { + oConns = a.maxIdleConnsPerHost + } + oConns += a.nFailRetries + + t.Logf("Testing connections after non-retry request bodies closed (%v) (oabc=%v nfr=%v)", + oConns, a.openAfterBodyClose, a.nFailRetries) + + testOpenConnections(t, oConns) + } + + // Here we attempt to close task bodies and cancel context for tasks that + // have already completed, prior to tasks that will timeout. This is + // only done if we were asked to test timeouts. We do it to verify that + // the completed tasks had their connections closed at the right time + + if a.nCtxTimeouts > 0 { + t.Logf("Waiting for %v non-timeout tasks to complete", a.nTasks - a.nCtxTimeouts) + + nWaitedFor := 0 + for i := 0; i < (a.nTasks - a.nCtxTimeouts); i++ { + <-taskListChannel + tasksToWaitFor-- + nWaitedFor++ + } + + t.Logf("Draining/closing non-timeout response bodies early and canceling their contexts") + + for i := 0; i < len(tList) - a.nCtxTimeouts; i++ { + if tList[i].Request.Response != nil && tList[i].Request.Response.Body != nil { + _, _ = io.Copy(io.Discard, tList[i].Request.Response.Body) + + tList[i].Request.Response.Body.Close() + + if logLevel == logrus.TraceLevel { + // Response headers can be helpful for debug + t.Logf("Response headers: %s", tList[i].Request.Response.Header) + } + } + tList[i].contextCancel() + } + + // Wait for underlying system to perform actions on connections + + time.Sleep(sleepTimeToStabilizeConns) + + oConns := nWaitedFor + if nWaitedFor > a.maxIdleConnsPerHost { + oConns = a.maxIdleConnsPerHost + } + oConns += a.nCtxTimeouts + + t.Logf("Testing connections after non-timeout request bodies closed (%v)", oConns) + + testOpenConnections(t, oConns) + } + + // Now wait for ALL tasks to complete + + t.Logf("Waiting for %d tasks to complete", tasksToWaitFor) + for i := 0; i < tasksToWaitFor; i++ { + <-taskListChannel + } + + // Close the task list channel to prevent resource leakage + + t.Logf("Closing the task list channel") + close(taskListChannel) + + // Wait for underlying system to perform actions on connections + + if a.nTasks <= 4000 { + time.Sleep(sleepTimeToStabilizeConns) + } else { + time.Sleep(5 * time.Second) + } + + t.Logf("Testing connections after tasks complete (%v)", a.openAfterTasksComplete) + testOpenConnections(t, a.openAfterTasksComplete) + + // Set up custom read closer to test response body closure + + for _, tsk := range(tList) { + if tsk.Request.Response != nil && tsk.Request.Response.Body != nil { + tsk.Request.Response.Body = &CustomReadCloser{tsk.Request.Response.Body, false} + } + } + + // Now drain and 
close response bodies as requested by the test configuration.
+
+	t.Logf("Draining/Closing response bodies (skipClose=%v skipDrain=%v)", a.nSkipCloseBody, a.nSkipDrainBody)
+
+	nBodyClosesSkipped := 0
+	nBodyDrainSkipped := 0
+
+	for _, tsk := range(tList) {
+		// Skip closing any requested response bodies
+
+		if nBodyClosesSkipped < a.nSkipCloseBody {
+			if tsk.Request.Response != nil && tsk.Request.Response.Body != nil {
+				// May also want to skip draining the body before skipping closure
+				if nBodyDrainSkipped < a.nSkipDrainBody {
+
+					nBodyDrainSkipped++
+
+					if logLevel >= logrus.DebugLevel {
+						t.Logf("Skipping draining response body for task %v", tsk.GetID())
+					}
+				} else {
+					_, _ = io.Copy(io.Discard, tsk.Request.Response.Body)
+				}
+
+				nBodyClosesSkipped++
+
+				if logLevel >= logrus.DebugLevel {
+					t.Logf("Skipping closing response body for task %v", tsk.GetID())
+				}
+				continue
+			}
+		}
+
+		// Close and (maybe) drain the remaining response bodies
+
+		if tsk.Request.Response != nil && tsk.Request.Response.Body != nil {
+			// Skip draining the body if requested
+			if nBodyDrainSkipped < a.nSkipDrainBody {
+
+				nBodyDrainSkipped++
+
+				if logLevel >= logrus.DebugLevel {
+					t.Logf("Skipping draining response body for task %v", tsk.GetID())
+				}
+			} else {
+				_, _ = io.Copy(io.Discard, tsk.Request.Response.Body)
+			}
+
+			// Do the close
+
+			tsk.Request.Response.Body.Close()
+
+			if logLevel == logrus.TraceLevel {
+				// Response headers can be helpful for debug
+				t.Logf("Closed response body for task %v with response headers: %s",
+					tsk.GetID(), tsk.Request.Response.Header)
+			}
+		}
+	}
+
+	// Closing the body may affect the state of open connections
+
+	// Wait for underlying system to perform actions on connections
+	time.Sleep(sleepTimeToStabilizeConns)
+
+	t.Logf("Testing connections after response bodies closed (%v)", a.openAfterBodyClose)
+	testOpenConnections(t, a.openAfterBodyClose)
+
+	// TRS users are not required to call tloc.Cancel() so let's test both ways
+	if a.skipCancel {
+		t.Logf("Skipping tloc.Cancel()")
+	} else {
+		// tloc.Cancel() cancels the contexts for all of the tasks in the task list
+		t.Logf("Calling tloc.Cancel() to cancel all tasks (skipCancel=false)")
+		tloc.Cancel(&tList)
+
+		// Cancelling the task list should not alter connection state
+
+		time.Sleep(sleepTimeToStabilizeConns)
+
+		t.Logf("Testing connections after task list cancelled (%v)", a.openAfterCancel)
+		testOpenConnections(t, a.openAfterCancel)
+	}
+
+	// tloc.Close() cancels all contexts, closes any response bodies left
+	// open, and removes all of the tasks from the task list
+
+	t.Logf("Calling tloc.Close() to close out the task list")
+	tloc.Close(&tList)
+
+	// Closing the task list should not alter connection state
+
+	time.Sleep(sleepTimeToStabilizeConns)
+
+	t.Logf("Testing connections after task list closed (%v)", a.openAfterClose)
+	testOpenConnections(t, a.openAfterClose)
+
+	// Verify that tloc.Close() did indeed close the response bodies that
+	// we left open to verify that it WOULD close them
+
+	t.Logf("Checking for closed response bodies")
+
+	for _, tsk := range(tList) {
+		if tsk.Request.Response != nil && tsk.Request.Response.Body != nil {
+			if !tsk.Request.Response.Body.(*CustomReadCloser).WasClosed() {
+				t.Errorf("=====> ERROR: Expected response body for %v to be closed, but it was not <=====", tsk.GetID())
+			}
+		}
+	}
+
+	// Verify task list was closed
+
+	t.Logf("Checking that the task list was closed")
+
+	if (len(tloc.taskMap) != 0) {
+		t.Errorf("=====> ERROR: Expected task list map to be empty <=====")
+	}
+
+	// If we stalled any requests, they are still stalled in the server
+	// handler. Let's release them now so that we can cleanly stop the
+	// servers.
+
+	if (a.nCtxTimeouts > 0) {
+		t.Logf("Signaling stalled handlers")
+		for i := 0; i < a.nCtxTimeouts * 2; i++ {
+			stallCancel <- true
+		}
+	}
+
+	// We likely have many connections open and idle. If requested, pause
+	// until IdleConnTimeout expires so that we can verify they then close.
+
+	if a.testIdleConnTimeout && a.tListProto.CPolicy.Tx.Enabled {
+		// TODO: Should also confirm there are no client "other" connections
+		t.Logf("Testing connections after idleConnTimeout (0)")
+
+		time.Sleep(a.tListProto.CPolicy.Tx.IdleConnTimeout)
+
+		testOpenConnections(t, 0)
+	}
+}
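Stepping back from the assertions, the caller-facing sequence these tests exercise boils down to the lifecycle below. This is an illustrative recap only, not part of the patch: it reuses this file's svcName and createLogger() helpers, mirrors the calls made by runTaskList() above, and trims all of the error checking, logging, and connection counting.

```go
// exampleTaskListLifecycle is a condensed, happy-path restatement of what
// runTaskList() above does with the TRS API. Illustrative only.
func exampleTaskListLifecycle(nTasks int, url string) error {
	tloc := &TRSHTTPLocal{}
	tloc.Init(svcName, createLogger())
	defer tloc.Cleanup() // tears down the task system and closes its connections

	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	proto := &HttpTask{
		Timeout: 30 * time.Second,
		CPolicy: ClientPolicy{Retry: RetryPolicy{Retries: 3}},
		Request: req,
	}

	tList := tloc.CreateTaskList(proto, nTasks)

	ch, err := tloc.Launch(&tList)
	if err != nil {
		return err
	}

	for i := 0; i < nTasks; i++ {
		<-ch // wait for every task to complete
	}
	close(ch) // the tests above close the channel as well

	// Drain, then close, every response body so connections stay reusable.
	for _, tsk := range tList {
		if tsk.Request.Response != nil && tsk.Request.Response.Body != nil {
			_, _ = io.Copy(io.Discard, tsk.Request.Response.Body)
			tsk.Request.Response.Body.Close()
		}
	}

	tloc.Close(&tList) // cancels contexts, closes any bodies left open, empties the task list
	return nil
}
```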