diff --git a/pkg/protocol/engine/accounts/accountsledger/snapshot.go b/pkg/protocol/engine/accounts/accountsledger/snapshot.go
index d5d842740..fe3063e49 100644
--- a/pkg/protocol/engine/accounts/accountsledger/snapshot.go
+++ b/pkg/protocol/engine/accounts/accountsledger/snapshot.go
@@ -215,9 +215,8 @@ func (m *Manager) readSlotDiffs(reader io.ReadSeeker, slotDiffCount uint64) erro
 func (m *Manager) writeSlotDiffs(pWriter *utils.PositionedWriter, targetIndex iotago.SlotIndex) (slotDiffsCount uint64, err error) {
 	// write slot diffs until being able to reach targetIndex, where the exported tree is at
 	slotIndex := iotago.SlotIndex(1)
-
-	// TODO: shouldn't that be from last finalized slot?
 	maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge()
+
 	if targetIndex > maxCommittableAge {
 		slotIndex = targetIndex - maxCommittableAge
 	}
diff --git a/pkg/protocol/enginemanager/enginemanager.go b/pkg/protocol/enginemanager/enginemanager.go
index fdb4ea57b..96da59c00 100644
--- a/pkg/protocol/enginemanager/enginemanager.go
+++ b/pkg/protocol/enginemanager/enginemanager.go
@@ -197,32 +197,9 @@ func (e *EngineManager) loadEngineInstanceFromSnapshot(engineAlias string, snaps
 		e.errorHandler(ierrors.Wrapf(err, "engine (%s)", engineAlias[0:8]))
 	}
 
-	newEngine := engine.New(e.workers.CreateGroup(engineAlias),
-		errorHandler,
-		storage.New(e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...),
-		e.filterProvider,
-		e.commitmentFilterProvider,
-		e.blockDAGProvider,
-		e.bookerProvider,
-		e.clockProvider,
-		e.blockGadgetProvider,
-		e.slotGadgetProvider,
-		e.sybilProtectionProvider,
-		e.notarizationProvider,
-		e.attestationProvider,
-		e.ledgerProvider,
-		e.schedulerProvider,
-		e.tipManagerProvider,
-		e.tipSelectionProvider,
-		e.retainerProvider,
-		e.upgradeOrchestratorProvider,
-		e.syncManagerProvider,
-		append(e.engineOptions, engine.WithSnapshotPath(snapshotPath))...,
-	)
+	e.engineOptions = append(e.engineOptions, engine.WithSnapshotPath(snapshotPath))
 
-	e.engineCreated.Trigger(newEngine)
-
-	return newEngine
+	return e.loadEngineInstanceWithStorage(engineAlias, storage.Create(e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...))
 }
 
 func (e *EngineManager) loadEngineInstanceWithStorage(engineAlias string, storage *storage.Storage) *engine.Engine {
@@ -265,7 +242,7 @@ func (e *EngineManager) ForkEngineAtSlot(index iotago.SlotIndex) (*engine.Engine
 	}
 
 	// Copy raw data on disk.
-	newStorage, err := storage.CloneStorage(e.activeInstance.Storage, e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...)
+	newStorage, err := storage.Clone(e.activeInstance.Storage, e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...)
 	if err != nil {
 		return nil, ierrors.Wrapf(err, "failed to copy storage from active engine instance (%s) to new engine instance (%s)", e.activeInstance.Storage.Directory(), e.directory.Path(engineAlias))
 	}
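Two files are touched above; the `writeSlotDiffs` hunk is easiest to check with concrete numbers. The exported slot-diff window starts `MaxCommittableAge` slots before the snapshot target, falling back to slot 1 on young chains. Below is a minimal, self-contained sketch of that computation; `SlotIndex` stands in for `iotago.SlotIndex`, and the values are purely illustrative.

```go
package main

import "fmt"

// SlotIndex stands in for iotago.SlotIndex in this sketch.
type SlotIndex uint32

func main() {
	// Hypothetical protocol values: snapshot target at slot 100,
	// MaxCommittableAge of 20 slots.
	targetIndex := SlotIndex(100)
	maxCommittableAge := SlotIndex(20)

	// Same guard as writeSlotDiffs: start from the oldest slot that is
	// still uncommitted at targetIndex, falling back to slot 1 so the
	// subtraction cannot underflow on young chains.
	slotIndex := SlotIndex(1)
	if targetIndex > maxCommittableAge {
		slotIndex = targetIndex - maxCommittableAge
	}

	fmt.Println(slotIndex) // 80: diffs for slots 80..100 get exported
}
```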
diff --git a/pkg/protocol/snapshotcreator/snapshotcreator.go b/pkg/protocol/snapshotcreator/snapshotcreator.go
index e12ce74cd..a14add174 100644
--- a/pkg/protocol/snapshotcreator/snapshotcreator.go
+++ b/pkg/protocol/snapshotcreator/snapshotcreator.go
@@ -54,7 +54,7 @@ func CreateSnapshot(opts ...options.Option[Options]) error {
 	workers := workerpool.NewGroup("CreateSnapshot")
 	defer workers.Shutdown()
 
-	s := storage.New(lo.PanicOnErr(os.MkdirTemp(os.TempDir(), "*")), opt.DataBaseVersion, errorHandler)
+	s := storage.Create(lo.PanicOnErr(os.MkdirTemp(os.TempDir(), "*")), opt.DataBaseVersion, errorHandler)
 	defer s.Shutdown()
 
 	if err := s.Settings().StoreProtocolParametersForStartEpoch(opt.ProtocolParameters, 0); err != nil {
diff --git a/pkg/storage/database/db_instance.go b/pkg/storage/database/db_instance.go
index d072cdf0a..14cdabeaf 100644
--- a/pkg/storage/database/db_instance.go
+++ b/pkg/storage/database/db_instance.go
@@ -4,7 +4,6 @@ import (
 	"github.com/iotaledger/hive.go/ierrors"
 	"github.com/iotaledger/hive.go/kvstore"
 	"github.com/iotaledger/hive.go/lo"
-	"github.com/iotaledger/hive.go/runtime/syncutils"
 )
 
 type DBInstance struct {
@@ -19,14 +18,7 @@ func NewDBInstance(dbConfig Config) *DBInstance {
 		panic(err)
 	}
 
-	lockableKVStore := &lockedKVStore{
-		openableKVStore: &openableKVStore{
-			storeInstance: db,
-			parentStore:   nil,
-			dbPrefix:      kvstore.EmptyPrefix,
-		},
-		instanceMutex: new(syncutils.RWMutex),
-	}
+	lockableKVStore := newLockedKVStore(db)
 
 	// HealthTracker state is only modified while holding the lock on the lockableKVStore;
 	// that's why it needs to use openableKVStore (which does not lock) instead of lockableKVStore to avoid a deadlock.
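The comment kept at the end of `NewDBInstance` states the locking invariant behind the two store flavors: code that runs while the `lockedKVStore` mutex is already held must write through the non-locking `openableKVStore`, or it blocks on its own lock. A toy illustration of that deadlock shape follows; `openableStore` and `lockedStore` are invented stand-ins, not the repo's types.

```go
package main

import (
	"fmt"
	"sync"
)

// openableStore stands in for openableKVStore: plain state, no locking.
type openableStore struct{ data map[string]string }

func (s *openableStore) Set(k, v string) { s.data[k] = v }

// lockedStore stands in for lockedKVStore: it embeds the non-locking store
// and guards every operation with an instance mutex.
type lockedStore struct {
	openableStore
	mu sync.RWMutex
}

func (s *lockedStore) Set(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.openableStore.Set(k, v)
}

func main() {
	store := &lockedStore{openableStore: openableStore{data: map[string]string{}}}

	// A caller that already holds the lock (as DBInstance does around its
	// health state) must use the embedded non-locking store directly.
	store.mu.Lock()
	store.openableStore.Set("health", "ok") // safe: bypasses the mutex
	// store.Set("health", "ok")            // deadlock: sync.RWMutex is not reentrant
	store.mu.Unlock()

	fmt.Println(store.data["health"])
}
```

This is the same reason the HealthTracker above is wired against the inner openableKVStore rather than the locking wrapper.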
diff --git a/pkg/storage/database/lockedkvstore.go b/pkg/storage/database/lockedkvstore.go
index 3fa4bf1aa..f72558bc5 100644
--- a/pkg/storage/database/lockedkvstore.go
+++ b/pkg/storage/database/lockedkvstore.go
@@ -14,6 +14,13 @@ type lockedKVStore struct {
 	instanceMutex *syncutils.RWMutex
 }
 
+func newLockedKVStore(storeInstance kvstore.KVStore) *lockedKVStore {
+	return &lockedKVStore{
+		openableKVStore: newOpenableKVStore(storeInstance),
+		instanceMutex:   new(syncutils.RWMutex),
+	}
+}
+
 func (s *lockedKVStore) Lock() {
 	s.instanceMutex.Lock()
 }
diff --git a/pkg/storage/database/openablekvstore.go b/pkg/storage/database/openablekvstore.go
index 953ac0188..9ff04df3a 100644
--- a/pkg/storage/database/openablekvstore.go
+++ b/pkg/storage/database/openablekvstore.go
@@ -15,6 +15,14 @@ type openableKVStore struct {
 	dbPrefix kvstore.KeyPrefix
 }
 
+func newOpenableKVStore(storeInstance kvstore.KVStore) *openableKVStore {
+	return &openableKVStore{
+		storeInstance: storeInstance,
+		parentStore:   nil,
+		dbPrefix:      kvstore.EmptyPrefix,
+	}
+}
+
 func (s *openableKVStore) instance() kvstore.KVStore {
 	if s.storeInstance != nil {
 		return s.storeInstance
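Both new constructors apply the same refactor: the outer type embeds the inner one, and its constructor delegates to the inner constructor instead of re-listing the embedded fields, as the old literal in `NewDBInstance` did. A generic sketch of the delegation pattern, using invented `inner` and `outer` types rather than the actual kvstore code:

```go
package main

import (
	"fmt"
	"sync"
)

// inner stands in for openableKVStore: plain state, no locking.
type inner struct {
	prefix []byte
}

// newInner centralizes the default field values, as newOpenableKVStore does.
func newInner(prefix []byte) *inner {
	return &inner{prefix: prefix}
}

// outer stands in for lockedKVStore: it embeds *inner and adds a mutex.
type outer struct {
	*inner
	mu *sync.RWMutex
}

// newOuter delegates to newInner instead of duplicating its field list,
// so a new inner field only has to be initialized in one place.
func newOuter(prefix []byte) *outer {
	return &outer{
		inner: newInner(prefix),
		mu:    new(sync.RWMutex),
	}
}

func main() {
	o := newOuter([]byte{0x01})
	fmt.Printf("%x\n", o.prefix) // promoted field from the embedded inner
}
```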
diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go
index c20cc7a10..f8b3dec60 100644
--- a/pkg/storage/storage.go
+++ b/pkg/storage/storage.go
@@ -64,32 +64,30 @@ func New(directory string, dbVersion byte, errorHandler func(error), opts ...opt
 		optsPruningSizeMaxTargetSizeBytes:  30 * 1024 * 1024 * 1024, // 30GB
 		optsPruningSizeReductionPercentage: 0.1,
 		optsPruningSizeCooldownTime:        5 * time.Minute,
-	}, opts,
-		func(s *Storage) {
-			dbConfig := database.Config{
-				Engine:       s.optsDBEngine,
-				Directory:    s.dir.PathWithCreate(permanentDirName),
-				Version:      dbVersion,
-				PrefixHealth: []byte{storePrefixHealth},
-			}
-
-			s.permanent = permanent.New(dbConfig, errorHandler, s.optsPermanent...)
-			s.prunable = prunable.New(dbConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), s.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...)
-		})
+	}, opts)
 }
 
-func CloneStorage(source *Storage, directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) (*Storage, error) {
-	s := options.Apply(&Storage{
-		dir:                                utils.NewDirectory(directory, true),
-		errorHandler:                       errorHandler,
-		lastPrunedEpoch:                    model.NewEvictionIndex[iotago.EpochIndex](),
-		optsDBEngine:                       hivedb.EngineRocksDB,
-		optsPruningDelay:                   30,
-		optPruningSizeEnabled:              false,
-		optsPruningSizeMaxTargetSizeBytes:  30 * 1024 * 1024 * 1024, // 30GB
-		optsPruningSizeReductionPercentage: 0.1,
-		optsPruningSizeCooldownTime:        5 * time.Minute,
-	}, opts)
+// Create creates a new storage instance with the given database version in the given directory and initializes its permanent
+// and prunable counterparts.
+func Create(directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) *Storage {
+	s := New(directory, dbVersion, errorHandler, opts...)
+	dbConfig := database.Config{
+		Engine:       s.optsDBEngine,
+		Directory:    s.dir.PathWithCreate(permanentDirName),
+		Version:      dbVersion,
+		PrefixHealth: []byte{storePrefixHealth},
+	}
+
+	s.permanent = permanent.New(dbConfig, errorHandler, s.optsPermanent...)
+	s.prunable = prunable.New(dbConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), s.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...)
+
+	return s
+}
+
+// Clone creates a new storage instance with the given database version in the given directory and clones the permanent
+// and prunable counterparts from the given source storage.
+func Clone(source *Storage, directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) (*Storage, error) {
+	s := New(directory, dbVersion, errorHandler, opts...)
 
 	dbConfig := database.Config{
@@ -142,4 +140,4 @@ func (s *Storage) Shutdown() {
 func (s *Storage) Flush() {
 	s.permanent.Flush()
 	s.prunable.Flush()
-}
\ No newline at end of file
+}
diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go
index b050cd7f0..a6e4623dd 100644
--- a/pkg/storage/storage_test.go
+++ b/pkg/storage/storage_test.go
@@ -222,7 +222,7 @@ func TestStorage_CopyFromForkedStorageEmpty(t *testing.T) {
 	}
 
 	tf1.GeneratePermanentData(1 * MB)
-	clonedStorage, err := storage.CloneStorage(tf1.Instance, t.TempDir(), 0, func(err error) {
+	clonedStorage, err := storage.Clone(tf1.Instance, t.TempDir(), 0, func(err error) {
 		t.Log(err)
 	})
 	require.NoError(t, err)
diff --git a/pkg/storage/testframework_test.go b/pkg/storage/testframework_test.go
index bfba64a4c..7a75c2407 100644
--- a/pkg/storage/testframework_test.go
+++ b/pkg/storage/testframework_test.go
@@ -48,7 +48,7 @@ func NewTestFramework(t *testing.T, baseDir string, storageOpts ...options.Optio
 	}
 
 	storageFactoryFunc := func() *storage.Storage {
-		instance := storage.New(baseDir, 0, errorHandler, storageOpts...)
+		instance := storage.Create(baseDir, 0, errorHandler, storageOpts...)
 		require.NoError(t, instance.Settings().StoreProtocolParametersForStartEpoch(iotago.NewV3ProtocolParameters(), 0))
 
 		return instance
@@ -210,8 +210,8 @@ func (t *TestFramework) AssertPrunedUntil(
 	expectedDecidedUpgrades *types.Tuple[int, bool],
 	expectedPoolStats *types.Tuple[int, bool],
 	expectedCommittee *types.Tuple[int, bool],
-	expectedRewards *types.Tuple[int, bool]) {
-
+	expectedRewards *types.Tuple[int, bool],
+) {
 	t.assertPrunedState(expectedPrunable, t.Instance.LastPrunedEpoch, "prunable")
 	t.assertPrunedState(expectedPoolStats, t.Instance.PoolStats().LastPrunedEpoch, "pool stats")
 	t.assertPrunedState(expectedDecidedUpgrades, t.Instance.DecidedUpgradeSignals().LastPrunedEpoch, "decided upgrades")
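Taken together, the storage changes split construction into a thin `New` plus two entry points: `Create` opens fresh permanent and prunable databases, while `Clone` copies them from an existing instance, as `ForkEngineAtSlot` now does. A rough usage sketch of the resulting API; the `github.com/iotaledger/iota-core` import path and the version byte `1` are assumptions, and error handling is trimmed.

```go
package main

import (
	"fmt"
	"os"

	"github.com/iotaledger/iota-core/pkg/storage"
)

func main() {
	errorHandler := func(err error) { fmt.Println("storage error:", err) }

	// Create builds the bare Storage via New, then opens fresh permanent
	// and prunable databases underneath it.
	dir, _ := os.MkdirTemp("", "storage-example")
	s := storage.Create(dir, 1, errorHandler)
	defer s.Shutdown()

	// Clone starts from the same New base but copies the on-disk state of
	// an existing instance, which is how ForkEngineAtSlot forks an engine.
	forkDir, _ := os.MkdirTemp("", "storage-fork")
	forked, err := storage.Clone(s, forkDir, 1, errorHandler)
	if err != nil {
		fmt.Println("clone failed:", err)
		return
	}
	defer forked.Shutdown()
}
```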