Skip to content

Commit

Permalink
Prefetch content from car file (#27)
Browse files Browse the repository at this point in the history
* Prefetch content from car file

* Improve checks
  • Loading branch information
gagliardetto authored Jul 20, 2023
1 parent b5934a7 commit 4ee5995
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 5 deletions.
98 changes: 98 additions & 0 deletions cmd-rpc-server-car-getBlock.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package main

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"runtime"
"sort"
"sync"
"time"

"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/util"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindex36"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
Expand Down Expand Up @@ -102,6 +107,99 @@ func (ser *rpcServer) handleGetBlock(ctx context.Context, conn *requestContext,
return
}
tim.time("GetBlock")
{
// prefetcherFromCar reads the CAR section that starts at the parent block's
// node and ends just before this block's node, and puts every node found in
// that section into the node cache. It is best-effort: the caller only logs
// the returned error.
prefetcherFromCar := func() error {
	// Resolve the block slot and the parent slot to CIDs concurrently.
	var blockCid, parentCid cid.Cid
	{
		wg := new(errgroup.Group)
		wg.Go(func() (err error) {
			blockCid, err = ser.FindCidFromSlot(ctx, slot)
			return err
		})
		wg.Go(func() (err error) {
			parentCid, err = ser.FindCidFromSlot(ctx, uint64(block.Meta.Parent_slot))
			return err
		})
		// Keep the error local so the enclosing handler's err is not overwritten.
		if err := wg.Wait(); err != nil {
			return err
		}
	}

	// Resolve both CIDs to offsets concurrently.
	var blockOffset, parentOffset uint64
	{
		wg := new(errgroup.Group)
		wg.Go(func() (err error) {
			blockOffset, err = ser.FindOffsetFromCid(ctx, blockCid)
			return err
		})
		wg.Go(func() error {
			var err error
			parentOffset, err = ser.FindOffsetFromCid(ctx, parentCid)
			if err != nil {
				// If the parent is not found, it (probably) means that it's outside of the car file.
				parentOffset = 0
			}
			return nil
		})
		if err := wg.Wait(); err != nil {
			return err
		}
	}

	parentIsInPreviousEpoch := CalcEpochForSlot(uint64(block.Meta.Parent_slot)) != CalcEpochForSlot(slot)

	// Offsets are unsigned: if the parent were located at or after the block,
	// the subtraction below would wrap around and request a huge read.
	if blockOffset <= parentOffset {
		return fmt.Errorf("cannot prefetch: block offset %d is not after parent offset %d", blockOffset, parentOffset)
	}
	length := blockOffset - parentOffset
	// cap the length to 1GB
	GiB := uint64(1024 * 1024 * 1024)
	if length > GiB {
		length = GiB
	}
	carSection, err := ser.ReadAtFromCar(ctx, parentOffset, length)
	if err != nil {
		return err
	}
	br := bufio.NewReader(bytes.NewReader(carSection))

	// The first node of the section is expected to be the parent block itself,
	// unless the parent belongs to a different epoch.
	gotCid, data, err := util.ReadNode(br)
	if err != nil {
		return err
	}
	if !parentIsInPreviousEpoch && !gotCid.Equals(parentCid) {
		return fmt.Errorf("CID mismatch: expected %s, got %s", parentCid, gotCid)
	}
	ser.putNodeInCache(gotCid, data)

	// Cache every following node until the section ends or the block node is reached.
	for {
		gotCid, data, err = util.ReadNode(br)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			return err
		}
		if gotCid.Equals(blockCid) {
			break
		}
		ser.putNodeInCache(gotCid, data)
	}
	return nil
}
if ser.lassieFetcher == nil {
err := prefetcherFromCar()
if err != nil {
klog.Errorf("failed to prefetch from car: %v", err)
}
}
}
blocktime := uint64(block.Meta.Blocktime)

allTransactionNodes := make([]*ipldbindcode.Transaction, 0)
Expand Down
43 changes: 38 additions & 5 deletions cmd-rpc-server-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,14 @@ func getCidCacheKey(off int64, p []byte) string {
return fmt.Sprintf("%d-%d", off, len(p))
}

func (r *rpcServer) getFromCache(c cid.Cid) (v []byte, err error, has bool) {
// getNodeFromCache looks up the node bytes stored in the CID-to-block cache
// under c.String(). has reports whether an entry was found; err is always nil.
func (r *rpcServer) getNodeFromCache(c cid.Cid) (v []byte, err error, has bool) {
	cached, found := r.cidToBlockCache.Get(c.String())
	if !found {
		return nil, nil, false
	}
	return cached.([]byte), nil, true
}

func (r *rpcServer) putInCache(c cid.Cid, data []byte) {
// putNodeInCache stores data in the CID-to-block cache under c.String(),
// using the cache's default expiration.
func (r *rpcServer) putNodeInCache(c cid.Cid, data []byte) {
	key := c.String()
	r.cidToBlockCache.Set(key, data, cache.DefaultExpiration)
}

Expand Down Expand Up @@ -519,7 +519,7 @@ func (s *rpcServer) prefetchSubgraph(ctx context.Context, wantedCid cid.Cid) err
if err == nil {
// put in cache
return sub.Each(ctx, func(c cid.Cid, data []byte) error {
s.putInCache(c, data)
s.putNodeInCache(c, data)
return nil
})
}
Expand All @@ -532,7 +532,7 @@ func (s *rpcServer) prefetchSubgraph(ctx context.Context, wantedCid cid.Cid) err
func (s *rpcServer) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]byte, error) {
{
// try from cache
data, err, has := s.getFromCache(wantedCid)
data, err, has := s.getNodeFromCache(wantedCid)
if err != nil {
return nil, err
}
Expand All @@ -545,7 +545,7 @@ func (s *rpcServer) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]byte
data, err := s.lassieFetcher.GetNodeByCid(ctx, wantedCid)
if err == nil {
// put in cache
s.putInCache(wantedCid, data)
s.putNodeInCache(wantedCid, data)
return data, nil
}
klog.Errorf("failed to get node from lassie: %v", err)
Expand All @@ -561,6 +561,15 @@ func (s *rpcServer) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]byte
return s.GetNodeByOffset(ctx, wantedCid, offset)
}

// readSectionFromReaderAt reads exactly length bytes starting at offset from
// reader and returns them.
//
// A ReadAt call may report io.EOF together with a complete read when the
// requested range ends exactly at the end of the input; that case is treated
// as success. Any other error (including a short read) is returned wrapped
// with the requested range, and no data is returned.
func readSectionFromReaderAt(reader ReaderAtCloser, offset uint64, length uint64) ([]byte, error) {
	data := make([]byte, length)
	n, err := reader.ReadAt(data, int64(offset))
	if err != nil {
		if err == io.EOF && n == len(data) {
			// The whole section was read; EOF only signals end of input.
			return data, nil
		}
		return nil, fmt.Errorf("reading %d bytes at offset %d: %w", length, offset, err)
	}
	return data, nil
}

func readNodeFromReaderAt(reader ReaderAtCloser, wantedCid cid.Cid, offset uint64) ([]byte, error) {
// read MaxVarintLen64 bytes
lenBuf := make([]byte, binary.MaxVarintLen64)
Expand Down Expand Up @@ -591,6 +600,30 @@ func readNodeFromReaderAt(reader ReaderAtCloser, wantedCid cid.Cid, offset uint6
return data[n:], nil
}

// ReadAtFromCar returns length bytes of the CAR file starting at offset.
//
// The local CAR reader is used when it is set; otherwise the remote CAR
// reader is used. An error is returned when neither reader is available, or
// when seeking or reading fails.
func (s *rpcServer) ReadAtFromCar(ctx context.Context, offset uint64, length uint64) ([]byte, error) {
	if s.localCarReader == nil {
		// try remote reader
		if s.remoteCarReader == nil {
			return nil, fmt.Errorf("no CAR reader available")
		}
		return readSectionFromReaderAt(s.remoteCarReader, offset, length)
	}
	// Get reader and seek to offset, then read the section.
	dr, err := s.localCarReader.DataReader()
	if err != nil {
		klog.Errorf("failed to get data reader: %v", err)
		return nil, err
	}
	// A failed seek must abort: reading on would return bytes from the wrong position.
	if _, err := dr.Seek(int64(offset), io.SeekStart); err != nil {
		klog.Errorf("failed to seek to offset %d: %v", offset, err)
		return nil, err
	}
	data := make([]byte, length)
	if _, err := io.ReadFull(dr, data); err != nil {
		klog.Errorf("failed to read node: %v", err)
		return nil, err
	}
	return data, nil
}

func (s *rpcServer) GetNodeByOffset(ctx context.Context, wantedCid cid.Cid, offset uint64) ([]byte, error) {
if s.localCarReader == nil {
// try remote reader
Expand Down
24 changes: 24 additions & 0 deletions slot-tools.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package main

// CalcEpochForSlot returns the number of the epoch that contains slot,
// i.e. slot divided by EpochLen, rounded down.
func CalcEpochForSlot(slot uint64) uint64 {
	epoch := slot / EpochLen
	return epoch
}

// EpochLen is the number of slots in one epoch; it is the divisor used to map
// a slot to its epoch and the span used to compute an epoch's slot limits.
const EpochLen = 432000

// CalcEpochLimits returns the first and the last slot of the given epoch.
// Both bounds are inclusive.
func CalcEpochLimits(epoch uint64) (uint64, uint64) {
	first := epoch * EpochLen
	last := first + (EpochLen - 1)
	return first, last
}

// Uint64RangesHavePartialOverlapIncludingEdges reports whether the ranges r1
// and r2 (each given as {start, end}, both inclusive) share at least one
// value. Ranges that only touch at an endpoint count as overlapping.
func Uint64RangesHavePartialOverlapIncludingEdges(r1 [2]uint64, r2 [2]uint64) bool {
	// Compare the end of whichever range starts first with the start of the other.
	if r2[0] <= r1[0] {
		return r1[0] <= r2[1]
	}
	return r2[0] <= r1[1]
}
68 changes: 68 additions & 0 deletions slot-tools_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestCalcEpochForSlot checks the slot-to-epoch mapping around the epoch
// boundaries and for one large slot number.
func TestCalcEpochForSlot(t *testing.T) {
	cases := []struct {
		slot      uint64
		wantEpoch uint64
	}{
		{slot: 0, wantEpoch: 0},
		{slot: 1, wantEpoch: 0},
		{slot: 431999, wantEpoch: 0},
		{slot: 432000, wantEpoch: 1},
		{slot: 863999, wantEpoch: 1},
		{slot: 864000, wantEpoch: 2},
		{slot: 206459118, wantEpoch: 477},
	}
	for _, tc := range cases {
		require.Equal(t, tc.wantEpoch, CalcEpochForSlot(tc.slot))
	}
}

// TestCalcEpochLimits checks the inclusive first and last slot computed for
// a handful of epochs.
func TestCalcEpochLimits(t *testing.T) {
	cases := []struct {
		epoch     uint64
		wantStart uint64
		wantStop  uint64
	}{
		{epoch: 0, wantStart: 0, wantStop: 431_999},
		{epoch: 1, wantStart: 432_000, wantStop: 863_999},
		{epoch: 333, wantStart: 143_856_000, wantStop: 144_287_999},
		{epoch: 447, wantStart: 193_104_000, wantStop: 193_535_999},
	}
	for _, tc := range cases {
		gotStart, gotStop := CalcEpochLimits(tc.epoch)
		require.Equal(t, tc.wantStart, gotStart)
		require.Equal(t, tc.wantStop, gotStop)
	}
}

func TestUint64RangesHavePartialOverlapIncludingEdges(t *testing.T) {
{
r1 := [2]uint64{0, 10}
r2 := [2]uint64{5, 15}
require.True(t, Uint64RangesHavePartialOverlapIncludingEdges(r1, r2))
}
{
r1 := [2]uint64{0, 10}
r2 := [2]uint64{10, 15}
require.True(t, Uint64RangesHavePartialOverlapIncludingEdges(r1, r2))
}
{
r1 := [2]uint64{0, 10}
r2 := [2]uint64{11, 15}
require.False(t, Uint64RangesHavePartialOverlapIncludingEdges(r1, r2))
}
{
r1 := [2]uint64{0, 10}
r2 := [2]uint64{0, 10}
require.True(t, Uint64RangesHavePartialOverlapIncludingEdges(r1, r2))
}
{
r1 := [2]uint64{10, 20}
r2 := [2]uint64{0, 10}
require.True(t, Uint64RangesHavePartialOverlapIncludingEdges(r1, r2))
}
}

0 comments on commit 4ee5995

Please sign in to comment.