Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wanliqun committed Dec 13, 2024
1 parent 7d37017 commit 0271e09
Showing 1 changed file with 22 additions and 19 deletions.
41 changes: 22 additions & 19 deletions sync/catchup/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (c *coordinator) dispatchLoop(ctx context.Context, wg *sync.WaitGroup) {
// Collect epoch data
c.collectEpochData(r.epochData)
r.epochData = nil // free memory
} else if r.isInvalidFilterError() && r.task.From < r.task.To {
} else if r.isInvalidFilterError() {
resultHistory = append(resultHistory, r)
// Invalid filter: try splitting and reassigning
c.recallTaskQueue <- r.task
Expand Down Expand Up @@ -307,12 +307,14 @@ func (c *coordinator) assignTasks(taskSize uint64) {
recallTask := <-c.recallTaskQueue
midEpoch := (recallTask.From + recallTask.To) / 2
c.pendingTaskQueue <- newSyncTask(recallTask.From, midEpoch)
c.pendingTaskQueue <- newSyncTask(midEpoch+1, recallTask.To)
if midEpoch+1 <= recallTask.To {
c.pendingTaskQueue <- newSyncTask(midEpoch+1, recallTask.To)
}
continue
}

// The full epoch range has been assigned
if c.nextAssignEpoch <= c.fullEpochRange.To {
if c.nextAssignEpoch > c.fullEpochRange.To {
break
}

Expand Down Expand Up @@ -437,18 +439,16 @@ func (s *boostSyncer) memoryMonitorLoop(ctx context.Context, c *coordinator) {

// fetchAndPersistResults retrieves completed epoch data and persists them into the database.
func (s *boostSyncer) fetchAndPersistResults(ctx context.Context, start, end uint64, bmarker *benchmarker) error {
var epochData *store.EpochData
var state persistState

timer := time.NewTimer(forcePersistenceInterval)
defer timer.Stop()

var state persistState
for eno := start; eno <= end; {
startTime := time.Now()
select {
case <-ctx.Done():
return ctx.Err()
case epochData = <-s.resultChan:
case epochData := <-s.resultChan:
// collect epoch data
if epochData.Number != eno {
return errors.Errorf("unexpected epoch collected, expected %v got %v", eno, epochData.Number)
Expand All @@ -458,28 +458,31 @@ func (s *boostSyncer) fetchAndPersistResults(ctx context.Context, start, end uin
}
eno++
s.monitor.Update(eno)
}

epochDbRows, storeDbRows := state.update(epochData)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithFields(logrus.Fields{
"epochNo": epochData.Number,
"epochDbRows": epochDbRows,
"storeDbRows": storeDbRows,
"state.insertDbRows": state.insertDbRows,
"state.totalDbRows": state.totalDbRows,
}).Debug("Catch-up syncer collected new epoch data")
epochDbRows, storeDbRows := state.update(epochData)
if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.WithFields(logrus.Fields{
"epochNo": epochData.Number,
"epochDbRows": epochDbRows,
"storeDbRows": storeDbRows,
"state.insertDbRows": state.insertDbRows,
"state.totalDbRows": state.totalDbRows,
}).Debug("Catch-up syncer collected new epoch data")
}
case <-timer.C:
// Force persist if timer expires
}

// Check if we need to persist now (due to timer expiration)
var forcePersist bool
forcePersist := false
select {
case <-timer.C:
forcePersist = true
default:
}

// Batch insert into db if enough db rows collected, also use total db rows here to restrict memory usage.
// Batch insert into db if `forcePersist` is true or enough db rows collected, also use total db rows here to
// check if we need to persist to restrict memory usage.
if forcePersist || state.totalDbRows >= s.maxDbRows || state.insertDbRows >= s.minBatchDbRows {
if err := s.persist(ctx, &state, bmarker); err != nil {
return err
Expand Down

0 comments on commit 0271e09

Please sign in to comment.