From 5fc6535a928079107ae9d237c571f03b90384a04 Mon Sep 17 00:00:00 2001 From: muXxer Date: Fri, 21 Apr 2023 13:42:36 +0200 Subject: [PATCH 1/4] Use pointers for `PeerMessageData` and `PeerMessageIn` --- packages/dkg/node.go | 2 +- packages/nodeconn/nodeconn.go | 3 +-- packages/peering/group/group.go | 4 ++-- packages/peering/lpp/lppNetImpl.go | 4 +++- packages/peering/lpp/lppPeer.go | 4 +++- packages/peering/peer_message.go | 8 ++++---- packages/peering/peering_test.go | 2 +- packages/testutil/peeringNetworkProvider.go | 4 ++-- 8 files changed, 17 insertions(+), 14 deletions(-) diff --git a/packages/dkg/node.go b/packages/dkg/node.go index 4ccd4b3064..275263db3c 100644 --- a/packages/dkg/node.go +++ b/packages/dkg/node.go @@ -382,7 +382,7 @@ func (n *Node) exchangeInitiatorMsgs( var err error var initMsg initiatorMsg var isInitMsg bool - isInitMsg, initMsg, err = readInitiatorMsg(&recv.PeerMessageData, n.edSuite, n.blsSuite) + isInitMsg, initMsg, err = readInitiatorMsg(recv.PeerMessageData, n.edSuite, n.blsSuite) if !isInitMsg { return false, nil } diff --git a/packages/nodeconn/nodeconn.go b/packages/nodeconn/nodeconn.go index 7740a30065..7278c52641 100644 --- a/packages/nodeconn/nodeconn.go +++ b/packages/nodeconn/nodeconn.go @@ -339,8 +339,7 @@ func (nc *nodeConnection) GetL1ProtocolParams() *iotago.ProtocolParameters { } func (nc *nodeConnection) subscribeToLedgerUpdates() { - err := nc.nodeBridge.ListenToLedgerUpdates(nc.ctx, 0, 0, nc.handleLedgerUpdate) - if err != nil && !errors.Is(err, io.EOF) { + if err := nc.nodeBridge.ListenToLedgerUpdates(nc.ctx, 0, 0, nc.handleLedgerUpdate); err != nil && !errors.Is(err, io.EOF) { nc.LogError(err) nc.shutdownHandler.SelfShutdown( fmt.Sprintf("INX connection unexpected error: %s", err.Error()), diff --git a/packages/peering/group/group.go b/packages/peering/group/group.go index 36d4317055..016f57f0fa 100644 --- a/packages/peering/group/group.go +++ b/packages/peering/group/group.go @@ -148,7 +148,7 @@ func (g *groupImpl) ExchangeRound( continue } recvMsg := peering.PeerMessageGroupIn{ - PeerMessageIn: *recvMsgNoIndex, + PeerMessageIn: recvMsgNoIndex, SenderIndex: senderIndex, } if acks[recvMsg.SenderIndex] { // Only consider first successful message. @@ -247,7 +247,7 @@ func (g *groupImpl) Attach(receiver byte, callback func(recv *peering.PeerMessag return } gRecv := &peering.PeerMessageGroupIn{ - PeerMessageIn: *recv, + PeerMessageIn: recv, SenderIndex: idx, } callback(gRecv) diff --git a/packages/peering/lpp/lppNetImpl.go b/packages/peering/lpp/lppNetImpl.go index 456b999ee6..73e10b376d 100644 --- a/packages/peering/lpp/lppNetImpl.go +++ b/packages/peering/lpp/lppNetImpl.go @@ -405,7 +405,7 @@ func (n *netImpl) PubKey() *cryptolib.PublicKey { // SendMsg implements peering.PeerSender for the Self() node. func (n *netImpl) SendMsg(msg *peering.PeerMessageData) { // Don't go via the network, if sending a message to self. - n.triggerRecvEvents(n.Self().PubKey(), &peering.PeerMessageNet{PeerMessageData: *msg}) + n.triggerRecvEvents(n.Self().PubKey(), &peering.PeerMessageNet{PeerMessageData: msg}) } func (n *netImpl) triggerRecvEvents(from *cryptolib.PublicKey, msg *peering.PeerMessageNet) { @@ -426,6 +426,8 @@ func (n *netImpl) NumUsers() int { } // Await implements peering.PeerSender for the Self() node. +// +//nolint:revive func (n *netImpl) Await(timeout time.Duration) error { return nil // This node is alive immediately. } diff --git a/packages/peering/lpp/lppPeer.go b/packages/peering/lpp/lppPeer.go index 0ef905d19c..59ebdff41c 100644 --- a/packages/peering/lpp/lppPeer.go +++ b/packages/peering/lpp/lppPeer.go @@ -128,7 +128,7 @@ func (p *peer) PubKey() *cryptolib.PublicKey { func (p *peer) SendMsg(msg *peering.PeerMessageData) { // p.accessLock.RLock() - msgNet := &peering.PeerMessageNet{PeerMessageData: *msg} + msgNet := &peering.PeerMessageNet{PeerMessageData: msg} if !p.trusted { p.log.Infof("Dropping outgoing message, because it was meant to send to a distrusted peer.") p.accessLock.RUnlock() @@ -201,6 +201,8 @@ func (p *peer) IsAlive() bool { } // Await implements peering.PeerSender interface for the remote peers. +// +//nolint:revive func (p *peer) Await(timeout time.Duration) error { p.accessLock.RLock() defer p.accessLock.RUnlock() diff --git a/packages/peering/peer_message.go b/packages/peering/peer_message.go index 0b83ec203e..97bfdd74ff 100644 --- a/packages/peering/peer_message.go +++ b/packages/peering/peer_message.go @@ -28,17 +28,17 @@ type PeerMessageData struct { } type PeerMessageNet struct { - PeerMessageData + *PeerMessageData serialized *[]byte } type PeerMessageIn struct { - PeerMessageData + *PeerMessageData SenderPubKey *cryptolib.PublicKey } type PeerMessageGroupIn struct { - PeerMessageIn + *PeerMessageIn SenderIndex uint16 } @@ -70,7 +70,7 @@ func NewPeerMessageNetFromBytes(buf []byte) (*PeerMessageNet, error) { return nil, err } return &PeerMessageNet{ - PeerMessageData: *data, + PeerMessageData: data, serialized: &buf, }, nil } diff --git a/packages/peering/peering_test.go b/packages/peering/peering_test.go index 6edd7b6ef8..2b820e37ad 100644 --- a/packages/peering/peering_test.go +++ b/packages/peering/peering_test.go @@ -13,7 +13,7 @@ func TestPeerMessageCodec(t *testing.T) { var err error var src, dst *peering.PeerMessageNet src = &peering.PeerMessageNet{ - PeerMessageData: peering.PeerMessageData{ + PeerMessageData: &peering.PeerMessageData{ PeeringID: peering.RandomPeeringID(), MsgReceiver: byte(10), MsgType: peering.FirstUserMsgCode + 17, diff --git a/packages/testutil/peeringNetworkProvider.go b/packages/testutil/peeringNetworkProvider.go index 893ce0830b..42d3961054 100644 --- a/packages/testutil/peeringNetworkProvider.go +++ b/packages/testutil/peeringNetworkProvider.go @@ -95,7 +95,7 @@ type peeringNode struct { type peeringMsg struct { from *cryptolib.PublicKey - msg peering.PeerMessageData + msg *peering.PeerMessageData timestamp int64 } @@ -141,7 +141,7 @@ func (n *peeringNode) recvLoop() { func (n *peeringNode) sendMsg(from *cryptolib.PublicKey, msg *peering.PeerMessageData) { n.sendCh <- &peeringMsg{ from: from, - msg: *msg, + msg: msg, } } From ca8af25979014fcd9983c2fc06421e8a3abdeed4 Mon Sep 17 00:00:00 2001 From: muXxer Date: Fri, 21 Apr 2023 13:43:38 +0200 Subject: [PATCH 2/4] Only serialize `PeerMessageData` once and hash `PeerMessageNet` once --- packages/peering/peer_message.go | 155 +++++++++++++++++++------------ 1 file changed, 96 insertions(+), 59 deletions(-) diff --git a/packages/peering/peer_message.go b/packages/peering/peer_message.go index 97bfdd74ff..fb4c712e83 100644 --- a/packages/peering/peer_message.go +++ b/packages/peering/peer_message.go @@ -11,102 +11,139 @@ package peering import ( "bytes" + "sync" + "github.com/iotaledger/hive.go/lo" "github.com/iotaledger/wasp/packages/cryptolib" "github.com/iotaledger/wasp/packages/hashing" "github.com/iotaledger/wasp/packages/util" "github.com/iotaledger/wasp/packages/util/pipe" ) -// PeerMessage is an envelope for all the messages exchanged via -// the peering module. +// PeerMessage is an envelope for all the messages exchanged via the peering module. type PeerMessageData struct { PeeringID PeeringID MsgReceiver byte MsgType byte MsgData []byte -} - -type PeerMessageNet struct { - *PeerMessageData - serialized *[]byte -} - -type PeerMessageIn struct { - *PeerMessageData - SenderPubKey *cryptolib.PublicKey -} -type PeerMessageGroupIn struct { - *PeerMessageIn - SenderIndex uint16 + serializedErr error + serializedData []byte + serializedOnce sync.Once } -var _ pipe.Hashable = &PeerMessageNet{} - +// newPeerMessageDataFromBytes creates a new PeerMessageData from bytes. +// The function takes ownership over "data" and the caller should not use "data" after this call. +// //nolint:gocritic -func NewPeerMessageDataFromBytes(buf []byte) (*PeerMessageData, error) { +func newPeerMessageDataFromBytes(data []byte) (*PeerMessageData, error) { + // create a copy of the slice for later usage of the raw data. + cpy := lo.CopySlice(data) + var err error - r := bytes.NewBuffer(buf) - m := PeerMessageData{} - if m.MsgReceiver, err = util.ReadByte(r); err != nil { + buf := bytes.NewBuffer(data) + + m := new(PeerMessageData) + if m.MsgReceiver, err = util.ReadByte(buf); err != nil { return nil, err } - if m.MsgType, err = util.ReadByte(r); err != nil { + if m.MsgType, err = util.ReadByte(buf); err != nil { return nil, err } - if err = m.PeeringID.Read(r); err != nil { + if err = m.PeeringID.Read(buf); err != nil { return nil, err } - if m.MsgData, err = util.ReadBytes32(r); err != nil { + if m.MsgData, err = util.ReadBytes32(buf); err != nil { return nil, err } - return &m, nil + + m.serializedOnce.Do(func() { + m.serializedErr = nil + m.serializedData = cpy + }) + + return m, nil +} + +func (m *PeerMessageData) Bytes() ([]byte, error) { + m.serializedOnce.Do(func() { + buf := new(bytes.Buffer) + + if err := util.WriteByte(buf, m.MsgReceiver); err != nil { + m.serializedErr = err + return + } + if err := util.WriteByte(buf, m.MsgType); err != nil { + m.serializedErr = err + return + } + if err := m.PeeringID.Write(buf); err != nil { + m.serializedErr = err + return + } + if err := util.WriteBytes32(buf, m.MsgData); err != nil { + m.serializedErr = err + return + } + + m.serializedData = buf.Bytes() + }) + + if m.serializedErr != nil { + return nil, m.serializedErr + } + + return m.serializedData, nil +} + +type PeerMessageNet struct { + *PeerMessageData + + hash hashing.HashValue + hashOnce sync.Once } -func NewPeerMessageNetFromBytes(buf []byte) (*PeerMessageNet, error) { - data, err := NewPeerMessageDataFromBytes(buf) +var _ pipe.Hashable = &PeerMessageNet{} + +// NewPeerMessageNetFromBytes creates a new PeerMessageNet from bytes. +// The function takes ownership over "data" and the caller should not use "data" after this call. +func NewPeerMessageNetFromBytes(data []byte) (*PeerMessageNet, error) { + peerMessageData, err := newPeerMessageDataFromBytes(data) if err != nil { return nil, err } - return &PeerMessageNet{ - PeerMessageData: data, - serialized: &buf, - }, nil + + peerMessageNet := &PeerMessageNet{ + PeerMessageData: peerMessageData, + } + + return peerMessageNet, nil } func (m *PeerMessageNet) Bytes() ([]byte, error) { - if m.serialized == nil { - serialized, err := m.PeerMessageData.bytes() + return m.PeerMessageData.Bytes() +} + +func (m *PeerMessageNet) GetHash() hashing.HashValue { + m.hashOnce.Do(func() { + bytes, err := m.Bytes() if err != nil { - return nil, err + m.hash = hashing.HashValue{} + return } - m.serialized = &serialized - } - return *(m.serialized), nil + + m.hash = hashing.HashData(bytes) + }) + + return m.hash } -func (m *PeerMessageData) bytes() ([]byte, error) { - var buf bytes.Buffer - if err := util.WriteByte(&buf, m.MsgReceiver); err != nil { - return nil, err - } - if err := util.WriteByte(&buf, m.MsgType); err != nil { - return nil, err - } - if err := m.PeeringID.Write(&buf); err != nil { - return nil, err - } - if err := util.WriteBytes32(&buf, m.MsgData); err != nil { - return nil, err - } - return buf.Bytes(), nil +type PeerMessageIn struct { + *PeerMessageData + SenderPubKey *cryptolib.PublicKey } -func (m *PeerMessageNet) GetHash() hashing.HashValue { - mBytes, err := m.Bytes() - if err != nil { - return hashing.HashValue{} - } - return hashing.HashData(mBytes) +type PeerMessageGroupIn struct { + *PeerMessageIn + SenderIndex uint16 } From 34aff11cced48a226367a89bb41aec13341554d9 Mon Sep 17 00:00:00 2001 From: muXxer Date: Fri, 21 Apr 2023 14:08:47 +0200 Subject: [PATCH 3/4] Fix testutil --- packages/testutil/peeringNetBehaviour.go | 4 ++-- packages/testutil/peeringNetBehaviourDynamic.go | 12 ++++++------ packages/testutil/peeringNetworkProvider.go | 11 +++++++++++ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/packages/testutil/peeringNetBehaviour.go b/packages/testutil/peeringNetBehaviour.go index 568f3fa3d4..abbabd688d 100644 --- a/packages/testutil/peeringNetBehaviour.go +++ b/packages/testutil/peeringNetBehaviour.go @@ -107,7 +107,7 @@ func (n *peeringNetUnreliable) recvLoop(inCh, outCh chan *peeringMsg, closeCh ch return } if rand.Intn(100) > n.deliverPct { - n.log.Debugf("Network dropped message %v -%v-> %v", recv.from.String(), recv.msg.MsgType, dstPubKey.String()) + n.log.Debugf("Network dropped message %v -%v-> %v", recv.from.String(), recv.PeerMessageData().MsgType, dstPubKey.String()) continue // Drop the message. } // @@ -138,7 +138,7 @@ func (n *peeringNetUnreliable) sendDelayed(recv *peeringMsg, outCh chan *peering } n.log.Debugf( "Network delivers message %v -%v-> %v (duplicate %v/%v, delay=%vms)", - recv.from.String(), recv.msg.MsgType, dstPubKey.String(), dupNum, dupCount, delay.Milliseconds(), + recv.from.String(), recv.PeerMessageData().MsgType, dstPubKey.String(), dupNum, dupCount, delay.Milliseconds(), ) safeSendPeeringMsg(outCh, recv, n.log) } diff --git a/packages/testutil/peeringNetBehaviourDynamic.go b/packages/testutil/peeringNetBehaviourDynamic.go index c154e2f5ac..5ae0d26965 100644 --- a/packages/testutil/peeringNetBehaviourDynamic.go +++ b/packages/testutil/peeringNetBehaviourDynamic.go @@ -142,7 +142,7 @@ func (pndT *PeeringNetDynamic) recvLoop(inCh, outCh chan *peeringMsg, closeCh ch if len(nextHandlers) > 0 { nextHandlers[0].handleSendMessage(recv, dstPubKey, nextHandlers[1:], callHandlersAndSendFun, pndT.log) } else { - pndT.log.Debugf("Network delivers message %v -%v-> %v", recv.from.String(), recv.msg.MsgType, dstPubKey.String()) + pndT.log.Debugf("Network delivers message %v -%v-> %v", recv.from.String(), recv.PeerMessageData().MsgType, dstPubKey.String()) safeSendPeeringMsg(outCh, recv, pndT.log) } } @@ -171,7 +171,7 @@ func (lcT *peeringNetDynamicHandlerLosingChannel) handleSendMessage( log *logger.Logger, ) { if rand.Intn(100) > lcT.probability { - log.Debugf("Network dropped message %v -%v-> %v", msg.from.String(), msg.msg.MsgType, dstPubKey.String()) + log.Debugf("Network dropped message %v -%v-> %v", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String()) return } callHandlersAndSendFun(nextHandlers) @@ -192,7 +192,7 @@ func (rcT *peeringNetDynamicHandlerRepeatingChannel) handleSendMessage( if rand.Intn(100) < rcT.probability%100 { numRepeat++ } - log.Debugf("Network repeated message %v -%v-> %v %v times", msg.from.String(), msg.msg.MsgType, dstPubKey.String(), numRepeat) + log.Debugf("Network repeated message %v -%v-> %v %v times", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String(), numRepeat) for i := 0; i < numRepeat; i++ { callHandlersAndSendFun(nextHandlers) } @@ -220,7 +220,7 @@ func (dcT *peeringNetDynamicHandlerDelayingChannel) handleSendMessage( } else { delay = time.Duration(fromMS) * time.Millisecond } - log.Debugf("Network delayed message %v -%v-> %v for %v", msg.from.String(), msg.msg.MsgType, dstPubKey.String(), delay) + log.Debugf("Network delayed message %v -%v-> %v for %v", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String(), delay) <-time.After(delay) } callHandlersAndSendFun(nextHandlers) @@ -239,11 +239,11 @@ func (pdT *peeringNetDynamicHandlerPeerDisconnected) handleSendMessage( log *logger.Logger, ) { if dstPubKey.Equals(pdT.peerPubKey) { - log.Debugf("Network dropped message %v -%v-> %v, because destination is disconnected", msg.from.String(), msg.msg.MsgType, dstPubKey.String()) + log.Debugf("Network dropped message %v -%v-> %v, because destination is disconnected", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String()) return } if msg.from.Equals(pdT.peerPubKey) { - log.Debugf("Network dropped message %v -%v-> %v, because source is disconnected", msg.from.String(), msg.msg.MsgType, dstPubKey.String()) + log.Debugf("Network dropped message %v -%v-> %v, because source is disconnected", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String()) return } callHandlersAndSendFun(nextHandlers) diff --git a/packages/testutil/peeringNetworkProvider.go b/packages/testutil/peeringNetworkProvider.go index 42d3961054..175ad16ed1 100644 --- a/packages/testutil/peeringNetworkProvider.go +++ b/packages/testutil/peeringNetworkProvider.go @@ -99,6 +99,13 @@ type peeringMsg struct { timestamp int64 } +func (m *peeringMsg) PeerMessageData() *peering.PeerMessageData { + if m.msg == nil { + return &peering.PeerMessageData{} + } + return m.msg +} + type peeringCb struct { callback func(recv *peering.PeerMessageIn) // Receive callback. destNP *peeringNetworkProvider // Destination node. @@ -126,6 +133,10 @@ func newPeeringNode(peeringURL string, identity *cryptolib.KeyPair, network *Pee func (n *peeringNode) recvLoop() { for pm := range n.recvCh { + if pm.msg == nil { + continue + } + msgPeeringID := pm.msg.PeeringID.String() for _, cb := range n.recvCbs { if cb.peeringID.String() == msgPeeringID && cb.receiver == pm.msg.MsgReceiver { From 1b2f26e06c32f2e5c26a70288d8d91e6ad92683a Mon Sep 17 00:00:00 2001 From: muXxer Date: Fri, 21 Apr 2023 15:22:06 +0200 Subject: [PATCH 4/4] Revert nolint directives for now --- packages/peering/lpp/lppNetImpl.go | 2 -- packages/peering/lpp/lppPeer.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/packages/peering/lpp/lppNetImpl.go b/packages/peering/lpp/lppNetImpl.go index 73e10b376d..948d43dd18 100644 --- a/packages/peering/lpp/lppNetImpl.go +++ b/packages/peering/lpp/lppNetImpl.go @@ -426,8 +426,6 @@ func (n *netImpl) NumUsers() int { } // Await implements peering.PeerSender for the Self() node. -// -//nolint:revive func (n *netImpl) Await(timeout time.Duration) error { return nil // This node is alive immediately. } diff --git a/packages/peering/lpp/lppPeer.go b/packages/peering/lpp/lppPeer.go index 59ebdff41c..c7d9c39f21 100644 --- a/packages/peering/lpp/lppPeer.go +++ b/packages/peering/lpp/lppPeer.go @@ -201,8 +201,6 @@ func (p *peer) IsAlive() bool { } // Await implements peering.PeerSender interface for the remote peers. -// -//nolint:revive func (p *peer) Await(timeout time.Duration) error { p.accessLock.RLock() defer p.accessLock.RUnlock()