Skip to content

Commit

Permalink
Notify telemetry only every second about the tx pool status (#6605)
Browse files Browse the repository at this point in the history
Before this was done for every imported transaction. When a lot of
transactions got imported, the import notification channel was filled.
The underlying problem was that the `status` call is read locking the
`validated_pool` which will be write locked by the internal submitting
logic. Thus, the submitting and status reading was interferring which
each other.

---------

Co-authored-by: GitHub Action <[email protected]>
  • Loading branch information
bkchr and actions-user authored Nov 24, 2024
1 parent 08ec8cd commit 1e3b8e1
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ workspace = true

[dependencies]
futures = { workspace = true }
futures-timer = { workspace = true }

# Substrate
sc-client-api = { workspace = true, default-features = true }
Expand Down
10 changes: 10 additions & 0 deletions prdoc/pr_6605.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: Notify telemetry only every second about the tx pool status
doc:
- audience: Node Operator
description: |-
Before this was done for every imported transaction. When a lot of transactions got imported, the import notification channel was filled. The underlying problem was that the `status` call is read locking the `validated_pool` which will be write locked by the internal submitting logic. Thus, the submitting and status reading was interferring which each other.
crates:
- name: cumulus-client-service
bump: patch
- name: sc-service
bump: patch
58 changes: 41 additions & 17 deletions substrate/client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
start_rpc_servers, BuildGenesisBlock, GenesisBlockBuilder, RpcHandlers, SpawnTaskHandle,
TaskManager, TransactionPoolAdapter,
};
use futures::{future::ready, FutureExt, StreamExt};
use futures::{select, FutureExt, StreamExt};
use jsonrpsee::RpcModule;
use log::info;
use prometheus_endpoint::Registry;
Expand Down Expand Up @@ -90,7 +90,11 @@ use sp_consensus::block_validation::{
use sp_core::traits::{CodeExecutor, SpawnNamed};
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, BlockIdTo, NumberFor, Zero};
use std::{str::FromStr, sync::Arc, time::SystemTime};
use std::{
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};

/// Full client type.
pub type TFullClient<TBl, TRtApi, TExec> =
Expand Down Expand Up @@ -577,22 +581,42 @@ pub async fn propagate_transaction_notifications<Block, ExPool>(
Block: BlockT,
ExPool: MaintainedTransactionPool<Block = Block, Hash = <Block as BlockT>::Hash>,
{
const TELEMETRY_INTERVAL: Duration = Duration::from_secs(1);

// transaction notifications
transaction_pool
.import_notification_stream()
.for_each(move |hash| {
tx_handler_controller.propagate_transaction(hash);
let status = transaction_pool.status();
telemetry!(
telemetry;
SUBSTRATE_INFO;
"txpool.import";
"ready" => status.ready,
"future" => status.future,
);
ready(())
})
.await;
let mut notifications = transaction_pool.import_notification_stream().fuse();
let mut timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();
let mut tx_imported = false;

loop {
select! {
notification = notifications.next() => {
let Some(hash) = notification else { return };

tx_handler_controller.propagate_transaction(hash);

tx_imported = true;
},
_ = timer => {
timer = futures_timer::Delay::new(TELEMETRY_INTERVAL).fuse();

if !tx_imported {
continue;
}

tx_imported = false;
let status = transaction_pool.status();

telemetry!(
telemetry;
SUBSTRATE_INFO;
"txpool.import";
"ready" => status.ready,
"future" => status.future,
);
}
}
}
}

/// Initialize telemetry with provided configuration and return telemetry handle
Expand Down

0 comments on commit 1e3b8e1

Please sign in to comment.