Skip to content

Commit

Permalink
[R4R] op-batcher optimism for mainnet upgrade (#179)
Browse files Browse the repository at this point in the history
* disperse eigenda with auth

* verifiy eigenda blob

* fix dockerfile error

* bugfix the blob gas fee cap  <= queued + 100% replacement penalty

* add rpc metrics
  • Loading branch information
adam-xu-mantle authored Aug 5, 2024
1 parent 970813c commit 4044c87
Show file tree
Hide file tree
Showing 105 changed files with 11,961 additions and 115 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/Layr-Labs/eigenda/api v0.6.1
github.com/btcsuite/btcd v0.23.3
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1
github.com/consensys/gnark-crypto v0.10.0
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0
github.com/docker/docker v20.10.24+incompatible
github.com/docker/go-connections v0.4.0
Expand Down Expand Up @@ -75,7 +76,6 @@ require (
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.10.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
Expand Down
5 changes: 3 additions & 2 deletions op-batcher/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=$BUILDPLATFORM golang:1.19.9-alpine3.16 as builder
FROM --platform=$BUILDPLATFORM golang:1.19.9-alpine3.16 AS builder

ARG VERSION=v0.0.0

Expand All @@ -13,7 +13,6 @@ COPY ./op-signer /app/op-signer
COPY ./datalayr /app/datalayr
COPY ./go.mod /app/go.mod
COPY ./go.sum /app/go.sum
COPY ./.git /app/.git
COPY ./bss-core /app/bss-core

COPY ./op-batcher/docker.go.work /go/go.work
Expand All @@ -31,5 +30,7 @@ RUN make op-batcher VERSION="$VERSION" GOOS=$TARGETOS GOARCH=$TARGETARCH
FROM alpine:3.16

COPY --from=builder /app/op-batcher/bin/op-batcher /usr/local/bin
COPY --from=builder /app/op-batcher/resources /resources


ENTRYPOINT ["op-batcher"]
5 changes: 4 additions & 1 deletion op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ type Config struct {
EigenDA eigenda.Config
// Upgrade Da from MantleDA to EigenDA
DaUpgradeChainConfig *upgrade.UpgradeChainConfig
//skip eigenda and submit to ethereum blob transaction
SkipEigenDaRpc bool
}

// Check ensures that the [Config] is valid.
Expand All @@ -93,7 +95,6 @@ func (c *Config) Check() error {
return ErrGraphPollingDurationZero
}
}

return nil
}

Expand Down Expand Up @@ -165,6 +166,7 @@ type CLIConfig struct {

EigenLogConfig logging.Config
EigenDAConfig eigenda.CLIConfig
SkipEigenDaRpc bool
}

func (c CLIConfig) Check() error {
Expand Down Expand Up @@ -211,6 +213,7 @@ func NewConfig(ctx *cli.Context) CLIConfig {
RollupMaxSize: ctx.GlobalUint64(flags.RollUpMaxSizeFlag.Name),
MantleDaNodes: ctx.GlobalInt(flags.MantleDaNodeFlag.Name),
Stopped: ctx.GlobalBool(flags.StoppedFlag.Name),
SkipEigenDaRpc: ctx.GlobalBool(flags.SkipEigenDaRpcFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
RPCConfig: rpc.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
Expand Down
30 changes: 23 additions & 7 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
GraphPollingDuration: cfg.GraphPollingDuration,
RollupMaxSize: cfg.RollupMaxSize,
MantleDaNodes: cfg.MantleDaNodes,
SkipEigenDaRpc: cfg.SkipEigenDaRpc,
Rollup: rcfg,
Channel: ChannelConfig{
SeqWindowSize: rcfg.SeqWindowSize,
Expand All @@ -111,6 +112,19 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri
RPC: cfg.EigenDAConfig.RPC,
StatusQueryTimeout: cfg.EigenDAConfig.StatusQueryTimeout,
StatusQueryRetryInterval: cfg.EigenDAConfig.StatusQueryRetryInterval,
RPCTimeout: cfg.EigenDAConfig.DARPCTimeout,
EnableHsm: cfg.EigenDAConfig.EnableHsm,
HsmCreden: cfg.EigenDAConfig.HsmCreden,
HsmPubkey: cfg.EigenDAConfig.HsmPubkey,
HsmAPIName: cfg.EigenDAConfig.HsmAPIName,
PrivateKey: cfg.EigenDAConfig.PrivateKey,
EthRPC: cfg.EigenDAConfig.EthRPC,
SvcManagerAddr: cfg.EigenDAConfig.SvcManagerAddr,
EthConfirmationDepth: cfg.EigenDAConfig.EthConfirmationDepth,
CacheDir: cfg.EigenDAConfig.CacheDir,
G1Path: cfg.EigenDAConfig.G1Path,
MaxBlobLength: cfg.EigenDAConfig.MaxBlobLength,
G2PowerOfTauPath: cfg.EigenDAConfig.G2PowerOfTauPath,
},
}

Expand Down Expand Up @@ -170,14 +184,16 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.

cfg.metr = m

eigenDA, err := eigenda.NewEigenDAClient(cfg.EigenDA, cfg.log, m)
if err != nil {
return nil, fmt.Errorf("error creating EigenDA client: %w", err)
}

return &BatchSubmitter{
Config: cfg,
txMgr: cfg.TxManager,
state: NewChannelManager(l, m, cfg.Channel, cfg.DaUpgradeChainConfig),
eigenDA: &eigenda.EigenDA{
Config: cfg.EigenDA,
Log: l,
},
Config: cfg,
txMgr: cfg.TxManager,
state: NewChannelManager(l, m, cfg.Channel, cfg.DaUpgradeChainConfig),
eigenDA: eigenDA,
}, nil

}
Expand Down
185 changes: 127 additions & 58 deletions op-batcher/batcher/driver_da.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package batcher
import (
"context"
"errors"
"fmt"
"io"
"math/big"
"sort"
Expand Down Expand Up @@ -35,6 +36,7 @@ const (
DaLoopRetryNum = 10
EigenRPCRetryNum = 3
BytesPerCoefficient = 31
MaxblobNum = 3 // max number of blobs, the bigger the more possible of timeout
)

var ErrInitDataStore = errors.New("init data store transaction failed")
Expand Down Expand Up @@ -211,12 +213,9 @@ func (l *BatchSubmitter) loopEigenDa() (bool, error) {
}

var err error
var candidate *txmgr.TxCandidate
var receipt *types.Receipt
var data, wrappedData []byte
eigendaSuccess := false
var wrappedData []byte

data, err = l.txAggregatorForEigenDa()
daData, err := l.txAggregatorForEigenDa()
if err != nil {
l.log.Error("loopEigenDa txAggregatorForEigenDa err", "err", err)
return false, err
Expand All @@ -228,60 +227,76 @@ func (l *BatchSubmitter) loopEigenDa() (bool, error) {
return false, err
}

for loopRetry := 0; loopRetry < DaLoopRetryNum; loopRetry++ {
err = func() error {
l.metr.RecordRollupRetry(int32(loopRetry))
if !eigendaSuccess {
//try 3 times
for retry := 0; retry < EigenRPCRetryNum; retry++ {
l.metr.RecordDaRetry(int32(retry))
wrappedData, err = l.disperseEigenDaData(data)
if err != nil {
l.log.Warn("loopEigenDa disperseEigenDaData err,need to try again", "retry time", retry, "err", err)
time.Sleep(5 * time.Second)
continue
}

eigendaSuccess = true
break
}
l.log.Info("disperseEigenDaData", "skip da rpc", l.Config.SkipEigenDaRpc)

eigendaSuccess := false
if !l.Config.SkipEigenDaRpc {
timeoutTime := time.Now().Add(l.EigenDA.StatusQueryTimeout)
for retry := 0; retry < EigenRPCRetryNum; retry++ {
l.metr.RecordDaRetry(int32(retry))
wrappedData, err = l.disperseEigenDaData(daData)
if err == nil && len(wrappedData) > 0 {
eigendaSuccess = true
break
}

if eigendaSuccess {
candidate = l.calldataTxCandidate(wrappedData)
} else {
if candidate, err = l.blobTxCandidate(data); err != nil {
l.log.Warn("failed to create blob tx candidate", "err", err)
return err
}
l.metr.RecordEigenDAFailback()
if time.Now().After(timeoutTime) {
l.log.Warn("loopEigenDa disperseEigenDaData timeout", "retry time", retry, "err", err)
break
}

receipt, err = l.txMgr.Send(l.killCtx, *candidate)
if err != nil {
l.log.Warn("loopEigenDa disperseEigenDaData err,need to try again", "retry time", retry, "err", err)
time.Sleep(5 * time.Second)
}
}

var candidates []*txmgr.TxCandidate
if eigendaSuccess {
candidate := l.calldataTxCandidate(wrappedData)
candidates = append(candidates, candidate)
} else {
if blobCandidates, err := l.blobTxCandidates(daData); err != nil {
l.log.Warn("failed to create blob tx candidate", "err", err)
return false, err
} else {
candidates = append(candidates, blobCandidates...)
l.metr.RecordEigenDAFailback(len(blobCandidates))
}
}

var lastReceipt *types.Receipt
var successTxs []ecommon.Hash
for loopRetry := 0; loopRetry < DaLoopRetryNum; loopRetry++ {
l.metr.RecordRollupRetry(int32(loopRetry))
failedIdx := 0
for idx, tx := range candidates {
lastReceipt, err = l.txMgr.Send(l.killCtx, *tx)
if err != nil || lastReceipt.Status == types.ReceiptStatusFailed {
l.log.Warn("failed to send tx candidate", "err", err)
return err
break
}
successTxs = append(successTxs, lastReceipt.TxHash)
failedIdx = idx + 1
}
candidates = candidates[failedIdx:]

return nil
}()
if err != nil {
if len(candidates) > 0 {
l.log.Warn("failed to rollup", "err", err, "retry time", loopRetry)
} else {
l.log.Info("rollup success", "success txs", successTxs)
break
}
}

if err != nil {
if len(candidates) > 0 {
err = fmt.Errorf("failed to rollup %d tx candidates", len(candidates))
l.log.Error("failed to rollup", "err", err)
l.metr.RecordBatchTxConfirmDataFailed()
return false, err
}

err = l.handleConfirmDataStoreReceipt(receipt)
if err != nil {
l.log.Error("failed to handleConfirmDataStoreReceipt", "err", err)
return false, err
}
l.metr.RecordBatchTxConfirmDataSuccess()
l.recordConfirmedEigenDATx(lastReceipt)

//create a new channel now for reducing the disperseEigenDaData latency time
if err = l.state.ensurePendingChannel(currentL1.ID()); err != nil {
Expand Down Expand Up @@ -309,26 +324,74 @@ func minInt(a, b int) int {
return b
}

func (l *BatchSubmitter) blobTxCandidate(data []byte) (*txmgr.TxCandidate, error) {
func (l *BatchSubmitter) blobTxCandidates(data [][]byte) ([]*txmgr.TxCandidate, error) {
l.log.Info("building Blob transaction candidate", "size", len(data))
blobs := []*se.Blob{}
for idx := 0; idx < len(data); idx += se.MaxBlobDataSize {
blobData := data[idx : idx+minInt(len(data)-idx, se.MaxBlobDataSize)]
var blob se.Blob
if err := blob.FromData(blobData); err != nil {
candidates := []*txmgr.TxCandidate{}
dataInTx := [][]byte{}
encodeData := []byte{}

for _, frameData := range data {
dataInTx = append(dataInTx, frameData)
nextEncodeData, err := rlp.EncodeToBytes(dataInTx)
if err != nil {
l.log.Error("op-batcher unable to encode txn", "err", err)
return nil, err
}
blobs = append(blobs, &blob)

if len(nextEncodeData) > se.MaxBlobDataSize*MaxblobNum {
blobs := []*se.Blob{}
for idx := 0; idx < len(encodeData); idx += se.MaxBlobDataSize {
blobData := encodeData[idx : idx+minInt(len(encodeData)-idx, se.MaxBlobDataSize)]
var blob se.Blob
if err := blob.FromData(blobData); err != nil {
return nil, err
}
blobs = append(blobs, &blob)
}
candidates = append(candidates, &txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
Blobs: blobs,
})
dataInTx = [][]byte{frameData}
encodeData, err = rlp.EncodeToBytes(dataInTx)
if err != nil {
l.log.Error("op-batcher unable to encode txn", "err", err)
return nil, err
}
} else {
dataInTx = append(dataInTx, frameData)
encodeData = nextEncodeData
}

}

return &txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
Blobs: blobs,
}, nil
if len(dataInTx) > 0 {
blobs := []*se.Blob{}
for idx := 0; idx < len(encodeData); idx += se.MaxBlobDataSize {
blobData := encodeData[idx : idx+minInt(len(encodeData)-idx, se.MaxBlobDataSize)]
var blob se.Blob
if err := blob.FromData(blobData); err != nil {
return nil, err
}
blobs = append(blobs, &blob)
}
candidates = append(candidates, &txmgr.TxCandidate{
To: &l.Rollup.BatchInboxAddress,
Blobs: blobs,
})
}

return candidates, nil
}

func (l *BatchSubmitter) disperseEigenDaData(data []byte) ([]byte, error) {
blobInfo, requestId, err := l.eigenDA.DisperseBlob(l.shutdownCtx, data)
func (l *BatchSubmitter) disperseEigenDaData(data [][]byte) ([]byte, error) {
encodeData, err := rlp.EncodeToBytes(data)
if err != nil {
l.log.Error("op-batcher unable to encode txn", "err", err)
return nil, err
}

blobInfo, requestId, err := l.eigenDA.DisperseBlob(l.shutdownCtx, encodeData)
if err != nil {
l.log.Error("Unable to publish batch frameset to EigenDA", "err", err)
return nil, err
Expand All @@ -346,7 +409,7 @@ func (l *BatchSubmitter) disperseEigenDaData(data []byte) ([]byte, error) {
BlobIndex: blobInfo.BlobVerificationProof.BlobIndex,
ReferenceBlockNumber: blobInfo.BlobVerificationProof.BatchMetadata.BatchHeader.ReferenceBlockNumber,
QuorumIds: quorumIDs,
BlobLength: uint32(len(data)),
BlobLength: uint32(len(encodeData)),
RequestId: requestId,
},
},
Expand Down Expand Up @@ -422,7 +485,7 @@ func (l *BatchSubmitter) txAggregator() ([]byte, error) {
return transactionByte, nil
}

func (l *BatchSubmitter) txAggregatorForEigenDa() ([]byte, error) {
func (l *BatchSubmitter) txAggregatorForEigenDa() ([][]byte, error) {
var txsData [][]byte
var transactionByte []byte
sortTxIds := make([]txID, 0, len(l.state.daPendingTxData))
Expand Down Expand Up @@ -450,7 +513,13 @@ func (l *BatchSubmitter) txAggregatorForEigenDa() ([]byte, error) {
l.state.daUnConfirmedTxID = append(l.state.daUnConfirmedTxID, v)
l.log.Info("added frame to daUnConfirmedTxID", "id", v.String())
}
return transactionByte, nil

if len(txsData) == 0 {
l.log.Error("txsData is empty")
return nil, fmt.Errorf("txsData is empty")
}

return txsData, nil
}

func (l *BatchSubmitter) disperseStoreData(txsData []byte) error {
Expand Down
Loading

0 comments on commit 4044c87

Please sign in to comment.