diff --git a/slatedb/sortedrun.go b/slatedb/compaction/sortedrun.go similarity index 76% rename from slatedb/sortedrun.go rename to slatedb/compaction/sortedrun.go index ada8259..ebe904e 100644 --- a/slatedb/sortedrun.go +++ b/slatedb/compaction/sortedrun.go @@ -1,4 +1,4 @@ -package slatedb +package compaction import ( "bytes" @@ -6,6 +6,7 @@ import ( "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/store" "github.com/samber/mo" "sort" @@ -16,14 +17,14 @@ import ( // ------------------------------------------------ type SortedRun struct { - id uint32 - sstList []sstable.Handle + ID uint32 + SSTList []sstable.Handle } func (s *SortedRun) indexOfSSTWithKey(key []byte) mo.Option[int] { - index := sort.Search(len(s.sstList), func(i int) bool { - assert.True(len(s.sstList[i].Info.FirstKey) != 0, "sst must have first key") - return bytes.Compare(s.sstList[i].Info.FirstKey, key) > 0 + index := sort.Search(len(s.SSTList), func(i int) bool { + assert.True(len(s.SSTList[i].Info.FirstKey) != 0, "sst must have first key") + return bytes.Compare(s.SSTList[i].Info.FirstKey, key) > 0 }) if index > 0 { return mo.Some(index - 1) @@ -31,22 +32,22 @@ func (s *SortedRun) indexOfSSTWithKey(key []byte) mo.Option[int] { return mo.None[int]() } -func (s *SortedRun) sstWithKey(key []byte) mo.Option[sstable.Handle] { +func (s *SortedRun) SstWithKey(key []byte) mo.Option[sstable.Handle] { index, ok := s.indexOfSSTWithKey(key).Get() if ok { - return mo.Some(s.sstList[index]) + return mo.Some(s.SSTList[index]) } return mo.None[sstable.Handle]() } -func (s *SortedRun) clone() *SortedRun { - sstList := make([]sstable.Handle, 0, len(s.sstList)) - for _, sst := range s.sstList { +func (s *SortedRun) Clone() *SortedRun { + sstList := make([]sstable.Handle, 0, len(s.SSTList)) + for _, sst := range s.SSTList { sstList = append(sstList, *sst.Clone()) } return &SortedRun{ - id: s.id, - sstList: sstList, + ID: s.ID, + SSTList: sstList, } } @@ -57,25 +58,25 @@ func (s *SortedRun) clone() *SortedRun { type SortedRunIterator struct { currentKVIter mo.Option[*sstable.Iterator] sstListIter *SSTListIterator - tableStore *TableStore + tableStore *store.TableStore warn types.ErrWarn } -func newSortedRunIterator(sr SortedRun, store *TableStore) (*SortedRunIterator, error) { - return newSortedRunIter(sr.sstList, store, mo.None[[]byte]()) +func NewSortedRunIterator(sr SortedRun, store *store.TableStore) (*SortedRunIterator, error) { + return newSortedRunIter(sr.SSTList, store, mo.None[[]byte]()) } -func newSortedRunIteratorFromKey(sr SortedRun, key []byte, store *TableStore) (*SortedRunIterator, error) { - sstList := sr.sstList +func NewSortedRunIteratorFromKey(sr SortedRun, key []byte, store *store.TableStore) (*SortedRunIterator, error) { + sstList := sr.SSTList idx, ok := sr.indexOfSSTWithKey(key).Get() if ok { - sstList = sr.sstList[idx:] + sstList = sr.SSTList[idx:] } return newSortedRunIter(sstList, store, mo.Some(key)) } -func newSortedRunIter(sstList []sstable.Handle, store *TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) { +func newSortedRunIter(sstList []sstable.Handle, store *store.TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) { sstListIter := newSSTListIterator(sstList) currentKVIter := mo.None[*sstable.Iterator]() diff --git a/slatedb/sortedrun_test.go b/slatedb/compaction/sortedrun_test.go similarity index 89% rename from slatedb/sortedrun_test.go rename to slatedb/compaction/sortedrun_test.go index 275b190..b7b5a00 100644 --- a/slatedb/sortedrun_test.go +++ b/slatedb/compaction/sortedrun_test.go @@ -1,4 +1,4 @@ -package slatedb +package compaction import ( "context" @@ -8,6 +8,7 @@ import ( "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" "github.com/slatedb/slatedb-go/slatedb/common" + "github.com/slatedb/slatedb-go/slatedb/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" @@ -17,7 +18,7 @@ import ( func buildSRWithSSTs( n uint64, keysPerSST uint64, - tableStore *TableStore, + tableStore *store.TableStore, keyGen common.OrderedBytesGenerator, valGen common.OrderedBytesGenerator, ) (SortedRun, error) { @@ -42,7 +43,7 @@ func TestOneSstSRIter(t *testing.T) { bucket := objstore.NewInMemBucket() conf := sstable.DefaultConfig() conf.MinFilterKeys = 3 - tableStore := NewTableStore(bucket, conf, "") + tableStore := store.NewTableStore(bucket, conf, "") builder := tableStore.TableBuilder() require.NoError(t, builder.AddValue([]byte("key1"), []byte("value1"))) @@ -55,7 +56,7 @@ func TestOneSstSRIter(t *testing.T) { assert.NoError(t, err) sr := SortedRun{0, []sstable.Handle{*sstHandle}} - iterator, err := newSortedRunIterator(sr, tableStore) + iterator, err := NewSortedRunIterator(sr, tableStore) assert.NoError(t, err) assert2.Next(t, iterator, []byte("key1"), []byte("value1")) assert2.Next(t, iterator, []byte("key2"), []byte("value2")) @@ -70,7 +71,7 @@ func TestManySstSRIter(t *testing.T) { bucket := objstore.NewInMemBucket() format := sstable.DefaultConfig() format.MinFilterKeys = 3 - tableStore := NewTableStore(bucket, format, "") + tableStore := store.NewTableStore(bucket, format, "") builder := tableStore.TableBuilder() require.NoError(t, builder.AddValue([]byte("key1"), []byte("value1"))) @@ -91,7 +92,7 @@ func TestManySstSRIter(t *testing.T) { require.NoError(t, err) sr := SortedRun{0, []sstable.Handle{*sstHandle, *sstHandle2}} - iterator, err := newSortedRunIterator(sr, tableStore) + iterator, err := NewSortedRunIterator(sr, tableStore) assert.NoError(t, err) assert2.Next(t, iterator, []byte("key1"), []byte("value1")) assert2.Next(t, iterator, []byte("key2"), []byte("value2")) @@ -106,7 +107,7 @@ func TestSRIterFromKey(t *testing.T) { bucket := objstore.NewInMemBucket() conf := sstable.DefaultConfig() conf.MinFilterKeys = 3 - tableStore := NewTableStore(bucket, conf, "") + tableStore := store.NewTableStore(bucket, conf, "") firstKey := []byte("aaaaaaaaaaaaaaaa") keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z')) @@ -125,7 +126,7 @@ func TestSRIterFromKey(t *testing.T) { fromKey := testCaseKeyGen.Next() testCaseValGen.Next() - kvIter, err := newSortedRunIteratorFromKey(sr, fromKey, tableStore) + kvIter, err := NewSortedRunIteratorFromKey(sr, fromKey, tableStore) assert.NoError(t, err) for j := 0; j < 30-i; j++ { @@ -141,7 +142,7 @@ func TestSRIterFromKeyLowerThanRange(t *testing.T) { bucket := objstore.NewInMemBucket() conf := sstable.DefaultConfig() conf.MinFilterKeys = 3 - tableStore := NewTableStore(bucket, conf, "") + tableStore := store.NewTableStore(bucket, conf, "") firstKey := []byte("aaaaaaaaaaaaaaaa") keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z')) @@ -154,7 +155,7 @@ func TestSRIterFromKeyLowerThanRange(t *testing.T) { sr, err := buildSRWithSSTs(3, 10, tableStore, keyGen, valGen) require.NoError(t, err) - kvIter, err := newSortedRunIteratorFromKey(sr, []byte("aaaaaaaaaa"), tableStore) + kvIter, err := NewSortedRunIteratorFromKey(sr, []byte("aaaaaaaaaa"), tableStore) assert.NoError(t, err) for j := 0; j < 30; j++ { @@ -169,7 +170,7 @@ func TestSRIterFromKeyHigherThanRange(t *testing.T) { bucket := objstore.NewInMemBucket() conf := sstable.DefaultConfig() conf.MinFilterKeys = 3 - tableStore := NewTableStore(bucket, conf, "") + tableStore := store.NewTableStore(bucket, conf, "") firstKey := []byte("aaaaaaaaaaaaaaaa") keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z')) @@ -180,7 +181,7 @@ func TestSRIterFromKeyHigherThanRange(t *testing.T) { sr, err := buildSRWithSSTs(3, 10, tableStore, keyGen, valGen) require.NoError(t, err) - kvIter, err := newSortedRunIteratorFromKey(sr, []byte("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"), tableStore) + kvIter, err := NewSortedRunIteratorFromKey(sr, []byte("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"), tableStore) assert.NoError(t, err) next, ok := kvIter.Next(context.Background()) assert.False(t, ok) diff --git a/slatedb/compactor.go b/slatedb/compactor.go index c2973d2..4ba577b 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -6,6 +6,8 @@ import ( "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction" + "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" "math" "sync" @@ -28,7 +30,7 @@ const ( ) type WorkerToOrchestratorMsg struct { - CompactionResult *SortedRun + CompactionResult *compaction2.SortedRun CompactionError error } @@ -46,7 +48,7 @@ type Compactor struct { compactorWG *sync.WaitGroup } -func newCompactor(manifestStore *ManifestStore, tableStore *TableStore, opts DBOptions) (*Compactor, error) { +func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) { compactorMsgCh := make(chan CompactorMainMsg, math.MaxUint8) compactorWG, errCh := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts, compactorMsgCh) @@ -71,7 +73,7 @@ func (c *Compactor) close() { func spawnAndRunCompactorOrchestrator( manifestStore *ManifestStore, - tableStore *TableStore, + tableStore *store.TableStore, opts DBOptions, compactorMsgCh <-chan CompactorMainMsg, ) (*sync.WaitGroup, chan error) { @@ -139,7 +141,7 @@ type CompactorOrchestrator struct { func newCompactorOrchestrator( opts DBOptions, manifestStore *ManifestStore, - tableStore *TableStore, + tableStore *store.TableStore, compactorMsgCh <-chan CompactorMainMsg, ) (*CompactorOrchestrator, error) { sm, err := loadStoredManifest(manifestStore) @@ -183,7 +185,7 @@ func loadState(manifest *FenceableManifest) (*CompactorState, error) { if err != nil { return nil, err } - return newCompactorState(dbState.clone(), nil), nil + return newCompactorState(dbState.Clone(), nil), nil } func loadCompactionScheduler() CompactionScheduler { @@ -232,22 +234,22 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { dbState := o.state.dbState sstsByID := make(map[ulid.ULID]sstable.Handle) - for _, sst := range dbState.l0 { + for _, sst := range dbState.L0 { id, ok := sst.Id.CompactedID().Get() assert.True(ok, "expected valid compacted ID") sstsByID[id] = sst } - for _, sr := range dbState.compacted { - for _, sst := range sr.sstList { + for _, sr := range dbState.Compacted { + for _, sst := range sr.SSTList { id, ok := sst.Id.CompactedID().Get() assert.True(ok, "expected valid compacted ID") sstsByID[id] = sst } } - srsByID := make(map[uint32]SortedRun) - for _, sr := range dbState.compacted { - srsByID[sr.id] = sr + srsByID := make(map[uint32]compaction2.SortedRun) + for _, sr := range dbState.Compacted { + srsByID[sr.ID] = sr } ssts := make([]sstable.Handle, 0) @@ -258,7 +260,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { } } - sortedRuns := make([]SortedRun, 0) + sortedRuns := make([]compaction2.SortedRun, 0) for _, sID := range compaction.sources { srID, ok := sID.sortedRunID().Get() if ok { @@ -273,7 +275,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { }) } -func (o *CompactorOrchestrator) finishCompaction(outputSR *SortedRun) error { +func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error { o.state.finishCompaction(outputSR) o.logCompactionState() err := o.writeManifest() @@ -295,7 +297,7 @@ func (o *CompactorOrchestrator) writeManifest() error { return err } - core := o.state.dbState.clone() + core := o.state.dbState.Clone() err = o.manifest.updateDBState(core) if errors.Is(err, common.ErrManifestVersionExists) { o.log.Warn("conflicting manifest version. retry write", "error", err) @@ -329,12 +331,12 @@ func (o *CompactorOrchestrator) logCompactionState() { type CompactionJob struct { destination uint32 sstList []sstable.Handle - sortedRuns []SortedRun + sortedRuns []compaction2.SortedRun } type CompactionExecutor struct { options *CompactorOptions - tableStore *TableStore + tableStore *store.TableStore workerCh chan<- WorkerToOrchestratorMsg tasksWG sync.WaitGroup @@ -345,7 +347,7 @@ type CompactionExecutor struct { func newCompactorExecutor( options *CompactorOptions, workerCh chan<- WorkerToOrchestratorMsg, - tableStore *TableStore, + tableStore *store.TableStore, ) *CompactionExecutor { return &CompactionExecutor{ options: options, @@ -374,7 +376,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte srIters := make([]iter.KVIterator, 0) for _, sr := range compaction.sortedRuns { - srIter, err := newSortedRunIterator(sr, e.tableStore.Clone()) + srIter, err := compaction2.NewSortedRunIterator(sr, e.tableStore.Clone()) if err != nil { return nil, err } @@ -395,7 +397,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte return it, nil } -func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*SortedRun, error) { +func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*compaction2.SortedRun, error) { allIter, err := e.loadIterators(compaction) if err != nil { return nil, err @@ -444,9 +446,9 @@ func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*Sorte } outputSSTs = append(outputSSTs, *sst) } - return &SortedRun{ - id: compaction.destination, - sstList: outputSSTs, + return &compaction2.SortedRun{ + ID: compaction.destination, + SSTList: outputSSTs, }, warn.If() } diff --git a/slatedb/compactor_state.go b/slatedb/compactor_state.go index c819752..7953d86 100644 --- a/slatedb/compactor_state.go +++ b/slatedb/compactor_state.go @@ -4,6 +4,8 @@ import ( "github.com/kapetan-io/tackle/set" "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" + compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction" + "github.com/slatedb/slatedb-go/slatedb/state" "log/slog" "math" "strconv" @@ -88,12 +90,12 @@ func newCompaction(sources []SourceID, destination uint32) Compaction { // ------------------------------------------------ type CompactorState struct { - dbState *CoreDBState + dbState *state.CoreStateSnapshot compactions map[uint32]Compaction log *slog.Logger } -func newCompactorState(dbState *CoreDBState, log *slog.Logger) *CompactorState { +func newCompactorState(dbState *state.CoreStateSnapshot, log *slog.Logger) *CompactorState { set.Default(&log, slog.Default()) return &CompactorState{ @@ -110,8 +112,8 @@ func (c *CompactorState) submitCompaction(compaction Compaction) error { return common.ErrInvalidCompaction } - for _, sr := range c.dbState.compacted { - if sr.id == compaction.destination { + for _, sr := range c.dbState.Compacted { + if sr.ID == compaction.destination { if !c.oneOfTheSourceSRMatchesDestination(compaction) { // the compaction overwrites an existing sr but doesn't include the sr return common.ErrInvalidCompaction @@ -137,12 +139,12 @@ func (c *CompactorState) oneOfTheSourceSRMatchesDestination(compaction Compactio return false } -func (c *CompactorState) refreshDBState(writerState *CoreDBState) { +func (c *CompactorState) refreshDBState(writerState *state.CoreStateSnapshot) { // the writer may have added more l0 SSTs. Add these to our l0 list. - lastCompactedL0 := c.dbState.l0LastCompacted + lastCompactedL0 := c.dbState.L0LastCompacted mergedL0s := make([]sstable.Handle, 0) - for _, writerL0SST := range writerState.l0 { + for _, writerL0SST := range writerState.L0 { assert.True(writerL0SST.Id.Type == sstable.Compacted, "unexpected sstable.ID type") writerL0ID, _ := writerL0SST.Id.CompactedID().Get() // we stop appending to our l0 list if we encounter sstID equal to lastCompactedID @@ -153,17 +155,17 @@ func (c *CompactorState) refreshDBState(writerState *CoreDBState) { mergedL0s = append(mergedL0s, writerL0SST) } - merged := c.dbState.clone() - merged.l0 = mergedL0s - merged.lastCompactedWalSSTID.Store(writerState.lastCompactedWalSSTID.Load()) - merged.nextWalSstID.Store(writerState.nextWalSstID.Load()) + merged := c.dbState.Clone() + merged.L0 = mergedL0s + merged.LastCompactedWalSSTID.Store(writerState.LastCompactedWalSSTID.Load()) + merged.NextWalSstID.Store(writerState.NextWalSstID.Load()) c.dbState = merged } // update dbState by removing L0 SSTs and compacted SortedRuns that are present // in Compaction.sources -func (c *CompactorState) finishCompaction(outputSR *SortedRun) { - compaction, ok := c.compactions[outputSR.id] +func (c *CompactorState) finishCompaction(outputSR *compaction2.SortedRun) { + compaction, ok := c.compactions[outputSR.ID] if !ok { return } @@ -182,9 +184,9 @@ func (c *CompactorState) finishCompaction(outputSR *SortedRun) { } compactionSRs[compaction.destination] = true - dbState := c.dbState.clone() + dbState := c.dbState.Clone() newL0 := make([]sstable.Handle, 0) - for _, sst := range dbState.l0 { + for _, sst := range dbState.L0 { assert.True(sst.Id.CompactedID().IsPresent(), "Expected compactedID not present") l0ID, _ := sst.Id.CompactedID().Get() _, ok := compactionL0s[l0ID] @@ -193,14 +195,14 @@ func (c *CompactorState) finishCompaction(outputSR *SortedRun) { } } - newCompacted := make([]SortedRun, 0) + newCompacted := make([]compaction2.SortedRun, 0) inserted := false - for _, sr := range dbState.compacted { - if !inserted && outputSR.id >= sr.id { + for _, sr := range dbState.Compacted { + if !inserted && outputSR.ID >= sr.ID { newCompacted = append(newCompacted, *outputSR) inserted = true } - _, ok := compactionSRs[sr.id] + _, ok := compactionSRs[sr.ID] if !ok { newCompacted = append(newCompacted, sr) } @@ -215,20 +217,20 @@ func (c *CompactorState) finishCompaction(outputSR *SortedRun) { firstSource := compaction.sources[0] if firstSource.sstID().IsPresent() { compactedL0, _ := firstSource.sstID().Get() - dbState.l0LastCompacted = mo.Some(compactedL0) + dbState.L0LastCompacted = mo.Some(compactedL0) } - dbState.l0 = newL0 - dbState.compacted = newCompacted + dbState.L0 = newL0 + dbState.Compacted = newCompacted c.dbState = dbState - delete(c.compactions, outputSR.id) + delete(c.compactions, outputSR.ID) } // sortedRun list should have IDs in decreasing order -func (c *CompactorState) assertCompactedSRsInIDOrder(compacted []SortedRun) { +func (c *CompactorState) assertCompactedSRsInIDOrder(compacted []compaction2.SortedRun) { lastSortedRunID := uint32(math.MaxUint32) for _, sr := range compacted { - assert.True(sr.id < lastSortedRunID, "compacted sortedRuns not in decreasing order") - lastSortedRunID = sr.id + assert.True(sr.ID < lastSortedRunID, "compacted sortedRuns not in decreasing order") + lastSortedRunID = sr.ID } } diff --git a/slatedb/compactor_state_test.go b/slatedb/compactor_state_test.go index dfbe103..ee4dac0 100644 --- a/slatedb/compactor_state_test.go +++ b/slatedb/compactor_state_test.go @@ -4,6 +4,8 @@ import ( "context" assert2 "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" + compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" @@ -14,57 +16,57 @@ import ( var testPath = "/test/db" func TestShouldRegisterCompactionAsSubmitted(t *testing.T) { - _, _, state := buildTestState(t) - err := state.submitCompaction(buildL0Compaction(state.dbState.l0, 0)) + _, _, compactorState := buildTestState(t) + err := compactorState.submitCompaction(buildL0Compaction(compactorState.dbState.L0, 0)) assert.NoError(t, err) - assert.Equal(t, 1, len(state.compactions)) - assert.Equal(t, Submitted, state.compactions[0].status) + assert.Equal(t, 1, len(compactorState.compactions)) + assert.Equal(t, Submitted, compactorState.compactions[0].status) } func TestShouldUpdateDBStateWhenCompactionFinished(t *testing.T) { - _, _, state := buildTestState(t) - beforeCompaction := state.dbState.clone() - compaction := buildL0Compaction(beforeCompaction.l0, 0) - err := state.submitCompaction(compaction) + _, _, compactorState := buildTestState(t) + beforeCompaction := compactorState.dbState.Clone() + compaction := buildL0Compaction(beforeCompaction.L0, 0) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) - sr := SortedRun{ - id: 0, - sstList: beforeCompaction.l0, + sr := compaction2.SortedRun{ + ID: 0, + SSTList: beforeCompaction.L0, } - state.finishCompaction(sr.clone()) + compactorState.finishCompaction(sr.Clone()) - compactedID, _ := beforeCompaction.l0[0].Id.CompactedID().Get() - l0LastCompacted, _ := state.dbState.l0LastCompacted.Get() + compactedID, _ := beforeCompaction.L0[0].Id.CompactedID().Get() + l0LastCompacted, _ := compactorState.dbState.L0LastCompacted.Get() assert.Equal(t, compactedID, l0LastCompacted) - assert.Equal(t, 0, len(state.dbState.l0)) - assert.Equal(t, 1, len(state.dbState.compacted)) - assert.Equal(t, sr.id, state.dbState.compacted[0].id) - compactedSR := state.dbState.compacted[0] - for i := 0; i < len(sr.sstList); i++ { - assert.Equal(t, sr.sstList[i].Id, compactedSR.sstList[i].Id) + assert.Equal(t, 0, len(compactorState.dbState.L0)) + assert.Equal(t, 1, len(compactorState.dbState.Compacted)) + assert.Equal(t, sr.ID, compactorState.dbState.Compacted[0].ID) + compactedSR := compactorState.dbState.Compacted[0] + for i := 0; i < len(sr.SSTList); i++ { + assert.Equal(t, sr.SSTList[i].Id, compactedSR.SSTList[i].Id) } } func TestShouldRemoveCompactionWhenCompactionFinished(t *testing.T) { - _, _, state := buildTestState(t) - beforeCompaction := state.dbState.clone() - compaction := buildL0Compaction(beforeCompaction.l0, 0) - err := state.submitCompaction(compaction) + _, _, compactorState := buildTestState(t) + beforeCompaction := compactorState.dbState.Clone() + compaction := buildL0Compaction(beforeCompaction.L0, 0) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) - sr := SortedRun{ - id: 0, - sstList: beforeCompaction.l0, + sr := compaction2.SortedRun{ + ID: 0, + SSTList: beforeCompaction.L0, } - state.finishCompaction(sr.clone()) + compactorState.finishCompaction(sr.Clone()) - assert.Equal(t, 0, len(state.compactions)) + assert.Equal(t, 0, len(compactorState.compactions)) } func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) { - bucket, sm, state := buildTestState(t) + bucket, sm, compactorState := buildTestState(t) option := DefaultDBOptions() option.L0SSTSizeBytes = 128 db, err := OpenWithOptions(context.Background(), testPath, bucket, option) @@ -73,27 +75,27 @@ func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) { db.Put(repeatedChar('a', 16), repeatedChar('b', 48)) db.Put(repeatedChar('j', 16), repeatedChar('k', 48)) - writerDBState := waitForManifestWithL0Len(sm, len(state.dbState.l0)+1) + writerDBState := waitForManifestWithL0Len(sm, len(compactorState.dbState.L0)+1) - state.refreshDBState(writerDBState) + compactorState.refreshDBState(writerDBState) - assert.True(t, state.dbState.l0LastCompacted.IsAbsent()) - for i := 0; i < len(writerDBState.l0); i++ { - assert.Equal(t, writerDBState.l0[i].Id.CompactedID(), state.dbState.l0[i].Id.CompactedID()) + assert.True(t, compactorState.dbState.L0LastCompacted.IsAbsent()) + for i := 0; i < len(writerDBState.L0); i++ { + assert.Equal(t, writerDBState.L0[i].Id.CompactedID(), compactorState.dbState.L0[i].Id.CompactedID()) } } func TestShouldRefreshDBStateCorrectly(t *testing.T) { - bucket, sm, state := buildTestState(t) - originalL0s := state.dbState.clone().l0 + bucket, sm, compactorState := buildTestState(t) + originalL0s := compactorState.dbState.Clone().L0 compactedID, ok := originalL0s[len(originalL0s)-1].Id.CompactedID().Get() assert.True(t, ok) compaction := newCompaction([]SourceID{newSourceIDSST(compactedID)}, 0) - err := state.submitCompaction(compaction) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) - state.finishCompaction(&SortedRun{ - id: 0, - sstList: []sstable.Handle{originalL0s[len(originalL0s)-1]}, + compactorState.finishCompaction(&compaction2.SortedRun{ + ID: 0, + SSTList: []sstable.Handle{originalL0s[len(originalL0s)-1]}, }) option := DefaultDBOptions() @@ -104,35 +106,35 @@ func TestShouldRefreshDBStateCorrectly(t *testing.T) { db.Put(repeatedChar('a', 16), repeatedChar('b', 48)) db.Put(repeatedChar('j', 16), repeatedChar('k', 48)) writerDBState := waitForManifestWithL0Len(sm, len(originalL0s)+1) - dbStateBeforeMerge := state.dbState.clone() + dbStateBeforeMerge := compactorState.dbState.Clone() - state.refreshDBState(writerDBState) + compactorState.refreshDBState(writerDBState) - dbState := state.dbState + dbState := compactorState.dbState // last sst was removed during compaction expectedMergedL0s := originalL0s[:len(originalL0s)-1] // new sst got added during db.Put() call above - expectedMergedL0s = append([]sstable.Handle{writerDBState.l0[0]}, expectedMergedL0s...) + expectedMergedL0s = append([]sstable.Handle{writerDBState.L0[0]}, expectedMergedL0s...) for i := 0; i < len(expectedMergedL0s); i++ { expected, _ := expectedMergedL0s[i].Id.CompactedID().Get() - actual, _ := dbState.l0[i].Id.CompactedID().Get() + actual, _ := dbState.L0[i].Id.CompactedID().Get() assert.Equal(t, expected, actual) } - for i := 0; i < len(dbStateBeforeMerge.compacted); i++ { - srBefore := dbStateBeforeMerge.compacted[i] - srAfter := dbState.compacted[i] - assert.Equal(t, srBefore.id, srAfter.id) - for j := 0; j < len(srBefore.sstList); j++ { - assert.Equal(t, srBefore.sstList[j].Id, srAfter.sstList[j].Id) + for i := 0; i < len(dbStateBeforeMerge.Compacted); i++ { + srBefore := dbStateBeforeMerge.Compacted[i] + srAfter := dbState.Compacted[i] + assert.Equal(t, srBefore.ID, srAfter.ID) + for j := 0; j < len(srBefore.SSTList); j++ { + assert.Equal(t, srBefore.SSTList[j].Id, srAfter.SSTList[j].Id) } } - assert.Equal(t, writerDBState.lastCompactedWalSSTID.Load(), dbState.lastCompactedWalSSTID.Load()) - assert.Equal(t, writerDBState.nextWalSstID.Load(), dbState.nextWalSstID.Load()) + assert.Equal(t, writerDBState.LastCompactedWalSSTID.Load(), dbState.LastCompactedWalSSTID.Load()) + assert.Equal(t, writerDBState.NextWalSstID.Load(), dbState.NextWalSstID.Load()) } func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) { - bucket, sm, state := buildTestState(t) - originalL0s := state.dbState.clone().l0 + bucket, sm, compactorState := buildTestState(t) + originalL0s := compactorState.dbState.Clone().L0 sourceIDs := make([]SourceID, 0) for _, sst := range originalL0s { @@ -141,13 +143,13 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) { sourceIDs = append(sourceIDs, newSourceIDSST(id)) } compaction := newCompaction(sourceIDs, 0) - err := state.submitCompaction(compaction) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) - state.finishCompaction(&SortedRun{ - id: 0, - sstList: originalL0s, + compactorState.finishCompaction(&compaction2.SortedRun{ + ID: 0, + SSTList: originalL0s, }) - assert.Equal(t, 0, len(state.dbState.l0)) + assert.Equal(t, 0, len(compactorState.dbState.L0)) option := DefaultDBOptions() option.L0SSTSizeBytes = 128 @@ -158,22 +160,22 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) { db.Put(repeatedChar('j', 16), repeatedChar('k', 48)) writerDBState := waitForManifestWithL0Len(sm, len(originalL0s)+1) - state.refreshDBState(writerDBState) + compactorState.refreshDBState(writerDBState) - dbState := state.dbState - assert.Equal(t, 1, len(dbState.l0)) - expectedID, _ := writerDBState.l0[0].Id.CompactedID().Get() - actualID, _ := dbState.l0[0].Id.CompactedID().Get() + dbState := compactorState.dbState + assert.Equal(t, 1, len(dbState.L0)) + expectedID, _ := writerDBState.L0[0].Id.CompactedID().Get() + actualID, _ := dbState.L0[0].Id.CompactedID().Get() assert.Equal(t, expectedID, actualID) } -func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *CoreDBState { +func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *state.CoreStateSnapshot { startTime := time.Now() for time.Since(startTime) < time.Second*10 { dbState, err := storedManifest.refresh() assert2.True(err == nil, "") - if len(dbState.l0) == size { - return dbState.clone() + if len(dbState.L0) == size { + return dbState.Clone() } time.Sleep(time.Millisecond * 50) } @@ -213,6 +215,6 @@ func buildTestState(t *testing.T) (objstore.Bucket, StoredManifest, *CompactorSt assert.True(t, err == nil, "Could not load stored manifest") assert.True(t, sm.IsPresent(), "Could not find stored manifest") storedManifest, _ := sm.Get() - state := newCompactorState(storedManifest.dbState(), nil) - return bucket, storedManifest, state + compactorState := newCompactorState(storedManifest.dbState(), nil) + return bucket, storedManifest, compactorState } diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index bbb5337..c4c0efe 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -6,6 +6,8 @@ import ( "github.com/slatedb/slatedb-go/internal/compress" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/state" + "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" "math" "slices" @@ -28,26 +30,24 @@ func TestCompactorCompactsL0(t *testing.T) { } startTime := time.Now() - dbState := mo.None[*CoreDBState]() + dbState := mo.None[*state.CoreStateSnapshot]() for time.Since(startTime) < time.Second*10 { sm, err := loadStoredManifest(manifestStore) assert.NoError(t, err) assert.True(t, sm.IsPresent()) storedManifest, _ := sm.Get() - state := storedManifest.dbState() - if state.l0LastCompacted.IsPresent() { - dbState = mo.Some(state.clone()) + if storedManifest.dbState().L0LastCompacted.IsPresent() { + dbState = mo.Some(storedManifest.dbState().Clone()) break } time.Sleep(time.Millisecond * 50) } assert.True(t, dbState.IsPresent()) - state, _ := dbState.Get() - assert.True(t, state.l0LastCompacted.IsPresent()) - assert.Equal(t, 1, len(state.compacted)) + assert.True(t, dbState.MustGet().L0LastCompacted.IsPresent()) + assert.Equal(t, 1, len(dbState.MustGet().Compacted)) - compactedSSTList := state.compacted[0].sstList + compactedSSTList := dbState.MustGet().Compacted[0].SSTList assert.Equal(t, 1, len(compactedSSTList)) sst := compactedSSTList[0] @@ -87,7 +87,7 @@ func TestShouldWriteManifestSafely(t *testing.T) { assert.NoError(t, err) l0IDsToCompact := make([]SourceID, 0) - for _, sst := range orchestrator.state.dbState.l0 { + for _, sst := range orchestrator.state.dbState.L0 { id, ok := sst.Id.CompactedID().Get() assert.True(t, ok) l0IDsToCompact = append(l0IDsToCompact, newSourceIDSST(id)) @@ -111,22 +111,22 @@ func TestShouldWriteManifestSafely(t *testing.T) { // Key aaa... will be compacted and Key jjj... will be in Level0 dbState, err := storedManifest.refresh() assert.NoError(t, err) - assert.Equal(t, 1, len(dbState.l0)) - assert.Equal(t, 1, len(dbState.compacted)) + assert.Equal(t, 1, len(dbState.L0)) + assert.Equal(t, 1, len(dbState.Compacted)) - l0ID, ok := dbState.l0[0].Id.CompactedID().Get() + l0ID, ok := dbState.L0[0].Id.CompactedID().Get() assert.True(t, ok) compactedSSTIDs := make([]ulid.ULID, 0) - for _, sst := range dbState.compacted[0].sstList { + for _, sst := range dbState.Compacted[0].SSTList { id, ok := sst.Id.CompactedID().Get() assert.True(t, ok) compactedSSTIDs = append(compactedSSTIDs, id) } assert.False(t, slices.Contains(compactedSSTIDs, l0ID)) - assert.Equal(t, l0IDsToCompact[0].sstID(), dbState.l0LastCompacted) + assert.Equal(t, l0IDsToCompact[0].sstID(), dbState.L0LastCompacted) } -func buildTestDB(options DBOptions) (objstore.Bucket, *ManifestStore, *TableStore, *DB) { +func buildTestDB(options DBOptions) (objstore.Bucket, *ManifestStore, *store.TableStore, *DB) { bucket := objstore.NewInMemBucket() db, err := OpenWithOptions(context.Background(), testPath, bucket, options) assert2.True(err == nil, "Failed to open test database") @@ -135,7 +135,7 @@ func buildTestDB(options DBOptions) (objstore.Bucket, *ManifestStore, *TableStor conf.MinFilterKeys = 10 conf.Compression = options.CompressionCodec manifestStore := newManifestStore(testPath, bucket) - tableStore := NewTableStore(bucket, conf, testPath) + tableStore := store.NewTableStore(bucket, conf, testPath) return bucket, manifestStore, tableStore, db } diff --git a/slatedb/db.go b/slatedb/db.go index 9452b63..5711ddd 100644 --- a/slatedb/db.go +++ b/slatedb/db.go @@ -9,6 +9,9 @@ import ( "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/compaction" + "github.com/slatedb/slatedb-go/slatedb/state" + "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" "math" "sync" @@ -21,10 +24,10 @@ const BlockSize = 4096 type DB struct { manifest *FenceableManifest - tableStore *TableStore + tableStore *store.TableStore compactor *Compactor opts DBOptions - state *DBState + state *state.DBState // walFlushNotifierCh - When DB.Close is called, we send a notification to this channel // and the goroutine running the walFlush task reads this channel and shuts down @@ -52,7 +55,7 @@ func OpenWithOptions(ctx context.Context, path string, bucket objstore.Bucket, o conf.Compression = options.CompressionCodec set.Default(&options.Log, slog.Default()) - tableStore := NewTableStore(bucket, conf, path) + tableStore := store.NewTableStore(bucket, conf, path) manifestStore := newManifestStore(path, bucket) manifest, err := getManifest(manifestStore) @@ -61,12 +64,12 @@ func OpenWithOptions(ctx context.Context, path string, bucket objstore.Bucket, o } memtableFlushNotifierCh := make(chan MemtableFlushThreadMsg, math.MaxUint8) - state, err := manifest.dbState() + dbState, err := manifest.dbState() if err != nil { return nil, err } - db, err := newDB(ctx, options, tableStore, state, memtableFlushNotifierCh) + db, err := newDB(ctx, options, tableStore, dbState.ToCoreState(), memtableFlushNotifierCh) if err != nil { return nil, fmt.Errorf("during db init: %w", err) } @@ -136,16 +139,16 @@ func (db *DB) Get(ctx context.Context, key []byte) ([]byte, error) { // if readlevel is Committed we start searching key in the following order // mutable memtable, immutable memtables, SSTs in L0, compacted Sorted runs func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOptions) ([]byte, error) { - snapshot := db.state.snapshot() + snapshot := db.state.Snapshot() if options.ReadLevel == Uncommitted { // search for key in mutable WAL - val, ok := snapshot.wal.Get(key).Get() + val, ok := snapshot.Wal.Get(key).Get() if ok { // key is present or tombstoned return checkValue(val) } // search for key in ImmutableWALs - immWALList := snapshot.immWALs + immWALList := snapshot.ImmWALs for i := 0; i < immWALList.Len(); i++ { immWAL := immWALList.At(i) val, ok := immWAL.Get(key).Get() @@ -156,12 +159,12 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption } // search for key in mutable memtable - val, ok := snapshot.memtable.Get(key).Get() + val, ok := snapshot.Memtable.Get(key).Get() if ok { // key is present or tombstoned return checkValue(val) } // search for key in Immutable memtables - immMemtables := snapshot.immMemtables + immMemtables := snapshot.ImmMemtables for i := 0; i < immMemtables.Len(); i++ { immTable := immMemtables.At(i) val, ok := immTable.Get(key).Get() @@ -171,7 +174,7 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption } // search for key in SSTs in L0 - for _, sst := range snapshot.core.l0 { + for _, sst := range snapshot.Core.L0 { if db.sstMayIncludeKey(sst, key) { iter, err := sstable.NewIteratorAtKey(&sst, key, db.tableStore.Clone()) if err != nil { @@ -186,9 +189,9 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption } // search for key in compacted Sorted runs - for _, sr := range snapshot.core.compacted { + for _, sr := range snapshot.Core.Compacted { if db.srMayIncludeKey(sr, key) { - iter, err := newSortedRunIteratorFromKey(sr, key, db.tableStore.Clone()) + iter, err := compaction.NewSortedRunIteratorFromKey(sr, key, db.tableStore.Clone()) if err != nil { return nil, err } @@ -228,8 +231,8 @@ func (db *DB) sstMayIncludeKey(sst sstable.Handle, key []byte) bool { return true } -func (db *DB) srMayIncludeKey(sr SortedRun, key []byte) bool { - sstOption := sr.sstWithKey(key) +func (db *DB) srMayIncludeKey(sr compaction.SortedRun, key []byte) bool { + sstOption := sr.SstWithKey(key) if sstOption.IsAbsent() { return false } @@ -246,7 +249,7 @@ func (db *DB) srMayIncludeKey(sr SortedRun, key []byte) bool { // and write the kv pairs to memtable func (db *DB) replayWAL(ctx context.Context) error { walIDLastCompacted := db.state.LastCompactedWALID() - walSSTList, err := db.tableStore.getWalSSTList(walIDLastCompacted) + walSSTList, err := db.tableStore.GetWalSSTList(walIDLastCompacted) if err != nil { return err } @@ -286,7 +289,7 @@ func (db *DB) replayWAL(ctx context.Context) error { db.maybeFreezeMemtable(db.state, sstID) if db.state.NextWALID() == sstID { - db.state.incrementNextWALID() + db.state.IncrementNextWALID() } } @@ -294,11 +297,11 @@ func (db *DB) replayWAL(ctx context.Context) error { return nil } -func (db *DB) maybeFreezeMemtable(state *DBState, walID uint64) { - if state.Memtable().Size() < int64(db.opts.L0SSTSizeBytes) { +func (db *DB) maybeFreezeMemtable(dbState *state.DBState, walID uint64) { + if dbState.Memtable().Size() < int64(db.opts.L0SSTSizeBytes) { return } - state.freezeMemtable(walID) + dbState.FreezeMemtable(walID) db.memtableFlushNotifierCh <- FlushImmutableMemtables } @@ -311,7 +314,7 @@ func (db *DB) FlushMemtableToL0() error { } walID, _ := lastWalID.Get() - db.state.freezeMemtable(walID) + db.state.FreezeMemtable(walID) flusher := MemtableFlusher{ db: db, @@ -332,7 +335,7 @@ func getManifest(manifestStore *ManifestStore) (*FenceableManifest, error) { if ok { storedManifest = &sm } else { - storedManifest, err = newStoredManifest(manifestStore, newCoreDBState()) + storedManifest, err = newStoredManifest(manifestStore, state.NewCoreDBState()) if err != nil { return nil, err } @@ -344,14 +347,14 @@ func getManifest(manifestStore *ManifestStore) (*FenceableManifest, error) { func newDB( ctx context.Context, options DBOptions, - tableStore *TableStore, - coreDBState *CoreDBState, + tableStore *store.TableStore, + coreDBState *state.CoreDBState, memtableFlushNotifierCh chan<- MemtableFlushThreadMsg, ) (*DB, error) { - state := newDBState(coreDBState) + dbState := state.NewDBState(coreDBState) db := &DB{ - state: state, + state: dbState, opts: options, tableStore: tableStore, memtableFlushNotifierCh: memtableFlushNotifierCh, diff --git a/slatedb/db_test.go b/slatedb/db_test.go index 1b773ea..21a2bb6 100644 --- a/slatedb/db_test.go +++ b/slatedb/db_test.go @@ -7,6 +7,8 @@ import ( "github.com/slatedb/slatedb-go/internal/compress" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/state" + "github.com/slatedb/slatedb-go/slatedb/store" "github.com/stretchr/testify/require" "math" "strconv" @@ -111,7 +113,7 @@ func TestPutFlushesMemtable(t *testing.T) { storedManifest, _ := stored.Get() conf := sstable.DefaultConfig() conf.MinFilterKeys = 10 - tableStore := NewTableStore(bucket, conf, dbPath) + tableStore := store.NewTableStore(bucket, conf, dbPath) lastCompacted := uint64(0) for i := 0; i < 3; i++ { @@ -123,16 +125,16 @@ func TestPutFlushesMemtable(t *testing.T) { value = repeatedChar(rune('k'+i), 50) db.Put(key, value) - dbState := waitForManifestCondition(storedManifest, time.Second*30, func(state *CoreDBState) bool { - return state.lastCompactedWalSSTID.Load() > lastCompacted + dbState := waitForManifestCondition(storedManifest, time.Second*30, func(state *state.CoreStateSnapshot) bool { + return state.LastCompactedWalSSTID.Load() > lastCompacted }) - assert.Equal(t, uint64(i*2+2), dbState.lastCompactedWalSSTID.Load()) - lastCompacted = dbState.lastCompactedWalSSTID.Load() + assert.Equal(t, uint64(i*2+2), dbState.LastCompactedWalSSTID.Load()) + lastCompacted = dbState.LastCompactedWalSSTID.Load() } dbState, err := storedManifest.refresh() require.NoError(t, err) - l0 := dbState.l0 + l0 := dbState.L0 ctx := context.Background() assert.Equal(t, 3, len(l0)) for i := 0; i < 3; i++ { @@ -307,7 +309,7 @@ func TestBasicRestore(t *testing.T) { storedManifest, _ := stored.Get() dbState := storedManifest.dbState() - assert.Equal(t, uint64(sstCount+2*l0Count+1), dbState.nextWalSstID.Load()) + assert.Equal(t, uint64(sstCount+2*l0Count+1), dbState.NextWalSstID.Load()) } func TestShouldReadUncommittedIfReadLevelUncommitted(t *testing.T) { @@ -378,10 +380,10 @@ func TestSnapshotState(t *testing.T) { db, err = OpenWithOptions(context.Background(), dbPath, bucket, testDBOptions(0, 128)) require.NoError(t, err) defer db.Close() - snapshot := db.state.snapshot() - assert.Equal(t, uint64(2), snapshot.core.lastCompactedWalSSTID.Load()) - assert.Equal(t, uint64(3), snapshot.core.nextWalSstID.Load()) - assert.Equal(t, 2, len(snapshot.core.l0)) + snapshot := db.state.Snapshot() + assert.Equal(t, uint64(2), snapshot.Core.LastCompactedWalSSTID.Load()) + assert.Equal(t, uint64(3), snapshot.Core.NextWalSstID.Load()) + assert.Equal(t, 2, len(snapshot.Core.L0)) val1, err := db.Get(context.Background(), key1) require.NoError(t, err) @@ -437,8 +439,8 @@ func doTestShouldReadCompactedDB(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('a'+i), 32), bytes.Repeat([]byte{byte(1 + i)}, 32)) db.Put(repeatedChar(rune('m'+i), 32), bytes.Repeat([]byte{byte(13 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // write more l0s and wait for compaction @@ -446,8 +448,8 @@ func doTestShouldReadCompactedDB(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('f'+i), 32), bytes.Repeat([]byte{byte(6 + i)}, 32)) db.Put(repeatedChar(rune('s'+i), 32), bytes.Repeat([]byte{byte(19 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // write another l0 @@ -502,8 +504,8 @@ func doTestDeleteAndWaitForCompaction(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('a'+i), 32), bytes.Repeat([]byte{byte(1 + i)}, 32)) db.Put(repeatedChar(rune('m'+i), 32), bytes.Repeat([]byte{byte(13 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // Delete existing keys @@ -516,8 +518,8 @@ func doTestDeleteAndWaitForCompaction(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('f'+i), 32), bytes.Repeat([]byte{byte(6 + i)}, 32)) db.Put(repeatedChar(rune('s'+i), 32), bytes.Repeat([]byte{byte(19 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // verify that keys are deleted @@ -542,14 +544,14 @@ func doTestDeleteAndWaitForCompaction(t *testing.T, options DBOptions) { func waitForManifestCondition( sm StoredManifest, timeout time.Duration, - cond func(state *CoreDBState) bool, -) *CoreDBState { + cond func(state *state.CoreStateSnapshot) bool, +) *state.CoreStateSnapshot { start := time.Now() for time.Since(start) < timeout { dbState, err := sm.refresh() assert2.True(err == nil, "") if cond(dbState) { - return dbState.clone() + return dbState.Clone() } time.Sleep(time.Millisecond * 10) } diff --git a/slatedb/flatbuf_types.go b/slatedb/flatbuf_types.go index efbeebc..28271dd 100644 --- a/slatedb/flatbuf_types.go +++ b/slatedb/flatbuf_types.go @@ -10,6 +10,8 @@ import ( "github.com/slatedb/slatedb-go/internal/compress" "github.com/slatedb/slatedb-go/internal/flatbuf" "github.com/slatedb/slatedb-go/internal/sstable" + "github.com/slatedb/slatedb-go/slatedb/compaction" + "github.com/slatedb/slatedb-go/slatedb/state" ) // ------------------------------------------------ @@ -32,22 +34,22 @@ func (f FlatBufferManifestCodec) decode(data []byte) (*Manifest, error) { } func (f FlatBufferManifestCodec) manifest(manifest *flatbuf.ManifestV1T) *Manifest { - core := &CoreDBState{ - l0: f.parseFlatBufSSTList(manifest.L0), - compacted: f.parseFlatBufSortedRuns(manifest.Compacted), + core := &state.CoreStateSnapshot{ + L0: f.parseFlatBufSSTList(manifest.L0), + Compacted: f.parseFlatBufSortedRuns(manifest.Compacted), } - core.nextWalSstID.Store(manifest.WalIdLastSeen + 1) - core.lastCompactedWalSSTID.Store(manifest.WalIdLastCompacted) + core.NextWalSstID.Store(manifest.WalIdLastSeen + 1) + core.LastCompactedWalSSTID.Store(manifest.WalIdLastCompacted) l0LastCompacted := f.parseFlatBufSSTId(manifest.L0LastCompacted) if l0LastCompacted == ulid.Zero { - core.l0LastCompacted = mo.None[ulid.ULID]() + core.L0LastCompacted = mo.None[ulid.ULID]() } else { - core.l0LastCompacted = mo.Some(l0LastCompacted) + core.L0LastCompacted = mo.Some(l0LastCompacted) } m := &Manifest{} - m.core = core + m.core = core.ToCoreState() m.writerEpoch.Store(manifest.WriterEpoch) m.compactorEpoch.Store(manifest.CompactorEpoch) return m @@ -90,12 +92,12 @@ func (f FlatBufferManifestCodec) parseFlatBufSSTInfo(info *flatbuf.SsTableInfoT) } } -func (f FlatBufferManifestCodec) parseFlatBufSortedRuns(fbSortedRuns []*flatbuf.SortedRunT) []SortedRun { - sortedRuns := make([]SortedRun, 0) +func (f FlatBufferManifestCodec) parseFlatBufSortedRuns(fbSortedRuns []*flatbuf.SortedRunT) []compaction.SortedRun { + sortedRuns := make([]compaction.SortedRun, 0) for _, run := range fbSortedRuns { - sortedRuns = append(sortedRuns, SortedRun{ - id: run.Id, - sstList: f.parseFlatBufSSTList(run.Ssts), + sortedRuns = append(sortedRuns, compaction.SortedRun{ + ID: run.Id, + SSTList: f.parseFlatBufSSTList(run.Ssts), }) } return sortedRuns @@ -114,21 +116,21 @@ func NewDBFlatBufferBuilder(builder *flatbuffers.Builder) DBFlatBufferBuilder { } func (fb *DBFlatBufferBuilder) createManifest(manifest *Manifest) []byte { - core := manifest.core - l0 := fb.sstListToFlatBuf(core.l0) + core := manifest.core.Snapshot() + l0 := fb.sstListToFlatBuf(core.L0) var l0LastCompacted *flatbuf.CompactedSstIdT - if core.l0LastCompacted.IsPresent() { - id, _ := core.l0LastCompacted.Get() + if core.L0LastCompacted.IsPresent() { + id, _ := core.L0LastCompacted.Get() l0LastCompacted = fb.compactedSSTID(id) } - compacted := fb.sortedRunsToFlatBuf(core.compacted) + compacted := fb.sortedRunsToFlatBuf(core.Compacted) manifestV1 := flatbuf.ManifestV1T{ ManifestId: 0, WriterEpoch: manifest.writerEpoch.Load(), CompactorEpoch: manifest.compactorEpoch.Load(), - WalIdLastCompacted: core.lastCompactedWalSSTID.Load(), - WalIdLastSeen: core.nextWalSstID.Load() - 1, + WalIdLastCompacted: core.LastCompactedWalSSTID.Load(), + WalIdLastSeen: core.NextWalSstID.Load() - 1, L0LastCompacted: l0LastCompacted, L0: l0, Compacted: compacted, @@ -169,12 +171,12 @@ func (fb *DBFlatBufferBuilder) compactedSSTID(id ulid.ULID) *flatbuf.CompactedSs } } -func (fb *DBFlatBufferBuilder) sortedRunsToFlatBuf(sortedRuns []SortedRun) []*flatbuf.SortedRunT { +func (fb *DBFlatBufferBuilder) sortedRunsToFlatBuf(sortedRuns []compaction.SortedRun) []*flatbuf.SortedRunT { sortedRunFBs := make([]*flatbuf.SortedRunT, 0) for _, sortedRun := range sortedRuns { sortedRunFBs = append(sortedRunFBs, &flatbuf.SortedRunT{ - Id: sortedRun.id, - Ssts: fb.sstListToFlatBuf(sortedRun.sstList), + Id: sortedRun.ID, + Ssts: fb.sstListToFlatBuf(sortedRun.SSTList), }) } return sortedRunFBs diff --git a/slatedb/flush.go b/slatedb/flush.go index 9835695..e6e3f4a 100644 --- a/slatedb/flush.go +++ b/slatedb/flush.go @@ -38,7 +38,7 @@ func (db *DB) spawnWALFlushTask(walFlushNotifierCh <-chan bool, walFlushTaskWG * // 1. Convert mutable WAL to Immutable WAL // 2. Flush each Immutable WAL to object store and then to memtable func (db *DB) FlushWAL() error { - db.state.freezeWAL() + db.state.FreezeWAL() err := db.flushImmWALs() if err != nil { return err @@ -53,7 +53,7 @@ func (db *DB) FlushWAL() error { // Notify any client(with AwaitDurable set to true) that flush has happened func (db *DB) flushImmWALs() error { for { - oldestWal := db.state.oldestImmWAL() + oldestWal := db.state.OldestImmWAL() if oldestWal.IsAbsent() { break } @@ -64,7 +64,7 @@ func (db *DB) flushImmWALs() error { if err != nil { return err } - db.state.popImmWAL() + db.state.PopImmWAL() // flush to the memtable before notifying so that data is available for reads db.flushImmWALToMemtable(immWal, db.state.Memtable()) @@ -194,12 +194,12 @@ func (m *MemtableFlusher) loadManifest() error { if err != nil { return err } - m.db.state.refreshDBState(currentManifest) + m.db.state.RefreshDBState(currentManifest) return nil } func (m *MemtableFlusher) writeManifest() error { - core := m.db.state.coreStateClone() + core := m.db.state.CoreStateSnapshot() return m.manifest.updateDBState(core) } @@ -223,7 +223,7 @@ func (m *MemtableFlusher) writeManifestSafely() error { func (m *MemtableFlusher) flushImmMemtablesToL0() error { for { - immMemtable := m.db.state.oldestImmMemtable() + immMemtable := m.db.state.OldestImmMemtable() if immMemtable.IsAbsent() { break } @@ -234,7 +234,7 @@ func (m *MemtableFlusher) flushImmMemtablesToL0() error { return err } - m.db.state.moveImmMemtableToL0(immMemtable.MustGet(), sstHandle) + m.db.state.MoveImmMemtableToL0(immMemtable.MustGet(), sstHandle) err = m.writeManifestSafely() if err != nil { return err diff --git a/slatedb/manifest.go b/slatedb/manifest.go index 1b54f11..e14ef65 100644 --- a/slatedb/manifest.go +++ b/slatedb/manifest.go @@ -1,9 +1,12 @@ package slatedb -import "sync/atomic" +import ( + "github.com/slatedb/slatedb-go/slatedb/state" + "sync/atomic" +) type Manifest struct { - core *CoreDBState + core *state.CoreDBState writerEpoch atomic.Uint64 compactorEpoch atomic.Uint64 } diff --git a/slatedb/manifest_store.go b/slatedb/manifest_store.go index 8b10225..1597123 100644 --- a/slatedb/manifest_store.go +++ b/slatedb/manifest_store.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/samber/mo" "github.com/slatedb/slatedb-go/slatedb/common" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/thanos-io/objstore" "path" "slices" @@ -69,7 +70,7 @@ func initFenceableManifestCompactor(storedManifest *StoredManifest) (*FenceableM return fm, nil } -func (f *FenceableManifest) dbState() (*CoreDBState, error) { +func (f *FenceableManifest) dbState() (*state.CoreStateSnapshot, error) { err := f.checkEpoch() if err != nil { return nil, err @@ -77,7 +78,7 @@ func (f *FenceableManifest) dbState() (*CoreDBState, error) { return f.storedManifest.dbState(), nil } -func (f *FenceableManifest) updateDBState(dbState *CoreDBState) error { +func (f *FenceableManifest) updateDBState(dbState *state.CoreStateSnapshot) error { err := f.checkEpoch() if err != nil { return err @@ -85,7 +86,7 @@ func (f *FenceableManifest) updateDBState(dbState *CoreDBState) error { return f.storedManifest.updateDBState(dbState) } -func (f *FenceableManifest) refresh() (*CoreDBState, error) { +func (f *FenceableManifest) refresh() (*state.CoreStateSnapshot, error) { _, err := f.storedManifest.refresh() if err != nil { return nil, err @@ -128,7 +129,7 @@ type StoredManifest struct { manifestStore *ManifestStore } -func newStoredManifest(store *ManifestStore, core *CoreDBState) (*StoredManifest, error) { +func newStoredManifest(store *ManifestStore, core *state.CoreDBState) (*StoredManifest, error) { manifest := &Manifest{ core: core, } @@ -161,14 +162,14 @@ func loadStoredManifest(store *ManifestStore) (mo.Option[StoredManifest], error) }), nil } -func (s *StoredManifest) dbState() *CoreDBState { - return s.manifest.core +func (s *StoredManifest) dbState() *state.CoreStateSnapshot { + return s.manifest.core.Snapshot() } // write Manifest with updated DB state to object store and update StoredManifest with the new manifest -func (s *StoredManifest) updateDBState(core *CoreDBState) error { +func (s *StoredManifest) updateDBState(coreSnapshot *state.CoreStateSnapshot) error { manifest := &Manifest{ - core: core, + core: coreSnapshot.ToCoreState(), } manifest.writerEpoch.Store(s.manifest.writerEpoch.Load()) manifest.compactorEpoch.Store(s.manifest.compactorEpoch.Load()) @@ -188,7 +189,7 @@ func (s *StoredManifest) updateManifest(manifest *Manifest) error { } // read latest manifest from object store and update StoredManifest with the latest manifest. -func (s *StoredManifest) refresh() (*CoreDBState, error) { +func (s *StoredManifest) refresh() (*state.CoreStateSnapshot, error) { stored, err := s.manifestStore.readLatestManifest() if err != nil { return nil, err @@ -200,7 +201,7 @@ func (s *StoredManifest) refresh() (*CoreDBState, error) { storedInfo, _ := stored.Get() s.manifest = storedInfo.manifest s.id = storedInfo.id - return s.manifest.core, nil + return s.dbState(), nil } // ------------------------------------------------ diff --git a/slatedb/manifest_store_test.go b/slatedb/manifest_store_test.go index 6ba73cb..28bfdda 100644 --- a/slatedb/manifest_store_test.go +++ b/slatedb/manifest_store_test.go @@ -2,6 +2,7 @@ package slatedb import ( "github.com/slatedb/slatedb-go/slatedb/common" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/stretchr/testify/assert" "github.com/thanos-io/objstore" "testing" @@ -10,9 +11,9 @@ import ( func TestShouldFailWriteOnVersionConflict(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) storedManifest, err := loadStoredManifest(manifestStore) @@ -20,22 +21,22 @@ func TestShouldFailWriteOnVersionConflict(t *testing.T) { sm2, ok := storedManifest.Get() assert.True(t, ok) - err = sm.updateDBState(state) + err = sm.updateDBState(coreState.Snapshot()) assert.NoError(t, err) - err = sm2.updateDBState(state) + err = sm2.updateDBState(coreState.Snapshot()) assert.ErrorIs(t, err, common.ErrManifestVersionExists) } func TestShouldWriteWithNewVersion(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) - err = sm.updateDBState(state) + err = sm.updateDBState(coreState.Snapshot()) assert.NoError(t, err) info, err := manifestStore.readLatestManifest() @@ -49,23 +50,24 @@ func TestShouldWriteWithNewVersion(t *testing.T) { func TestShouldUpdateLocalStateOnWrite(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) - state.nextWalSstID.Store(123) - err = sm.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = sm.updateDBState(core) assert.NoError(t, err) - assert.Equal(t, uint64(123), sm.dbState().nextWalSstID.Load()) + assert.Equal(t, uint64(123), sm.dbState().NextWalSstID.Load()) } func TestShouldRefresh(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) storedManifest, err := loadStoredManifest(manifestStore) @@ -73,22 +75,23 @@ func TestShouldRefresh(t *testing.T) { sm2, ok := storedManifest.Get() assert.True(t, ok) - state.nextWalSstID.Store(123) - err = sm.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = sm.updateDBState(core) assert.NoError(t, err) refreshed, err := sm2.refresh() assert.NoError(t, err) - assert.Equal(t, uint64(123), refreshed.nextWalSstID.Load()) - assert.Equal(t, uint64(123), sm.dbState().nextWalSstID.Load()) + assert.Equal(t, uint64(123), refreshed.NextWalSstID.Load()) + assert.Equal(t, uint64(123), sm.dbState().NextWalSstID.Load()) } func TestShouldBumpWriterEpoch(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - _, err := newStoredManifest(manifestStore, state) + _, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) for i := 1; i <= 5; i++ { @@ -111,9 +114,9 @@ func TestShouldBumpWriterEpoch(t *testing.T) { func TestShouldFailOnWriterFenced(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) writer1, err := initFenceableManifestWriter(sm) assert.NoError(t, err) @@ -127,21 +130,22 @@ func TestShouldFailOnWriterFenced(t *testing.T) { _, err = writer1.refresh() assert.ErrorIs(t, err, common.ErrFenced) - state.nextWalSstID.Store(123) - err = writer1.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = writer1.updateDBState(core) assert.ErrorIs(t, err, common.ErrFenced) refreshed, err := writer2.refresh() assert.NoError(t, err) - assert.Equal(t, uint64(1), refreshed.nextWalSstID.Load()) + assert.Equal(t, uint64(1), refreshed.NextWalSstID.Load()) } func TestShouldBumpCompactorEpoch(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - _, err := newStoredManifest(manifestStore, state) + _, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) for i := 1; i <= 5; i++ { @@ -164,9 +168,9 @@ func TestShouldBumpCompactorEpoch(t *testing.T) { func TestShouldFailOnCompactorFenced(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) compactor1, err := initFenceableManifestCompactor(sm) assert.NoError(t, err) @@ -180,11 +184,12 @@ func TestShouldFailOnCompactorFenced(t *testing.T) { _, err = compactor1.refresh() assert.ErrorIs(t, err, common.ErrFenced) - state.nextWalSstID.Store(123) - err = compactor1.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = compactor1.updateDBState(core) assert.ErrorIs(t, err, common.ErrFenced) refreshed, err := compactor2.refresh() assert.NoError(t, err) - assert.Equal(t, uint64(1), refreshed.nextWalSstID.Load()) + assert.Equal(t, uint64(1), refreshed.NextWalSstID.Load()) } diff --git a/slatedb/size_tiered_compaction.go b/slatedb/size_tiered_compaction.go index 84c28f0..573faff 100644 --- a/slatedb/size_tiered_compaction.go +++ b/slatedb/size_tiered_compaction.go @@ -10,17 +10,17 @@ func (s SizeTieredCompactionScheduler) maybeScheduleCompaction(state *CompactorS dbState := state.dbState // for now, just compact l0 down to a new sorted run each time compactions := make([]Compaction, 0) - if len(dbState.l0) >= 4 { + if len(dbState.L0) >= 4 { sources := make([]SourceID, 0) - for _, sst := range dbState.l0 { + for _, sst := range dbState.L0 { id, ok := sst.Id.CompactedID().Get() assert.True(ok, "Expected valid compacted ID") sources = append(sources, newSourceIDSST(id)) } nextSortedRunID := uint32(0) - if len(dbState.compacted) > 0 { - nextSortedRunID = dbState.compacted[0].id + 1 + if len(dbState.Compacted) > 0 { + nextSortedRunID = dbState.Compacted[0].ID + 1 } compactions = append(compactions, newCompaction(sources, nextSortedRunID)) diff --git a/slatedb/db_state.go b/slatedb/state/db_state.go similarity index 65% rename from slatedb/db_state.go rename to slatedb/state/db_state.go index ae7f0d3..08fc4ba 100644 --- a/slatedb/db_state.go +++ b/slatedb/state/db_state.go @@ -1,8 +1,9 @@ -package slatedb +package state import ( "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" + "github.com/slatedb/slatedb-go/slatedb/compaction" "github.com/slatedb/slatedb-go/slatedb/table" "log/slog" "sync" @@ -22,7 +23,7 @@ import ( type CoreDBState struct { l0LastCompacted mo.Option[ulid.ULID] l0 []sstable.Handle - compacted []SortedRun + compacted []compaction.SortedRun // nextWalSstID is used as the ID of new ImmutableWAL created during WAL flush process // It is initialized to 1 and keeps getting incremented by 1 for new ImmutableWALs @@ -34,33 +35,72 @@ type CoreDBState struct { lastCompactedWalSSTID atomic.Uint64 } -func newCoreDBState() *CoreDBState { +type CoreStateSnapshot struct { + L0LastCompacted mo.Option[ulid.ULID] + L0 []sstable.Handle + Compacted []compaction.SortedRun + NextWalSstID atomic.Uint64 + LastCompactedWalSSTID atomic.Uint64 +} + +func (s *CoreStateSnapshot) ToCoreState() *CoreDBState { + snapshot := s.Clone() + coreState := &CoreDBState{ + l0LastCompacted: snapshot.L0LastCompacted, + l0: snapshot.L0, + compacted: snapshot.Compacted, + } + coreState.nextWalSstID.Store(s.NextWalSstID.Load()) + coreState.lastCompactedWalSSTID.Store(s.LastCompactedWalSSTID.Load()) + return coreState +} + +func (s *CoreStateSnapshot) Clone() *CoreStateSnapshot { + l0 := make([]sstable.Handle, 0, len(s.L0)) + for _, sst := range s.L0 { + l0 = append(l0, *sst.Clone()) + } + compacted := make([]compaction.SortedRun, 0, len(s.Compacted)) + for _, sr := range s.Compacted { + compacted = append(compacted, *sr.Clone()) + } + snapshot := &CoreStateSnapshot{ + L0LastCompacted: s.L0LastCompacted, + L0: l0, + Compacted: compacted, + } + snapshot.NextWalSstID.Store(s.NextWalSstID.Load()) + snapshot.LastCompactedWalSSTID.Store(s.LastCompactedWalSSTID.Load()) + return snapshot +} + +func NewCoreDBState() *CoreDBState { coreState := &CoreDBState{ l0LastCompacted: mo.None[ulid.ULID](), l0: make([]sstable.Handle, 0), - compacted: []SortedRun{}, + compacted: []compaction.SortedRun{}, } coreState.nextWalSstID.Store(1) coreState.lastCompactedWalSSTID.Store(0) return coreState } -func (c *CoreDBState) clone() *CoreDBState { +func (c *CoreDBState) Snapshot() *CoreStateSnapshot { l0 := make([]sstable.Handle, 0, len(c.l0)) for _, sst := range c.l0 { l0 = append(l0, *sst.Clone()) } - compacted := make([]SortedRun, 0, len(c.compacted)) + compacted := make([]compaction.SortedRun, 0, len(c.compacted)) for _, sr := range c.compacted { - compacted = append(compacted, *sr.clone()) + compacted = append(compacted, *sr.Clone()) } - coreState := &CoreDBState{ - l0LastCompacted: c.l0LastCompacted, - l0: l0, - compacted: compacted, + coreState := &CoreStateSnapshot{ + L0LastCompacted: c.l0LastCompacted, + L0: l0, + Compacted: compacted, } - coreState.nextWalSstID.Store(c.nextWalSstID.Load()) - coreState.lastCompactedWalSSTID.Store(c.lastCompactedWalSSTID.Load()) + coreState.NextWalSstID.Store(c.nextWalSstID.Load()) + coreState.LastCompactedWalSSTID.Store(c.lastCompactedWalSSTID.Load()) return coreState } @@ -74,11 +114,11 @@ func LogState(log *slog.Logger, c *CoreDBState) { // DBStateSnapshot contains state required for read methods (eg. GET) type DBStateSnapshot struct { - wal *table.WAL - memtable *table.Memtable - immWALs *deque.Deque[*table.ImmutableWAL] - immMemtables *deque.Deque[*table.ImmutableMemtable] - core *CoreDBState + Wal *table.WAL + Memtable *table.Memtable + ImmWALs *deque.Deque[*table.ImmutableWAL] + ImmMemtables *deque.Deque[*table.ImmutableMemtable] + Core *CoreStateSnapshot } type DBState struct { @@ -90,7 +130,7 @@ type DBState struct { core *CoreDBState } -func newDBState(coreDBState *CoreDBState) *DBState { +func NewDBState(coreDBState *CoreDBState) *DBState { return &DBState{ wal: table.NewWAL(), memtable: table.NewMemtable(), @@ -164,26 +204,26 @@ func (s *DBState) DeleteKVFromMemtable(key []byte) { s.memtable.Delete(key) } -func (s *DBState) coreStateClone() *CoreDBState { +func (s *DBState) CoreStateSnapshot() *CoreStateSnapshot { s.RLock() defer s.RUnlock() - return s.core.clone() + return s.core.Snapshot() } -func (s *DBState) snapshot() *DBStateSnapshot { +func (s *DBState) Snapshot() *DBStateSnapshot { s.RLock() defer s.RUnlock() return &DBStateSnapshot{ - wal: s.wal.Clone(), - memtable: s.memtable.Clone(), - immWALs: common.CopyDeque(s.immWALs), - immMemtables: common.CopyDeque(s.immMemtables), - core: s.core.clone(), + Wal: s.wal.Clone(), + Memtable: s.memtable.Clone(), + ImmWALs: common.CopyDeque(s.immWALs), + ImmMemtables: common.CopyDeque(s.immMemtables), + Core: s.core.Snapshot(), } } -func (s *DBState) freezeMemtable(walID uint64) { +func (s *DBState) FreezeMemtable(walID uint64) { s.Lock() defer s.Unlock() @@ -194,7 +234,7 @@ func (s *DBState) freezeMemtable(walID uint64) { s.immMemtables.PushFront(immMemtable) } -func (s *DBState) freezeWAL() mo.Option[uint64] { +func (s *DBState) FreezeWAL() mo.Option[uint64] { s.Lock() defer s.Unlock() @@ -211,14 +251,14 @@ func (s *DBState) freezeWAL() mo.Option[uint64] { return mo.Some(immWAL.ID()) } -func (s *DBState) popImmWAL() { +func (s *DBState) PopImmWAL() { s.Lock() defer s.Unlock() s.immWALs.PopBack() } -func (s *DBState) oldestImmWAL() mo.Option[*table.ImmutableWAL] { +func (s *DBState) OldestImmWAL() mo.Option[*table.ImmutableWAL] { s.RLock() defer s.RUnlock() @@ -228,7 +268,7 @@ func (s *DBState) oldestImmWAL() mo.Option[*table.ImmutableWAL] { return mo.Some(s.immWALs.Back()) } -func (s *DBState) oldestImmMemtable() mo.Option[*table.ImmutableMemtable] { +func (s *DBState) OldestImmMemtable() mo.Option[*table.ImmutableMemtable] { s.RLock() defer s.RUnlock() @@ -238,7 +278,7 @@ func (s *DBState) oldestImmMemtable() mo.Option[*table.ImmutableMemtable] { return mo.Some(s.immMemtables.Back()) } -func (s *DBState) moveImmMemtableToL0(immMemtable *table.ImmutableMemtable, sstHandle *sstable.Handle) { +func (s *DBState) MoveImmMemtableToL0(immMemtable *table.ImmutableMemtable, sstHandle *sstable.Handle) { s.Lock() defer s.Unlock() @@ -249,16 +289,16 @@ func (s *DBState) moveImmMemtableToL0(immMemtable *table.ImmutableMemtable, sstH s.core.lastCompactedWalSSTID.Store(immMemtable.LastWalID()) } -func (s *DBState) incrementNextWALID() { +func (s *DBState) IncrementNextWALID() { s.core.nextWalSstID.Add(1) } -func (s *DBState) refreshDBState(compactorState *CoreDBState) { +func (s *DBState) RefreshDBState(compactorState *CoreStateSnapshot) { s.Lock() defer s.Unlock() // copy over L0 up to l0LastCompacted - l0LastCompacted := compactorState.l0LastCompacted + l0LastCompacted := compactorState.L0LastCompacted newL0 := make([]sstable.Handle, 0) for i := 0; i < len(s.core.l0); i++ { sst := s.core.l0[i] @@ -273,5 +313,5 @@ func (s *DBState) refreshDBState(compactorState *CoreDBState) { s.core.l0LastCompacted = l0LastCompacted s.core.l0 = newL0 - s.core.compacted = compactorState.compacted + s.core.compacted = compactorState.Compacted } diff --git a/slatedb/db_state_test.go b/slatedb/state/db_state_test.go similarity index 65% rename from slatedb/db_state_test.go rename to slatedb/state/db_state_test.go index 14c7d43..3710382 100644 --- a/slatedb/db_state_test.go +++ b/slatedb/state/db_state_test.go @@ -1,4 +1,4 @@ -package slatedb +package state import ( "github.com/oklog/ulid/v2" @@ -19,51 +19,51 @@ func addL0sToDBState(dbState *DBState, n uint32) { } for i := 0; i < int(n); i++ { - dbState.freezeMemtable(uint64(i)) - immMemtable := dbState.oldestImmMemtable() + dbState.FreezeMemtable(uint64(i)) + immMemtable := dbState.OldestImmMemtable() if immMemtable.IsAbsent() { break } sst := sstable.NewHandle(sstable.NewIDCompacted(ulid.Make()), sstInfo) - dbState.moveImmMemtableToL0(immMemtable.MustGet(), sst) + dbState.MoveImmMemtableToL0(immMemtable.MustGet(), sst) } } func TestRefreshDBStateWithL0sUptoLastCompacted(t *testing.T) { - dbState := newDBState(newCoreDBState()) + dbState := NewDBState(NewCoreDBState()) addL0sToDBState(dbState, 4) // prepare compactorState indicating that the last SST in L0 gets compacted - compactorState := dbState.coreStateClone() - size := len(compactorState.l0) - lastCompacted := compactorState.l0[size-1] - compactorState.l0 = compactorState.l0[:size-1] + compactorState := dbState.CoreStateSnapshot() + size := len(compactorState.L0) + lastCompacted := compactorState.L0[size-1] + compactorState.L0 = compactorState.L0[:size-1] assert.Equal(t, sstable.Compacted, lastCompacted.Id.Type) id, err := ulid.Parse(lastCompacted.Id.Value) assert.NoError(t, err) - compactorState.l0LastCompacted = mo.Some(id) + compactorState.L0LastCompacted = mo.Some(id) // when refreshDBState is called with the compactorState - dbState.refreshDBState(compactorState) + dbState.RefreshDBState(compactorState) // then verify that the dbState.core is modified to match the given compactorState - assert.Equal(t, len(compactorState.l0), len(dbState.L0())) - for i := 0; i < len(compactorState.l0); i++ { - expected := compactorState.l0[i] + assert.Equal(t, len(compactorState.L0), len(dbState.L0())) + for i := 0; i < len(compactorState.L0); i++ { + expected := compactorState.L0[i] actual := dbState.L0()[i] assert.Equal(t, expected, actual) } - assert.Equal(t, compactorState.l0LastCompacted, dbState.L0LastCompacted()) + assert.Equal(t, compactorState.L0LastCompacted, dbState.L0LastCompacted()) } func TestRefreshDBStateWithAllL0sIfNoneCompacted(t *testing.T) { - dbState := newDBState(newCoreDBState()) + dbState := NewDBState(NewCoreDBState()) addL0sToDBState(dbState, 4) - l0SSTList := dbState.coreStateClone().l0 + l0SSTList := dbState.CoreStateSnapshot().L0 // when refreshDBState is called with no compaction - dbState.refreshDBState(newCoreDBState()) + dbState.RefreshDBState(NewCoreDBState().Snapshot()) // then verify there is no change in dbState L0 assert.Equal(t, len(l0SSTList), len(dbState.L0())) diff --git a/slatedb/table_store.go b/slatedb/store/table_store.go similarity index 99% rename from slatedb/table_store.go rename to slatedb/store/table_store.go index 42d4cca..048131e 100644 --- a/slatedb/table_store.go +++ b/slatedb/store/table_store.go @@ -1,4 +1,4 @@ -package slatedb +package store import ( "bytes" @@ -50,7 +50,7 @@ func NewTableStore(bucket objstore.Bucket, sstConfig sstable.Config, rootPath st } // Get list of WALs from object store that are not compacted (walID greater than walIDLastCompacted) -func (ts *TableStore) getWalSSTList(walIDLastCompacted uint64) ([]uint64, error) { +func (ts *TableStore) GetWalSSTList(walIDLastCompacted uint64) ([]uint64, error) { walList := make([]uint64, 0) walPath := path.Join(ts.rootPath, ts.walPath) diff --git a/slatedb/table_store_test.go b/slatedb/store/table_store_test.go similarity index 99% rename from slatedb/table_store_test.go rename to slatedb/store/table_store_test.go index 620e670..88873d7 100644 --- a/slatedb/table_store_test.go +++ b/slatedb/store/table_store_test.go @@ -1,4 +1,4 @@ -package slatedb +package store import ( "bytes"