Skip to content

Commit

Permalink
Fix to match api changes, move patch to init and reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
jayaprabhakar committed Dec 22, 2024
1 parent 4ff8305 commit b55b17b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 80 deletions.
75 changes: 30 additions & 45 deletions fizztest/fizztest_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ import (
"github.com/oklog/ulid/v2"
"github.com/slatedb/slatedb-go/slatedb"
"github.com/slatedb/slatedb-go/slatedb/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"strconv"
"strings"
"testing"
"sync/atomic"
"time"
)

Expand All @@ -22,6 +20,13 @@ type Choice struct {
Value interface{}
}

func (c *Choice) GetName() string {
return c.Name
}
func (c *Choice) GetValue() interface{} {
return c.Value
}

type Role interface {
GetState() (map[string]interface{}, error)
}
Expand Down Expand Up @@ -89,12 +94,12 @@ func (r *SlateDbRoleAdapter) GetState() (map[string]interface{}, error) {
}

func (r *SlateDbRoleAdapter) Put(choices []Choice) (interface{}, error) {
key := []byte(choices[0].Value.(string))
value := []byte(choices[1].Value.(string))
key := []byte(choices[0].GetValue().(string))
value := []byte(choices[1].GetValue().(string))

writeOptions := slatedb.DefaultWriteOptions()
writeOptions.AwaitFlush = false
if choices[1].Value.(string) == "notfound" {
writeOptions.AwaitDurable = false
if choices[1].GetValue().(string) == "notfound" {
r.db.DeleteWithOptions(key, writeOptions)
} else {
r.db.PutWithOptions(key, value, writeOptions)
Expand All @@ -104,8 +109,8 @@ func (r *SlateDbRoleAdapter) Put(choices []Choice) (interface{}, error) {
}

func (r *SlateDbRoleAdapter) Get(choices []Choice) (interface{}, error) {
key := []byte(choices[0].Value.(string))
level := choices[1].Value.(string)
key := []byte(choices[0].GetValue().(string))
level := choices[1].GetValue().(string)
readOptions := slatedb.DefaultReadOptions()
if level == "Uncommitted" {
readOptions = slatedb.ReadOptions{ReadLevel: slatedb.Uncommitted}
Expand All @@ -121,18 +126,24 @@ func (r *SlateDbRoleAdapter) Get(choices []Choice) (interface{}, error) {
}

func (r *SlateDbRoleAdapter) FlushWal(_ []Choice) (interface{}, error) {
//fmt.Println("Calling: db.FlushWal()")
return nil, r.db.FlushWAL()
}

func (r *SlateDbRoleAdapter) FlushMemtable(_ []Choice) (interface{}, error) {
//fmt.Println("Calling: db.FlushMemtableToL0()")
return nil, r.db.FlushMemtableToL0()
}

type Model struct {
Roles map[string]Role `json:"roles"`
State map[string]interface{} `json:"state"`

patches *gomonkey.Patches
timeMs uint64

Check failure on line 142 in fizztest/fizztest_adapter.go

View workflow job for this annotation

GitHub Actions / Lint

field `timeMs` is unused (unused)
}

func NewModel() any {
return &Model{}
}

func (m *Model) ToJson() string {
Expand All @@ -144,13 +155,16 @@ func (m *Model) ToJson() string {
return string(bytes)
}

func (m *Model) Init() {
timeMs := uint64(0)
m.patches = gomonkey.ApplyFunc(ulid.Make, func() ulid.ULID {
timeMs++
return ulid.MustNew(timeMs, nil)
})
var ulidTimeMs atomic.Uint64

func init() {
gomonkey.ApplyFunc(ulid.Make, func() ulid.ULID {
newTimeMs := ulidTimeMs.Add(1)
return ulid.MustNew(newTimeMs, nil)
})
}
func (m *Model) Init() {
ulidTimeMs.Store(0)
bucket := objstore.NewInMemBucket()
dbOptions := slatedb.DefaultDBOptions()
dbOptions.FlushInterval = 10 * time.Minute
Expand All @@ -164,45 +178,16 @@ func (m *Model) Init() {
m.State = make(map[string]interface{})
m.State["store"] = store
m.State["writer"] = writer
m.State["next_ulid"] = func() int { return int(timeMs) + 1 }

m.Roles = make(map[string]Role)
m.Roles["ObjectStore#0"] = store
m.Roles["SlateDb#0"] = writer
}

func (m *Model) InternalCleanup() {
m.patches.Reset()
m.State["writer"].(*SlateDbRoleAdapter).db.Close()
}

func AssertModelEquals(t *testing.T, exp string, model *Model, retVal interface{}) {
var node map[string]interface{}
if err := json.Unmarshal([]byte(exp), &node); err != nil {
panic(err)
}

expectedRolesMap := make(map[string]map[string]interface{})
if node["roles"] != nil {
expectedRoles := node["roles"].([]interface{})
for _, r := range expectedRoles {
role := r.(map[string]interface{})
expectedRolesMap[role["ref_string"].(string)] = role["fields"].(map[string]interface{})
}
}

for roleRef, role := range model.Roles {
state, err := role.GetState()
if err != nil {
panic(err)
}
e, ok := expectedRolesMap[roleRef]

require.True(t, ok)
assert.Equal(t, e, state)
}
}

// Function to extract base32 string from "compacted/somebase32string.sst"
func extractBaseName(path string) string {
parts := strings.Split(path, "/")
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/slatedb/slatedb-go
go 1.23.1

require (
github.com/agiledragon/gomonkey/v2 v2.12.0
github.com/gammazero/deque v0.2.1
github.com/golang/snappy v0.0.4
github.com/google/flatbuffers v24.3.25+incompatible
Expand All @@ -18,7 +19,6 @@ require (
)

require (
github.com/agiledragon/gomonkey/v2 v2.12.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand All @@ -36,6 +36,6 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.26.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
google.golang.org/protobuf v1.36.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0Jr
github.com/huandu/skiplist v1.2.1 h1:dTi93MgjwErA/8idWTzIw4Y1kZsMWx35fmI2c8Rij7w=
github.com/huandu/skiplist v1.2.1/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kapetan-io/tackle v0.11.0 h1:xcQ2WgES8rjsd0ZMBfFTMuCs8YG4+1r2OAPY0+mHXjM=
github.com/kapetan-io/tackle v0.11.0/go.mod h1:94m0H3j8pm9JMsAuqBsC/Y08WpAUh01ugkFxABjjHd8=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand Down Expand Up @@ -73,12 +75,6 @@ github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1Dkn
github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -89,8 +85,8 @@ golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ=
google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
47 changes: 22 additions & 25 deletions slatedb/fizztest_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,42 @@ package slatedb
import (
"fmt"
"github.com/oklog/ulid/v2"
"github.com/slatedb/slatedb-go/internal/sstable"
"github.com/slatedb/slatedb-go/slatedb/table"
)

func GetState(db *DB) (map[string]interface{}, error) {
m := make(map[string]interface{})
wal := db.state.wal.clone()
walState, err := getKvTableState(wal.table)
snapshot := db.state.snapshot()
walState, err := getKvTableStateFromIter(snapshot.wal.Iter())
if err != nil {
return nil, err
}
m["wal"] = walState

memtable := db.state.memtable.clone()
memtableState, err := getKvTableState(memtable.table)
memtableState, err := getKvTableStateFromIter(db.state.memtable.Iter())
if err != nil {
return nil, err
}
m["memtable"] = memtableState

m["wal_index"] = float64(db.state.state.core.nextWalSstID)
m["wal_index"] = float64(db.state.core.nextWalSstID.Load())

// TODO: implement immutable_wal and immutable_memtable
m["immutable_wal"] = make([]interface{}, 0)
m["immutable_memtable"] = make(map[string]interface{})

// get the l0 from core state
l0ssts := db.state.state.core.l0
l0ssts := db.state.core.l0
l0s := make([]interface{}, len(l0ssts))
for i, sst := range l0ssts {
l0s[i] = fmt.Sprintf("compacted/ulid-%04d.sst", sst.id.compactedID().MustGet().Time())
l0s[i] = fmt.Sprintf("compacted/ulid-%04d.sst", sst.Id.CompactedID().MustGet().Time())
}
m["l0"] = l0s
return m, nil
}

func getKvTableState(table *KVTable) (map[string]interface{}, error) {
iter := table.iter()
func getKvTableStateFromIter(iter *table.KVTableIterator) (map[string]interface{}, error) {
m := make(map[string]interface{})
for {
optionalKv, err := iter.NextEntry()
Expand All @@ -50,53 +50,50 @@ func getKvTableState(table *KVTable) (map[string]interface{}, error) {
}
kv := optionalKv.MustGet()
//fmt.Printf("Key: %s, Value: %v\n", kv.Key, kv.ValueDel)
if kv.ValueDel.IsTombstone {
if kv.Value.IsTombstone() {
m[string(kv.Key)] = "notfound"
} else {
m[string(kv.Key)] = string(kv.ValueDel.Value)
m[string(kv.Key)] = string(kv.Value.Value)
}
}
return m, nil
}

func WalSstToMap(db *DB, sstId uint64) (map[string]interface{}, error) {
sst, err := db.tableStore.openSST(newSSTableIDWal(sstId))
sst, err := db.tableStore.OpenSST(sstable.NewIDWal(sstId))
if err != nil {
return nil, err
}
return KeyValSstToMap(db, sst)
}

func CompactedSstToMap(db *DB, sstId ulid.ULID) (map[string]interface{}, error) {
sst, err := db.tableStore.openSST(newSSTableIDCompacted(sstId))
sst, err := db.tableStore.OpenSST(sstable.NewIDCompacted(sstId))
if err != nil {
return nil, err
}
return KeyValSstToMap(db, sst)
}

func KeyValSstToMap(db *DB, sst *SSTableHandle) (map[string]interface{}, error) {
iter, err := newSSTIterator(sst, db.tableStore.clone(), 1, 1)
func KeyValSstToMap(db *DB, sst *sstable.Handle) (map[string]interface{}, error) {
iter, err := sstable.NewIterator(sst, db.tableStore.Clone())
if err != nil {
return nil, err
}
m := make(map[string]interface{})
for {
entry, err := iter.NextEntry()
if err != nil {
return nil, err
}
kvDel, _ := entry.Get()
if entry.IsAbsent() {
entry, ok := iter.NextEntry()
if !ok {
break
}
if string(kvDel.Key) == "placeholder" {

if string(entry.Key) == "placeholder" {
continue
}
if kvDel.ValueDel.IsTombstone {
m[string(kvDel.Key)] = "notfound"
if entry.Value.IsTombstone() {
m[string(entry.Key)] = "notfound"
} else {
m[string(kvDel.Key)] = string(kvDel.ValueDel.Value)
m[string(entry.Key)] = string(entry.Value.Value)
}
}
return m, nil
Expand Down

0 comments on commit b55b17b

Please sign in to comment.