diff --git a/sync/catchup/boost.go b/sync/catchup/boost.go index 264b0b4..c32fdcf 100644 --- a/sync/catchup/boost.go +++ b/sync/catchup/boost.go @@ -223,7 +223,7 @@ func (c *coordinator) dispatchLoop(ctx context.Context, wg *sync.WaitGroup) { // Collect epoch data c.collectEpochData(r.epochData) r.epochData = nil // free memory - } else if r.isInvalidFilterError() && r.task.From < r.task.To { + } else if r.isInvalidFilterError() { resultHistory = append(resultHistory, r) // Invalid filter: try splitting and reassigning c.recallTaskQueue <- r.task @@ -307,12 +307,14 @@ func (c *coordinator) assignTasks(taskSize uint64) { recallTask := <-c.recallTaskQueue midEpoch := (recallTask.From + recallTask.To) / 2 c.pendingTaskQueue <- newSyncTask(recallTask.From, midEpoch) - c.pendingTaskQueue <- newSyncTask(midEpoch+1, recallTask.To) + if midEpoch+1 <= recallTask.To { + c.pendingTaskQueue <- newSyncTask(midEpoch+1, recallTask.To) + } continue } // The full epoch range has been assigned - if c.nextAssignEpoch <= c.fullEpochRange.To { + if c.nextAssignEpoch > c.fullEpochRange.To { break } @@ -437,18 +439,16 @@ func (s *boostSyncer) memoryMonitorLoop(ctx context.Context, c *coordinator) { // fetchAndPersistResults retrieves completed epoch data and persists them into the database. func (s *boostSyncer) fetchAndPersistResults(ctx context.Context, start, end uint64, bmarker *benchmarker) error { - var epochData *store.EpochData - var state persistState - timer := time.NewTimer(forcePersistenceInterval) defer timer.Stop() + var state persistState for eno := start; eno <= end; { startTime := time.Now() select { case <-ctx.Done(): return ctx.Err() - case epochData = <-s.resultChan: + case epochData := <-s.resultChan: // collect epoch data if epochData.Number != eno { return errors.Errorf("unexpected epoch collected, expected %v got %v", eno, epochData.Number) @@ -458,28 +458,31 @@ func (s *boostSyncer) fetchAndPersistResults(ctx context.Context, start, end uin } eno++ s.monitor.Update(eno) - } - epochDbRows, storeDbRows := state.update(epochData) - if logrus.IsLevelEnabled(logrus.DebugLevel) { - logrus.WithFields(logrus.Fields{ - "epochNo": epochData.Number, - "epochDbRows": epochDbRows, - "storeDbRows": storeDbRows, - "state.insertDbRows": state.insertDbRows, - "state.totalDbRows": state.totalDbRows, - }).Debug("Catch-up syncer collected new epoch data") + epochDbRows, storeDbRows := state.update(epochData) + if logrus.IsLevelEnabled(logrus.DebugLevel) { + logrus.WithFields(logrus.Fields{ + "epochNo": epochData.Number, + "epochDbRows": epochDbRows, + "storeDbRows": storeDbRows, + "state.insertDbRows": state.insertDbRows, + "state.totalDbRows": state.totalDbRows, + }).Debug("Catch-up syncer collected new epoch data") + } + case <-timer.C: + // Force persist if timer expires } // Check if we need to persist now (due to timer expiration) - var forcePersist bool + forcePersist := false select { case <-timer.C: forcePersist = true default: } - // Batch insert into db if enough db rows collected, also use total db rows here to restrict memory usage. + // Batch insert into db if `forcePersist` is true or enough db rows collected, also use total db rows here to + // check if we need to persist to restrict memory usage. if forcePersist || state.totalDbRows >= s.maxDbRows || state.insertDbRows >= s.minBatchDbRows { if err := s.persist(ctx, &state, bmarker); err != nil { return err