Skip to content

Commit

Permalink
support save and bgsave
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed May 23, 2022
1 parent 7059a12 commit 665fd2a
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ logs
# jetbrains
.idea
*.iml

158 changes: 158 additions & 0 deletions aof/rdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package aof

import (
"github.com/hdt3213/godis/config"
"github.com/hdt3213/godis/datastruct/dict"
List "github.com/hdt3213/godis/datastruct/list"
"github.com/hdt3213/godis/datastruct/set"
SortedSet "github.com/hdt3213/godis/datastruct/sortedset"
"github.com/hdt3213/godis/interface/database"
"github.com/hdt3213/godis/lib/logger"
rdb "github.com/hdt3213/rdb/encoder"
"github.com/hdt3213/rdb/model"
"io/ioutil"
"os"
"strconv"
"time"
)

func (handler *Handler) Rewrite2RDB() error {
ctx, err := handler.startRewrite2RDB()
if err != nil {
return err
}
err = handler.rewrite2RDB(ctx)
if err != nil {
return err
}
rdbFilename := config.Properties.RDBFilename
if rdbFilename == "" {
rdbFilename = "dump.rdb"
}
err = ctx.tmpFile.Close()
if err != nil {
return err
}
err = os.Rename(ctx.tmpFile.Name(), rdbFilename)
if err != nil {
return err
}
return nil
}

func (handler *Handler) startRewrite2RDB() (*RewriteCtx, error) {
handler.pausingAof.Lock() // pausing aof
defer handler.pausingAof.Unlock()

err := handler.aofFile.Sync()
if err != nil {
logger.Warn("fsync failed")
return nil, err
}

// get current aof file size
fileInfo, _ := os.Stat(handler.aofFilename)
filesize := fileInfo.Size()
// create tmp file
file, err := ioutil.TempFile("", "*.aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return &RewriteCtx{
tmpFile: file,
fileSize: filesize,
}, nil
}

func (handler *Handler) rewrite2RDB(ctx *RewriteCtx) error {
// load aof tmpFile
tmpHandler := handler.newRewriteHandler()
tmpHandler.LoadAof(int(ctx.fileSize))
encoder := rdb.NewEncoder(ctx.tmpFile).EnableCompress()
err := encoder.WriteHeader()
if err != nil {
return err
}
auxMap := map[string]string{
"redis-ver": "6.0.0",
"redis-bits": "64",
"aof-preamble": "0",
"ctime": strconv.FormatInt(time.Now().Unix(), 10),
}
for k, v := range auxMap {
err := encoder.WriteAux(k, v)
if err != nil {
return err
}
}

for i := 0; i < config.Properties.Databases; i++ {
keyCount, ttlCount := tmpHandler.db.GetDBSize(i)
if keyCount == 0 {
continue
}
err = encoder.WriteDBHeader(uint(i), uint64(keyCount), uint64(ttlCount))
if err != nil {
return err
}
// dump db
var err2 error
tmpHandler.db.ForEach(i, func(key string, entity *database.DataEntity, expiration *time.Time) bool {
var opts []interface{}
if expiration != nil {
opts = append(opts, rdb.WithTTL(uint64(expiration.UnixNano()/1e6)))
}
switch obj := entity.Data.(type) {
case []byte:
err = encoder.WriteStringObject(key, obj, opts...)
case *List.LinkedList:
vals := make([][]byte, 0, obj.Len())
obj.ForEach(func(i int, v interface{}) bool {
bytes, _ := v.([]byte)
vals = append(vals, bytes)
return true
})
err = encoder.WriteListObject(key, vals, opts...)
case *set.Set:
vals := make([][]byte, 0, obj.Len())
obj.ForEach(func(m string) bool {
vals = append(vals, []byte(m))
return true
})
err = encoder.WriteSetObject(key, vals, opts...)
case dict.Dict:
hash := make(map[string][]byte)
obj.ForEach(func(key string, val interface{}) bool {
bytes, _ := val.([]byte)
hash[key] = bytes
return true
})
err = encoder.WriteHashMapObject(key, hash, opts...)
case *SortedSet.SortedSet:
var entries []*model.ZSetEntry
obj.ForEach(int64(0), obj.Len(), true, func(element *SortedSet.Element) bool {
entries = append(entries, &model.ZSetEntry{
Member: element.Member,
Score: element.Score,
})
return true
})
err = encoder.WriteZSetObject(key, entries, opts...)
}
if err != nil {
err2 = err
return false
}
return true
})
if err2 != nil {
return err2
}
}
err = encoder.WriteEnd()
if err != nil {
return err
}
return nil
}
9 changes: 4 additions & 5 deletions aof/rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,18 @@ type RewriteCtx struct {
}

// Rewrite carries out AOF rewrite
func (handler *Handler) Rewrite() {
func (handler *Handler) Rewrite() error {
ctx, err := handler.StartRewrite()
if err != nil {
logger.Warn(err)
return
return err
}
err = handler.DoRewrite(ctx)
if err != nil {
logger.Error(err)
return
return err
}

handler.FinishRewrite(ctx)
return nil
}

// DoRewrite actually rewrite aof file
Expand Down
38 changes: 38 additions & 0 deletions database/aof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,44 @@ func TestAof(t *testing.T) {
aofReadDB.Close()
}

func TestRDB(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "godis")
if err != nil {
t.Error(err)
return
}
aofFilename := path.Join(tmpDir, "a.aof")
rdbFilename := path.Join(tmpDir, "dump.rdb")
defer func() {
_ = os.Remove(aofFilename)
_ = os.Remove(rdbFilename)
}()
config.Properties = &config.ServerProperties{
AppendOnly: true,
AppendFilename: aofFilename,
RDBFilename: rdbFilename,
}
dbNum := 4
size := 10
var prefixes []string
conn := &connection.FakeConn{}
writeDB := NewStandaloneServer()
for i := 0; i < dbNum; i++ {
prefix := utils.RandString(8)
prefixes = append(prefixes, prefix)
makeTestData(writeDB, i, prefix, size)
}
time.Sleep(time.Second) // wait for aof finished
writeDB.Exec(conn, utils.ToCmdLine("save"))
writeDB.Close()
readDB := NewStandaloneServer() // start new db and read aof file
for i := 0; i < dbNum; i++ {
prefix := prefixes[i]
validateTestData(t, readDB, i, prefix, size)
}
readDB.Close()
}

func TestRewriteAOF(t *testing.T) {
tmpFile, err := ioutil.TempFile("", "*.aof")
if err != nil {
Expand Down
48 changes: 46 additions & 2 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func (mdb *MultiDB) Exec(c redis.Connection, cmdLine [][]byte) (result redis.Rep
return RewriteAOF(mdb, cmdLine[1:])
} else if cmdName == "flushall" {
return mdb.flushAll()
} else if cmdName == "save" {
return SaveRDB(mdb, cmdLine[1:])
} else if cmdName == "bgsave" {
return BGSaveRDB(mdb, cmdLine[1:])
} else if cmdName == "select" {
if c != nil && c.InMultiState() {
return protocol.MakeErrReply("cannot select database within multi")
Expand Down Expand Up @@ -217,6 +221,46 @@ func BGRewriteAOF(db *MultiDB, args [][]byte) redis.Reply {

// RewriteAOF start Append-Only-File rewriting and blocked until it finished
func RewriteAOF(db *MultiDB, args [][]byte) redis.Reply {
db.aofHandler.Rewrite()
return protocol.MakeStatusReply("Background append only file rewriting started")
err := db.aofHandler.Rewrite()
if err != nil {
return protocol.MakeErrReply(err.Error())
}
return protocol.MakeOkReply()
}

// SaveRDB start RDB writing and blocked until it finished
func SaveRDB(db *MultiDB, args [][]byte) redis.Reply {
if db.aofHandler == nil {
return protocol.MakeErrReply("please enable aof before using save")
}
err := db.aofHandler.Rewrite2RDB()
if err != nil {
return protocol.MakeErrReply(err.Error())
}
return protocol.MakeOkReply()
}

// BGSaveRDB asynchronously save RDB
func BGSaveRDB(db *MultiDB, args [][]byte) redis.Reply {
if db.aofHandler == nil {
return protocol.MakeErrReply("please enable aof before using save")
}
go func() {
defer func() {
if err := recover(); err != nil {
logger.Error(err)
}
}()
err := db.aofHandler.Rewrite2RDB()
if err != nil {
logger.Error(err)
}
}()
return protocol.MakeStatusReply("Background saving started")
}

// GetDBSize returns keys count and ttl key count
func (mdb *MultiDB) GetDBSize(dbIndex int) (int, int) {
db := mdb.selectDB(dbIndex)
return db.data.Len(), db.ttlMap.Len()
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/hdt3213/godis
go 1.16

require (
github.com/hdt3213/rdb v1.0.0
github.com/hdt3213/rdb v1.0.2
github.com/jolestar/go-commons-pool/v2 v2.1.1
github.com/shopspring/decimal v1.2.0
)
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/hdt3213/rdb v1.0.0 h1:rG8pRz6Y+2XtZw4C35rize3nXByClkFmwfM5ffj7sFs=
github.com/hdt3213/rdb v1.0.0/go.mod h1:m2CaP16oqYROIQMUUjB3WkqQWfDi/VebnHUDVRl4cIM=
github.com/hdt3213/rdb v1.0.2 h1:mPXShIqjuzgioBkwllj8XnlRQaPtbulNyuXeycxOMGs=
github.com/hdt3213/rdb v1.0.2/go.mod h1:m2CaP16oqYROIQMUUjB3WkqQWfDi/VebnHUDVRl4cIM=
github.com/jolestar/go-commons-pool/v2 v2.1.1 h1:KrbCEvx5KhwcHzLTWIE8SJJQL7zzNto5in+wnO9/gSA=
github.com/jolestar/go-commons-pool/v2 v2.1.1/go.mod h1:kTOzcguO2zUoEd+BySdg7Xhk/YE0HEr2bAHdWDkhMXg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
1 change: 1 addition & 0 deletions interface/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type EmbedDB interface {
ForEach(dbIndex int, cb func(key string, data *DataEntity, expiration *time.Time) bool)
RWLocks(dbIndex int, writeKeys []string, readKeys []string)
RWUnLocks(dbIndex int, writeKeys []string, readKeys []string)
GetDBSize(dbIndex int) (int, int)
}

// DataEntity stores data bound to a key, including a string, list, hash, set and so on
Expand Down

0 comments on commit 665fd2a

Please sign in to comment.