[OCC] Add test for scheduler with zero conflicts #368

Open
wants to merge 25 commits into base: occ-main

Commits (25)
6bc6c90
Add occ todos / comments (#317)
udpatil Sep 13, 2023
b66d23e
Multiversion Item Implementation and Tests (#318)
udpatil Sep 26, 2023
0048776
[occ] Add incarnation field (#321)
udpatil Sep 29, 2023
5d8941c
[occ] Implement basic multiversion store (#322)
udpatil Oct 6, 2023
dac5f7b
[occ] Add concurrency worker configuration (#324)
stevenlanders Oct 9, 2023
94bb98f
[occ] Occ multiversion store (#326)
udpatil Oct 10, 2023
5f89416
[occ] Add batch tx delivery interface (#327)
stevenlanders Oct 10, 2023
571d00a
[occ] MVKV store implementation and tests (#323)
udpatil Oct 10, 2023
9886602
[occ] Add validation function for transaction state to multiversionst…
udpatil Oct 13, 2023
293ac79
[occ] Add basic worker task and scheduler shell (#328)
stevenlanders Oct 17, 2023
dfb2260
[occ] Implement iterator for mvkv (#329)
udpatil Oct 17, 2023
663716a
fix dependency (#334)
udpatil Oct 17, 2023
b34d61c
[occ] Iterateset tracking and validation implementation (#337)
udpatil Oct 19, 2023
0aebbc9
[occ] Add scheduler logic for validation (#336)
stevenlanders Oct 19, 2023
096041b
[occ] Fix situation where no stores causes a panic (#338)
stevenlanders Oct 20, 2023
0b9193c
Add occ flag check to context (#340)
stevenlanders Oct 23, 2023
27484e4
[occ] Add struct field and helpers for estimate prefills (#341)
udpatil Oct 24, 2023
95ddc84
Fix map access panic (#343)
stevenlanders Oct 30, 2023
be4a4ae
Gen estimates writeset (#344)
udpatil Nov 3, 2023
931e2f6
[OCC] Add trace spans to scheduler (#347)
stevenlanders Nov 6, 2023
eac8657
[occ] Fix parent store readset validation (#348)
udpatil Nov 10, 2023
6260732
[occ] OCC scheduler and validation fixes (#359)
udpatil Nov 22, 2023
c660786
[occ] Add optimizations for multiversion and mvkv (#361)
udpatil Nov 27, 2023
3c5cfcc
[OCC] Add scheduler goroutine pool and optimizations (#362)
stevenlanders Nov 29, 2023
59f8e5c
add a no-overlap test
stevenlanders Nov 29, 2023
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -6,6 +6,7 @@ on:
push:
branches:
- main
- occ-main # TODO: remove after occ work is done

permissions:
contents: read
24 changes: 23 additions & 1 deletion baseapp/abci.go
@@ -12,6 +12,8 @@ import (
"syscall"
"time"

"github.com/cosmos/cosmos-sdk/tasks"

"github.com/armon/go-metrics"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
@@ -234,11 +236,31 @@ func (app *BaseApp) CheckTx(ctx context.Context, req *abci.RequestCheckTx) (*abc
}, nil
}

// DeliverTxBatch executes multiple txs concurrently via the OCC scheduler
func (app *BaseApp) DeliverTxBatch(ctx sdk.Context, req sdk.DeliverTxBatchRequest) (res sdk.DeliverTxBatchResponse) {
scheduler := tasks.NewScheduler(app.concurrencyWorkers, app.TracingInfo, app.DeliverTx)
// This will basically no-op the actual prefill if the metadata for the txs is empty

// process all txs; this also initializes the MVS if prefill estimates were disabled
txRes, err := scheduler.ProcessAll(ctx, req.TxEntries)
if err != nil {
// TODO: handle error
}

responses := make([]*sdk.DeliverTxResult, 0, len(req.TxEntries))
for _, tx := range txRes {
responses = append(responses, &sdk.DeliverTxResult{Response: tx})
}
return sdk.DeliverTxBatchResponse{Results: responses}
}

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Otherwise, the ResponseDeliverTx will contain relevant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
// TODO: (occ) this is the function called from sei-chain to perform execution of a transaction.
// We'd likely replace this with an execution task that is scheduled by the OCC scheduler
func (app *BaseApp) DeliverTx(ctx sdk.Context, req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")
defer func() {
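For orientation, a minimal sketch of how a caller might drive DeliverTxBatch, mirroring the test added below; the raw transactions (rawTxs), context, and app value here are assumptions, not part of this diff:

```go
// Build one DeliverTxEntry per raw transaction, then hand the whole
// batch to the OCC scheduler via DeliverTxBatch.
entries := make([]*sdk.DeliverTxEntry, 0, len(rawTxs))
for _, txBytes := range rawTxs {
	entries = append(entries, &sdk.DeliverTxEntry{
		Request: abci.RequestDeliverTx{Tx: txBytes},
	})
}

resp := app.DeliverTxBatch(ctx, sdk.DeliverTxBatchRequest{TxEntries: entries})
for _, r := range resp.Results {
	if r.Response.Code != abci.CodeTypeOK {
		// each tx still yields a response on failure; handle per-tx errors here
	}
}
```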
39 changes: 31 additions & 8 deletions baseapp/baseapp.go
@@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/otel/trace"

"github.com/armon/go-metrics"
"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/utils/tracing"
"github.com/gogo/protobuf/proto"
sdbm "github.com/sei-protocol/sei-tm-db/backends"
@@ -60,7 +61,8 @@ const (
FlagArchivalArweaveIndexDBFullPath = "archival-arweave-index-db-full-path"
FlagArchivalArweaveNodeURL = "archival-arweave-node-url"

FlagChainID = "chain-id"
FlagChainID = "chain-id"
FlagConcurrencyWorkers = "concurrency-workers"
)

var (
@@ -168,6 +170,8 @@ type BaseApp struct { //nolint: maligned
TmConfig *tmcfg.Config

TracingInfo *tracing.Info

concurrencyWorkers int
}

type appStore struct {
@@ -294,6 +298,16 @@ func NewBaseApp(
app.cms.(*rootmulti.Store).SetOrphanConfig(app.orphanConfig)
}

// if no option has already overridden it, initialize from the flag's value
// this avoids forcing every implementation to pass an option, while still allowing it
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = cast.ToInt(appOpts.Get(FlagConcurrencyWorkers))
}
// safely fall back to the default value if still 0
if app.concurrencyWorkers == 0 {
app.concurrencyWorkers = config.DefaultConcurrencyWorkers
}

return app
}

@@ -307,6 +321,11 @@ func (app *BaseApp) AppVersion() uint64 {
return app.appVersion
}

// ConcurrencyWorkers returns the number of concurrent workers for the BaseApp.
func (app *BaseApp) ConcurrencyWorkers() int {
return app.concurrencyWorkers
}

// Version returns the application's version string.
func (app *BaseApp) Version() string {
return app.version
@@ -821,6 +840,7 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
// TODO: (occ) This is an example of where we wrap the multistore with a cache multistore, and then return a modified context using that multistore
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
@@ -847,13 +867,13 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context
// and execute successfully. An error is returned otherwise.
func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, priority int64, err error) {

defer telemetry.MeasureThroughputSinceWithLabels(
telemetry.TxCount,
[]metrics.Label{
telemetry.NewLabel("mode", modeKeyToString[mode]),
},
time.Now(),
)
// defer telemetry.MeasureThroughputSinceWithLabels(
// telemetry.TxCount,
// []metrics.Label{
// telemetry.NewLabel("mode", modeKeyToString[mode]),
// },
// time.Now(),
// )

// Reset events after each checkTx or simulateTx or recheckTx
// DeliverTx is garbage collected after FinalizeBlocker
@@ -974,6 +994,7 @@ func (app *BaseApp) runTx(ctx sdk.Context, mode runTxMode, txBytes []byte) (gInf
storeAccessOpEvents := msCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[acltypes.ANTE_MSG_INDEX]

// TODO: (occ) This is an example of where we do our current validation. Note that this validation operates on the declared dependencies for a TX / antehandler + the utilized dependencies, whereas the validation for the OCC approach will be done holistically based on the mvkv
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
@@ -1118,6 +1139,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, mode runTxMode) (*s
storeAccessOpEvents := msgMsCache.GetEvents()
accessOps := ctx.TxMsgAccessOps()[i]
missingAccessOps := ctx.MsgValidator().ValidateAccessOperations(accessOps, storeAccessOpEvents)
// TODO: (occ) This is where we currently validate our per-message dependencies,
// whereas validation for the OCC approach will be done holistically based on the mvkv
if len(missingAccessOps) != 0 {
for op := range missingAccessOps {
ctx.Logger().Info((fmt.Sprintf("eventMsgName=%s Missing Access Operation:%s ", eventMsgName, op.String())))
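The worker-count defaulting in NewBaseApp above resolves three sources in order: an explicit SetConcurrencyWorkers option, then the concurrency-workers app option, then the server default. A minimal sketch of that precedence in isolation (resolveConcurrencyWorkers is an illustrative helper, not part of this PR):

```go
// resolveConcurrencyWorkers mirrors the defaulting in NewBaseApp:
// explicit option > flag/app.toml value > config.DefaultConcurrencyWorkers.
func resolveConcurrencyWorkers(fromOption, fromFlag, fallback int) int {
	if fromOption != 0 {
		return fromOption
	}
	if fromFlag != 0 {
		return fromFlag
	}
	return fallback
}
```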
145 changes: 145 additions & 0 deletions baseapp/deliver_tx_batch_test.go
@@ -0,0 +1,145 @@
package baseapp

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"

"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

func anteHandler(capKey sdk.StoreKey, storeKey []byte) sdk.AnteHandler {
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
store := ctx.KVStore(capKey)
txTest := tx.(txTest)

if txTest.FailOnAnte {
return ctx, sdkerrors.Wrap(sdkerrors.ErrUnauthorized, "ante handler failure")
}

val := getIntFromStore(store, storeKey)
setIntOnStore(store, storeKey, val+1)

ctx.EventManager().EmitEvents(
counterEvent("ante-val", val+1),
)

return ctx, nil
}
}

func handlerKVStore(capKey sdk.StoreKey) sdk.Handler {
return func(ctx sdk.Context, msg sdk.Msg) (*sdk.Result, error) {
ctx = ctx.WithEventManager(sdk.NewEventManager())
res := &sdk.Result{}

// Use the tx index from the context as a unique ID for this transaction
txIndex := ctx.TxIndex()

// Derive a shared key (contended by every tx) and a per-tx key (unique to this tx)
sharedKey := []byte("shared")
txKey := []byte(fmt.Sprintf("tx-%d", txIndex))

// Similar steps as in the ante handler: get the store, increment both values, emit an event
store := ctx.KVStore(capKey)

// increment per-tx key (no conflict)
val := getIntFromStore(store, txKey)
setIntOnStore(store, txKey, val+1)

// increment shared key
sharedVal := getIntFromStore(store, sharedKey)
setIntOnStore(store, sharedKey, sharedVal+1)

// Emit an event with the incremented value and the unique ID
ctx.EventManager().EmitEvent(
sdk.NewEvent(sdk.EventTypeMessage,
sdk.NewAttribute("shared-val", fmt.Sprintf("%d", sharedVal+1)),
sdk.NewAttribute("tx-val", fmt.Sprintf("%d", val+1)),
sdk.NewAttribute("tx-id", fmt.Sprintf("%d", txIndex)),
),
)

res.Events = ctx.EventManager().Events().ToABCIEvents()
return res, nil
}
}

func requireAttribute(t *testing.T, evts []abci.Event, name string, val string) {
for _, evt := range evts {
for _, att := range evt.Attributes {
if string(att.Key) == name {
require.Equal(t, val, string(att.Value))
return
}
}
}
require.Fail(t, fmt.Sprintf("attribute %s with value %s not found", name, val))
}

func TestDeliverTxBatch(t *testing.T) {
// test increments in the ante
anteKey := []byte("ante-key")

anteOpt := func(bapp *BaseApp) {
bapp.SetAnteHandler(anteHandler(capKey1, anteKey))
}

// test increments in the handler
routerOpt := func(bapp *BaseApp) {
r := sdk.NewRoute(routeMsgCounter, handlerKVStore(capKey1))
bapp.Router().AddRoute(r)
}

app := setupBaseApp(t, anteOpt, routerOpt)
app.InitChain(context.Background(), &abci.RequestInitChain{})

// Create same codec used in txDecoder
codec := codec.NewLegacyAmino()
registerTestCodec(codec)

nBlocks := 3
txPerHeight := 5

for blockN := 0; blockN < nBlocks; blockN++ {
header := tmproto.Header{Height: int64(blockN) + 1}
app.setDeliverState(header)
app.deliverState.ctx = app.deliverState.ctx.WithBlockGasMeter(sdk.NewInfiniteGasMeter())
app.BeginBlock(app.deliverState.ctx, abci.RequestBeginBlock{Header: header})

var requests []*sdk.DeliverTxEntry
for i := 0; i < txPerHeight; i++ {
counter := int64(blockN*txPerHeight + i)
tx := newTxCounter(counter, counter)

txBytes, err := codec.Marshal(tx)
require.NoError(t, err)
requests = append(requests, &sdk.DeliverTxEntry{
Request: abci.RequestDeliverTx{Tx: txBytes},
})
}

responses := app.DeliverTxBatch(app.deliverState.ctx, sdk.DeliverTxBatchRequest{TxEntries: requests})
require.Len(t, responses.Results, txPerHeight)

for idx, deliverTxRes := range responses.Results {
res := deliverTxRes.Response
require.Equal(t, abci.CodeTypeOK, res.Code)
requireAttribute(t, res.Events, "tx-id", fmt.Sprintf("%d", idx))
requireAttribute(t, res.Events, "tx-val", fmt.Sprintf("%d", blockN+1))
requireAttribute(t, res.Events, "shared-val", fmt.Sprintf("%d", blockN*txPerHeight+idx+1))
}

app.EndBlock(app.deliverState.ctx, abci.RequestEndBlock{})
require.Empty(t, app.deliverState.ctx.MultiStore().GetEvents())
app.SetDeliverStateToCommit()
app.Commit(context.Background())
}
}
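The assertions follow from what the handler writes: each tx increments its own tx-<idx> key once per block, so after block b it reads b+1, while the shared key is incremented once per delivered tx, so the idx-th tx of block b observes b*txPerHeight+idx+1. A sketch of that arithmetic (helper names are illustrative, not part of the test):

```go
// expectedTxVal: the per-tx key is bumped once per block for a given tx index.
func expectedTxVal(blockN int) int { return blockN + 1 }

// expectedSharedVal: the shared key is bumped once per delivered tx, in order.
func expectedSharedVal(blockN, txPerHeight, idx int) int {
	return blockN*txPerHeight + idx + 1
}
```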
11 changes: 11 additions & 0 deletions baseapp/options.go
@@ -87,6 +87,10 @@ func SetSnapshotInterval(interval uint64) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotInterval(interval) }
}

// SetConcurrencyWorkers sets the number of workers to use for concurrent transaction execution.
func SetConcurrencyWorkers(workers int) func(*BaseApp) {
return func(app *BaseApp) { app.SetConcurrencyWorkers(workers) }
}

// SetSnapshotKeepRecent sets the recent snapshots to keep.
func SetSnapshotKeepRecent(keepRecent uint32) func(*BaseApp) {
return func(app *BaseApp) { app.SetSnapshotKeepRecent(keepRecent) }
@@ -295,6 +299,13 @@ func (app *BaseApp) SetSnapshotInterval(snapshotInterval uint64) {
app.snapshotInterval = snapshotInterval
}

// SetConcurrencyWorkers sets the number of concurrent workers; it panics if the app is sealed.
func (app *BaseApp) SetConcurrencyWorkers(workers int) {
if app.sealed {
panic("SetConcurrencyWorkers() on sealed BaseApp")
}
app.concurrencyWorkers = workers
}

// SetSnapshotKeepRecent sets the number of recent snapshots to keep.
func (app *BaseApp) SetSnapshotKeepRecent(snapshotKeepRecent uint32) {
if app.sealed {
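A contrived sketch of applying the new functional option; real code would pass it through NewBaseApp's variadic options, whose full argument list is elided here:

```go
package main

import (
	"fmt"

	"github.com/cosmos/cosmos-sdk/baseapp"
)

func main() {
	// Placeholder app value; real code constructs it with baseapp.NewBaseApp(...).
	var app baseapp.BaseApp
	baseapp.SetConcurrencyWorkers(20)(&app)
	fmt.Println(app.ConcurrencyWorkers()) // 20
}
```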
4 changes: 4 additions & 0 deletions proto/cosmos/accesscontrol/constants.proto
@@ -130,7 +130,11 @@ enum ResourceType {
KV_DEX_SHORT_ORDER_COUNT = 92; // child of KV_DEX

KV_BANK_DEFERRED = 93; // child of KV
reserved 94;
KV_BANK_DEFERRED_MODULE_TX_INDEX = 95; // child of KV_BANK_DEFERRED

KV_DEX_MEM_CONTRACTS_TO_PROCESS = 96; // child of KV_DEX_MEM
KV_DEX_MEM_DOWNSTREAM_CONTRACTS = 97; // child of KV_DEX_MEM
}

enum WasmMessageSubtype {
9 changes: 9 additions & 0 deletions server/config/config.go
@@ -21,6 +21,9 @@ const (

// DefaultGRPCWebAddress defines the default address to bind the gRPC-web server to.
DefaultGRPCWebAddress = "0.0.0.0:9091"

// DefaultConcurrencyWorkers defines the default number of workers to use for concurrent transaction execution
DefaultConcurrencyWorkers = 10
)

// BaseConfig defines the server's basic configuration
@@ -88,6 +91,10 @@ type BaseConfig struct {
SeparateOrphanVersionsToKeep int64 `mapstructure:"separate-orphan-versions-to-keep"`
NumOrphanPerFile int `mapstructure:"num-orphan-per-file"`
OrphanDirectory string `mapstructure:"orphan-dir"`

// ConcurrencyWorkers defines the number of workers to use for concurrent
// transaction execution. A value of -1 means unlimited workers. Default value is 10.
ConcurrencyWorkers int `mapstructure:"concurrency-workers"`
}

// APIConfig defines the API listener configuration.
@@ -236,6 +243,7 @@ func DefaultConfig() *Config {
IAVLDisableFastNode: true,
CompactionInterval: 0,
NoVersioning: false,
ConcurrencyWorkers: DefaultConcurrencyWorkers,
},
Telemetry: telemetry.Config{
Enabled: false,
@@ -310,6 +318,7 @@ func GetConfig(v *viper.Viper) (Config, error) {
SeparateOrphanVersionsToKeep: v.GetInt64("separate-orphan-versions-to-keep"),
NumOrphanPerFile: v.GetInt("num-orphan-per-file"),
OrphanDirectory: v.GetString("orphan-dir"),
ConcurrencyWorkers: v.GetInt("concurrency-workers"),
},
Telemetry: telemetry.Config{
ServiceName: v.GetString("telemetry.service-name"),
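With the plumbing above, the value flows from app.toml through GetConfig into BaseConfig. A hedged sketch of reading it back (the viper setup and file path are assumptions):

```go
v := viper.New()
v.SetConfigFile("app.toml")
if err := v.ReadInConfig(); err != nil {
	panic(err)
}

cfg, err := config.GetConfig(v)
if err != nil {
	panic(err)
}

// 10 by default; -1 means unlimited workers, per the BaseConfig comment.
workers := cfg.ConcurrencyWorkers
_ = workers
```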
5 changes: 5 additions & 0 deletions server/config/config_test.go
@@ -23,3 +23,8 @@ func TestSetSnapshotDirectory(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, "", cfg.StateSync.SnapshotDirectory)
}

func TestSetConcurrencyWorkers(t *testing.T) {
cfg := DefaultConfig()
require.Equal(t, DefaultConcurrencyWorkers, cfg.ConcurrencyWorkers)
}
3 changes: 3 additions & 0 deletions server/config/toml.go
@@ -101,6 +101,9 @@ num-orphan-per-file = {{ .BaseConfig.NumOrphanPerFile }}
# if separate-orphan-storage is true, where to store orphan data
orphan-dir = "{{ .BaseConfig.OrphanDirectory }}"

# concurrency-workers defines how many workers to run for concurrent transaction execution
# concurrency-workers = {{ .BaseConfig.ConcurrencyWorkers }}

###############################################################################
### Telemetry Configuration ###
###############################################################################
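To see what the new template fragment renders to, here is a self-contained sketch using text/template with stand-in types; the real config template in server/config/toml.go is much larger:

```go
package main

import (
	"os"
	"text/template"
)

const fragment = `# concurrency-workers defines how many workers to run for concurrent transaction execution
# concurrency-workers = {{ .BaseConfig.ConcurrencyWorkers }}
`

type baseConfig struct{ ConcurrencyWorkers int }
type cfg struct{ BaseConfig baseConfig }

func main() {
	t := template.Must(template.New("app").Parse(fragment))
	// With the default value this prints "# concurrency-workers = 10".
	_ = t.Execute(os.Stdout, cfg{BaseConfig: baseConfig{ConcurrencyWorkers: 10}})
}
```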