Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait frame processing on sync end #412

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions node/consensus/data/consensus_frames.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,38 @@ func (e *DataClockConsensusEngine) syncWithMesh() error {
if err != nil {
return errors.Wrap(err, "sync")
}
var doneChs []<-chan struct{}
for {
candidates := e.GetAheadPeers(max(latest.FrameNumber, e.latestFrameReceived))
if len(candidates) == 0 {
break
}
for _, candidate := range candidates {
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
head, err := e.dataTimeReel.Head()
if err != nil {
return errors.Wrap(err, "sync")
}
if latest.FrameNumber < head.FrameNumber {
latest = head
}
latest, err = e.syncWithPeer(latest, candidate.MaxFrame, candidate.PeerID)
if candidate.MaxFrame <= max(latest.FrameNumber, e.latestFrameReceived) {
continue
}
latest, doneChs, err = e.syncWithPeer(latest, doneChs, candidate.MaxFrame, candidate.PeerID)
if err != nil {
e.logger.Debug("error syncing frame", zap.Error(err))
}
}
}

for _, doneCh := range doneChs {
select {
case <-e.ctx.Done():
return e.ctx.Err()
case <-doneCh:
}
}

e.logger.Info(
"returning leader frame",
zap.Uint64("frame_number", latest.FrameNumber),
Expand Down Expand Up @@ -312,13 +321,13 @@ func (e *DataClockConsensusEngine) GetAheadPeers(frameNumber uint64) []internal.
}

func (e *DataClockConsensusEngine) syncWithPeer(
currentLatest *protobufs.ClockFrame,
latest *protobufs.ClockFrame,
doneChs []<-chan struct{},
maxFrame uint64,
peerId []byte,
) (*protobufs.ClockFrame, error) {
) (*protobufs.ClockFrame, []<-chan struct{}, error) {
e.syncingStatus = SyncStatusSynchronizing
defer func() { e.syncingStatus = SyncStatusNotSyncing }()
latest := currentLatest
e.logger.Info(
"polling peer for new frames",
zap.String("peer_id", peer.ID(peerId).String()),
Expand Down Expand Up @@ -350,7 +359,7 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Error(err),
)
cooperative = false
return latest, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
defer func() {
if err := cc.Close(); err != nil {
Expand Down Expand Up @@ -378,20 +387,20 @@ func (e *DataClockConsensusEngine) syncWithPeer(
zap.Error(err),
)
cooperative = false
return latest, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}

if response == nil {
e.logger.Debug("received no response from peer")
return latest, nil
return latest, doneChs, nil
}

if response.ClockFrame == nil ||
response.ClockFrame.FrameNumber != latest.FrameNumber+1 ||
response.ClockFrame.Timestamp < latest.Timestamp {
e.logger.Debug("received invalid response from peer")
cooperative = false
return latest, nil
return latest, doneChs, nil
}
e.logger.Info(
"received new leading frame",
Expand All @@ -406,12 +415,16 @@ func (e *DataClockConsensusEngine) syncWithPeer(
if err := e.frameProver.VerifyDataClockFrame(
response.ClockFrame,
); err != nil {
return nil, errors.Wrap(err, "sync")
return latest, doneChs, errors.Wrap(err, "sync")
}
doneCh, err := e.dataTimeReel.Insert(e.ctx, response.ClockFrame)
if err != nil {
return latest, doneChs, errors.Wrap(err, "sync")
}
e.dataTimeReel.Insert(e.ctx, response.ClockFrame, true)
doneChs = append(doneChs, doneCh)
latest = response.ClockFrame
if latest.FrameNumber >= maxFrame {
return latest, nil
return latest, doneChs, nil
}
}
}
4 changes: 3 additions & 1 deletion node/consensus/data/main_data_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ func (e *DataClockConsensusEngine) processFrame(
return dataFrame
}

e.dataTimeReel.Insert(e.ctx, nextFrame, true)
if _, err := e.dataTimeReel.Insert(e.ctx, nextFrame); err != nil {
e.logger.Debug("could not insert frame", zap.Error(err))
}

return nextFrame
} else {
Expand Down
4 changes: 3 additions & 1 deletion node/consensus/data/message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ func (e *DataClockConsensusEngine) handleClockFrame(
}

if frame.FrameNumber > head.FrameNumber {
e.dataTimeReel.Insert(e.ctx, frame, false)
if _, err := e.dataTimeReel.Insert(e.ctx, frame); err != nil {
e.logger.Debug("could not insert frame", zap.Error(err))
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion node/consensus/master/broadcast_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (e *MasterClockConsensusEngine) publishProof(
zap.Uint64("frame_number", frame.FrameNumber),
)

e.masterTimeReel.Insert(context.TODO(), frame, false)
e.masterTimeReel.Insert(context.TODO(), frame)
}

e.state = consensus.EngineStateCollecting
Expand Down
2 changes: 1 addition & 1 deletion node/consensus/master/master_clock_consensus_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (e *MasterClockConsensusEngine) Start() <-chan error {
continue
}

e.masterTimeReel.Insert(context.TODO(), newFrame, false)
e.masterTimeReel.Insert(context.TODO(), newFrame)
}
}
}()
Expand Down
22 changes: 17 additions & 5 deletions node/consensus/time/data_time_reel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type pendingFrame struct {
selector *big.Int
parentSelector *big.Int
frameNumber uint64
done chan struct{}
}

type DataTimeReel struct {
Expand Down Expand Up @@ -190,12 +191,18 @@ func (d *DataTimeReel) Head() (*protobufs.ClockFrame, error) {
return d.head, nil
}

var alreadyDone chan struct{} = func() chan struct{} {
done := make(chan struct{})
close(done)
return done
}()

// Insert enqueues a structurally valid frame into the time reel. If the frame
// is the next one in sequence, it advances the reel head forward and emits a
// new frame on the new frame channel.
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error {
func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error) {
if err := d.ctx.Err(); err != nil {
return err
return nil, err
}

d.logger.Debug(
Expand All @@ -222,21 +229,24 @@ func (d *DataTimeReel) Insert(ctx context.Context, frame *protobufs.ClockFrame,
d.storePending(selector, parent, distance, frame)

if d.head.FrameNumber+1 == frame.FrameNumber {
done := make(chan struct{})
select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-d.ctx.Done():
return d.ctx.Err()
return nil, d.ctx.Err()
case d.frames <- &pendingFrame{
selector: selector,
parentSelector: parent,
frameNumber: frame.FrameNumber,
done: done,
}:
return done, nil
}
}
}

return nil
return alreadyDone, nil
}

func (
Expand Down Expand Up @@ -393,6 +403,7 @@ func (d *DataTimeReel) runLoop() {

// Otherwise set it as the next and process all pending
if err = d.setHead(rawFrame, distance); err != nil {
close(frame.done)
continue
}
d.processPending(d.head, frame)
Expand Down Expand Up @@ -559,6 +570,7 @@ func (d *DataTimeReel) processPending(
frame *protobufs.ClockFrame,
lastReceived *pendingFrame,
) {
defer close(lastReceived.done)
// d.logger.Debug(
// "process pending",
// zap.Uint64("head_frame", frame.FrameNumber),
Expand Down
10 changes: 5 additions & 5 deletions node/consensus/time/data_time_reel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(ctx, frame, false)
d.Insert(ctx, frame)
prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestDataTimeReel(t *testing.T) {
}

for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}

Expand All @@ -286,7 +286,7 @@ func TestDataTimeReel(t *testing.T) {
i+1,
10,
)
d.Insert(ctx, frame, false)
d.Insert(ctx, frame)

prevBI, _ := frame.GetSelector()
prev = prevBI.FillBytes(make([]byte, 32))
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestDataTimeReel(t *testing.T) {
}

for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}

Expand Down Expand Up @@ -397,7 +397,7 @@ func TestDataTimeReel(t *testing.T) {

// Someone is honest, but running backwards:
for i := 99; i >= 0; i-- {
err := d.Insert(ctx, insertFrames[i], false)
_, err := d.Insert(ctx, insertFrames[i])
gotime.Sleep(1 * gotime.Second)
assert.NoError(t, err)
}
Expand Down
5 changes: 2 additions & 3 deletions node/consensus/time/master_time_reel.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,12 @@ func (m *MasterTimeReel) Head() (*protobufs.ClockFrame, error) {
func (m *MasterTimeReel) Insert(
ctx context.Context,
frame *protobufs.ClockFrame,
isSync bool,
) error {
) (<-chan struct{}, error) {
go func() {
m.frames <- frame
}()

return nil
return alreadyDone, nil
}

// NewFrameCh implements TimeReel.
Expand Down
4 changes: 2 additions & 2 deletions node/consensus/time/master_time_reel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestMasterTimeReel(t *testing.T) {
)
assert.NoError(t, err)

err := m.Insert(ctx, frame, false)
_, err := m.Insert(ctx, frame)
assert.NoError(t, err)
}

Expand All @@ -81,7 +81,7 @@ func TestMasterTimeReel(t *testing.T) {
}

for i := 99; i >= 0; i-- {
err := m.Insert(ctx, insertFrames[i], false)
_, err := m.Insert(ctx, insertFrames[i])
assert.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion node/consensus/time/time_reel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
type TimeReel interface {
Start() error
Stop()
Insert(ctx context.Context, frame *protobufs.ClockFrame, isSync bool) error
Insert(ctx context.Context, frame *protobufs.ClockFrame) (<-chan struct{}, error)
Head() (*protobufs.ClockFrame, error)
NewFrameCh() <-chan *protobufs.ClockFrame
BadFrameCh() <-chan *protobufs.ClockFrame
Expand Down