Skip to content

Commit

Permalink
Merge pull request #14 from gagliardetto/subgraph-prefetch
Browse files Browse the repository at this point in the history
Subgraph prefetch
  • Loading branch information
gagliardetto authored Jun 29, 2023
2 parents a2e0774 + 2ce51b6 commit 5b2de1f
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 28 deletions.
33 changes: 23 additions & 10 deletions cmd-rpc-server-car-getBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/gagliardetto/solana-go"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindex36"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
solanablockrewards "github.com/rpcpool/yellowstone-faithful/solana-block-rewards"
"github.com/sourcegraph/jsonrpc2"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (t *timer) time(name string) {
t.prev = time.Now()
}

func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
func (ser *rpcServer) handleGetBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
tim := newTimer()
params, err := parseGetBlockRequest(req.Params)
if err != nil {
Expand All @@ -77,16 +78,26 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
tim.time("parseGetBlockRequest")
slot := params.Slot

block, err := ser.GetBlock(ctx, slot)
block, err := ser.GetBlock(WithSubrapghPrefetch(ctx, true), slot)
if err != nil {
klog.Errorf("failed to get block: %v", err)
conn.ReplyWithError(
ctx,
req.ID,
&jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Failed to get block",
})
if errors.Is(err, compactindex36.ErrNotFound) {
conn.ReplyWithError(
ctx,
req.ID,
&jsonrpc2.Error{
Code: CodeNotFound,
Message: fmt.Sprintf("Slot %d was skipped, or missing in long-term storage", slot),
})
} else {
conn.ReplyWithError(
ctx,
req.ID,
&jsonrpc2.Error{
Code: jsonrpc2.CodeInternalError,
Message: "Failed to get block",
})
}
return
}
tim.time("GetBlock")
Expand Down Expand Up @@ -317,7 +328,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j
// get parent slot
parentSlot := uint64(block.Meta.Parent_slot)
if parentSlot != 0 {
parentBlock, err := ser.GetBlock(ctx, parentSlot)
parentBlock, err := ser.GetBlock(WithSubrapghPrefetch(ctx, false), parentSlot)
if err != nil {
klog.Errorf("failed to decode block: %v", err)
conn.ReplyWithError(
Expand Down Expand Up @@ -385,3 +396,5 @@ func rentTypeToString(typ int) string {
return "Unknown"
}
}

const CodeNotFound = -32009
2 changes: 1 addition & 1 deletion cmd-rpc-server-car-getSignaturesForAddress.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func parseGetSignaturesForAddressParams(raw *json.RawMessage) (*GetSignaturesFor
return out, nil
}

func (ser *rpcServer) getSignaturesForAddress(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
func (ser *rpcServer) handleGetSignaturesForAddress(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
if ser.gsfaReader == nil {
klog.Errorf("gsfaReader is nil")
conn.ReplyWithError(
Expand Down
4 changes: 2 additions & 2 deletions cmd-rpc-server-car-getTransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func ptrToUint64(v uint64) *uint64 {
return &v
}

func (ser *rpcServer) getTransaction(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
func (ser *rpcServer) handleGetTransaction(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
params, err := parseGetTransactionRequest(req.Params)
if err != nil {
klog.Errorf("failed to parse params: %v", err)
Expand All @@ -27,7 +27,7 @@ func (ser *rpcServer) getTransaction(ctx context.Context, conn *requestContext,

sig := params.Signature

transactionNode, err := ser.GetTransaction(ctx, sig)
transactionNode, err := ser.GetTransaction(WithSubrapghPrefetch(ctx, true), sig)
if err != nil {
klog.Errorf("failed to decode Transaction: %v", err)
conn.ReplyWithError(
Expand Down
100 changes: 88 additions & 12 deletions cmd-rpc-server-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,24 +195,23 @@ type HTTPSingleFileRemoteReaderAt struct {
url string
contentLength int64
client *http.Client
// TODO: add caching
ca *cache.Cache
ca *cache.Cache
}

func getCacheKey(off int64, p []byte) string {
func getHttpCacheKey(off int64, p []byte) string {
return fmt.Sprintf("%d-%d", off, len(p))
}

func (r *HTTPSingleFileRemoteReaderAt) getFromCache(off int64, p []byte) (n int, err error, has bool) {
key := getCacheKey(off, p)
key := getHttpCacheKey(off, p)
if v, ok := r.ca.Get(key); ok {
return copy(p, v.([]byte)), nil, true
}
return 0, nil, false
}

func (r *HTTPSingleFileRemoteReaderAt) putInCache(off int64, p []byte) {
key := getCacheKey(off, p)
key := getHttpCacheKey(off, p)
r.ca.Set(key, p, cache.DefaultExpiration)
}

Expand Down Expand Up @@ -307,13 +306,15 @@ func createAndStartRPCServer_withCar(
sigToCidIndex *compactindex36.DB,
gsfaReader *gsfa.GsfaReader,
) error {
ca := cache.New(30*time.Second, 1*time.Minute)
handler := &rpcServer{
localCarReader: carReader,
remoteCarReader: remoteCarReader,
cidToOffsetIndex: cidToOffsetIndex,
slotToCidIndex: slotToCidIndex,
sigToCidIndex: sigToCidIndex,
gsfaReader: gsfaReader,
cidToBlockCache: ca,
}

h := newRPCHandler_fast(handler)
Expand All @@ -331,11 +332,13 @@ func createAndStartRPCServer_lassie(
sigToCidIndex *compactindex36.DB,
gsfaReader *gsfa.GsfaReader,
) error {
ca := cache.New(30*time.Second, 1*time.Minute)
handler := &rpcServer{
lassieFetcher: lassieWr,
slotToCidIndex: slotToCidIndex,
sigToCidIndex: sigToCidIndex,
gsfaReader: gsfaReader,
lassieFetcher: lassieWr,
slotToCidIndex: slotToCidIndex,
sigToCidIndex: sigToCidIndex,
gsfaReader: gsfaReader,
cidToBlockCache: ca,
}

h := newRPCHandler_fast(handler)
Expand All @@ -353,6 +356,22 @@ type rpcServer struct {
slotToCidIndex *compactindex36.DB
sigToCidIndex *compactindex36.DB
gsfaReader *gsfa.GsfaReader
cidToBlockCache *cache.Cache // TODO: prevent OOM
}

func getCidCacheKey(off int64, p []byte) string {
return fmt.Sprintf("%d-%d", off, len(p))
}

func (r *rpcServer) getFromCache(c cid.Cid) (v []byte, err error, has bool) {
if v, ok := r.cidToBlockCache.Get(c.String()); ok {
return v.([]byte), nil, true
}
return nil, nil, false
}

func (r *rpcServer) putInCache(c cid.Cid, data []byte) {
r.cidToBlockCache.Set(c.String(), data, cache.DefaultExpiration)
}

type requestContext struct {
Expand Down Expand Up @@ -465,11 +484,40 @@ func (c *requestContext) ReplyNoMod(
return err
}

func (s *rpcServer) prefetchSubgraph(ctx context.Context, wantedCid cid.Cid) error {
if s.lassieFetcher != nil {
// Fetch the subgraph from lassie
sub, err := s.lassieFetcher.GetSubgraph(ctx, wantedCid)
if err == nil {
// put in cache
return sub.Each(ctx, func(c cid.Cid, data []byte) error {
s.putInCache(c, data)
return nil
})
}
klog.Errorf("failed to get subgraph from lassie: %v", err)
return err
}
return nil
}

func (s *rpcServer) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]byte, error) {
{
// try from cache
data, err, has := s.getFromCache(wantedCid)
if err != nil {
return nil, err
}
if has {
return data, nil
}
}
if s.lassieFetcher != nil {
// Fetch the node from lassie.
data, err := s.lassieFetcher.GetNodeByCid(ctx, wantedCid)
if err == nil {
// put in cache
s.putInCache(wantedCid, data)
return data, nil
}
klog.Errorf("failed to get node from lassie: %v", err)
Expand Down Expand Up @@ -579,6 +627,20 @@ func (ser *rpcServer) FindOffsetFromCid(ctx context.Context, cid cid.Cid) (uint6
return findOffsetFromCid(ser.cidToOffsetIndex, cid)
}

func putValueIntoContext(ctx context.Context, key, value interface{}) context.Context {
return context.WithValue(ctx, key, value)
}

func getValueFromContext(ctx context.Context, key interface{}) interface{} {
return ctx.Value(key)
}

// WithSubrapghPrefetch sets the prefetch flag in the context
// to enable prefetching of subgraphs.
func WithSubrapghPrefetch(ctx context.Context, yesNo bool) context.Context {
return putValueIntoContext(ctx, "prefetch", yesNo)
}

func (ser *rpcServer) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode.Block, error) {
// get the slot by slot number
wantedCid, err := ser.FindCidFromSlot(ctx, slot)
Expand All @@ -587,6 +649,13 @@ func (ser *rpcServer) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode.
return nil, err
}
klog.Infof("found CID for slot %d: %s", slot, wantedCid)
{
doPrefetch := getValueFromContext(ctx, "prefetch")
if doPrefetch != nil && doPrefetch.(bool) {
// prefetch the block
ser.prefetchSubgraph(ctx, wantedCid)
}
}
// get the block by CID
data, err := ser.GetNodeByCid(ctx, wantedCid)
if err != nil {
Expand Down Expand Up @@ -670,6 +739,13 @@ func (ser *rpcServer) GetTransaction(ctx context.Context, sig solana.Signature)
return nil, err
}
klog.Infof("found CID for signature %s: %s", sig, wantedCid)
{
doPrefetch := getValueFromContext(ctx, "prefetch")
if doPrefetch != nil && doPrefetch.(bool) {
// prefetch the block
ser.prefetchSubgraph(ctx, wantedCid)
}
}
// get the transaction by CID
data, err := ser.GetNodeByCid(ctx, wantedCid)
if err != nil {
Expand Down Expand Up @@ -716,11 +792,11 @@ func parseGetTransactionRequest(raw *json.RawMessage) (*GetTransactionRequest, e
func (ser *rpcServer) Handle(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) {
switch req.Method {
case "getBlock":
ser.getBlock(ctx, conn, req)
ser.handleGetBlock(ctx, conn, req)
case "getTransaction":
ser.getTransaction(ctx, conn, req)
ser.handleGetTransaction(ctx, conn, req)
case "getSignaturesForAddress":
ser.getSignaturesForAddress(ctx, conn, req)
ser.handleGetSignaturesForAddress(ctx, conn, req)
default:
conn.ReplyWithError(
ctx,
Expand Down
5 changes: 3 additions & 2 deletions ipld/ipldbindcode/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func (n Transaction) HasIndex() bool {
return n.Index != nil && *n.Index != nil
}

// GetIndex returns the value of the 'Index' field and
// GetPositionIndex returns the 'Index' field, which indicates
// the index of the transaction in the block (0-based), and
// a flag indicating whether the field has a value.
func (n Transaction) GetIndex() (int, bool) {
func (n Transaction) GetPositionIndex() (int, bool) {
if n.Index == nil || *n.Index == nil {
return 0, false
}
Expand Down
17 changes: 17 additions & 0 deletions lassie-wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,23 @@ func (l *lassieWrapper) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]
return nil, nil
}

func (l *lassieWrapper) GetSubgraph(ctx context.Context, wantedCid cid.Cid) (*WrappedMemStore, error) {
store := NewWrappedMemStore()
{
_, err := l.Fetch(
ctx,
wantedCid,
"",
types.DagScopeAll,
store,
)
if err != nil {
return nil, err
}
}
return store, nil
}

func (l *lassieWrapper) Fetch(
ctx context.Context,
rootCid cid.Cid,
Expand Down
2 changes: 1 addition & 1 deletion ledger.ipldsch
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Transaction struct {
metadata DataFrame
# The slot number where this transaction was created.
slot Int
# The index of this transaction in the the block (0-indexed).
# The index of the position of this transaction in the block (0-indexed).
index nullable optional Int
} representation tuple

Expand Down

0 comments on commit 5b2de1f

Please sign in to comment.