This repository has been archived by the owner on Mar 19, 2024. It is now read-only.
forked from cockroachdb/pebble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathversion_set_test.go
108 lines (97 loc) · 3.19 KB
/
version_set_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"io"
"testing"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
func writeAndIngest(t *testing.T, mem vfs.FS, d *DB, k InternalKey, v []byte, filename string) {
path := mem.PathJoin("ext", filename)
f, err := mem.Create(path)
require.NoError(t, err)
w := sstable.NewWriter(f, sstable.WriterOptions{})
require.NoError(t, w.Add(k, v))
require.NoError(t, w.Close())
require.NoError(t, d.Ingest([]string{path}))
}
func TestVersionSetCheckpoint(t *testing.T) {
mem := vfs.NewMem()
require.NoError(t, mem.MkdirAll("ext", 0755))
opts := &Options{
FS: mem,
MaxManifestFileSize: 1,
}
d, err := Open("", opts)
require.NoError(t, err)
// Multiple manifest files are created such that the latest one must have a correct snapshot
// of the preceding state for the DB to be opened correctly and see the written data.
writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), []byte("b"), "a")
writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("c"), 0, InternalKeyKindSet), []byte("d"), "c")
require.NoError(t, d.Close())
d, err = Open("", opts)
require.NoError(t, err)
checkValue := func(k string, expected string) {
v, closer, err := d.Get([]byte(k))
require.NoError(t, err)
require.Equal(t, expected, string(v))
closer.Close()
}
checkValue("a", "b")
checkValue("c", "d")
require.NoError(t, d.Close())
}
func TestVersionSetSeqNums(t *testing.T) {
mem := vfs.NewMem()
require.NoError(t, mem.MkdirAll("ext", 0755))
opts := &Options{
FS: mem,
MaxManifestFileSize: 1,
}
d, err := Open("", opts)
require.NoError(t, err)
writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), []byte("b"), "a")
writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("c"), 0, InternalKeyKindSet), []byte("d"), "c")
require.NoError(t, d.Close())
d, err = Open("", opts)
require.NoError(t, err)
defer d.Close()
// Check that the manifest has the correct LastSeqNum, equalling the highest
// observed SeqNum.
filenames, err := mem.List("")
require.NoError(t, err)
var manifest vfs.File
for _, filename := range filenames {
fileType, _, ok := base.ParseFilename(mem, filename)
if ok && fileType == fileTypeManifest {
manifest, err = mem.Open(filename)
require.NoError(t, err)
}
}
require.NotNil(t, manifest)
defer manifest.Close()
rr := record.NewReader(manifest, 0 /* logNum */)
lastSeqNum := uint64(0)
for {
r, err := rr.Next()
if err == io.EOF {
break
}
require.NoError(t, err)
var ve versionEdit
err = ve.Decode(r)
require.NoError(t, err)
if ve.LastSeqNum != 0 {
lastSeqNum = ve.LastSeqNum
}
}
// 2 ingestions happened, so LastSeqNum should equal 2.
require.Equal(t, uint64(2), lastSeqNum)
// logSeqNum is always one greater than the last assigned sequence number.
require.Equal(t, d.mu.versions.atomic.logSeqNum, lastSeqNum+1)
}