Skip to content

Commit

Permalink
add stream transactions protobuf (#183)
Browse files Browse the repository at this point in the history
* add stream transactions protobuf

* impl

* no filter case

* working

* account include impl

* missing return

* filters

* filters

* return

* latest

* add filters

* all txns

* check vote

* index

* slot checking
  • Loading branch information
anjor authored Dec 12, 2024
1 parent 9540dab commit a7ca464
Show file tree
Hide file tree
Showing 5 changed files with 541 additions and 80 deletions.
195 changes: 195 additions & 0 deletions grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,19 @@ import (
"sync"
"time"

bin "github.com/gagliardetto/binary"
"github.com/gagliardetto/solana-go"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/util"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc"
solanatxmetaparsers "github.com/rpcpool/yellowstone-faithful/solana-tx-meta-parsers"
"github.com/rpcpool/yellowstone-faithful/tooling"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -696,5 +701,195 @@ func blockContainsAccounts(block *old_faithful_grpc.BlockResponse, accounts []st
}

return false
}

func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTransactionsRequest, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer) error {
ctx := ser.Context()

startSlot := params.StartSlot
endSlot := startSlot + maxSlotsToStream

if params.EndSlot != nil {
endSlot = *params.EndSlot
}
gsfaReader, _ := multi.getGsfaReadersInEpochDescendingOrderForSlotRange(ctx, startSlot, endSlot)

for slot := startSlot; slot <= endSlot; slot++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := multi.processSlotTransactions(ctx, ser, slot, params.Filter, gsfaReader); err != nil {
return err
}
}
return nil
}

func (multi *MultiEpoch) processSlotTransactions(
ctx context.Context,
ser old_faithful_grpc.OldFaithful_StreamTransactionsServer,
slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter,
gsfaReader *gsfa.GsfaReaderMultiepoch,
) error {

filterOutTxn := func(tx solana.Transaction, meta any) bool {
if filter == nil {
return true
}

if !(*filter.Vote) && IsSimpleVoteTransaction(&tx) { // If vote is false, we should filter out vote transactions
return false
}

if !(*filter.Failed) { // If failed is false, we should filter out failed transactions
err := getErr(meta)
if err != nil {
return false
}
}

// AccountInclude is handled in the main function

for _, acc := range filter.AccountExclude {
pkey := solana.MustPublicKeyFromBase58(acc)
ok, err := tx.HasAccount(pkey)
if err != nil {
klog.Errorf("Failed to check if transaction %v has account %s", tx, acc)
return false
}
if ok { // If any excluded account is present, filter out the transaction
return false
}
}

for _, acc := range filter.AccountRequired {
pkey := solana.MustPublicKeyFromBase58(acc)
ok, err := tx.HasAccount(pkey)
if err != nil {
klog.Errorf("Failed to check if transaction %v has account %s", tx, acc)
return false
}
if !ok { // If any required account is missing, filter out the transaction
return false
}
}

return true
}

if filter == nil || len(filter.AccountInclude) == 0 {

block, err := multi.GetBlock(ctx, &old_faithful_grpc.BlockRequest{Slot: slot})
if err != nil {
if status.Code(err) == codes.NotFound {
return nil
}
return err
}

for _, tx := range block.Transactions {
decoder := bin.NewBinDecoder(tx.Transaction)
txn, err := solana.TransactionFromDecoder(decoder)
if err != nil {
return status.Errorf(codes.Internal, "Failed to decode transaction: %v", err)
}

meta, err := solanatxmetaparsers.ParseAnyTransactionStatusMeta(tx.Meta)
if err != nil {
return status.Errorf(codes.Internal, "Failed to parse transaction meta: %v", err)
}

if !filterOutTxn(*txn, meta) {

txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)

{
txResp.Transaction.Transaction = tx.Transaction
txResp.Transaction.Meta = tx.Meta
txResp.Transaction.Index = tx.Index

// To do: add blocketime after index work is done
}

if err := ser.Send(txResp); err != nil {
return err
}
}
}
} else {
for _, account := range filter.AccountInclude {
pKey := solana.MustPublicKeyFromBase58(account)
epochToTxns, err := gsfaReader.Get(
ctx,
pKey,
1000,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return nil, fmt.Errorf("failed to get signature: %w", err)
}
decoded, err := iplddecoders.DecodeTransaction(raw)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err)
}
return decoded, nil
},
)
if err != nil {
return err
}

for epochNumber, txns := range epochToTxns {
epochHandler, err := multi.GetEpoch(epochNumber)
if err != nil {
return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber)
}
for _, txn := range txns {
if slot != uint64(txn.Slot) { // If the transaction is not in the requested slot, skip
continue
}
tx, meta, err := parseTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to parse transaction from node: %v", err)
}

if !filterOutTxn(tx, meta) {

txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)
{
pos, ok := txn.GetPositionIndex()
if ok {
txResp.Index = ptrToUint64(uint64(pos))
txResp.Transaction.Index = ptrToUint64(uint64(pos))
}
txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to get transaction: %v", err)
}
txResp.Slot = uint64(txn.Slot)

// To do: add blocketime after index work is done
}

if err := ser.Send(txResp); err != nil {
return err
}
}
}
}
}
}
return nil
}
Loading

0 comments on commit a7ca464

Please sign in to comment.