Skip to content

Commit

Permalink
starknet_subscribePendingTransactions all tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
weiihann authored and pnowosie committed Oct 23, 2024
1 parent a61fe2a commit e9aaefc
Show file tree
Hide file tree
Showing 6 changed files with 516 additions and 131 deletions.
14 changes: 14 additions & 0 deletions mocks/mock_synchronizer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

127 changes: 126 additions & 1 deletion rpc/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ import (
)

const (
MaxBlocksBack = 1024
MaxBlocksBack = 1024
MaxAddressesInFilter = 1000 // TODO(weiihann): not finalised yet
)

type EventsArg struct {
Expand Down Expand Up @@ -116,6 +117,130 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
return &SubscriptionID{ID: id}, nil
}

func (h *Handler) SubscribePendingTxs(ctx context.Context, getDetails *bool, senderAddr []felt.Felt) (*SubscriptionID, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

if len(senderAddr) > MaxAddressesInFilter {
return nil, ErrTooManyAddressesInFilter
}

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()

pendingTxsSub := h.pendingTxs.Subscribe()
sub.wg.Go(func() {
defer func() {
h.unsubscribe(sub, id)
pendingTxsSub.Unsubscribe()
}()

h.processPendingTxs(subscriptionCtx, getDetails != nil && *getDetails, senderAddr, pendingTxsSub, w, id)
})

return &SubscriptionID{ID: id}, nil
}

func (h *Handler) processPendingTxs(
ctx context.Context,
getDetails bool,
senderAddr []felt.Felt,
pendingTxsSub *feed.Subscription[[]core.Transaction],
w jsonrpc.Conn,
id uint64,
) {
for {
select {
case <-ctx.Done():
return
case pendingTxs := <-pendingTxsSub.Recv():
filteredTxs := h.filterTxs(pendingTxs, getDetails, senderAddr)
if err := h.sendPendingTxs(w, filteredTxs, id); err != nil {
h.log.Warnw("Error sending pending transactions", "err", err)
return
}
}
}
}

func (h *Handler) filterTxs(pendingTxs []core.Transaction, getDetails bool, senderAddr []felt.Felt) interface{} {
if getDetails {
return h.filterTxDetails(pendingTxs, senderAddr)
}
return h.filterTxHashes(pendingTxs, senderAddr)
}

func (h *Handler) filterTxDetails(pendingTxs []core.Transaction, senderAddr []felt.Felt) []*Transaction {
filteredTxs := make([]*Transaction, 0, len(pendingTxs))
for _, txn := range pendingTxs {
if h.shouldIncludeTx(txn, senderAddr) {
filteredTxs = append(filteredTxs, AdaptTransaction(txn))
}
}
return filteredTxs
}

func (h *Handler) filterTxHashes(pendingTxs []core.Transaction, senderAddr []felt.Felt) []felt.Felt {
filteredTxHashes := make([]felt.Felt, 0, len(pendingTxs))
for _, txn := range pendingTxs {
if h.shouldIncludeTx(txn, senderAddr) {
filteredTxHashes = append(filteredTxHashes, *txn.Hash())
}
}
return filteredTxHashes
}

func (h *Handler) shouldIncludeTx(txn core.Transaction, senderAddr []felt.Felt) bool {
if len(senderAddr) == 0 {
return true
}

//
switch t := txn.(type) {
case *core.InvokeTransaction:
for _, addr := range senderAddr {
if t.SenderAddress.Equal(&addr) {
return true
}
}
case *core.DeclareTransaction:
for _, addr := range senderAddr {
if t.SenderAddress.Equal(&addr) {
return true
}
}
}

return false
}

func (h *Handler) sendPendingTxs(w jsonrpc.Conn, result interface{}, id uint64) error {
req := jsonrpc.Request{
Version: "2.0",
Method: "starknet_subscriptionPendingTransactions",
Params: map[string]interface{}{
"subscription_id": id,
"result": result,
},
}

resp, err := json.Marshal(req)
if err != nil {
return err
}
_, err = w.Write(resp)
return err
}

// getStartAndLatestHeaders gets the start and latest header for the subscription
func (h *Handler) getStartAndLatestHeaders(blockID *BlockID) (*core.Header, *core.Header, *jsonrpc.Error) {
if blockID == nil || blockID.Latest {
Expand Down
Loading

0 comments on commit e9aaefc

Please sign in to comment.