Skip to content

Commit

Permalink
Merge pull request #2935 from Taraxa-project/eth_subscribe
Browse files Browse the repository at this point in the history
Make subscriptions eth compatible
  • Loading branch information
kstdl authored Jan 22, 2025
2 parents ce50f68 + 455a515 commit 8fe4268
Show file tree
Hide file tree
Showing 17 changed files with 648 additions and 332 deletions.
133 changes: 95 additions & 38 deletions doc/RPC.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,18 @@ Creates a new subscription for particular events. The node returns a subscriptio

`QUANTITY` - The subscription ID

#### Sybscription types
### Subscription types

### newHeads

##### newHeads
Fires a notification when new header is appended to the chain

#### Params

1. `BOOLEAN` - returns full data if true. Only hash otherwise. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}
Expand All @@ -70,16 +77,18 @@ Fires a notification when new header is appended to the chain
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"result": {
...BLOCK DATA...
},
"result": "0x81b21bf43d12ddd97d0966c63d1260775d66dc0fdba5d6e62e9ddbddda20e391",
"subscription": "0x2"
}
}
```

##### newPendingTransactions
Fires a notification each time a pending transaction is added
### newPendingTransactions

Fires a notification with a transaction hash each time a pending transaction is added and is signed with a key that is available in the node

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPendingTransactions"],"id":1}
Expand All @@ -91,8 +100,16 @@ Fires a notification each time a pending transaction is added
{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x18e1a9a0b0ff48ebf3c52e760191e8d8f9ee76c0f802981548158feb16648a1e","subscription":"0x3"}}
```

##### newDagBlocks
### newDagBlocks

Fires a notification when DAG block is added

#### Params

1. `BOOLEAN` - returns full data if true. Only hash otherwise. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newDagBlocks"],"id":1}
Expand Down Expand Up @@ -129,8 +146,53 @@ Fires a notification when DAG block is added
}
```

##### newDagBlocksFinalized
### logs

Subscribe to logs notifications

#### Params

1. `Object` - Filter object
* `address`: `DATA`, 20 Bytes - The address from which the log originated.
* `topics`: `Array of DATA`, 32 Bytes - Array of 32 Bytes `DATA` topics. Topics are order-dependent. A log is matched if any of the topics in the array match. Look at filter documentation for more details

#### Example
```json
// Request
{"jsonrpc":"2.0","id": 1, "method": "eth_subscribe", "params": ["logs", {"address":"0x00000000000000000000000000000000000000FE", "topics": ["0x9310ccfcb8de723f578a9e4282ea9f521f05ae40dc08f3068dfad528a65ee3c7"]}]}

// Result
{"id":1,"jsonrpc":"2.0","result":"0x5"}

// Subscription event
{
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"result": {
"address": "0x00000000000000000000000000000000000000fe",
"blockHash": "0xa5e49eeb9e83aa481bf8a7dedda6ab527f2996c0877b0509024141e243efe7ce",
"blockNumber": "0x13dc5",
"data": "0x0000000000000000000000000000000000000000000000929ee692bd630fd73a",
"logIndex": "0x0",
"removed": false,
"topics": [
"0x9310ccfcb8de723f578a9e4282ea9f521f05ae40dc08f3068dfad528a65ee3c7",
"0x000000000000000000000000d56dc0e94776fbb167f4ac7de7db60ac61ff0764",
"0x000000000000000000000000702d581a73b1632a2240ef09a11468e2448f7e14"
],
"transactionHash": "0xa52041e3e1b217b7076043d5eaf3393d95c2700090ed05053f24c545f53977a5",
"transactionIndex": "0x0"
},
"subscription": "0x5"
}
}
```

### newDagBlocksFinalized

Fires a notification when DAG block is finalized

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newDagBlocksFinalized"],"id":1}
Expand All @@ -142,8 +204,16 @@ Fires a notification when DAG block is finalized
{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"block":"0xae8237415925358af0668fecb999b3e5071ef472bd0d8daad7e880e991f7e4f2","period":"0xebb0fe"},"subscription":"0x5"}}
```

##### newPbftBlocks
### newPbftBlocks

Fires a notification each time a new PBFT block is added to the node

#### Params

1. `BOOLEAN` - returns full data if true. Only hash otherwise. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPbftBlocks"],"id":1}
Expand All @@ -156,41 +226,25 @@ Fires a notification each time a new PBFT block is added to the node
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"result": {
"pbft_block": {
"beneficiary": "d0fec34ea0ae3f2e4ed935f47c18bab2d4824799",
"block_hash": "2423040510cee56b84db4885b90afd6fd3c7769993ae4217bee38eac29dd237f",
"dag_block_hash_as_pivot": "2e979ef5687154a2db84477ff4b0eb24d196cc4ed71850206e6488a3d0c6f0a7",
"extra_data": {
"major_version": 1,
"minor_version": 11,
"net_version": 3,
"node_implementation": "T",
"patch_version": 3,
"pillar_block_hash": ""
},
"order_hash": "29589cfaf143ae8aabf5dbf14d180308b40b0503d0a42239ba08ec43f9a2a4d7",
"period": 15446304,
"prev_block_hash": "2e0cfed1574db068b865ad09b3472e9cd2380775860492d1d20521e007bb64f1",
"prev_state_root_hash": "741a8961737d98f022071957f7153301b0bfbda6c5e1610d336352f24100e1aa",
"reward_votes": [...],
"schedule": {
"dag_blocks_order": [...]
},
"signature": "6c71d606c82bac7b6e981c7ddec91c806b32f62ba840cee067d5b53b164bf2c105b1dc4042eb15c034b94c37de2cd50c554c7bf2c9208d3fbe67a23c1c6ba47a00",
"timestamp": 1736771943
}
},
"result":"0x1227bee33c02ebac7c18a49381afc7ffabe4264611b7a7e5e980978ae1549360",
"subscription": "0x6"
}
}
```

##### newPillarBlockData
### newPillarBlockData

Fires a notification when Pillar block is added

#### params

1. `BOOLEAN` - if to include signatures in the notification. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPillarBlockData", "includeSignatures"],"id":1}
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPillarBlockData", true],"id":1}

// Result
{"id":1,"jsonrpc":"2.0","result":"0x7"}
Expand All @@ -209,7 +263,10 @@ Fires a notification when Pillar block is added
"previous_pillar_block_hash": "0xb33b24bc01254d9ab69b1b72a1013f55597afca226e20d35e1ae3815400d0527",
"state_root": "0x71d78c731c07b502189b223ed3b30f4dd90af4a6467f50c9df566bfaad6e45e9",
"validators_vote_counts_changes": []
}
},
"signatures": [
...
]
},
"subscription": "0x7"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class TransactionManager : public std::enable_shared_from_this<TransactionManage
addr_t getFullNodeAddress() const;

public:
util::Event<TransactionManager, h256> const transaction_accepted_{};
util::Event<TransactionManager, std::shared_ptr<Transaction>> const transaction_added_{};

private:
const FullNodeConfig &kConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ TransactionStatus TransactionManager::insertValidatedTransaction(std::shared_ptr

const auto last_block_number = final_chain_->lastBlockNumber();
LOG(log_dg_) << "Transaction " << trx_hash << " inserted in trx pool";
if (proposable) {
transaction_added_.emit(tx);
}
return transactions_pool_.insert(std::move(tx), proposable, last_block_number);
}

Expand Down Expand Up @@ -189,8 +192,6 @@ std::shared_ptr<Transaction> TransactionManager::getTransaction(trx_hash_t const
}

void TransactionManager::saveTransactionsFromDagBlock(SharedTransactions const &trxs) {
std::vector<trx_hash_t> accepted_transactions;
accepted_transactions.reserve(trxs.size());
auto write_batch = db_->createWriteBatch();
vec_trx_t trx_hashes;
std::transform(trxs.begin(), trxs.end(), std::back_inserter(trx_hashes),
Expand All @@ -202,26 +203,21 @@ void TransactionManager::saveTransactionsFromDagBlock(SharedTransactions const &
std::unique_lock transactions_lock(transactions_mutex_);

for (auto t : trxs) {
const auto tx_hash = t->getHash();
const auto trx_hash = t->getHash();

if (!recently_finalized_transactions_.contains(tx_hash) && !nonfinalized_transactions_in_dag_.contains(tx_hash) &&
!db_->transactionFinalized(tx_hash)) {
if (!recently_finalized_transactions_.contains(trx_hash) &&
!nonfinalized_transactions_in_dag_.contains(trx_hash) && !db_->transactionFinalized(trx_hash)) {
db_->addTransactionToBatch(*t, write_batch);
nonfinalized_transactions_in_dag_.emplace(tx_hash, t);
if (transactions_pool_.erase(tx_hash)) {
LOG(log_dg_) << "Transaction " << tx_hash << " removed from trx pool ";
// Transactions are counted when included in DAG
accepted_transactions.emplace_back(tx_hash);
nonfinalized_transactions_in_dag_.emplace(trx_hash, t);
if (transactions_pool_.erase(trx_hash)) {
LOG(log_dg_) << "Transaction " << trx_hash << " removed from trx pool ";
}
trx_count_++;
}
}
db_->addStatusFieldToBatch(StatusDbField::TrxCount, trx_count_, write_batch);
db_->commitWriteBatch(write_batch);
}
for (const auto &trx_hash : accepted_transactions) {
transaction_accepted_.emit(trx_hash);
}
}

void TransactionManager::recoverNonfinalizedTransactions() {
Expand Down
125 changes: 125 additions & 0 deletions libraries/core_libs/network/include/network/subscriptions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#pragma once

#include <json/json.h>

#include <functional>
#include <list>
#include <map>
#include <memory>

#include "network/rpc/eth/LogFilter.hpp"

enum class SubscriptionType {
HEADS,
DAG_BLOCKS,
TRANSACTIONS,
DAG_BLOCK_FINALIZED,
PBFT_BLOCK_EXECUTED,
PILLAR_BLOCK,
LOGS,
};

namespace taraxa::net {

class Subscription {
public:
Subscription(int id) : id_(id) {}
virtual ~Subscription() = default;
virtual SubscriptionType getType() const = 0;
int getId() const { return id_; }
virtual std::string processPayload(Json::Value payload) const = 0;

protected:
int id_;
};

class HeadsSubscription : public Subscription {
public:
explicit HeadsSubscription(int id, bool hash_only = false) : Subscription(id), full_data_(hash_only) {}
static constexpr SubscriptionType type = SubscriptionType::HEADS;

SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool full_data_ = false;
};

class DagBlocksSubscription : public Subscription {
public:
explicit DagBlocksSubscription(int id, bool hash_only = false) : Subscription(id), full_data_(hash_only) {}
static constexpr SubscriptionType type = SubscriptionType::DAG_BLOCKS;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool full_data_ = false;
};

class TransactionsSubscription : public Subscription {
public:
explicit TransactionsSubscription(int id) : Subscription(id) {}
static constexpr SubscriptionType type = SubscriptionType::TRANSACTIONS;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;
};

class DagBlockFinalizedSubscription : public Subscription {
public:
explicit DagBlockFinalizedSubscription(int id) : Subscription(id) {}
static constexpr SubscriptionType type = SubscriptionType::DAG_BLOCK_FINALIZED;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;
};

class PbftBlockExecutedSubscription : public Subscription {
public:
explicit PbftBlockExecutedSubscription(int id, bool full_block = false) : Subscription(id), full_block_(full_block) {}
static constexpr SubscriptionType type = SubscriptionType::PBFT_BLOCK_EXECUTED;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool full_block_ = false;
};

class PillarBlockSubscription : public Subscription {
public:
explicit PillarBlockSubscription(int id, bool include_signatures = false)
: Subscription(id), include_signatures_(include_signatures) {}
static constexpr SubscriptionType type = SubscriptionType::PILLAR_BLOCK;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool include_signatures_ = false;
};

class LogsSubscription : public Subscription {
public:
explicit LogsSubscription(int id, rpc::eth::LogFilter&& filter) : Subscription(id), filter_(filter) {}
static constexpr SubscriptionType type = SubscriptionType::LOGS;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;
rpc::eth::LogFilter getFilter() const { return filter_; }

private:
rpc::eth::LogFilter filter_;
};

class Subscriptions {
public:
Subscriptions(std::function<void(std::string&&)> send) : send_(send) {}
int addSubscription(std::shared_ptr<Subscription> subscription);
bool removeSubscription(int id);
void process(SubscriptionType type, const Json::Value& payload);
void processLogs(const final_chain::BlockHeader& header, TransactionHashes trx_hashes,
const final_chain::TransactionReceipts& receipts);

private:
std::function<void(std::string&&)> send_;
std::map<uint64_t, std::shared_ptr<Subscription>> subscriptions_;
std::map<SubscriptionType, std::list<uint64_t>> subscriptions_by_type_;
std::mutex subscriptions_mutex_;
};
} // namespace taraxa::net
Loading

0 comments on commit 8fe4268

Please sign in to comment.