diff --git a/slatedb/compactor.go b/slatedb/compactor.go index 6a73ade..9c5506c 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -32,15 +32,15 @@ type CompactionResult struct { Error error } -// Compactor - The CompactorOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted. -// If compaction is needed, the CompactorOrchestrator gives CompactionJobs to the CompactionExecutor. +// Compactor - The CompactionOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted. +// If compaction is needed, the CompactionOrchestrator gives CompactionJobs to the CompactionExecutor. // The CompactionExecutor creates new goroutine for each CompactionJob and the results are written to a channel. type Compactor struct { - orchestrator *CompactorOrchestrator + orchestrator *CompactionOrchestrator } func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) { - orchestrator, err := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts) + orchestrator, err := spawnAndRunCompactionOrchestrator(manifestStore, tableStore, opts) if err != nil { return nil, err } @@ -55,15 +55,15 @@ func (c *Compactor) close() { } // ------------------------------------------------ -// CompactorOrchestrator +// CompactionOrchestrator // ------------------------------------------------ -func spawnAndRunCompactorOrchestrator( +func spawnAndRunCompactionOrchestrator( manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions, -) (*CompactorOrchestrator, error) { - orchestrator, err := newCompactorOrchestrator(opts, manifestStore, tableStore) +) (*CompactionOrchestrator, error) { + orchestrator, err := newCompactionOrchestrator(opts, manifestStore, tableStore) if err != nil { return nil, err } @@ -72,25 +72,25 @@ func spawnAndRunCompactorOrchestrator( return orchestrator, nil } -type CompactorOrchestrator struct { +type CompactionOrchestrator struct { options *CompactorOptions manifest *FenceableManifest state *CompactorState scheduler CompactionScheduler executor *CompactionExecutor - // compactorMsgCh - When CompactorOrchestrator receives a CompactorShutdown message on this channel, + // compactorMsgCh - When CompactionOrchestrator receives a CompactorShutdown message on this channel, // it calls executor.stop compactorMsgCh chan CompactorMainMsg waitGroup sync.WaitGroup log *slog.Logger } -func newCompactorOrchestrator( +func newCompactionOrchestrator( opts DBOptions, manifestStore *ManifestStore, tableStore *store.TableStore, -) (*CompactorOrchestrator, error) { +) (*CompactionOrchestrator, error) { sm, err := loadStoredManifest(manifestStore) if err != nil { return nil, err @@ -113,7 +113,7 @@ func newCompactorOrchestrator( scheduler := loadCompactionScheduler() executor := newCompactorExecutor(opts.CompactorOptions, tableStore) - o := CompactorOrchestrator{ + o := CompactionOrchestrator{ options: opts.CompactorOptions, manifest: manifest, state: state, @@ -137,7 +137,7 @@ func loadCompactionScheduler() CompactionScheduler { return SizeTieredCompactionScheduler{} } -func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) { +func (o *CompactionOrchestrator) spawnLoop(opts DBOptions) { o.waitGroup.Add(1) go func() { defer o.waitGroup.Done() @@ -166,12 +166,12 @@ func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) { }() } -func (o *CompactorOrchestrator) shutdown() { +func (o *CompactionOrchestrator) shutdown() { o.compactorMsgCh <- CompactorShutdown o.waitGroup.Wait() } -func (o *CompactorOrchestrator) loadManifest() error { +func (o *CompactionOrchestrator) loadManifest() error { _, err := o.manifest.refresh() if err != nil { return err @@ -183,7 +183,7 @@ func (o *CompactorOrchestrator) loadManifest() error { return nil } -func (o *CompactorOrchestrator) refreshDBState() error { +func (o *CompactionOrchestrator) refreshDBState() error { state, err := o.manifest.dbState() if err != nil { return err @@ -197,7 +197,7 @@ func (o *CompactorOrchestrator) refreshDBState() error { return nil } -func (o *CompactorOrchestrator) maybeScheduleCompactions() error { +func (o *CompactionOrchestrator) maybeScheduleCompactions() error { compactions := o.scheduler.maybeScheduleCompaction(o.state) for _, compaction := range compactions { err := o.submitCompaction(compaction) @@ -208,7 +208,7 @@ func (o *CompactorOrchestrator) maybeScheduleCompactions() error { return nil } -func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { +func (o *CompactionOrchestrator) startCompaction(compaction Compaction) { o.logCompactionState() dbState := o.state.dbState @@ -254,7 +254,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { }) } -func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool { +func (o *CompactionOrchestrator) processCompactionResult(log *slog.Logger) bool { result, resultPresent := o.executor.nextCompactionResult() if resultPresent { if result.Error != nil { @@ -267,7 +267,7 @@ func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool { return resultPresent } -func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error { +func (o *CompactionOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error { o.state.finishCompaction(outputSR) o.logCompactionState() err := o.writeManifest() @@ -282,7 +282,7 @@ func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun return nil } -func (o *CompactorOrchestrator) writeManifest() error { +func (o *CompactionOrchestrator) writeManifest() error { for { err := o.loadManifest() if err != nil { @@ -299,7 +299,7 @@ func (o *CompactorOrchestrator) writeManifest() error { } } -func (o *CompactorOrchestrator) submitCompaction(compaction Compaction) error { +func (o *CompactionOrchestrator) submitCompaction(compaction Compaction) error { err := o.state.submitCompaction(compaction) if err != nil { o.log.Warn("invalid compaction", "error", err) @@ -309,7 +309,7 @@ func (o *CompactorOrchestrator) submitCompaction(compaction Compaction) error { return nil } -func (o *CompactorOrchestrator) logCompactionState() { +func (o *CompactionOrchestrator) logCompactionState() { // LogState(o.log, o.state.dbState) for _, compaction := range o.state.compactions { o.log.Info("in-flight compaction", "compaction", compaction) diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index 2c8998f..c798ea0 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -81,7 +81,7 @@ func TestShouldWriteManifestSafely(t *testing.T) { err = db.Close() assert.NoError(t, err) - orchestrator, err := newCompactorOrchestrator(compactorOptions(), manifestStore, tableStore) + orchestrator, err := newCompactionOrchestrator(compactorOptions(), manifestStore, tableStore) assert.NoError(t, err) l0IDsToCompact := make([]SourceID, 0)