From 14420bc474194ec3c46aa4eb8d44bc328ae80072 Mon Sep 17 00:00:00 2001 From: Alexey Kiselev Date: Mon, 16 Dec 2024 17:22:42 +0400 Subject: [PATCH 1/4] Close error channel on sending data successfully. Better error channel passing. Reset receiving buffer by deffering. --- pkg/networking/session.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/networking/session.go b/pkg/networking/session.go index 5723b1913..2599c9dd4 100644 --- a/pkg/networking/session.go +++ b/pkg/networking/session.go @@ -192,8 +192,12 @@ func (s *Session) waitForSend(data []byte) error { } select { - case err := <-errCh: - s.logger.Debug("Data sent", "error", err) + case err, ok := <-errCh: + if !ok { + s.logger.Debug("Data sent successfully") + return nil // No error, data was sent successfully. + } + s.logger.Debug("Error sending data", "error", err) return err case <-s.ctx.Done(): dataCopy() @@ -249,8 +253,8 @@ func (s *Session) sendLoop() error { s.logger.Debug("Data written into connection") } - // No error, successful send - s.asyncSendErr(packet.err, nil) + // No error, close the channel. + close(packet.err) } } } @@ -347,6 +351,7 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error { // Allocate the receiving buffer just-in-time to fit the full message. s.receiveBuffer = bytes.NewBuffer(make([]byte, 0, hdr.HeaderLength()+hdr.PayloadLength())) } + defer s.receiveBuffer.Reset() _, err := hdr.WriteTo(s.receiveBuffer) if err != nil { s.logger.Error("Failed to write header to receiving buffer", "error", err) @@ -367,7 +372,6 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error { base64.StdEncoding.EncodeToString(s.receiveBuffer.Bytes())) } s.config.handler.OnReceive(s, s.receiveBuffer.Bytes()) // Invoke OnReceive handler. - s.receiveBuffer.Reset() // Reset the buffer for the next message. return nil } @@ -399,11 +403,11 @@ func (s *Session) keepaliveLoop() error { type sendPacket struct { mu sync.Mutex // Protects data from unsafe reads. data []byte - err chan error + err chan<- error } // asyncSendErr is used to try an async send of an error. -func (s *Session) asyncSendErr(ch chan error, err error) { +func (s *Session) asyncSendErr(ch chan<- error, err error) { if ch == nil { return } From 412377f81c21605f9437ae677f1f6e963906d84f Mon Sep 17 00:00:00 2001 From: Alexey Kiselev Date: Mon, 16 Dec 2024 17:30:20 +0400 Subject: [PATCH 2/4] Better error handling while reading. Co-authored-by: Nikolay Eskov --- pkg/networking/session.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/networking/session.go b/pkg/networking/session.go index 2599c9dd4..04c1e6062 100644 --- a/pkg/networking/session.go +++ b/pkg/networking/session.go @@ -290,10 +290,13 @@ func (s *Session) readHandshake() error { hs := s.config.protocol.EmptyHandshake() _, err := hs.ReadFrom(s.bufRead) if err != nil { - if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "closed") || - strings.Contains(err.Error(), "reset by peer") { + if errors.Is(err, io.EOF) { return ErrConnectionClosedOnRead } + if errMsg := err.Error(); strings.Contains(errMsg, "closed") || + strings.Contains(errMsg, "reset by peer") { + return errors.Join(ErrConnectionClosedOnRead, err) // Wrap the error with ErrConnectionClosedOnRead. + } s.logger.Error("Failed to read handshake from connection", "error", err) return err } From 52a893ea073ee78f994690be062ee3207b7637de Mon Sep 17 00:00:00 2001 From: Alexey Kiselev Date: Mon, 16 Dec 2024 17:52:45 +0400 Subject: [PATCH 3/4] Fine error assertions. --- pkg/networking/session_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/networking/session_test.go b/pkg/networking/session_test.go index cbeb9ad8e..c89862531 100644 --- a/pkg/networking/session_test.go +++ b/pkg/networking/session_test.go @@ -125,7 +125,7 @@ func TestSessionTimeoutOnHandshake(t *testing.T) { // Send handshake to server, but writing will block because the clientConn is locked. n, err := clientSession.Write([]byte("hello")) - require.Error(t, err) + require.ErrorIs(t, err, networking.ErrConnectionWriteTimeout) assert.Equal(t, 0, n) runtime.Gosched() @@ -136,7 +136,7 @@ func TestSessionTimeoutOnHandshake(t *testing.T) { // Unlock "timeout" and close client. pc.writeBlocker.Unlock() err = clientSession.Close() - assert.Error(t, err) + assert.ErrorIs(t, err, io.ErrClosedPipe) } func TestSessionTimeoutOnMessage(t *testing.T) { @@ -199,7 +199,7 @@ func TestSessionTimeoutOnMessage(t *testing.T) { clientWG.Wait() // Wait for pipe to be locked. // On receiving handshake from server, send the message back to server. _, msgErr := clientSession.Write(encodeMessage("Hello session")) - require.Error(t, msgErr) + require.ErrorIs(t, msgErr, networking.ErrConnectionWriteTimeout) }) time.Sleep(1 * time.Second) // Let timeout occur. @@ -210,7 +210,7 @@ func TestSessionTimeoutOnMessage(t *testing.T) { pc.writeBlocker.Unlock() // Unlock the pipe. err = clientSession.Close() - assert.Error(t, err) // Expect error because connection to the server already closed. + assert.ErrorIs(t, err, io.ErrClosedPipe) // Expect error because connection to the server already closed. } func TestDoubleClose(t *testing.T) { @@ -305,13 +305,13 @@ func TestOnClosedByOtherSide(t *testing.T) { // Try to send message to server, but it will fail because server is already closed. time.Sleep(10 * time.Millisecond) // Wait for server to close. _, msgErr := clientSession.Write(encodeMessage("Hello session")) - require.Error(t, msgErr) + require.ErrorIs(t, msgErr, io.ErrClosedPipe) wg.Done() }) wg.Wait() // Wait for client to finish. err = clientSession.Close() - assert.Error(t, err) // Close reports the same error, because it was registered in the send loop. + assert.ErrorIs(t, err, io.ErrClosedPipe) // Close reports the same error, because it was registered in the send loop. } func TestCloseParentContext(t *testing.T) { @@ -370,7 +370,7 @@ func TestCloseParentContext(t *testing.T) { // Try to send message to server, but it will fail because server is already closed. time.Sleep(10 * time.Millisecond) // Wait for server to close. _, msgErr := clientSession.Write(encodeMessage("Hello session")) - require.Error(t, msgErr) + require.ErrorIs(t, msgErr, networking.ErrSessionShutdown) wg.Done() }) From 77633f3a39854a80a51d38b7effee9d97009e88b Mon Sep 17 00:00:00 2001 From: Alexey Kiselev Date: Mon, 16 Dec 2024 18:54:59 +0400 Subject: [PATCH 4/4] Fix blinking test. --- pkg/networking/session_test.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/networking/session_test.go b/pkg/networking/session_test.go index c89862531..49759c03a 100644 --- a/pkg/networking/session_test.go +++ b/pkg/networking/session_test.go @@ -6,7 +6,6 @@ import ( "errors" "io" "log/slog" - "runtime" "sync" "testing" "time" @@ -98,6 +97,7 @@ func TestSessionTimeoutOnHandshake(t *testing.T) { defer goleak.VerifyNone(t) mockProtocol := netmocks.NewMockProtocol(t) + mockProtocol.On("EmptyHandshake").Return(&textHandshake{}, nil) clientHandler := netmocks.NewMockHandler(t) serverHandler := netmocks.NewMockHandler(t) @@ -110,33 +110,36 @@ func TestSessionTimeoutOnHandshake(t *testing.T) { clientSession, err := net.NewSession(ctx, clientConn, testConfig(t, mockProtocol, clientHandler, "client")) require.NoError(t, err) + clientHandler.On("OnClose", clientSession).Return() + serverSession, err := net.NewSession(ctx, serverConn, testConfig(t, mockProtocol, serverHandler, "server")) require.NoError(t, err) - - mockProtocol.On("EmptyHandshake").Return(&textHandshake{}, nil) serverHandler.On("OnClose", serverSession).Return() - clientHandler.On("OnClose", clientSession).Return() // Lock pc, ok := clientConn.(*pipeConn) require.True(t, ok) pc.writeBlocker.Lock() - runtime.Gosched() // Send handshake to server, but writing will block because the clientConn is locked. n, err := clientSession.Write([]byte("hello")) require.ErrorIs(t, err, networking.ErrConnectionWriteTimeout) assert.Equal(t, 0, n) - runtime.Gosched() - err = serverSession.Close() assert.NoError(t, err) + wg := new(sync.WaitGroup) + wg.Add(1) + go func() { + err = clientSession.Close() + assert.ErrorIs(t, err, io.ErrClosedPipe) + wg.Done() + }() + // Unlock "timeout" and close client. pc.writeBlocker.Unlock() - err = clientSession.Close() - assert.ErrorIs(t, err, io.ErrClosedPipe) + wg.Wait() } func TestSessionTimeoutOnMessage(t *testing.T) {