Skip to content

Commit

Permalink
Rename levels package to compacted
Browse files Browse the repository at this point in the history
  • Loading branch information
naveen246 committed Dec 25, 2024
1 parent 7c53959 commit af49c9b
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 32 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package levels
package compacted

import (
"bytes"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package levels
package compacted

import (
"context"
Expand Down
18 changes: 9 additions & 9 deletions slatedb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/levels"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"github.com/slatedb/slatedb-go/slatedb/store"
"log/slog"
"math"
Expand All @@ -30,7 +30,7 @@ const (
)

type WorkerToOrchestratorMsg struct {
CompactionResult *levels.SortedRun
CompactionResult *compacted.SortedRun
CompactionError error
}

Expand Down Expand Up @@ -247,7 +247,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
}
}

srsByID := make(map[uint32]levels.SortedRun)
srsByID := make(map[uint32]compacted.SortedRun)
for _, sr := range dbState.compacted {
srsByID[sr.ID] = sr
}
Expand All @@ -260,7 +260,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
}
}

sortedRuns := make([]levels.SortedRun, 0)
sortedRuns := make([]compacted.SortedRun, 0)
for _, sID := range compaction.sources {
srID, ok := sID.sortedRunID().Get()
if ok {
Expand All @@ -275,7 +275,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
})
}

func (o *CompactorOrchestrator) finishCompaction(outputSR *levels.SortedRun) error {
func (o *CompactorOrchestrator) finishCompaction(outputSR *compacted.SortedRun) error {
o.state.finishCompaction(outputSR)
o.logCompactionState()
err := o.writeManifest()
Expand Down Expand Up @@ -331,7 +331,7 @@ func (o *CompactorOrchestrator) logCompactionState() {
type CompactionJob struct {
destination uint32
sstList []sstable.Handle
sortedRuns []levels.SortedRun
sortedRuns []compacted.SortedRun
}

type CompactionExecutor struct {
Expand Down Expand Up @@ -376,7 +376,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte

srIters := make([]iter.KVIterator, 0)
for _, sr := range compaction.sortedRuns {
srIter, err := levels.NewSortedRunIterator(sr, e.tableStore.Clone())
srIter, err := compacted.NewSortedRunIterator(sr, e.tableStore.Clone())
if err != nil {
return nil, err
}
Expand All @@ -397,7 +397,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte
return it, nil
}

func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*levels.SortedRun, error) {
func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*compacted.SortedRun, error) {
allIter, err := e.loadIterators(compaction)
if err != nil {
return nil, err
Expand Down Expand Up @@ -446,7 +446,7 @@ func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*level
}
outputSSTs = append(outputSSTs, *sst)
}
return &levels.SortedRun{
return &compacted.SortedRun{
ID: compaction.destination,
SSTList: outputSSTs,
}, warn.If()
Expand Down
8 changes: 4 additions & 4 deletions slatedb/compactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"github.com/kapetan-io/tackle/set"
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/slatedb/levels"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"log/slog"
"math"
"strconv"
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *CompactorState) refreshDBState(writerState *CoreDBState) {

// update dbState by removing L0 SSTs and compacted SortedRuns that are present
// in Compaction.sources
func (c *CompactorState) finishCompaction(outputSR *levels.SortedRun) {
func (c *CompactorState) finishCompaction(outputSR *compacted.SortedRun) {
compaction, ok := c.compactions[outputSR.ID]
if !ok {
return
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *CompactorState) finishCompaction(outputSR *levels.SortedRun) {
}
}

newCompacted := make([]levels.SortedRun, 0)
newCompacted := make([]compacted.SortedRun, 0)
inserted := false
for _, sr := range dbState.compacted {
if !inserted && outputSR.ID >= sr.ID {
Expand Down Expand Up @@ -226,7 +226,7 @@ func (c *CompactorState) finishCompaction(outputSR *levels.SortedRun) {
}

// sortedRun list should have IDs in decreasing order
func (c *CompactorState) assertCompactedSRsInIDOrder(compacted []levels.SortedRun) {
func (c *CompactorState) assertCompactedSRsInIDOrder(compacted []compacted.SortedRun) {
lastSortedRunID := uint32(math.MaxUint32)
for _, sr := range compacted {
assert.True(sr.ID < lastSortedRunID, "compacted sortedRuns not in decreasing order")
Expand Down
10 changes: 5 additions & 5 deletions slatedb/compactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
assert2 "github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/slatedb/levels"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand All @@ -30,7 +30,7 @@ func TestShouldUpdateDBStateWhenCompactionFinished(t *testing.T) {
err := state.submitCompaction(compaction)
assert.NoError(t, err)

sr := levels.SortedRun{
sr := compacted.SortedRun{
ID: 0,
SSTList: beforeCompaction.l0,
}
Expand All @@ -55,7 +55,7 @@ func TestShouldRemoveCompactionWhenCompactionFinished(t *testing.T) {
err := state.submitCompaction(compaction)
assert.NoError(t, err)

sr := levels.SortedRun{
sr := compacted.SortedRun{
ID: 0,
SSTList: beforeCompaction.l0,
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestShouldRefreshDBStateCorrectly(t *testing.T) {
compaction := newCompaction([]SourceID{newSourceIDSST(compactedID)}, 0)
err := state.submitCompaction(compaction)
assert.NoError(t, err)
state.finishCompaction(&levels.SortedRun{
state.finishCompaction(&compacted.SortedRun{
ID: 0,
SSTList: []sstable.Handle{originalL0s[len(originalL0s)-1]},
})
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) {
compaction := newCompaction(sourceIDs, 0)
err := state.submitCompaction(compaction)
assert.NoError(t, err)
state.finishCompaction(&levels.SortedRun{
state.finishCompaction(&compacted.SortedRun{
ID: 0,
SSTList: originalL0s,
})
Expand Down
6 changes: 3 additions & 3 deletions slatedb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/levels"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"github.com/slatedb/slatedb-go/slatedb/store"
"log/slog"
"math"
Expand Down Expand Up @@ -190,7 +190,7 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption
// search for key in compacted Sorted runs
for _, sr := range snapshot.core.compacted {
if db.srMayIncludeKey(sr, key) {
iter, err := levels.NewSortedRunIteratorFromKey(sr, key, db.tableStore.Clone())
iter, err := compacted.NewSortedRunIteratorFromKey(sr, key, db.tableStore.Clone())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,7 +230,7 @@ func (db *DB) sstMayIncludeKey(sst sstable.Handle, key []byte) bool {
return true
}

func (db *DB) srMayIncludeKey(sr levels.SortedRun, key []byte) bool {
func (db *DB) srMayIncludeKey(sr compacted.SortedRun, key []byte) bool {
sstOption := sr.SstWithKey(key)
if sstOption.IsAbsent() {
return false
Expand Down
8 changes: 4 additions & 4 deletions slatedb/db_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package slatedb
import (
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/slatedb/levels"
"github.com/slatedb/slatedb-go/slatedb/compacted"
"github.com/slatedb/slatedb-go/slatedb/table"
"log/slog"
"sync"
Expand All @@ -23,7 +23,7 @@ import (
type CoreDBState struct {
l0LastCompacted mo.Option[ulid.ULID]
l0 []sstable.Handle
compacted []levels.SortedRun
compacted []compacted.SortedRun

// nextWalSstID is used as the ID of new ImmutableWAL created during WAL flush process
// It is initialized to 1 and keeps getting incremented by 1 for new ImmutableWALs
Expand All @@ -39,7 +39,7 @@ func newCoreDBState() *CoreDBState {
coreState := &CoreDBState{
l0LastCompacted: mo.None[ulid.ULID](),
l0: make([]sstable.Handle, 0),
compacted: []levels.SortedRun{},
compacted: []compacted.SortedRun{},
}
coreState.nextWalSstID.Store(1)
coreState.lastCompactedWalSSTID.Store(0)
Expand All @@ -51,7 +51,7 @@ func (c *CoreDBState) clone() *CoreDBState {
for _, sst := range c.l0 {
l0 = append(l0, *sst.Clone())
}
compacted := make([]levels.SortedRun, 0, len(c.compacted))
compacted := make([]compacted.SortedRun, 0, len(c.compacted))
for _, sr := range c.compacted {
compacted = append(compacted, *sr.Clone())
}
Expand Down
10 changes: 5 additions & 5 deletions slatedb/flatbuf_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/slatedb/slatedb-go/internal/compress"
"github.com/slatedb/slatedb-go/internal/flatbuf"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/slatedb/levels"
"github.com/slatedb/slatedb-go/slatedb/compacted"
)

// ------------------------------------------------
Expand Down Expand Up @@ -91,10 +91,10 @@ func (f FlatBufferManifestCodec) parseFlatBufSSTInfo(info *flatbuf.SsTableInfoT)
}
}

func (f FlatBufferManifestCodec) parseFlatBufSortedRuns(fbSortedRuns []*flatbuf.SortedRunT) []levels.SortedRun {
sortedRuns := make([]levels.SortedRun, 0)
func (f FlatBufferManifestCodec) parseFlatBufSortedRuns(fbSortedRuns []*flatbuf.SortedRunT) []compacted.SortedRun {
sortedRuns := make([]compacted.SortedRun, 0)
for _, run := range fbSortedRuns {
sortedRuns = append(sortedRuns, levels.SortedRun{
sortedRuns = append(sortedRuns, compacted.SortedRun{
ID: run.Id,
SSTList: f.parseFlatBufSSTList(run.Ssts),
})
Expand Down Expand Up @@ -170,7 +170,7 @@ func (fb *DBFlatBufferBuilder) compactedSSTID(id ulid.ULID) *flatbuf.CompactedSs
}
}

func (fb *DBFlatBufferBuilder) sortedRunsToFlatBuf(sortedRuns []levels.SortedRun) []*flatbuf.SortedRunT {
func (fb *DBFlatBufferBuilder) sortedRunsToFlatBuf(sortedRuns []compacted.SortedRun) []*flatbuf.SortedRunT {
sortedRunFBs := make([]*flatbuf.SortedRunT, 0)
for _, sortedRun := range sortedRuns {
sortedRunFBs = append(sortedRunFBs, &flatbuf.SortedRunT{
Expand Down

0 comments on commit af49c9b

Please sign in to comment.