diff --git a/block/fetcher/rpc.go b/block/fetcher/rpc.go index 58c5e413..50680471 100644 --- a/block/fetcher/rpc.go +++ b/block/fetcher/rpc.go @@ -9,6 +9,8 @@ import ( "math" "time" + "github.com/streamingfast/derr" + "golang.org/x/exp/slices" "github.com/gagliardetto/solana-go" @@ -105,23 +107,33 @@ func (f *RPCFetcher) Fetch(ctx context.Context, requestedSlot uint64) (out *pbbs return block, false, nil } -func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64) (out *rpc.GetBlockResult, skip bool, err error) { +func (f *RPCFetcher) fetch(ctx context.Context, requestedSlot uint64) (*rpc.GetBlockResult, bool, error) { currentSlot := requestedSlot + var out *rpc.GetBlockResult + skipped := false //f.logger.Info("getting block", zap.Uint64("block_num", currentSlot)) - blockResult, err := f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts) - - if err != nil { - var rpcErr *jsonrpc.RPCError - if errors.As(err, &rpcErr) { - if rpcErr.Code == -32009 || rpcErr.Code == -32007 { - f.logger.Info("block was skipped", zap.Uint64("block_num", currentSlot)) - currentSlot += 1 - return nil, true, nil + err := derr.Retry(math.MaxFloat32, func(ctx context.Context) error { + var innerErr error + out, innerErr = f.rpcClient.GetBlockWithOpts(ctx, requestedSlot, GetBlockOpts) + + if innerErr != nil { + var rpcErr *jsonrpc.RPCError + if errors.As(innerErr, &rpcErr) { + if rpcErr.Code == -32004 { + f.logger.Warn("block not available. Retrying same block", zap.Uint64("block_num", currentSlot)) + return innerErr + } + if rpcErr.Code == -32009 || rpcErr.Code == -32007 { + f.logger.Info("block was skipped", zap.Uint64("block_num", currentSlot)) + currentSlot += 1 + skipped = true + return nil + } } } - return nil, false, fmt.Errorf("fetching block %d: %w", currentSlot, err) - } - return blockResult, false, nil + return nil + }) + return out, skipped, err } func blockFromBlockResult(slot uint64, finalizedSlot uint64, result *rpc.GetBlockResult) (*pbbstream.Block, error) {