From 5b3d91e3f77a46eab3dcd2658c42a71be1c06f4a Mon Sep 17 00:00:00 2001 From: jonastheis <4181434+jonastheis@users.noreply.github.com> Date: Mon, 15 Apr 2024 18:02:43 +0800 Subject: [PATCH] Fix race condition in warp sync where we wait for engine tasks to finish --- pkg/protocol/commitments.go | 35 +++++++++++++++-------------------- pkg/protocol/warp_sync.go | 11 +++++++++-- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/pkg/protocol/commitments.go b/pkg/protocol/commitments.go index 8aaaf9dbe..8828e7c23 100644 --- a/pkg/protocol/commitments.go +++ b/pkg/protocol/commitments.go @@ -141,30 +141,25 @@ func (c *Commitments) initRequester() (shutdown func()) { // publishRootCommitment publishes the root commitment of the main engine. func (c *Commitments) publishRootCommitment(mainChain *Chain, mainEngine *engine.Engine) func() { return mainEngine.RootCommitment.OnUpdate(func(_ *model.Commitment, rootCommitment *model.Commitment) { - // Use workerpool to avoid a deadlock when warpSync mode is being enabled at the same time. - // Two goroutines deadlock on Commitment.IsSynced - // https://github.com/iotaledger/iota-core/issues/898 - c.workerPool.Submit(func() { - publishedCommitment, published, err := c.publishCommitment(rootCommitment) - if err != nil { - c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err) + publishedCommitment, published, err := c.publishCommitment(rootCommitment) + if err != nil { + c.LogError("failed to publish new root commitment", "id", rootCommitment.ID(), "error", err) - return - } + return + } - publishedCommitment.IsRoot.Set(true) - if published { - publishedCommitment.Chain.Set(mainChain) - } + publishedCommitment.IsRoot.Set(true) + if published { + publishedCommitment.Chain.Set(mainChain) + } - // Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain. - // to avoid updating ForkingPoint of the new mainChain into the past. - if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain { - mainChain.ForkingPoint.Set(publishedCommitment) - } + // Update the forking point of a chain only if the root is empty or root belongs to the main chain or the published commitment is on the main chain. + // to avoid updating ForkingPoint of the new mainChain into the past. + if c.Root.Get() == nil || c.Root.Get().Chain.Get() == mainChain || publishedCommitment.Chain.Get() == mainChain { + mainChain.ForkingPoint.Set(publishedCommitment) + } - c.Root.Set(publishedCommitment) - }) + c.Root.Set(publishedCommitment) }) } diff --git a/pkg/protocol/warp_sync.go b/pkg/protocol/warp_sync.go index ced1b1f37..9cbe3515f 100644 --- a/pkg/protocol/warp_sync.go +++ b/pkg/protocol/warp_sync.go @@ -48,8 +48,15 @@ func newWarpSync(protocol *Protocol) *WarpSync { protocol.Chains.WithInitializedEngines(func(chain *Chain, engine *engine.Engine) (shutdown func()) { return chain.WarpSyncMode.OnUpdate(func(_ bool, warpSyncModeEnabled bool) { if warpSyncModeEnabled { - engine.Workers.WaitChildren() - engine.Reset() + // We need to wait for all workers of the engine to finish and reset in a separate worker, + // since otherwise we're locking downstream (c.LatestSyncedSlot, c.chains.LatestSeenSlot, c.OutOfSyncThreshold of the chain). + // Which in turn can lead to a deadlock where the engine can't update the LatestSyncedSlot. + // By running it in the warpsync's single worker we also make sure that the engine is reset before + // actually warp syncing/processing new slots. + c.workerPool.Submit(func() { + engine.Workers.WaitChildren() + engine.Reset() + }) } }) })