Skip to content

Commit

Permalink
Merge pull request #57 from naveen246/refactor-state
Browse files Browse the repository at this point in the history
Refactor state
  • Loading branch information
thrawn01 authored Dec 27, 2024
2 parents 59176a2 + 7e6e9fc commit 7857292
Show file tree
Hide file tree
Showing 14 changed files with 306 additions and 254 deletions.
10 changes: 5 additions & 5 deletions slatedb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func loadState(manifest *FenceableManifest) (*CompactorState, error) {
if err != nil {
return nil, err
}
return newCompactorState(dbState.clone(), nil), nil
return newCompactorState(dbState.Clone(), nil), nil
}

func loadCompactionScheduler() CompactionScheduler {
Expand Down Expand Up @@ -234,12 +234,12 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
dbState := o.state.dbState

sstsByID := make(map[ulid.ULID]sstable.Handle)
for _, sst := range dbState.l0 {
for _, sst := range dbState.L0 {
id, ok := sst.Id.CompactedID().Get()
assert.True(ok, "expected valid compacted ID")
sstsByID[id] = sst
}
for _, sr := range dbState.compacted {
for _, sr := range dbState.Compacted {
for _, sst := range sr.SSTList {
id, ok := sst.Id.CompactedID().Get()
assert.True(ok, "expected valid compacted ID")
Expand All @@ -248,7 +248,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
}

srsByID := make(map[uint32]compaction2.SortedRun)
for _, sr := range dbState.compacted {
for _, sr := range dbState.Compacted {
srsByID[sr.ID] = sr
}

Expand Down Expand Up @@ -297,7 +297,7 @@ func (o *CompactorOrchestrator) writeManifest() error {
return err
}

core := o.state.dbState.clone()
core := o.state.dbState.Clone()
err = o.manifest.updateDBState(core)
if errors.Is(err, common.ErrManifestVersionExists) {
o.log.Warn("conflicting manifest version. retry write", "error", err)
Expand Down
33 changes: 17 additions & 16 deletions slatedb/compactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction"
"github.com/slatedb/slatedb-go/slatedb/state"
"log/slog"
"math"
"strconv"
Expand Down Expand Up @@ -89,12 +90,12 @@ func newCompaction(sources []SourceID, destination uint32) Compaction {
// ------------------------------------------------

type CompactorState struct {
dbState *CoreDBState
dbState *state.CoreStateSnapshot
compactions map[uint32]Compaction
log *slog.Logger
}

func newCompactorState(dbState *CoreDBState, log *slog.Logger) *CompactorState {
func newCompactorState(dbState *state.CoreStateSnapshot, log *slog.Logger) *CompactorState {
set.Default(&log, slog.Default())

return &CompactorState{
Expand All @@ -111,7 +112,7 @@ func (c *CompactorState) submitCompaction(compaction Compaction) error {
return common.ErrInvalidCompaction
}

for _, sr := range c.dbState.compacted {
for _, sr := range c.dbState.Compacted {
if sr.ID == compaction.destination {
if !c.oneOfTheSourceSRMatchesDestination(compaction) {
// the compaction overwrites an existing sr but doesn't include the sr
Expand All @@ -138,12 +139,12 @@ func (c *CompactorState) oneOfTheSourceSRMatchesDestination(compaction Compactio
return false
}

func (c *CompactorState) refreshDBState(writerState *CoreDBState) {
func (c *CompactorState) refreshDBState(writerState *state.CoreStateSnapshot) {
// the writer may have added more l0 SSTs. Add these to our l0 list.
lastCompactedL0 := c.dbState.l0LastCompacted
lastCompactedL0 := c.dbState.L0LastCompacted
mergedL0s := make([]sstable.Handle, 0)

for _, writerL0SST := range writerState.l0 {
for _, writerL0SST := range writerState.L0 {
assert.True(writerL0SST.Id.Type == sstable.Compacted, "unexpected sstable.ID type")
writerL0ID, _ := writerL0SST.Id.CompactedID().Get()
// we stop appending to our l0 list if we encounter sstID equal to lastCompactedID
Expand All @@ -154,10 +155,10 @@ func (c *CompactorState) refreshDBState(writerState *CoreDBState) {
mergedL0s = append(mergedL0s, writerL0SST)
}

merged := c.dbState.clone()
merged.l0 = mergedL0s
merged.lastCompactedWalSSTID.Store(writerState.lastCompactedWalSSTID.Load())
merged.nextWalSstID.Store(writerState.nextWalSstID.Load())
merged := c.dbState.Clone()
merged.L0 = mergedL0s
merged.LastCompactedWalSSTID.Store(writerState.LastCompactedWalSSTID.Load())
merged.NextWalSstID.Store(writerState.NextWalSstID.Load())
c.dbState = merged
}

Expand All @@ -183,9 +184,9 @@ func (c *CompactorState) finishCompaction(outputSR *compaction2.SortedRun) {
}
compactionSRs[compaction.destination] = true

dbState := c.dbState.clone()
dbState := c.dbState.Clone()
newL0 := make([]sstable.Handle, 0)
for _, sst := range dbState.l0 {
for _, sst := range dbState.L0 {
assert.True(sst.Id.CompactedID().IsPresent(), "Expected compactedID not present")
l0ID, _ := sst.Id.CompactedID().Get()
_, ok := compactionL0s[l0ID]
Expand All @@ -196,7 +197,7 @@ func (c *CompactorState) finishCompaction(outputSR *compaction2.SortedRun) {

newCompacted := make([]compaction2.SortedRun, 0)
inserted := false
for _, sr := range dbState.compacted {
for _, sr := range dbState.Compacted {
if !inserted && outputSR.ID >= sr.ID {
newCompacted = append(newCompacted, *outputSR)
inserted = true
Expand All @@ -216,11 +217,11 @@ func (c *CompactorState) finishCompaction(outputSR *compaction2.SortedRun) {
firstSource := compaction.sources[0]
if firstSource.sstID().IsPresent() {
compactedL0, _ := firstSource.sstID().Get()
dbState.l0LastCompacted = mo.Some(compactedL0)
dbState.L0LastCompacted = mo.Some(compactedL0)
}

dbState.l0 = newL0
dbState.compacted = newCompacted
dbState.L0 = newL0
dbState.Compacted = newCompacted
c.dbState = dbState
delete(c.compactions, outputSR.ID)
}
Expand Down
117 changes: 59 additions & 58 deletions slatedb/compactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
assert2 "github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction"
"github.com/slatedb/slatedb-go/slatedb/state"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand All @@ -15,57 +16,57 @@ import (
var testPath = "/test/db"

func TestShouldRegisterCompactionAsSubmitted(t *testing.T) {
_, _, state := buildTestState(t)
err := state.submitCompaction(buildL0Compaction(state.dbState.l0, 0))
_, _, compactorState := buildTestState(t)
err := compactorState.submitCompaction(buildL0Compaction(compactorState.dbState.L0, 0))
assert.NoError(t, err)

assert.Equal(t, 1, len(state.compactions))
assert.Equal(t, Submitted, state.compactions[0].status)
assert.Equal(t, 1, len(compactorState.compactions))
assert.Equal(t, Submitted, compactorState.compactions[0].status)
}

func TestShouldUpdateDBStateWhenCompactionFinished(t *testing.T) {
_, _, state := buildTestState(t)
beforeCompaction := state.dbState.clone()
compaction := buildL0Compaction(beforeCompaction.l0, 0)
err := state.submitCompaction(compaction)
_, _, compactorState := buildTestState(t)
beforeCompaction := compactorState.dbState.Clone()
compaction := buildL0Compaction(beforeCompaction.L0, 0)
err := compactorState.submitCompaction(compaction)
assert.NoError(t, err)

sr := compaction2.SortedRun{
ID: 0,
SSTList: beforeCompaction.l0,
SSTList: beforeCompaction.L0,
}
state.finishCompaction(sr.Clone())
compactorState.finishCompaction(sr.Clone())

compactedID, _ := beforeCompaction.l0[0].Id.CompactedID().Get()
l0LastCompacted, _ := state.dbState.l0LastCompacted.Get()
compactedID, _ := beforeCompaction.L0[0].Id.CompactedID().Get()
l0LastCompacted, _ := compactorState.dbState.L0LastCompacted.Get()
assert.Equal(t, compactedID, l0LastCompacted)
assert.Equal(t, 0, len(state.dbState.l0))
assert.Equal(t, 1, len(state.dbState.compacted))
assert.Equal(t, sr.ID, state.dbState.compacted[0].ID)
compactedSR := state.dbState.compacted[0]
assert.Equal(t, 0, len(compactorState.dbState.L0))
assert.Equal(t, 1, len(compactorState.dbState.Compacted))
assert.Equal(t, sr.ID, compactorState.dbState.Compacted[0].ID)
compactedSR := compactorState.dbState.Compacted[0]
for i := 0; i < len(sr.SSTList); i++ {
assert.Equal(t, sr.SSTList[i].Id, compactedSR.SSTList[i].Id)
}
}

func TestShouldRemoveCompactionWhenCompactionFinished(t *testing.T) {
_, _, state := buildTestState(t)
beforeCompaction := state.dbState.clone()
compaction := buildL0Compaction(beforeCompaction.l0, 0)
err := state.submitCompaction(compaction)
_, _, compactorState := buildTestState(t)
beforeCompaction := compactorState.dbState.Clone()
compaction := buildL0Compaction(beforeCompaction.L0, 0)
err := compactorState.submitCompaction(compaction)
assert.NoError(t, err)

sr := compaction2.SortedRun{
ID: 0,
SSTList: beforeCompaction.l0,
SSTList: beforeCompaction.L0,
}
state.finishCompaction(sr.Clone())
compactorState.finishCompaction(sr.Clone())

assert.Equal(t, 0, len(state.compactions))
assert.Equal(t, 0, len(compactorState.compactions))
}

func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) {
bucket, sm, state := buildTestState(t)
bucket, sm, compactorState := buildTestState(t)
option := DefaultDBOptions()
option.L0SSTSizeBytes = 128
db, err := OpenWithOptions(context.Background(), testPath, bucket, option)
Expand All @@ -74,25 +75,25 @@ func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) {
db.Put(repeatedChar('a', 16), repeatedChar('b', 48))
db.Put(repeatedChar('j', 16), repeatedChar('k', 48))

writerDBState := waitForManifestWithL0Len(sm, len(state.dbState.l0)+1)
writerDBState := waitForManifestWithL0Len(sm, len(compactorState.dbState.L0)+1)

state.refreshDBState(writerDBState)
compactorState.refreshDBState(writerDBState)

assert.True(t, state.dbState.l0LastCompacted.IsAbsent())
for i := 0; i < len(writerDBState.l0); i++ {
assert.Equal(t, writerDBState.l0[i].Id.CompactedID(), state.dbState.l0[i].Id.CompactedID())
assert.True(t, compactorState.dbState.L0LastCompacted.IsAbsent())
for i := 0; i < len(writerDBState.L0); i++ {
assert.Equal(t, writerDBState.L0[i].Id.CompactedID(), compactorState.dbState.L0[i].Id.CompactedID())
}
}

func TestShouldRefreshDBStateCorrectly(t *testing.T) {
bucket, sm, state := buildTestState(t)
originalL0s := state.dbState.clone().l0
bucket, sm, compactorState := buildTestState(t)
originalL0s := compactorState.dbState.Clone().L0
compactedID, ok := originalL0s[len(originalL0s)-1].Id.CompactedID().Get()
assert.True(t, ok)
compaction := newCompaction([]SourceID{newSourceIDSST(compactedID)}, 0)
err := state.submitCompaction(compaction)
err := compactorState.submitCompaction(compaction)
assert.NoError(t, err)
state.finishCompaction(&compaction2.SortedRun{
compactorState.finishCompaction(&compaction2.SortedRun{
ID: 0,
SSTList: []sstable.Handle{originalL0s[len(originalL0s)-1]},
})
Expand All @@ -105,35 +106,35 @@ func TestShouldRefreshDBStateCorrectly(t *testing.T) {
db.Put(repeatedChar('a', 16), repeatedChar('b', 48))
db.Put(repeatedChar('j', 16), repeatedChar('k', 48))
writerDBState := waitForManifestWithL0Len(sm, len(originalL0s)+1)
dbStateBeforeMerge := state.dbState.clone()
dbStateBeforeMerge := compactorState.dbState.Clone()

state.refreshDBState(writerDBState)
compactorState.refreshDBState(writerDBState)

dbState := state.dbState
dbState := compactorState.dbState
// last sst was removed during compaction
expectedMergedL0s := originalL0s[:len(originalL0s)-1]
// new sst got added during db.Put() call above
expectedMergedL0s = append([]sstable.Handle{writerDBState.l0[0]}, expectedMergedL0s...)
expectedMergedL0s = append([]sstable.Handle{writerDBState.L0[0]}, expectedMergedL0s...)
for i := 0; i < len(expectedMergedL0s); i++ {
expected, _ := expectedMergedL0s[i].Id.CompactedID().Get()
actual, _ := dbState.l0[i].Id.CompactedID().Get()
actual, _ := dbState.L0[i].Id.CompactedID().Get()
assert.Equal(t, expected, actual)
}
for i := 0; i < len(dbStateBeforeMerge.compacted); i++ {
srBefore := dbStateBeforeMerge.compacted[i]
srAfter := dbState.compacted[i]
for i := 0; i < len(dbStateBeforeMerge.Compacted); i++ {
srBefore := dbStateBeforeMerge.Compacted[i]
srAfter := dbState.Compacted[i]
assert.Equal(t, srBefore.ID, srAfter.ID)
for j := 0; j < len(srBefore.SSTList); j++ {
assert.Equal(t, srBefore.SSTList[j].Id, srAfter.SSTList[j].Id)
}
}
assert.Equal(t, writerDBState.lastCompactedWalSSTID.Load(), dbState.lastCompactedWalSSTID.Load())
assert.Equal(t, writerDBState.nextWalSstID.Load(), dbState.nextWalSstID.Load())
assert.Equal(t, writerDBState.LastCompactedWalSSTID.Load(), dbState.LastCompactedWalSSTID.Load())
assert.Equal(t, writerDBState.NextWalSstID.Load(), dbState.NextWalSstID.Load())
}

func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) {
bucket, sm, state := buildTestState(t)
originalL0s := state.dbState.clone().l0
bucket, sm, compactorState := buildTestState(t)
originalL0s := compactorState.dbState.Clone().L0

sourceIDs := make([]SourceID, 0)
for _, sst := range originalL0s {
Expand All @@ -142,13 +143,13 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) {
sourceIDs = append(sourceIDs, newSourceIDSST(id))
}
compaction := newCompaction(sourceIDs, 0)
err := state.submitCompaction(compaction)
err := compactorState.submitCompaction(compaction)
assert.NoError(t, err)
state.finishCompaction(&compaction2.SortedRun{
compactorState.finishCompaction(&compaction2.SortedRun{
ID: 0,
SSTList: originalL0s,
})
assert.Equal(t, 0, len(state.dbState.l0))
assert.Equal(t, 0, len(compactorState.dbState.L0))

option := DefaultDBOptions()
option.L0SSTSizeBytes = 128
Expand All @@ -159,22 +160,22 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) {
db.Put(repeatedChar('j', 16), repeatedChar('k', 48))
writerDBState := waitForManifestWithL0Len(sm, len(originalL0s)+1)

state.refreshDBState(writerDBState)
compactorState.refreshDBState(writerDBState)

dbState := state.dbState
assert.Equal(t, 1, len(dbState.l0))
expectedID, _ := writerDBState.l0[0].Id.CompactedID().Get()
actualID, _ := dbState.l0[0].Id.CompactedID().Get()
dbState := compactorState.dbState
assert.Equal(t, 1, len(dbState.L0))
expectedID, _ := writerDBState.L0[0].Id.CompactedID().Get()
actualID, _ := dbState.L0[0].Id.CompactedID().Get()
assert.Equal(t, expectedID, actualID)
}

func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *CoreDBState {
func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *state.CoreStateSnapshot {
startTime := time.Now()
for time.Since(startTime) < time.Second*10 {
dbState, err := storedManifest.refresh()
assert2.True(err == nil, "")
if len(dbState.l0) == size {
return dbState.clone()
if len(dbState.L0) == size {
return dbState.Clone()
}
time.Sleep(time.Millisecond * 50)
}
Expand Down Expand Up @@ -214,6 +215,6 @@ func buildTestState(t *testing.T) (objstore.Bucket, StoredManifest, *CompactorSt
assert.True(t, err == nil, "Could not load stored manifest")
assert.True(t, sm.IsPresent(), "Could not find stored manifest")
storedManifest, _ := sm.Get()
state := newCompactorState(storedManifest.dbState(), nil)
return bucket, storedManifest, state
compactorState := newCompactorState(storedManifest.dbState(), nil)
return bucket, storedManifest, compactorState
}
Loading

0 comments on commit 7857292

Please sign in to comment.