From 2ce51b6415a6f3fc03cd4b1933d89258a779d84f Mon Sep 17 00:00:00 2001 From: gagliardetto Date: Thu, 29 Jun 2023 11:34:23 +0200 Subject: [PATCH] Enable subgraph fetching and add block cache --- cmd-rpc-server-car-getBlock.go | 33 ++++-- cmd-rpc-server-car-getSignaturesForAddress.go | 2 +- cmd-rpc-server-car-getTransaction.go | 4 +- cmd-rpc-server-car.go | 100 +++++++++++++++--- lassie-wrapper.go | 17 +++ 5 files changed, 131 insertions(+), 25 deletions(-) diff --git a/cmd-rpc-server-car-getBlock.go b/cmd-rpc-server-car-getBlock.go index 09402bcf..bf33b9c8 100644 --- a/cmd-rpc-server-car-getBlock.go +++ b/cmd-rpc-server-car-getBlock.go @@ -11,6 +11,7 @@ import ( "github.com/gagliardetto/solana-go" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/rpcpool/yellowstone-faithful/compactindex36" "github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode" solanablockrewards "github.com/rpcpool/yellowstone-faithful/solana-block-rewards" "github.com/sourcegraph/jsonrpc2" @@ -60,7 +61,7 @@ func (t *timer) time(name string) { t.prev = time.Now() } -func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { +func (ser *rpcServer) handleGetBlock(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { tim := newTimer() params, err := parseGetBlockRequest(req.Params) if err != nil { @@ -77,16 +78,26 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j tim.time("parseGetBlockRequest") slot := params.Slot - block, err := ser.GetBlock(ctx, slot) + block, err := ser.GetBlock(WithSubrapghPrefetch(ctx, true), slot) if err != nil { klog.Errorf("failed to get block: %v", err) - conn.ReplyWithError( - ctx, - req.ID, - &jsonrpc2.Error{ - Code: jsonrpc2.CodeInternalError, - Message: "Failed to get block", - }) + if errors.Is(err, compactindex36.ErrNotFound) { + conn.ReplyWithError( + ctx, + req.ID, + &jsonrpc2.Error{ + Code: CodeNotFound, + Message: fmt.Sprintf("Slot %d was skipped, or missing in long-term storage", slot), + }) + } else { + conn.ReplyWithError( + ctx, + req.ID, + &jsonrpc2.Error{ + Code: jsonrpc2.CodeInternalError, + Message: "Failed to get block", + }) + } return } tim.time("GetBlock") @@ -317,7 +328,7 @@ func (ser *rpcServer) getBlock(ctx context.Context, conn *requestContext, req *j // get parent slot parentSlot := uint64(block.Meta.Parent_slot) if parentSlot != 0 { - parentBlock, err := ser.GetBlock(ctx, parentSlot) + parentBlock, err := ser.GetBlock(WithSubrapghPrefetch(ctx, false), parentSlot) if err != nil { klog.Errorf("failed to decode block: %v", err) conn.ReplyWithError( @@ -385,3 +396,5 @@ func rentTypeToString(typ int) string { return "Unknown" } } + +const CodeNotFound = -32009 diff --git a/cmd-rpc-server-car-getSignaturesForAddress.go b/cmd-rpc-server-car-getSignaturesForAddress.go index 8a7a09cc..2c0a7617 100644 --- a/cmd-rpc-server-car-getSignaturesForAddress.go +++ b/cmd-rpc-server-car-getSignaturesForAddress.go @@ -54,7 +54,7 @@ func parseGetSignaturesForAddressParams(raw *json.RawMessage) (*GetSignaturesFor return out, nil } -func (ser *rpcServer) getSignaturesForAddress(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { +func (ser *rpcServer) handleGetSignaturesForAddress(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { if ser.gsfaReader == nil { klog.Errorf("gsfaReader is nil") conn.ReplyWithError( diff --git a/cmd-rpc-server-car-getTransaction.go b/cmd-rpc-server-car-getTransaction.go index 53671d6a..17926e72 100644 --- a/cmd-rpc-server-car-getTransaction.go +++ b/cmd-rpc-server-car-getTransaction.go @@ -11,7 +11,7 @@ func ptrToUint64(v uint64) *uint64 { return &v } -func (ser *rpcServer) getTransaction(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { +func (ser *rpcServer) handleGetTransaction(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { params, err := parseGetTransactionRequest(req.Params) if err != nil { klog.Errorf("failed to parse params: %v", err) @@ -27,7 +27,7 @@ func (ser *rpcServer) getTransaction(ctx context.Context, conn *requestContext, sig := params.Signature - transactionNode, err := ser.GetTransaction(ctx, sig) + transactionNode, err := ser.GetTransaction(WithSubrapghPrefetch(ctx, true), sig) if err != nil { klog.Errorf("failed to decode Transaction: %v", err) conn.ReplyWithError( diff --git a/cmd-rpc-server-car.go b/cmd-rpc-server-car.go index 63506685..b9c593b4 100644 --- a/cmd-rpc-server-car.go +++ b/cmd-rpc-server-car.go @@ -195,16 +195,15 @@ type HTTPSingleFileRemoteReaderAt struct { url string contentLength int64 client *http.Client - // TODO: add caching - ca *cache.Cache + ca *cache.Cache } -func getCacheKey(off int64, p []byte) string { +func getHttpCacheKey(off int64, p []byte) string { return fmt.Sprintf("%d-%d", off, len(p)) } func (r *HTTPSingleFileRemoteReaderAt) getFromCache(off int64, p []byte) (n int, err error, has bool) { - key := getCacheKey(off, p) + key := getHttpCacheKey(off, p) if v, ok := r.ca.Get(key); ok { return copy(p, v.([]byte)), nil, true } @@ -212,7 +211,7 @@ func (r *HTTPSingleFileRemoteReaderAt) getFromCache(off int64, p []byte) (n int, } func (r *HTTPSingleFileRemoteReaderAt) putInCache(off int64, p []byte) { - key := getCacheKey(off, p) + key := getHttpCacheKey(off, p) r.ca.Set(key, p, cache.DefaultExpiration) } @@ -307,6 +306,7 @@ func createAndStartRPCServer_withCar( sigToCidIndex *compactindex36.DB, gsfaReader *gsfa.GsfaReader, ) error { + ca := cache.New(30*time.Second, 1*time.Minute) handler := &rpcServer{ localCarReader: carReader, remoteCarReader: remoteCarReader, @@ -314,6 +314,7 @@ func createAndStartRPCServer_withCar( slotToCidIndex: slotToCidIndex, sigToCidIndex: sigToCidIndex, gsfaReader: gsfaReader, + cidToBlockCache: ca, } h := newRPCHandler_fast(handler) @@ -331,11 +332,13 @@ func createAndStartRPCServer_lassie( sigToCidIndex *compactindex36.DB, gsfaReader *gsfa.GsfaReader, ) error { + ca := cache.New(30*time.Second, 1*time.Minute) handler := &rpcServer{ - lassieFetcher: lassieWr, - slotToCidIndex: slotToCidIndex, - sigToCidIndex: sigToCidIndex, - gsfaReader: gsfaReader, + lassieFetcher: lassieWr, + slotToCidIndex: slotToCidIndex, + sigToCidIndex: sigToCidIndex, + gsfaReader: gsfaReader, + cidToBlockCache: ca, } h := newRPCHandler_fast(handler) @@ -353,6 +356,22 @@ type rpcServer struct { slotToCidIndex *compactindex36.DB sigToCidIndex *compactindex36.DB gsfaReader *gsfa.GsfaReader + cidToBlockCache *cache.Cache // TODO: prevent OOM +} + +func getCidCacheKey(off int64, p []byte) string { + return fmt.Sprintf("%d-%d", off, len(p)) +} + +func (r *rpcServer) getFromCache(c cid.Cid) (v []byte, err error, has bool) { + if v, ok := r.cidToBlockCache.Get(c.String()); ok { + return v.([]byte), nil, true + } + return nil, nil, false +} + +func (r *rpcServer) putInCache(c cid.Cid, data []byte) { + r.cidToBlockCache.Set(c.String(), data, cache.DefaultExpiration) } type requestContext struct { @@ -465,11 +484,40 @@ func (c *requestContext) ReplyNoMod( return err } +func (s *rpcServer) prefetchSubgraph(ctx context.Context, wantedCid cid.Cid) error { + if s.lassieFetcher != nil { + // Fetch the subgraph from lassie + sub, err := s.lassieFetcher.GetSubgraph(ctx, wantedCid) + if err == nil { + // put in cache + return sub.Each(ctx, func(c cid.Cid, data []byte) error { + s.putInCache(c, data) + return nil + }) + } + klog.Errorf("failed to get subgraph from lassie: %v", err) + return err + } + return nil +} + func (s *rpcServer) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([]byte, error) { + { + // try from cache + data, err, has := s.getFromCache(wantedCid) + if err != nil { + return nil, err + } + if has { + return data, nil + } + } if s.lassieFetcher != nil { // Fetch the node from lassie. data, err := s.lassieFetcher.GetNodeByCid(ctx, wantedCid) if err == nil { + // put in cache + s.putInCache(wantedCid, data) return data, nil } klog.Errorf("failed to get node from lassie: %v", err) @@ -579,6 +627,20 @@ func (ser *rpcServer) FindOffsetFromCid(ctx context.Context, cid cid.Cid) (uint6 return findOffsetFromCid(ser.cidToOffsetIndex, cid) } +func putValueIntoContext(ctx context.Context, key, value interface{}) context.Context { + return context.WithValue(ctx, key, value) +} + +func getValueFromContext(ctx context.Context, key interface{}) interface{} { + return ctx.Value(key) +} + +// WithSubrapghPrefetch sets the prefetch flag in the context +// to enable prefetching of subgraphs. +func WithSubrapghPrefetch(ctx context.Context, yesNo bool) context.Context { + return putValueIntoContext(ctx, "prefetch", yesNo) +} + func (ser *rpcServer) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode.Block, error) { // get the slot by slot number wantedCid, err := ser.FindCidFromSlot(ctx, slot) @@ -587,6 +649,13 @@ func (ser *rpcServer) GetBlock(ctx context.Context, slot uint64) (*ipldbindcode. return nil, err } klog.Infof("found CID for slot %d: %s", slot, wantedCid) + { + doPrefetch := getValueFromContext(ctx, "prefetch") + if doPrefetch != nil && doPrefetch.(bool) { + // prefetch the block + ser.prefetchSubgraph(ctx, wantedCid) + } + } // get the block by CID data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { @@ -670,6 +739,13 @@ func (ser *rpcServer) GetTransaction(ctx context.Context, sig solana.Signature) return nil, err } klog.Infof("found CID for signature %s: %s", sig, wantedCid) + { + doPrefetch := getValueFromContext(ctx, "prefetch") + if doPrefetch != nil && doPrefetch.(bool) { + // prefetch the block + ser.prefetchSubgraph(ctx, wantedCid) + } + } // get the transaction by CID data, err := ser.GetNodeByCid(ctx, wantedCid) if err != nil { @@ -716,11 +792,11 @@ func parseGetTransactionRequest(raw *json.RawMessage) (*GetTransactionRequest, e func (ser *rpcServer) Handle(ctx context.Context, conn *requestContext, req *jsonrpc2.Request) { switch req.Method { case "getBlock": - ser.getBlock(ctx, conn, req) + ser.handleGetBlock(ctx, conn, req) case "getTransaction": - ser.getTransaction(ctx, conn, req) + ser.handleGetTransaction(ctx, conn, req) case "getSignaturesForAddress": - ser.getSignaturesForAddress(ctx, conn, req) + ser.handleGetSignaturesForAddress(ctx, conn, req) default: conn.ReplyWithError( ctx, diff --git a/lassie-wrapper.go b/lassie-wrapper.go index 0f9c6f77..3f66d31d 100644 --- a/lassie-wrapper.go +++ b/lassie-wrapper.go @@ -52,6 +52,23 @@ func (l *lassieWrapper) GetNodeByCid(ctx context.Context, wantedCid cid.Cid) ([] return nil, nil } +func (l *lassieWrapper) GetSubgraph(ctx context.Context, wantedCid cid.Cid) (*WrappedMemStore, error) { + store := NewWrappedMemStore() + { + _, err := l.Fetch( + ctx, + wantedCid, + "", + types.DagScopeAll, + store, + ) + if err != nil { + return nil, err + } + } + return store, nil +} + func (l *lassieWrapper) Fetch( ctx context.Context, rootCid cid.Cid,