Skip to content

Commit

Permalink
Merge pull request #2379 from iotaledger/refactor/peer-message
Browse files Browse the repository at this point in the history
Refactor `peer_message.go`
  • Loading branch information
muXxer authored Apr 21, 2023
2 parents 1276895 + 1b2f26e commit 0258843
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 77 deletions.
2 changes: 1 addition & 1 deletion packages/dkg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (n *Node) exchangeInitiatorMsgs(
var err error
var initMsg initiatorMsg
var isInitMsg bool
isInitMsg, initMsg, err = readInitiatorMsg(&recv.PeerMessageData, n.edSuite, n.blsSuite)
isInitMsg, initMsg, err = readInitiatorMsg(recv.PeerMessageData, n.edSuite, n.blsSuite)
if !isInitMsg {
return false, nil
}
Expand Down
3 changes: 1 addition & 2 deletions packages/nodeconn/nodeconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,7 @@ func (nc *nodeConnection) GetL1ProtocolParams() *iotago.ProtocolParameters {
}

func (nc *nodeConnection) subscribeToLedgerUpdates() {
err := nc.nodeBridge.ListenToLedgerUpdates(nc.ctx, 0, 0, nc.handleLedgerUpdate)
if err != nil && !errors.Is(err, io.EOF) {
if err := nc.nodeBridge.ListenToLedgerUpdates(nc.ctx, 0, 0, nc.handleLedgerUpdate); err != nil && !errors.Is(err, io.EOF) {
nc.LogError(err)
nc.shutdownHandler.SelfShutdown(
fmt.Sprintf("INX connection unexpected error: %s", err.Error()),
Expand Down
4 changes: 2 additions & 2 deletions packages/peering/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (g *groupImpl) ExchangeRound(
continue
}
recvMsg := peering.PeerMessageGroupIn{
PeerMessageIn: *recvMsgNoIndex,
PeerMessageIn: recvMsgNoIndex,
SenderIndex: senderIndex,
}
if acks[recvMsg.SenderIndex] { // Only consider first successful message.
Expand Down Expand Up @@ -247,7 +247,7 @@ func (g *groupImpl) Attach(receiver byte, callback func(recv *peering.PeerMessag
return
}
gRecv := &peering.PeerMessageGroupIn{
PeerMessageIn: *recv,
PeerMessageIn: recv,
SenderIndex: idx,
}
callback(gRecv)
Expand Down
2 changes: 1 addition & 1 deletion packages/peering/lpp/lppNetImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ func (n *netImpl) PubKey() *cryptolib.PublicKey {
// SendMsg implements peering.PeerSender for the Self() node.
func (n *netImpl) SendMsg(msg *peering.PeerMessageData) {
// Don't go via the network, if sending a message to self.
n.triggerRecvEvents(n.Self().PubKey(), &peering.PeerMessageNet{PeerMessageData: *msg})
n.triggerRecvEvents(n.Self().PubKey(), &peering.PeerMessageNet{PeerMessageData: msg})
}

func (n *netImpl) triggerRecvEvents(from *cryptolib.PublicKey, msg *peering.PeerMessageNet) {
Expand Down
2 changes: 1 addition & 1 deletion packages/peering/lpp/lppPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *peer) PubKey() *cryptolib.PublicKey {
func (p *peer) SendMsg(msg *peering.PeerMessageData) {
//
p.accessLock.RLock()
msgNet := &peering.PeerMessageNet{PeerMessageData: *msg}
msgNet := &peering.PeerMessageNet{PeerMessageData: msg}
if !p.trusted {
p.log.Infof("Dropping outgoing message, because it was meant to send to a distrusted peer.")
p.accessLock.RUnlock()
Expand Down
155 changes: 96 additions & 59 deletions packages/peering/peer_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,102 +11,139 @@ package peering

import (
"bytes"
"sync"

"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/wasp/packages/cryptolib"
"github.com/iotaledger/wasp/packages/hashing"
"github.com/iotaledger/wasp/packages/util"
"github.com/iotaledger/wasp/packages/util/pipe"
)

// PeerMessage is an envelope for all the messages exchanged via
// the peering module.
// PeerMessage is an envelope for all the messages exchanged via the peering module.
type PeerMessageData struct {
PeeringID PeeringID
MsgReceiver byte
MsgType byte
MsgData []byte
}

type PeerMessageNet struct {
PeerMessageData
serialized *[]byte
}

type PeerMessageIn struct {
PeerMessageData
SenderPubKey *cryptolib.PublicKey
serializedErr error
serializedData []byte
serializedOnce sync.Once
}

type PeerMessageGroupIn struct {
PeerMessageIn
SenderIndex uint16
}

var _ pipe.Hashable = &PeerMessageNet{}

// newPeerMessageDataFromBytes creates a new PeerMessageData from bytes.
// The function takes ownership over "data" and the caller should not use "data" after this call.
//
//nolint:gocritic
func NewPeerMessageDataFromBytes(buf []byte) (*PeerMessageData, error) {
func newPeerMessageDataFromBytes(data []byte) (*PeerMessageData, error) {
// create a copy of the slice for later usage of the raw data.
cpy := lo.CopySlice(data)

var err error
r := bytes.NewBuffer(buf)
m := PeerMessageData{}
if m.MsgReceiver, err = util.ReadByte(r); err != nil {
buf := bytes.NewBuffer(data)

m := new(PeerMessageData)
if m.MsgReceiver, err = util.ReadByte(buf); err != nil {
return nil, err
}
if m.MsgType, err = util.ReadByte(r); err != nil {
if m.MsgType, err = util.ReadByte(buf); err != nil {
return nil, err
}
if err = m.PeeringID.Read(r); err != nil {
if err = m.PeeringID.Read(buf); err != nil {
return nil, err
}
if m.MsgData, err = util.ReadBytes32(r); err != nil {
if m.MsgData, err = util.ReadBytes32(buf); err != nil {
return nil, err
}
return &m, nil

m.serializedOnce.Do(func() {
m.serializedErr = nil
m.serializedData = cpy
})

return m, nil
}

func (m *PeerMessageData) Bytes() ([]byte, error) {
m.serializedOnce.Do(func() {
buf := new(bytes.Buffer)

if err := util.WriteByte(buf, m.MsgReceiver); err != nil {
m.serializedErr = err
return
}
if err := util.WriteByte(buf, m.MsgType); err != nil {
m.serializedErr = err
return
}
if err := m.PeeringID.Write(buf); err != nil {
m.serializedErr = err
return
}
if err := util.WriteBytes32(buf, m.MsgData); err != nil {
m.serializedErr = err
return
}

m.serializedData = buf.Bytes()
})

if m.serializedErr != nil {
return nil, m.serializedErr
}

return m.serializedData, nil
}

type PeerMessageNet struct {
*PeerMessageData

hash hashing.HashValue
hashOnce sync.Once
}

func NewPeerMessageNetFromBytes(buf []byte) (*PeerMessageNet, error) {
data, err := NewPeerMessageDataFromBytes(buf)
var _ pipe.Hashable = &PeerMessageNet{}

// NewPeerMessageNetFromBytes creates a new PeerMessageNet from bytes.
// The function takes ownership over "data" and the caller should not use "data" after this call.
func NewPeerMessageNetFromBytes(data []byte) (*PeerMessageNet, error) {
peerMessageData, err := newPeerMessageDataFromBytes(data)
if err != nil {
return nil, err
}
return &PeerMessageNet{
PeerMessageData: *data,
serialized: &buf,
}, nil

peerMessageNet := &PeerMessageNet{
PeerMessageData: peerMessageData,
}

return peerMessageNet, nil
}

func (m *PeerMessageNet) Bytes() ([]byte, error) {
if m.serialized == nil {
serialized, err := m.PeerMessageData.bytes()
return m.PeerMessageData.Bytes()
}

func (m *PeerMessageNet) GetHash() hashing.HashValue {
m.hashOnce.Do(func() {
bytes, err := m.Bytes()
if err != nil {
return nil, err
m.hash = hashing.HashValue{}
return
}
m.serialized = &serialized
}
return *(m.serialized), nil

m.hash = hashing.HashData(bytes)
})

return m.hash
}

func (m *PeerMessageData) bytes() ([]byte, error) {
var buf bytes.Buffer
if err := util.WriteByte(&buf, m.MsgReceiver); err != nil {
return nil, err
}
if err := util.WriteByte(&buf, m.MsgType); err != nil {
return nil, err
}
if err := m.PeeringID.Write(&buf); err != nil {
return nil, err
}
if err := util.WriteBytes32(&buf, m.MsgData); err != nil {
return nil, err
}
return buf.Bytes(), nil
type PeerMessageIn struct {
*PeerMessageData
SenderPubKey *cryptolib.PublicKey
}

func (m *PeerMessageNet) GetHash() hashing.HashValue {
mBytes, err := m.Bytes()
if err != nil {
return hashing.HashValue{}
}
return hashing.HashData(mBytes)
type PeerMessageGroupIn struct {
*PeerMessageIn
SenderIndex uint16
}
2 changes: 1 addition & 1 deletion packages/peering/peering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestPeerMessageCodec(t *testing.T) {
var err error
var src, dst *peering.PeerMessageNet
src = &peering.PeerMessageNet{
PeerMessageData: peering.PeerMessageData{
PeerMessageData: &peering.PeerMessageData{
PeeringID: peering.RandomPeeringID(),
MsgReceiver: byte(10),
MsgType: peering.FirstUserMsgCode + 17,
Expand Down
4 changes: 2 additions & 2 deletions packages/testutil/peeringNetBehaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (n *peeringNetUnreliable) recvLoop(inCh, outCh chan *peeringMsg, closeCh ch
return
}
if rand.Intn(100) > n.deliverPct {
n.log.Debugf("Network dropped message %v -%v-> %v", recv.from.String(), recv.msg.MsgType, dstPubKey.String())
n.log.Debugf("Network dropped message %v -%v-> %v", recv.from.String(), recv.PeerMessageData().MsgType, dstPubKey.String())
continue // Drop the message.
}
//
Expand Down Expand Up @@ -138,7 +138,7 @@ func (n *peeringNetUnreliable) sendDelayed(recv *peeringMsg, outCh chan *peering
}
n.log.Debugf(
"Network delivers message %v -%v-> %v (duplicate %v/%v, delay=%vms)",
recv.from.String(), recv.msg.MsgType, dstPubKey.String(), dupNum, dupCount, delay.Milliseconds(),
recv.from.String(), recv.PeerMessageData().MsgType, dstPubKey.String(), dupNum, dupCount, delay.Milliseconds(),
)
safeSendPeeringMsg(outCh, recv, n.log)
}
Expand Down
12 changes: 6 additions & 6 deletions packages/testutil/peeringNetBehaviourDynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (pndT *PeeringNetDynamic) recvLoop(inCh, outCh chan *peeringMsg, closeCh ch
if len(nextHandlers) > 0 {
nextHandlers[0].handleSendMessage(recv, dstPubKey, nextHandlers[1:], callHandlersAndSendFun, pndT.log)
} else {
pndT.log.Debugf("Network delivers message %v -%v-> %v", recv.from.String(), recv.msg.MsgType, dstPubKey.String())
pndT.log.Debugf("Network delivers message %v -%v-> %v", recv.from.String(), recv.PeerMessageData().MsgType, dstPubKey.String())
safeSendPeeringMsg(outCh, recv, pndT.log)
}
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (lcT *peeringNetDynamicHandlerLosingChannel) handleSendMessage(
log *logger.Logger,
) {
if rand.Intn(100) > lcT.probability {
log.Debugf("Network dropped message %v -%v-> %v", msg.from.String(), msg.msg.MsgType, dstPubKey.String())
log.Debugf("Network dropped message %v -%v-> %v", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String())
return
}
callHandlersAndSendFun(nextHandlers)
Expand All @@ -192,7 +192,7 @@ func (rcT *peeringNetDynamicHandlerRepeatingChannel) handleSendMessage(
if rand.Intn(100) < rcT.probability%100 {
numRepeat++
}
log.Debugf("Network repeated message %v -%v-> %v %v times", msg.from.String(), msg.msg.MsgType, dstPubKey.String(), numRepeat)
log.Debugf("Network repeated message %v -%v-> %v %v times", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String(), numRepeat)
for i := 0; i < numRepeat; i++ {
callHandlersAndSendFun(nextHandlers)
}
Expand Down Expand Up @@ -220,7 +220,7 @@ func (dcT *peeringNetDynamicHandlerDelayingChannel) handleSendMessage(
} else {
delay = time.Duration(fromMS) * time.Millisecond
}
log.Debugf("Network delayed message %v -%v-> %v for %v", msg.from.String(), msg.msg.MsgType, dstPubKey.String(), delay)
log.Debugf("Network delayed message %v -%v-> %v for %v", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String(), delay)
<-time.After(delay)
}
callHandlersAndSendFun(nextHandlers)
Expand All @@ -239,11 +239,11 @@ func (pdT *peeringNetDynamicHandlerPeerDisconnected) handleSendMessage(
log *logger.Logger,
) {
if dstPubKey.Equals(pdT.peerPubKey) {
log.Debugf("Network dropped message %v -%v-> %v, because destination is disconnected", msg.from.String(), msg.msg.MsgType, dstPubKey.String())
log.Debugf("Network dropped message %v -%v-> %v, because destination is disconnected", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String())
return
}
if msg.from.Equals(pdT.peerPubKey) {
log.Debugf("Network dropped message %v -%v-> %v, because source is disconnected", msg.from.String(), msg.msg.MsgType, dstPubKey.String())
log.Debugf("Network dropped message %v -%v-> %v, because source is disconnected", msg.from.String(), msg.PeerMessageData().MsgType, dstPubKey.String())
return
}
callHandlersAndSendFun(nextHandlers)
Expand Down
15 changes: 13 additions & 2 deletions packages/testutil/peeringNetworkProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,17 @@ type peeringNode struct {

type peeringMsg struct {
from *cryptolib.PublicKey
msg peering.PeerMessageData
msg *peering.PeerMessageData
timestamp int64
}

func (m *peeringMsg) PeerMessageData() *peering.PeerMessageData {
if m.msg == nil {
return &peering.PeerMessageData{}
}
return m.msg
}

type peeringCb struct {
callback func(recv *peering.PeerMessageIn) // Receive callback.
destNP *peeringNetworkProvider // Destination node.
Expand Down Expand Up @@ -126,6 +133,10 @@ func newPeeringNode(peeringURL string, identity *cryptolib.KeyPair, network *Pee

func (n *peeringNode) recvLoop() {
for pm := range n.recvCh {
if pm.msg == nil {
continue
}

msgPeeringID := pm.msg.PeeringID.String()
for _, cb := range n.recvCbs {
if cb.peeringID.String() == msgPeeringID && cb.receiver == pm.msg.MsgReceiver {
Expand All @@ -141,7 +152,7 @@ func (n *peeringNode) recvLoop() {
func (n *peeringNode) sendMsg(from *cryptolib.PublicKey, msg *peering.PeerMessageData) {
n.sendCh <- &peeringMsg{
from: from,
msg: *msg,
msg: msg,
}
}

Expand Down

0 comments on commit 0258843

Please sign in to comment.