Skip to content

Commit

Permalink
Merge branch 'master' into add-ci-submodule-pin-check
Browse files Browse the repository at this point in the history
  • Loading branch information
Tristan-Wilson authored Jul 11, 2024
2 parents a464232 + 6df9580 commit 432eea1
Show file tree
Hide file tree
Showing 18 changed files with 559 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/merge-checks.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Merge Checks

on:
pull_request:
pull_request_target:
branches: [ master ]
types: [synchronize, opened, reopened, labeled, unlabeled]

Expand Down
48 changes: 37 additions & 11 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ type BatchPosterConfig struct {
UseAccessLists bool `koanf:"use-access-lists" reload:"hot"`
GasEstimateBaseFeeMultipleBips arbmath.Bips `koanf:"gas-estimate-base-fee-multiple-bips"`
Dangerous BatchPosterDangerousConfig `koanf:"dangerous"`
ReorgResistanceMargin time.Duration `koanf:"reorg-resistance-margin" reload:"hot"`

gasRefunder common.Address
l1BlockBound l1BlockBound
Expand Down Expand Up @@ -219,6 +220,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Duration(prefix+".l1-block-bound-bypass", DefaultBatchPosterConfig.L1BlockBoundBypass, "post batches even if not within the layer 1 future bounds if we're within this margin of the max delay")
f.Bool(prefix+".use-access-lists", DefaultBatchPosterConfig.UseAccessLists, "post batches with access lists to reduce gas usage (disabled for L3s)")
f.Uint64(prefix+".gas-estimate-base-fee-multiple-bips", uint64(DefaultBatchPosterConfig.GasEstimateBaseFeeMultipleBips), "for gas estimation, use this multiple of the basefee (measured in basis points) as the max fee per gas")
f.Duration(prefix+".reorg-resistance-margin", DefaultBatchPosterConfig.ReorgResistanceMargin, "do not post batch if its within this duration from layer 1 minimum bounds. Requires l1-block-bound option not be set to \"ignore\"")
redislock.AddConfigOptions(prefix+".redis-lock", f)
dataposter.DataPosterConfigAddOptions(prefix+".data-poster", f, dataposter.DefaultDataPosterConfig)
genericconf.WalletConfigAddOptions(prefix+".parent-chain-wallet", f, DefaultBatchPosterConfig.ParentChainWallet.Pathname)
Expand Down Expand Up @@ -248,6 +250,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
UseAccessLists: true,
RedisLock: redislock.DefaultCfg,
GasEstimateBaseFeeMultipleBips: arbmath.OneInBips * 3 / 2,
ReorgResistanceMargin: 10 * time.Minute,
}

var DefaultBatchPosterL1WalletConfig = genericconf.WalletConfig{
Expand Down Expand Up @@ -1136,6 +1139,8 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
var l1BoundMaxTimestamp uint64 = math.MaxUint64
var l1BoundMinBlockNumber uint64
var l1BoundMinTimestamp uint64
var l1BoundMinBlockNumberWithBypass uint64
var l1BoundMinTimestampWithBypass uint64
hasL1Bound := config.l1BlockBound != l1BlockBoundIgnore
if hasL1Bound {
var l1Bound *types.Header
Expand Down Expand Up @@ -1180,17 +1185,19 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
l1BoundMaxBlockNumber = arbmath.SaturatingUAdd(l1BoundBlockNumber, arbmath.BigToUintSaturating(maxTimeVariationFutureBlocks))
l1BoundMaxTimestamp = arbmath.SaturatingUAdd(l1Bound.Time, arbmath.BigToUintSaturating(maxTimeVariationFutureSeconds))

latestHeader, err := b.l1Reader.LastHeader(ctx)
if err != nil {
return false, err
}
latestBlockNumber := arbutil.ParentHeaderToL1BlockNumber(latestHeader)
l1BoundMinBlockNumber = arbmath.SaturatingUSub(latestBlockNumber, arbmath.BigToUintSaturating(maxTimeVariationDelayBlocks))
l1BoundMinTimestamp = arbmath.SaturatingUSub(latestHeader.Time, arbmath.BigToUintSaturating(maxTimeVariationDelaySeconds))

if config.L1BlockBoundBypass > 0 {
latestHeader, err := b.l1Reader.LastHeader(ctx)
if err != nil {
return false, err
}
latestBlockNumber := arbutil.ParentHeaderToL1BlockNumber(latestHeader)
blockNumberWithPadding := arbmath.SaturatingUAdd(latestBlockNumber, uint64(config.L1BlockBoundBypass/ethPosBlockTime))
timestampWithPadding := arbmath.SaturatingUAdd(latestHeader.Time, uint64(config.L1BlockBoundBypass/time.Second))

l1BoundMinBlockNumber = arbmath.SaturatingUSub(blockNumberWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelayBlocks))
l1BoundMinTimestamp = arbmath.SaturatingUSub(timestampWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelaySeconds))
l1BoundMinBlockNumberWithBypass = arbmath.SaturatingUSub(blockNumberWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelayBlocks))
l1BoundMinTimestampWithBypass = arbmath.SaturatingUSub(timestampWithPadding, arbmath.BigToUintSaturating(maxTimeVariationDelaySeconds))
}
}

Expand All @@ -1200,13 +1207,14 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
log.Error("error getting message from streamer", "error", err)
break
}
if msg.Message.Header.BlockNumber < l1BoundMinBlockNumber || msg.Message.Header.Timestamp < l1BoundMinTimestamp {
if msg.Message.Header.BlockNumber < l1BoundMinBlockNumberWithBypass || msg.Message.Header.Timestamp < l1BoundMinTimestampWithBypass {
log.Error(
"disabling L1 bound as batch posting message is close to the maximum delay",
"blockNumber", msg.Message.Header.BlockNumber,
"l1BoundMinBlockNumber", l1BoundMinBlockNumber,
"l1BoundMinBlockNumberWithBypass", l1BoundMinBlockNumberWithBypass,
"timestamp", msg.Message.Header.Timestamp,
"l1BoundMinTimestamp", l1BoundMinTimestamp,
"l1BoundMinTimestampWithBypass", l1BoundMinTimestampWithBypass,
"l1BlockBoundBypass", config.L1BlockBoundBypass,
)
l1BoundMaxBlockNumber = math.MaxUint64
l1BoundMaxTimestamp = math.MaxUint64
Expand Down Expand Up @@ -1242,6 +1250,24 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
b.building.msgCount++
}

if hasL1Bound && config.ReorgResistanceMargin > 0 {
firstMsgBlockNumber := firstMsg.Message.Header.BlockNumber
firstMsgTimeStamp := firstMsg.Message.Header.Timestamp
batchNearL1BoundMinBlockNumber := firstMsgBlockNumber <= arbmath.SaturatingUAdd(l1BoundMinBlockNumber, uint64(config.ReorgResistanceMargin/ethPosBlockTime))
batchNearL1BoundMinTimestamp := firstMsgTimeStamp <= arbmath.SaturatingUAdd(l1BoundMinTimestamp, uint64(config.ReorgResistanceMargin/time.Second))
if batchNearL1BoundMinTimestamp || batchNearL1BoundMinBlockNumber {
log.Error(
"Disabling batch posting due to batch being within reorg resistance margin from layer 1 minimum block or timestamp bounds",
"reorgResistanceMargin", config.ReorgResistanceMargin,
"firstMsgTimeStamp", firstMsgTimeStamp,
"l1BoundMinTimestamp", l1BoundMinTimestamp,
"firstMsgBlockNumber", firstMsgBlockNumber,
"l1BoundMinBlockNumber", l1BoundMinBlockNumber,
)
return false, errors.New("batch is within reorg resistance margin from layer 1 minimum block or timestamp bounds")
}
}

if !forcePostBatch || !b.building.haveUsefulMessage {
// the batch isn't full yet and we've posted a batch recently
// don't post anything for now
Expand Down
2 changes: 1 addition & 1 deletion cmd/nitro/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func isLeveldbNotExistError(err error) bool {

func openInitializeChainDb(ctx context.Context, stack *node.Node, config *NodeConfig, chainId *big.Int, cacheConfig *core.CacheConfig, persistentConfig *conf.PersistentConfig, l1Client arbutil.L1Interface, rollupAddrs chaininfo.RollupAddresses) (ethdb.Database, *core.BlockChain, error) {
if !config.Init.Force {
if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, "", "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil {
if readOnlyDb, err := stack.OpenDatabaseWithFreezerWithExtraOptions("l2chaindata", 0, 0, config.Persistent.Ancient, "l2chaindata/", true, persistentConfig.Pebble.ExtraOptions("l2chaindata")); err == nil {
if chainConfig := gethexec.TryReadStoredChainConfig(readOnlyDb); chainConfig != nil {
readOnlyDb.Close()
if !arbmath.BigEquals(chainConfig.ChainID, chainId) {
Expand Down
5 changes: 0 additions & 5 deletions cmd/nitro/nitro.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,8 +679,6 @@ func mainImpl() int {
exitCode = 1
}
if nodeConfig.Init.ThenQuit {
close(sigint)

return exitCode
}
}
Expand All @@ -694,9 +692,6 @@ func mainImpl() int {
log.Info("shutting down because of sigint")
}

// cause future ctrl+c's to panic
close(sigint)

return exitCode
}

Expand Down
3 changes: 3 additions & 0 deletions das/das.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type DataAvailabilityConfig struct {
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`

MigrateLocalDBToFileStorage bool `koanf:"migrate-local-db-to-file-storage"`

Key KeyConfig `koanf:"key"`

RPCAggregator AggregatorConfig `koanf:"rpc-aggregator"`
Expand Down Expand Up @@ -112,6 +114,7 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
S3ConfigAddOptions(prefix+".s3-storage", f)
f.Bool(prefix+".migrate-local-db-to-file-storage", DefaultDataAvailabilityConfig.MigrateLocalDBToFileStorage, "daserver will migrate all data on startup from local-db-storage to local-file-storage, then mark local-db-storage as unusable")

// Key config for storage
KeyConfigAddOptions(prefix+".key", f)
Expand Down
96 changes: 94 additions & 2 deletions das/db_storage_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
"time"

badger "github.com/dgraph-io/badger/v4"
Expand Down Expand Up @@ -35,6 +38,8 @@ type LocalDBStorageConfig struct {

var badgerDefaultOptions = badger.DefaultOptions("")

const migratedMarker = "MIGRATED"

var DefaultLocalDBStorageConfig = LocalDBStorageConfig{
Enable: false,
DataDir: "",
Expand All @@ -49,7 +54,7 @@ var DefaultLocalDBStorageConfig = LocalDBStorageConfig{
}

func LocalDBStorageConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "enable storage/retrieval of sequencer batch data from a database on the local filesystem")
f.Bool(prefix+".enable", DefaultLocalDBStorageConfig.Enable, "!!!DEPRECATED, USE local-file-storage!!! enable storage/retrieval of sequencer batch data from a database on the local filesystem")
f.String(prefix+".data-dir", DefaultLocalDBStorageConfig.DataDir, "directory in which to store the database")
f.Bool(prefix+".discard-after-timeout", DefaultLocalDBStorageConfig.DiscardAfterTimeout, "discard data after its expiry timeout")

Expand All @@ -69,7 +74,17 @@ type DBStorageService struct {
stopWaiter stopwaiter.StopWaiterSafe
}

func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig) (StorageService, error) {
// The DBStorageService is deprecated. This function will migrate data to the target
// LocalFileStorageService if it is provided and migration hasn't already happened.
func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig, target *LocalFileStorageService) (*DBStorageService, error) {
if alreadyMigrated(config.DataDir) {
log.Warn("local-db-storage already migrated, please remove it from the daserver configuration and restart. data-dir can be cleaned up manually now")
return nil, nil
}
if target == nil {
log.Error("local-db-storage is DEPRECATED, please use use the local-file-storage and migrate-local-db-to-file-storage options. This error will be made fatal in future, continuing for now...")
}

options := badger.DefaultOptions(config.DataDir).
WithNumMemtables(config.NumMemtables).
WithNumLevelZeroTables(config.NumLevelZeroTables).
Expand All @@ -87,9 +102,21 @@ func NewDBStorageService(ctx context.Context, config *LocalDBStorageConfig) (Sto
discardAfterTimeout: config.DiscardAfterTimeout,
dirPath: config.DataDir,
}

if target != nil {
if err = ret.migrateTo(ctx, target); err != nil {
return nil, fmt.Errorf("error migrating local-db-storage to %s: %w", target, err)
}
if err = ret.setMigrated(); err != nil {
return nil, fmt.Errorf("error finalizing migration of local-db-storage to %s: %w", target, err)
}
return nil, nil
}

if err := ret.stopWaiter.Start(ctx, ret); err != nil {
return nil, err
}

err = ret.stopWaiter.LaunchThreadSafe(func(myCtx context.Context) {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
Expand Down Expand Up @@ -152,6 +179,48 @@ func (dbs *DBStorageService) Put(ctx context.Context, data []byte, timeout uint6
})
}

func (dbs *DBStorageService) migrateTo(ctx context.Context, s StorageService) error {
originExpirationPolicy, err := dbs.ExpirationPolicy(ctx)
if err != nil {
return err
}
targetExpirationPolicy, err := s.ExpirationPolicy(ctx)
if err != nil {
return err
}

if originExpirationPolicy == daprovider.KeepForever && targetExpirationPolicy == daprovider.DiscardAfterDataTimeout {
return errors.New("can't migrate from DBStorageService to target, incompatible expiration policies - can't migrate from non-expiring to expiring since non-expiring DB lacks expiry time metadata")
}

return dbs.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
it := txn.NewIterator(opts)
defer it.Close()
log.Info("Migrating from DBStorageService", "target", s)
migrationStart := time.Now()
count := 0
for it.Rewind(); it.Valid(); it.Next() {
if count%1000 == 0 {
log.Info("Migration in progress", "migrated", count)
}
item := it.Item()
k := item.Key()
expiry := item.ExpiresAt()
err := item.Value(func(v []byte) error {
log.Trace("migrated", "key", pretty.FirstFewBytes(k), "value", pretty.FirstFewBytes(v), "expiry", expiry)
return s.Put(ctx, v, expiry)
})
if err != nil {
return err
}
count++
}
log.Info("Migration from DBStorageService complete", "target", s, "migrated", count, "duration", time.Since(migrationStart))
return nil
})
}

func (dbs *DBStorageService) Sync(ctx context.Context) error {
return dbs.db.Sync()
}
Expand All @@ -160,6 +229,29 @@ func (dbs *DBStorageService) Close(ctx context.Context) error {
return dbs.stopWaiter.StopAndWait()
}

func alreadyMigrated(dirPath string) bool {
migratedMarkerFile := filepath.Join(dirPath, migratedMarker)
_, err := os.Stat(migratedMarkerFile)
if os.IsNotExist(err) {
return false
}
if err != nil {
log.Error("error checking if local-db-storage is already migrated", "err", err)
return false
}
return true
}

func (dbs *DBStorageService) setMigrated() error {
migratedMarkerFile := filepath.Join(dbs.dirPath, migratedMarker)
file, err := os.OpenFile(migratedMarkerFile, os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return err
}
file.Close()
return nil
}

func (dbs *DBStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
if dbs.discardAfterTimeout {
return daprovider.DiscardAfterDataTimeout, nil
Expand Down
36 changes: 25 additions & 11 deletions das/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,36 @@ func CreatePersistentStorageService(
) (StorageService, *LifecycleManager, error) {
storageServices := make([]StorageService, 0, 10)
var lifecycleManager LifecycleManager
if config.LocalDBStorage.Enable {
s, err := NewDBStorageService(ctx, &config.LocalDBStorage)
var err error

var fs *LocalFileStorageService
if config.LocalFileStorage.Enable {
fs, err = NewLocalFileStorageService(config.LocalFileStorage)
if err != nil {
return nil, nil, err
}
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
}

if config.LocalFileStorage.Enable {
s, err := NewLocalFileStorageService(config.LocalFileStorage)
err = fs.start(ctx)
if err != nil {
return nil, nil, err
}
err = s.start(ctx)
lifecycleManager.Register(fs)
storageServices = append(storageServices, fs)
}

if config.LocalDBStorage.Enable {
var s *DBStorageService
if config.MigrateLocalDBToFileStorage {
s, err = NewDBStorageService(ctx, &config.LocalDBStorage, fs)
} else {
s, err = NewDBStorageService(ctx, &config.LocalDBStorage, nil)
}
if err != nil {
return nil, nil, err
}
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
if s != nil {
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
}
}

if config.S3Storage.Enable {
Expand All @@ -67,6 +77,10 @@ func CreatePersistentStorageService(
if len(storageServices) == 1 {
return storageServices[0], &lifecycleManager, nil
}
if len(storageServices) == 0 {
return nil, nil, errors.New("No data-availability storage backend has been configured")
}

return nil, &lifecycleManager, nil
}

Expand Down
Loading

0 comments on commit 432eea1

Please sign in to comment.