Skip to content

Commit

Permalink
Introduce migration cancellation (#1169)
Browse files Browse the repository at this point in the history
Changes:
    - Migration accepts context for cancellation
    - Migration schema includes persistent `IntermediateState`

Towards #1163

Co-authored-by: Exca-DK <[email protected]>
  • Loading branch information
Exca-DK and Exca-DK authored Nov 22, 2023
1 parent 59b5d03 commit 160225f
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 60 deletions.
1 change: 1 addition & 0 deletions db/buckets.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
Pending
BlockCommitments
Temporary // used temporarily for migrations
SchemaIntermediateState
)

// Key flattens a prefix and series of byte arrays into a single []byte.
Expand Down
18 changes: 10 additions & 8 deletions migration/bucket_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migration

import (
"bytes"
"context"

"github.com/NethermindEth/juno/db"
"github.com/NethermindEth/juno/utils"
Expand Down Expand Up @@ -66,15 +67,16 @@ func (m *BucketMigrator) WithKeyFilter(keyFilter BucketMigratorKeyFilter) *Bucke
return m
}

func (m *BucketMigrator) Before() {
func (m *BucketMigrator) Before(_ []byte) error {
m.before()
return nil
}

func (m *BucketMigrator) Migrate(txn db.Transaction, network utils.Network) error {
func (m *BucketMigrator) Migrate(_ context.Context, txn db.Transaction, network utils.Network) ([]byte, error) {
remainingInBatch := m.batchSize
iterator, err := txn.NewIterator()
if err != nil {
return err
return nil, err
}

for iterator.Seek(m.startFrom); iterator.Valid(); iterator.Next() {
Expand All @@ -84,24 +86,24 @@ func (m *BucketMigrator) Migrate(txn db.Transaction, network utils.Network) erro
}

if pass, err := m.keyFilter(key); err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)
} else if pass {
if remainingInBatch == 0 {
m.startFrom = key
return utils.RunAndWrapOnError(iterator.Close, ErrCallWithNewTransaction)
return nil, utils.RunAndWrapOnError(iterator.Close, ErrCallWithNewTransaction)
}

remainingInBatch--
value, err := iterator.Value()
if err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)
}

if err = m.do(txn, key, value, network); err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)
}
}
}

return iterator.Close()
return nil, iterator.Close()
}
15 changes: 10 additions & 5 deletions migration/bucket_migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migration_test

import (
"bytes"
"context"
"errors"
"testing"

Expand Down Expand Up @@ -32,17 +33,20 @@ func TestBucketMover(t *testing.T) {
return txn.Set(sourceBucket.Key(), []byte{44})
}))

mover.Before()
require.NoError(t, mover.Before(nil))
require.True(t, beforeCalled)

err := testDB.Update(func(txn db.Transaction) error {
err := mover.Migrate(txn, utils.Mainnet)
var (
intermediateState []byte
err error
)
err = testDB.Update(func(txn db.Transaction) error {
intermediateState, err = mover.Migrate(context.Background(), txn, utils.Mainnet)
require.ErrorIs(t, err, migration.ErrCallWithNewTransaction)
return nil
})
require.NoError(t, err)
err = testDB.Update(func(txn db.Transaction) error {
err = mover.Migrate(txn, utils.Mainnet)
intermediateState, err = mover.Migrate(context.Background(), txn, utils.Mainnet)
require.NoError(t, err)
return nil
})
Expand Down Expand Up @@ -76,4 +80,5 @@ func TestBucketMover(t *testing.T) {
return nil
})
require.NoError(t, err)
require.Nil(t, intermediateState)
}
115 changes: 78 additions & 37 deletions migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migration

import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -16,27 +17,34 @@ import (
"github.com/NethermindEth/juno/encoder"
"github.com/NethermindEth/juno/utils"
"github.com/bits-and-blooms/bitset"
"github.com/fxamacker/cbor/v2"
"github.com/sourcegraph/conc/pool"
)

// schemaMetadata describes the migration progress recorded in the database.
type schemaMetadata struct {
// Version is used by migrateIfNeeded as the index of the next migration to apply;
// it is stored under db.SchemaVersion as a big-endian uint64.
Version uint64
// IntermediateState is the state returned by the last Migrate call and handed to Before;
// it is stored CBOR-encoded under db.SchemaIntermediateState.
IntermediateState []byte
}

type Migration interface {
Before()
Migrate(db.Transaction, utils.Network) error
Before(intermediateState []byte) error
// Migrate should return its intermediate state whenever it requests a new transaction or detects a cancelled ctx.
Migrate(context.Context, db.Transaction, utils.Network) ([]byte, error)
}

type MigrationFunc func(db.Transaction, utils.Network) error

// Migrate returns f(txn).
func (f MigrationFunc) Migrate(txn db.Transaction, network utils.Network) error {
return f(txn, network)
func (f MigrationFunc) Migrate(_ context.Context, txn db.Transaction, network utils.Network) ([]byte, error) {
return nil, f(txn, network)
}

// Before is a no-op.
func (f MigrationFunc) Before() {}
func (f MigrationFunc) Before(_ []byte) error { return nil }

// migrations contains a set of migrations that can be applied to a database.
// defaultMigrations contains a set of migrations that can be applied to a database.
// After making breaking changes to the DB layout, add new migrations to this list.
var migrations = []Migration{
var defaultMigrations = []Migration{
MigrationFunc(migration0000),
MigrationFunc(relocateContractStorageRootKeys),
MigrationFunc(recalculateBloomFilters),
Expand All @@ -56,9 +64,13 @@ var migrations = []Migration{

var ErrCallWithNewTransaction = errors.New("call with new transaction")

func MigrateIfNeeded(targetDB db.DB, network utils.Network, log utils.SimpleLogger) error {
// MigrateIfNeeded runs migrateIfNeeded against targetDB using defaultMigrations
// and returns its error unchanged.
func MigrateIfNeeded(ctx context.Context, targetDB db.DB, network utils.Network, log utils.SimpleLogger) error {
return migrateIfNeeded(ctx, targetDB, network, log, defaultMigrations)
}

func migrateIfNeeded(ctx context.Context, targetDB db.DB, network utils.Network, log utils.SimpleLogger, migrations []Migration) error {
/*
Schema version of the targetDB determines which set of migrations need to be applied to the database.
Schema metadata of the targetDB determines which set of migrations need to be applied to the database.
After a migration is successfully executed, which may update the database, the schema version is incremented
by 1 by this loop.
Expand All @@ -73,58 +85,86 @@ func MigrateIfNeeded(targetDB db.DB, network utils.Network, log utils.SimpleLogg
new ones. It will be able to do this since the schema version it reads from the database will be
non-zero and that is what we use to initialise the i loop variable.
*/
version, err := SchemaVersion(targetDB)
metadata, err := SchemaMetadata(targetDB)
if err != nil {
return err
}

for i := version; i < uint64(len(migrations)); i++ {
for i := metadata.Version; i < uint64(len(migrations)); i++ {
if err := ctx.Err(); err != nil {
return err
}
log.Infow("Applying database migration", "stage", fmt.Sprintf("%d/%d", i+1, len(migrations)))
migration := migrations[i]
migration.Before()
if err := migration.Before(metadata.IntermediateState); err != nil {
return err
}
for {
var migrationErr error
if dbErr := targetDB.Update(func(txn db.Transaction) error {
migrationErr = migration.Migrate(txn, network)
if migrationErr != nil {
if errors.Is(migrationErr, ErrCallWithNewTransaction) {
return nil // Run the migration again with a new transaction.
metadata.IntermediateState, migrationErr = migration.Migrate(ctx, txn, network)
switch {
case migrationErr == nil || errors.Is(migrationErr, ctx.Err()):
if metadata.IntermediateState == nil {
metadata.Version++
}
return updateSchemaMetadata(txn, metadata)
case errors.Is(migrationErr, ErrCallWithNewTransaction):
return nil // Run migration again with new transaction.
default:
return migrationErr
}

// Migration successful. Bump the version.
var versionBytes [8]byte
binary.BigEndian.PutUint64(versionBytes[:], i+1)
return txn.Set(db.SchemaVersion.Key(), versionBytes[:])
}); dbErr != nil {
return dbErr
} else if migrationErr == nil {
break
} else if !errors.Is(migrationErr, ErrCallWithNewTransaction) {
return migrationErr
}
}
}

return nil
}

func SchemaVersion(targetDB db.DB) (uint64, error) {
version := uint64(0)
// SchemaMetadata retrieves metadata about a database schema from the given database.
func SchemaMetadata(targetDB db.DB) (schemaMetadata, error) {
metadata := schemaMetadata{}
txn, err := targetDB.NewTransaction(false)
if err != nil {
return 0, nil
return metadata, err
}
err = txn.Get(db.SchemaVersion.Key(), func(bytes []byte) error {
version = binary.BigEndian.Uint64(bytes)
if err := txn.Get(db.SchemaVersion.Key(), func(b []byte) error {
metadata.Version = binary.BigEndian.Uint64(b)
return nil
})
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return 0, utils.RunAndWrapOnError(txn.Discard, err)
}); err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return metadata, utils.RunAndWrapOnError(txn.Discard, err)
}

if err := txn.Get(db.SchemaIntermediateState.Key(), func(b []byte) error {
return cbor.Unmarshal(b, &metadata.IntermediateState)
}); err != nil && !errors.Is(err, db.ErrKeyNotFound) {
return metadata, utils.RunAndWrapOnError(txn.Discard, err)
}

return version, txn.Discard()
return metadata, txn.Discard()
}

// updateSchemaMetadata persists schema through txn. The version is written as
// an 8-byte big-endian integer under db.SchemaVersion and the intermediate
// state is written CBOR-encoded under db.SchemaIntermediateState. The state is
// encoded before any write is issued, so an encoding failure returns before
// txn.Set is called.
func updateSchemaMetadata(txn db.Transaction, schema schemaMetadata) error {
	encodedState, marshalErr := cbor.Marshal(schema.IntermediateState)
	if marshalErr != nil {
		return marshalErr
	}

	encodedVersion := make([]byte, 8)
	binary.BigEndian.PutUint64(encodedVersion, schema.Version)

	if setErr := txn.Set(db.SchemaVersion.Key(), encodedVersion); setErr != nil {
		return setErr
	}
	return txn.Set(db.SchemaIntermediateState.Key(), encodedState)
}

// migration0000 makes sure the targetDB is empty
Expand Down Expand Up @@ -227,7 +267,7 @@ type changeTrieNodeEncoding struct {
}
}

func (m *changeTrieNodeEncoding) Before() {
func (m *changeTrieNodeEncoding) Before(_ []byte) error {
m.trieNodeBuckets = map[db.Bucket]*struct {
seekTo []byte
skipLen int
Expand All @@ -245,6 +285,7 @@ func (m *changeTrieNodeEncoding) Before() {
skipLen: 1 + felt.Bytes,
},
}
return nil
}

type node struct {
Expand Down Expand Up @@ -314,7 +355,7 @@ func (n *node) _UnmarshalBinary(data []byte) error {
return err
}

func (m *changeTrieNodeEncoding) Migrate(txn db.Transaction, _ utils.Network) error {
func (m *changeTrieNodeEncoding) Migrate(_ context.Context, txn db.Transaction, _ utils.Network) ([]byte, error) {
// If we made n a trie.Node, the encoder would fall back to the custom encoding methods.
// We instead define a custom struct to force the encoder to use the default encoding.
var n node
Expand Down Expand Up @@ -371,15 +412,15 @@ func (m *changeTrieNodeEncoding) Migrate(txn db.Transaction, _ utils.Network) er

iterator, err := txn.NewIterator()
if err != nil {
return err
return nil, err
}

for bucket, info := range m.trieNodeBuckets {
if err := migrateF(iterator, bucket, info.seekTo, info.skipLen); err != nil {
return utils.RunAndWrapOnError(iterator.Close, err)
return nil, utils.RunAndWrapOnError(iterator.Close, err)
}
}
return iterator.Close()
return nil, iterator.Close()
}

// calculateBlockCommitments calculates the txn and event commitments for each block and stores them separately
Expand Down
Loading

0 comments on commit 160225f

Please sign in to comment.