diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index d549e5238..8634868d5 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -46,7 +46,7 @@ jobs: uses: actions/checkout@v4 - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true diff --git a/.github/workflows/deploy_node.yml b/.github/workflows/deploy_node.yml index 80dd2ad6b..edb45bfed 100644 --- a/.github/workflows/deploy_node.yml +++ b/.github/workflows/deploy_node.yml @@ -69,7 +69,7 @@ jobs: fetch-tags: true - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 02bcc2295..795154421 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -28,12 +28,14 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 + - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true cache: false # don't save & restore build caches because golangci-lint action does it internally + - name: Get dependencies run: go mod vendor - name: golangci-lint-soft @@ -51,8 +53,13 @@ jobs: if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != 'wavesplatform/gowaves' steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v4 + with: + fetch-depth: 0 + - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true @@ -61,11 +68,6 @@ jobs: - name: Set up GolangCI-Lint run: curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $HOME/bin v1.61.0 - - name: Check out code into the Go module directory - uses: actions/checkout@v4 - with: - fetch-depth: 0 - - name: Get dependencies run: go mod vendor @@ -88,17 +90,17 @@ jobs: github.repository == 'wavesplatform/gowaves'}} steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v4 + - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true cache: true id: go - - name: Check out code into the Go module directory - uses: actions/checkout@v4 - - name: Get dependencies run: go mod vendor diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index efbfd90f0..9438e12a1 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -18,17 +18,17 @@ jobs: if: (github.event_name == 'workflow_dispatch' || github.event.review.state == 'APPROVED') && github.repository == 'wavesplatform/gowaves' steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v4 + - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true cache: true id: go - - name: Check out code into the Go module directory - uses: actions/checkout@v4 - - name: Get dependencies run: go mod vendor diff --git a/.github/workflows/itests_race.yml b/.github/workflows/itests_race.yml index 42c3402c9..f72cc05ca 100644 --- a/.github/workflows/itests_race.yml +++ b/.github/workflows/itests_race.yml @@ -11,17 +11,17 @@ jobs: if: github.repository == 'wavesplatform/gowaves' steps: + - name: Check out code into the Go module directory + uses: actions/checkout@v4 + - name: Set up Go 1.22 - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true cache: true id: go - - name: Check out code into the Go module directory - uses: actions/checkout@v4 - - name: Get dependencies run: go mod vendor diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 62ceabe87..158f58009 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -69,7 +69,7 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - name: Set up Go - uses: actions/setup-go@v5.1.0 + uses: actions/setup-go@v5.2.0 with: go-version: 1.22.x check-latest: true diff --git a/go.mod b/go.mod index b3797de5d..ffaec493f 100644 --- a/go.mod +++ b/go.mod @@ -46,11 +46,11 @@ require ( go.uber.org/atomic v1.11.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.30.0 + golang.org/x/crypto v0.31.0 golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e golang.org/x/sync v0.10.0 golang.org/x/sys v0.28.0 - google.golang.org/grpc v1.68.1 + google.golang.org/grpc v1.69.0 google.golang.org/protobuf v1.35.2 moul.io/zapfilter v1.7.0 ) @@ -110,10 +110,10 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.29.0 // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/term v0.27.0 // indirect golang.org/x/text v0.21.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect diff --git a/go.sum b/go.sum index 4bbbbf2c7..3f2697502 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,10 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-redis/redis v6.15.8+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.4.2/go.mod h1:A1tbYoHSa1fXwN+//ljcCYYJeLmVrwL9hbQN45Jdy0M= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= @@ -129,6 +133,8 @@ github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= @@ -329,6 +335,16 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.opentelemetry.io/otel v0.14.0/go.mod h1:vH5xEuwy7Rts0GNtsCW3HYQoZDY+OmBJ6t1bFGGlxgw= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.8.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -348,8 +364,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.30.0 h1:RwoQn3GkWiMkzlX562cLB7OxWvjH1L8xutO2WoJcRoY= -golang.org/x/crypto v0.30.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e h1:I88y4caeGeuDQxgdoFPUq097j7kNfw6uvuiNxUBfcBk= golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -373,8 +389,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -442,10 +458,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.68.1 h1:oI5oTa11+ng8r8XMMN7jAOmWfPZWbYpCFaMUTACxkM0= -google.golang.org/grpc v1.68.1/go.mod h1:+q1XYFJjShcqn0QZHvCyeR4CXPA+llXIeUIfIe00waw= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 h1:X58yt85/IXCx0Y3ZwN6sEIKZzQtDEYaBWrDvErdXrRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.69.0 h1:quSiOM1GJPmPH5XtU+BCoVXcDVJJAzNcoyfC2cCjGkI= +google.golang.org/grpc v1.69.0/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/itests/clients/net_client.go b/itests/clients/net_client.go index d69cecd7c..a81a549ec 100644 --- a/itests/clients/net_client.go +++ b/itests/clients/net_client.go @@ -57,7 +57,7 @@ func NewNetClient( slog.SetLogLoggerLevel(slog.LevelError) conf := networking.NewConfig(p, h). - WithLogger(log). + WithSlogHandler(log.Handler()). WithWriteTimeout(networkTimeout). WithKeepAliveInterval(pingInterval). WithSlogAttributes(slog.String("suite", t.Name()), slog.String("impl", impl.String())) diff --git a/pkg/networking/configuration.go b/pkg/networking/configuration.go index cda0e377e..151bc8541 100644 --- a/pkg/networking/configuration.go +++ b/pkg/networking/configuration.go @@ -12,7 +12,7 @@ const ( // Config allows to set some parameters of the [Conn] or it's underlying connection. type Config struct { - logger *slog.Logger + slogHandler slog.Handler protocol Protocol handler Handler keepAlive bool @@ -25,7 +25,6 @@ type Config struct { // Other parameters are set to their default values. func NewConfig(p Protocol, h Handler) *Config { return &Config{ - logger: slog.Default(), protocol: p, handler: h, keepAlive: true, @@ -35,9 +34,9 @@ func NewConfig(p Protocol, h Handler) *Config { } } -// WithLogger sets the logger. -func (c *Config) WithLogger(logger *slog.Logger) *Config { - c.logger = logger +// WithSlogHandler sets the slog handler. +func (c *Config) WithSlogHandler(handler slog.Handler) *Config { + c.slogHandler = handler return c } diff --git a/pkg/networking/logging.go b/pkg/networking/logging.go new file mode 100644 index 000000000..94338ab31 --- /dev/null +++ b/pkg/networking/logging.go @@ -0,0 +1,18 @@ +package networking + +import ( + "context" + "log/slog" +) + +// TODO: Remove this file and the handler when the default [slog.DiscardHandler] will be introduced in +// Go version 1.24. See https://go-review.googlesource.com/c/go/+/626486. + +// discardingHandler is a logger that discards all log messages. +// It is used when no slog handler is provided in the [Config]. +type discardingHandler struct{} + +func (h discardingHandler) Enabled(context.Context, slog.Level) bool { return false } +func (h discardingHandler) Handle(context.Context, slog.Record) error { return nil } +func (h discardingHandler) WithAttrs([]slog.Attr) slog.Handler { return h } +func (h discardingHandler) WithGroup(string) slog.Handler { return h } diff --git a/pkg/networking/network.go b/pkg/networking/network.go index 6715f24ed..f145b4cb1 100644 --- a/pkg/networking/network.go +++ b/pkg/networking/network.go @@ -3,12 +3,12 @@ package networking import ( "context" "errors" - "fmt" "io" ) const Namespace = "NET" +// TODO: Consider special Error type for all [networking] errors. var ( // ErrInvalidConfigurationNoProtocol is used when the configuration has no protocol. ErrInvalidConfigurationNoProtocol = errors.New("invalid configuration: empty protocol") @@ -16,6 +16,12 @@ var ( // ErrInvalidConfigurationNoHandler is used when the configuration has no handler. ErrInvalidConfigurationNoHandler = errors.New("invalid configuration: empty handler") + // ErrInvalidConfigurationNoKeepAliveInterval is used when the configuration has an invalid keep-alive interval. + ErrInvalidConfigurationNoKeepAliveInterval = errors.New("invalid configuration: invalid keep-alive interval value") + + // ErrInvalidConfigurationNoWriteTimeout is used when the configuration has an invalid write timeout. + ErrInvalidConfigurationNoWriteTimeout = errors.New("invalid configuration: invalid write timeout value") + // ErrUnacceptableHandshake is used when the handshake is not accepted. ErrUnacceptableHandshake = errors.New("handshake is not accepted") @@ -23,7 +29,7 @@ var ( ErrSessionShutdown = errors.New("session shutdown") // ErrConnectionWriteTimeout indicates that we hit the timeout writing to the underlying stream connection. - ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout") + ErrConnectionWriteTimeout = errors.New("connection write timeout") // ErrKeepAliveProtocolFailure is used when the protocol failed to provide a keep-alive message. ErrKeepAliveProtocolFailure = errors.New("protocol failed to provide a keep-alive message") diff --git a/pkg/networking/session.go b/pkg/networking/session.go index b0f760555..5723b1913 100644 --- a/pkg/networking/session.go +++ b/pkg/networking/session.go @@ -12,6 +12,7 @@ import ( "net" "strings" "sync" + "sync/atomic" "time" "github.com/wavesplatform/gowaves/pkg/execution" @@ -36,10 +37,9 @@ type Session struct { sendLock sync.Mutex // Guards the sendCh. sendCh chan *sendPacket // sendCh is used to send data to the connection. - establishedLock sync.Mutex // Guards the established field. - established bool // Indicates that incoming Handshake was successfully accepted. - shutdownLock sync.Mutex // Guards the shutdown field. - shutdown bool // shutdown is used to safely close the Session. + receiving atomic.Bool // Indicates that receiveLoop already running. + established atomic.Bool // Indicates that incoming Handshake was successfully accepted. + shutdown sync.Once // shutdown is used to safely close the Session. } // NewSession is used to construct a new session. @@ -50,9 +50,16 @@ func newSession(ctx context.Context, config *Config, conn io.ReadWriteCloser, tp if config.handler == nil { return nil, ErrInvalidConfigurationNoHandler } + if config.keepAlive && config.keepAliveInterval <= 0 { + return nil, ErrInvalidConfigurationNoKeepAliveInterval + } + if config.connectionWriteTimeout <= 0 { + return nil, ErrInvalidConfigurationNoWriteTimeout + } if tp == nil { return nil, ErrEmptyTimerPool } + sCtx, cancel := context.WithCancel(ctx) s := &Session{ g: execution.NewTaskGroup(suppressContextCancellationError), @@ -65,17 +72,16 @@ func newSession(ctx context.Context, config *Config, conn io.ReadWriteCloser, tp sendCh: make(chan *sendPacket, 1), // TODO: Make the size of send channel configurable. } - attributes := []any{ - slog.String("namespace", Namespace), - slog.String("remote", s.RemoteAddr().String()), + if config.slogHandler == nil { + config.slogHandler = discardingHandler{} } - attributes = append(attributes, config.attributes...) - if config.logger == nil { - s.logger = slog.Default().With(attributes...) - } else { - s.logger = config.logger.With(attributes...) + sa := [...]any{ + slog.String("namespace", Namespace), + slog.String("remote", s.RemoteAddr().String()), } + attrs := append(sa[:], config.attributes...) + s.logger = slog.New(config.slogHandler).With(attrs...) s.g.Run(s.receiveLoop) s.g.Run(s.sendLoop) @@ -109,29 +115,24 @@ func (s *Session) RemoteAddr() net.Addr { // Close is used to close the session. It is safe to call Close multiple times from different goroutines, // subsequent calls do nothing. func (s *Session) Close() error { - s.shutdownLock.Lock() - defer s.shutdownLock.Unlock() - - if s.shutdown { - return nil // Fast path - session already closed. - } - s.shutdown = true - - s.logger.Debug("Closing session") - clErr := s.conn.Close() // Close the underlying connection. - if clErr != nil { - s.logger.Warn("Failed to close underlying connection", "error", clErr) - } - s.logger.Debug("Underlying connection closed") + var err error + s.shutdown.Do(func() { + s.logger.Debug("Closing session") + clErr := s.conn.Close() // Close the underlying connection. + if clErr != nil { + s.logger.Warn("Failed to close underlying connection", "error", clErr) + } + s.logger.Debug("Underlying connection closed") - s.cancel() // Cancel the underlying context to interrupt the loops. + s.cancel() // Cancel the underlying context to interrupt the loops. - s.logger.Debug("Waiting for loops to finish") - err := s.g.Wait() // Wait for loops to finish. + s.logger.Debug("Waiting for loops to finish") + err = s.g.Wait() // Wait for loops to finish. - err = errors.Join(err, clErr) // Combine loops finalization errors with connection close error. + err = errors.Join(err, clErr) // Combine loops finalization errors with connection close error. - s.logger.Debug("Session closed", "error", err) + s.logger.Debug("Session closed", "error", err) + }) return err } @@ -157,7 +158,9 @@ func (s *Session) waitForSend(data []byte) error { timer.Reset(s.config.connectionWriteTimeout) defer s.tp.Put(timer) - s.logger.Debug("Sending data", "data", base64.StdEncoding.EncodeToString(data)) + if s.logger.Enabled(s.ctx, slog.LevelDebug) { + s.logger.Debug("Sending data", "data", base64.StdEncoding.EncodeToString(data)) + } ready := &sendPacket{data: data, err: errCh} select { case s.sendCh <- ready: @@ -171,8 +174,8 @@ func (s *Session) waitForSend(data []byte) error { } dataCopy := func() { - if data == nil { - return // A nil data is ignored. + if len(data) == 0 { + return // An empty data is ignored. } // In the event of session shutdown or connection write timeout, we need to prevent `send` from reading @@ -215,10 +218,12 @@ func (s *Session) sendLoop() error { return s.ctx.Err() case packet := <-s.sendCh: - s.logger.Debug("Sending data to connection", - "data", base64.StdEncoding.EncodeToString(packet.data)) packet.mu.Lock() - if packet.data != nil { + if s.logger.Enabled(s.ctx, slog.LevelDebug) { + s.logger.Debug("Sending data to connection", + "data", base64.StdEncoding.EncodeToString(packet.data)) + } + if len(packet.data) != 0 { // Copy the data into the buffer to avoid holding a mutex lock during the writing. _, err := dataBuf.Write(packet.data) if err != nil { @@ -253,9 +258,9 @@ func (s *Session) sendLoop() error { // receiveLoop continues to receive data until a fatal error is encountered or underlying connection is closed. // Receive loop works after handshake and accepts only length-prepended messages. func (s *Session) receiveLoop() error { - s.establishedLock.Lock() // Prevents from running multiple receiveLoops. - defer s.establishedLock.Unlock() - + if !s.receiving.CompareAndSwap(false, true) { + return nil // Prevent running multiple receive loops. + } for { if err := s.receive(); err != nil { if errors.Is(err, ErrConnectionClosedOnRead) { @@ -268,7 +273,7 @@ func (s *Session) receiveLoop() error { } func (s *Session) receive() error { - if s.established { + if s.established.Load() { hdr := s.config.protocol.EmptyHeader() return s.readMessage(hdr) } @@ -295,7 +300,7 @@ func (s *Session) readHandshake() error { return ErrUnacceptableHandshake } // Handshake is acceptable, we can switch the session into established state. - s.established = true + s.established.Store(true) s.config.handler.OnHandshake(s, hs) return nil } @@ -303,11 +308,14 @@ func (s *Session) readHandshake() error { func (s *Session) readMessage(hdr Header) error { // Read the header if _, err := hdr.ReadFrom(s.bufRead); err != nil { - if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "closed") || - strings.Contains(err.Error(), "reset by peer") || - strings.Contains(err.Error(), "broken pipe") { // In Docker network built on top of pipe, we get this error on close. + if errors.Is(err, io.EOF) { return ErrConnectionClosedOnRead } + if errMsg := err.Error(); strings.Contains(errMsg, "closed") || + strings.Contains(errMsg, "reset by peer") || + strings.Contains(errMsg, "broken pipe") { // In Docker network built on top of pipe, we get this error on close. + return errors.Join(ErrConnectionClosedOnRead, err) // Wrap the error with ErrConnectionClosedOnRead. + } s.logger.Error("Failed to read header", "error", err) return err } @@ -329,7 +337,7 @@ func (s *Session) readMessage(hdr Header) error { func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error { // Wrap in a limited reader s.logger.Debug("Reading message payload", "len", hdr.PayloadLength()) - conn = &io.LimitedReader{R: conn, N: int64(hdr.PayloadLength())} + conn = io.LimitReader(conn, int64(hdr.PayloadLength())) // Copy into buffer s.receiveLock.Lock() @@ -354,7 +362,10 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error { // We lock the buffer from modification on the time of invocation of OnReceive handler. // The slice of bytes passed into the handler is only valid for the duration of the handler invocation. // So inside the handler better deserialize message or make a copy of the bytes. - s.logger.Debug("Invoking OnReceive handler", "message", base64.StdEncoding.EncodeToString(s.receiveBuffer.Bytes())) + if s.logger.Enabled(s.ctx, slog.LevelDebug) { + s.logger.Debug("Invoking OnReceive handler", "message", + base64.StdEncoding.EncodeToString(s.receiveBuffer.Bytes())) + } s.config.handler.OnReceive(s, s.receiveBuffer.Bytes()) // Invoke OnReceive handler. s.receiveBuffer.Reset() // Reset the buffer for the next message. return nil diff --git a/pkg/networking/session_test.go b/pkg/networking/session_test.go index 51fbd8369..cbeb9ad8e 100644 --- a/pkg/networking/session_test.go +++ b/pkg/networking/session_test.go @@ -385,7 +385,7 @@ func TestCloseParentContext(t *testing.T) { func testConfig(t testing.TB, p networking.Protocol, h networking.Handler, direction string) *networking.Config { log := slogt.New(t) return networking.NewConfig(p, h). - WithLogger(log). + WithSlogHandler(log.Handler()). WithWriteTimeout(1 * time.Second). WithKeepAliveDisabled(). WithSlogAttribute(slog.String("direction", direction)) diff --git a/pkg/networking/timers.go b/pkg/networking/timers.go index 5f2d2949f..9dd227c8a 100644 --- a/pkg/networking/timers.go +++ b/pkg/networking/timers.go @@ -5,13 +5,12 @@ import ( "time" ) -const initialTimerInterval = time.Hour * 1e6 - type timerPool struct { p *sync.Pool } func newTimerPool() *timerPool { + const initialTimerInterval = time.Hour * 1e6 return &timerPool{ p: &sync.Pool{ New: func() any { @@ -32,10 +31,11 @@ func (p *timerPool) Get() *time.Timer { } func (p *timerPool) Put(t *time.Timer) { - t.Stop() - select { - case <-t.C: - default: + if !t.Stop() { + select { + case <-t.C: + default: + } } p.p.Put(t) } diff --git a/pkg/proto/eth_utils.go b/pkg/proto/eth_utils.go index 07587f042..d75fd69c9 100644 --- a/pkg/proto/eth_utils.go +++ b/pkg/proto/eth_utils.go @@ -72,7 +72,7 @@ func copyBytes(bytes []byte) []byte { return copiedBytes } -// copyBytes returns an exact copy of the provided big.Int. +// copyBigInt returns an exact copy of the provided big.Int. func copyBigInt(v *big.Int) *big.Int { if v == nil { return nil diff --git a/pkg/state/accounts_data_storage.go b/pkg/state/accounts_data_storage.go index 5369d7f4e..c60e2539d 100644 --- a/pkg/state/accounts_data_storage.go +++ b/pkg/state/accounts_data_storage.go @@ -114,7 +114,7 @@ func (s *accountsDataStorage) setLastAddrNum(lastAddrNum uint64) error { return nil } -// newestAddressToNum returns the number of given address. It looks up for the address in cache map first +// newestAddrToNum returns the number of given address. It looks up for the address in cache map first // and if not present in state. The second result parameter is true if account's number was found cache otherwise false. // Error can be `keyvalue.ErrNotFound` if no corresponding number found for given address. func (s *accountsDataStorage) newestAddrToNum(addr proto.Address) (uint64, bool, error) {