Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move sortedrun under package #56

Merged
merged 3 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions slatedb/sortedrun.go → slatedb/compaction/sortedrun.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package slatedb
package compaction

import (
"bytes"
"context"
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/store"

"github.com/samber/mo"
"sort"
Expand All @@ -16,37 +17,37 @@ import (
// ------------------------------------------------

type SortedRun struct {
id uint32
sstList []sstable.Handle
ID uint32
SSTList []sstable.Handle
}

func (s *SortedRun) indexOfSSTWithKey(key []byte) mo.Option[int] {
index := sort.Search(len(s.sstList), func(i int) bool {
assert.True(len(s.sstList[i].Info.FirstKey) != 0, "sst must have first key")
return bytes.Compare(s.sstList[i].Info.FirstKey, key) > 0
index := sort.Search(len(s.SSTList), func(i int) bool {
assert.True(len(s.SSTList[i].Info.FirstKey) != 0, "sst must have first key")
return bytes.Compare(s.SSTList[i].Info.FirstKey, key) > 0
})
if index > 0 {
return mo.Some(index - 1)
}
return mo.None[int]()
}

func (s *SortedRun) sstWithKey(key []byte) mo.Option[sstable.Handle] {
func (s *SortedRun) SstWithKey(key []byte) mo.Option[sstable.Handle] {
index, ok := s.indexOfSSTWithKey(key).Get()
if ok {
return mo.Some(s.sstList[index])
return mo.Some(s.SSTList[index])
}
return mo.None[sstable.Handle]()
}

func (s *SortedRun) clone() *SortedRun {
sstList := make([]sstable.Handle, 0, len(s.sstList))
for _, sst := range s.sstList {
func (s *SortedRun) Clone() *SortedRun {
sstList := make([]sstable.Handle, 0, len(s.SSTList))
for _, sst := range s.SSTList {
sstList = append(sstList, *sst.Clone())
}
return &SortedRun{
id: s.id,
sstList: sstList,
ID: s.ID,
SSTList: sstList,
}
}

Expand All @@ -57,25 +58,25 @@ func (s *SortedRun) clone() *SortedRun {
type SortedRunIterator struct {
currentKVIter mo.Option[*sstable.Iterator]
sstListIter *SSTListIterator
tableStore *TableStore
tableStore *store.TableStore
warn types.ErrWarn
}

func newSortedRunIterator(sr SortedRun, store *TableStore, ) (*SortedRunIterator, error) {
return newSortedRunIter(sr.sstList, store, mo.None[[]byte]())
func NewSortedRunIterator(sr SortedRun, store *store.TableStore) (*SortedRunIterator, error) {
return newSortedRunIter(sr.SSTList, store, mo.None[[]byte]())
}

func newSortedRunIteratorFromKey(sr SortedRun, key []byte, store *TableStore) (*SortedRunIterator, error) {
sstList := sr.sstList
func NewSortedRunIteratorFromKey(sr SortedRun, key []byte, store *store.TableStore) (*SortedRunIterator, error) {
sstList := sr.SSTList
idx, ok := sr.indexOfSSTWithKey(key).Get()
if ok {
sstList = sr.sstList[idx:]
sstList = sr.SSTList[idx:]
}

return newSortedRunIter(sstList, store, mo.Some(key))
}

func newSortedRunIter(sstList []sstable.Handle, store *TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) {
func newSortedRunIter(sstList []sstable.Handle, store *store.TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) {

sstListIter := newSSTListIterator(sstList)
currentKVIter := mo.None[*sstable.Iterator]()
Expand Down
25 changes: 13 additions & 12 deletions slatedb/sortedrun_test.go → slatedb/compaction/sortedrun_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package slatedb
package compaction

import (
"context"
Expand All @@ -8,6 +8,7 @@ import (
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/common"
"github.com/slatedb/slatedb-go/slatedb/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand All @@ -17,7 +18,7 @@ import (
func buildSRWithSSTs(
n uint64,
keysPerSST uint64,
tableStore *TableStore,
tableStore *store.TableStore,
keyGen common.OrderedBytesGenerator,
valGen common.OrderedBytesGenerator,
) (SortedRun, error) {
Expand All @@ -42,7 +43,7 @@ func TestOneSstSRIter(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

builder := tableStore.TableBuilder()
require.NoError(t, builder.AddValue([]byte("key1"), []byte("value1")))
Expand All @@ -55,7 +56,7 @@ func TestOneSstSRIter(t *testing.T) {
assert.NoError(t, err)

sr := SortedRun{0, []sstable.Handle{*sstHandle}}
iterator, err := newSortedRunIterator(sr, tableStore)
iterator, err := NewSortedRunIterator(sr, tableStore)
assert.NoError(t, err)
assert2.Next(t, iterator, []byte("key1"), []byte("value1"))
assert2.Next(t, iterator, []byte("key2"), []byte("value2"))
Expand All @@ -70,7 +71,7 @@ func TestManySstSRIter(t *testing.T) {
bucket := objstore.NewInMemBucket()
format := sstable.DefaultConfig()
format.MinFilterKeys = 3
tableStore := NewTableStore(bucket, format, "")
tableStore := store.NewTableStore(bucket, format, "")

builder := tableStore.TableBuilder()
require.NoError(t, builder.AddValue([]byte("key1"), []byte("value1")))
Expand All @@ -91,7 +92,7 @@ func TestManySstSRIter(t *testing.T) {
require.NoError(t, err)

sr := SortedRun{0, []sstable.Handle{*sstHandle, *sstHandle2}}
iterator, err := newSortedRunIterator(sr, tableStore)
iterator, err := NewSortedRunIterator(sr, tableStore)
assert.NoError(t, err)
assert2.Next(t, iterator, []byte("key1"), []byte("value1"))
assert2.Next(t, iterator, []byte("key2"), []byte("value2"))
Expand All @@ -106,7 +107,7 @@ func TestSRIterFromKey(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

firstKey := []byte("aaaaaaaaaaaaaaaa")
keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z'))
Expand All @@ -125,7 +126,7 @@ func TestSRIterFromKey(t *testing.T) {
fromKey := testCaseKeyGen.Next()
testCaseValGen.Next()

kvIter, err := newSortedRunIteratorFromKey(sr, fromKey, tableStore)
kvIter, err := NewSortedRunIteratorFromKey(sr, fromKey, tableStore)
assert.NoError(t, err)

for j := 0; j < 30-i; j++ {
Expand All @@ -141,7 +142,7 @@ func TestSRIterFromKeyLowerThanRange(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

firstKey := []byte("aaaaaaaaaaaaaaaa")
keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z'))
Expand All @@ -154,7 +155,7 @@ func TestSRIterFromKeyLowerThanRange(t *testing.T) {
sr, err := buildSRWithSSTs(3, 10, tableStore, keyGen, valGen)
require.NoError(t, err)

kvIter, err := newSortedRunIteratorFromKey(sr, []byte("aaaaaaaaaa"), tableStore)
kvIter, err := NewSortedRunIteratorFromKey(sr, []byte("aaaaaaaaaa"), tableStore)
assert.NoError(t, err)

for j := 0; j < 30; j++ {
Expand All @@ -169,7 +170,7 @@ func TestSRIterFromKeyHigherThanRange(t *testing.T) {
bucket := objstore.NewInMemBucket()
conf := sstable.DefaultConfig()
conf.MinFilterKeys = 3
tableStore := NewTableStore(bucket, conf, "")
tableStore := store.NewTableStore(bucket, conf, "")

firstKey := []byte("aaaaaaaaaaaaaaaa")
keyGen := common.NewOrderedBytesGeneratorWithByteRange(firstKey, byte('a'), byte('z'))
Expand All @@ -180,7 +181,7 @@ func TestSRIterFromKeyHigherThanRange(t *testing.T) {
sr, err := buildSRWithSSTs(3, 10, tableStore, keyGen, valGen)
require.NoError(t, err)

kvIter, err := newSortedRunIteratorFromKey(sr, []byte("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"), tableStore)
kvIter, err := NewSortedRunIteratorFromKey(sr, []byte("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"), tableStore)
assert.NoError(t, err)
next, ok := kvIter.Next(context.Background())
assert.False(t, ok)
Expand Down
36 changes: 19 additions & 17 deletions slatedb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction"
"github.com/slatedb/slatedb-go/slatedb/store"
"log/slog"
"math"
"sync"
Expand All @@ -28,7 +30,7 @@ const (
)

type WorkerToOrchestratorMsg struct {
CompactionResult *SortedRun
CompactionResult *compaction2.SortedRun
CompactionError error
}

Expand All @@ -46,7 +48,7 @@ type Compactor struct {
compactorWG *sync.WaitGroup
}

func newCompactor(manifestStore *ManifestStore, tableStore *TableStore, opts DBOptions) (*Compactor, error) {
func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) {
compactorMsgCh := make(chan CompactorMainMsg, math.MaxUint8)

compactorWG, errCh := spawnAndRunCompactorOrchestrator(manifestStore, tableStore, opts, compactorMsgCh)
Expand All @@ -71,7 +73,7 @@ func (c *Compactor) close() {

func spawnAndRunCompactorOrchestrator(
manifestStore *ManifestStore,
tableStore *TableStore,
tableStore *store.TableStore,
opts DBOptions,
compactorMsgCh <-chan CompactorMainMsg,
) (*sync.WaitGroup, chan error) {
Expand Down Expand Up @@ -139,7 +141,7 @@ type CompactorOrchestrator struct {
func newCompactorOrchestrator(
opts DBOptions,
manifestStore *ManifestStore,
tableStore *TableStore,
tableStore *store.TableStore,
compactorMsgCh <-chan CompactorMainMsg,
) (*CompactorOrchestrator, error) {
sm, err := loadStoredManifest(manifestStore)
Expand Down Expand Up @@ -238,16 +240,16 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
sstsByID[id] = sst
}
for _, sr := range dbState.compacted {
for _, sst := range sr.sstList {
for _, sst := range sr.SSTList {
id, ok := sst.Id.CompactedID().Get()
assert.True(ok, "expected valid compacted ID")
sstsByID[id] = sst
}
}

srsByID := make(map[uint32]SortedRun)
srsByID := make(map[uint32]compaction2.SortedRun)
for _, sr := range dbState.compacted {
srsByID[sr.id] = sr
srsByID[sr.ID] = sr
}

ssts := make([]sstable.Handle, 0)
Expand All @@ -258,7 +260,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
}
}

sortedRuns := make([]SortedRun, 0)
sortedRuns := make([]compaction2.SortedRun, 0)
for _, sID := range compaction.sources {
srID, ok := sID.sortedRunID().Get()
if ok {
Expand All @@ -273,7 +275,7 @@ func (o *CompactorOrchestrator) startCompaction(compaction Compaction) {
})
}

func (o *CompactorOrchestrator) finishCompaction(outputSR *SortedRun) error {
func (o *CompactorOrchestrator) finishCompaction(outputSR *compaction2.SortedRun) error {
o.state.finishCompaction(outputSR)
o.logCompactionState()
err := o.writeManifest()
Expand Down Expand Up @@ -329,12 +331,12 @@ func (o *CompactorOrchestrator) logCompactionState() {
type CompactionJob struct {
destination uint32
sstList []sstable.Handle
sortedRuns []SortedRun
sortedRuns []compaction2.SortedRun
}

type CompactionExecutor struct {
options *CompactorOptions
tableStore *TableStore
tableStore *store.TableStore

workerCh chan<- WorkerToOrchestratorMsg
tasksWG sync.WaitGroup
Expand All @@ -345,7 +347,7 @@ type CompactionExecutor struct {
func newCompactorExecutor(
options *CompactorOptions,
workerCh chan<- WorkerToOrchestratorMsg,
tableStore *TableStore,
tableStore *store.TableStore,
) *CompactionExecutor {
return &CompactionExecutor{
options: options,
Expand Down Expand Up @@ -374,7 +376,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte

srIters := make([]iter.KVIterator, 0)
for _, sr := range compaction.sortedRuns {
srIter, err := newSortedRunIterator(sr, e.tableStore.Clone())
srIter, err := compaction2.NewSortedRunIterator(sr, e.tableStore.Clone())
if err != nil {
return nil, err
}
Expand All @@ -395,7 +397,7 @@ func (e *CompactionExecutor) loadIterators(compaction CompactionJob) (iter.KVIte
return it, nil
}

func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*SortedRun, error) {
func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*compaction2.SortedRun, error) {
allIter, err := e.loadIterators(compaction)
if err != nil {
return nil, err
Expand Down Expand Up @@ -444,9 +446,9 @@ func (e *CompactionExecutor) executeCompaction(compaction CompactionJob) (*Sorte
}
outputSSTs = append(outputSSTs, *sst)
}
return &SortedRun{
id: compaction.destination,
sstList: outputSSTs,
return &compaction2.SortedRun{
ID: compaction.destination,
SSTList: outputSSTs,
}, warn.If()
}

Expand Down
Loading
Loading