forked from yihuang/go-block-stm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
executor.go
84 lines (74 loc) · 2.08 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package block_stm
import (
"context"
)
// Executor bundles the collaborators that a single worker needs in order to
// pull tasks from the scheduler and run them.
//
// Executor fields are not mutated during execution.
type Executor struct {
	// ctx is the context used for cancellation.
	ctx context.Context

	// scheduler is the scheduler used for task management.
	scheduler *Scheduler

	// txExecutor is the callback that actually executes a transaction.
	txExecutor TxExecutor

	// mvMemory is the multi-version memory for the executor.
	mvMemory *MVMemory

	// i is the index of the executor, used for debugging output.
	i int
}
// NewExecutor allocates an Executor and stores the given context, scheduler,
// transaction callback, multi-version memory, and executor index in it.
// The arguments are stored as-is; nothing is validated or copied here.
func NewExecutor(
	ctx context.Context,
	scheduler *Scheduler,
	txExecutor TxExecutor,
	mvMemory *MVMemory,
	i int,
) *Executor {
	executor := new(Executor)
	executor.ctx = ctx
	executor.scheduler = scheduler
	executor.txExecutor = txExecutor
	executor.mvMemory = mvMemory
	executor.i = i
	return executor
}
// Run drives this executor's task loop until the scheduler reports Done.
//
// While no task is held, Run first checks the context and returns if it has
// been cancelled, then asks the scheduler for the next task. While a task is
// held, it is dispatched by kind: execution tasks go to TryExecute and
// validation tasks go to NeedsReexecution, each of which hands back the next
// (possibly invalid) task to process.
//
// Invariant `num_active_tasks`:
//   - `NextTask` increases it if it returns a valid task.
//   - `TryExecute` and `NeedsReexecution` don't change it if they return a new
//     valid task to run, otherwise they decrease it.
func (e *Executor) Run() {
	var (
		task     = InvalidTxnVersion
		taskKind TaskKind
	)

	for !e.scheduler.Done() {
		if task.Valid() {
			// A task is in hand: run it and keep whatever follow-up it yields.
			if taskKind == TaskKindExecution {
				task, taskKind = e.TryExecute(task)
			} else if taskKind == TaskKindValidation {
				task, taskKind = e.NeedsReexecution(task)
			}
			continue
		}

		// Cancellation is only checked while no task is held.
		// NOTE(review): presumably so that a held task is always finished and
		// the active-task count stays balanced — confirm against Scheduler.
		if e.ctx.Err() != nil {
			return
		}
		task, taskKind = e.scheduler.NextTask()
	}
}
// TryExecute runs the transaction identified by version.Index, records the
// resulting view in the multi-version memory under version, and reports the
// outcome to the scheduler via FinishExecution.
//
// It bumps the scheduler's executedTxns counter once per call. The returned
// version and kind are exactly what FinishExecution returns.
func (e *Executor) TryExecute(version TxnVersion) (TxnVersion, TaskKind) {
	e.scheduler.executedTxns.Add(1)

	// Calls are evaluated innermost-first: execute, then Record, then
	// FinishExecution, which receives Record's result as its second argument.
	return e.scheduler.FinishExecution(
		version,
		e.mvMemory.Record(version, e.execute(version.Index)),
	)
}
// NeedsReexecution validates the read set of the transaction at version.Index
// and reports the outcome to the scheduler via FinishValidation.
//
// It bumps the scheduler's validatedTxns counter once per call. If validation
// fails and the scheduler accepts the abort (TryValidationAbort), the
// transaction's writes are converted to estimates before FinishValidation is
// called with aborted=true; in every other case it is called with
// aborted=false. The returned version and kind are exactly what
// FinishValidation returns.
func (e *Executor) NeedsReexecution(version TxnVersion) (TxnVersion, TaskKind) {
	e.scheduler.validatedTxns.Add(1)

	txn := version.Index

	// Nothing to abort when the read set is still valid (TryValidationAbort is
	// not consulted in that case) or when the scheduler declines the abort.
	if e.mvMemory.ValidateReadSet(txn) || !e.scheduler.TryValidationAbort(version) {
		return e.scheduler.FinishValidation(txn, false)
	}

	e.mvMemory.ConvertWritesToEstimates(txn)
	return e.scheduler.FinishValidation(txn, true)
}
// execute obtains a view of the multi-version memory for txn, invokes the
// transaction callback with that view, and returns the same view to the caller.
func (e *Executor) execute(txn TxnIndex) *MultiMVMemoryView {
	txnView := e.mvMemory.View(txn)
	e.txExecutor(txn, txnView)
	return txnView
}