Skip to content

Commit

Permalink
Merge pull request #526 from iotaledger/fix/snapshot-load-genesisslot
Browse files Browse the repository at this point in the history
Fix genesisSlot shenanigans in snapshot export & import
  • Loading branch information
karimodm authored Nov 16, 2023
2 parents d070e81 + afc619a commit ccd9f5d
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 74 deletions.
21 changes: 20 additions & 1 deletion .github/workflows/feature-network-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,25 @@ jobs:
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,mode=max,dest=/tmp/.buildx-cache-new

- uses: actions/setup-go@v4
with:
go-version-file: 'tools/genesis-snapshot/go.mod'
cache: false

- name: Print Go version
run: go version

- name: Generate genesis snapshot
working-directory: tools/genesis-snapshot
run: go run -tags=rocksdb . --config feature --seed 7R1itJx5hVuo9w9hjg5cwKFmek4HMSoBDgJZN8hKGxih --filename genesis-snapshot.bin

- name: Upload snapshot
id: upload-snapshot
run: |
SNAPSHOT_URL=$(curl -T ./tools/genesis-snapshot/genesis-snapshot.bin https://transfer.sh)
echo "Snapshot URL: $SNAPSHOT_URL"
echo "snapshot_url=$SNAPSHOT_URL" >> $GITHUB_OUTPUT
- # Temp fix
# https://github.com/docker/build-push-action/issues/252
# https://github.com/moby/buildkit/issues/1896
Expand All @@ -70,7 +89,7 @@ jobs:
- name: Ansible deploy
env:
CUSTOM_SNAPSHOT_URL: '${{ github.event.inputs.snapshotUrl }}'
DEFAULT_SNAPSHOT_URL: 'https://0x0.st/HywH.bin'
DEFAULT_SNAPSHOT_URL: '${{ steps.upload-snapshot.outputs.snapshot_url }}'
NETWORK_ENVIRONMENT: '${{ secrets.NETWORK_ENVIRONMENT }}'
IOTA_CORE_DOCKER_IMAGE_REPO: 'iotaledger/iota-core'
IOTA_CORE_DOCKER_IMAGE_TAG: 'feature'
Expand Down
6 changes: 4 additions & 2 deletions pkg/protocol/engine/attestation/slotattestation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,10 @@ func (m *Manager) Reset() {
}

func (m *Manager) computeAttestationCommitmentOffset(slot iotago.SlotIndex) (cutoffSlot iotago.SlotIndex, isValid bool) {
if slot < m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge() {
return 0, false
protocolParams := m.apiProvider.APIForSlot(slot).ProtocolParameters()

if slot < protocolParams.GenesisSlot()+protocolParams.MaxCommittableAge() {
return protocolParams.GenesisSlot(), false
}

return slot - m.apiProvider.APIForSlot(slot).ProtocolParameters().MaxCommittableAge(), true
Expand Down
5 changes: 3 additions & 2 deletions pkg/protocol/engine/attestation/slotattestation/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ func (m *Manager) Import(reader io.ReadSeeker) error {

var attestations []*iotago.Attestation
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint32, func(i int) error {

attestation, err := stream.ReadObjectWithSize[*iotago.Attestation](reader, serializer.SeriLengthPrefixTypeAsUint16, iotago.AttestationFromBytes(m.apiProvider))
if err != nil {
return ierrors.Wrapf(err, "failed to read attestation %d", i)
Expand Down Expand Up @@ -51,7 +50,9 @@ func (m *Manager) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) err
}

if _, isValid := m.computeAttestationCommitmentOffset(targetSlot); !isValid {
if err := stream.Write(writer, uint64(0)); err != nil {
if err := stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsUint32, func() (int, error) {
return 0, nil
}); err != nil {
return ierrors.Wrap(err, "failed to write 0 attestation count")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (s *Scheduler) Shutdown() {
s.bufferMutex.Lock()
defer s.bufferMutex.Unlock()

s.TriggerShutdown()

// validator workers need to be shut down first, otherwise they will hang on the shutdown channel.
s.validatorBuffer.buffer.ForEach(func(accountID iotago.AccountID, validatorQueue *ValidatorQueue) bool {
s.shutdownValidatorQueue(validatorQueue)
Expand All @@ -152,9 +154,10 @@ func (s *Scheduler) Shutdown() {
s.validatorBuffer.Clear()

close(s.shutdownSignal)
s.TriggerStopped()

s.workersWg.Wait()

s.TriggerStopped()
}

// Start starts the scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func New() *Scheduler {
}

func (s *Scheduler) Shutdown() {
s.TriggerShutdown()
s.TriggerStopped()
}

func (s *Scheduler) IsBlockIssuerReady(_ iotago.AccountID, _ ...*blocks.Block) bool {
Expand Down
57 changes: 47 additions & 10 deletions pkg/storage/database/db_instance.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package database

import (
"sync/atomic"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/kvstore"
"github.com/iotaledger/hive.go/lo"
Expand All @@ -10,6 +12,8 @@ type DBInstance struct {
store *lockedKVStore // KVStore that is used to access the DB instance
healthTracker *kvstore.StoreHealthTracker
dbConfig Config
isClosed atomic.Bool
isShutdown atomic.Bool
}

func NewDBInstance(dbConfig Config) *DBInstance {
Expand All @@ -18,7 +22,13 @@ func NewDBInstance(dbConfig Config) *DBInstance {
panic(err)
}

lockableKVStore := newLockedKVStore(db)
dbInstance := &DBInstance{
dbConfig: dbConfig,
}

lockableKVStore := newLockedKVStore(db, dbInstance)

dbInstance.store = lockableKVStore

// HealthTracker state is only modified while holding the lock on the lockableKVStore;
// that's why it needs to use openableKVStore (which does not lock) instead of lockableKVStore to avoid a deadlock.
Expand All @@ -30,10 +40,23 @@ func NewDBInstance(dbConfig Config) *DBInstance {
panic(err)
}

return &DBInstance{
store: lockableKVStore,
healthTracker: storeHealthTracker,
dbConfig: dbConfig,
dbInstance.healthTracker = storeHealthTracker

return dbInstance
}

func (d *DBInstance) Shutdown() {
d.isShutdown.Store(true)

d.Close()
}

func (d *DBInstance) Flush() {
d.store.Lock()
defer d.store.Unlock()

if !d.isClosed.Load() {
_ = d.store.instance().Flush()
}
}

Expand All @@ -45,20 +68,34 @@ func (d *DBInstance) Close() {
}

func (d *DBInstance) CloseWithoutLocking() {
if err := d.healthTracker.MarkHealthy(); err != nil {
panic(err)
}
if !d.isClosed.Load() {
if err := d.healthTracker.MarkHealthy(); err != nil {
panic(err)
}

if err := FlushAndClose(d.store); err != nil {
panic(err)
if err := FlushAndClose(d.store); err != nil {
panic(err)
}

d.isClosed.Store(true)
}
}

// Open re-opens a closed DBInstance. It must only be called while holding a lock on DBInstance,
// otherwise it might cause a race condition and corruption of node's state.
func (d *DBInstance) Open() {
if !d.isClosed.Load() {
panic("cannot open DBInstance that is not closed")
}

if d.isShutdown.Load() {
panic("cannot open DBInstance that is shutdown")
}

d.store.Replace(lo.PanicOnErr(StoreWithDefaultSettings(d.dbConfig.Directory, false, d.dbConfig.Engine)))

d.isClosed.Store(false)

if err := d.healthTracker.MarkCorrupted(); err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/database/lockedkvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ type lockedKVStore struct {
instanceMutex *syncutils.RWMutex
}

func newLockedKVStore(storeInstance kvstore.KVStore) *lockedKVStore {
func newLockedKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *lockedKVStore {
return &lockedKVStore{
openableKVStore: newOpenableKVStore(storeInstance),
openableKVStore: newOpenableKVStore(storeInstance, dbInstance),
instanceMutex: new(syncutils.RWMutex),
}
}
Expand Down
28 changes: 23 additions & 5 deletions pkg/storage/database/openablekvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,38 @@ import (
)

type openableKVStore struct {
dbInstance *DBInstance
storeInstance kvstore.KVStore // KVStore that is used to access the DB instance
parentStore *openableKVStore
dbPrefix kvstore.KeyPrefix
}

func newOpenableKVStore(storeInstance kvstore.KVStore) *openableKVStore {
func newOpenableKVStore(storeInstance kvstore.KVStore, dbInstance *DBInstance) *openableKVStore {
return &openableKVStore{
dbInstance: dbInstance,
storeInstance: storeInstance,
parentStore: nil,
dbPrefix: kvstore.EmptyPrefix,
}
}

func (s *openableKVStore) topParent() *openableKVStore {
current := s
for current.parentStore != nil {
current = current.parentStore
}

return current
}

func (s *openableKVStore) instance() kvstore.KVStore {
if s.storeInstance != nil {
return s.storeInstance
parent := s.topParent()

if parent.dbInstance.isClosed.Load() {
parent.dbInstance.Open()
}

return s.parentStore.instance()
return parent.storeInstance
}

func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) {
Expand All @@ -44,13 +57,16 @@ func (s *openableKVStore) Replace(newKVStore kvstore.KVStore) {
func (s *openableKVStore) WithRealm(realm kvstore.Realm) (kvstore.KVStore, error) {
return s.withRealm(realm)
}

func (s *openableKVStore) withRealm(realm kvstore.Realm) (kvstore.KVStore, error) {
return &openableKVStore{
dbInstance: nil,
storeInstance: nil,
parentStore: s,
dbPrefix: realm,
}, nil
}

func (s *openableKVStore) WithExtendedRealm(realm kvstore.Realm) (kvstore.KVStore, error) {
return s.withRealm(s.buildKeyPrefix(realm))
}
Expand Down Expand Up @@ -98,8 +114,10 @@ func (s *openableKVStore) DeletePrefix(prefix kvstore.KeyPrefix) error {
func (s *openableKVStore) Flush() error {
return s.instance().Flush()
}

func (s *openableKVStore) Close() error {
return s.instance().Close()
s.topParent().dbInstance.CloseWithoutLocking()
return nil
}

func (s *openableKVStore) Batched() (kvstore.BatchedMutations, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/database/utils.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package database

func FlushAndClose(store *lockedKVStore) error {
if err := store.FlushWithoutLocking(); err != nil {
if err := store.instance().Flush(); err != nil {
return err
}

return store.CloseWithoutLocking()
return store.instance().Close()
}
Loading

0 comments on commit ccd9f5d

Please sign in to comment.