From 348dd569a9108e6e935701fc84999d8829acca9f Mon Sep 17 00:00:00 2001 From: Naveen Date: Mon, 30 Dec 2024 14:03:46 +0530 Subject: [PATCH 1/4] Remove workerCh from CompactorOrchestrator and rename it to resultCh as a field in CompactionExecutor --- slatedb/compactor.go | 119 +++++++++++++++++++------------------- slatedb/compactor_test.go | 8 ++- 2 files changed, 66 insertions(+), 61 deletions(-) diff --git a/slatedb/compactor.go b/slatedb/compactor.go index 4ba577b..1943a9d 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -29,15 +29,14 @@ const ( CompactorShutdown CompactorMainMsg = iota + 1 ) -type WorkerToOrchestratorMsg struct { - CompactionResult *compaction2.SortedRun - CompactionError error +type CompactionResult struct { + SortedRun *compaction2.SortedRun + Error error } -// Compactor creates one goroutine for CompactorOrchestrator. The CompactorOrchestrator can create multiple goroutines -// for running the actual compaction. The result of the compaction are shared back to the CompactorOrchestrator through -// CompactorOrchestrator.workerCh channel. -// When Compactor.close is called, we wait till all the goroutines/workers started by CompactorOrchestrator stop running +// Compactor creates one goroutine for CompactorOrchestrator. The CompactionExecutor can create multiple goroutines +// for running the actual compaction. +// When Compactor.close is called, we wait till all the goroutines/workers started by CompactionExecutor stop running type Compactor struct { // compactorMsgCh - When Compactor.close is called we write CompactorShutdown message to this channel // CompactorOrchestrator which runs in a different goroutine reads this channel and shuts down @@ -94,26 +93,23 @@ func spawnAndRunCompactorOrchestrator( ticker := time.NewTicker(opts.CompactorOptions.PollInterval) defer ticker.Stop() - // TODO(thrawn01): Race, cannot know if no more work is coming by checking length of the channel - for !(orchestrator.executor.isStopped() && len(orchestrator.workerCh) == 0) { + for { + resultPresent := orchestrator.processCompactionResult(opts.Log) + if !resultPresent && orchestrator.executor.isStopped() { + break + } + select { case <-ticker.C: err := orchestrator.loadManifest() assert.True(err == nil, "Failed to load manifest") - case msg := <-orchestrator.workerCh: - if msg.CompactionError != nil { - opts.Log.Error("Error executing compaction", "error", msg.CompactionError) - } else if msg.CompactionResult != nil { - err := orchestrator.finishCompaction(msg.CompactionResult) - assert.True(err == nil, "Failed to finish compaction") - } case <-orchestrator.compactorMsgCh: - // we receive Shutdown msg on compactorMsgCh. - // Stop the executor. Don't return because there might - // still be messages in `orchestrator.workerCh`. Let the loop continue - // to drain them until empty. + // we receive Shutdown msg on compactorMsgCh. Stop the executor. + // Don't return and let the loop continue until `orchestrator.processCompactionResult` + // has no more compaction results to process orchestrator.executor.stop() ticker.Stop() + default: } } }() @@ -131,11 +127,7 @@ type CompactorOrchestrator struct { // compactorMsgCh - When CompactorOrchestrator receives a CompactorShutdown message on this channel, // it calls executor.stop compactorMsgCh <-chan CompactorMainMsg - - // workerCh - CompactionExecutor sends a CompactionFinished message to this channel once compaction is done. - // The CompactorOrchestrator loops through each item in this channel and calls finishCompaction - workerCh <-chan WorkerToOrchestratorMsg - log *slog.Logger + log *slog.Logger } func newCompactorOrchestrator( @@ -164,8 +156,7 @@ func newCompactorOrchestrator( } scheduler := loadCompactionScheduler() - workerCh := make(chan WorkerToOrchestratorMsg, 1) - executor := newCompactorExecutor(opts.CompactorOptions, workerCh, tableStore) + executor := newCompactorExecutor(opts.CompactorOptions, tableStore) o := CompactorOrchestrator{ options: opts.CompactorOptions, @@ -174,7 +165,6 @@ func newCompactorOrchestrator( scheduler: scheduler, executor: executor, compactorMsgCh: compactorMsgCh, - workerCh: workerCh, log: opts.Log, } return &o, nil @@ -275,6 +265,19 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { }) } +func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool { + result, resultPresent := o.executor.compactionResult() + if resultPresent { + if result.Error != nil { + log.Error("Error executing compaction", "error", result.Error) + } else if result.SortedRun != nil { + err := o.finishCompaction(result.SortedRun) + assert.True(err == nil, "Failed to finish compaction") + } + } + return resultPresent +} + func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error { o.state.finishCompaction(outputSR) o.logCompactionState() @@ -338,22 +341,28 @@ type CompactionExecutor struct { options *CompactorOptions tableStore *store.TableStore - workerCh chan<- WorkerToOrchestratorMsg + resultCh chan CompactionResult tasksWG sync.WaitGroup - abortCh chan bool stopped atomic.Bool } func newCompactorExecutor( options *CompactorOptions, - workerCh chan<- WorkerToOrchestratorMsg, tableStore *store.TableStore, ) *CompactionExecutor { return &CompactionExecutor{ options: options, tableStore: tableStore, - workerCh: workerCh, - abortCh: make(chan bool), + resultCh: make(chan CompactionResult, 1), + } +} + +func (e *CompactionExecutor) compactionResult() (CompactionResult, bool) { + select { + case result := <-e.resultCh: + return result, true + default: + return CompactionResult{}, false } } @@ -456,40 +465,34 @@ func (e *CompactionExecutor) startCompaction(compaction CompactionJob) { if e.isStopped() { return } + e.tasksWG.Add(1) go func() { defer e.tasksWG.Done() - for { - select { - case <-e.abortCh: - return - default: - sortedRun, err := e.executeCompaction(compaction) - var msg WorkerToOrchestratorMsg - if err != nil { - // TODO(thrawn01): log the error somewhere. - msg = WorkerToOrchestratorMsg{CompactionError: err} - } else if sortedRun != nil { - msg = WorkerToOrchestratorMsg{CompactionResult: sortedRun} - } - - // wait till we can send msg to workerCh or abort is called - for { - select { - case <-e.abortCh: - return - case e.workerCh <- msg: - } - } - } + + if e.isStopped() { + return } + + var result CompactionResult + sortedRun, err := e.executeCompaction(compaction) + if err != nil { + // TODO(thrawn01): log the error somewhere. + result = CompactionResult{Error: err} + } else if sortedRun != nil { + result = CompactionResult{SortedRun: sortedRun} + } + e.resultCh <- result }() } func (e *CompactionExecutor) stop() { - close(e.abortCh) - e.tasksWG.Wait() e.stopped.Store(true) + e.waitForTasksCompletion() +} + +func (e *CompactionExecutor) waitForTasksCompletion() { + e.tasksWG.Wait() } func (e *CompactionExecutor) isStopped() bool { diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index c4c0efe..f0041a6 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -101,9 +101,11 @@ func TestShouldWriteManifestSafely(t *testing.T) { err = orchestrator.submitCompaction(newCompaction(l0IDsToCompact, 0)) assert.NoError(t, err) - msg := <-orchestrator.workerCh - assert.NotNil(t, msg.CompactionResult) - sr := msg.CompactionResult + orchestrator.executor.waitForTasksCompletion() + msg, ok := orchestrator.executor.compactionResult() + assert.True(t, ok) + assert.NotNil(t, msg.SortedRun) + sr := msg.SortedRun err = orchestrator.finishCompaction(sr) assert.NoError(t, err) From 1c40f21a2aa7352e76e9f7715ff6075b7ae8493e Mon Sep 17 00:00:00 2001 From: Naveen Date: Mon, 30 Dec 2024 17:43:28 +0530 Subject: [PATCH 2/4] Access compactorMsgCh channel only from CompactorOrchestrator --- slatedb/compactor.go | 125 ++++++++++++++++++-------------------- slatedb/compactor_test.go | 4 +- 2 files changed, 59 insertions(+), 70 deletions(-) diff --git a/slatedb/compactor.go b/slatedb/compactor.go index 1943a9d..159aec9 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -3,20 +3,18 @@ package slatedb import ( "context" "errors" + "github.com/oklog/ulid/v2" "github.com/slatedb/slatedb-go/internal/assert" + "github.com/slatedb/slatedb-go/internal/iter" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/common" compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction" "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" - "math" "sync" "sync/atomic" "time" - - "github.com/oklog/ulid/v2" - "github.com/slatedb/slatedb-go/internal/iter" - "github.com/slatedb/slatedb-go/slatedb/common" ) type CompactionScheduler interface { @@ -34,36 +32,26 @@ type CompactionResult struct { Error error } -// Compactor creates one goroutine for CompactorOrchestrator. The CompactionExecutor can create multiple goroutines -// for running the actual compaction. -// When Compactor.close is called, we wait till all the goroutines/workers started by CompactionExecutor stop running +// Compactor - The CompactorOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted. +// If compaction is needed, the CompactorOrchestrator gives CompactionJobs to the CompactionExecutor. +// The CompactionExecutor creates new goroutine for each CompactionJob and the results are written to a channel. type Compactor struct { - // compactorMsgCh - When Compactor.close is called we write CompactorShutdown message to this channel - // CompactorOrchestrator which runs in a different goroutine reads this channel and shuts down - compactorMsgCh chan<- CompactorMainMsg - - // compactorWG - When Compactor.close is called then wait till goroutine running CompactorOrchestrator - // is completed - compactorWG *sync.WaitGroup + orchestrator *CompactorOrchestrator } func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) { - compactorMsgCh := make(chan CompactorMainMsg, math.MaxUint8) - - compactorWG, errCh := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts, compactorMsgCh) - - err := <-errCh - assert.True(err == nil, "Failed to start compactor") + orchestrator, err := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts) + if err != nil { + return nil, err + } return &Compactor{ - compactorMsgCh: compactorMsgCh, - compactorWG: compactorWG, + orchestrator: orchestrator, }, nil } func (c *Compactor) close() { - c.compactorMsgCh <- CompactorShutdown - c.compactorWG.Wait() + c.orchestrator.shutdown() } // ------------------------------------------------ @@ -74,47 +62,15 @@ func spawnAndRunCompactorOrchestrator( manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions, - compactorMsgCh <-chan CompactorMainMsg, -) (*sync.WaitGroup, chan error) { - - errCh := make(chan error) - compactorWG := &sync.WaitGroup{} - compactorWG.Add(1) - - go func() { - defer compactorWG.Done() - orchestrator, err := newCompactorOrchestrator(opts, manifestStore, tableStore, compactorMsgCh) - if err != nil { - errCh <- err - return - } - errCh <- nil - - ticker := time.NewTicker(opts.CompactorOptions.PollInterval) - defer ticker.Stop() - - for { - resultPresent := orchestrator.processCompactionResult(opts.Log) - if !resultPresent && orchestrator.executor.isStopped() { - break - } +) (*CompactorOrchestrator, error) { - select { - case <-ticker.C: - err := orchestrator.loadManifest() - assert.True(err == nil, "Failed to load manifest") - case <-orchestrator.compactorMsgCh: - // we receive Shutdown msg on compactorMsgCh. Stop the executor. - // Don't return and let the loop continue until `orchestrator.processCompactionResult` - // has no more compaction results to process - orchestrator.executor.stop() - ticker.Stop() - default: - } - } - }() + orchestrator, err := newCompactorOrchestrator(opts, manifestStore, tableStore) + if err != nil { + return nil, err + } - return compactorWG, errCh + orchestrator.spawnLoop(opts) + return orchestrator, nil } type CompactorOrchestrator struct { @@ -126,7 +82,8 @@ type CompactorOrchestrator struct { // compactorMsgCh - When CompactorOrchestrator receives a CompactorShutdown message on this channel, // it calls executor.stop - compactorMsgCh <-chan CompactorMainMsg + compactorMsgCh chan CompactorMainMsg + waitGroup sync.WaitGroup log *slog.Logger } @@ -134,7 +91,6 @@ func newCompactorOrchestrator( opts DBOptions, manifestStore *ManifestStore, tableStore *store.TableStore, - compactorMsgCh <-chan CompactorMainMsg, ) (*CompactorOrchestrator, error) { sm, err := loadStoredManifest(manifestStore) if err != nil { @@ -164,7 +120,7 @@ func newCompactorOrchestrator( state: state, scheduler: scheduler, executor: executor, - compactorMsgCh: compactorMsgCh, + compactorMsgCh: make(chan CompactorMainMsg, 1), log: opts.Log, } return &o, nil @@ -182,6 +138,41 @@ func loadCompactionScheduler() CompactionScheduler { return SizeTieredCompactionScheduler{} } +func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) { + o.waitGroup.Add(1) + go func() { + defer o.waitGroup.Done() + + ticker := time.NewTicker(opts.CompactorOptions.PollInterval) + defer ticker.Stop() + + for { + resultPresent := o.processCompactionResult(opts.Log) + if !resultPresent && o.executor.isStopped() { + break + } + + select { + case <-ticker.C: + err := o.loadManifest() + assert.True(err == nil, "Failed to load manifest") + case <-o.compactorMsgCh: + // we receive Shutdown msg on compactorMsgCh. Stop the executor. + // Don't return and let the loop continue until `orchestrator.processCompactionResult` + // has no more compaction results to process + o.executor.stop() + ticker.Stop() + default: + } + } + }() +} + +func (o *CompactorOrchestrator) shutdown() { + o.compactorMsgCh <- CompactorShutdown + o.waitGroup.Wait() +} + func (o *CompactorOrchestrator) loadManifest() error { _, err := o.manifest.refresh() if err != nil { diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index f0041a6..35aae67 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -9,7 +9,6 @@ import ( "github.com/slatedb/slatedb-go/slatedb/state" "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" - "math" "slices" "testing" "time" @@ -82,8 +81,7 @@ func TestShouldWriteManifestSafely(t *testing.T) { err = db.Close() assert.NoError(t, err) - compactorMsgCh := make(chan CompactorMainMsg, math.MaxUint8) - orchestrator, err := newCompactorOrchestrator(compactorOptions(), manifestStore, tableStore, compactorMsgCh) + orchestrator, err := newCompactorOrchestrator(compactorOptions(), manifestStore, tableStore) assert.NoError(t, err) l0IDsToCompact := make([]SourceID, 0) From 3bf16045074fffed728e96fdb7885210112ffd57 Mon Sep 17 00:00:00 2001 From: Naveen Date: Mon, 30 Dec 2024 18:11:07 +0530 Subject: [PATCH 3/4] Rename methods --- slatedb/compactor.go | 12 +++++------- slatedb/compactor_test.go | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/slatedb/compactor.go b/slatedb/compactor.go index 159aec9..6a73ade 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -63,7 +63,6 @@ func spawnAndRunCompactorOrchestrator( tableStore *store.TableStore, opts DBOptions, ) (*CompactorOrchestrator, error) { - orchestrator, err := newCompactorOrchestrator(opts, manifestStore, tableStore) if err != nil { return nil, err @@ -158,8 +157,7 @@ func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) { assert.True(err == nil, "Failed to load manifest") case <-o.compactorMsgCh: // we receive Shutdown msg on compactorMsgCh. Stop the executor. - // Don't return and let the loop continue until `orchestrator.processCompactionResult` - // has no more compaction results to process + // Don't return and let the loop continue until there are no more compaction results to process o.executor.stop() ticker.Stop() default: @@ -257,7 +255,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { } func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool { - result, resultPresent := o.executor.compactionResult() + result, resultPresent := o.executor.nextCompactionResult() if resultPresent { if result.Error != nil { log.Error("Error executing compaction", "error", result.Error) @@ -348,7 +346,7 @@ func newCompactorExecutor( } } -func (e *CompactionExecutor) compactionResult() (CompactionResult, bool) { +func (e *CompactionExecutor) nextCompactionResult() (CompactionResult, bool) { select { case result := <-e.resultCh: return result, true @@ -479,10 +477,10 @@ func (e *CompactionExecutor) startCompaction(compaction CompactionJob) { func (e *CompactionExecutor) stop() { e.stopped.Store(true) - e.waitForTasksCompletion() + e.waitForTasksToComplete() } -func (e *CompactionExecutor) waitForTasksCompletion() { +func (e *CompactionExecutor) waitForTasksToComplete() { e.tasksWG.Wait() } diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index 35aae67..2c8998f 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -99,8 +99,8 @@ func TestShouldWriteManifestSafely(t *testing.T) { err = orchestrator.submitCompaction(newCompaction(l0IDsToCompact, 0)) assert.NoError(t, err) - orchestrator.executor.waitForTasksCompletion() - msg, ok := orchestrator.executor.compactionResult() + orchestrator.executor.waitForTasksToComplete() + msg, ok := orchestrator.executor.nextCompactionResult() assert.True(t, ok) assert.NotNil(t, msg.SortedRun) sr := msg.SortedRun From 6ecf82815e636abeb0cd5f67d23e0cbddbb0cea6 Mon Sep 17 00:00:00 2001 From: Naveen Date: Mon, 30 Dec 2024 18:24:58 +0530 Subject: [PATCH 4/4] Rename CompactorOrchestrator to CompactionOrchestrator --- slatedb/compactor.go | 48 +++++++++++++++++++-------------------- slatedb/compactor_test.go | 2 +- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/slatedb/compactor.go b/slatedb/compactor.go index 6a73ade..9c5506c 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -32,15 +32,15 @@ type CompactionResult struct { Error error } -// Compactor - The CompactorOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted. -// If compaction is needed, the CompactorOrchestrator gives CompactionJobs to the CompactionExecutor. +// Compactor - The CompactionOrchestrator checks with the CompactionScheduler if Level0 needs to be compacted. +// If compaction is needed, the CompactionOrchestrator gives CompactionJobs to the CompactionExecutor. // The CompactionExecutor creates new goroutine for each CompactionJob and the results are written to a channel. type Compactor struct { - orchestrator *CompactorOrchestrator + orchestrator *CompactionOrchestrator } func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) { - orchestrator, err := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts) + orchestrator, err := spawnAndRunCompactionOrchestrator(manifestStore, tableStore, opts) if err != nil { return nil, err } @@ -55,15 +55,15 @@ func (c *Compactor) close() { } // ------------------------------------------------ -// CompactorOrchestrator +// CompactionOrchestrator // ------------------------------------------------ -func spawnAndRunCompactorOrchestrator( +func spawnAndRunCompactionOrchestrator( manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions, -) (*CompactorOrchestrator, error) { - orchestrator, err := newCompactorOrchestrator(opts, manifestStore, tableStore) +) (*CompactionOrchestrator, error) { + orchestrator, err := newCompactionOrchestrator(opts, manifestStore, tableStore) if err != nil { return nil, err } @@ -72,25 +72,25 @@ func spawnAndRunCompactorOrchestrator( return orchestrator, nil } -type CompactorOrchestrator struct { +type CompactionOrchestrator struct { options *CompactorOptions manifest *FenceableManifest state *CompactorState scheduler CompactionScheduler executor *CompactionExecutor - // compactorMsgCh - When CompactorOrchestrator receives a CompactorShutdown message on this channel, + // compactorMsgCh - When CompactionOrchestrator receives a CompactorShutdown message on this channel, // it calls executor.stop compactorMsgCh chan CompactorMainMsg waitGroup sync.WaitGroup log *slog.Logger } -func newCompactorOrchestrator( +func newCompactionOrchestrator( opts DBOptions, manifestStore *ManifestStore, tableStore *store.TableStore, -) (*CompactorOrchestrator, error) { +) (*CompactionOrchestrator, error) { sm, err := loadStoredManifest(manifestStore) if err != nil { return nil, err @@ -113,7 +113,7 @@ func newCompactorOrchestrator( scheduler := loadCompactionScheduler() executor := newCompactorExecutor(opts.CompactorOptions, tableStore) - o := CompactorOrchestrator{ + o := CompactionOrchestrator{ options: opts.CompactorOptions, manifest: manifest, state: state, @@ -137,7 +137,7 @@ func loadCompactionScheduler() CompactionScheduler { return SizeTieredCompactionScheduler{} } -func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) { +func (o *CompactionOrchestrator) spawnLoop(opts DBOptions) { o.waitGroup.Add(1) go func() { defer o.waitGroup.Done() @@ -166,12 +166,12 @@ func (o *CompactorOrchestrator) spawnLoop(opts DBOptions) { }() } -func (o *CompactorOrchestrator) shutdown() { +func (o *CompactionOrchestrator) shutdown() { o.compactorMsgCh <- CompactorShutdown o.waitGroup.Wait() } -func (o *CompactorOrchestrator) loadManifest() error { +func (o *CompactionOrchestrator) loadManifest() error { _, err := o.manifest.refresh() if err != nil { return err @@ -183,7 +183,7 @@ func (o *CompactorOrchestrator) loadManifest() error { return nil } -func (o *CompactorOrchestrator) refreshDBState() error { +func (o *CompactionOrchestrator) refreshDBState() error { state, err := o.manifest.dbState() if err != nil { return err @@ -197,7 +197,7 @@ func (o *CompactorOrchestrator) refreshDBState() error { return nil } -func (o *CompactorOrchestrator) maybeScheduleCompactions() error { +func (o *CompactionOrchestrator) maybeScheduleCompactions() error { compactions := o.scheduler.maybeScheduleCompaction(o.state) for _, compaction := range compactions { err := o.submitCompaction(compaction) @@ -208,7 +208,7 @@ func (o *CompactorOrchestrator) maybeScheduleCompactions() error { return nil } -func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { +func (o *CompactionOrchestrator) startCompaction(compaction Compaction) { o.logCompactionState() dbState := o.state.dbState @@ -254,7 +254,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { }) } -func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool { +func (o *CompactionOrchestrator) processCompactionResult(log *slog.Logger) bool { result, resultPresent := o.executor.nextCompactionResult() if resultPresent { if result.Error != nil { @@ -267,7 +267,7 @@ func (o *CompactorOrchestrator) processCompactionResult(log *slog.Logger) bool { return resultPresent } -func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error { +func (o *CompactionOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error { o.state.finishCompaction(outputSR) o.logCompactionState() err := o.writeManifest() @@ -282,7 +282,7 @@ func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun return nil } -func (o *CompactorOrchestrator) writeManifest() error { +func (o *CompactionOrchestrator) writeManifest() error { for { err := o.loadManifest() if err != nil { @@ -299,7 +299,7 @@ func (o *CompactorOrchestrator) writeManifest() error { } } -func (o *CompactorOrchestrator) submitCompaction(compaction Compaction) error { +func (o *CompactionOrchestrator) submitCompaction(compaction Compaction) error { err := o.state.submitCompaction(compaction) if err != nil { o.log.Warn("invalid compaction", "error", err) @@ -309,7 +309,7 @@ func (o *CompactorOrchestrator) submitCompaction(compaction Compaction) error { return nil } -func (o *CompactorOrchestrator) logCompactionState() { +func (o *CompactionOrchestrator) logCompactionState() { // LogState(o.log, o.state.dbState) for _, compaction := range o.state.compactions { o.log.Info("in-flight compaction", "compaction", compaction) diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index 2c8998f..c798ea0 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -81,7 +81,7 @@ func TestShouldWriteManifestSafely(t *testing.T) { err = db.Close() assert.NoError(t, err) - orchestrator, err := newCompactorOrchestrator(compactorOptions(), manifestStore, tableStore) + orchestrator, err := newCompactionOrchestrator(compactorOptions(), manifestStore, tableStore) assert.NoError(t, err) l0IDsToCompact := make([]SourceID, 0)