Skip to content

Commit

Permalink
Rename CompactorOrchestrator to CompactionOrchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
naveen246 committed Dec 30, 2024
1 parent 3bf1604 commit 6ecf828
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
48 changes: 24 additions & 24 deletions slatedb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ type CompactionResult struct {
Error error
}

// Compactor - The CompactorOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted.
// If compaction is needed, the CompactorOrchestrator gives CompactionJobs to the CompactionExecutor.
// Compactor - The CompactionOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted.
// If compaction is needed, the CompactionOrchestrator gives CompactionJobs to the CompactionExecutor.
// The CompactionExecutor creates new goroutine for each CompactionJob and the results are written to a channel.
type Compactor struct {
orchestrator *CompactorOrchestrator
orchestrator *CompactionOrchestrator
}

func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) {
orchestrator, err := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts)
orchestrator, err := spawnAndRunCompactionOrchestrator(manifestStore, tableStore, opts)
if err != nil {
return nil, err
}
Expand All @@ -55,15 +55,15 @@ func (c *Compactor) close() {
}

// ------------------------------------------------
// CompactorOrchestrator
// CompactionOrchestrator
// ------------------------------------------------

func spawnAndRunCompactorOrchestrator(
func spawnAndRunCompactionOrchestrator(
manifestStore *ManifestStore,
tableStore *store.TableStore,
opts DBOptions,
) (*CompactorOrchestrator, error) {
orchestrator, err := newCompactorOrchestrator(opts, manifestStore, tableStore)
) (*CompactionOrchestrator, error) {
orchestrator, err := newCompactionOrchestrator(opts, manifestStore, tableStore)
if err != nil {
return nil, err
}
Expand All @@ -72,25 +72,25 @@ func spawnAndRunCompactorOrchestrator(
return orchestrator, nil
}

type CompactorOrchestrator struct {
type CompactionOrchestrator struct {
options *CompactorOptions
manifest *FenceableManifest
state *CompactorState
scheduler CompactionScheduler
executor *CompactionExecutor

// compactorMsgCh - When CompactorOrchestrator receives a CompactorShutdown message on this channel,
// compactorMsgCh - When CompactionOrchestrator receives a CompactorShutdown message on this channel,
// it calls executor.stop
compactorMsgCh chan CompactorMainMsg
waitGroup sync.WaitGroup
log *slog.Logger
}

func newCompactorOrchestrator(
func newCompactionOrchestrator(
opts DBOptions,
manifestStore *ManifestStore,
tableStore *store.TableStore,
) (*CompactorOrchestrator, error) {
) (*CompactionOrchestrator, error) {
sm, err := loadStoredManifest(manifestStore)
if err != nil {
return nil, err
Expand All @@ -113,7 +113,7 @@ func newCompactorOrchestrator(
scheduler := loadCompactionScheduler()
executor := newCompactorExecutor(opts.CompactorOptions, tableStore)

o := CompactorOrchestrator{
o := CompactionOrchestrator{
options: opts.CompactorOptions,
manifest: manifest,
state: state,
Expand All @@ -137,7 +137,7 @@ func loadCompactionScheduler() CompactionScheduler {
return SizeTieredCompactionScheduler{}
}

func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) {
func (o *CompactionOrchestrator) spawnLoop(opts DBOptions) {
o.waitGroup.Add(1)
go func() {
defer o.waitGroup.Done()
Expand Down Expand Up @@ -166,12 +166,12 @@ func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) {
}()
}

func (o *CompactorOrchestrator) shutdown() {
func (o *CompactionOrchestrator) shutdown() {
o.compactorMsgCh <- CompactorShutdown
o.waitGroup.Wait()
}

func (o *CompactorOrchestrator) loadManifest() error {
func (o *CompactionOrchestrator) loadManifest() error {
_, err := o.manifest.refresh()
if err != nil {
return err
Expand All @@ -183,7 +183,7 @@ func (o *CompactorOrchestrator) loadManifest() error {
return nil
}

func (o *CompactorOrchestrator) refreshDBState() error {
func (o *CompactionOrchestrator) refreshDBState() error {
state, err := o.manifest.dbState()
if err != nil {
return err
Expand All @@ -197,7 +197,7 @@ func (o *CompactorOrchestrator) refreshDBState() error {
return nil
}

func (o *CompactorOrchestrator) maybeScheduleCompactions() error {
func (o *CompactionOrchestrator) maybeScheduleCompactions() error {
compactions := o.scheduler.maybeScheduleCompaction(o.state)
for _, compaction := range compactions {
err := o.submitCompaction(compaction)
Expand All @@ -208,7 +208,7 @@ func (o *CompactorOrchestrator) maybeScheduleCompactions() error {
return nil
}

func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
func (o *CompactionOrchestrator) startCompaction(compaction Compaction) {
o.logCompactionState()
dbState := o.state.dbState

Expand Down Expand Up @@ -254,7 +254,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
})
}

func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool {
func (o *CompactionOrchestrator) processCompactionResult(log *slog.Logger) bool {
result, resultPresent := o.executor.nextCompactionResult()
if resultPresent {
if result.Error != nil {
Expand All @@ -267,7 +267,7 @@ func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool {
return resultPresent
}

func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error {
func (o *CompactionOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error {
o.state.finishCompaction(outputSR)
o.logCompactionState()
err := o.writeManifest()
Expand All @@ -282,7 +282,7 @@ func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun
return nil
}

func (o *CompactorOrchestrator) writeManifest() error {
func (o *CompactionOrchestrator) writeManifest() error {
for {
err := o.loadManifest()
if err != nil {
Expand All @@ -299,7 +299,7 @@ func (o *CompactorOrchestrator) writeManifest() error {
}
}

func (o *CompactorOrchestrator) submitCompaction(compaction Compaction) error {
func (o *CompactionOrchestrator) submitCompaction(compaction Compaction) error {
err := o.state.submitCompaction(compaction)
if err != nil {
o.log.Warn("invalid compaction", "error", err)
Expand All @@ -309,7 +309,7 @@ func (o *CompactorOrchestrator) submitCompaction(compaction Compaction) error {
return nil
}

func (o *CompactorOrchestrator) logCompactionState() {
func (o *CompactionOrchestrator) logCompactionState() {
// LogState(o.log, o.state.dbState)
for _, compaction := range o.state.compactions {
o.log.Info("in-flight compaction", "compaction", compaction)
Expand Down
2 changes: 1 addition & 1 deletion slatedb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestShouldWriteManifestSafely(t *testing.T) {
err = db.Close()
assert.NoError(t, err)

orchestrator, err := newCompactorOrchestrator(compactorOptions(), manifestStore, tableStore)
orchestrator, err := newCompactionOrchestrator(compactorOptions(), manifestStore, tableStore)
assert.NoError(t, err)

l0IDsToCompact := make([]SourceID, 0)
Expand Down

0 comments on commit 6ecf828

Please sign in to comment.