Skip to content

Commit

Permalink
Merge branch 'new-network-connection' into itest-light-node-state-hash
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeykiselev committed Dec 16, 2024
2 parents fdd44ef + 77633f3 commit 125adb7
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
25 changes: 16 additions & 9 deletions pkg/networking/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,12 @@ func (s *Session) waitForSend(data []byte) error {
}

select {
case err := <-errCh:
s.logger.Debug("Data sent", "error", err)
case err, ok := <-errCh:
if !ok {
s.logger.Debug("Data sent successfully")
return nil // No error, data was sent successfully.
}
s.logger.Debug("Error sending data", "error", err)
return err
case <-s.ctx.Done():
dataCopy()
Expand Down Expand Up @@ -249,8 +253,8 @@ func (s *Session) sendLoop() error {
s.logger.Debug("Data written into connection")
}

// No error, successful send
s.asyncSendErr(packet.err, nil)
// No error, close the channel.
close(packet.err)
}
}
}
Expand Down Expand Up @@ -286,10 +290,13 @@ func (s *Session) readHandshake() error {
hs := s.config.protocol.EmptyHandshake()
_, err := hs.ReadFrom(s.bufRead)
if err != nil {
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "reset by peer") {
if errors.Is(err, io.EOF) {
return ErrConnectionClosedOnRead
}
if errMsg := err.Error(); strings.Contains(errMsg, "closed") ||
strings.Contains(errMsg, "reset by peer") {
return errors.Join(ErrConnectionClosedOnRead, err) // Wrap the error with ErrConnectionClosedOnRead.
}
s.logger.Error("Failed to read handshake from connection", "error", err)
return err
}
Expand Down Expand Up @@ -347,6 +354,7 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error {
// Allocate the receiving buffer just-in-time to fit the full message.
s.receiveBuffer = bytes.NewBuffer(make([]byte, 0, hdr.HeaderLength()+hdr.PayloadLength()))
}
defer s.receiveBuffer.Reset()
_, err := hdr.WriteTo(s.receiveBuffer)
if err != nil {
s.logger.Error("Failed to write header to receiving buffer", "error", err)
Expand All @@ -367,7 +375,6 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error {
base64.StdEncoding.EncodeToString(s.receiveBuffer.Bytes()))
}
s.config.handler.OnReceive(s, s.receiveBuffer.Bytes()) // Invoke OnReceive handler.
s.receiveBuffer.Reset() // Reset the buffer for the next message.
return nil
}

Expand Down Expand Up @@ -399,11 +406,11 @@ func (s *Session) keepaliveLoop() error {
type sendPacket struct {
mu sync.Mutex // Protects data from unsafe reads.
data []byte
err chan error
err chan<- error
}

// asyncSendErr is used to try an async send of an error.
func (s *Session) asyncSendErr(ch chan error, err error) {
func (s *Session) asyncSendErr(ch chan<- error, err error) {
if ch == nil {
return
}
Expand Down
33 changes: 18 additions & 15 deletions pkg/networking/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"io"
"log/slog"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -98,6 +97,7 @@ func TestSessionTimeoutOnHandshake(t *testing.T) {
defer goleak.VerifyNone(t)

mockProtocol := netmocks.NewMockProtocol(t)
mockProtocol.On("EmptyHandshake").Return(&textHandshake{}, nil)

clientHandler := netmocks.NewMockHandler(t)
serverHandler := netmocks.NewMockHandler(t)
Expand All @@ -110,33 +110,36 @@ func TestSessionTimeoutOnHandshake(t *testing.T) {

clientSession, err := net.NewSession(ctx, clientConn, testConfig(t, mockProtocol, clientHandler, "client"))
require.NoError(t, err)
clientHandler.On("OnClose", clientSession).Return()

serverSession, err := net.NewSession(ctx, serverConn, testConfig(t, mockProtocol, serverHandler, "server"))
require.NoError(t, err)

mockProtocol.On("EmptyHandshake").Return(&textHandshake{}, nil)
serverHandler.On("OnClose", serverSession).Return()
clientHandler.On("OnClose", clientSession).Return()

// Lock
pc, ok := clientConn.(*pipeConn)
require.True(t, ok)
pc.writeBlocker.Lock()
runtime.Gosched()

// Send handshake to server, but writing will block because the clientConn is locked.
n, err := clientSession.Write([]byte("hello"))
require.Error(t, err)
require.ErrorIs(t, err, networking.ErrConnectionWriteTimeout)
assert.Equal(t, 0, n)

runtime.Gosched()

err = serverSession.Close()
assert.NoError(t, err)

wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
err = clientSession.Close()
assert.ErrorIs(t, err, io.ErrClosedPipe)
wg.Done()
}()

// Unlock "timeout" and close client.
pc.writeBlocker.Unlock()
err = clientSession.Close()
assert.Error(t, err)
wg.Wait()
}

func TestSessionTimeoutOnMessage(t *testing.T) {
Expand Down Expand Up @@ -199,7 +202,7 @@ func TestSessionTimeoutOnMessage(t *testing.T) {
clientWG.Wait() // Wait for pipe to be locked.
// On receiving handshake from server, send the message back to server.
_, msgErr := clientSession.Write(encodeMessage("Hello session"))
require.Error(t, msgErr)
require.ErrorIs(t, msgErr, networking.ErrConnectionWriteTimeout)
})

time.Sleep(1 * time.Second) // Let timeout occur.
Expand All @@ -210,7 +213,7 @@ func TestSessionTimeoutOnMessage(t *testing.T) {
pc.writeBlocker.Unlock() // Unlock the pipe.

err = clientSession.Close()
assert.Error(t, err) // Expect error because connection to the server already closed.
assert.ErrorIs(t, err, io.ErrClosedPipe) // Expect error because connection to the server already closed.
}

func TestDoubleClose(t *testing.T) {
Expand Down Expand Up @@ -305,13 +308,13 @@ func TestOnClosedByOtherSide(t *testing.T) {
// Try to send message to server, but it will fail because server is already closed.
time.Sleep(10 * time.Millisecond) // Wait for server to close.
_, msgErr := clientSession.Write(encodeMessage("Hello session"))
require.Error(t, msgErr)
require.ErrorIs(t, msgErr, io.ErrClosedPipe)
wg.Done()
})

wg.Wait() // Wait for client to finish.
err = clientSession.Close()
assert.Error(t, err) // Close reports the same error, because it was registered in the send loop.
assert.ErrorIs(t, err, io.ErrClosedPipe) // Close reports the same error, because it was registered in the send loop.
}

func TestCloseParentContext(t *testing.T) {
Expand Down Expand Up @@ -370,7 +373,7 @@ func TestCloseParentContext(t *testing.T) {
// Try to send message to server, but it will fail because server is already closed.
time.Sleep(10 * time.Millisecond) // Wait for server to close.
_, msgErr := clientSession.Write(encodeMessage("Hello session"))
require.Error(t, msgErr)
require.ErrorIs(t, msgErr, networking.ErrSessionShutdown)
wg.Done()
})

Expand Down

0 comments on commit 125adb7

Please sign in to comment.