diff --git a/api.go b/api.go index d9f3876d..3c773daf 100644 --- a/api.go +++ b/api.go @@ -8,6 +8,7 @@ import ( "github.com/gagliardetto/solana-go" "github.com/rpcpool/yellowstone-faithful/compactindexsized" + "github.com/rpcpool/yellowstone-faithful/slottools" "github.com/valyala/fasthttp" ) @@ -35,7 +36,7 @@ func (multi *MultiEpoch) apiHandler(reqCtx *fasthttp.RequestCtx) { return } // find the epoch that contains the requested slot - epochNumber := CalcEpochForSlot(slot) + epochNumber := slottools.CalcEpochForSlot(slot) epochHandler, err := multi.GetEpoch(epochNumber) if err != nil { reqCtx.SetStatusCode(fasthttp.StatusNotFound) // TODO: this means epoch is not available, and probably should be another dedicated status code diff --git a/blocktimeindex/error.go b/blocktimeindex/error.go new file mode 100644 index 00000000..b8873484 --- /dev/null +++ b/blocktimeindex/error.go @@ -0,0 +1,26 @@ +package blocktimeindex + +import "fmt" + +var _ error = &ErrSlotOutOfRange{} + +type ErrSlotOutOfRange struct { + start, end uint64 + slot uint64 +} + +func NewErrSlotOutOfRange(start, end, slot uint64) error { + return &ErrSlotOutOfRange{start: start, end: end, slot: slot} +} + +func (e *ErrSlotOutOfRange) Error() string { + if e == nil { + return "nil" + } + return fmt.Sprintf("slot %d is out of range [%d, %d]", e.slot, e.start, e.end) +} + +func (e *ErrSlotOutOfRange) Is(target error) bool { + _, ok := target.(*ErrSlotOutOfRange) + return ok +} diff --git a/blocktimeindex/writer.go b/blocktimeindex/writer.go new file mode 100644 index 00000000..df0fe821 --- /dev/null +++ b/blocktimeindex/writer.go @@ -0,0 +1,259 @@ +package blocktimeindex + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "math" + "os" + + "github.com/ipfs/go-cid" + "github.com/rpcpool/yellowstone-faithful/indexes" + "github.com/rpcpool/yellowstone-faithful/slottools" +) + +// Using an int32 for blocktime is enough seconds until the year 2106. + +var magic = []byte("blocktimeindex") + +type Index struct { + start uint64 + end uint64 + epoch uint64 + capacity uint64 + values []int64 +} + +func NewIndexer(start, end, capacity uint64) *Index { + epoch := slottools.EpochForSlot(start) + if epoch != slottools.EpochForSlot(end) { + panic(fmt.Sprintf("start and end slots must be in the same epoch: %d != %d", epoch, slottools.EpochForSlot(end))) + } + return &Index{ + start: start, + end: end, + epoch: epoch, + capacity: capacity, + values: make([]int64, capacity), + } +} + +const DefaultCapacityForEpoch = 432_000 + +// NewForEpoch creates a new Index for the given epoch. +func NewForEpoch(epoch uint64) *Index { + start, end := slottools.CalcEpochLimits(epoch) + return NewIndexer(start, end, DefaultCapacityForEpoch) +} + +// Set sets the blocktime for the given slot. +func (i *Index) Set(slot uint64, time int64) error { + if slot < i.start || slot > i.end { + return NewErrSlotOutOfRange(i.start, i.end, slot) + } + i.values[slot-i.start] = time + return nil +} + +// Get gets the blocktime for the given slot. +func (i *Index) Get(slot uint64) (int64, error) { + if slot < i.start || slot > i.end { + return 0, NewErrSlotOutOfRange(i.start, i.end, slot) + } + return i.values[slot-i.start], nil +} + +func (i *Index) marshalBinary() ([]byte, error) { + writer := bytes.NewBuffer(nil) + writer.Grow(DefaultIndexByteSize) + _, err := writer.Write(magic) + if err != nil { + return nil, fmt.Errorf("failed to write magic: %w", err) + } + _, err = writer.Write(slottools.Uint64ToLEBytes(i.start)) + if err != nil { + return nil, fmt.Errorf("failed to write start: %w", err) + } + _, err = writer.Write(slottools.Uint64ToLEBytes(i.end)) + if err != nil { + return nil, fmt.Errorf("failed to write end: %w", err) + } + _, err = writer.Write(slottools.Uint64ToLEBytes(i.epoch)) + if err != nil { + return nil, fmt.Errorf("failed to write epoch: %w", err) + } + _, err = writer.Write(slottools.Uint64ToLEBytes(i.capacity)) + if err != nil { + return nil, fmt.Errorf("failed to write capacity: %w", err) + } + for _, t := range i.values { + b, err := blocktimeToBytes(int64(t)) + if err != nil { + return nil, fmt.Errorf("failed to convert time to bytes: %w", err) + } + _, err = writer.Write(b) + if err != nil { + return nil, fmt.Errorf("failed to write time: %w", err) + } + } + return writer.Bytes(), nil +} + +func (i *Index) MarshalBinary() ([]byte, error) { + return i.marshalBinary() +} + +var _ io.WriterTo = (*Index)(nil) + +// WriteTo implements io.WriterTo. +func (i *Index) WriteTo(wr io.Writer) (int64, error) { + data, err := i.marshalBinary() + if err != nil { + return 0, err + } + n, err := wr.Write(data) + return int64(n), err +} + +func blocktimeToBytes(blocktime int64) ([]byte, error) { + if blocktime < 0 { + return nil, fmt.Errorf("blocktime must be non-negative") + } + if blocktime > math.MaxUint32 { + return nil, fmt.Errorf("blocktime must fit in 32 bits") + } + // treat as uint32 + buf := make([]byte, 4) + binary.LittleEndian.PutUint32(buf, uint32(blocktime)) + return buf, nil +} + +func (i *Index) unmarshalBinary(data []byte) error { + reader := bytes.NewReader(data) + magicBuf := make([]byte, len(magic)) + _, err := reader.Read(magicBuf) + if err != nil { + return fmt.Errorf("failed to read magic: %w", err) + } + if !bytes.Equal(magicBuf, magic) { + return fmt.Errorf("invalid magic: %s", magicBuf) + } + + startBuf := make([]byte, 8) + _, err = reader.Read(startBuf) + if err != nil { + return fmt.Errorf("failed to read start: %w", err) + } + i.start = slottools.Uint64FromLEBytes(startBuf) + + endBuf := make([]byte, 8) + _, err = reader.Read(endBuf) + if err != nil { + return fmt.Errorf("failed to read end: %w", err) + } + i.end = slottools.Uint64FromLEBytes(endBuf) + + epochBuf := make([]byte, 8) + _, err = reader.Read(epochBuf) + if err != nil { + return fmt.Errorf("failed to read epoch: %w", err) + } + i.epoch = slottools.Uint64FromLEBytes(epochBuf) + { + // check that start and end are in the same epoch + startEpoch := slottools.EpochForSlot(i.start) + endEpoch := slottools.EpochForSlot(i.end) + if startEpoch != endEpoch { + return fmt.Errorf("start and end slots must be in the same epoch: %d != %d", startEpoch, endEpoch) + } + if startEpoch != i.epoch { + return fmt.Errorf("epoch mismatch: start=%d, end=%d, epoch=%d", startEpoch, endEpoch, i.epoch) + } + } + + capacityBuf := make([]byte, 8) + _, err = reader.Read(capacityBuf) + if err != nil { + return fmt.Errorf("failed to read capacity: %w", err) + } + i.capacity = slottools.Uint64FromLEBytes(capacityBuf) + + i.values = make([]int64, i.capacity) + for j := uint64(0); j < i.capacity; j++ { + timeBuf := make([]byte, 4) + _, err = reader.Read(timeBuf) + if err != nil { + return fmt.Errorf("failed to read time: %w", err) + } + i.values[j] = int64(binary.LittleEndian.Uint32(timeBuf)) + } + return nil +} + +func (i *Index) UnmarshalBinary(data []byte) error { + return i.unmarshalBinary(data) +} + +func (i *Index) FromBytes(data []byte) error { + return i.unmarshalBinary(data) +} + +func (i *Index) FromReader(r io.Reader) error { + data, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("failed to read data: %w", err) + } + return i.FromBytes(data) +} + +func (i *Index) FromFile(file string) error { + buf, err := os.ReadFile(file) + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + return i.FromBytes(buf) +} + +func FromFile(file string) (*Index, error) { + i := &Index{} + err := i.FromFile(file) + if err != nil { + return nil, err + } + return i, nil +} + +func FromBytes(data []byte) (*Index, error) { + i := &Index{} + err := i.FromBytes(data) + if err != nil { + return nil, err + } + return i, nil +} + +func FromReader(r io.Reader) (*Index, error) { + i := &Index{} + err := i.FromReader(r) + if err != nil { + return nil, err + } + return i, nil +} + +func FormatFilename(epoch uint64, rootCid cid.Cid, network indexes.Network) string { + return fmt.Sprintf( + "epoch-%d-%s-%s-%s", + epoch, + rootCid.String(), + network, + "slot-to-blocktime.index", + ) +} + +var DefaultIndexByteSize = len(magic) + 8 + 8 + 8 + 8 + (432000 * 4) + +func (i *Index) Epoch() uint64 { + return i.epoch +} diff --git a/blocktimeindex/writer_test.go b/blocktimeindex/writer_test.go new file mode 100644 index 00000000..32ac18a9 --- /dev/null +++ b/blocktimeindex/writer_test.go @@ -0,0 +1,121 @@ +package blocktimeindex + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWriter(t *testing.T) { + { + epoch := uint64(0) + w := NewForEpoch(epoch) + if w.start != 0 { + t.Errorf("expected 0, got %v", w.start) + } + if w.end != 431999 { + t.Errorf("expected 431999, got %v", w.end) + } + if w.epoch != 0 { + t.Errorf("expected 0, got %v", w.epoch) + } + if w.capacity != 432000 { + t.Errorf("expected 432000, got %v", w.capacity) + } + if len(w.values) != 432000 { + t.Errorf("expected 432000, got %v", len(w.values)) + } + err := w.Set(0, 1) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + if got, err := w.Get(0); got != 1 || err != nil { + t.Errorf("expected 1, got %v, %v", got, err) + } + err = w.Set(431999, 1) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + if got, err := w.Get(431999); got != 1 || err != nil { + t.Errorf("expected 1, got %v, %v", got, err) + } + err = w.Set(432000, 1) + if !errors.Is(err, &ErrSlotOutOfRange{}) { + t.Errorf("expected ErrSlotOutOfRange, got %v", err) + } + // expect error when getting out of range slot + if _, err := w.Get(432000); !errors.Is(err, &ErrSlotOutOfRange{}) { + t.Errorf("expected ErrSlotOutOfRange, got %v", err) + } + } + { + epoch := uint64(1) + w := NewForEpoch(epoch) + if w.start != 432000 { + t.Errorf("expected 432000, got %v", w.start) + } + if w.end != 863999 { + t.Errorf("expected 863999, got %v", w.end) + } + if w.epoch != 1 { + t.Errorf("expected 1, got %v", w.epoch) + } + if w.capacity != 432000 { + t.Errorf("expected 432000, got %v", w.capacity) + } + if len(w.values) != 432000 { + t.Errorf("expected 432000, got %v", len(w.values)) + } + if len(w.values) != 432000 { + t.Errorf("expected 432000, got %v", len(w.values)) + } + err := w.Set(432000, 123) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + if got, err := w.Get(432000); got != 123 || err != nil { + t.Errorf("expected 1, got %v, %v", got, err) + } + err = w.Set(863999, 1) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + if got, err := w.Get(863999); got != 1 || err != nil { + t.Errorf("expected 1, got %v, %v", got, err) + } + err = w.Set(864000, 1) + if !errors.Is(err, &ErrSlotOutOfRange{}) { + t.Errorf("expected ErrSlotOutOfRange, got %v", err) + } + // expect error when getting out of range slot + if _, err := w.Get(864000); !errors.Is(err, &ErrSlotOutOfRange{}) { + t.Errorf("expected ErrSlotOutOfRange, got %v", err) + } + { + // test writing + buf, err := w.MarshalBinary() + if err != nil { + t.Errorf("expected no error, got %v", err) + } + // the expected length is: + // - len(magic) + len(start) + len(end) + len(capacity) + len(epoch) + (len(values) * 4) + expectedLen := DefaultIndexByteSize + if len(buf) != expectedLen { + t.Errorf("expected %v, got %v", expectedLen, len(buf)) + } + { + // test reading + got, err := FromBytes(buf) + if err != nil { + t.Errorf("expected no error, got %v", err) + } + assert.Equal(t, w.start, got.start) + assert.Equal(t, w.end, got.end) + assert.Equal(t, w.epoch, got.epoch) + assert.Equal(t, w.capacity, got.capacity) + assert.Equal(t, w.values, got.values) + } + } + } +} diff --git a/cmd-car-split.go b/cmd-car-split.go index 3707149f..7d3e9909 100644 --- a/cmd-car-split.go +++ b/cmd-car-split.go @@ -219,7 +219,7 @@ func newCmd_SplitCar() *cli.Command { } // Set the currentFileSize to the size of the header - currentFileSize = uint64(len(nulRootCarHeader)) + currentFileSize = hdrSize currentSubsetInfo = subsetInfo{fileName: filename, firstSlot: -1, lastSlot: -1} return nil } diff --git a/cmd-find-missing-tx-meta.go b/cmd-find-missing-tx-meta.go index bea7704f..21900625 100644 --- a/cmd-find-missing-tx-meta.go +++ b/cmd-find-missing-tx-meta.go @@ -16,6 +16,7 @@ import ( "github.com/rpcpool/yellowstone-faithful/carreader" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" + "github.com/rpcpool/yellowstone-faithful/slottools" "github.com/rpcpool/yellowstone-faithful/tooling" "github.com/urfave/cli/v2" "k8s.io/klog/v2" @@ -188,8 +189,8 @@ func newCmd_find_missing_tx_metadata() *cli.Command { slot := uint64(block.Slot) firstSlot = &slot // determine epoch: - epoch := CalcEpochForSlot(slot) - epochStart, epochEnd = CalcEpochLimits(epoch) + epoch := slottools.CalcEpochForSlot(slot) + epochStart, epochEnd = slottools.CalcEpochLimits(epoch) } if tookToDo1kSlots > 0 { remainingSlots := epochEnd - uint64(block.Slot) diff --git a/cmd-x-index-all.go b/cmd-x-index-all.go index b4b15eac..7e33d35b 100644 --- a/cmd-x-index-all.go +++ b/cmd-x-index-all.go @@ -9,10 +9,12 @@ import ( "math/rand" "os" "path/filepath" + "sort" "time" "github.com/dustin/go-humanize" "github.com/ipfs/go-cid" + "github.com/rpcpool/yellowstone-faithful/blocktimeindex" "github.com/rpcpool/yellowstone-faithful/bucketteer" "github.com/rpcpool/yellowstone-faithful/carreader" "github.com/rpcpool/yellowstone-faithful/indexes" @@ -166,6 +168,10 @@ func createAllIndexes( for kind := range numItems { kinds = append(kinds, kind) } + // sort from byte value: + sort.Slice(kinds, func(i, j int) bool { + return kinds[i] < kinds[j] + }) for _, kind := range kinds { klog.Infof(" %s: %s items", iplddecoders.Kind(kind), humanize.Comma(int64(numItems[kind]))) numTotalItems += numItems[kind] @@ -220,6 +226,8 @@ func createAllIndexes( } defer sig_exists.Close() + slot_to_blocktime := blocktimeindex.NewForEpoch(epoch) + totalOffset := uint64(0) { if size, err := rd.HeaderSize(); err != nil { @@ -266,6 +274,11 @@ func createAllIndexes( if err != nil { return nil, 0, fmt.Errorf("failed to index slot to cid: %w", err) } + + err = slot_to_blocktime.Set(uint64(block.Slot), int64(block.Meta.Blocktime)) + if err != nil { + return nil, 0, fmt.Errorf("failed to index slot to blocktime: %w", err) + } numIndexedBlocks++ } case iplddecoders.KindTransaction: @@ -391,6 +404,24 @@ func createAllIndexes( return nil }) + wg.Go(func() error { + klog.Infof("Sealing slot_to_blocktime index...") + + path := filepath.Join(indexDir, blocktimeindex.FormatFilename(epoch, rootCID, network)) + file, err := os.Create(path) + if err != nil { + return fmt.Errorf("failed to create slot_to_blocktime index file: %w", err) + } + defer file.Close() + + if _, err := slot_to_blocktime.WriteTo(file); err != nil { + return fmt.Errorf("failed to write slot_to_blocktime index: %w", err) + } + paths.SlotToBlocktime = path + klog.Infof("Successfully sealed slot_to_blocktime index: %s", paths.SlotToBlocktime) + return nil + }) + if err := wg.Wait(); err != nil { return nil, 0, err } @@ -412,6 +443,7 @@ type IndexPaths struct { SlotToCid string SignatureToCid string SignatureExists string + SlotToBlocktime string } // IndexPaths.String @@ -429,6 +461,9 @@ func (p *IndexPaths) String() string { builder.WriteString(" sig_exists:\n uri: ") builder.WriteString(quoteSingle(p.SignatureExists)) builder.WriteString("\n") + builder.WriteString(" slot_to_blocktime:\n uri: ") + builder.WriteString(quoteSingle(p.SlotToBlocktime)) + builder.WriteString("\n") return builder.String() } diff --git a/cmd-x-index-gsfa.go b/cmd-x-index-gsfa.go index 65db5400..edc1697c 100644 --- a/cmd-x-index-gsfa.go +++ b/cmd-x-index-gsfa.go @@ -20,6 +20,7 @@ import ( "github.com/rpcpool/yellowstone-faithful/indexmeta" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" + "github.com/rpcpool/yellowstone-faithful/slottools" "github.com/urfave/cli/v2" "k8s.io/klog/v2" ) @@ -167,7 +168,7 @@ func newCmd_Index_gsfa() *cli.Command { verifyHash := c.Bool("verify-hash") ipldbindcode.DisableHashVerification = !verifyHash - epochStart, epochEnd := CalcEpochLimits(epoch) + epochStart, epochEnd := slottools.CalcEpochLimits(epoch) numSlots := uint64(0) numMaxObjects := uint64(0) @@ -233,7 +234,6 @@ func newCmd_Index_gsfa() *cli.Command { txWithInfo.Offset, txWithInfo.Length, txWithInfo.Slot, - txWithInfo.Blocktime, accountKeys, ) if err != nil { diff --git a/cmd-x-index-slot2blocktime.go b/cmd-x-index-slot2blocktime.go new file mode 100644 index 00000000..ca6371de --- /dev/null +++ b/cmd-x-index-slot2blocktime.go @@ -0,0 +1,163 @@ +package main + +import ( + "context" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/ipfs/go-cid" + carv2 "github.com/ipld/go-car/v2" + "github.com/rpcpool/yellowstone-faithful/blocktimeindex" + "github.com/rpcpool/yellowstone-faithful/indexes" + "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" + "github.com/urfave/cli/v2" + "k8s.io/klog/v2" +) + +func newCmd_Index_slot2blocktime() *cli.Command { + var epoch uint64 + var network indexes.Network + return &cli.Command{ + Name: "slot-to-blocktime", + Description: "Given a CAR file containing a Solana epoch, create an index of the file that maps slots to blocktimes.", + ArgsUsage: " ", + Before: func(c *cli.Context) error { + if network == "" { + network = indexes.NetworkMainnet + } + return nil + }, + Flags: []cli.Flag{ + &cli.Uint64Flag{ + Name: "epoch", + Usage: "the epoch of the CAR file", + Destination: &epoch, + Required: true, + }, + &cli.StringFlag{ + Name: "network", + Usage: "the cluster of the epoch; one of: mainnet, testnet, devnet", + Action: func(c *cli.Context, s string) error { + network = indexes.Network(s) + if !indexes.IsValidNetwork(network) { + return fmt.Errorf("invalid network: %q", network) + } + return nil + }, + }, + }, + Subcommands: []*cli.Command{}, + Action: func(c *cli.Context) error { + carPath := c.Args().Get(0) + indexDir := c.Args().Get(1) + + { + startedAt := time.Now() + defer func() { + klog.Infof("Finished in %s", time.Since(startedAt)) + }() + klog.Infof("Creating slot-to-blocktime index for %s", carPath) + indexFilepath, err := CreateIndex_slot2blocktime( + context.TODO(), + epoch, + network, + carPath, + indexDir, + ) + if err != nil { + panic(err) + } + klog.Info("slot-to-blocktime index created at", indexFilepath) + } + return nil + }, + } +} + +// CreateIndex_slot2blocktime creates an index file that maps slot numbers to blocktimes. +func CreateIndex_slot2blocktime( + ctx context.Context, + epoch uint64, + network indexes.Network, + carPath string, + indexDir string, +) (string, error) { + // Check if the CAR file exists: + exists, err := fileExists(carPath) + if err != nil { + return "", fmt.Errorf("failed to check if CAR file exists: %w", err) + } + if !exists { + return "", fmt.Errorf("CAR file %q does not exist", carPath) + } + + cr, err := carv2.OpenReader(carPath) + if err != nil { + return "", fmt.Errorf("failed to open CAR file: %w", err) + } + + // check it has 1 root + roots, err := cr.Roots() + if err != nil { + return "", fmt.Errorf("failed to get roots: %w", err) + } + // There should be only one root CID in the CAR file. + if len(roots) != 1 { + return "", fmt.Errorf("CAR file has %d roots, expected 1", len(roots)) + } + rootCid := roots[0] + + slot_to_blocktime := blocktimeindex.NewForEpoch(epoch) + + numBlocksIndexed := uint64(0) + klog.Infof("Indexing...") + + dr, err := cr.DataReader() + if err != nil { + return "", fmt.Errorf("failed to get data reader: %w", err) + } + + // Iterate over all blocks in the CAR file and put them into the index, + // using the slot number as the key and the blocktime as the value. + err = FindBlocks( + ctx, + dr, + func(c cid.Cid, block *ipldbindcode.Block) error { + slotNum := uint64(block.Slot) + + err = slot_to_blocktime.Set(slotNum, int64(block.Meta.Blocktime)) + if err != nil { + return fmt.Errorf("failed to put cid to offset: %w", err) + } + + numBlocksIndexed++ + if numBlocksIndexed%1_000 == 0 { + printToStderr(".") + } + return nil + }) + if err != nil { + return "", fmt.Errorf("failed to index; error while iterating over blocks: %w", err) + } + + // Use the car file name and root CID to name the index file: + + klog.Infof("Sealing index...") + + indexFilePath := filepath.Join(indexDir, blocktimeindex.FormatFilename(epoch, rootCid, network)) + + file, err := os.Create(indexFilePath) + if err != nil { + return "", fmt.Errorf("failed to create slot_to_blocktime index file: %w", err) + } + defer file.Close() + + if _, err := slot_to_blocktime.WriteTo(file); err != nil { + return "", fmt.Errorf("failed to write slot_to_blocktime index: %w", err) + } + klog.Infof("Successfully sealed slot_to_blocktime index") + klog.Infof("Index created at %s; %d items indexed", indexFilePath, numBlocksIndexed) + return indexFilePath, nil +} diff --git a/cmd-x-index.go b/cmd-x-index.go index 58e58fc1..e51b8bdf 100644 --- a/cmd-x-index.go +++ b/cmd-x-index.go @@ -20,6 +20,7 @@ func newCmd_Index() *cli.Command { newCmd_Index_all(), // NOTE: not actually all. newCmd_Index_gsfa(), newCmd_Index_sigExists(), + newCmd_Index_slot2blocktime(), }, } } diff --git a/config.go b/config.go index 154b3d98..801014e1 100644 --- a/config.go +++ b/config.go @@ -148,6 +148,9 @@ type Config struct { SigExists struct { URI URI `json:"uri" yaml:"uri"` } `json:"sig_exists" yaml:"sig_exists"` + SlotToBlocktime struct { + URI URI `json:"uri" yaml:"uri"` + } `json:"slot_to_blocktime" yaml:"slot_to_blocktime"` } `json:"indexes" yaml:"indexes"` Genesis struct { URI URI `json:"uri" yaml:"uri"` @@ -365,6 +368,14 @@ func (c *Config) Validate() error { return err } } + { + if c.Indexes.SlotToBlocktime.URI.IsZero() { + return fmt.Errorf("indexes.slot_to_blocktime.uri must be set") + } + if err := isSupportedURI(c.Indexes.SlotToBlocktime.URI, "indexes.slot_to_blocktime.uri"); err != nil { + return err + } + } } { // check that the URIs are valid @@ -406,6 +417,9 @@ func (c *Config) Validate() error { return fmt.Errorf("indexes.gsfa.uri must be a local directory") } } + if !c.Indexes.SlotToBlocktime.URI.IsValid() { + return fmt.Errorf("indexes.slot_to_blocktime.uri is invalid") + } } { // if epoch is 0, then the genesis URI must be set: diff --git a/dataprep-tools/filecoin/boost_create_deals.py b/dataprep-tools/filecoin/boost_create_deals.py index 272b338c..76847e29 100644 --- a/dataprep-tools/filecoin/boost_create_deals.py +++ b/dataprep-tools/filecoin/boost_create_deals.py @@ -351,6 +351,7 @@ def create_deals(client: Any, epoch: str, deal_config: DealConfig, metadata_obj: continue deals_providers: Dict[str, List[Dict[str,Any]]] = {} + fields = [ "provider", "deal_uuid", diff --git a/epoch.go b/epoch.go index b2414063..6f2a145d 100644 --- a/epoch.go +++ b/epoch.go @@ -22,6 +22,7 @@ import ( carv2 "github.com/ipld/go-car/v2" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/rpcpool/yellowstone-faithful/blocktimeindex" "github.com/rpcpool/yellowstone-faithful/bucketteer" "github.com/rpcpool/yellowstone-faithful/carreader" deprecatedbucketter "github.com/rpcpool/yellowstone-faithful/deprecated/bucketteer" @@ -55,6 +56,7 @@ type Epoch struct { sigToCidIndex *indexes.SigToCid_Reader sigExists SigExistsIndex gsfaReader *gsfa.GsfaReader + blocktimeindex *blocktimeindex.Index onClose []func() error allCache *hugecache.Cache } @@ -63,6 +65,18 @@ func (r *Epoch) GetCache() *hugecache.Cache { return r.allCache } +func (r *Epoch) GetBlocktime(slot uint64) (int64, error) { + if r.blocktimeindex == nil { + return 0, fmt.Errorf("blocktime index is not available") + } + return r.blocktimeindex.Get(slot) +} + +// GetBlocktimeIndex returns the blocktime index for the epoch. +func (r *Epoch) GetBlocktimeIndex() *blocktimeindex.Index { + return r.blocktimeindex +} + func (e *Epoch) Epoch() uint64 { return e.epoch } @@ -479,12 +493,50 @@ func NewEpochFromConfig( } } } + { + slotToBlocktimeFile, err := openIndexStorage( + c.Context, + string(config.Indexes.SlotToBlocktime.URI), + ) + if err != nil { + return nil, fmt.Errorf("failed to open slot-to-blocktime index file: %w", err) + } + buf, err := ReadAllFromReaderAt(slotToBlocktimeFile, uint64(blocktimeindex.DefaultIndexByteSize)) + if err != nil { + return nil, fmt.Errorf("failed to read slot-to-blocktime index: %w", err) + } + blocktimeIndex, err := blocktimeindex.FromBytes(buf) + if err != nil { + return nil, fmt.Errorf("failed to decode slot-to-blocktime index: %w", err) + } + // can close the file now: + err = slotToBlocktimeFile.Close() + if err != nil { + return nil, fmt.Errorf("failed to close slot-to-blocktime index file: %w", err) + } + if blocktimeIndex.Epoch() != ep.Epoch() { + return nil, fmt.Errorf("epoch mismatch in slot-to-blocktime index: expected %d, got %d", ep.Epoch(), blocktimeIndex.Epoch()) + } + ep.blocktimeindex = blocktimeIndex + } ep.rootCid = lastRootCid return ep, nil } +func ReadAllFromReaderAt(reader io.ReaderAt, size uint64) ([]byte, error) { + buf := make([]byte, size) + n, err := reader.ReadAt(buf, 0) + if err != nil { + return nil, fmt.Errorf("failed to read: %w", err) + } + if uint64(n) != size { + return nil, fmt.Errorf("failed to read all bytes: expected %d, got %d", size, n) + } + return buf, nil +} + func ParseFilecoinProviders(vs ...string) ([]peer.AddrInfo, error) { providerAddrInfos := make([]peer.AddrInfo, 0, len(vs)) diff --git a/grpc-server.go b/grpc-server.go index a04c2007..1baec44b 100644 --- a/grpc-server.go +++ b/grpc-server.go @@ -25,6 +25,7 @@ import ( "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc" + "github.com/rpcpool/yellowstone-faithful/slottools" solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers" "github.com/rpcpool/yellowstone-faithful/tooling" "golang.org/x/sync/errgroup" @@ -71,7 +72,7 @@ func (me *MultiEpoch) GetVersion(context.Context, *old_faithful_grpc.VersionRequ func (multi *MultiEpoch) GetBlock(ctx context.Context, params *old_faithful_grpc.BlockRequest) (*old_faithful_grpc.BlockResponse, error) { // find the epoch that contains the requested slot slot := params.Slot - epochNumber := CalcEpochForSlot(slot) + epochNumber := slottools.CalcEpochForSlot(slot) epochHandler, err := multi.GetEpoch(epochNumber) if err != nil { return nil, status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) @@ -90,7 +91,7 @@ func (multi *MultiEpoch) GetBlock(ctx context.Context, params *old_faithful_grpc tim.time("GetBlock") { prefetcherFromCar := func() error { - parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot) + parentIsInPreviousEpoch := slottools.CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != slottools.CalcEpochForSlot(slot) if slot == 0 { parentIsInPreviousEpoch = true } @@ -350,7 +351,7 @@ func (multi *MultiEpoch) GetBlock(ctx context.Context, params *old_faithful_grpc { // get parent slot parentSlot := uint64(block.Meta.Parent_slot) - if (parentSlot != 0 || slot == 1) && CalcEpochForSlot(parentSlot) == epochNumber { + if (parentSlot != 0 || slot == 1) && slottools.CalcEpochForSlot(parentSlot) == epochNumber { // NOTE: if the parent is in the same epoch, we can get it from the same epoch handler as the block; // otherwise, we need to get it from the previous epoch (TODO: implement this) parentBlock, _, err := epochHandler.GetBlock(WithSubrapghPrefetch(ctx, false), parentSlot) @@ -415,13 +416,15 @@ func (multi *MultiEpoch) GetTransaction(ctx context.Context, params *old_faithfu } response.Slot = uint64(transactionNode.Slot) { - block, _, err := epochHandler.GetBlock(ctx, uint64(transactionNode.Slot)) - if err != nil { - return nil, status.Errorf(codes.Internal, "Failed to get block: %v", err) - } - blocktime := uint64(block.Meta.Blocktime) - if blocktime != 0 { + blocktimeIndex := epochHandler.GetBlocktimeIndex() + if blocktimeIndex != nil { + blocktime, err := blocktimeIndex.Get(uint64(transactionNode.Slot)) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to get blocktime: %v", err) + } response.BlockTime = int64(blocktime) + } else { + return nil, status.Errorf(codes.Internal, "Failed to get blocktime: blocktime index is nil") } } @@ -606,29 +609,24 @@ func (multi *MultiEpoch) Get(ser old_faithful_grpc.OldFaithful_GetServer) error func (multi *MultiEpoch) GetBlockTime(ctx context.Context, params *old_faithful_grpc.BlockTimeRequest) (*old_faithful_grpc.BlockTimeResponse, error) { slot := params.Slot - epochNumber := CalcEpochForSlot(slot) + epochNumber := slottools.CalcEpochForSlot(slot) epochHandler, err := multi.GetEpoch(epochNumber) if err != nil { return nil, status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber) } - block, _, err := epochHandler.GetBlock(WithSubrapghPrefetch(ctx, false), slot) - if err != nil { - if errors.Is(err, compactindexsized.ErrNotFound) { - return nil, status.Errorf(codes.NotFound, "Slot %d was skipped, or missing in long-term storage", slot) - } else { + blocktimeIndex := epochHandler.GetBlocktimeIndex() + if blocktimeIndex != nil { + blocktime, err := blocktimeIndex.Get(slot) + if err != nil { return nil, status.Errorf(codes.Internal, "Failed to get block: %v", err) } + return &old_faithful_grpc.BlockTimeResponse{ + BlockTime: blocktime, + }, nil + } else { + return nil, status.Errorf(codes.Internal, "Failed to get block: blocktime index is not available") } - - blocktime := uint64(block.Meta.Blocktime) - if blocktime == 0 { - return nil, status.Errorf(codes.NotFound, "Blocktime for slot %d is not available", slot) - } - - return &old_faithful_grpc.BlockTimeResponse{ - BlockTime: int64(blocktime), - }, nil } func (multi *MultiEpoch) StreamBlocks(params *old_faithful_grpc.StreamBlocksRequest, ser old_faithful_grpc.OldFaithful_StreamBlocksServer) error { @@ -827,7 +825,7 @@ func (multi *MultiEpoch) processSlotTransactions( ctx, pKey, 1000, - func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { + func(epochNum uint64, oas linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error) { epoch, err := multi.GetEpoch(epochNum) if err != nil { return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) @@ -880,7 +878,7 @@ func (multi *MultiEpoch) processSlotTransactions( } txResp.Slot = uint64(txn.Slot) - // To do: add blocketime after index work is done + // To do: add blocktime after index work is done } if err := ser.Send(txResp); err != nil { diff --git a/gsfa/gsfa-read-multiepoch.go b/gsfa/gsfa-read-multiepoch.go index 6141bdd3..6646136a 100644 --- a/gsfa/gsfa-read-multiepoch.go +++ b/gsfa/gsfa-read-multiepoch.go @@ -50,7 +50,7 @@ func (gsfa *GsfaReaderMultiepoch) Get( ctx context.Context, pk solana.PublicKey, limit int, - fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error), + fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error), ) (EpochToTransactionObjects, error) { if limit <= 0 { return nil, nil @@ -102,7 +102,7 @@ func (multi *GsfaReaderMultiepoch) GetBeforeUntil( limit int, before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it). until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it). - fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error), + fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error), ) (EpochToTransactionObjects, error) { if limit <= 0 { return make(EpochToTransactionObjects), nil @@ -118,7 +118,7 @@ func (multi *GsfaReaderMultiepoch) iterBeforeUntil( limit int, before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it). until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it). - fetcher func(uint64, linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error), + fetcher func(uint64, linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error), ) (EpochToTransactionObjects, error) { if limit <= 0 { return make(EpochToTransactionObjects), nil diff --git a/gsfa/gsfa-read.go b/gsfa/gsfa-read.go index eaa01426..b3fe6219 100644 --- a/gsfa/gsfa-read.go +++ b/gsfa/gsfa-read.go @@ -93,9 +93,9 @@ func (index *GsfaReader) Get( ctx context.Context, pk solana.PublicKey, limit int, -) ([]linkedlog.OffsetAndSizeAndBlocktime, error) { +) ([]linkedlog.OffsetAndSizeAndSlot, error) { if limit <= 0 { - return []linkedlog.OffsetAndSizeAndBlocktime{}, nil + return []linkedlog.OffsetAndSizeAndSlot{}, nil } lastOffset, err := index.offsets.Get(pk) if err != nil { @@ -106,7 +106,7 @@ func (index *GsfaReader) Get( } debugln("locs.OffsetToFirst:", lastOffset) - var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime + var allTransactionLocations []linkedlog.OffsetAndSizeAndSlot next := lastOffset // Start from the latest, and go back in time. for { @@ -138,10 +138,10 @@ func (index *GsfaReader) GetBeforeUntil( limit int, before *solana.Signature, // Before this signature, exclusive (i.e. get signatures older than this signature, excluding it). until *solana.Signature, // Until this signature, inclusive (i.e. stop at this signature, including it). - fetcher func(sigIndex linkedlog.OffsetAndSizeAndBlocktime) (solana.Signature, error), -) ([]linkedlog.OffsetAndSizeAndBlocktime, error) { + fetcher func(sigIndex linkedlog.OffsetAndSizeAndSlot) (solana.Signature, error), +) ([]linkedlog.OffsetAndSizeAndSlot, error) { if limit <= 0 { - return []linkedlog.OffsetAndSizeAndBlocktime{}, nil + return []linkedlog.OffsetAndSizeAndSlot{}, nil } locs, err := index.offsets.Get(pk) if err != nil { @@ -152,7 +152,7 @@ func (index *GsfaReader) GetBeforeUntil( } debugln("locs.OffsetToFirst:", locs) - var allTransactionLocations []linkedlog.OffsetAndSizeAndBlocktime + var allTransactionLocations []linkedlog.OffsetAndSizeAndSlot next := locs // Start from the latest, and go back in time. reachedBefore := false diff --git a/gsfa/gsfa-write.go b/gsfa/gsfa-write.go index 136e5563..b940ecbd 100644 --- a/gsfa/gsfa-write.go +++ b/gsfa/gsfa-write.go @@ -28,7 +28,7 @@ type GsfaWriter struct { ll *linkedlog.LinkedLog man *manifest.Manifest fullBufferWriterChan chan linkedlog.KeyToOffsetAndSizeAndBlocktime - accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime] + accum *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndSlot] offsetsWriter *indexes.PubkeyToOffsetAndSize_Writer ctx context.Context cancel context.CancelFunc @@ -64,7 +64,7 @@ func NewGsfaWriter( fullBufferWriterChan: make(chan linkedlog.KeyToOffsetAndSizeAndBlocktime, 50), // TODO: make this configurable popRank: newRollingRankOfTopPerformers(10_000), offsets: hashmap.New[solana.PublicKey, [2]uint64](int(1_000_000)), - accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime](int(1_000_000)), + accum: hashmap.New[solana.PublicKey, []*linkedlog.OffsetAndSizeAndSlot](int(1_000_000)), ctx: ctx, cancel: cancel, fullBufferWriterDone: make(chan struct{}), @@ -145,16 +145,15 @@ func (a *GsfaWriter) Push( offset uint64, length uint64, slot uint64, - blocktime uint64, publicKeys solana.PublicKeySlice, ) error { a.mu.Lock() defer a.mu.Unlock() - oas := &linkedlog.OffsetAndSizeAndBlocktime{ - Offset: offset, - Size: length, - Blocktime: blocktime, + oas := &linkedlog.OffsetAndSizeAndSlot{ + Offset: offset, + Size: length, + Slot: slot, } publicKeys = publicKeys.Dedupe() publicKeys.Sort() @@ -190,7 +189,7 @@ func (a *GsfaWriter) Push( for _, publicKey := range publicKeys { current, ok := a.accum.Get(publicKey) if !ok { - current = make([]*linkedlog.OffsetAndSizeAndBlocktime, 0, itemsPerBatch) + current = make([]*linkedlog.OffsetAndSizeAndSlot, 0, itemsPerBatch) current = append(current, oas) a.accum.Set(publicKey, current) } else { @@ -259,7 +258,7 @@ func (a *GsfaWriter) Close() error { ) } -func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndBlocktime]) error { +func (a *GsfaWriter) flushAccum(m *hashmap.Map[solana.PublicKey, []*linkedlog.OffsetAndSizeAndSlot]) error { keys := solana.PublicKeySlice(m.Keys()) keys.Sort() for ii := range keys { diff --git a/gsfa/linkedlog/linked-log.go b/gsfa/linkedlog/linked-log.go index be7f2c6d..322700c3 100644 --- a/gsfa/linkedlog/linked-log.go +++ b/gsfa/linkedlog/linked-log.go @@ -111,7 +111,7 @@ func (s *LinkedLog) write(b []byte) (uint64, uint32, error) { const mib = 1024 * 1024 // Read reads the block stored at the given offset. -func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) { +func (s *LinkedLog) Read(offset uint64) ([]OffsetAndSizeAndSlot, indexes.OffsetAndSize, error) { lenBuf := make([]byte, binary.MaxVarintLen64) _, err := s.file.ReadAt(lenBuf, int64(offset)) if err != nil { @@ -130,7 +130,7 @@ func sizeOfUvarint(n uint64) int { return binary.PutUvarint(make([]byte, binary.MaxVarintLen64), n) } -func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndBlocktime, indexes.OffsetAndSize, error) { +func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAndSlot, indexes.OffsetAndSize, error) { if size > 256*mib { return nil, indexes.OffsetAndSize{}, fmt.Errorf("compacted indexes length too large: %d", size) } @@ -158,12 +158,12 @@ func (s *LinkedLog) ReadWithSize(offset uint64, size uint64) ([]OffsetAndSizeAnd return sigIndexes, nextOffset, nil } -func decompressIndexes(data []byte) ([]OffsetAndSizeAndBlocktime, error) { +func decompressIndexes(data []byte) ([]OffsetAndSizeAndSlot, error) { decompressed, err := tooling.DecompressZstd(data) if err != nil { return nil, fmt.Errorf("error while decompressing data: %w", err) } - return OffsetAndSizeAndBlocktimeSliceFromBytes(decompressed) + return OffsetAndSizeAndSlotSliceFromBytes(decompressed) } type KeyToOffsetAndSizeAndBlocktimeSlice []KeyToOffsetAndSizeAndBlocktime @@ -180,7 +180,7 @@ func (s KeyToOffsetAndSizeAndBlocktimeSlice) Has(key solana.PublicKey) bool { type KeyToOffsetAndSizeAndBlocktime struct { Key solana.PublicKey - Values []*OffsetAndSizeAndBlocktime + Values []*OffsetAndSizeAndSlot } func (s *LinkedLog) Put( @@ -205,7 +205,7 @@ func (s *LinkedLog) Put( if len(val.Values) == 0 { continue } - slices.Reverse[[]*OffsetAndSizeAndBlocktime](val.Values) // reverse the slice so that the most recent indexes are first + slices.Reverse[[]*OffsetAndSizeAndSlot](val.Values) // reverse the slice so that the most recent indexes are first err := func() error { encodedIndexes, err := createIndexesPayload(val.Values) if err != nil { @@ -245,7 +245,7 @@ func (s *LinkedLog) Put( return uint64(previousSize), nil } -func createIndexesPayload(indexes []*OffsetAndSizeAndBlocktime) ([]byte, error) { +func createIndexesPayload(indexes []*OffsetAndSizeAndSlot) ([]byte, error) { buf := make([]byte, 0, 9*len(indexes)) for _, index := range indexes { buf = append(buf, index.Bytes()...) diff --git a/gsfa/linkedlog/offset-size-blocktime.go b/gsfa/linkedlog/offset-size-slot.go similarity index 60% rename from gsfa/linkedlog/offset-size-blocktime.go rename to gsfa/linkedlog/offset-size-slot.go index 5e5977cb..dbf62885 100644 --- a/gsfa/linkedlog/offset-size-blocktime.go +++ b/gsfa/linkedlog/offset-size-slot.go @@ -8,32 +8,32 @@ import ( "slices" ) -func NewOffsetAndSizeAndBlocktime(offset uint64, size uint64, blocktime uint64) *OffsetAndSizeAndBlocktime { - return &OffsetAndSizeAndBlocktime{ - Offset: offset, - Size: size, - Blocktime: blocktime, +func NewOffsetAndSizeAndSlot(offset uint64, size uint64, slot uint64) *OffsetAndSizeAndSlot { + return &OffsetAndSizeAndSlot{ + Offset: offset, + Size: size, + Slot: slot, } } -type OffsetAndSizeAndBlocktime struct { - Offset uint64 // uint48, 6 bytes, max 281.5 TB (terabytes) - Size uint64 // uint24, 3 bytes, max 16.7 MB (megabytes) - Blocktime uint64 // uint40, 5 bytes, max 1099511627775 (seconds since epoch) +type OffsetAndSizeAndSlot struct { + Offset uint64 // encoded as uvarint + Size uint64 // encoded as uvarint + Slot uint64 // encoded as uvarint } // Bytes returns the offset and size as a byte slice. -func (oas OffsetAndSizeAndBlocktime) Bytes() []byte { +func (oas OffsetAndSizeAndSlot) Bytes() []byte { buf := make([]byte, 0, binary.MaxVarintLen64*3) buf = binary.AppendUvarint(buf, oas.Offset) buf = binary.AppendUvarint(buf, oas.Size) - buf = binary.AppendUvarint(buf, oas.Blocktime) + buf = binary.AppendUvarint(buf, oas.Slot) buf = slices.Clip(buf) return buf } // FromBytes parses the offset and size from a byte slice. -func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error { +func (oas *OffsetAndSizeAndSlot) FromBytes(buf []byte) error { if len(buf) > binary.MaxVarintLen64*3 { return errors.New("invalid byte slice length") } @@ -48,14 +48,14 @@ func (oas *OffsetAndSizeAndBlocktime) FromBytes(buf []byte) error { return errors.New("failed to parse size") } buf = buf[n:] - oas.Blocktime, n = binary.Uvarint(buf) + oas.Slot, n = binary.Uvarint(buf) if n <= 0 { - return errors.New("failed to parse blocktime") + return errors.New("failed to parse slot") } return nil } -func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error { +func (oas *OffsetAndSizeAndSlot) FromReader(r UvarintReader) error { var err error oas.Offset, err = r.ReadUvarint() if err != nil { @@ -65,9 +65,9 @@ func (oas *OffsetAndSizeAndBlocktime) FromReader(r UvarintReader) error { if err != nil { return fmt.Errorf("failed to read size: %w", err) } - oas.Blocktime, err = r.ReadUvarint() + oas.Slot, err = r.ReadUvarint() if err != nil { - return fmt.Errorf("failed to read blocktime: %w", err) + return fmt.Errorf("failed to read slot: %w", err) } return nil } @@ -92,11 +92,11 @@ func (r *uvarintReader) ReadUvarint() (uint64, error) { return v, nil } -func OffsetAndSizeAndBlocktimeSliceFromBytes(buf []byte) ([]OffsetAndSizeAndBlocktime, error) { +func OffsetAndSizeAndSlotSliceFromBytes(buf []byte) ([]OffsetAndSizeAndSlot, error) { r := &uvarintReader{buf: buf} - oass := make([]OffsetAndSizeAndBlocktime, 0) + oass := make([]OffsetAndSizeAndSlot, 0) for { - oas := OffsetAndSizeAndBlocktime{} + oas := OffsetAndSizeAndSlot{} err := oas.FromReader(r) if err != nil { if errors.Is(err, io.EOF) { diff --git a/gsfa/linkedlog/offset-size-blocktime_test.go b/gsfa/linkedlog/offset-size-slot_test.go similarity index 59% rename from gsfa/linkedlog/offset-size-blocktime_test.go rename to gsfa/linkedlog/offset-size-slot_test.go index 99d1ac6a..2d9253fc 100644 --- a/gsfa/linkedlog/offset-size-blocktime_test.go +++ b/gsfa/linkedlog/offset-size-slot_test.go @@ -6,17 +6,17 @@ import ( "testing" ) -func TestOffsetAndSizeAndBlocktime(t *testing.T) { +func TestOffsetAndSizeAndSlot(t *testing.T) { { - ca := OffsetAndSizeAndBlocktime{ - Offset: 1, - Size: 2, - Blocktime: 3, + ca := OffsetAndSizeAndSlot{ + Offset: 1, + Size: 2, + Slot: 3, } buf := ca.Bytes() { - ca2 := OffsetAndSizeAndBlocktime{} + ca2 := OffsetAndSizeAndSlot{} err := ca2.FromBytes(buf) if err != nil { panic(err) @@ -28,15 +28,15 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) { } { // now with very high values - ca := OffsetAndSizeAndBlocktime{ - Offset: 281474976710655, - Size: 16777215, - Blocktime: 1099511627775, + ca := OffsetAndSizeAndSlot{ + Offset: 281474976710655, + Size: 16777215, + Slot: 1099511627775, } buf := ca.Bytes() { - ca2 := OffsetAndSizeAndBlocktime{} + ca2 := OffsetAndSizeAndSlot{} err := ca2.FromBytes(buf) if err != nil { panic(err) @@ -47,21 +47,21 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) { } } { - many := []OffsetAndSizeAndBlocktime{ + many := []OffsetAndSizeAndSlot{ { - Offset: 1, - Size: 2, - Blocktime: 3, + Offset: 1, + Size: 2, + Slot: 3, }, { - Offset: 4, - Size: 5, - Blocktime: 6, + Offset: 4, + Size: 5, + Slot: 6, }, { - Offset: 281474976710655, - Size: 16777215, - Blocktime: 1099511627775, + Offset: 281474976710655, + Size: 16777215, + Slot: 1099511627775, }, } buf := make([]byte, 0, binary.MaxVarintLen64*3*len(many)) @@ -70,7 +70,7 @@ func TestOffsetAndSizeAndBlocktime(t *testing.T) { } { - many2, err := OffsetAndSizeAndBlocktimeSliceFromBytes(buf) + many2, err := OffsetAndSizeAndSlotSliceFromBytes(buf) if err != nil { panic(err) } diff --git a/gsfa/manifest/manifest.go b/gsfa/manifest/manifest.go index 96bf4a03..fe74bcf6 100644 --- a/gsfa/manifest/manifest.go +++ b/gsfa/manifest/manifest.go @@ -19,8 +19,12 @@ type Manifest struct { } var ( - _MAGIC = [...]byte{'g', 's', 'f', 'a', 'm', 'n', 'f', 's'} - _Version = uint64(3) + _MAGIC = [...]byte{'g', 's', 'f', 'a', 'm', 'n', 'f', 's'} + // NOTES: + // - v3: stores offset, size, and blocktime + // - v4: stores offset, size, and slot + // Version is the version of the gsfa index. + _Version = uint64(4) ) var headerLenWithoutMeta = len(_MAGIC) + 8 // 8 bytes for the version @@ -125,7 +129,7 @@ func NewManifest(filename string, meta indexmeta.Meta) (*Manifest, error) { return nil, err } if header.Version() != _Version { - return nil, fmt.Errorf("unsupported manifest version: %d", header.Version()) + return nil, fmt.Errorf("unsupported manifest version: %d; expected %d", header.Version(), _Version) } man.header = header } diff --git a/http-range.go b/http-range.go index d03346fb..31170044 100644 --- a/http-range.go +++ b/http-range.go @@ -63,6 +63,7 @@ func (r *readCloserWrapper) ReadAt(p []byte, off int64) (n int, err error) { metrics.IndexLookupHistogram.WithLabelValues( indexName, boolToString(r.isRemote), + boolToString(r.isSplitCar), ).Observe(float64(took.Seconds())) } diff --git a/index-slot-to-cid.go b/index-slot-to-cid.go index 1161e134..67afd11f 100644 --- a/index-slot-to-cid.go +++ b/index-slot-to-cid.go @@ -7,11 +7,11 @@ import ( "path/filepath" "time" - "github.com/dustin/go-humanize" "github.com/ipfs/go-cid" carv2 "github.com/ipld/go-car/v2" "github.com/rpcpool/yellowstone-faithful/indexes" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" + "github.com/rpcpool/yellowstone-faithful/slottools" "k8s.io/klog/v2" ) @@ -49,19 +49,13 @@ func CreateIndex_slot2cid( } rootCid := roots[0] - // TODO: use another way to precisely count the number of solana Blocks in the CAR file. - klog.Infof("Counting items in car file...") - numItems, err := carCountItems(carPath) - if err != nil { - return "", fmt.Errorf("failed to count items in car file: %w", err) - } - klog.Infof("Found %s items in car file", humanize.Comma(int64(numItems))) - tmpDir = filepath.Join(tmpDir, "index-slot-to-cid-"+time.Now().Format("20060102-150405.000000000")) if err = os.MkdirAll(tmpDir, 0o755); err != nil { return "", fmt.Errorf("failed to create tmp dir: %w", err) } + numItems := uint64(slottools.EpochLen) + klog.Infof("Creating builder with %d items", numItems) sl2c, err := indexes.NewWriter_SlotToCid( epoch, diff --git a/multiepoch-getBlock.go b/multiepoch-getBlock.go index 11c96412..d2cd02fc 100644 --- a/multiepoch-getBlock.go +++ b/multiepoch-getBlock.go @@ -18,6 +18,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/rpcpool/yellowstone-faithful/compactindexsized" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" + "github.com/rpcpool/yellowstone-faithful/slottools" solanablockrewards "github.com/rpcpool/yellowstone-faithful/solana-block-rewards" "github.com/rpcpool/yellowstone-faithful/tooling" "github.com/sourcegraph/jsonrpc2" @@ -62,7 +63,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex slot := params.Slot // find the epoch that contains the requested slot - epochNumber := CalcEpochForSlot(slot) + epochNumber := slottools.CalcEpochForSlot(slot) epochHandler, err := multi.GetEpoch(epochNumber) if err != nil { return &jsonrpc2.Error{ @@ -93,7 +94,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex tim.time("GetBlock") { prefetcherFromCar := func() error { - parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot) + parentIsInPreviousEpoch := slottools.CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != slottools.CalcEpochForSlot(slot) if slot == 0 { parentIsInPreviousEpoch = true } @@ -451,7 +452,7 @@ func (multi *MultiEpoch) handleGetBlock(ctx context.Context, conn *requestContex { // get parent slot parentSlot := uint64(block.Meta.Parent_slot) - if (parentSlot != 0 || slot == 1) && CalcEpochForSlot(parentSlot) == epochNumber { + if (parentSlot != 0 || slot == 1) && slottools.CalcEpochForSlot(parentSlot) == epochNumber { // NOTE: if the parent is in the same epoch, we can get it from the same epoch handler as the block; // otherwise, we need to get it from the previous epoch (TODO: implement this) parentBlock, _, err := epochHandler.GetBlock(WithSubrapghPrefetch(ctx, false), parentSlot) diff --git a/multiepoch-getBlockTime.go b/multiepoch-getBlockTime.go index c9b87172..c76ef910 100644 --- a/multiepoch-getBlockTime.go +++ b/multiepoch-getBlockTime.go @@ -2,10 +2,9 @@ package main import ( "context" - "errors" "fmt" - "github.com/rpcpool/yellowstone-faithful/compactindexsized" + "github.com/rpcpool/yellowstone-faithful/slottools" "github.com/sourcegraph/jsonrpc2" ) @@ -19,7 +18,7 @@ func (multi *MultiEpoch) handleGetBlockTime(ctx context.Context, conn *requestCo } // find the epoch that contains the requested slot - epochNumber := CalcEpochForSlot(blockNum) + epochNumber := slottools.CalcEpochForSlot(blockNum) epochHandler, err := multi.GetEpoch(epochNumber) if err != nil { return &jsonrpc2.Error{ @@ -27,34 +26,35 @@ func (multi *MultiEpoch) handleGetBlockTime(ctx context.Context, conn *requestCo Message: fmt.Sprintf("Epoch %d is not available", epochNumber), }, fmt.Errorf("failed to get epoch %d: %w", epochNumber, err) } - - block, _, err := epochHandler.GetBlock(WithSubrapghPrefetch(ctx, false), blockNum) - if err != nil { - if errors.Is(err, compactindexsized.ErrNotFound) { - return &jsonrpc2.Error{ - Code: CodeNotFound, - Message: fmt.Sprintf("Slot %d was skipped, or missing in long-term storage", blockNum), - }, err + { + blocktimeIndex := epochHandler.GetBlocktimeIndex() + if blocktimeIndex != nil { + blockTime, err := blocktimeIndex.Get(blockNum) + if err != nil { + return &jsonrpc2.Error{ + Code: CodeNotFound, + Message: fmt.Sprintf("Slot %d was skipped, or missing in long-term storage", blockNum), + }, fmt.Errorf("failed to get blocktime: %w", err) + } + err = conn.ReplyRaw( + ctx, + req.ID, + func() any { + if blockTime != 0 { + return blockTime + } + return nil + }(), + ) + if err != nil { + return nil, fmt.Errorf("failed to reply: %w", err) + } + return nil, nil } else { return &jsonrpc2.Error{ Code: jsonrpc2.CodeInternalError, Message: "Failed to get block", - }, fmt.Errorf("failed to get block: %w", err) + }, fmt.Errorf("failed to get blocktime: blocktime index is nil") } } - blockTime := uint64(block.Meta.Blocktime) - err = conn.ReplyRaw( - ctx, - req.ID, - func() any { - if blockTime != 0 { - return blockTime - } - return nil - }(), - ) - if err != nil { - return nil, fmt.Errorf("failed to reply: %w", err) - } - return nil, nil } diff --git a/multiepoch-getSignaturesForAddress.go b/multiepoch-getSignaturesForAddress.go index d953ed9c..b9c47d5e 100644 --- a/multiepoch-getSignaturesForAddress.go +++ b/multiepoch-getSignaturesForAddress.go @@ -11,6 +11,7 @@ import ( "github.com/rpcpool/yellowstone-faithful/indexes" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" "github.com/rpcpool/yellowstone-faithful/iplddecoders" + "github.com/rpcpool/yellowstone-faithful/slottools" "github.com/sourcegraph/jsonrpc2" "k8s.io/klog/v2" ) @@ -47,12 +48,11 @@ func (ser *MultiEpoch) getGsfaReadersInEpochDescendingOrderForSlotRange(ctx cont ser.mu.RLock() defer ser.mu.RUnlock() - startEpoch := CalcEpochForSlot(startSlot) - endEpoch := CalcEpochForSlot(endSlot) + startEpoch := slottools.CalcEpochForSlot(startSlot) + endEpoch := slottools.CalcEpochForSlot(endSlot) epochs := make([]*Epoch, 0, endEpoch-startEpoch+1) for _, epoch := range ser.epochs { - if epoch.Epoch() >= startEpoch && epoch.Epoch() <= endEpoch { epochs = append(epochs, epoch) } @@ -80,7 +80,6 @@ func (ser *MultiEpoch) getGsfaReadersInEpochDescendingOrderForSlotRange(ctx cont } return gsfaReaderMultiEpoch, epochNums - } func countTransactions(v gsfa.EpochToTransactionObjects) int { @@ -125,26 +124,27 @@ func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn } var blockTimeCache struct { - m map[uint64]uint64 + m map[uint64]int64 mu sync.Mutex } - blockTimeCache.m = make(map[uint64]uint64) - getBlockTime := func(slot uint64, ser *Epoch) uint64 { - // NOTE: this means that you have to potentially fetch 1k blocks to get the blocktime for each transaction. - // TODO: include blocktime into the transaction data, or in the gsfaindex. - // return 0 + blockTimeCache.m = make(map[uint64]int64) + getBlockTime := func(slot uint64, ser *Epoch) int64 { blockTimeCache.mu.Lock() defer blockTimeCache.mu.Unlock() if blockTime, ok := blockTimeCache.m[slot]; ok { return blockTime } - block, _, err := ser.GetBlock(ctx, slot) - if err != nil { - klog.Errorf("failed to get block time for slot %d: %v", slot, err) - return 0 + blocktimeIndex := ser.GetBlocktimeIndex() + if blocktimeIndex != nil { + blocktime, err := blocktimeIndex.Get(slot) + if err != nil { + klog.Errorf("failed to get block time for slot %d: %v", slot, err) + return 0 + } + blockTimeCache.m[slot] = blocktime + return blocktime } - blockTimeCache.m[slot] = uint64(block.Meta.Blocktime) - return uint64(block.Meta.Blocktime) + return 0 } // Get the transactions: @@ -154,7 +154,7 @@ func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn limit, params.Before, params.Until, - func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) { + func(epochNum uint64, oas linkedlog.OffsetAndSizeAndSlot) (*ipldbindcode.Transaction, error) { epoch, err := multi.GetEpoch(epochNum) if err != nil { return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err) @@ -170,7 +170,6 @@ func (multi *MultiEpoch) handleGetSignaturesForAddress(ctx context.Context, conn if err != nil { return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err) } - blockTimeCache.m[uint64(decoded.Slot)] = uint64(oas.Blocktime) return decoded, nil }, ) diff --git a/multiepoch-getTransaction.go b/multiepoch-getTransaction.go index c0ca1f1d..eeb7c7c7 100644 --- a/multiepoch-getTransaction.go +++ b/multiepoch-getTransaction.go @@ -10,6 +10,8 @@ import ( "github.com/gagliardetto/solana-go" "github.com/rpcpool/yellowstone-faithful/compactindexsized" "github.com/sourcegraph/jsonrpc2" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "k8s.io/klog/v2" ) @@ -172,16 +174,18 @@ func (multi *MultiEpoch) handleGetTransaction(ctx context.Context, conn *request response.Slot = ptrToUint64(uint64(transactionNode.Slot)) { - block, _, err := epochHandler.GetBlock(ctx, uint64(transactionNode.Slot)) - if err != nil { + blocktimeIndex := epochHandler.GetBlocktimeIndex() + if blocktimeIndex != nil { + blocktime, err := blocktimeIndex.Get(uint64(transactionNode.Slot)) + if err != nil { + return nil, status.Errorf(codes.Internal, "Failed to get block: %v", err) + } + response.Blocktime = &blocktime + } else { return &jsonrpc2.Error{ Code: jsonrpc2.CodeInternalError, Message: "Internal error", - }, fmt.Errorf("failed to get block: %w", err) - } - blocktime := uint64(block.Meta.Blocktime) - if blocktime != 0 { - response.Blocktime = &blocktime + }, fmt.Errorf("failed to get blocktime: blocktime index is not available") } } diff --git a/parse_legacy_transaction_status_meta/v-latest/parse_legacy_transaction_status_meta_ce598c5c98e7384c104fe7f5121e32c2c5a2d2eb.go b/parse_legacy_transaction_status_meta/v-latest/parse_legacy_transaction_status_meta_ce598c5c98e7384c104fe7f5121e32c2c5a2d2eb.go index c28fd935..b15c645b 100644 --- a/parse_legacy_transaction_status_meta/v-latest/parse_legacy_transaction_status_meta_ce598c5c98e7384c104fe7f5121e32c2c5a2d2eb.go +++ b/parse_legacy_transaction_status_meta/v-latest/parse_legacy_transaction_status_meta_ce598c5c98e7384c104fe7f5121e32c2c5a2d2eb.go @@ -2615,7 +2615,7 @@ func BincodeDeserializeTransactionStatusMeta(input []byte) (TransactionStatusMet if err == nil && deserializer.GetBufferOffset() < uint64(len(input)) { // return obj, fmt.Errorf("Some input bytes were not read") // TODO: fix this - klog.Warningf( + klog.V(5).Infof( "Parsed %d bytes, but input was %d bytes (%d bytes not read)", deserializer.GetBufferOffset(), len(input), diff --git a/parse_legacy_transaction_status_meta/v-oldest/parse_legacy_transaction_status_meta_b7b4aa5d4d34ebf3fd338a64f4f2a5257b047bb4.go b/parse_legacy_transaction_status_meta/v-oldest/parse_legacy_transaction_status_meta_b7b4aa5d4d34ebf3fd338a64f4f2a5257b047bb4.go index d28555ef..c496cf6d 100644 --- a/parse_legacy_transaction_status_meta/v-oldest/parse_legacy_transaction_status_meta_b7b4aa5d4d34ebf3fd338a64f4f2a5257b047bb4.go +++ b/parse_legacy_transaction_status_meta/v-oldest/parse_legacy_transaction_status_meta_b7b4aa5d4d34ebf3fd338a64f4f2a5257b047bb4.go @@ -1943,7 +1943,7 @@ func BincodeDeserializeTransactionStatusMeta(input []byte) (TransactionStatusMet if err == nil && deserializer.GetBufferOffset() < uint64(len(input)) { // return obj, fmt.Errorf("Some input bytes were not read") // TODO: fix this - klog.Warningf( + klog.V(5).Infof( "Parsed %d bytes, but input was %d bytes (%d bytes not read)", deserializer.GetBufferOffset(), len(input), diff --git a/readasonecar/read-as-one-car.go b/readasonecar/read-as-one-car.go index 329e7a39..cd134659 100644 --- a/readasonecar/read-as-one-car.go +++ b/readasonecar/read-as-one-car.go @@ -1,6 +1,7 @@ package readasonecar import ( + "errors" "fmt" "io" "os" @@ -31,24 +32,24 @@ func NewMultiReader(files ...string) (*MultiReader, error) { return nil, fmt.Errorf("no files provided") } // check that each file exists - for _, file := range files { - if _, err := os.Stat(file); err != nil { - return nil, err + for _, fn := range files { + if _, err := os.Stat(fn); err != nil { + return nil, fmt.Errorf("file %q does not exist: %w", fn, err) } } readers := make([]*carreader.CarReader, len(files)) onClose := make([]func() error, len(files)) - for i, file := range files { - f, err := os.Open(file) + for i, fn := range files { + file, err := os.Open(fn) if err != nil { - return nil, fmt.Errorf("failed to open car file %s: %w", file, err) + return nil, fmt.Errorf("failed to open car file %q: %w", fn, err) } - onClose[i] = f.Close - r, err := carreader.New(f) + onClose[i] = file.Close + reader, err := carreader.New(file) if err != nil { - return nil, fmt.Errorf("failed to create car reader for file %s: %w", file, err) + return nil, fmt.Errorf("failed to create car reader for file %q: %w", fn, err) } - readers[i] = r + readers[i] = reader } return &MultiReader{files: files}, nil } @@ -93,13 +94,16 @@ func (mr *MultiReader) NextNodeBytes() (cid.Cid, uint64, []byte, error) { } func (mr *MultiReader) Close() error { - var err error + var errs []error for _, f := range mr.onClose { if e := f(); e != nil { - err = e + errs = append(errs, e) } } - return err + if len(errs) == 0 { + return nil + } + return errors.Join(errs...) } func (mr *MultiReader) Files() []string { diff --git a/slot-tools.go b/slottools/edges.go similarity index 55% rename from slot-tools.go rename to slottools/edges.go index 311c0422..79bbea87 100644 --- a/slot-tools.go +++ b/slottools/edges.go @@ -1,4 +1,6 @@ -package main +package slottools + +import "encoding/binary" // CalcEpochForSlot returns the epoch for the given slot. func CalcEpochForSlot(slot uint64) uint64 { @@ -22,3 +24,23 @@ func Uint64RangesHavePartialOverlapIncludingEdges(r1 [2]uint64, r2 [2]uint64) bo return r2[1] >= r1[0] } } + +// EpochForSlot returns the epoch for the given slot. +func EpochForSlot(slot uint64) uint64 { + return CalcEpochForSlot(slot) +} + +// EpochLimits returns the start and stop slots for the given epoch (inclusive). +func EpochLimits(epoch uint64) (uint64, uint64) { + return CalcEpochLimits(epoch) +} + +func Uint64ToLEBytes(v uint64) []byte { + buf := make([]byte, 8) + binary.LittleEndian.PutUint64(buf, v) + return buf +} + +func Uint64FromLEBytes(buf []byte) uint64 { + return binary.LittleEndian.Uint64(buf) +} diff --git a/slot-tools_test.go b/slottools/edges_test.go similarity index 99% rename from slot-tools_test.go rename to slottools/edges_test.go index 92a5e399..2c622eb9 100644 --- a/slot-tools_test.go +++ b/slottools/edges_test.go @@ -1,4 +1,4 @@ -package main +package slottools import ( "testing" diff --git a/storage.go b/storage.go index f706eb7f..b8469cae 100644 --- a/storage.go +++ b/storage.go @@ -113,7 +113,7 @@ type GetBlockResponse struct { type GetTransactionResponse struct { // TODO: use same format as solana - Blocktime *uint64 `json:"blockTime,omitempty"` + Blocktime *int64 `json:"blockTime,omitempty"` Meta any `json:"meta"` Slot *uint64 `json:"slot,omitempty"` Transaction any `json:"transaction"`