Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream transactions protobuf #183

Merged
merged 15 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions grpc-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
"github.com/ipld/go-car/util"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/rpcpool/yellowstone-faithful/compactindexsized"
"github.com/rpcpool/yellowstone-faithful/gsfa"
"github.com/rpcpool/yellowstone-faithful/gsfa/linkedlog"
"github.com/rpcpool/yellowstone-faithful/indexes"
"github.com/rpcpool/yellowstone-faithful/ipld/ipldbindcode"
"github.com/rpcpool/yellowstone-faithful/iplddecoders"
old_faithful_grpc "github.com/rpcpool/yellowstone-faithful/old-faithful-proto/old-faithful-grpc"
Expand Down Expand Up @@ -696,5 +699,113 @@
}

return false
}

func (multi *MultiEpoch) StreamTransactions(params *old_faithful_grpc.StreamTransactionsRequest, ser old_faithful_grpc.OldFaithful_StreamTransactionsServer) error {
ctx := ser.Context()

startSlot := params.StartSlot
endSlot := startSlot + maxSlotsToStream

if params.EndSlot != nil {
endSlot = *params.EndSlot
}
gsfaReader, _ := multi.getGsfaReadersInEpochDescendingOrderForSlotRange(ctx, startSlot, endSlot)

for slot := startSlot; slot <= endSlot; slot++ {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := multi.processSlotTransactions(ctx, ser, slot, params.Filter, gsfaReader); err != nil {
return err
}
}
return nil
}

func (multi *MultiEpoch) processSlotTransactions(
ctx context.Context,
ser old_faithful_grpc.OldFaithful_StreamTransactionsServer,
slot uint64, filter *old_faithful_grpc.StreamTransactionsFilter,
gsfaReader *gsfa.GsfaReaderMultiepoch,
) error {

filterFunc := func(txn *ipldbindcode.Transaction) bool {

Check failure on line 736 in grpc-server.go

View workflow job for this annotation

GitHub Actions / test (1.21.x, ubuntu-latest)

filterFunc declared and not used
// fill this out

return true
}

if filter == nil || len(filter.AccountInclude) == 0 {

// get block -> not sure which one to use
// block -> transactions
// Apply filters
// Send
} else {
for _, account := range filter.AccountInclude {
pKey := solana.MustPublicKeyFromBase58(account)
epochToTxns, err := gsfaReader.Get(
ctx,
pKey,
1000,
func(epochNum uint64, oas linkedlog.OffsetAndSizeAndBlocktime) (*ipldbindcode.Transaction, error) {
anjor marked this conversation as resolved.
Show resolved Hide resolved
epoch, err := multi.GetEpoch(epochNum)
if err != nil {
return nil, fmt.Errorf("failed to get epoch %d: %w", epochNum, err)
}
raw, err := epoch.GetNodeByOffsetAndSize(ctx, nil, &indexes.OffsetAndSize{
Offset: oas.Offset,
Size: oas.Size,
})
if err != nil {
return nil, fmt.Errorf("failed to get signature: %w", err)
}
decoded, err := iplddecoders.DecodeTransaction(raw)
if err != nil {
return nil, fmt.Errorf("error while decoding transaction from nodex at offset %d: %w", oas.Offset, err)
}
return decoded, nil
},
)

if err != nil {
return err
}

for epochNumber, txns := range epochToTxns {
epochHandler, err := multi.GetEpoch(epochNumber)
if err != nil {
return status.Errorf(codes.NotFound, "Epoch %d is not available", epochNumber)
}
for _, txn := range txns {
txResp := new(old_faithful_grpc.TransactionResponse)
txResp.Transaction = new(old_faithful_grpc.Transaction)
{
pos, ok := txn.GetPositionIndex()
if ok {
txResp.Index = ptrToUint64(uint64(pos))
txResp.Transaction.Index = ptrToUint64(uint64(pos))
}
txResp.Transaction.Transaction, txResp.Transaction.Meta, err = getTransactionAndMetaFromNode(txn, epochHandler.GetDataFrameByCid)
if err != nil {
return status.Errorf(codes.Internal, "Failed to get transaction: %v", err)
}
txResp.Slot = uint64(txn.Slot)
// What to do for blocktime?
}

// not sure how to apply more filters
anjor marked this conversation as resolved.
Show resolved Hide resolved

if err := ser.Send(txResp); err != nil {
return err
}
}
}
}
}
return nil
}
Loading
Loading