forked from yihuang/go-block-stm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mviterator.go
123 lines (106 loc) · 2.75 KB
/
mviterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package block_stm
import (
storetypes "cosmossdk.io/store/types"
"github.com/tidwall/btree"
)
// MVIterator is an iterator for a multi-versioned store.
type MVIterator[V any] struct {
BTreeIteratorG[dataItem[V]]
txn TxnIndex
// cache current found value and version
value V
version TxnVersion
// record the observed reads during iteration during execution
reads []ReadDescriptor
// blocking call to wait for dependent transaction to finish, `nil` in validation mode
waitFn func(TxnIndex)
// signal the validation to fail
readEstimateValue bool
}
var _ storetypes.Iterator = (*MVIterator[[]byte])(nil)
func NewMVIterator[V any](
opts IteratorOptions, txn TxnIndex, iter btree.IterG[dataItem[V]],
waitFn func(TxnIndex),
) *MVIterator[V] {
it := &MVIterator[V]{
BTreeIteratorG: *NewBTreeIteratorG(
dataItem[V]{Key: opts.Start},
dataItem[V]{Key: opts.End},
iter,
opts.Ascending,
),
txn: txn,
waitFn: waitFn,
}
it.resolveValue()
return it
}
// Executing returns if the iterator is running in execution mode.
func (it *MVIterator[V]) Executing() bool {
return it.waitFn != nil
}
func (it *MVIterator[V]) Next() {
it.BTreeIteratorG.Next()
it.resolveValue()
}
func (it *MVIterator[V]) Value() V {
return it.value
}
func (it *MVIterator[V]) Version() TxnVersion {
return it.version
}
func (it *MVIterator[V]) Reads() []ReadDescriptor {
return it.reads
}
func (it *MVIterator[V]) ReadEstimateValue() bool {
return it.readEstimateValue
}
// resolveValue skips the non-exist values in the iterator based on the txn index, and caches the first existing one.
func (it *MVIterator[V]) resolveValue() {
inner := &it.BTreeIteratorG
for ; inner.Valid(); inner.Next() {
v, ok := it.resolveValueInner(inner.Item().Tree)
if !ok {
// abort the iterator
it.valid = false
// signal the validation to fail
it.readEstimateValue = true
return
}
if v == nil {
continue
}
it.value = v.Value
it.version = v.Version()
if it.Executing() {
it.reads = append(it.reads, ReadDescriptor{
Key: inner.Item().Key,
Version: it.version,
})
}
return
}
}
// resolveValueInner loop until we find a value that is not an estimate,
// wait for dependency if gets an ESTIMATE.
// returns:
// - (nil, true) if the value is not found
// - (nil, false) if the value is an estimate and we should fail the validation
// - (v, true) if the value is found
func (it *MVIterator[V]) resolveValueInner(tree *BTree[secondaryDataItem[V]]) (*secondaryDataItem[V], bool) {
for {
v, ok := seekClosestTxn(tree, it.txn)
if !ok {
return nil, true
}
if v.Estimate {
if it.Executing() {
it.waitFn(v.Index)
continue
}
// in validation mode, it should fail validation immediatelly
return nil, false
}
return &v, true
}
}