Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make subscriptions eth compatible #2935

Merged
merged 5 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 95 additions & 38 deletions doc/RPC.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,18 @@ Creates a new subscription for particular events. The node returns a subscriptio

`QUANTITY` - The subscription ID

#### Sybscription types
### Subscription types

### newHeads

##### newHeads
Fires a notification when new header is appended to the chain

#### Params

1. `BOOLEAN` - returns full data if true. Only hash otherwise. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}
Expand All @@ -70,16 +77,18 @@ Fires a notification when new header is appended to the chain
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"result": {
...BLOCK DATA...
},
"result": "0x81b21bf43d12ddd97d0966c63d1260775d66dc0fdba5d6e62e9ddbddda20e391",
"subscription": "0x2"
}
}
```

##### newPendingTransactions
Fires a notification each time a pending transaction is added
### newPendingTransactions

Fires a notification with a transaction hash each time a pending transaction is added and is signed with a key that is available in the node

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPendingTransactions"],"id":1}
Expand All @@ -91,8 +100,16 @@ Fires a notification each time a pending transaction is added
{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x18e1a9a0b0ff48ebf3c52e760191e8d8f9ee76c0f802981548158feb16648a1e","subscription":"0x3"}}
```

##### newDagBlocks
### newDagBlocks

Fires a notification when DAG block is added

#### Params

1. `BOOLEAN` - returns full data if true. Only hash otherwise. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newDagBlocks"],"id":1}
Expand Down Expand Up @@ -129,8 +146,53 @@ Fires a notification when DAG block is added
}
```

##### newDagBlocksFinalized
### logs

Subscribe to logs notifications

#### Params

1. `Object` - Filter object
* `address`: `DATA`, 20 Bytes - The address from which the log originated.
* `topics`: `Array of DATA`, 32 Bytes - Array of 32 Bytes `DATA` topics. Topics are order-dependent. A log is matched if any of the topics in the array match. Look at filter documentation for more details

#### Example
```json
// Request
{"jsonrpc":"2.0","id": 1, "method": "eth_subscribe", "params": ["logs", {"address":"0x00000000000000000000000000000000000000FE", "topics": ["0x9310ccfcb8de723f578a9e4282ea9f521f05ae40dc08f3068dfad528a65ee3c7"]}]}

// Result
{"id":1,"jsonrpc":"2.0","result":"0x5"}

// Subscription event
{
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"result": {
"address": "0x00000000000000000000000000000000000000fe",
"blockHash": "0xa5e49eeb9e83aa481bf8a7dedda6ab527f2996c0877b0509024141e243efe7ce",
"blockNumber": "0x13dc5",
"data": "0x0000000000000000000000000000000000000000000000929ee692bd630fd73a",
"logIndex": "0x0",
"removed": false,
"topics": [
"0x9310ccfcb8de723f578a9e4282ea9f521f05ae40dc08f3068dfad528a65ee3c7",
"0x000000000000000000000000d56dc0e94776fbb167f4ac7de7db60ac61ff0764",
"0x000000000000000000000000702d581a73b1632a2240ef09a11468e2448f7e14"
],
"transactionHash": "0xa52041e3e1b217b7076043d5eaf3393d95c2700090ed05053f24c545f53977a5",
"transactionIndex": "0x0"
},
"subscription": "0x5"
}
}
```

### newDagBlocksFinalized

Fires a notification when DAG block is finalized

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newDagBlocksFinalized"],"id":1}
Expand All @@ -142,8 +204,16 @@ Fires a notification when DAG block is finalized
{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"block":"0xae8237415925358af0668fecb999b3e5071ef472bd0d8daad7e880e991f7e4f2","period":"0xebb0fe"},"subscription":"0x5"}}
```

##### newPbftBlocks
### newPbftBlocks

Fires a notification each time a new PBFT block is added to the node

#### Params

1. `BOOLEAN` - returns full data if true. Only hash otherwise. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPbftBlocks"],"id":1}
Expand All @@ -156,41 +226,25 @@ Fires a notification each time a new PBFT block is added to the node
"jsonrpc": "2.0",
"method": "eth_subscription",
"params": {
"result": {
"pbft_block": {
"beneficiary": "d0fec34ea0ae3f2e4ed935f47c18bab2d4824799",
"block_hash": "2423040510cee56b84db4885b90afd6fd3c7769993ae4217bee38eac29dd237f",
"dag_block_hash_as_pivot": "2e979ef5687154a2db84477ff4b0eb24d196cc4ed71850206e6488a3d0c6f0a7",
"extra_data": {
"major_version": 1,
"minor_version": 11,
"net_version": 3,
"node_implementation": "T",
"patch_version": 3,
"pillar_block_hash": ""
},
"order_hash": "29589cfaf143ae8aabf5dbf14d180308b40b0503d0a42239ba08ec43f9a2a4d7",
"period": 15446304,
"prev_block_hash": "2e0cfed1574db068b865ad09b3472e9cd2380775860492d1d20521e007bb64f1",
"prev_state_root_hash": "741a8961737d98f022071957f7153301b0bfbda6c5e1610d336352f24100e1aa",
"reward_votes": [...],
"schedule": {
"dag_blocks_order": [...]
},
"signature": "6c71d606c82bac7b6e981c7ddec91c806b32f62ba840cee067d5b53b164bf2c105b1dc4042eb15c034b94c37de2cd50c554c7bf2c9208d3fbe67a23c1c6ba47a00",
"timestamp": 1736771943
}
},
"result":"0x1227bee33c02ebac7c18a49381afc7ffabe4264611b7a7e5e980978ae1549360",
"subscription": "0x6"
}
}
```

##### newPillarBlockData
### newPillarBlockData

Fires a notification when Pillar block is added

#### params

1. `BOOLEAN` - if to include signatures in the notification. Default is false

#### Example

```json
// Request
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPillarBlockData", "includeSignatures"],"id":1}
{"jsonrpc":"2.0","method":"eth_subscribe","params":["newPillarBlockData", true],"id":1}

// Result
{"id":1,"jsonrpc":"2.0","result":"0x7"}
Expand All @@ -209,7 +263,10 @@ Fires a notification when Pillar block is added
"previous_pillar_block_hash": "0xb33b24bc01254d9ab69b1b72a1013f55597afca226e20d35e1ae3815400d0527",
"state_root": "0x71d78c731c07b502189b223ed3b30f4dd90af4a6467f50c9df566bfaad6e45e9",
"validators_vote_counts_changes": []
}
},
"signatures": [
...
]
},
"subscription": "0x7"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ class TransactionManager : public std::enable_shared_from_this<TransactionManage
addr_t getFullNodeAddress() const;

public:
util::Event<TransactionManager, h256> const transaction_accepted_{};
util::Event<TransactionManager, std::shared_ptr<Transaction>> const transaction_added_{};

private:
const FullNodeConfig &kConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ TransactionStatus TransactionManager::insertValidatedTransaction(std::shared_ptr

const auto last_block_number = final_chain_->lastBlockNumber();
LOG(log_dg_) << "Transaction " << trx_hash << " inserted in trx pool";
if (proposable) {
transaction_added_.emit(tx);
}
return transactions_pool_.insert(std::move(tx), proposable, last_block_number);
}

Expand Down Expand Up @@ -189,8 +192,6 @@ std::shared_ptr<Transaction> TransactionManager::getTransaction(trx_hash_t const
}

void TransactionManager::saveTransactionsFromDagBlock(SharedTransactions const &trxs) {
std::vector<trx_hash_t> accepted_transactions;
accepted_transactions.reserve(trxs.size());
auto write_batch = db_->createWriteBatch();
vec_trx_t trx_hashes;
std::transform(trxs.begin(), trxs.end(), std::back_inserter(trx_hashes),
Expand All @@ -202,26 +203,21 @@ void TransactionManager::saveTransactionsFromDagBlock(SharedTransactions const &
std::unique_lock transactions_lock(transactions_mutex_);

for (auto t : trxs) {
const auto tx_hash = t->getHash();
const auto trx_hash = t->getHash();

if (!recently_finalized_transactions_.contains(tx_hash) && !nonfinalized_transactions_in_dag_.contains(tx_hash) &&
!db_->transactionFinalized(tx_hash)) {
if (!recently_finalized_transactions_.contains(trx_hash) &&
!nonfinalized_transactions_in_dag_.contains(trx_hash) && !db_->transactionFinalized(trx_hash)) {
db_->addTransactionToBatch(*t, write_batch);
nonfinalized_transactions_in_dag_.emplace(tx_hash, t);
if (transactions_pool_.erase(tx_hash)) {
LOG(log_dg_) << "Transaction " << tx_hash << " removed from trx pool ";
// Transactions are counted when included in DAG
accepted_transactions.emplace_back(tx_hash);
nonfinalized_transactions_in_dag_.emplace(trx_hash, t);
if (transactions_pool_.erase(trx_hash)) {
LOG(log_dg_) << "Transaction " << trx_hash << " removed from trx pool ";
}
trx_count_++;
}
}
db_->addStatusFieldToBatch(StatusDbField::TrxCount, trx_count_, write_batch);
db_->commitWriteBatch(write_batch);
}
for (const auto &trx_hash : accepted_transactions) {
transaction_accepted_.emit(trx_hash);
}
}

void TransactionManager::recoverNonfinalizedTransactions() {
Expand Down
125 changes: 125 additions & 0 deletions libraries/core_libs/network/include/network/subscriptions.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#pragma once

#include <json/json.h>

#include <functional>
#include <list>
#include <map>
#include <memory>

#include "network/rpc/eth/LogFilter.hpp"

enum class SubscriptionType {
HEADS,
DAG_BLOCKS,
TRANSACTIONS,
DAG_BLOCK_FINALIZED,
PBFT_BLOCK_EXECUTED,
PILLAR_BLOCK,
LOGS,
};

namespace taraxa::net {

class Subscription {
public:
Subscription(int id) : id_(id) {}
virtual ~Subscription() = default;
virtual SubscriptionType getType() const = 0;
int getId() const { return id_; }
virtual std::string processPayload(Json::Value payload) const = 0;

protected:
int id_;
};

class HeadsSubscription : public Subscription {
public:
explicit HeadsSubscription(int id, bool hash_only = false) : Subscription(id), full_data_(hash_only) {}
static constexpr SubscriptionType type = SubscriptionType::HEADS;

SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool full_data_ = false;
};

class DagBlocksSubscription : public Subscription {
public:
explicit DagBlocksSubscription(int id, bool hash_only = false) : Subscription(id), full_data_(hash_only) {}
static constexpr SubscriptionType type = SubscriptionType::DAG_BLOCKS;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool full_data_ = false;
};

class TransactionsSubscription : public Subscription {
public:
explicit TransactionsSubscription(int id) : Subscription(id) {}
static constexpr SubscriptionType type = SubscriptionType::TRANSACTIONS;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;
};

class DagBlockFinalizedSubscription : public Subscription {
public:
explicit DagBlockFinalizedSubscription(int id) : Subscription(id) {}
static constexpr SubscriptionType type = SubscriptionType::DAG_BLOCK_FINALIZED;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;
};

class PbftBlockExecutedSubscription : public Subscription {
public:
explicit PbftBlockExecutedSubscription(int id, bool full_block = false) : Subscription(id), full_block_(full_block) {}
static constexpr SubscriptionType type = SubscriptionType::PBFT_BLOCK_EXECUTED;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool full_block_ = false;
};

class PillarBlockSubscription : public Subscription {
public:
explicit PillarBlockSubscription(int id, bool include_signatures = false)
: Subscription(id), include_signatures_(include_signatures) {}
static constexpr SubscriptionType type = SubscriptionType::PILLAR_BLOCK;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;

private:
bool include_signatures_ = false;
};

class LogsSubscription : public Subscription {
public:
explicit LogsSubscription(int id, rpc::eth::LogFilter&& filter) : Subscription(id), filter_(filter) {}
static constexpr SubscriptionType type = SubscriptionType::LOGS;
SubscriptionType getType() const override { return type; }
std::string processPayload(Json::Value payload) const override;
rpc::eth::LogFilter getFilter() const { return filter_; }

private:
rpc::eth::LogFilter filter_;
};

class Subscriptions {
public:
Subscriptions(std::function<void(std::string&&)> send) : send_(send) {}
int addSubscription(std::shared_ptr<Subscription> subscription);
bool removeSubscription(int id);
void process(SubscriptionType type, const Json::Value& payload);
void processLogs(const final_chain::BlockHeader& header, TransactionHashes trx_hashes,
const final_chain::TransactionReceipts& receipts);

private:
std::function<void(std::string&&)> send_;
std::map<uint64_t, std::shared_ptr<Subscription>> subscriptions_;
std::map<SubscriptionType, std::list<uint64_t>> subscriptions_by_type_;
std::mutex subscriptions_mutex_;
};
} // namespace taraxa::net
Loading
Loading