Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport/rpc-v2: Implement archive_unstable_storageDiff (stable2412) #6709

Open
wants to merge 1 commit into
base: stable2412
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions prdoc/pr_5997.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Implement archive_unstable_storageDiff method

doc:
- audience: Node Dev
description: |
This PR implements the `archive_unstable_storageDiff` rpc-v2 method.
Developers can use this method to fetch the storage differences
between two blocks. This is useful for oracles and archive nodes.
For more details see: https://github.com/paritytech/json-rpc-interface-spec/blob/main/src/api/archive_unstable_storageDiff.md.

crates:
- name: sc-rpc-spec-v2
bump: major
- name: sc-service
bump: patch
1 change: 1 addition & 0 deletions substrate/client/rpc-spec-v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ log = { workspace = true, default-features = true }
futures-util = { workspace = true }
rand = { workspace = true, default-features = true }
schnellru = { workspace = true }
itertools = { workspace = true }

[dev-dependencies]
jsonrpsee = { workspace = true, features = ["server", "ws-client"] }
Expand Down
22 changes: 21 additions & 1 deletion substrate/client/rpc-spec-v2/src/archive/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
//! API trait of the archive methods.

use crate::{
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
MethodResult,
};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
Expand Down Expand Up @@ -104,4 +107,21 @@ pub trait ArchiveApi<Hash> {
items: Vec<PaginatedStorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;

/// Returns the storage difference between two blocks.
///
/// # Unstable
///
/// This method is unstable and can change in minor or patch releases.
#[subscription(
name = "archive_unstable_storageDiff" => "archive_unstable_storageDiffEvent",
unsubscribe = "archive_unstable_storageDiff_stopStorageDiff",
item = ArchiveStorageDiffEvent,
)]
fn archive_unstable_storage_diff(
&self,
hash: Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Hash>,
);
}
89 changes: 84 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,29 @@
//! API implementation for `archive`.

use crate::{
archive::{error::Error as ArchiveError, ArchiveApiServer},
common::events::{ArchiveStorageResult, PaginatedStorageQuery},
hex_string, MethodResult,
archive::{
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};

use codec::Encode;
use jsonrpsee::core::{async_trait, RpcResult};
use futures::FutureExt;
use jsonrpsee::{
core::{async_trait, RpcResult},
PendingSubscriptionSink,
};
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::Subscription;
use sp_api::{CallApiAt, CallContext};
use sp_blockchain::{
Backend as BlockChainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
Expand All @@ -41,7 +53,9 @@ use sp_runtime::{
};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};

use super::archive_storage::ArchiveStorage;
use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
pub struct ArchiveConfig {
Expand All @@ -64,6 +78,12 @@ const MAX_DESCENDANT_RESPONSES: usize = 5;
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its down buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
Expand All @@ -79,6 +99,8 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
client: Arc<Client>,
/// Backend of the chain.
backend: Arc<BE>,
/// Executor to spawn subscriptions.
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
Expand All @@ -96,12 +118,14 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
client: Arc<Client>,
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
Expand Down Expand Up @@ -278,4 +302,59 @@ where

Ok(storage_client.handle_query(hash, items, child_trie))
}

fn archive_unstable_storage_diff(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<ArchiveStorageDiffItem<String>>,
previous_hash: Option<Block::Hash>,
) {
let storage_client = ArchiveStorageDiff::new(self.client.clone());
let client = self.client.clone();

log::trace!(target: LOG_TARGET, "Storage diff subscription started");

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

let previous_hash = if let Some(previous_hash) = previous_hash {
previous_hash
} else {
let Ok(Some(current_header)) = client.header(hash) else {
let message = format!("Block header is not present: {hash}");
let _ = sink.send(&ArchiveStorageDiffEvent::err(message)).await;
return
};
*current_header.parent_hash()
};

let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let storage_fut =
storage_client.handle_trie_queries(hash, items, previous_hash, tx.clone());

// We don't care about the return value of this join:
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Sends all the events to the sink.
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
}

if sink.send(&event).await.is_err() {
return
}
}
}
Loading
Loading