Skip to content

Commit

Permalink
Merge branch 'new-network-connection' into itest-light-node-state-hash
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeykiselev committed Dec 11, 2024
2 parents 41147b0 + 5219227 commit 9b63c35
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.gowaves-it
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ COPY pkg pkg
ARG WITH_RACE_SUFFIX=""
RUN make build-node-native${WITH_RACE_SUFFIX}

FROM alpine:3.20.3
FROM alpine:3.21.0
ENV TZ=Etc/UTC \
APP_USER=gowaves

Expand Down
71 changes: 30 additions & 41 deletions itests/clients/net_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"net"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -34,17 +35,15 @@ type NetClient struct {
c *networking.Config
s *networking.Session

closingLock sync.Mutex
closingFlag bool
closedLock sync.Mutex
closedFlag bool
closing atomic.Bool
closed sync.Once
}

func NewNetClient(
ctx context.Context, t testing.TB, impl Implementation, port string, peers []proto.PeerInfo,
) *NetClient {
n := networking.NewNetwork()
p := newProtocol(nil)
p := newProtocol(t, nil)
h := newHandler(t, peers)

f := slogt.Factory(func(w io.Writer) slog.Handler {
Expand All @@ -61,8 +60,7 @@ func NewNetClient(
WithLogger(log).
WithWriteTimeout(networkTimeout).
WithKeepAliveInterval(pingInterval).
WithSlogAttribute(slog.String("suite", t.Name())).
WithSlogAttribute(slog.String("impl", impl.String()))
WithSlogAttributes(slog.String("suite", t.Name()), slog.String("impl", impl.String()))

conn, err := net.Dial("tcp", config.DefaultIP+":"+port)
require.NoError(t, err, "failed to dial TCP to %s node", impl.String())
Expand Down Expand Up @@ -94,27 +92,18 @@ func (c *NetClient) SendHandshake() {
}

func (c *NetClient) SendMessage(m proto.Message) {
b, err := m.MarshalBinary()
require.NoError(c.t, err, "failed to marshal message to %s node at %q", c.impl.String(), c.s.RemoteAddr())
_, err = c.s.Write(b)
_, err := m.WriteTo(c.s)
require.NoError(c.t, err, "failed to send message to %s node at %q", c.impl.String(), c.s.RemoteAddr())
}

func (c *NetClient) Close() {
c.t.Logf("Trying to close connection to %s node at %q", c.impl.String(), c.s.RemoteAddr().String())

c.closingLock.Lock()
c.closingFlag = true
c.closingLock.Unlock()

c.closedLock.Lock()
defer c.closedLock.Unlock()
c.t.Logf("Closing connection to %s node at %q (%t)", c.impl.String(), c.s.RemoteAddr().String(), c.closedFlag)
if c.closedFlag {
return
}
_ = c.s.Close()
c.closedFlag = true
c.closed.Do(func() {
if c.closing.CompareAndSwap(false, true) {
c.t.Logf("Closing connection to %s node at %q", c.impl.String(), c.s.RemoteAddr().String())
}
err := c.s.Close()
require.NoError(c.t, err, "failed to close session to %s node at %q", c.impl.String(), c.s.RemoteAddr())
})
}

func (c *NetClient) reconnect() {
Expand All @@ -129,23 +118,18 @@ func (c *NetClient) reconnect() {
c.SendHandshake()
}

func (c *NetClient) closing() bool {
c.closingLock.Lock()
defer c.closingLock.Unlock()
return c.closingFlag
}

type protocol struct {
t testing.TB
dropLock sync.Mutex
drop map[proto.PeerMessageID]struct{}
}

func newProtocol(drop []proto.PeerMessageID) *protocol {
func newProtocol(t testing.TB, drop []proto.PeerMessageID) *protocol {
m := make(map[proto.PeerMessageID]struct{})
for _, id := range drop {
m[id] = struct{}{}
}
return &protocol{drop: m}
return &protocol{t: t, drop: m}
}

func (p *protocol) EmptyHandshake() networking.Handshake {
Expand All @@ -169,6 +153,17 @@ func (p *protocol) IsAcceptableHandshake(h networking.Handshake) bool {
// Reject nodes with incorrect network bytes, unsupported protocol versions,
// or a zero nonce (indicating a self-connection).
if hs.AppName != appName || hs.Version.Cmp(proto.ProtocolVersion()) < 0 || hs.NodeNonce == 0 {
p.t.Logf("Unacceptable handshake:")
if hs.AppName != appName {
p.t.Logf("\tinvalid application name %q, expected %q", hs.AppName, appName)
}
if hs.Version.Cmp(proto.ProtocolVersion()) < 0 {
p.t.Logf("\tinvalid application version %q should be equal or more than %q",
hs.Version, proto.ProtocolVersion())
}
if hs.NodeNonce == 0 {
p.t.Logf("\tinvalid node nonce %d", hs.NodeNonce)
}
return false
}
return true
Expand Down Expand Up @@ -205,14 +200,8 @@ func (h *handler) OnReceive(s *networking.Session, data []byte) {
switch msg.(type) { // Only reply with peers on GetPeersMessage.
case *proto.GetPeersMessage:
rpl := &proto.PeersMessage{Peers: h.peers}
bts, mErr := rpl.MarshalBinary()
if mErr != nil { // Fail test on marshal error.
h.t.Logf("Failed to marshal peers message: %v", mErr)
h.t.FailNow()
return
}
if _, wErr := s.Write(bts); wErr != nil {
h.t.Logf("Failed to send peers message: %v", wErr)
if _, sErr := rpl.WriteTo(s); sErr != nil {
h.t.Logf("Failed to send peers message: %v", sErr)
h.t.FailNow()
return
}
Expand All @@ -226,7 +215,7 @@ func (h *handler) OnHandshake(_ *networking.Session, _ networking.Handshake) {

func (h *handler) OnClose(s *networking.Session) {
h.t.Logf("Connection to %q was closed", s.RemoteAddr())
if !h.client.closing() && h.client != nil {
if !h.client.closing.Load() && h.client != nil {
h.client.reconnect()
}
}
13 changes: 5 additions & 8 deletions pkg/execution/taskgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ import (
type TaskGroup struct {
wg sync.WaitGroup // Counter for active goroutines.

// active is nonzero when the group is "active", meaning there has been at least one call to Run since the group
// active is true when the group is "active", meaning there has been at least one call to Run since the group
// was created or the last Wait.
//
// Together active and errLock work as a kind of resettable sync.Once. The fast path reads active and only
// acquires errLock if it discovers setup is needed.
active atomic.Uint32
active atomic.Bool

errLock sync.Mutex // Guards the fields below.
err error // First captured error returned from Wait.
Expand Down Expand Up @@ -56,7 +56,7 @@ func (g *TaskGroup) OnError(handler func(error) error) *TaskGroup {
// so the [execute] function should include the interruption logic.
func (g *TaskGroup) Run(execute func() error) {
g.wg.Add(1)
if g.active.Load() == 0 {
if !g.active.Load() {
g.activate()
}
go func() {
Expand All @@ -82,9 +82,7 @@ func (g *TaskGroup) Wait() error {
defer g.errLock.Unlock()

// If the group is still active, deactivate it now.
if g.active.Load() != 0 {
g.active.Store(0)
}
g.active.CompareAndSwap(true, false)
return g.err
}

Expand All @@ -93,9 +91,8 @@ func (g *TaskGroup) Wait() error {
func (g *TaskGroup) activate() {
g.errLock.Lock()
defer g.errLock.Unlock()
if g.active.Load() == 0 {
if g.active.CompareAndSwap(false, true) {
g.err = nil
g.active.Store(1)
}
}

Expand Down
36 changes: 19 additions & 17 deletions pkg/execution/taskgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"math/rand/v2"
"runtime"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -83,8 +82,6 @@ func TestCancelPropagation(t *testing.T) {
}
})
}
runtime.Gosched()
<-time.After(500 * time.Microsecond)
cancel()

err := g.Wait()
Expand All @@ -98,13 +95,10 @@ func TestCancelPropagation(t *testing.T) {
case errors.Is(e, errOther):
numOther++
default:
require.FailNow(t, "unexpected error: %v", e)
require.FailNowf(t, "No error is expected", "unexpected error: %v", e)
}
}

assert.NotZero(t, numOK)
assert.NotZero(t, numCanceled)
assert.NotZero(t, numOther)
total := int(numOK) + numCanceled + numOther
assert.Equal(t, numTasks, total)
}
Expand All @@ -119,7 +113,7 @@ func TestWaitingForFinish(t *testing.T) {
select {
case <-ctx.Done():
return work(50, nil)()
case <-time.After(randomDuration(60)):
case <-time.After(60 * time.Millisecond):
return failure
}
}
Expand All @@ -136,6 +130,8 @@ func TestWaitingForFinish(t *testing.T) {
}

func TestRegression(t *testing.T) {
defer goleak.VerifyNone(t)

t.Run("WaitRace", func(_ *testing.T) {
ready := make(chan struct{})
var g execution.TaskGroup
Expand All @@ -146,20 +142,26 @@ func TestRegression(t *testing.T) {

var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); _ = g.Wait() }()
go func() { defer wg.Done(); _ = g.Wait() }()
go func() {
defer wg.Done()
err := g.Wait()
require.NoError(t, err)
}()
go func() {
defer wg.Done()
err := g.Wait()
require.NoError(t, err)
}()

close(ready)
wg.Wait()
})
t.Run("WaitUnstarted", func(t *testing.T) {
defer func() {
if x := recover(); x != nil {
t.Errorf("Unexpected panic: %v", x)
}
}()
var g execution.TaskGroup
_ = g.Wait()
require.NotPanics(t, func() {
var g execution.TaskGroup
err := g.Wait()
require.NoError(t, err)
})
})
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/networking/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ func (c *Config) WithSlogAttribute(attr slog.Attr) *Config {
return c
}

// WithSlogAttributes adds given attributes to the slice of attributes.
func (c *Config) WithSlogAttributes(attrs ...slog.Attr) *Config {
for _, attr := range attrs {
c.attributes = append(c.attributes, attr)
}
return c
}

func (c *Config) WithKeepAliveDisabled() *Config {
c.keepAlive = false
return c
Expand Down

0 comments on commit 9b63c35

Please sign in to comment.