Skip to content

Commit

Permalink
gsfa: change OffsetAndSizeAndBlocktime to OffsetAndSizeAndSlot
Browse files Browse the repository at this point in the history
  • Loading branch information
gagliardetto committed Dec 11, 2024
1 parent 82b3cf9 commit a3ad8e9
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 73 deletions.
2 changes: 2 additions & 0 deletions blocktimeindex/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/rpcpool/yellowstone-faithful/slottools"
)

// Using a uint32 for blocktime is enough seconds until the year 2106
// (a signed int32 would overflow in 2038).

var magic = []byte("blocktimeindex")

type Index struct {
Expand Down
1 change: 0 additions & 1 deletion cmd-x-index-gsfa.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func newCmd_Index_gsfa() *cli.Command {
txWithInfo.Offset,
txWithInfo.Length,
txWithInfo.Slot,
txWithInfo.Blocktime,
accountKeys,
)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions gsfa/gsfa-read-multiepoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (gsfa *GsfaReaderMultiepoch) Get(
ctx context.Context,
pk solana.PublicKey,
limit int,
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return nil, nil
Expand Down Expand Up @@ -102,7 +102,7 @@ func (multi *GsfaReaderMultiepoch) GetBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
Expand All @@ -118,7 +118,7 @@ func (multi *GsfaReaderMultiepoch) iterBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error),
fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error),
) (EpochToTransactionObjects, error) {
if limit <= 0 {
return make(EpochToTransactionObjects), nil
Expand Down
14 changes: 7 additions & 7 deletions gsfa/gsfa-read.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ func (index *GsfaReader) Get(
ctx context.Context,
pk solana.PublicKey,
limit int,
) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
) ([]linkedlog.OffsetAndSizeAndSlot, error) {
if limit <= 0 {
return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
return []linkedlog.OffsetAndSizeAndSlot{}, nil
}
lastOffset, err := index.offsets.Get(pk)
if err != nil {
Expand All @@ -106,7 +106,7 @@ func (index *GsfaReader) Get(
}
debugln("locs.OffsetToFirst:", lastOffset)

var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
var allTransactionLocations []linkedlog.OffsetAndSizeAndSlot
next := lastOffset // Start from the latest, and go back in time.

for {
Expand Down Expand Up @@ -138,10 +138,10 @@ func (index *GsfaReader) GetBeforeUntil(
limit int,
before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it).
until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it).
fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktime) (solana.Signature, error),
) ([]linkedlog.OffsetAndSizeAndBlocktime, error) {
fetcher func(sigIndex linkedlog.OffsetAndSizeAndSlot) (solana.Signature, error),
) ([]linkedlog.OffsetAndSizeAndSlot, error) {
if limit <= 0 {
return []linkedlog.OffsetAndSizeAndBlocktime{}, nil
return []linkedlog.OffsetAndSizeAndSlot{}, nil
}
locs, err := index.offsets.Get(pk)
if err != nil {
Expand All @@ -152,7 +152,7 @@ func (index *GsfaReader) GetBeforeUntil(
}
debugln("locs.OffsetToFirst:", locs)

var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime
var allTransactionLocations []linkedlog.OffsetAndSizeAndSlot
next := locs // Start from the latest, and go back in time.

reachedBefore := false
Expand Down
17 changes: 8 additions & 9 deletions gsfa/gsfa-write.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type GsfaWriter struct {
ll *linkedlog.LinkedLog
man *manifest.Manifest
fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktime
accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]
accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndSlot]
offsetsWriter *indexes.PubkeyToOffsetAndSize_Writer
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -64,7 +64,7 @@ func NewGsfaWriter(
fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable
popRank: newRollingRankOfTopPerformers(10_000),
offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)),
accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)),
accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndSlot](int(1_000_000)),
ctx: ctx,
cancel: cancel,
fullBufferWriterDone: make(chan struct{}),
Expand Down Expand Up @@ -145,16 +145,15 @@ func (a *GsfaWriter) Push(
offset uint64,
length uint64,
slot uint64,
blocktime uint64,
publicKeys solana.PublicKeySlice,
) error {
a.mu.Lock()
defer a.mu.Unlock()

oas := &linkedlog.OffsetAndSizeAndBlocktime{
Offset: offset,
Size: length,
Blocktime: blocktime,
oas := &linkedlog.OffsetAndSizeAndSlot{
Offset: offset,
Size: length,
Slot: slot,
}
publicKeys = publicKeys.Dedupe()
publicKeys.Sort()
Expand Down Expand Up @@ -190,7 +189,7 @@ func (a *GsfaWriter) Push(
for _, publicKey := range publicKeys {
current, ok := a.accum.Get(publicKey)
if !ok {
current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch)
current = make([]*linkedlog.OffsetAndSizeAndSlot, 0, itemsPerBatch)
current = append(current, oas)
a.accum.Set(publicKey, current)
} else {
Expand Down Expand Up @@ -259,7 +258,7 @@ func (a *GsfaWriter) Close() error {
)
}

func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]) error {
func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndSlot]) error {
keys := solana.PublicKeySlice(m.Keys())
keys.Sort()
for ii := range keys {
Expand Down
14 changes: 7 additions & 7 deletions gsfa/linkedlog/linked-log.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *LinkedLog) write(b []byte) (uint64, uint32, error) {
const mib = 1024 * 1024

// Read reads the block stored at the given offset.
func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndSlot, indexes.OffsetAndSize, error) {
lenBuf := make([]byte, binary.MaxVarintLen64)
_, err := s.file.ReadAt(lenBuf, int64(offset))
if err != nil {
Expand All @@ -130,7 +130,7 @@ func sizeOfUvarint(n uint64) int {
return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), n)
}

func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) {
func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndSlot, indexes.OffsetAndSize, error) {
if size > 256*mib {
return nil, indexes.OffsetAndSize{}, fmt.Errorf("compacted indexes length too large: %d", size)
}
Expand Down Expand Up @@ -158,12 +158,12 @@ func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAnd
return sigIndexes, nextOffset, nil
}

func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktime, error) {
func decompressIndexes(data []byte) ([]OffsetAndSizeAndSlot, error) {
decompressed, err := tooling.DecompressZstd(data)
if err != nil {
return nil, fmt.Errorf("error while decompressing data: %w", err)
}
return OffsetAndSizeAndBlocktimeSliceFromBytes(decompressed)
return OffsetAndSizeAndSlotSliceFromBytes(decompressed)
}

type KeyToOffsetAndSizeAndBlocktimeSlice []KeyToOffsetAndSizeAndBlocktime
Expand All @@ -180,7 +180,7 @@ func (s KeyToOffsetAndSizeAndBlocktimeSlice) Has(key solana.PublicKey) bool {

type KeyToOffsetAndSizeAndBlocktime struct {
Key solana.PublicKey
Values []*OffsetAndSizeAndBlocktime
Values []*OffsetAndSizeAndSlot
}

func (s *LinkedLog) Put(
Expand All @@ -205,7 +205,7 @@ func (s *LinkedLog) Put(
if len(val.Values) == 0 {
continue
}
slices.Reverse[[]*OffsetAndSizeAndBlocktime](val.Values) // reverse the slice so that the most recent indexes are first
slices.Reverse[[]*OffsetAndSizeAndSlot](val.Values) // reverse the slice so that the most recent indexes are first
err := func() error {
encodedIndexes, err := createIndexesPayload(val.Values)
if err != nil {
Expand Down Expand Up @@ -245,7 +245,7 @@ func (s *LinkedLog) Put(
return uint64(previousSize), nil
}

func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktime) ([]byte, error) {
func createIndexesPayload(indexes []*OffsetAndSizeAndSlot) ([]byte, error) {
buf := make([]byte, 0, 9*len(indexes))
for _, index := range indexes {
buf = append(buf, index.Bytes()...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,32 @@ import (
"slices"
)

func NewOffsetAndSizeAndBlocktime(offset uint64, size uint64, blocktime uint64) *OffsetAndSizeAndBlocktime {
return &OffsetAndSizeAndBlocktime{
Offset: offset,
Size: size,
Blocktime: blocktime,
func NewOffsetAndSizeAndSlot(offset uint64, size uint64, slot uint64) *OffsetAndSizeAndSlot {
return &OffsetAndSizeAndSlot{
Offset: offset,
Size: size,
Slot: slot,
}
}

type OffsetAndSizeAndBlocktime struct {
Offset uint64 // uint48, 6 bytes, max 281.5 TB (terabytes)
Size uint64 // uint24, 3 bytes, max 16.7 MB (megabytes)
Blocktime uint64 // uint40, 5 bytes, max 1099511627775 (seconds since epoch)
type OffsetAndSizeAndSlot struct {
Offset uint64 // encoded as uvarint
Size uint64 // encoded as uvarint
Slot uint64 // encoded as uvarint
}

// Bytes returns the offset and size as a byte slice.
func (oas OffsetAndSizeAndBlocktime) Bytes() []byte {
func (oas OffsetAndSizeAndSlot) Bytes() []byte {
buf := make([]byte, 0, binary.MaxVarintLen64*3)
buf = binary.AppendUvarint(buf, oas.Offset)
buf = binary.AppendUvarint(buf, oas.Size)
buf = binary.AppendUvarint(buf, oas.Blocktime)
buf = binary.AppendUvarint(buf, oas.Slot)
buf = slices.Clip(buf)
return buf
}

// FromBytes parses the offset and size from a byte slice.
func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
func (oas *OffsetAndSizeAndSlot) FromBytes(buf []byte) error {
if len(buf) > binary.MaxVarintLen64*3 {
return errors.New("invalid byte slice length")
}
Expand All @@ -48,14 +48,14 @@ func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error {
return errors.New("failed to parse size")
}
buf = buf[n:]
oas.Blocktime, n = binary.Uvarint(buf)
oas.Slot, n = binary.Uvarint(buf)
if n <= 0 {
return errors.New("failed to parse blocktime")
return errors.New("failed to parse slot")
}
return nil
}

func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error {
func (oas *OffsetAndSizeAndSlot) FromReader(r UvarintReader) error {
var err error
oas.Offset, err = r.ReadUvarint()
if err != nil {
Expand All @@ -65,9 +65,9 @@ func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error {
if err != nil {
return fmt.Errorf("failed to read size: %w", err)
}
oas.Blocktime, err = r.ReadUvarint()
oas.Slot, err = r.ReadUvarint()
if err != nil {
return fmt.Errorf("failed to read blocktime: %w", err)
return fmt.Errorf("failed to read slot: %w", err)
}
return nil
}
Expand All @@ -92,11 +92,11 @@ func (r *uvarintReader) ReadUvarint() (uint64, error) {
return v, nil
}

func OffsetAndSizeAndBlocktimeSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktime, error) {
func OffsetAndSizeAndSlotSliceFromBytes(buf []byte) ([]OffsetAndSizeAndSlot, error) {
r := &uvarintReader{buf: buf}
oass := make([]OffsetAndSizeAndBlocktime, 0)
oass := make([]OffsetAndSizeAndSlot, 0)
for {
oas := OffsetAndSizeAndBlocktime{}
oas := OffsetAndSizeAndSlot{}
err := oas.FromReader(r)
if err != nil {
if errors.Is(err, io.EOF) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import (
"testing"
)

func TestOffsetAndSizeAndBlocktime(t *testing.T) {
func TestOffsetAndSizeAndSlot(t *testing.T) {
{
ca := OffsetAndSizeAndBlocktime{
Offset: 1,
Size: 2,
Blocktime: 3,
ca := OffsetAndSizeAndSlot{
Offset: 1,
Size: 2,
Slot: 3,
}
buf := ca.Bytes()

{
ca2 := OffsetAndSizeAndBlocktime{}
ca2 := OffsetAndSizeAndSlot{}
err := ca2.FromBytes(buf)
if err != nil {
panic(err)
Expand All @@ -28,15 +28,15 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
}
{
// now with very high values
ca := OffsetAndSizeAndBlocktime{
Offset: 281474976710655,
Size: 16777215,
Blocktime: 1099511627775,
ca := OffsetAndSizeAndSlot{
Offset: 281474976710655,
Size: 16777215,
Slot: 1099511627775,
}
buf := ca.Bytes()

{
ca2 := OffsetAndSizeAndBlocktime{}
ca2 := OffsetAndSizeAndSlot{}
err := ca2.FromBytes(buf)
if err != nil {
panic(err)
Expand All @@ -47,21 +47,21 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
}
}
{
many := []OffsetAndSizeAndBlocktime{
many := []OffsetAndSizeAndSlot{
{
Offset: 1,
Size: 2,
Blocktime: 3,
Offset: 1,
Size: 2,
Slot: 3,
},
{
Offset: 4,
Size: 5,
Blocktime: 6,
Offset: 4,
Size: 5,
Slot: 6,
},
{
Offset: 281474976710655,
Size: 16777215,
Blocktime: 1099511627775,
Offset: 281474976710655,
Size: 16777215,
Slot: 1099511627775,
},
}
buf := make([]byte, 0, binary.MaxVarintLen64*3*len(many))
Expand All @@ -70,7 +70,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) {
}

{
many2, err := OffsetAndSizeAndBlocktimeSliceFromBytes(buf)
many2, err := OffsetAndSizeAndSlotSliceFromBytes(buf)
if err != nil {
panic(err)
}
Expand Down
8 changes: 6 additions & 2 deletions gsfa/manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@ type Manifest struct {
}

var (
_MAGIC = [...]byte{'g', 's', 'f', 'a', 'm', 'n', 'f', 's'}
_Version = uint64(3)
_MAGIC = [...]byte{'g', 's', 'f', 'a', 'm', 'n', 'f', 's'}
// NOTES:
// - v3: stores offset, size, and blocktime
// - v4: stores offset, size, and slot
// Version is the version of the gsfa index.
_Version = uint64(4)
)

var headerLenWithoutMeta = len(_MAGIC) + 8 // 8 bytes for the version
Expand Down
Loading

0 comments on commit a3ad8e9

Please sign in to comment.