Skip to content

Commit

Permalink
Move files into packages (#59)
Browse files Browse the repository at this point in the history
* Move files into packages
  • Loading branch information
naveen246 authored Jan 6, 2025
1 parent 2819607 commit 641782f
Show file tree
Hide file tree
Showing 16 changed files with 230 additions and 223 deletions.
12 changes: 5 additions & 7 deletions slatedb/compaction/sortedrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package compaction
import (
"bytes"
"context"
"github.com/samber/mo"
"github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/store"

"github.com/samber/mo"
"sort"
)

Expand Down Expand Up @@ -58,15 +56,15 @@ func (s *SortedRun) Clone() *SortedRun {
type SortedRunIterator struct {
currentKVIter mo.Option[*sstable.Iterator]
sstListIter *SSTListIterator
tableStore *store.TableStore
tableStore sstable.TableStore
warn types.ErrWarn
}

func NewSortedRunIterator(sr SortedRun, store *store.TableStore) (*SortedRunIterator, error) {
func NewSortedRunIterator(sr SortedRun, store sstable.TableStore) (*SortedRunIterator, error) {
return newSortedRunIter(sr.SSTList, store, mo.None[[]byte]())
}

func NewSortedRunIteratorFromKey(sr SortedRun, key []byte, store *store.TableStore) (*SortedRunIterator, error) {
func NewSortedRunIteratorFromKey(sr SortedRun, key []byte, store sstable.TableStore) (*SortedRunIterator, error) {
sstList := sr.SSTList
idx, ok := sr.indexOfSSTWithKey(key).Get()
if ok {
Expand All @@ -76,7 +74,7 @@ func NewSortedRunIteratorFromKey(sr SortedRun, key []byte, store *store.TableSto
return newSortedRunIter(sstList, store, mo.Some(key))
}

func newSortedRunIter(sstList []sstable.Handle, store *store.TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) {
func newSortedRunIter(sstList []sstable.Handle, store sstable.TableStore, fromKey mo.Option[[]byte]) (*SortedRunIterator, error) {

sstListIter := newSSTListIterator(sstList)
currentKVIter := mo.None[*sstable.Iterator]()
Expand Down
35 changes: 18 additions & 17 deletions slatedb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/common"
compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction"
"github.com/slatedb/slatedb-go/slatedb/config"
"github.com/slatedb/slatedb-go/slatedb/store"
"log/slog"
"sync"
Expand Down Expand Up @@ -39,7 +40,7 @@ type Compactor struct {
orchestrator *CompactionOrchestrator
}

func newCompactor(manifestStore *ManifestStore, tableStore *store.TableStore, opts DBOptions) (*Compactor, error) {
func newCompactor(manifestStore *store.ManifestStore, tableStore *store.TableStore, opts config.DBOptions) (*Compactor, error) {
orchestrator, err := spawnAndRunCompactionOrchestrator(manifestStore, tableStore, opts)
if err != nil {
return nil, err
Expand All @@ -59,9 +60,9 @@ func (c *Compactor) close() {
// ------------------------------------------------

func spawnAndRunCompactionOrchestrator(
manifestStore *ManifestStore,
manifestStore *store.ManifestStore,
tableStore *store.TableStore,
opts DBOptions,
opts config.DBOptions,
) (*CompactionOrchestrator, error) {
orchestrator, err := newCompactionOrchestrator(opts, manifestStore, tableStore)
if err != nil {
Expand All @@ -73,8 +74,8 @@ func spawnAndRunCompactionOrchestrator(
}

type CompactionOrchestrator struct {
options *CompactorOptions
manifest *FenceableManifest
options *config.CompactorOptions
manifest *store.FenceableManifest
state *CompactorState
scheduler CompactionScheduler
executor *CompactionExecutor
Expand All @@ -87,11 +88,11 @@ type CompactionOrchestrator struct {
}

func newCompactionOrchestrator(
opts DBOptions,
manifestStore *ManifestStore,
opts config.DBOptions,
manifestStore *store.ManifestStore,
tableStore *store.TableStore,
) (*CompactionOrchestrator, error) {
sm, err := loadStoredManifest(manifestStore)
sm, err := store.LoadStoredManifest(manifestStore)
if err != nil {
return nil, err
}
Expand All @@ -100,7 +101,7 @@ func newCompactionOrchestrator(
}
storedManifest, _ := sm.Get()

manifest, err := initFenceableManifestCompactor(&storedManifest)
manifest, err := store.NewCompactorFenceableManifest(&storedManifest)
if err != nil {
return nil, err
}
Expand All @@ -125,8 +126,8 @@ func newCompactionOrchestrator(
return &o, nil
}

func loadState(manifest *FenceableManifest) (*CompactorState, error) {
dbState, err := manifest.dbState()
func loadState(manifest *store.FenceableManifest) (*CompactorState, error) {
dbState, err := manifest.DbState()
if err != nil {
return nil, err
}
Expand All @@ -137,7 +138,7 @@ func loadCompactionScheduler() CompactionScheduler {
return SizeTieredCompactionScheduler{}
}

func (o *CompactionOrchestrator) spawnLoop(opts DBOptions) {
func (o *CompactionOrchestrator) spawnLoop(opts config.DBOptions) {
o.waitGroup.Add(1)
go func() {
defer o.waitGroup.Done()
Expand Down Expand Up @@ -172,7 +173,7 @@ func (o *CompactionOrchestrator) shutdown() {
}

func (o *CompactionOrchestrator) loadManifest() error {
_, err := o.manifest.refresh()
_, err := o.manifest.Refresh()
if err != nil {
return err
}
Expand All @@ -184,7 +185,7 @@ func (o *CompactionOrchestrator) loadManifest() error {
}

func (o *CompactionOrchestrator) refreshDBState() error {
state, err := o.manifest.dbState()
state, err := o.manifest.DbState()
if err != nil {
return err
}
Expand Down Expand Up @@ -290,7 +291,7 @@ func (o *CompactionOrchestrator) writeManifest() error {
}

core := o.state.dbState.Clone()
err = o.manifest.updateDBState(core)
err = o.manifest.UpdateDBState(core)
if errors.Is(err, common.ErrManifestVersionExists) {
o.log.Warn("conflicting manifest version. retry write", "error", err)
continue
Expand Down Expand Up @@ -327,7 +328,7 @@ type CompactionJob struct {
}

type CompactionExecutor struct {
options *CompactorOptions
options *config.CompactorOptions
tableStore *store.TableStore

resultCh chan CompactionResult
Expand All @@ -336,7 +337,7 @@ type CompactionExecutor struct {
}

func newCompactorExecutor(
options *CompactorOptions,
options *config.CompactorOptions,
tableStore *store.TableStore,
) *CompactionExecutor {
return &CompactionExecutor{
Expand Down
22 changes: 12 additions & 10 deletions slatedb/compactor_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
assert2 "github.com/slatedb/slatedb-go/internal/assert"
"github.com/slatedb/slatedb-go/internal/sstable"
compaction2 "github.com/slatedb/slatedb-go/slatedb/compaction"
"github.com/slatedb/slatedb-go/slatedb/config"
"github.com/slatedb/slatedb-go/slatedb/state"
"github.com/slatedb/slatedb-go/slatedb/store"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestShouldRemoveCompactionWhenCompactionFinished(t *testing.T) {

func TestShouldRefreshDBStateCorrectlyWhenNeverCompacted(t *testing.T) {
bucket, sm, compactorState := buildTestState(t)
option := DefaultDBOptions()
option := config.DefaultDBOptions()
option.L0SSTSizeBytes = 128
db, err := OpenWithOptions(context.Background(), testPath, bucket, option)
assert.NoError(t, err)
Expand Down Expand Up @@ -98,7 +100,7 @@ func TestShouldRefreshDBStateCorrectly(t *testing.T) {
SSTList: []sstable.Handle{originalL0s[len(originalL0s)-1]},
})

option := DefaultDBOptions()
option := config.DefaultDBOptions()
option.L0SSTSizeBytes = 128
db, err := OpenWithOptions(context.Background(), testPath, bucket, option)
assert.NoError(t, err)
Expand Down Expand Up @@ -151,7 +153,7 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) {
})
assert.Equal(t, 0, len(compactorState.dbState.L0))

option := DefaultDBOptions()
option := config.DefaultDBOptions()
option.L0SSTSizeBytes = 128
db, err := OpenWithOptions(context.Background(), testPath, bucket, option)
assert.NoError(t, err)
Expand All @@ -169,10 +171,10 @@ func TestShouldRefreshDBStateCorrectlyWhenAllL0Compacted(t *testing.T) {
assert.Equal(t, expectedID, actualID)
}

func waitForManifestWithL0Len(storedManifest StoredManifest, size int) *state.CoreStateSnapshot {
func waitForManifestWithL0Len(storedManifest store.StoredManifest, size int) *state.CoreStateSnapshot {
startTime := time.Now()
for time.Since(startTime) < time.Second*10 {
dbState, err := storedManifest.refresh()
dbState, err := storedManifest.Refresh()
assert2.True(err == nil, "")
if len(dbState.L0) == size {
return dbState.Clone()
Expand All @@ -192,11 +194,11 @@ func buildL0Compaction(sstList []sstable.Handle, destination uint32) Compaction
return newCompaction(sources, destination)
}

func buildTestState(t *testing.T) (objstore.Bucket, StoredManifest, *CompactorState) {
func buildTestState(t *testing.T) (objstore.Bucket, store.StoredManifest, *CompactorState) {
t.Helper()

bucket := objstore.NewInMemBucket()
option := DefaultDBOptions()
option := config.DefaultDBOptions()
option.L0SSTSizeBytes = 128
db, err := OpenWithOptions(context.Background(), testPath, bucket, option)
assert2.True(err == nil, "Could not open db")
Expand All @@ -207,14 +209,14 @@ func buildTestState(t *testing.T) (objstore.Bucket, StoredManifest, *CompactorSt
}
db.Close()

manifestStore := newManifestStore(testPath, bucket)
sm, err := loadStoredManifest(manifestStore)
manifestStore := store.NewManifestStore(testPath, bucket)
sm, err := store.LoadStoredManifest(manifestStore)
require.NoError(t, err)
require.NotNil(t, sm)

assert.True(t, err == nil, "Could not load stored manifest")
assert.True(t, sm.IsPresent(), "Could not find stored manifest")
storedManifest, _ := sm.Get()
compactorState := newCompactorState(storedManifest.dbState(), nil)
compactorState := newCompactorState(storedManifest.DbState(), nil)
return bucket, storedManifest, compactorState
}
25 changes: 13 additions & 12 deletions slatedb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/slatedb/slatedb-go/internal/compress"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/internal/types"
"github.com/slatedb/slatedb-go/slatedb/config"
"github.com/slatedb/slatedb-go/slatedb/state"
"github.com/slatedb/slatedb-go/slatedb/store"
"log/slog"
Expand All @@ -31,12 +32,12 @@ func TestCompactorCompactsL0(t *testing.T) {
startTime := time.Now()
dbState := mo.None[*state.CoreStateSnapshot]()
for time.Since(startTime) < time.Second*10 {
sm, err := loadStoredManifest(manifestStore)
sm, err := store.LoadStoredManifest(manifestStore)
assert.NoError(t, err)
assert.True(t, sm.IsPresent())
storedManifest, _ := sm.Get()
if storedManifest.dbState().L0LastCompacted.IsPresent() {
dbState = mo.Some(storedManifest.dbState().Clone())
if storedManifest.DbState().L0LastCompacted.IsPresent() {
dbState = mo.Some(storedManifest.DbState().Clone())
break
}
time.Sleep(time.Millisecond * 50)
Expand Down Expand Up @@ -73,7 +74,7 @@ func TestCompactorCompactsL0(t *testing.T) {
func TestShouldWriteManifestSafely(t *testing.T) {
options := dbOptions(nil)
bucket, manifestStore, tableStore, db := buildTestDB(options)
sm, err := loadStoredManifest(manifestStore)
sm, err := store.LoadStoredManifest(manifestStore)
assert.NoError(t, err)
assert.True(t, sm.IsPresent())
storedManifest, _ := sm.Get()
Expand Down Expand Up @@ -109,7 +110,7 @@ func TestShouldWriteManifestSafely(t *testing.T) {
assert.NoError(t, err)

// Key aaa... will be compacted and Key jjj... will be in Level0
dbState, err := storedManifest.refresh()
dbState, err := storedManifest.Refresh()
assert.NoError(t, err)
assert.Equal(t, 1, len(dbState.L0))
assert.Equal(t, 1, len(dbState.Compacted))
Expand All @@ -126,21 +127,21 @@ func TestShouldWriteManifestSafely(t *testing.T) {
assert.Equal(t, l0IDsToCompact[0].sstID(), dbState.L0LastCompacted)
}

func buildTestDB(options DBOptions) (objstore.Bucket, *ManifestStore, *store.TableStore, *DB) {
func buildTestDB(options config.DBOptions) (objstore.Bucket, *store.ManifestStore, *store.TableStore, *DB) {
bucket := objstore.NewInMemBucket()
db, err := OpenWithOptions(context.Background(), testPath, bucket, options)
assert2.True(err == nil, "Failed to open test database")
conf := sstable.DefaultConfig()
conf.BlockSize = 32
conf.MinFilterKeys = 10
conf.Compression = options.CompressionCodec
manifestStore := newManifestStore(testPath, bucket)
manifestStore := store.NewManifestStore(testPath, bucket)
tableStore := store.NewTableStore(bucket, conf, testPath)
return bucket, manifestStore, tableStore, db
}

func dbOptions(compactorOptions *CompactorOptions) DBOptions {
return DBOptions{
func dbOptions(compactorOptions *config.CompactorOptions) config.DBOptions {
return config.DBOptions{
FlushInterval: 100 * time.Millisecond,
ManifestPollInterval: 100 * time.Millisecond,
MinFilterKeys: 0,
Expand All @@ -150,9 +151,9 @@ func dbOptions(compactorOptions *CompactorOptions) DBOptions {
}
}

func compactorOptions() DBOptions {
return DBOptions{
CompactorOptions: &CompactorOptions{
func compactorOptions() config.DBOptions {
return config.DBOptions{
CompactorOptions: &config.CompactorOptions{
PollInterval: 100 * time.Millisecond,
MaxSSTSize: 1024 * 1024 * 1024,
},
Expand Down
2 changes: 1 addition & 1 deletion slatedb/config.go → slatedb/config/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package slatedb
package config

import (
"github.com/slatedb/slatedb-go/internal/compress"
Expand Down
Loading

0 comments on commit 641782f

Please sign in to comment.