Skip to content

Commit

Permalink
Address all the remaining review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
karimodm committed Sep 19, 2023
1 parent 3a3b48e commit a4ca65d
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 67 deletions.
3 changes: 1 addition & 2 deletions pkg/protocol/engine/accounts/accountsledger/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,8 @@ func (m *Manager) readSlotDiffs(reader io.ReadSeeker, slotDiffCount uint64) erro
func (m *Manager) writeSlotDiffs(pWriter *utils.PositionedWriter, targetIndex iotago.SlotIndex) (slotDiffsCount uint64, err error) {
// write slot diffs until being able to reach targetIndex, where the exported tree is at
slotIndex := iotago.SlotIndex(1)

// TODO: shouldn't that be from last finalized slot?
maxCommittableAge := m.apiProvider.APIForSlot(targetIndex).ProtocolParameters().MaxCommittableAge()

if targetIndex > maxCommittableAge {
slotIndex = targetIndex - maxCommittableAge
}
Expand Down
29 changes: 3 additions & 26 deletions pkg/protocol/enginemanager/enginemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,32 +197,9 @@ func (e *EngineManager) loadEngineInstanceFromSnapshot(engineAlias string, snaps
e.errorHandler(ierrors.Wrapf(err, "engine (%s)", engineAlias[0:8]))
}

newEngine := engine.New(e.workers.CreateGroup(engineAlias),
errorHandler,
storage.New(e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...),
e.filterProvider,
e.commitmentFilterProvider,
e.blockDAGProvider,
e.bookerProvider,
e.clockProvider,
e.blockGadgetProvider,
e.slotGadgetProvider,
e.sybilProtectionProvider,
e.notarizationProvider,
e.attestationProvider,
e.ledgerProvider,
e.schedulerProvider,
e.tipManagerProvider,
e.tipSelectionProvider,
e.retainerProvider,
e.upgradeOrchestratorProvider,
e.syncManagerProvider,
append(e.engineOptions, engine.WithSnapshotPath(snapshotPath))...,
)
e.engineOptions = append(e.engineOptions, engine.WithSnapshotPath(snapshotPath))

e.engineCreated.Trigger(newEngine)

return newEngine
return e.loadEngineInstanceWithStorage(engineAlias, storage.Create(e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...))
}

func (e *EngineManager) loadEngineInstanceWithStorage(engineAlias string, storage *storage.Storage) *engine.Engine {
Expand Down Expand Up @@ -265,7 +242,7 @@ func (e *EngineManager) ForkEngineAtSlot(index iotago.SlotIndex) (*engine.Engine
}

// Copy raw data on disk.
newStorage, err := storage.CloneStorage(e.activeInstance.Storage, e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...)
newStorage, err := storage.Clone(e.activeInstance.Storage, e.directory.Path(engineAlias), e.dbVersion, errorHandler, e.storageOptions...)
if err != nil {
return nil, ierrors.Wrapf(err, "failed to copy storage from active engine instance (%s) to new engine instance (%s)", e.activeInstance.Storage.Directory(), e.directory.Path(engineAlias))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/protocol/snapshotcreator/snapshotcreator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func CreateSnapshot(opts ...options.Option[Options]) error {

workers := workerpool.NewGroup("CreateSnapshot")
defer workers.Shutdown()
s := storage.New(lo.PanicOnErr(os.MkdirTemp(os.TempDir(), "*")), opt.DataBaseVersion, errorHandler)
s := storage.Create(lo.PanicOnErr(os.MkdirTemp(os.TempDir(), "*")), opt.DataBaseVersion, errorHandler)
defer s.Shutdown()

if err := s.Settings().StoreProtocolParametersForStartEpoch(opt.ProtocolParameters, 0); err != nil {
Expand Down
10 changes: 1 addition & 9 deletions pkg/storage/database/db_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/syncutils"
)

type DBInstance struct {
Expand All @@ -19,14 +18,7 @@ func NewDBInstance(dbConfig Config) *DBInstance {
panic(err)
}

lockableKVStore := &lockedKVStore{
openableKVStore: &openableKVStore{
storeInstance: db,
parentStore: nil,
dbPrefix: kvstore.EmptyPrefix,
},
instanceMutex: new(syncutils.RWMutex),
}
lockableKVStore := newLockedKVStore(db)

// HealthTracker state is only modified while holding the lock on the lockableKVStore;
// that's why it needs to use openableKVStore (which does not lock) instead of lockableKVStore to avoid a deadlock.
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/database/lockedkvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ type lockedKVStore struct {
instanceMutex *syncutils.RWMutex
}

// newLockedKVStore wraps the given KVStore in a lockedKVStore, combining an
// unlocked openableKVStore with a freshly allocated RWMutex that callers use
// to guard access to the underlying store instance.
func newLockedKVStore(storeInstance kvstore.KVStore) *lockedKVStore {
	store := &lockedKVStore{
		openableKVStore: newOpenableKVStore(storeInstance),
		instanceMutex:   new(syncutils.RWMutex),
	}

	return store
}

// Lock acquires the write lock on the mutex guarding the store instance.
func (s *lockedKVStore) Lock() {
s.instanceMutex.Lock()
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/database/openablekvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ type openableKVStore struct {
dbPrefix kvstore.KeyPrefix
}

// newOpenableKVStore creates a root openableKVStore around the given store
// instance: it has no parent store and uses the empty key prefix.
func newOpenableKVStore(storeInstance kvstore.KVStore) *openableKVStore {
	store := &openableKVStore{
		storeInstance: storeInstance,
		parentStore:   nil,
		dbPrefix:      kvstore.EmptyPrefix,
	}

	return store
}

func (s *openableKVStore) instance() kvstore.KVStore {
if s.storeInstance != nil {
return s.storeInstance
Expand Down
48 changes: 23 additions & 25 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,32 +64,30 @@ func New(directory string, dbVersion byte, errorHandler func(error), opts ...opt
optsPruningSizeMaxTargetSizeBytes: 30 * 1024 * 1024 * 1024, // 30GB
optsPruningSizeReductionPercentage: 0.1,
optsPruningSizeCooldownTime: 5 * time.Minute,
}, opts,
func(s *Storage) {
dbConfig := database.Config{
Engine: s.optsDBEngine,
Directory: s.dir.PathWithCreate(permanentDirName),
Version: dbVersion,
PrefixHealth: []byte{storePrefixHealth},
}

s.permanent = permanent.New(dbConfig, errorHandler, s.optsPermanent...)
s.prunable = prunable.New(dbConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), s.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...)
})
}, opts)
}

func CloneStorage(source *Storage, directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) (*Storage, error) {
s := options.Apply(&Storage{
dir: utils.NewDirectory(directory, true),
errorHandler: errorHandler,
lastPrunedEpoch: model.NewEvictionIndex[iotago.EpochIndex](),
optsDBEngine: hivedb.EngineRocksDB,
optsPruningDelay: 30,
optPruningSizeEnabled: false,
optsPruningSizeMaxTargetSizeBytes: 30 * 1024 * 1024 * 1024, // 30GB
optsPruningSizeReductionPercentage: 0.1,
optsPruningSizeCooldownTime: 5 * time.Minute,
}, opts)
// Create creates a new storage instance with the named database version in the
// given directory and initializes its permanent and prunable counterparts.
func Create(directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) *Storage {
	s := New(directory, dbVersion, errorHandler, opts...)

	// The permanent store lives in its own subdirectory of the storage root.
	permanentConfig := database.Config{
		Engine:       s.optsDBEngine,
		Directory:    s.dir.PathWithCreate(permanentDirName),
		Version:      dbVersion,
		PrefixHealth: []byte{storePrefixHealth},
	}
	s.permanent = permanent.New(permanentConfig, errorHandler, s.optsPermanent...)

	// The prunable store reuses the same config, pointed at its own subdirectory.
	s.prunable = prunable.New(permanentConfig.WithDirectory(s.dir.PathWithCreate(prunableDirName)), s.Settings().APIProvider(), s.errorHandler, s.optsBucketManagerOptions...)

	return s
}

// Clone creates a new storage instance with the named database version in the given directory, cloning the permanent
// and prunable counterparts from the given source storage.
func Clone(source *Storage, directory string, dbVersion byte, errorHandler func(error), opts ...options.Option[Storage]) (*Storage, error) {
s := New(directory, dbVersion, errorHandler, opts...)

dbConfig := database.Config{
Engine: s.optsDBEngine,
Expand Down Expand Up @@ -142,4 +140,4 @@ func (s *Storage) Shutdown() {
// Flush flushes both the permanent and the prunable store.
func (s *Storage) Flush() {
s.permanent.Flush()
s.prunable.Flush()
}
}
2 changes: 1 addition & 1 deletion pkg/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestStorage_CopyFromForkedStorageEmpty(t *testing.T) {
}
tf1.GeneratePermanentData(1 * MB)

clonedStorage, err := storage.CloneStorage(tf1.Instance, t.TempDir(), 0, func(err error) {
clonedStorage, err := storage.Clone(tf1.Instance, t.TempDir(), 0, func(err error) {
t.Log(err)
})
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/testframework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewTestFramework(t *testing.T, baseDir string, storageOpts ...options.Optio
}

storageFactoryFunc := func() *storage.Storage {
instance := storage.New(baseDir, 0, errorHandler, storageOpts...)
instance := storage.Create(baseDir, 0, errorHandler, storageOpts...)
require.NoError(t, instance.Settings().StoreProtocolParametersForStartEpoch(iotago.NewV3ProtocolParameters(), 0))

return instance
Expand Down Expand Up @@ -210,8 +210,8 @@ func (t *TestFramework) AssertPrunedUntil(
expectedDecidedUpgrades *types.Tuple[int, bool],
expectedPoolStats *types.Tuple[int, bool],
expectedCommittee *types.Tuple[int, bool],
expectedRewards *types.Tuple[int, bool]) {

expectedRewards *types.Tuple[int, bool],
) {
t.assertPrunedState(expectedPrunable, t.Instance.LastPrunedEpoch, "prunable")
t.assertPrunedState(expectedPoolStats, t.Instance.PoolStats().LastPrunedEpoch, "pool stats")
t.assertPrunedState(expectedDecidedUpgrades, t.Instance.DecidedUpgradeSignals().LastPrunedEpoch, "decided upgrades")
Expand Down

0 comments on commit a4ca65d

Please sign in to comment.