This is a Go implementation of the core part of MorphStream [1], with some modifications.
GorphStream inherits the basic idea of MorphStream: it is a lockless Transactional Stream Processing Engine (TSPE) designed for high throughput.
The main idea is to organize atomic state-access operations and their temporal and parameter dependencies in a directed multigraph, then decompose the operations and assign them to worker threads, maximizing parallelism while minimizing synchronization.
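As a rough illustration of the dependency graph described above, the sketch below links each operation to the most recent earlier writer of every state key it touches. The `Op`, `Edge`, and `BuildEdges` names are hypothetical and are not taken from `gorphStream/tpg`; the real package may model operations and edges differently, and this sketch ignores scheduling entirely.

```go
package main

import (
	"fmt"
	"sort"
)

// Op is a hypothetical, simplified operation: it writes Target and reads Params.
type Op struct {
	Name      string
	Timestamp int64
	Target    int   // state key written by this operation
	Params    []int // state keys read by this operation
}

// EdgeKind names the two dependency types discussed in the text.
type EdgeKind string

const (
	Temporal  EdgeKind = "temporal"  // same target key, earlier writer
	Parameter EdgeKind = "parameter" // reads a key written by an earlier operation
)

// Edge means "To must wait for From".
type Edge struct {
	From, To string
	Kind     EdgeKind
}

// BuildEdges orders ops by timestamp (ties keep input order) and links each
// op to the latest earlier writer of every key it touches.
func BuildEdges(ops []Op) []Edge {
	sorted := append([]Op(nil), ops...)
	sort.SliceStable(sorted, func(i, j int) bool {
		return sorted[i].Timestamp < sorted[j].Timestamp
	})

	lastWriter := map[int]string{} // key -> name of the latest op that wrote it
	var edges []Edge
	for _, op := range sorted {
		if w, ok := lastWriter[op.Target]; ok {
			edges = append(edges, Edge{From: w, To: op.Name, Kind: Temporal})
		}
		for _, p := range op.Params {
			if p == op.Target {
				continue // already covered by the temporal edge
			}
			if w, ok := lastWriter[p]; ok {
				edges = append(edges, Edge{From: w, To: op.Name, Kind: Parameter})
			}
		}
		lastWriter[op.Target] = op.Name
	}
	return edges
}

func main() {
	// Shuffled input: the deposit has the smallest timestamp but arrives last.
	ops := []Op{
		{Name: "B receive from A", Timestamp: 222, Target: 1, Params: []int{0, 1}},
		{Name: "A send to B", Timestamp: 222, Target: 0, Params: []int{0}},
		{Name: "deposit A", Timestamp: 100, Target: 0, Params: []int{0}},
	}
	for _, e := range BuildEdges(ops) {
		fmt.Printf("%s -> %s (%s)\n", e.From, e.To, e.Kind)
	}
}
```

Operations with no incoming edge can run immediately, and operations on unrelated keys never wait on each other, which is where the parallelism comes from.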
The code is organized into the following directories:
- Events, under `gorphStream/events`. This defines the basic operations and transactions with which users express their logic. When `gorphStream` receives events, it decomposes them into operations that are later used to build the TPG.
- Task Precedence Graph (TPG), under `gorphStream/tpg`. As a batch of events arrives, `gorphStream` maps the dependencies among their operations onto the TPG. Analysis and scheduling are done while constructing and traversing the TPG.
- Multi-version Storage Engine (MV-Store), under `gorphStream/storage`. This storage engine keeps a log-based history of states and supports rollback. It provides transactional semantics: changes are committed after a successful transaction and rolled back after a failure.
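To make the log-based history and rollback idea more concrete, here is a minimal single-threaded sketch of a multi-version store. The `MVStore` type and its `Read`, `Write`, and `Rollback` methods are hypothetical names chosen for illustration; they are not the API of `gorphStream/storage`. The sketch assumes writes to a key arrive in ascending timestamp order and that transaction timestamps are positive (timestamp 0 is reserved for initial values).

```go
package main

import (
	"errors"
	"fmt"
)

// version is one entry in a key's history.
type version struct {
	ts    int64
	value int
}

// MVStore is a hypothetical multi-version store: every key keeps an
// append-only log of versions in ascending timestamp order.
type MVStore struct {
	history map[int][]version
}

// NewMVStore seeds each key with an initial version at timestamp 0.
func NewMVStore(initial map[int]int) *MVStore {
	s := &MVStore{history: map[int][]version{}}
	for k, v := range initial {
		s.history[k] = []version{{ts: 0, value: v}}
	}
	return s
}

// Read returns the latest value written at or before ts.
func (s *MVStore) Read(key int, ts int64) (int, error) {
	log := s.history[key]
	for i := len(log) - 1; i >= 0; i-- {
		if log[i].ts <= ts {
			return log[i].value, nil
		}
	}
	return 0, errors.New("no visible version")
}

// Write appends a new version for key at timestamp ts.
func (s *MVStore) Write(key int, ts int64, value int) {
	s.history[key] = append(s.history[key], version{ts: ts, value: value})
}

// Rollback discards every version written at ts, as after an aborted transaction.
func (s *MVStore) Rollback(ts int64) {
	for k, log := range s.history {
		kept := log[:0]
		for _, v := range log {
			if v.ts != ts {
				kept = append(kept, v)
			}
		}
		s.history[k] = kept
	}
}

func main() {
	s := NewMVStore(map[int]int{0: 500, 1: 0})

	// A transaction at timestamp 222 moves 200 from key 0 to key 1.
	s.Write(0, 222, 300)
	s.Write(1, 222, 200)

	// Suppose the transaction fails afterwards: drop its versions.
	s.Rollback(222)

	a, _ := s.Read(0, 222)
	b, _ := s.Read(1, 222)
	fmt.Println("key 0:", a, "key 1:", b)
}
```

Because older versions are never overwritten, a reader at an earlier timestamp still sees the value that was current at that time, and undoing a failed transaction only requires dropping the versions it appended.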
Detailed documentation for each part can be found in its own directory.
As the example in `main.go` shows, building your own transactions is fairly intuitive.
This example demonstrates transferring funds between accounts A and B. The user's code does the following:
- Define the state schema: two states, the deposits of A and B, are defined to initialize the storage.
- Define the operations: register callbacks in operations to implement the transfer and deposit logic.
- Define the transactions: combine the operations into transactions.
- Trigger the start: send the shuffled transaction batch to `gorphStream` and execute it.
const BANKER_SCHEMA = 2

// One of the callbacks supplied as `Operation.Do()`.
// It checks the balance of the other account and credits this one.
// For the transfer below, params.Get(0) is the sender's balance and
// params.Get(1) is the receiver's balance.
var transferReceive = func(target storage.ParamView, params storage.ParamView) error {
	if params.Get(0) < V2 {
		return errors.New("insufficient balance")
	}
	total := params.Get(1) + V2
	target.Set(total)
	return nil
}

// Other operation callbacks.
...
// Assemble the operations into transactions.
var TransferA2BTxn = events.Txn{
	Ops: []events.Operation{
		// Credit B.
		&events.W{
			Name:   "B receive from A",
			Params: []int{A, B},
			Target: B,
			Do:     transferReceive,
		},
		// Debit A.
		&events.W{
			Name:   "A send to B",
			Params: []int{A},
			Target: A,
			Do:     transferSend,
		},
	},
	Timestamp: int64(222),
}

// Other transactions.
...
func main() {
	storage.Init(cmd.BANKER_SCHEMA)
	tpg.Construct(
		// Shuffled input.
		[]*events.Txn{
			&cmd.TransferA2BTxn,
			&cmd.TransferB2ATxn,
			&cmd.DepositTxn,
		},
	).Handle()
	storage.Dump()
}
To run the example, simply run:
go mod init gorphStream && go mod tidy
go run main.go
The expected output is:
Storage 0:500
Storage 1:0
- TPG Construction.
  - Coding and testing.
  - (Optional) Parallelizing.
- TPG Traversal.
  - DFS.
  - Novel NotifyDFS.
    - Coding.
    - Verification.
- Multi-Version Storage.
- Transaction abortion and rolling back.
- Optimization and Benchmark.
Footnotes
[1] Yancan Mao, Jianjun Zhao, Shuhao Zhang, Haikun Liu, and Volker Markl. MorphStream: Adaptive Scheduling for Scalable Transactional Stream Processing on Multicores. SIGMOD, 2023.