Skip to content

Commit

Permalink
Merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
naveen246 committed Dec 31, 2024
2 parents b17e77c + 7857292 commit ed0aea3
Show file tree
Hide file tree
Showing 18 changed files with 415 additions and 351 deletions.
41 changes: 21 additions & 20 deletions slatedb/sortedrun.go → slatedb/compaction/sortedrun.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package slatedb
package compaction

import (
"bytes"
"context"
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/store"

"github.com/samber/mo"
"sort"
Expand All @@ -16,37 +17,37 @@ import (
// ------------------------------------------------

type SortedRun struct {
id uint32
sstList []sstable.Handle
ID uint32
SSTList []sstable.Handle
}

func (s *SortedRun) indexOfSSTWithKey(key []byte) mo.Option[int] {
index := sort.Search(len(s.sstList), func(i int) bool {
assert.True(len(s.sstList[i].Info.FirstKey) != 0, "sst must have first key")
return bytes.Compare(s.sstList[i].Info.FirstKey, key) > 0
index := sort.Search(len(s.SSTList), func(i int) bool {
assert.True(len(s.SSTList[i].Info.FirstKey) != 0, "sst must have first key")
return bytes.Compare(s.SSTList[i].Info.FirstKey, key) > 0
})
if index > 0 {
return mo.Some(index - 1)
}
return mo.None[int]()
}

func (s *SortedRun) sstWithKey(key []byte) mo.Option[sstable.Handle] {
func (s *SortedRun) SstWithKey(key []byte) mo.Option[sstable.Handle] {
index, ok := s.indexOfSSTWithKey(key).Get()
if ok {
return mo.Some(s.sstList[index])
return mo.Some(s.SSTList[index])
}
return mo.None[sstable.Handle]()
}

func (s *SortedRun) clone() *SortedRun {
sstList := make([]sstable.Handle, 0, len(s.sstList))
for _, sst := range s.sstList {
func (s *SortedRun) Clone() *SortedRun {
sstList := make([]sstable.Handle, 0, len(s.SSTList))
for _, sst := range s.SSTList {
sstList = append(sstList, *sst.Clone())
}
return &SortedRun{
id: s.id,
sstList: sstList,
ID: s.ID,
SSTList: sstList,
}
}

Expand All @@ -57,25 +58,25 @@ func (s *SortedRun) clone() *SortedRun {
type SortedRunIterator struct {
currentKVIter mo.Option[*sstable.Iterator]
sstListIter *SSTListIterator
tableStore *TableStore
tableStore *store.TableStore
warn types.ErrWarn
}

func newSortedRunIterator(sr SortedRun, store *TableStore) (*SortedRunIterator, error) {
return newSortedRunIter(sr.sstList, store, mo.None[[]byte]())
func NewSortedRunIterator(sr SortedRun, store *store.TableStore) (*SortedRunIterator, error) {
return newSortedRunIter(sr.SSTList, store, mo.None[[]byte]())
}

func newSortedRunIteratorFromKey(sr SortedRun, key []byte, store *TableStore) (*SortedRunIterator, error) {
sstList := sr.sstList
func NewSortedRunIteratorFromKey(sr SortedRun, key []byte, store *store.TableStore) (*SortedRunIterator, error) {
sstList := sr.SSTList
idx, ok := sr.indexOfSSTWithKey(key).Get()
if ok {
sstList = sr.sstList[idx:]
sstList = sr.SSTList[idx:]
}

return newSortedRunIter(sstList, store, mo.Some(key))
}

func newSortedRunIter(sstList []sstable.Handle, store *TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) {
func newSortedRunIter(sstList []sstable.Handle, store *store.TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) {

sstListIter := newSSTListIterator(sstList)
currentKVIter := mo.None[*sstable.Iterator]()
Expand Down
25 changes: 13 additions & 12 deletions slatedb/sortedrun_test.go → slatedb/compaction/sortedrun_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package slatedb
package compaction

import (
"context"
Expand All @@ -8,6 +8,7 @@ import (
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/common"
"github.com/slatedb/slatedb-go/slatedb/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand All @@ -17,7 +18,7 @@ import (
func buildSRWithSSTs(
n uint64,
keysPerSST uint64,
tableStore *TableStore,
tableStore *store.TableStore,
keyGen common.OrderedBytesGenerator,
valGen common.OrderedBytesGenerator,
) (SortedRun, error) {
Expand All @@ -42,7 +43,7 @@ func TestOneSstSRIter(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

builder := tableStore.TableBuilder()
require.NoError(t, builder.AddValue([]byte("key1"), []byte("value1")))
Expand All @@ -55,7 +56,7 @@ func TestOneSstSRIter(t *testing.T) {
assert.NoError(t, err)

sr := SortedRun{0, []sstable.Handle{*sstHandle}}
iterator, err := newSortedRunIterator(sr, tableStore)
iterator, err := NewSortedRunIterator(sr, tableStore)
assert.NoError(t, err)
assert2.Next(t, iterator, []byte("key1"), []byte("value1"))
assert2.Next(t, iterator, []byte("key2"), []byte("value2"))
Expand All @@ -70,7 +71,7 @@ func TestManySstSRIter(t *testing.T) {
bucket := objstore.NewInMemBucket()
format := sstable.DefaultConfig()
format.MinFilterKeys = 3
tableStore := NewTableStore(bucket, format, "")
tableStore := store.NewTableStore(bucket, format, "")

builder := tableStore.TableBuilder()
require.NoError(t, builder.AddValue([]byte("key1"), []byte("value1")))
Expand All @@ -91,7 +92,7 @@ func TestManySstSRIter(t *testing.T) {
require.NoError(t, err)

sr := SortedRun{0, []sstable.Handle{*sstHandle, *sstHandle2}}
iterator, err := newSortedRunIterator(sr, tableStore)
iterator, err := NewSortedRunIterator(sr, tableStore)
assert.NoError(t, err)
assert2.Next(t, iterator, []byte("key1"), []byte("value1"))
assert2.Next(t, iterator, []byte("key2"), []byte("value2"))
Expand All @@ -106,7 +107,7 @@ func TestSRIterFromKey(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

firstKey := []byte("aaaaaaaaaaaaaaaa")
keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z'))
Expand All @@ -125,7 +126,7 @@ func TestSRIterFromKey(t *testing.T) {
fromKey := testCaseKeyGen.Next()
testCaseValGen.Next()

kvIter, err := newSortedRunIteratorFromKey(sr, fromKey, tableStore)
kvIter, err := NewSortedRunIteratorFromKey(sr, fromKey, tableStore)
assert.NoError(t, err)

for j := 0; j < 30-i; j++ {
Expand All @@ -141,7 +142,7 @@ func TestSRIterFromKeyLowerThanRange(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

firstKey := []byte("aaaaaaaaaaaaaaaa")
keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z'))
Expand All @@ -154,7 +155,7 @@ func TestSRIterFromKeyLowerThanRange(t *testing.T) {
sr, err := buildSRWithSSTs(3, 10, tableStore, keyGen, valGen)
require.NoError(t, err)

kvIter, err := newSortedRunIteratorFromKey(sr, []byte("aaaaaaaaaa"), tableStore)
kvIter, err := NewSortedRunIteratorFromKey(sr, []byte("aaaaaaaaaa"), tableStore)
assert.NoError(t, err)

for j := 0; j < 30; j++ {
Expand All @@ -169,7 +170,7 @@ func TestSRIterFromKeyHigherThanRange(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

firstKey := []byte("aaaaaaaaaaaaaaaa")
keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z'))
Expand All @@ -180,7 +181,7 @@ func TestSRIterFromKeyHigherThanRange(t *testing.T) {
sr, err := buildSRWithSSTs(3, 10, tableStore, keyGen, valGen)
require.NoError(t, err)

kvIter, err := newSortedRunIteratorFromKey(sr, []byte("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"), tableStore)
kvIter, err := NewSortedRunIteratorFromKey(sr, []byte("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"), tableStore)
assert.NoError(t, err)
next, ok := kvIter.Next(context.Background())
assert.False(t, ok)
Expand Down
46 changes: 24 additions & 22 deletions slatedb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction"
"github.com/slatedb/slatedb-go/slatedb/store"
"log/slog"
"math"
"sync"
Expand All @@ -28,7 +30,7 @@ const (
)

type WorkerToOrchestratorMsg struct {
CompactionResult *SortedRun
CompactionResult *compaction2.SortedRun
CompactionError error
}

Expand All @@ -46,7 +48,7 @@ type Compactor struct {
compactorWG *sync.WaitGroup
}

func newCompactor(manifestStore *ManifestStore, tableStore *TableStore, opts DBOptions) (*Compactor, error) {
func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) {
compactorMsgCh := make(chan CompactorMainMsg, math.MaxUint8)

compactorWG, errCh := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts, compactorMsgCh)
Expand All @@ -71,7 +73,7 @@ func (c *Compactor) close() {

func spawnAndRunCompactorOrchestrator(
manifestStore *ManifestStore,
tableStore *TableStore,
tableStore *store.TableStore,
opts DBOptions,
compactorMsgCh <-chan CompactorMainMsg,
) (*sync.WaitGroup, chan error) {
Expand Down Expand Up @@ -139,7 +141,7 @@ type CompactorOrchestrator struct {
func newCompactorOrchestrator(
opts DBOptions,
manifestStore *ManifestStore,
tableStore *TableStore,
tableStore *store.TableStore,
compactorMsgCh <-chan CompactorMainMsg,
) (*CompactorOrchestrator, error) {
sm, err := loadStoredManifest(manifestStore)
Expand Down Expand Up @@ -183,7 +185,7 @@ func loadState(manifest *FenceableManifest) (*CompactorState, error) {
if err != nil {
return nil, err
}
return newCompactorState(dbState.clone(), nil), nil
return newCompactorState(dbState.Clone(), nil), nil
}

func loadCompactionScheduler() CompactionScheduler {
Expand Down Expand Up @@ -232,22 +234,22 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
dbState := o.state.dbState

sstsByID := make(map[ulid.ULID]sstable.Handle)
for _, sst := range dbState.l0 {
for _, sst := range dbState.L0 {
id, ok := sst.Id.CompactedID().Get()
assert.True(ok, "expected valid compacted ID")
sstsByID[id] = sst
}
for _, sr := range dbState.compacted {
for _, sst := range sr.sstList {
for _, sr := range dbState.Compacted {
for _, sst := range sr.SSTList {
id, ok := sst.Id.CompactedID().Get()
assert.True(ok, "expected valid compacted ID")
sstsByID[id] = sst
}
}

srsByID := make(map[uint32]SortedRun)
for _, sr := range dbState.compacted {
srsByID[sr.id] = sr
srsByID := make(map[uint32]compaction2.SortedRun)
for _, sr := range dbState.Compacted {
srsByID[sr.ID] = sr
}

ssts := make([]sstable.Handle, 0)
Expand All @@ -258,7 +260,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
}
}

sortedRuns := make([]SortedRun, 0)
sortedRuns := make([]compaction2.SortedRun, 0)
for _, sID := range compaction.sources {
srID, ok := sID.sortedRunID().Get()
if ok {
Expand All @@ -273,7 +275,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
})
}

func (o *CompactorOrchestrator) finishCompaction(outputSR *SortedRun) error {
func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error {
o.state.finishCompaction(outputSR)
o.logCompactionState()
err := o.writeManifest()
Expand All @@ -295,7 +297,7 @@ func (o *CompactorOrchestrator) writeManifest() error {
return err
}

core := o.state.dbState.clone()
core := o.state.dbState.Clone()
err = o.manifest.updateDBState(core)
if errors.Is(err, common.ErrManifestVersionExists) {
o.log.Warn("conflicting manifest version. retry write", "error", err)
Expand Down Expand Up @@ -329,12 +331,12 @@ func (o *CompactorOrchestrator) logCompactionState() {
type CompactionJob struct {
destination uint32
sstList []sstable.Handle
sortedRuns []SortedRun
sortedRuns []compaction2.SortedRun
}

type CompactionExecutor struct {
options *CompactorOptions
tableStore *TableStore
tableStore *store.TableStore

workerCh chan<- WorkerToOrchestratorMsg
tasksWG sync.WaitGroup
Expand All @@ -345,7 +347,7 @@ type CompactionExecutor struct {
func newCompactorExecutor(
options *CompactorOptions,
workerCh chan<- WorkerToOrchestratorMsg,
tableStore *TableStore,
tableStore *store.TableStore,
) *CompactionExecutor {
return &CompactionExecutor{
options: options,
Expand Down Expand Up @@ -374,7 +376,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte

srIters := make([]iter.KVIterator, 0)
for _, sr := range compaction.sortedRuns {
srIter, err := newSortedRunIterator(sr, e.tableStore.Clone())
srIter, err := compaction2.NewSortedRunIterator(sr, e.tableStore.Clone())
if err != nil {
return nil, err
}
Expand All @@ -395,7 +397,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte
return it, nil
}

func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*SortedRun, error) {
func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*compaction2.SortedRun, error) {
allIter, err := e.loadIterators(compaction)
if err != nil {
return nil, err
Expand Down Expand Up @@ -444,9 +446,9 @@ func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*Sorte
}
outputSSTs = append(outputSSTs, *sst)
}
return &SortedRun{
id: compaction.destination,
sstList: outputSSTs,
return &compaction2.SortedRun{
ID: compaction.destination,
SSTList: outputSSTs,
}, warn.If()
}

Expand Down
Loading

0 comments on commit ed0aea3

Please sign in to comment.