diff --git a/fizztest/fizztest_adapter.go b/fizztest/fizztest_adapter.go new file mode 100644 index 0000000..cb21dce --- /dev/null +++ b/fizztest/fizztest_adapter.go @@ -0,0 +1,211 @@ +package main + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/agiledragon/gomonkey/v2" + "github.com/oklog/ulid/v2" + "github.com/slatedb/slatedb-go/slatedb" + "github.com/slatedb/slatedb-go/slatedb/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "strconv" + "strings" + "testing" + "time" +) + +type Choice struct { + Name string + Value interface{} +} + +type Role interface { + GetState() (map[string]interface{}, error) +} + +type ObjectStoreRole interface { + Role +} +type ObjectStoreRoleAdapter struct { + bucket *objstore.InMemBucket + db *slatedb.DB +} + +// assert that SlateDbRoleAdapter satisfies SlateDbRole +var _ ObjectStoreRole = (*ObjectStoreRoleAdapter)(nil) + +func (r *ObjectStoreRoleAdapter) GetState() (map[string]interface{}, error) { + + objects := make(map[string]interface{}) + for k, _ := range r.bucket.Objects() { + if strings.HasPrefix(k, "wal/") { + baseName := extractBaseName(k) + sstId, err := strconv.Atoi(baseName) + if err != nil { + panic(err) + } + data, err := slatedb.WalSstToMap(r.db, uint64(sstId)) + if err != nil { + return nil, err + } + objects[fmt.Sprintf("wal/%04d.sst", sstId)] = data + } else if strings.HasPrefix(k, "compacted/") { + baseName := extractBaseName(k) + sstId, err := ulid.Parse(baseName) + if err != nil { + return nil, err + } + + data, err := slatedb.CompactedSstToMap(r.db, sstId) + if err != nil { + return nil, err + } + objects[fmt.Sprintf("compacted/ulid-%04d.sst", sstId.Time())] = data + } + } + return map[string]interface{}{"objects": objects}, nil +} + +type SlateDbRole interface { + Role + Put(choices []Choice) (interface{}, error) + Get(choices []Choice) (interface{}, error) + FlushWal(choices []Choice) (interface{}, error) + FlushMemtable(choices []Choice) (interface{}, error) +} + +type SlateDbRoleAdapter struct { + db *slatedb.DB +} + +// assert that SlateDbRoleAdapter satisfies SlateDbRole +var _ SlateDbRole = (*SlateDbRoleAdapter)(nil) + +func (r *SlateDbRoleAdapter) GetState() (map[string]interface{}, error) { + return slatedb.GetState(r.db) +} + +func (r *SlateDbRoleAdapter) Put(choices []Choice) (interface{}, error) { + key := []byte(choices[0].Value.(string)) + value := []byte(choices[1].Value.(string)) + + writeOptions := slatedb.DefaultWriteOptions() + writeOptions.AwaitFlush = false + if choices[1].Value.(string) == "notfound" { + r.db.DeleteWithOptions(key, writeOptions) + } else { + r.db.PutWithOptions(key, value, writeOptions) + } + + return nil, nil +} + +func (r *SlateDbRoleAdapter) Get(choices []Choice) (interface{}, error) { + key := []byte(choices[0].Value.(string)) + level := choices[1].Value.(string) + readOptions := slatedb.DefaultReadOptions() + if level == "Uncommitted" { + readOptions = slatedb.ReadOptions{ReadLevel: slatedb.Uncommitted} + } + val, err := r.db.GetWithOptions(key, readOptions) + if err != nil { + if errors.Is(err, common.ErrKeyNotFound) { + return "notfound", nil + } + return nil, err + } + return string(val), nil +} + +func (r *SlateDbRoleAdapter) FlushWal(_ []Choice) (interface{}, error) { + return nil, r.db.FlushWAL() +} + +func (r *SlateDbRoleAdapter) FlushMemtable(_ []Choice) (interface{}, error) { + return nil, r.db.FlushMemtableToL0() +} + +type Model struct { + Roles map[string]Role `json:"roles"` + State map[string]interface{} `json:"state"` + + patches *gomonkey.Patches +} + +func (m *Model) ToJson() string { + // json marshall to string + bytes, err := json.Marshal(m) + if err != nil { + panic(err) + } + return string(bytes) +} + +func (m *Model) Init() { + timeMs := uint64(0) + m.patches = gomonkey.ApplyFunc(ulid.Make, func() ulid.ULID { + timeMs++ + return ulid.MustNew(timeMs, nil) + }) + + bucket := objstore.NewInMemBucket() + dbOptions := slatedb.DefaultDBOptions() + dbOptions.FlushInterval = 10 * time.Minute + dbOptions.CompactorOptions.PollInterval = 10 * time.Minute + + db, _ := slatedb.OpenWithOptions("", bucket, dbOptions) + + writer := &SlateDbRoleAdapter{db} + store := &ObjectStoreRoleAdapter{bucket: bucket, db: db} + + m.State = make(map[string]interface{}) + m.State["store"] = store + m.State["writer"] = writer + m.State["next_ulid"] = func() int { return int(timeMs) + 1 } + + m.Roles = make(map[string]Role) + m.Roles["ObjectStore#0"] = store + m.Roles["SlateDb#0"] = writer +} + +func (m *Model) InternalCleanup() { + m.patches.Reset() + m.State["writer"].(*SlateDbRoleAdapter).db.Close() +} + +func AssertModelEquals(t *testing.T, exp string, model *Model, retVal interface{}) { + var node map[string]interface{} + if err := json.Unmarshal([]byte(exp), &node); err != nil { + panic(err) + } + + expectedRolesMap := make(map[string]map[string]interface{}) + if node["roles"] != nil { + expectedRoles := node["roles"].([]interface{}) + for _, r := range expectedRoles { + role := r.(map[string]interface{}) + expectedRolesMap[role["ref_string"].(string)] = role["fields"].(map[string]interface{}) + } + } + + for roleRef, role := range model.Roles { + state, err := role.GetState() + if err != nil { + panic(err) + } + e, ok := expectedRolesMap[roleRef] + + require.True(t, ok) + assert.Equal(t, e, state) + } +} + +// Function to extract base32 string from "compacted/somebase32string.sst" +func extractBaseName(path string) string { + parts := strings.Split(path, "/") + fileName := parts[len(parts)-1] + return strings.TrimSuffix(fileName, ".sst") +} diff --git a/go.mod b/go.mod index b740ec1..41f560b 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( ) require ( + github.com/agiledragon/gomonkey/v2 v2.12.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index a14d176..3c889ee 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/agiledragon/gomonkey/v2 v2.12.0 h1:ek0dYu9K1rSV+TgkW5LvNNPRWyDZVIxGMCFI6Pz9o38= +github.com/agiledragon/gomonkey/v2 v2.12.0/go.mod h1:ap1AmDzcVOAz1YpeJ3TCzIgstoaWLA6jbbgxfB4w2iY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -24,12 +26,12 @@ github.com/google/flatbuffers v24.3.25+incompatible h1:CX395cjN9Kke9mmalRoL3d81A github.com/google/flatbuffers v24.3.25+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/huandu/go-assert v1.1.5 h1:fjemmA7sSfYHJD7CUqs9qTwwfdNAx7/j2/ZlHXzNB3c= github.com/huandu/go-assert v1.1.5/go.mod h1:yOLvuqZwmcHIC5rIzrBhT7D3Q9c3GFnd0JrPVhn/06U= github.com/huandu/skiplist v1.2.1 h1:dTi93MgjwErA/8idWTzIw4Y1kZsMWx35fmI2c8Rij7w= github.com/huandu/skiplist v1.2.1/go.mod h1:7v3iFjLcSAzO4fN5B8dvebvo/qsfumiLiDXMrPiHF9w= -github.com/kapetan-io/tackle v0.11.0 h1:xcQ2WgES8rjsd0ZMBfFTMuCs8YG4+1r2OAPY0+mHXjM= -github.com/kapetan-io/tackle v0.11.0/go.mod h1:94m0H3j8pm9JMsAuqBsC/Y08WpAUh01ugkFxABjjHd8= +github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -61,6 +63,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/samber/mo v1.13.0 h1:LB1OwfJMju3a6FjghH+AIvzMG0ZPOzgTWj1qaHs1IQ4= github.com/samber/mo v1.13.0/go.mod h1:BfkrCPuYzVG3ZljnZB783WIJIGk1mcZr9c9CPf8tAxs= +github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= +github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= @@ -69,11 +73,22 @@ github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1Dkn github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/slatedb/fizztest_utils.go b/slatedb/fizztest_utils.go new file mode 100644 index 0000000..e0b6b79 --- /dev/null +++ b/slatedb/fizztest_utils.go @@ -0,0 +1,103 @@ +package slatedb + +import ( + "fmt" + "github.com/oklog/ulid/v2" +) + +func GetState(db *DB) (map[string]interface{}, error) { + m := make(map[string]interface{}) + wal := db.state.wal.clone() + walState, err := getKvTableState(wal.table) + if err != nil { + return nil, err + } + m["wal"] = walState + + memtable := db.state.memtable.clone() + memtableState, err := getKvTableState(memtable.table) + if err != nil { + return nil, err + } + m["memtable"] = memtableState + + m["wal_index"] = float64(db.state.state.core.nextWalSstID) + + // TODO: implement immutable_wal and immutable_memtable + m["immutable_wal"] = make([]interface{}, 0) + m["immutable_memtable"] = make(map[string]interface{}) + + // get the l0 from core state + l0ssts := db.state.state.core.l0 + l0s := make([]interface{}, len(l0ssts)) + for i, sst := range l0ssts { + l0s[i] = fmt.Sprintf("compacted/ulid-%04d.sst", sst.id.compactedID().MustGet().Time()) + } + m["l0"] = l0s + return m, nil +} + +func getKvTableState(table *KVTable) (map[string]interface{}, error) { + iter := table.iter() + m := make(map[string]interface{}) + for { + optionalKv, err := iter.NextEntry() + if err != nil { + return nil, err + } + if optionalKv.IsAbsent() { + break + } + kv := optionalKv.MustGet() + //fmt.Printf("Key: %s, Value: %v\n", kv.Key, kv.ValueDel) + if kv.ValueDel.IsTombstone { + m[string(kv.Key)] = "notfound" + } else { + m[string(kv.Key)] = string(kv.ValueDel.Value) + } + } + return m, nil +} + +func WalSstToMap(db *DB, sstId uint64) (map[string]interface{}, error) { + sst, err := db.tableStore.openSST(newSSTableIDWal(sstId)) + if err != nil { + return nil, err + } + return KeyValSstToMap(db, sst) +} + +func CompactedSstToMap(db *DB, sstId ulid.ULID) (map[string]interface{}, error) { + sst, err := db.tableStore.openSST(newSSTableIDCompacted(sstId)) + if err != nil { + return nil, err + } + return KeyValSstToMap(db, sst) +} + +func KeyValSstToMap(db *DB, sst *SSTableHandle) (map[string]interface{}, error) { + iter, err := newSSTIterator(sst, db.tableStore.clone(), 1, 1) + if err != nil { + return nil, err + } + m := make(map[string]interface{}) + for { + entry, err := iter.NextEntry() + if err != nil { + return nil, err + } + kvDel, _ := entry.Get() + if entry.IsAbsent() { + break + } + if string(kvDel.Key) == "placeholder" { + continue + } + if kvDel.ValueDel.IsTombstone { + m[string(kvDel.Key)] = "notfound" + } else { + m[string(kvDel.Key)] = string(kvDel.ValueDel.Value) + } + } + return m, nil +}