Skip to content

Commit

Permalink
Adjust new stream package
Browse files Browse the repository at this point in the history
  • Loading branch information
jonastheis committed Oct 27, 2023
1 parent b8ff52b commit 5383f66
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 69 deletions.
40 changes: 9 additions & 31 deletions pkg/protocol/engine/attestation/slotattestation/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/serializer/v2"
"github.com/iotaledger/hive.go/serializer/v2/stream"
iotago "github.com/iotaledger/iota.go/v4"
)
Expand All @@ -13,28 +14,14 @@ func (m *Manager) Import(reader io.ReadSeeker) error {
defer m.commitmentMutex.Unlock()

var attestations []*iotago.Attestation
if err := stream.ReadCollection(reader, func(i int) error {
attestationBytes, err := stream.ReadBlob(reader)
if err != nil {
return ierrors.Wrap(err, "failed to read attestation")
}

version, _, err := iotago.VersionFromBytes(attestationBytes)
if err != nil {
return ierrors.Wrap(err, "failed to determine version")
}
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint32, func(i int) error {

apiForVersion, err := m.apiProvider.APIForVersion(version)
attestation, err := stream.ReadObject[*iotago.Attestation](reader, serializer.SeriLengthPrefixTypeAsUint16, iotago.AttestationFromBytes(m.apiProvider))
if err != nil {
return ierrors.Wrapf(err, "failed to get API for version %d", version)
return ierrors.Wrapf(err, "failed to read attestation %d", i)
}

importedAttestation := new(iotago.Attestation)
if _, err = apiForVersion.Decode(attestationBytes, importedAttestation); err != nil {
return ierrors.Wrapf(err, "failed to decode attestation %d", i)
}

attestations = append(attestations, importedAttestation)
attestations = append(attestations, attestation)

return nil
}); err != nil {
Expand Down Expand Up @@ -88,23 +75,14 @@ func (m *Manager) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex) err
return ierrors.Wrapf(err, "failed to stream attestations of slot %d", targetSlot)
}

if err = stream.WriteCollection(writer, func() (uint64, error) {
if err = stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsUint32, func() (int, error) {
for _, a := range attestations {
apiForVersion, err := m.apiProvider.APIForVersion(a.ProtocolVersion)
if err != nil {
return 0, ierrors.Wrapf(err, "failed to get API for version %d", a.ProtocolVersion)
}
bytes, err := apiForVersion.Encode(a)
if err != nil {
return 0, ierrors.Wrapf(err, "failed to encode attestation %v", a)
}

if writeErr := stream.WriteBlob(writer, bytes); writeErr != nil {
return 0, ierrors.Wrapf(writeErr, "failed to write attestation %v", a)
if err := stream.WriteObject(writer, a, serializer.SeriLengthPrefixTypeAsUint16, (*iotago.Attestation).Bytes); err != nil {
return 0, ierrors.Wrapf(err, "failed to write attestation %v", a)
}
}

return uint64(len(attestations)), nil
return len(attestations), nil
}); err != nil {
return ierrors.Wrapf(err, "failed to write attestations of slot %d", targetSlot)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/protocol/engine/attestation/slotattestation/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ func (m *Manager) attestationsForSlot(index iotago.SlotIndex) (ads.Map[iotago.Id
return nil, ierrors.Wrapf(err, "failed to get extended realm for attestations of slot %d", index)
}

return ads.NewMap[iotago.Identifier](attestationsStorage,
return ads.NewMap[iotago.Identifier](
attestationsStorage,
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.AccountID.Bytes,
iotago.AccountIDFromBytes,
(*iotago.Attestation).Bytes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,10 @@ func (t *TestFramework) AssertCommit(slot iotago.SlotIndex, expectedCW uint64, e

require.EqualValues(t.test, expectedCW, cw)

expectedTree := ads.NewMap[iotago.Identifier](mapdb.NewMapDB(),
expectedTree := ads.NewMap[iotago.Identifier](
mapdb.NewMapDB(),
iotago.Identifier.Bytes,
iotago.IdentifierFromBytes,
iotago.AccountID.Bytes,
iotago.AccountIDFromBytes,
(*iotago.Attestation).Bytes,
Expand Down
35 changes: 10 additions & 25 deletions pkg/protocol/engine/eviction/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/iotaledger/hive.go/lo"
"github.com/iotaledger/hive.go/runtime/options"
"github.com/iotaledger/hive.go/runtime/syncutils"
"github.com/iotaledger/hive.go/serializer/v2"
"github.com/iotaledger/hive.go/serializer/v2/stream"
"github.com/iotaledger/iota-core/pkg/storage/prunable/slotstore"
iotago "github.com/iotaledger/iota.go/v4"
Expand Down Expand Up @@ -185,18 +186,18 @@ func (s *State) Export(writer io.WriteSeeker, lowerTarget iotago.SlotIndex, targ

latestNonEmptySlot := iotago.SlotIndex(0)

if err := stream.WriteCollection(writer, func() (elementsCount uint64, err error) {
if err := stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsUint32, func() (elementsCount int, err error) {
for currentSlot := start; currentSlot <= targetSlot; currentSlot++ {
storage, err := s.rootBlockStorageFunc(currentSlot)
if err != nil {
continue
}
if err = storage.StreamBytes(func(rootBlockIDBytes []byte, commitmentIDBytes []byte) (err error) {
if err = stream.WriteBlob(writer, rootBlockIDBytes); err != nil {
if err = stream.WriteBytes(writer, rootBlockIDBytes); err != nil {
return ierrors.Wrapf(err, "failed to write root block ID %s", rootBlockIDBytes)
}

if err = stream.WriteBlob(writer, commitmentIDBytes); err != nil {
if err = stream.WriteBytes(writer, commitmentIDBytes); err != nil {
return ierrors.Wrapf(err, "failed to write root block's %s commitment %s", rootBlockIDBytes, commitmentIDBytes)
}

Expand All @@ -221,7 +222,7 @@ func (s *State) Export(writer io.WriteSeeker, lowerTarget iotago.SlotIndex, targ
latestNonEmptySlot = 0
}

if err := stream.WriteSerializable(writer, latestNonEmptySlot, iotago.SlotIndexLength); err != nil {
if err := stream.Write(writer, latestNonEmptySlot); err != nil {
return ierrors.Wrap(err, "failed to write latest non empty slot")
}

Expand All @@ -230,26 +231,15 @@ func (s *State) Export(writer io.WriteSeeker, lowerTarget iotago.SlotIndex, targ

// Import imports the root blocks from the given reader.
func (s *State) Import(reader io.ReadSeeker) error {
if err := stream.ReadCollection(reader, func(i int) error {

blockIDBytes, err := stream.ReadBlob(reader)
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint32, func(i int) error {
rootBlockID, err := stream.Read[iotago.BlockID](reader)
if err != nil {
return ierrors.Wrapf(err, "failed to read root block id %d", i)
}

rootBlockID, _, err := iotago.BlockIDFromBytes(blockIDBytes)
if err != nil {
return ierrors.Wrapf(err, "failed to parse root block id %d", i)
}

commitmentIDBytes, err := stream.ReadBlob(reader)
commitmentID, err := stream.Read[iotago.CommitmentID](reader)
if err != nil {
return ierrors.Wrapf(err, "failed to read root block's %s commitment id", rootBlockID)
}

commitmentID, _, err := iotago.CommitmentIDFromBytes(commitmentIDBytes)
if err != nil {
return ierrors.Wrapf(err, "failed to parse root block's %s commitment id", rootBlockID)
return ierrors.Wrapf(err, "failed to read root block's %s commitment id %d", rootBlockID, i)
}

if s.rootBlocks.Get(rootBlockID.Slot(), true).Set(rootBlockID, commitmentID) {
Expand All @@ -263,16 +253,11 @@ func (s *State) Import(reader io.ReadSeeker) error {
return ierrors.Wrap(err, "failed to read root blocks")
}

latestNonEmptySlotBytes, err := stream.ReadBytes(reader, iotago.SlotIndexLength)
latestNonEmptySlot, err := stream.Read[iotago.SlotIndex](reader)
if err != nil {
return ierrors.Wrap(err, "failed to read latest non empty slot")
}

latestNonEmptySlot, _, err := iotago.SlotIndexFromBytes(latestNonEmptySlotBytes)
if err != nil {
return ierrors.Wrap(err, "failed to parse latest non empty slot")
}

s.setLatestNonEmptySlot(latestNonEmptySlot)

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"

"github.com/iotaledger/hive.go/ierrors"
"github.com/iotaledger/hive.go/serializer/v2"
"github.com/iotaledger/hive.go/serializer/v2/stream"
"github.com/iotaledger/iota-core/pkg/core/account"
"github.com/iotaledger/iota-core/pkg/model"
Expand All @@ -21,13 +22,13 @@ func (o *Orchestrator) Import(reader io.ReadSeeker) error {
o.lastCommittedSlot = slot

upgradeSignalMap := make(map[account.SeatIndex]*model.SignaledBlock)
if err := stream.ReadCollection(reader, func(i int) error {
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsUint32, func(i int) error {
seat, err := stream.Read[account.SeatIndex](reader)
if err != nil {
return ierrors.Wrap(err, "failed to read seat")
}

signaledBlock, err := stream.ReadFunc(reader, model.SignaledBlockFromBytesFunc(o.apiProvider.APIForSlot(slot)))
signaledBlock, err := stream.ReadObject(reader, serializer.SeriLengthPrefixTypeAsUint16, model.SignaledBlockFromBytesFunc(o.apiProvider.APIForSlot(slot)))
if err != nil {
return ierrors.Wrap(err, "failed to read signaled block")
}
Expand Down Expand Up @@ -55,13 +56,13 @@ func (o *Orchestrator) Import(reader io.ReadSeeker) error {
latestSignals.Set(seat, signaledBlock)
}

if err := stream.ReadCollection(reader, func(i int) error {
if err := stream.ReadCollection(reader, serializer.SeriLengthPrefixTypeAsByte, func(i int) error {
epoch, err := stream.Read[iotago.EpochIndex](reader)
if err != nil {
return ierrors.Wrap(err, "failed to read epoch")
}

versionAndHash, err := stream.ReadFunc(reader, model.VersionAndHashFromBytes)
versionAndHash, err := stream.ReadFixedSizeObject(reader, model.VersionAndHashSize, model.VersionAndHashFromBytes)
if err != nil {
return ierrors.Wrap(err, "failed to read versionAndHash")
}
Expand All @@ -87,19 +88,19 @@ func (o *Orchestrator) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex
}

// Export the upgrade signals for the target slot. Since these are rolled forward exporting the last slot is sufficient.
if err := stream.WriteCollection(writer, func() (elementsCount uint64, err error) {
var exportedCount uint64
if err := stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsUint32, func() (elementsCount int, err error) {
var exportedCount int

upgradeSignals, err := o.upgradeSignalsPerSlotFunc(targetSlot)
if err != nil {
return 0, ierrors.Wrapf(err, "failed to get upgrade signals for target slot %d", targetSlot)
}
if err := upgradeSignals.StreamBytes(func(seatBytes []byte, signaledBlockBytes []byte) error {
if err := stream.Write(writer, seatBytes); err != nil {
if err := stream.WriteBytes(writer, seatBytes); err != nil {
return ierrors.Wrap(err, "failed to write seat")
}

if err := stream.WriteBlob(writer, signaledBlockBytes); err != nil {
if err := stream.WriteByteSlice(writer, signaledBlockBytes, serializer.SeriLengthPrefixTypeAsUint16); err != nil {
return ierrors.Wrap(err, "failed to write signaled block")
}

Expand All @@ -116,8 +117,8 @@ func (o *Orchestrator) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex
}

// Export the successfully signaled epochs for the signaling window.
if err := stream.WriteCollection(writer, func() (elementsCount uint64, err error) {
var exportedCount uint64
if err := stream.WriteCollection(writer, serializer.SeriLengthPrefixTypeAsByte, func() (elementsCount int, err error) {
var exportedCount int

apiForSlot := o.apiProvider.APIForSlot(targetSlot)
currentEpoch := apiForSlot.TimeProvider().EpochFromSlot(targetSlot)
Expand All @@ -137,7 +138,7 @@ func (o *Orchestrator) Export(writer io.WriteSeeker, targetSlot iotago.SlotIndex
if err := stream.Write(writer, epoch); err != nil {
return 0, ierrors.Wrapf(err, "failed to write epoch %d", epoch)
}
if err := stream.WriteSerializable(writer, versionAndHash); err != nil {
if err := stream.WriteFixedSizeObject(writer, versionAndHash, model.VersionAndHashSize, model.VersionAndHash.Bytes); err != nil {
return 0, ierrors.Wrapf(err, "failed to write versionAndHash for epoch %d", epoch)
}

Expand Down

0 comments on commit 5383f66

Please sign in to comment.