Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize the mempool 2 #136

Open
wants to merge 17 commits into
base: dev
Choose a base branch
from
6 changes: 3 additions & 3 deletions cmd/rpcdaemon/commands/zkevm_counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (zkapi *ZkEvmAPIImpl) EstimateCounters(ctx context.Context, rpcTx *zkevmRPC

smtDepth := smt.GetDepth()

txCounters := vm.NewTransactionCounter(tx, int(smtDepth), uint16(forkId), zkapi.config.Zk.VirtualCountersSmtReduction, false)
txCounters := vm.NewTransactionCounter(tx, int(smtDepth), uint16(forkId), zkapi.config.Zk.VirtualCountersSmtReduction, false, false)
batchCounters := vm.NewBatchCounterCollector(int(smtDepth), uint16(forkId), zkapi.config.Zk.VirtualCountersSmtReduction, false, nil)

_, err = batchCounters.AddNewTransactionCounters(txCounters)
Expand Down Expand Up @@ -369,7 +369,7 @@ func (api *ZkEvmAPIImpl) TraceTransactionCounters(ctx context.Context, hash comm
return err
}

txCounters := vm.NewTransactionCounter(txn, int(smtDepth), uint16(forkId), api.config.Zk.VirtualCountersSmtReduction, false)
txCounters := vm.NewTransactionCounter(txn, int(smtDepth), uint16(forkId), api.config.Zk.VirtualCountersSmtReduction, false, false)
batchCounters := vm.NewBatchCounterCollector(int(smtDepth), uint16(forkId), api.config.Zk.VirtualCountersSmtReduction, false, nil)

if _, err = batchCounters.AddNewTransactionCounters(txCounters); err != nil {
Expand Down Expand Up @@ -551,7 +551,7 @@ func (api *ZkEvmAPIImpl) execTransaction(
msg core.Message
execResult *core.ExecutionResult
)
txCounters := vm.NewTransactionCounter(tx, smtDepth, forkId, api.config.Zk.VirtualCountersSmtReduction, false)
txCounters := vm.NewTransactionCounter(tx, smtDepth, forkId, api.config.Zk.VirtualCountersSmtReduction, false, false)

if _, err = batchCounters.AddNewTransactionCounters(txCounters); err != nil {
return 0, err
Expand Down
29 changes: 20 additions & 9 deletions cmd/txpool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,17 @@ var (
commitEvery time.Duration

// For X Layer
enableWhiteList bool
whiteList []string
blockList []string
freeClaimGasAddrs []string
gasPriceMultiple uint64
enableFreeGasByNonce bool
freeGasExAddrs []string
freeGasCountPerAddr uint64
freeGasLimit uint64
enableWhiteList bool
whiteList []string
blockList []string
freeClaimGasAddrs []string
gasPriceMultiple uint64
enableFreeGasByNonce bool
freeGasExAddrs []string
freeGasCountPerAddr uint64
freeGasLimit uint64
okPayAccountList []string
okPayGasLimitPerBlock uint64
)

func init() {
Expand Down Expand Up @@ -99,6 +101,8 @@ func init() {
rootCmd.Flags().StringSliceVar(&freeGasExAddrs, utils.TxPoolFreeGasExAddrs.Name, ethconfig.DeprecatedDefaultTxPoolConfig.FreeGasExAddrs, utils.TxPoolFreeGasExAddrs.Usage)
rootCmd.PersistentFlags().Uint64Var(&freeGasCountPerAddr, utils.TxPoolFreeGasCountPerAddr.Name, ethconfig.DeprecatedDefaultTxPoolConfig.FreeGasCountPerAddr, utils.TxPoolFreeGasCountPerAddr.Usage)
rootCmd.PersistentFlags().Uint64Var(&freeGasLimit, utils.TxPoolFreeGasLimit.Name, ethconfig.DeprecatedDefaultTxPoolConfig.FreeGasLimit, utils.TxPoolFreeGasLimit.Usage)
rootCmd.Flags().Uint64Var(&okPayGasLimitPerBlock, utils.TxPoolOkPayGasLimitPerBlock.Name, 0, utils.TxPoolOkPayGasLimitPerBlock.Usage)
rootCmd.Flags().StringSliceVar(&okPayAccountList, utils.TxPoolOkPayAccountList.Name, []string{}, utils.TxPoolOkPayAccountList.Usage)
}

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -204,6 +208,13 @@ func doTxpool(ctx context.Context) error {
ethCfg.DeprecatedTxPool.FreeGasCountPerAddr = freeGasCountPerAddr
ethCfg.DeprecatedTxPool.FreeGasLimit = freeGasLimit

ethCfg.DeprecatedTxPool.OkPayAccountList = make([]string, len(okPayAccountList))
for i, addrHex := range okPayAccountList {
addr := common.HexToAddress(addrHex)
ethCfg.DeprecatedTxPool.OkPayAccountList[i] = addr.String()
}
ethCfg.DeprecatedTxPool.OkPayGasLimitPerBlock = okPayGasLimitPerBlock

newTxs := make(chan types.Announcements, 1024)
defer close(newTxs)
txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpooluitl.AllComponents(ctx, cfg, ethCfg,
Expand Down
22 changes: 22 additions & 0 deletions cmd/utils/flags_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ var (
Name: "txpool.freegaslimit",
Usage: "FreeGasLimit is the max gas allowed use to do a free gas tx",
}
TxPoolOkPayAccountList = cli.StringFlag{
Name: "txpool.okpay-account-list",
Usage: "Comma separated list of addresses, who send ok pay tx",
Value: "",
}
TxPoolOkPayGasLimitPerBlock = cli.StringFlag{
Name: "txpool.okpay-gaslimit-per-block",
Usage: "the block max gas limit for ok pay tx",
Value: "",
}
// Gas Pricer
GpoTypeFlag = cli.StringFlag{
Name: "gpo.type",
Expand Down Expand Up @@ -308,6 +318,18 @@ func setTxPoolXLayer(ctx *cli.Context, cfg *ethconfig.DeprecatedTxPoolConfig) {
if ctx.IsSet(TxPoolFreeGasLimit.Name) {
cfg.FreeGasLimit = ctx.Uint64(TxPoolFreeGasLimit.Name)
}
if ctx.IsSet(TxPoolOkPayAccountList.Name) {
// Parse the command separated flag
addrHexes := SplitAndTrim(ctx.String(TxPoolOkPayAccountList.Name))
cfg.OkPayAccountList = make([]string, len(addrHexes))
for i, senderHex := range addrHexes {
sender := libcommon.HexToAddress(senderHex)
cfg.OkPayAccountList[i] = sender.String()
}
}
if ctx.IsSet(TxPoolOkPayGasLimitPerBlock.Name) {
cfg.OkPayGasLimitPerBlock = ctx.Uint64(TxPoolOkPayGasLimitPerBlock.Name)
}
}

// SetApolloGPOXLayer is a public wrapper function to internally call setGPO
Expand Down
51 changes: 51 additions & 0 deletions core/vm/zk_batch_counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ type BatchCounterCollector struct {
rlpCombinedCountersCache Counters
executionCombinedCountersCache Counters
processingCombinedCountersCache Counters

// For X Layer
okPayRlpCombinedCounters Counters
okPayExecutionCombinedCounters Counters
okPayProcessingCombinedCounters Counters
}

func NewBatchCounterCollector(smtMaxLevel int, forkId uint16, mcpReduction float64, unlimitedCounters bool, addonCounters *Counters) *BatchCounterCollector {
Expand All @@ -45,6 +50,10 @@ func NewBatchCounterCollector(smtMaxLevel int, forkId uint16, mcpReduction float
bcc.executionCombinedCounters = bcc.NewCounters()
bcc.processingCombinedCounters = bcc.NewCounters()

bcc.okPayRlpCombinedCounters = bcc.NewCounters()
bcc.okPayExecutionCombinedCounters = bcc.NewCounters()
bcc.okPayProcessingCombinedCounters = bcc.NewCounters()

return &bcc
}

Expand Down Expand Up @@ -73,6 +82,10 @@ func (bcc *BatchCounterCollector) Clone() *BatchCounterCollector {
rlpCombinedCounters: bcc.rlpCombinedCounters.Clone(),
executionCombinedCounters: bcc.executionCombinedCounters.Clone(),
processingCombinedCounters: bcc.processingCombinedCounters.Clone(),

okPayRlpCombinedCounters: bcc.okPayRlpCombinedCounters.Clone(),
okPayExecutionCombinedCounters: bcc.okPayExecutionCombinedCounters.Clone(),
okPayProcessingCombinedCounters: bcc.okPayProcessingCombinedCounters.Clone(),
}
}

Expand Down Expand Up @@ -164,6 +177,35 @@ func (bcc *BatchCounterCollector) CheckForOverflow(verifyMerkleProof bool) (bool
return overflow, nil
}

// CheckOkPayForOverflow returns true in the case that any counter has less than 0 remaining
ylsGit marked this conversation as resolved.
Show resolved Hide resolved
func (bcc *BatchCounterCollector) CheckOkPayForOverflow() (bool, error) {
combined := bcc.NewCounters()
for k, _ := range combined {
val := bcc.rlpCombinedCounters[k].used + bcc.executionCombinedCounters[k].used + bcc.processingCombinedCounters[k].used
combined[k].used += val
combined[k].remaining -= val
}

overflow := false
for _, v := range combined {
if v.initialAmount*30/100 < v.used {
log.Info("[VCOUNTER] OkPay Counter overflow detected", "counter", v.name, "remaining", v.remaining, "used", v.used)
overflow = true
}
}

// if we have an overflow we want to log the counters for debugging purposes
if overflow {
logText := "[VCOUNTER] Counters stats"
for _, v := range combined {
logText += fmt.Sprintf(" %s: initial: %v used: %v (remaining: %v)", v.name, v.initialAmount, v.used, v.remaining)
}
log.Info(logText)
}

return overflow, nil
}

func (bcc *BatchCounterCollector) NewCounters() Counters {
var combined Counters
if bcc.unlimitedCounters {
Expand Down Expand Up @@ -274,15 +316,24 @@ func (bcc *BatchCounterCollector) CombineCollectorsNoChanges() Counters {
func (bcc *BatchCounterCollector) UpdateRlpCountersCache(txCounters *TransactionCounter) {
for k, v := range txCounters.rlpCounters.counters {
bcc.rlpCombinedCounters[k].used += v.used
if txCounters.isOkPayTx {
bcc.okPayRlpCombinedCounters[k].used += v.used
}
}
}

func (bcc *BatchCounterCollector) UpdateExecutionAndProcessingCountersCache(txCounters *TransactionCounter) {
for k, v := range txCounters.executionCounters.counters {
bcc.executionCombinedCounters[k].used += v.used
if txCounters.isOkPayTx {
bcc.okPayExecutionCombinedCounters[k].used += v.used
}
}

for k, v := range txCounters.processingCounters.counters {
bcc.processingCombinedCounters[k].used += v.used
if txCounters.isOkPayTx {
bcc.okPayProcessingCombinedCounters[k].used += v.used
}
}
}
6 changes: 5 additions & 1 deletion core/vm/zk_transaction_counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ type TransactionCounter struct {
smtLevels int
forkId uint16
l2DataCache []byte

isOkPayTx bool // For X Layer
}

func NewTransactionCounter(transaction types.Transaction, smtMaxLevel int, forkId uint16, mcpReduction float64, shouldCountersBeUnlimited bool) *TransactionCounter {
func NewTransactionCounter(transaction types.Transaction, smtMaxLevel int, forkId uint16, mcpReduction float64, shouldCountersBeUnlimited bool, isOkPayTx bool) *TransactionCounter {
totalLevel := calculateSmtLevels(smtMaxLevel, 32, mcpReduction)

var tc *TransactionCounter
Expand All @@ -33,6 +35,7 @@ func NewTransactionCounter(transaction types.Transaction, smtMaxLevel int, forkI
processingCounters: NewUnlimitedCounterCollector(),
smtLevels: 1, // max depth of the tree anyways
forkId: forkId,
isOkPayTx: isOkPayTx,
}
} else {
tc = &TransactionCounter{
Expand All @@ -42,6 +45,7 @@ func NewTransactionCounter(transaction types.Transaction, smtMaxLevel int, forkI
processingCounters: NewCounterCollector(totalLevel, forkId),
smtLevels: totalLevel,
forkId: forkId,
isOkPayTx: isOkPayTx,
}
}
tc.executionCounters.SetTransaction(transaction)
Expand Down
4 changes: 4 additions & 0 deletions eth/ethconfig/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ type DeprecatedTxPoolConfig struct {
FreeGasCountPerAddr uint64
// FreeGasLimit is the max gas allowed use to do a free gas tx
FreeGasLimit uint64
// okPayAccountList is the ok pay bundler accounts address
OkPayAccountList []string
// OkPayGasLimitPerBlock is the block max gas limit for ok pay tx
OkPayGasLimitPerBlock uint64
}

// DeprecatedDefaultTxPoolConfig contains the default configurations for the transaction
Expand Down
2 changes: 1 addition & 1 deletion eth/stagedsync/stage_mining_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func getNextTransactions(
counter := 0
for !onTime && counter < 1000 {
remainingGas := header.GasLimit - header.GasUsed
if onTime, count, err = cfg.txPool2.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, alreadyYielded); err != nil {
if onTime, count, err = cfg.txPool2.YieldBest(amount, &txSlots, poolTx, executionAt, remainingGas, alreadyYielded, false); err != nil {
return err
}
time.Sleep(1 * time.Millisecond)
Expand Down
2 changes: 2 additions & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,6 @@ var DefaultFlags = []cli.Flag{
&utils.TxPoolFreeGasCountPerAddr,
&utils.TxPoolFreeGasExAddrs,
&utils.TxPoolFreeGasLimit,
&utils.TxPoolOkPayAccountList,
&utils.TxPoolOkPayGasLimitPerBlock,
}
2 changes: 1 addition & 1 deletion turbo/transactions/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func NewReusableCaller(
)

batchCounters := vm.NewBatchCounterCollector(smtDepth, uint16(forkId), VirtualCountersSmtReduction, false, nil)
txCounters := vm.NewTransactionCounter(transaction, smtDepth, uint16(forkId), VirtualCountersSmtReduction, false)
txCounters := vm.NewTransactionCounter(transaction, smtDepth, uint16(forkId), VirtualCountersSmtReduction, false, false)

_, err = batchCounters.AddNewTransactionCounters(txCounters)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions zk/stages/stage_sequence_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func SpawnSequencingStage(
log.Info(fmt.Sprintf("[%s] Waiting for txs from the pool...", logPrefix))
}

okPayPriority := true // For X Layer

LOOP_TRANSACTIONS:
for {
select {
Expand All @@ -217,7 +219,7 @@ func SpawnSequencingStage(
return err
}
} else if !batchState.isL1Recovery() {
batchState.blockState.transactionsForInclusion, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions)
batchState.blockState.transactionsForInclusion, err = getNextPoolTransactions(ctx, cfg, executionAt, batchState.forkId, batchState.yieldedTransactions, okPayPriority)
if err != nil {
return err
}
Expand All @@ -235,7 +237,7 @@ func SpawnSequencingStage(

// The copying of this structure is intentional
backupDataSizeChecker := *blockDataSizeChecker
receipt, execResult, anyOverflow, err := attemptAddTransaction(cfg, sdb, ibs, batchCounters, &blockContext, header, transaction, effectiveGas, batchState.isL1Recovery(), batchState.forkId, l1InfoIndex, &backupDataSizeChecker)
receipt, execResult, anyOverflow, okPayOverflow, err := attemptAddTransaction(cfg, sdb, ibs, batchCounters, &blockContext, header, transaction, effectiveGas, batchState.isL1Recovery(), batchState.forkId, l1InfoIndex, &backupDataSizeChecker, okPayPriority)
if err != nil {
if batchState.isLimboRecovery() {
panic("limbo transaction has already been executed once so they must not fail while re-executing")
Expand Down Expand Up @@ -284,6 +286,11 @@ func SpawnSequencingStage(

}

if okPayOverflow {
okPayPriority = false
continue
}

if err == nil {
blockDataSizeChecker = &backupDataSizeChecker
batchState.onAddedTransaction(transaction, receipt, execResult, effectiveGas)
Expand Down
2 changes: 1 addition & 1 deletion zk/stages/stage_sequence_execute_injected_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func handleInjectedBatch(

// process the tx and we can ignore the counters as an overflow at this stage means no network anyway
effectiveGas := DeriveEffectiveGasPrice(*batchContext.cfg, decodedBlocks[0].Transactions[0])
receipt, execResult, _, err := attemptAddTransaction(*batchContext.cfg, batchContext.sdb, ibs, batchCounters, blockContext, header, decodedBlocks[0].Transactions[0], effectiveGas, false, forkId, 0 /* use 0 for l1InfoIndex in injected batch */, nil)
receipt, execResult, _, _, err := attemptAddTransaction(*batchContext.cfg, batchContext.sdb, ibs, batchCounters, blockContext, header, decodedBlocks[0].Transactions[0], effectiveGas, false, forkId, 0 /* use 0 for l1InfoIndex in injected batch */, nil, false)
if err != nil {
return nil, nil, nil, 0, err
}
Expand Down
Loading
Loading