From 4e5d6ff32f7d5eb9faaf513cf9d712aa05b606f9 Mon Sep 17 00:00:00 2001 From: Naveen Date: Mon, 23 Dec 2024 22:55:01 +0530 Subject: [PATCH] Move dbstate to package --- slatedb/compactor.go | 10 +-- slatedb/compactor_state.go | 33 ++++---- slatedb/compactor_state_test.go | 117 ++++++++++++++------------- slatedb/compactor_test.go | 27 +++---- slatedb/db.go | 39 ++++----- slatedb/db_test.go | 43 +++++----- slatedb/flatbuf_types.go | 31 +++---- slatedb/flush.go | 14 ++-- slatedb/manifest.go | 7 +- slatedb/manifest_store.go | 21 ++--- slatedb/manifest_store_test.go | 69 ++++++++-------- slatedb/size_tiered_compaction.go | 8 +- slatedb/{ => state}/db_state.go | 105 ++++++++++++++++-------- slatedb/{ => state}/db_state_test.go | 36 ++++----- 14 files changed, 306 insertions(+), 254 deletions(-) rename slatedb/{ => state}/db_state.go (68%) rename slatedb/{ => state}/db_state_test.go (65%) diff --git a/slatedb/compactor.go b/slatedb/compactor.go index 5e3c31f..05811ef 100644 --- a/slatedb/compactor.go +++ b/slatedb/compactor.go @@ -185,7 +185,7 @@ func loadState(manifest *FenceableManifest) (*CompactorState, error) { if err != nil { return nil, err } - return newCompactorState(dbState.clone(), nil), nil + return newCompactorState(dbState.Clone(), nil), nil } func loadCompactionScheduler() CompactionScheduler { @@ -234,12 +234,12 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { dbState := o.state.dbState sstsByID := make(map[ulid.ULID]sstable.Handle) - for _, sst := range dbState.l0 { + for _, sst := range dbState.L0 { id, ok := sst.Id.CompactedID().Get() assert.True(ok, "expected valid compacted ID") sstsByID[id] = sst } - for _, sr := range dbState.compacted { + for _, sr := range dbState.Compacted { for _, sst := range sr.SSTList { id, ok := sst.Id.CompactedID().Get() assert.True(ok, "expected valid compacted ID") @@ -248,7 +248,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) { } srsByID := make(map[uint32]levels.SortedRun) - for _, sr := range dbState.compacted { + for _, sr := range dbState.Compacted { srsByID[sr.ID] = sr } @@ -297,7 +297,7 @@ func (o *CompactorOrchestrator) writeManifest() error { return err } - core := o.state.dbState.clone() + core := o.state.dbState.Clone() err = o.manifest.updateDBState(core) if errors.Is(err, common.ErrManifestVersionExists) { o.log.Warn("conflicting manifest version. retry write", "error", err) diff --git a/slatedb/compactor_state.go b/slatedb/compactor_state.go index ac219ae..10733de 100644 --- a/slatedb/compactor_state.go +++ b/slatedb/compactor_state.go @@ -5,6 +5,7 @@ import ( "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/slatedb/levels" + "github.com/slatedb/slatedb-go/slatedb/state" "log/slog" "math" "strconv" @@ -89,12 +90,12 @@ func newCompaction(sources []SourceID, destination uint32) Compaction { // ------------------------------------------------ type CompactorState struct { - dbState *CoreDBState + dbState *state.CoreStateSnapshot compactions map[uint32]Compaction log *slog.Logger } -func newCompactorState(dbState *CoreDBState, log *slog.Logger) *CompactorState { +func newCompactorState(dbState *state.CoreStateSnapshot, log *slog.Logger) *CompactorState { set.Default(&log, slog.Default()) return &CompactorState{ @@ -111,7 +112,7 @@ func (c *CompactorState) submitCompaction(compaction Compaction) error { return common.ErrInvalidCompaction } - for _, sr := range c.dbState.compacted { + for _, sr := range c.dbState.Compacted { if sr.ID == compaction.destination { if !c.oneOfTheSourceSRMatchesDestination(compaction) { // the compaction overwrites an existing sr but doesn't include the sr @@ -138,12 +139,12 @@ func (c *CompactorState) oneOfTheSourceSRMatchesDestination(compaction Compactio return false } -func (c *CompactorState) refreshDBState(writerState *CoreDBState) { +func (c *CompactorState) refreshDBState(writerState *state.CoreStateSnapshot) { // the writer may have added more l0 SSTs. Add these to our l0 list. - lastCompactedL0 := c.dbState.l0LastCompacted + lastCompactedL0 := c.dbState.L0LastCompacted mergedL0s := make([]sstable.Handle, 0) - for _, writerL0SST := range writerState.l0 { + for _, writerL0SST := range writerState.L0 { assert.True(writerL0SST.Id.Type == sstable.Compacted, "unexpected sstable.ID type") writerL0ID, _ := writerL0SST.Id.CompactedID().Get() // we stop appending to our l0 list if we encounter sstID equal to lastCompactedID @@ -154,10 +155,10 @@ func (c *CompactorState) refreshDBState(writerState *CoreDBState) { mergedL0s = append(mergedL0s, writerL0SST) } - merged := c.dbState.clone() - merged.l0 = mergedL0s - merged.lastCompactedWalSSTID.Store(writerState.lastCompactedWalSSTID.Load()) - merged.nextWalSstID.Store(writerState.nextWalSstID.Load()) + merged := c.dbState.Clone() + merged.L0 = mergedL0s + merged.LastCompactedWalSSTID.Store(writerState.LastCompactedWalSSTID.Load()) + merged.NextWalSstID.Store(writerState.NextWalSstID.Load()) c.dbState = merged } @@ -183,9 +184,9 @@ func (c *CompactorState) finishCompaction(outputSR *levels.SortedRun) { } compactionSRs[compaction.destination] = true - dbState := c.dbState.clone() + dbState := c.dbState.Clone() newL0 := make([]sstable.Handle, 0) - for _, sst := range dbState.l0 { + for _, sst := range dbState.L0 { assert.True(sst.Id.CompactedID().IsPresent(), "Expected compactedID not present") l0ID, _ := sst.Id.CompactedID().Get() _, ok := compactionL0s[l0ID] @@ -196,7 +197,7 @@ func (c *CompactorState) finishCompaction(outputSR *levels.SortedRun) { newCompacted := make([]levels.SortedRun, 0) inserted := false - for _, sr := range dbState.compacted { + for _, sr := range dbState.Compacted { if !inserted && outputSR.ID >= sr.ID { newCompacted = append(newCompacted, *outputSR) inserted = true @@ -216,11 +217,11 @@ func (c *CompactorState) finishCompaction(outputSR *levels.SortedRun) { firstSource := compaction.sources[0] if firstSource.sstID().IsPresent() { compactedL0, _ := firstSource.sstID().Get() - dbState.l0LastCompacted = mo.Some(compactedL0) + dbState.L0LastCompacted = mo.Some(compactedL0) } - dbState.l0 = newL0 - dbState.compacted = newCompacted + dbState.L0 = newL0 + dbState.Compacted = newCompacted c.dbState = dbState delete(c.compactions, outputSR.ID) } diff --git a/slatedb/compactor_state_test.go b/slatedb/compactor_state_test.go index de8e132..d3d850f 100644 --- a/slatedb/compactor_state_test.go +++ b/slatedb/compactor_state_test.go @@ -5,6 +5,7 @@ import ( assert2 "github.com/slatedb/slatedb-go/internal/assert" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/slatedb/levels" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" @@ -15,57 +16,57 @@ import ( var testPath = "/test/db" func TestShouldRegisterCompactionAsSubmitted(t *testing.T) { - _, _, state := buildTestState(t) - err := state.submitCompaction(buildL0Compaction(state.dbState.l0, 0)) + _, _, compactorState := buildTestState(t) + err := compactorState.submitCompaction(buildL0Compaction(compactorState.dbState.L0, 0)) assert.NoError(t, err) - assert.Equal(t, 1, len(state.compactions)) - assert.Equal(t, Submitted, state.compactions[0].status) + assert.Equal(t, 1, len(compactorState.compactions)) + assert.Equal(t, Submitted, compactorState.compactions[0].status) } func TestShouldUpdateDBStateWhenCompactionFinished(t *testing.T) { - _, _, state := buildTestState(t) - beforeCompaction := state.dbState.clone() - compaction := buildL0Compaction(beforeCompaction.l0, 0) - err := state.submitCompaction(compaction) + _, _, compactorState := buildTestState(t) + beforeCompaction := compactorState.dbState.Clone() + compaction := buildL0Compaction(beforeCompaction.L0, 0) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) sr := levels.SortedRun{ ID: 0, - SSTList: beforeCompaction.l0, + SSTList: beforeCompaction.L0, } - state.finishCompaction(sr.Clone()) + compactorState.finishCompaction(sr.Clone()) - compactedID, _ := beforeCompaction.l0[0].Id.CompactedID().Get() - l0LastCompacted, _ := state.dbState.l0LastCompacted.Get() + compactedID, _ := beforeCompaction.L0[0].Id.CompactedID().Get() + l0LastCompacted, _ := compactorState.dbState.L0LastCompacted.Get() assert.Equal(t, compactedID, l0LastCompacted) - assert.Equal(t, 0, len(state.dbState.l0)) - assert.Equal(t, 1, len(state.dbState.compacted)) - assert.Equal(t, sr.ID, state.dbState.compacted[0].ID) - compactedSR := state.dbState.compacted[0] + assert.Equal(t, 0, len(compactorState.dbState.L0)) + assert.Equal(t, 1, len(compactorState.dbState.Compacted)) + assert.Equal(t, sr.ID, compactorState.dbState.Compacted[0].ID) + compactedSR := compactorState.dbState.Compacted[0] for i := 0; i < len(sr.SSTList); i++ { assert.Equal(t, sr.SSTList[i].Id, compactedSR.SSTList[i].Id) } } func TestShouldRemoveCompactionWhenCompactionFinished(t *testing.T) { - _, _, state := buildTestState(t) - beforeCompaction := state.dbState.clone() - compaction := buildL0Compaction(beforeCompaction.l0, 0) - err := state.submitCompaction(compaction) + _, _, compactorState := buildTestState(t) + beforeCompaction := compactorState.dbState.Clone() + compaction := buildL0Compaction(beforeCompaction.L0, 0) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) sr := levels.SortedRun{ ID: 0, - SSTList: beforeCompaction.l0, + SSTList: beforeCompaction.L0, } - state.finishCompaction(sr.Clone()) + compactorState.finishCompaction(sr.Clone()) - assert.Equal(t, 0, len(state.compactions)) + assert.Equal(t, 0, len(compactorState.compactions)) } func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) { - bucket, sm, state := buildTestState(t) + bucket, sm, compactorState := buildTestState(t) option := DefaultDBOptions() option.L0SSTSizeBytes = 128 db, err := OpenWithOptions(context.Background(), testPath, bucket, option) @@ -74,25 +75,25 @@ func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) { db.Put(repeatedChar('a', 16), repeatedChar('b', 48)) db.Put(repeatedChar('j', 16), repeatedChar('k', 48)) - writerDBState := waitForManifestWithL0Len(sm, len(state.dbState.l0)+1) + writerDBState := waitForManifestWithL0Len(sm, len(compactorState.dbState.L0)+1) - state.refreshDBState(writerDBState) + compactorState.refreshDBState(writerDBState) - assert.True(t, state.dbState.l0LastCompacted.IsAbsent()) - for i := 0; i < len(writerDBState.l0); i++ { - assert.Equal(t, writerDBState.l0[i].Id.CompactedID(), state.dbState.l0[i].Id.CompactedID()) + assert.True(t, compactorState.dbState.L0LastCompacted.IsAbsent()) + for i := 0; i < len(writerDBState.L0); i++ { + assert.Equal(t, writerDBState.L0[i].Id.CompactedID(), compactorState.dbState.L0[i].Id.CompactedID()) } } func TestShouldRefreshDBStateCorrectly(t *testing.T) { - bucket, sm, state := buildTestState(t) - originalL0s := state.dbState.clone().l0 + bucket, sm, compactorState := buildTestState(t) + originalL0s := compactorState.dbState.Clone().L0 compactedID, ok := originalL0s[len(originalL0s)-1].Id.CompactedID().Get() assert.True(t, ok) compaction := newCompaction([]SourceID{newSourceIDSST(compactedID)}, 0) - err := state.submitCompaction(compaction) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) - state.finishCompaction(&levels.SortedRun{ + compactorState.finishCompaction(&levels.SortedRun{ ID: 0, SSTList: []sstable.Handle{originalL0s[len(originalL0s)-1]}, }) @@ -105,35 +106,35 @@ func TestShouldRefreshDBStateCorrectly(t *testing.T) { db.Put(repeatedChar('a', 16), repeatedChar('b', 48)) db.Put(repeatedChar('j', 16), repeatedChar('k', 48)) writerDBState := waitForManifestWithL0Len(sm, len(originalL0s)+1) - dbStateBeforeMerge := state.dbState.clone() + dbStateBeforeMerge := compactorState.dbState.Clone() - state.refreshDBState(writerDBState) + compactorState.refreshDBState(writerDBState) - dbState := state.dbState + dbState := compactorState.dbState // last sst was removed during compaction expectedMergedL0s := originalL0s[:len(originalL0s)-1] // new sst got added during db.Put() call above - expectedMergedL0s = append([]sstable.Handle{writerDBState.l0[0]}, expectedMergedL0s...) + expectedMergedL0s = append([]sstable.Handle{writerDBState.L0[0]}, expectedMergedL0s...) for i := 0; i < len(expectedMergedL0s); i++ { expected, _ := expectedMergedL0s[i].Id.CompactedID().Get() - actual, _ := dbState.l0[i].Id.CompactedID().Get() + actual, _ := dbState.L0[i].Id.CompactedID().Get() assert.Equal(t, expected, actual) } - for i := 0; i < len(dbStateBeforeMerge.compacted); i++ { - srBefore := dbStateBeforeMerge.compacted[i] - srAfter := dbState.compacted[i] + for i := 0; i < len(dbStateBeforeMerge.Compacted); i++ { + srBefore := dbStateBeforeMerge.Compacted[i] + srAfter := dbState.Compacted[i] assert.Equal(t, srBefore.ID, srAfter.ID) for j := 0; j < len(srBefore.SSTList); j++ { assert.Equal(t, srBefore.SSTList[j].Id, srAfter.SSTList[j].Id) } } - assert.Equal(t, writerDBState.lastCompactedWalSSTID.Load(), dbState.lastCompactedWalSSTID.Load()) - assert.Equal(t, writerDBState.nextWalSstID.Load(), dbState.nextWalSstID.Load()) + assert.Equal(t, writerDBState.LastCompactedWalSSTID.Load(), dbState.LastCompactedWalSSTID.Load()) + assert.Equal(t, writerDBState.NextWalSstID.Load(), dbState.NextWalSstID.Load()) } func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) { - bucket, sm, state := buildTestState(t) - originalL0s := state.dbState.clone().l0 + bucket, sm, compactorState := buildTestState(t) + originalL0s := compactorState.dbState.Clone().L0 sourceIDs := make([]SourceID, 0) for _, sst := range originalL0s { @@ -142,13 +143,13 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) { sourceIDs = append(sourceIDs, newSourceIDSST(id)) } compaction := newCompaction(sourceIDs, 0) - err := state.submitCompaction(compaction) + err := compactorState.submitCompaction(compaction) assert.NoError(t, err) - state.finishCompaction(&levels.SortedRun{ + compactorState.finishCompaction(&levels.SortedRun{ ID: 0, SSTList: originalL0s, }) - assert.Equal(t, 0, len(state.dbState.l0)) + assert.Equal(t, 0, len(compactorState.dbState.L0)) option := DefaultDBOptions() option.L0SSTSizeBytes = 128 @@ -159,22 +160,22 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) { db.Put(repeatedChar('j', 16), repeatedChar('k', 48)) writerDBState := waitForManifestWithL0Len(sm, len(originalL0s)+1) - state.refreshDBState(writerDBState) + compactorState.refreshDBState(writerDBState) - dbState := state.dbState - assert.Equal(t, 1, len(dbState.l0)) - expectedID, _ := writerDBState.l0[0].Id.CompactedID().Get() - actualID, _ := dbState.l0[0].Id.CompactedID().Get() + dbState := compactorState.dbState + assert.Equal(t, 1, len(dbState.L0)) + expectedID, _ := writerDBState.L0[0].Id.CompactedID().Get() + actualID, _ := dbState.L0[0].Id.CompactedID().Get() assert.Equal(t, expectedID, actualID) } -func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *CoreDBState { +func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *state.CoreStateSnapshot { startTime := time.Now() for time.Since(startTime) < time.Second*10 { dbState, err := storedManifest.refresh() assert2.True(err == nil, "") - if len(dbState.l0) == size { - return dbState.clone() + if len(dbState.L0) == size { + return dbState.Clone() } time.Sleep(time.Millisecond * 50) } @@ -214,6 +215,6 @@ func buildTestState(t *testing.T) (objstore.Bucket, StoredManifest, *CompactorSt assert.True(t, err == nil, "Could not load stored manifest") assert.True(t, sm.IsPresent(), "Could not find stored manifest") storedManifest, _ := sm.Get() - state := newCompactorState(storedManifest.dbState(), nil) - return bucket, storedManifest, state + compactorState := newCompactorState(storedManifest.dbState(), nil) + return bucket, storedManifest, compactorState } diff --git a/slatedb/compactor_test.go b/slatedb/compactor_test.go index df365e1..c4c0efe 100644 --- a/slatedb/compactor_test.go +++ b/slatedb/compactor_test.go @@ -6,6 +6,7 @@ import ( "github.com/slatedb/slatedb-go/internal/compress" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" "math" @@ -29,26 +30,24 @@ func TestCompactorCompactsL0(t *testing.T) { } startTime := time.Now() - dbState := mo.None[*CoreDBState]() + dbState := mo.None[*state.CoreStateSnapshot]() for time.Since(startTime) < time.Second*10 { sm, err := loadStoredManifest(manifestStore) assert.NoError(t, err) assert.True(t, sm.IsPresent()) storedManifest, _ := sm.Get() - state := storedManifest.dbState() - if state.l0LastCompacted.IsPresent() { - dbState = mo.Some(state.clone()) + if storedManifest.dbState().L0LastCompacted.IsPresent() { + dbState = mo.Some(storedManifest.dbState().Clone()) break } time.Sleep(time.Millisecond * 50) } assert.True(t, dbState.IsPresent()) - state, _ := dbState.Get() - assert.True(t, state.l0LastCompacted.IsPresent()) - assert.Equal(t, 1, len(state.compacted)) + assert.True(t, dbState.MustGet().L0LastCompacted.IsPresent()) + assert.Equal(t, 1, len(dbState.MustGet().Compacted)) - compactedSSTList := state.compacted[0].SSTList + compactedSSTList := dbState.MustGet().Compacted[0].SSTList assert.Equal(t, 1, len(compactedSSTList)) sst := compactedSSTList[0] @@ -88,7 +87,7 @@ func TestShouldWriteManifestSafely(t *testing.T) { assert.NoError(t, err) l0IDsToCompact := make([]SourceID, 0) - for _, sst := range orchestrator.state.dbState.l0 { + for _, sst := range orchestrator.state.dbState.L0 { id, ok := sst.Id.CompactedID().Get() assert.True(t, ok) l0IDsToCompact = append(l0IDsToCompact, newSourceIDSST(id)) @@ -112,19 +111,19 @@ func TestShouldWriteManifestSafely(t *testing.T) { // Key aaa... will be compacted and Key jjj... will be in Level0 dbState, err := storedManifest.refresh() assert.NoError(t, err) - assert.Equal(t, 1, len(dbState.l0)) - assert.Equal(t, 1, len(dbState.compacted)) + assert.Equal(t, 1, len(dbState.L0)) + assert.Equal(t, 1, len(dbState.Compacted)) - l0ID, ok := dbState.l0[0].Id.CompactedID().Get() + l0ID, ok := dbState.L0[0].Id.CompactedID().Get() assert.True(t, ok) compactedSSTIDs := make([]ulid.ULID, 0) - for _, sst := range dbState.compacted[0].SSTList { + for _, sst := range dbState.Compacted[0].SSTList { id, ok := sst.Id.CompactedID().Get() assert.True(t, ok) compactedSSTIDs = append(compactedSSTIDs, id) } assert.False(t, slices.Contains(compactedSSTIDs, l0ID)) - assert.Equal(t, l0IDsToCompact[0].sstID(), dbState.l0LastCompacted) + assert.Equal(t, l0IDsToCompact[0].sstID(), dbState.L0LastCompacted) } func buildTestDB(options DBOptions) (objstore.Bucket, *ManifestStore, *store.TableStore, *DB) { diff --git a/slatedb/db.go b/slatedb/db.go index b0f1820..7ea1b04 100644 --- a/slatedb/db.go +++ b/slatedb/db.go @@ -10,6 +10,7 @@ import ( "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" "github.com/slatedb/slatedb-go/slatedb/levels" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/slatedb/slatedb-go/slatedb/store" "log/slog" "math" @@ -26,7 +27,7 @@ type DB struct { tableStore *store.TableStore compactor *Compactor opts DBOptions - state *DBState + state *state.DBState // walFlushNotifierCh - When DB.Close is called, we send a notification to this channel // and the goroutine running the walFlush task reads this channel and shuts down @@ -63,12 +64,12 @@ func OpenWithOptions(ctx context.Context, path string, bucket objstore.Bucket, o } memtableFlushNotifierCh := make(chan MemtableFlushThreadMsg, math.MaxUint8) - state, err := manifest.dbState() + dbState, err := manifest.dbState() if err != nil { return nil, err } - db, err := newDB(ctx, options, tableStore, state, memtableFlushNotifierCh) + db, err := newDB(ctx, options, tableStore, dbState.ToCoreState(), memtableFlushNotifierCh) if err != nil { return nil, fmt.Errorf("during db init: %w", err) } @@ -138,16 +139,16 @@ func (db *DB) Get(ctx context.Context, key []byte) ([]byte, error) { // if readlevel is Committed we start searching key in the following order // mutable memtable, immutable memtables, SSTs in L0, compacted Sorted runs func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOptions) ([]byte, error) { - snapshot := db.state.snapshot() + snapshot := db.state.Snapshot() if options.ReadLevel == Uncommitted { // search for key in mutable WAL - val, ok := snapshot.wal.Get(key).Get() + val, ok := snapshot.Wal.Get(key).Get() if ok { // key is present or tombstoned return checkValue(val) } // search for key in ImmutableWALs - immWALList := snapshot.immWALs + immWALList := snapshot.ImmWALs for i := 0; i < immWALList.Len(); i++ { immWAL := immWALList.At(i) val, ok := immWAL.Get(key).Get() @@ -158,12 +159,12 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption } // search for key in mutable memtable - val, ok := snapshot.memtable.Get(key).Get() + val, ok := snapshot.Memtable.Get(key).Get() if ok { // key is present or tombstoned return checkValue(val) } // search for key in Immutable memtables - immMemtables := snapshot.immMemtables + immMemtables := snapshot.ImmMemtables for i := 0; i < immMemtables.Len(); i++ { immTable := immMemtables.At(i) val, ok := immTable.Get(key).Get() @@ -173,7 +174,7 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption } // search for key in SSTs in L0 - for _, sst := range snapshot.core.l0 { + for _, sst := range snapshot.Core.L0 { if db.sstMayIncludeKey(sst, key) { iter, err := sstable.NewIteratorAtKey(&sst, key, db.tableStore.Clone()) if err != nil { @@ -188,7 +189,7 @@ func (db *DB) GetWithOptions(ctx context.Context, key []byte, options ReadOption } // search for key in compacted Sorted runs - for _, sr := range snapshot.core.compacted { + for _, sr := range snapshot.Core.Compacted { if db.srMayIncludeKey(sr, key) { iter, err := levels.NewSortedRunIteratorFromKey(sr, key, db.tableStore.Clone()) if err != nil { @@ -288,7 +289,7 @@ func (db *DB) replayWAL(ctx context.Context) error { db.maybeFreezeMemtable(db.state, sstID) if db.state.NextWALID() == sstID { - db.state.incrementNextWALID() + db.state.IncrementNextWALID() } } @@ -296,11 +297,11 @@ func (db *DB) replayWAL(ctx context.Context) error { return nil } -func (db *DB) maybeFreezeMemtable(state *DBState, walID uint64) { - if state.Memtable().Size() < int64(db.opts.L0SSTSizeBytes) { +func (db *DB) maybeFreezeMemtable(dbState *state.DBState, walID uint64) { + if dbState.Memtable().Size() < int64(db.opts.L0SSTSizeBytes) { return } - state.freezeMemtable(walID) + dbState.FreezeMemtable(walID) db.memtableFlushNotifierCh <- FlushImmutableMemtables } @@ -313,7 +314,7 @@ func (db *DB) FlushMemtableToL0() error { } walID, _ := lastWalID.Get() - db.state.freezeMemtable(walID) + db.state.FreezeMemtable(walID) flusher := MemtableFlusher{ db: db, @@ -334,7 +335,7 @@ func getManifest(manifestStore *ManifestStore) (*FenceableManifest, error) { if ok { storedManifest = &sm } else { - storedManifest, err = newStoredManifest(manifestStore, newCoreDBState()) + storedManifest, err = newStoredManifest(manifestStore, state.NewCoreDBState()) if err != nil { return nil, err } @@ -347,13 +348,13 @@ func newDB( ctx context.Context, options DBOptions, tableStore *store.TableStore, - coreDBState *CoreDBState, + coreDBState *state.CoreDBState, memtableFlushNotifierCh chan<- MemtableFlushThreadMsg, ) (*DB, error) { - state := newDBState(coreDBState) + dbState := state.NewDBState(coreDBState) db := &DB{ - state: state, + state: dbState, opts: options, tableStore: tableStore, memtableFlushNotifierCh: memtableFlushNotifierCh, diff --git a/slatedb/db_test.go b/slatedb/db_test.go index 689b35a..21a2bb6 100644 --- a/slatedb/db_test.go +++ b/slatedb/db_test.go @@ -7,6 +7,7 @@ import ( "github.com/slatedb/slatedb-go/internal/compress" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/internal/types" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/slatedb/slatedb-go/slatedb/store" "github.com/stretchr/testify/require" "math" @@ -124,16 +125,16 @@ func TestPutFlushesMemtable(t *testing.T) { value = repeatedChar(rune('k'+i), 50) db.Put(key, value) - dbState := waitForManifestCondition(storedManifest, time.Second*30, func(state *CoreDBState) bool { - return state.lastCompactedWalSSTID.Load() > lastCompacted + dbState := waitForManifestCondition(storedManifest, time.Second*30, func(state *state.CoreStateSnapshot) bool { + return state.LastCompactedWalSSTID.Load() > lastCompacted }) - assert.Equal(t, uint64(i*2+2), dbState.lastCompactedWalSSTID.Load()) - lastCompacted = dbState.lastCompactedWalSSTID.Load() + assert.Equal(t, uint64(i*2+2), dbState.LastCompactedWalSSTID.Load()) + lastCompacted = dbState.LastCompactedWalSSTID.Load() } dbState, err := storedManifest.refresh() require.NoError(t, err) - l0 := dbState.l0 + l0 := dbState.L0 ctx := context.Background() assert.Equal(t, 3, len(l0)) for i := 0; i < 3; i++ { @@ -308,7 +309,7 @@ func TestBasicRestore(t *testing.T) { storedManifest, _ := stored.Get() dbState := storedManifest.dbState() - assert.Equal(t, uint64(sstCount+2*l0Count+1), dbState.nextWalSstID.Load()) + assert.Equal(t, uint64(sstCount+2*l0Count+1), dbState.NextWalSstID.Load()) } func TestShouldReadUncommittedIfReadLevelUncommitted(t *testing.T) { @@ -379,10 +380,10 @@ func TestSnapshotState(t *testing.T) { db, err = OpenWithOptions(context.Background(), dbPath, bucket, testDBOptions(0, 128)) require.NoError(t, err) defer db.Close() - snapshot := db.state.snapshot() - assert.Equal(t, uint64(2), snapshot.core.lastCompactedWalSSTID.Load()) - assert.Equal(t, uint64(3), snapshot.core.nextWalSstID.Load()) - assert.Equal(t, 2, len(snapshot.core.l0)) + snapshot := db.state.Snapshot() + assert.Equal(t, uint64(2), snapshot.Core.LastCompactedWalSSTID.Load()) + assert.Equal(t, uint64(3), snapshot.Core.NextWalSstID.Load()) + assert.Equal(t, 2, len(snapshot.Core.L0)) val1, err := db.Get(context.Background(), key1) require.NoError(t, err) @@ -438,8 +439,8 @@ func doTestShouldReadCompactedDB(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('a'+i), 32), bytes.Repeat([]byte{byte(1 + i)}, 32)) db.Put(repeatedChar(rune('m'+i), 32), bytes.Repeat([]byte{byte(13 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // write more l0s and wait for compaction @@ -447,8 +448,8 @@ func doTestShouldReadCompactedDB(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('f'+i), 32), bytes.Repeat([]byte{byte(6 + i)}, 32)) db.Put(repeatedChar(rune('s'+i), 32), bytes.Repeat([]byte{byte(19 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // write another l0 @@ -503,8 +504,8 @@ func doTestDeleteAndWaitForCompaction(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('a'+i), 32), bytes.Repeat([]byte{byte(1 + i)}, 32)) db.Put(repeatedChar(rune('m'+i), 32), bytes.Repeat([]byte{byte(13 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // Delete existing keys @@ -517,8 +518,8 @@ func doTestDeleteAndWaitForCompaction(t *testing.T, options DBOptions) { db.Put(repeatedChar(rune('f'+i), 32), bytes.Repeat([]byte{byte(6 + i)}, 32)) db.Put(repeatedChar(rune('s'+i), 32), bytes.Repeat([]byte{byte(19 + i)}, 32)) } - waitForManifestCondition(storedManifest, time.Second*10, func(state *CoreDBState) bool { - return state.l0LastCompacted.IsPresent() && len(state.l0) == 0 + waitForManifestCondition(storedManifest, time.Second*10, func(state *state.CoreStateSnapshot) bool { + return state.L0LastCompacted.IsPresent() && len(state.L0) == 0 }) // verify that keys are deleted @@ -543,14 +544,14 @@ func doTestDeleteAndWaitForCompaction(t *testing.T, options DBOptions) { func waitForManifestCondition( sm StoredManifest, timeout time.Duration, - cond func(state *CoreDBState) bool, -) *CoreDBState { + cond func(state *state.CoreStateSnapshot) bool, +) *state.CoreStateSnapshot { start := time.Now() for time.Since(start) < timeout { dbState, err := sm.refresh() assert2.True(err == nil, "") if cond(dbState) { - return dbState.clone() + return dbState.Clone() } time.Sleep(time.Millisecond * 10) } diff --git a/slatedb/flatbuf_types.go b/slatedb/flatbuf_types.go index 8310890..1dc931a 100644 --- a/slatedb/flatbuf_types.go +++ b/slatedb/flatbuf_types.go @@ -11,6 +11,7 @@ import ( "github.com/slatedb/slatedb-go/internal/flatbuf" "github.com/slatedb/slatedb-go/internal/sstable" "github.com/slatedb/slatedb-go/slatedb/levels" + "github.com/slatedb/slatedb-go/slatedb/state" ) // ------------------------------------------------ @@ -33,22 +34,22 @@ func (f FlatBufferManifestCodec) decode(data []byte) (*Manifest, error) { } func (f FlatBufferManifestCodec) manifest(manifest *flatbuf.ManifestV1T) *Manifest { - core := &CoreDBState{ - l0: f.parseFlatBufSSTList(manifest.L0), - compacted: f.parseFlatBufSortedRuns(manifest.Compacted), + core := &state.CoreStateSnapshot{ + L0: f.parseFlatBufSSTList(manifest.L0), + Compacted: f.parseFlatBufSortedRuns(manifest.Compacted), } - core.nextWalSstID.Store(manifest.WalIdLastSeen + 1) - core.lastCompactedWalSSTID.Store(manifest.WalIdLastCompacted) + core.NextWalSstID.Store(manifest.WalIdLastSeen + 1) + core.LastCompactedWalSSTID.Store(manifest.WalIdLastCompacted) l0LastCompacted := f.parseFlatBufSSTId(manifest.L0LastCompacted) if l0LastCompacted == ulid.Zero { - core.l0LastCompacted = mo.None[ulid.ULID]() + core.L0LastCompacted = mo.None[ulid.ULID]() } else { - core.l0LastCompacted = mo.Some(l0LastCompacted) + core.L0LastCompacted = mo.Some(l0LastCompacted) } m := &Manifest{} - m.core = core + m.core = core.ToCoreState() m.writerEpoch.Store(manifest.WriterEpoch) m.compactorEpoch.Store(manifest.CompactorEpoch) return m @@ -115,21 +116,21 @@ func NewDBFlatBufferBuilder(builder *flatbuffers.Builder) DBFlatBufferBuilder { } func (fb *DBFlatBufferBuilder) createManifest(manifest *Manifest) []byte { - core := manifest.core - l0 := fb.sstListToFlatBuf(core.l0) + core := manifest.core.Snapshot() + l0 := fb.sstListToFlatBuf(core.L0) var l0LastCompacted *flatbuf.CompactedSstIdT - if core.l0LastCompacted.IsPresent() { - id, _ := core.l0LastCompacted.Get() + if core.L0LastCompacted.IsPresent() { + id, _ := core.L0LastCompacted.Get() l0LastCompacted = fb.compactedSSTID(id) } - compacted := fb.sortedRunsToFlatBuf(core.compacted) + compacted := fb.sortedRunsToFlatBuf(core.Compacted) manifestV1 := flatbuf.ManifestV1T{ ManifestId: 0, WriterEpoch: manifest.writerEpoch.Load(), CompactorEpoch: manifest.compactorEpoch.Load(), - WalIdLastCompacted: core.lastCompactedWalSSTID.Load(), - WalIdLastSeen: core.nextWalSstID.Load() - 1, + WalIdLastCompacted: core.LastCompactedWalSSTID.Load(), + WalIdLastSeen: core.NextWalSstID.Load() - 1, L0LastCompacted: l0LastCompacted, L0: l0, Compacted: compacted, diff --git a/slatedb/flush.go b/slatedb/flush.go index 9835695..e6e3f4a 100644 --- a/slatedb/flush.go +++ b/slatedb/flush.go @@ -38,7 +38,7 @@ func (db *DB) spawnWALFlushTask(walFlushNotifierCh <-chan bool, walFlushTaskWG * // 1. Convert mutable WAL to Immutable WAL // 2. Flush each Immutable WAL to object store and then to memtable func (db *DB) FlushWAL() error { - db.state.freezeWAL() + db.state.FreezeWAL() err := db.flushImmWALs() if err != nil { return err @@ -53,7 +53,7 @@ func (db *DB) FlushWAL() error { // Notify any client(with AwaitDurable set to true) that flush has happened func (db *DB) flushImmWALs() error { for { - oldestWal := db.state.oldestImmWAL() + oldestWal := db.state.OldestImmWAL() if oldestWal.IsAbsent() { break } @@ -64,7 +64,7 @@ func (db *DB) flushImmWALs() error { if err != nil { return err } - db.state.popImmWAL() + db.state.PopImmWAL() // flush to the memtable before notifying so that data is available for reads db.flushImmWALToMemtable(immWal, db.state.Memtable()) @@ -194,12 +194,12 @@ func (m *MemtableFlusher) loadManifest() error { if err != nil { return err } - m.db.state.refreshDBState(currentManifest) + m.db.state.RefreshDBState(currentManifest) return nil } func (m *MemtableFlusher) writeManifest() error { - core := m.db.state.coreStateClone() + core := m.db.state.CoreStateSnapshot() return m.manifest.updateDBState(core) } @@ -223,7 +223,7 @@ func (m *MemtableFlusher) writeManifestSafely() error { func (m *MemtableFlusher) flushImmMemtablesToL0() error { for { - immMemtable := m.db.state.oldestImmMemtable() + immMemtable := m.db.state.OldestImmMemtable() if immMemtable.IsAbsent() { break } @@ -234,7 +234,7 @@ func (m *MemtableFlusher) flushImmMemtablesToL0() error { return err } - m.db.state.moveImmMemtableToL0(immMemtable.MustGet(), sstHandle) + m.db.state.MoveImmMemtableToL0(immMemtable.MustGet(), sstHandle) err = m.writeManifestSafely() if err != nil { return err diff --git a/slatedb/manifest.go b/slatedb/manifest.go index 1b54f11..e14ef65 100644 --- a/slatedb/manifest.go +++ b/slatedb/manifest.go @@ -1,9 +1,12 @@ package slatedb -import "sync/atomic" +import ( + "github.com/slatedb/slatedb-go/slatedb/state" + "sync/atomic" +) type Manifest struct { - core *CoreDBState + core *state.CoreDBState writerEpoch atomic.Uint64 compactorEpoch atomic.Uint64 } diff --git a/slatedb/manifest_store.go b/slatedb/manifest_store.go index 8b10225..1597123 100644 --- a/slatedb/manifest_store.go +++ b/slatedb/manifest_store.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/samber/mo" "github.com/slatedb/slatedb-go/slatedb/common" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/thanos-io/objstore" "path" "slices" @@ -69,7 +70,7 @@ func initFenceableManifestCompactor(storedManifest *StoredManifest) (*FenceableM return fm, nil } -func (f *FenceableManifest) dbState() (*CoreDBState, error) { +func (f *FenceableManifest) dbState() (*state.CoreStateSnapshot, error) { err := f.checkEpoch() if err != nil { return nil, err @@ -77,7 +78,7 @@ func (f *FenceableManifest) dbState() (*CoreDBState, error) { return f.storedManifest.dbState(), nil } -func (f *FenceableManifest) updateDBState(dbState *CoreDBState) error { +func (f *FenceableManifest) updateDBState(dbState *state.CoreStateSnapshot) error { err := f.checkEpoch() if err != nil { return err @@ -85,7 +86,7 @@ func (f *FenceableManifest) updateDBState(dbState *CoreDBState) error { return f.storedManifest.updateDBState(dbState) } -func (f *FenceableManifest) refresh() (*CoreDBState, error) { +func (f *FenceableManifest) refresh() (*state.CoreStateSnapshot, error) { _, err := f.storedManifest.refresh() if err != nil { return nil, err @@ -128,7 +129,7 @@ type StoredManifest struct { manifestStore *ManifestStore } -func newStoredManifest(store *ManifestStore, core *CoreDBState) (*StoredManifest, error) { +func newStoredManifest(store *ManifestStore, core *state.CoreDBState) (*StoredManifest, error) { manifest := &Manifest{ core: core, } @@ -161,14 +162,14 @@ func loadStoredManifest(store *ManifestStore) (mo.Option[StoredManifest], error) }), nil } -func (s *StoredManifest) dbState() *CoreDBState { - return s.manifest.core +func (s *StoredManifest) dbState() *state.CoreStateSnapshot { + return s.manifest.core.Snapshot() } // write Manifest with updated DB state to object store and update StoredManifest with the new manifest -func (s *StoredManifest) updateDBState(core *CoreDBState) error { +func (s *StoredManifest) updateDBState(coreSnapshot *state.CoreStateSnapshot) error { manifest := &Manifest{ - core: core, + core: coreSnapshot.ToCoreState(), } manifest.writerEpoch.Store(s.manifest.writerEpoch.Load()) manifest.compactorEpoch.Store(s.manifest.compactorEpoch.Load()) @@ -188,7 +189,7 @@ func (s *StoredManifest) updateManifest(manifest *Manifest) error { } // read latest manifest from object store and update StoredManifest with the latest manifest. -func (s *StoredManifest) refresh() (*CoreDBState, error) { +func (s *StoredManifest) refresh() (*state.CoreStateSnapshot, error) { stored, err := s.manifestStore.readLatestManifest() if err != nil { return nil, err @@ -200,7 +201,7 @@ func (s *StoredManifest) refresh() (*CoreDBState, error) { storedInfo, _ := stored.Get() s.manifest = storedInfo.manifest s.id = storedInfo.id - return s.manifest.core, nil + return s.dbState(), nil } // ------------------------------------------------ diff --git a/slatedb/manifest_store_test.go b/slatedb/manifest_store_test.go index 6ba73cb..28bfdda 100644 --- a/slatedb/manifest_store_test.go +++ b/slatedb/manifest_store_test.go @@ -2,6 +2,7 @@ package slatedb import ( "github.com/slatedb/slatedb-go/slatedb/common" + "github.com/slatedb/slatedb-go/slatedb/state" "github.com/stretchr/testify/assert" "github.com/thanos-io/objstore" "testing" @@ -10,9 +11,9 @@ import ( func TestShouldFailWriteOnVersionConflict(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) storedManifest, err := loadStoredManifest(manifestStore) @@ -20,22 +21,22 @@ func TestShouldFailWriteOnVersionConflict(t *testing.T) { sm2, ok := storedManifest.Get() assert.True(t, ok) - err = sm.updateDBState(state) + err = sm.updateDBState(coreState.Snapshot()) assert.NoError(t, err) - err = sm2.updateDBState(state) + err = sm2.updateDBState(coreState.Snapshot()) assert.ErrorIs(t, err, common.ErrManifestVersionExists) } func TestShouldWriteWithNewVersion(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) - err = sm.updateDBState(state) + err = sm.updateDBState(coreState.Snapshot()) assert.NoError(t, err) info, err := manifestStore.readLatestManifest() @@ -49,23 +50,24 @@ func TestShouldWriteWithNewVersion(t *testing.T) { func TestShouldUpdateLocalStateOnWrite(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) - state.nextWalSstID.Store(123) - err = sm.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = sm.updateDBState(core) assert.NoError(t, err) - assert.Equal(t, uint64(123), sm.dbState().nextWalSstID.Load()) + assert.Equal(t, uint64(123), sm.dbState().NextWalSstID.Load()) } func TestShouldRefresh(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) storedManifest, err := loadStoredManifest(manifestStore) @@ -73,22 +75,23 @@ func TestShouldRefresh(t *testing.T) { sm2, ok := storedManifest.Get() assert.True(t, ok) - state.nextWalSstID.Store(123) - err = sm.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = sm.updateDBState(core) assert.NoError(t, err) refreshed, err := sm2.refresh() assert.NoError(t, err) - assert.Equal(t, uint64(123), refreshed.nextWalSstID.Load()) - assert.Equal(t, uint64(123), sm.dbState().nextWalSstID.Load()) + assert.Equal(t, uint64(123), refreshed.NextWalSstID.Load()) + assert.Equal(t, uint64(123), sm.dbState().NextWalSstID.Load()) } func TestShouldBumpWriterEpoch(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - _, err := newStoredManifest(manifestStore, state) + _, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) for i := 1; i <= 5; i++ { @@ -111,9 +114,9 @@ func TestShouldBumpWriterEpoch(t *testing.T) { func TestShouldFailOnWriterFenced(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) writer1, err := initFenceableManifestWriter(sm) assert.NoError(t, err) @@ -127,21 +130,22 @@ func TestShouldFailOnWriterFenced(t *testing.T) { _, err = writer1.refresh() assert.ErrorIs(t, err, common.ErrFenced) - state.nextWalSstID.Store(123) - err = writer1.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = writer1.updateDBState(core) assert.ErrorIs(t, err, common.ErrFenced) refreshed, err := writer2.refresh() assert.NoError(t, err) - assert.Equal(t, uint64(1), refreshed.nextWalSstID.Load()) + assert.Equal(t, uint64(1), refreshed.NextWalSstID.Load()) } func TestShouldBumpCompactorEpoch(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - _, err := newStoredManifest(manifestStore, state) + _, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) for i := 1; i <= 5; i++ { @@ -164,9 +168,9 @@ func TestShouldBumpCompactorEpoch(t *testing.T) { func TestShouldFailOnCompactorFenced(t *testing.T) { bucket := objstore.NewInMemBucket() manifestStore := newManifestStore(rootPath, bucket) - state := newCoreDBState() + coreState := state.NewCoreDBState() - sm, err := newStoredManifest(manifestStore, state) + sm, err := newStoredManifest(manifestStore, coreState) assert.NoError(t, err) compactor1, err := initFenceableManifestCompactor(sm) assert.NoError(t, err) @@ -180,11 +184,12 @@ func TestShouldFailOnCompactorFenced(t *testing.T) { _, err = compactor1.refresh() assert.ErrorIs(t, err, common.ErrFenced) - state.nextWalSstID.Store(123) - err = compactor1.updateDBState(state) + core := coreState.Snapshot() + core.NextWalSstID.Store(123) + err = compactor1.updateDBState(core) assert.ErrorIs(t, err, common.ErrFenced) refreshed, err := compactor2.refresh() assert.NoError(t, err) - assert.Equal(t, uint64(1), refreshed.nextWalSstID.Load()) + assert.Equal(t, uint64(1), refreshed.NextWalSstID.Load()) } diff --git a/slatedb/size_tiered_compaction.go b/slatedb/size_tiered_compaction.go index 683005a..573faff 100644 --- a/slatedb/size_tiered_compaction.go +++ b/slatedb/size_tiered_compaction.go @@ -10,17 +10,17 @@ func (s SizeTieredCompactionScheduler) maybeScheduleCompaction(state *CompactorS dbState := state.dbState // for now, just compact l0 down to a new sorted run each time compactions := make([]Compaction, 0) - if len(dbState.l0) >= 4 { + if len(dbState.L0) >= 4 { sources := make([]SourceID, 0) - for _, sst := range dbState.l0 { + for _, sst := range dbState.L0 { id, ok := sst.Id.CompactedID().Get() assert.True(ok, "Expected valid compacted ID") sources = append(sources, newSourceIDSST(id)) } nextSortedRunID := uint32(0) - if len(dbState.compacted) > 0 { - nextSortedRunID = dbState.compacted[0].ID + 1 + if len(dbState.Compacted) > 0 { + nextSortedRunID = dbState.Compacted[0].ID + 1 } compactions = append(compactions, newCompaction(sources, nextSortedRunID)) diff --git a/slatedb/db_state.go b/slatedb/state/db_state.go similarity index 68% rename from slatedb/db_state.go rename to slatedb/state/db_state.go index 205576f..475a4f2 100644 --- a/slatedb/db_state.go +++ b/slatedb/state/db_state.go @@ -1,4 +1,4 @@ -package slatedb +package state import ( "github.com/slatedb/slatedb-go/internal/assert" @@ -35,7 +35,46 @@ type CoreDBState struct { lastCompactedWalSSTID atomic.Uint64 } -func newCoreDBState() *CoreDBState { +type CoreStateSnapshot struct { + L0LastCompacted mo.Option[ulid.ULID] + L0 []sstable.Handle + Compacted []levels.SortedRun + NextWalSstID atomic.Uint64 + LastCompactedWalSSTID atomic.Uint64 +} + +func (s *CoreStateSnapshot) ToCoreState() *CoreDBState { + snapshot := s.Clone() + coreState := &CoreDBState{ + l0LastCompacted: snapshot.L0LastCompacted, + l0: snapshot.L0, + compacted: snapshot.Compacted, + } + coreState.nextWalSstID.Store(s.NextWalSstID.Load()) + coreState.lastCompactedWalSSTID.Store(s.LastCompactedWalSSTID.Load()) + return coreState +} + +func (s *CoreStateSnapshot) Clone() *CoreStateSnapshot { + l0 := make([]sstable.Handle, 0, len(s.L0)) + for _, sst := range s.L0 { + l0 = append(l0, *sst.Clone()) + } + compacted := make([]levels.SortedRun, 0, len(s.Compacted)) + for _, sr := range s.Compacted { + compacted = append(compacted, *sr.Clone()) + } + snapshot := &CoreStateSnapshot{ + L0LastCompacted: s.L0LastCompacted, + L0: l0, + Compacted: compacted, + } + snapshot.NextWalSstID.Store(s.NextWalSstID.Load()) + snapshot.LastCompactedWalSSTID.Store(s.LastCompactedWalSSTID.Load()) + return snapshot +} + +func NewCoreDBState() *CoreDBState { coreState := &CoreDBState{ l0LastCompacted: mo.None[ulid.ULID](), l0: make([]sstable.Handle, 0), @@ -46,7 +85,7 @@ func newCoreDBState() *CoreDBState { return coreState } -func (c *CoreDBState) clone() *CoreDBState { +func (c *CoreDBState) Snapshot() *CoreStateSnapshot { l0 := make([]sstable.Handle, 0, len(c.l0)) for _, sst := range c.l0 { l0 = append(l0, *sst.Clone()) @@ -55,13 +94,13 @@ func (c *CoreDBState) clone() *CoreDBState { for _, sr := range c.compacted { compacted = append(compacted, *sr.Clone()) } - coreState := &CoreDBState{ - l0LastCompacted: c.l0LastCompacted, - l0: l0, - compacted: compacted, + coreState := &CoreStateSnapshot{ + L0LastCompacted: c.l0LastCompacted, + L0: l0, + Compacted: compacted, } - coreState.nextWalSstID.Store(c.nextWalSstID.Load()) - coreState.lastCompactedWalSSTID.Store(c.lastCompactedWalSSTID.Load()) + coreState.NextWalSstID.Store(c.nextWalSstID.Load()) + coreState.LastCompactedWalSSTID.Store(c.lastCompactedWalSSTID.Load()) return coreState } @@ -75,11 +114,11 @@ func LogState(log *slog.Logger, c *CoreDBState) { // DBStateSnapshot contains state required for read methods (eg. GET) type DBStateSnapshot struct { - wal *table.WAL - memtable *table.Memtable - immWALs *deque.Deque[*table.ImmutableWAL] - immMemtables *deque.Deque[*table.ImmutableMemtable] - core *CoreDBState + Wal *table.WAL + Memtable *table.Memtable + ImmWALs *deque.Deque[*table.ImmutableWAL] + ImmMemtables *deque.Deque[*table.ImmutableMemtable] + Core *CoreStateSnapshot } type DBState struct { @@ -91,7 +130,7 @@ type DBState struct { core *CoreDBState } -func newDBState(coreDBState *CoreDBState) *DBState { +func NewDBState(coreDBState *CoreDBState) *DBState { return &DBState{ wal: table.NewWAL(), memtable: table.NewMemtable(), @@ -165,26 +204,26 @@ func (s *DBState) DeleteKVFromMemtable(key []byte) { s.memtable.Delete(key) } -func (s *DBState) coreStateClone() *CoreDBState { +func (s *DBState) CoreStateSnapshot() *CoreStateSnapshot { s.RLock() defer s.RUnlock() - return s.core.clone() + return s.core.Snapshot() } -func (s *DBState) snapshot() *DBStateSnapshot { +func (s *DBState) Snapshot() *DBStateSnapshot { s.RLock() defer s.RUnlock() return &DBStateSnapshot{ - wal: s.wal.Clone(), - memtable: s.memtable.Clone(), - immWALs: common.CopyDeque(s.immWALs), - immMemtables: common.CopyDeque(s.immMemtables), - core: s.core.clone(), + Wal: s.wal.Clone(), + Memtable: s.memtable.Clone(), + ImmWALs: common.CopyDeque(s.immWALs), + ImmMemtables: common.CopyDeque(s.immMemtables), + Core: s.core.Snapshot(), } } -func (s *DBState) freezeMemtable(walID uint64) { +func (s *DBState) FreezeMemtable(walID uint64) { s.Lock() defer s.Unlock() @@ -195,7 +234,7 @@ func (s *DBState) freezeMemtable(walID uint64) { s.immMemtables.PushFront(immMemtable) } -func (s *DBState) freezeWAL() mo.Option[uint64] { +func (s *DBState) FreezeWAL() mo.Option[uint64] { s.Lock() defer s.Unlock() @@ -212,14 +251,14 @@ func (s *DBState) freezeWAL() mo.Option[uint64] { return mo.Some(immWAL.ID()) } -func (s *DBState) popImmWAL() { +func (s *DBState) PopImmWAL() { s.Lock() defer s.Unlock() s.immWALs.PopBack() } -func (s *DBState) oldestImmWAL() mo.Option[*table.ImmutableWAL] { +func (s *DBState) OldestImmWAL() mo.Option[*table.ImmutableWAL] { s.RLock() defer s.RUnlock() @@ -229,7 +268,7 @@ func (s *DBState) oldestImmWAL() mo.Option[*table.ImmutableWAL] { return mo.Some(s.immWALs.Back()) } -func (s *DBState) oldestImmMemtable() mo.Option[*table.ImmutableMemtable] { +func (s *DBState) OldestImmMemtable() mo.Option[*table.ImmutableMemtable] { s.RLock() defer s.RUnlock() @@ -239,7 +278,7 @@ func (s *DBState) oldestImmMemtable() mo.Option[*table.ImmutableMemtable] { return mo.Some(s.immMemtables.Back()) } -func (s *DBState) moveImmMemtableToL0(immMemtable *table.ImmutableMemtable, sstHandle *sstable.Handle) { +func (s *DBState) MoveImmMemtableToL0(immMemtable *table.ImmutableMemtable, sstHandle *sstable.Handle) { s.Lock() defer s.Unlock() @@ -250,16 +289,16 @@ func (s *DBState) moveImmMemtableToL0(immMemtable *table.ImmutableMemtable, sstH s.core.lastCompactedWalSSTID.Store(immMemtable.LastWalID()) } -func (s *DBState) incrementNextWALID() { +func (s *DBState) IncrementNextWALID() { s.core.nextWalSstID.Add(1) } -func (s *DBState) refreshDBState(compactorState *CoreDBState) { +func (s *DBState) RefreshDBState(compactorState *CoreStateSnapshot) { s.Lock() defer s.Unlock() // copy over L0 up to l0LastCompacted - l0LastCompacted := compactorState.l0LastCompacted + l0LastCompacted := compactorState.L0LastCompacted newL0 := make([]sstable.Handle, 0) for i := 0; i < len(s.core.l0); i++ { sst := s.core.l0[i] @@ -274,5 +313,5 @@ func (s *DBState) refreshDBState(compactorState *CoreDBState) { s.core.l0LastCompacted = l0LastCompacted s.core.l0 = newL0 - s.core.compacted = compactorState.compacted + s.core.compacted = compactorState.Compacted } diff --git a/slatedb/db_state_test.go b/slatedb/state/db_state_test.go similarity index 65% rename from slatedb/db_state_test.go rename to slatedb/state/db_state_test.go index 14c7d43..3710382 100644 --- a/slatedb/db_state_test.go +++ b/slatedb/state/db_state_test.go @@ -1,4 +1,4 @@ -package slatedb +package state import ( "github.com/oklog/ulid/v2" @@ -19,51 +19,51 @@ func addL0sToDBState(dbState *DBState, n uint32) { } for i := 0; i < int(n); i++ { - dbState.freezeMemtable(uint64(i)) - immMemtable := dbState.oldestImmMemtable() + dbState.FreezeMemtable(uint64(i)) + immMemtable := dbState.OldestImmMemtable() if immMemtable.IsAbsent() { break } sst := sstable.NewHandle(sstable.NewIDCompacted(ulid.Make()), sstInfo) - dbState.moveImmMemtableToL0(immMemtable.MustGet(), sst) + dbState.MoveImmMemtableToL0(immMemtable.MustGet(), sst) } } func TestRefreshDBStateWithL0sUptoLastCompacted(t *testing.T) { - dbState := newDBState(newCoreDBState()) + dbState := NewDBState(NewCoreDBState()) addL0sToDBState(dbState, 4) // prepare compactorState indicating that the last SST in L0 gets compacted - compactorState := dbState.coreStateClone() - size := len(compactorState.l0) - lastCompacted := compactorState.l0[size-1] - compactorState.l0 = compactorState.l0[:size-1] + compactorState := dbState.CoreStateSnapshot() + size := len(compactorState.L0) + lastCompacted := compactorState.L0[size-1] + compactorState.L0 = compactorState.L0[:size-1] assert.Equal(t, sstable.Compacted, lastCompacted.Id.Type) id, err := ulid.Parse(lastCompacted.Id.Value) assert.NoError(t, err) - compactorState.l0LastCompacted = mo.Some(id) + compactorState.L0LastCompacted = mo.Some(id) // when refreshDBState is called with the compactorState - dbState.refreshDBState(compactorState) + dbState.RefreshDBState(compactorState) // then verify that the dbState.core is modified to match the given compactorState - assert.Equal(t, len(compactorState.l0), len(dbState.L0())) - for i := 0; i < len(compactorState.l0); i++ { - expected := compactorState.l0[i] + assert.Equal(t, len(compactorState.L0), len(dbState.L0())) + for i := 0; i < len(compactorState.L0); i++ { + expected := compactorState.L0[i] actual := dbState.L0()[i] assert.Equal(t, expected, actual) } - assert.Equal(t, compactorState.l0LastCompacted, dbState.L0LastCompacted()) + assert.Equal(t, compactorState.L0LastCompacted, dbState.L0LastCompacted()) } func TestRefreshDBStateWithAllL0sIfNoneCompacted(t *testing.T) { - dbState := newDBState(newCoreDBState()) + dbState := NewDBState(NewCoreDBState()) addL0sToDBState(dbState, 4) - l0SSTList := dbState.coreStateClone().l0 + l0SSTList := dbState.CoreStateSnapshot().L0 // when refreshDBState is called with no compaction - dbState.refreshDBState(newCoreDBState()) + dbState.RefreshDBState(NewCoreDBState().Snapshot()) // then verify there is no change in dbState L0 assert.Equal(t, len(l0SSTList), len(dbState.L0()))