diff --git a/Cargo.lock b/Cargo.lock index 0f63559f7..eb221b766 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5785,6 +5785,8 @@ dependencies = [ "pallet-timestamp", "parity-scale-codec", "polkadot-ckb-merkle-mountain-range", + "prost 0.13.3", + "prost-build 0.13.3", "rand 0.8.5", "scale-info", "serde", @@ -7060,7 +7062,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0487d90e047de87f984913713b85c601c05609aad5b0df4b4573fbf69aa13f" +dependencies = [ + "bytes", + "prost-derive 0.13.3", ] [[package]] @@ -7077,14 +7089,35 @@ dependencies = [ "multimap", "petgraph", "prettyplease 0.1.25", - "prost", - "prost-types", + "prost 0.11.9", + "prost-types 0.11.9", "regex", "syn 1.0.109", "tempfile", "which", ] +[[package]] +name = "prost-build" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c1318b19085f08681016926435853bbf7858f9c082d0999b80550ff5d9abe15" +dependencies = [ + "bytes", + "heck 0.5.0", + "itertools 0.13.0", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease 0.2.25", + "prost 0.13.3", + "prost-types 0.13.3", + "regex", + "syn 2.0.85", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.11.9" @@ -7098,13 +7131,35 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" +dependencies = [ + "anyhow", + "itertools 0.13.0", + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "prost-types" version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", +] + +[[package]] +name = "prost-types" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4759aa0d3a6232fb8dbdb97b61de2c20047c68aca932c7ed76da9d788508d670" +dependencies = [ + "prost 0.13.3", ] [[package]] @@ -7735,8 +7790,8 @@ dependencies = [ "log", "multihash", "parity-scale-codec", - "prost", - "prost-build", + "prost 0.11.9", + "prost-build 0.11.9", "rand 0.8.5", "sc-client-api", "sc-network", @@ -8258,8 +8313,8 @@ dependencies = [ "futures", "libp2p-identity", "log", - "prost", - "prost-build", + "prost 0.11.9", + "prost-build 0.11.9", "sc-client-api", "sc-network", "sp-blockchain", @@ -8278,7 +8333,7 @@ dependencies = [ "futures", "libp2p-identity", "parity-scale-codec", - "prost-build", + "prost-build 0.11.9", "sc-consensus", "sp-consensus", "sp-consensus-grandpa", @@ -8315,8 +8370,8 @@ dependencies = [ "libp2p-identity", "log", "parity-scale-codec", - "prost", - "prost-build", + "prost 0.11.9", + "prost-build 0.11.9", "sc-client-api", "sc-network", "sp-blockchain", @@ -8340,8 +8395,8 @@ dependencies = [ "log", "mockall", "parity-scale-codec", - "prost", - "prost-build", + "prost 0.11.9", + "prost-build 0.11.9", "sc-client-api", "sc-consensus", "sc-network", diff --git a/pallets/ddc-verification/Cargo.toml b/pallets/ddc-verification/Cargo.toml index 909237ec7..f9202fc8f 100644 --- a/pallets/ddc-verification/Cargo.toml +++ b/pallets/ddc-verification/Cargo.toml @@ -24,6 +24,7 @@ hex = { workspace = true } itertools = { workspace = true } log = { workspace = true } polkadot-ckb-merkle-mountain-range = { workspace = true } +prost = { version = "0.13", default-features = false, features = ["prost-derive"] } rand = { workspace = true, features = ["small_rng", "alloc"], default-features = false } scale-info = { workspace = true } serde = { workspace = true } @@ -44,6 +45,9 @@ pallet-timestamp = { workspace = true } sp-core = { workspace = true, default-features = true } sp-keystore = { workspace = true } +[build-dependencies] +prost-build = "0.13.3" + [features] default = ["std"] std = [ diff --git a/pallets/ddc-verification/build.rs b/pallets/ddc-verification/build.rs new file mode 100644 index 000000000..ba49aa58b --- /dev/null +++ b/pallets/ddc-verification/build.rs @@ -0,0 +1,8 @@ +use std::io::Result; + +fn main() -> Result<()> { + let mut prost_build = prost_build::Config::new(); + prost_build.protoc_arg("--experimental_allow_proto3_optional"); + prost_build.compile_protos(&["src/protos/activity.proto"], &["src/"])?; + Ok(()) +} diff --git a/pallets/ddc-verification/src/aggregator_client.rs b/pallets/ddc-verification/src/aggregator_client.rs new file mode 100644 index 000000000..3acd0dc38 --- /dev/null +++ b/pallets/ddc-verification/src/aggregator_client.rs @@ -0,0 +1,215 @@ +#![allow(dead_code)] + +use ddc_primitives::{BucketId, DdcEra}; +use prost::Message; +use sp_io::offchain::timestamp; +use sp_runtime::offchain::{http, Duration}; + +use super::*; + +pub struct AggregatorClient<'a> { + pub base_url: &'a str, + timeout: Duration, + retries: u32, +} + +impl<'a> AggregatorClient<'a> { + pub fn new(base_url: &'a str, timeout: Duration, retries: u32) -> Self { + Self { base_url, timeout, retries } + } + + pub fn buckets_aggregates( + &self, + era_id: DdcEra, + limit: Option, + prev_token: Option, + ) -> Result, http::Error> { + let mut url = format!("{}/activity/buckets?eraId={}", self.base_url, era_id); + if let Some(limit) = limit { + url = format!("{}&limit={}", url, limit); + } + if let Some(prev_token) = prev_token { + url = format!("{}&prevToken={}", url, prev_token); + } + let response = self.get(&url, Accept::Any)?; + let body = response.body().collect::>(); + let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; + + Ok(json_response) + } + + pub fn nodes_aggregates( + &self, + era_id: DdcEra, + limit: Option, + prev_token: Option, // node_id hex string + ) -> Result, http::Error> { + let mut url = format!("{}/activity/nodes?eraId={}", self.base_url, era_id); + if let Some(limit) = limit { + url = format!("{}&limit={}", url, limit); + } + if let Some(prev_token) = prev_token { + url = format!("{}&prevToken={}", url, prev_token); + } + let response = self.get(&url, Accept::Any)?; + let body = response.body().collect::>(); + let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; + + Ok(json_response) + } + + pub fn challenge_bucket_sub_aggregate( + &self, + era_id: DdcEra, + bucket_id: BucketId, + node_id: &str, + merkle_tree_node_id: Vec, + ) -> Result { + let url = format!( + "{}/activity/buckets/{}/challenge?eraId={}&nodeId={}&merkleTreeNodeId={}", + self.base_url, + bucket_id, + era_id, + node_id, + Self::merkle_tree_node_id_param(merkle_tree_node_id.as_slice()), + ); + let response = self.get(&url, Accept::Protobuf)?; + let body = response.body().collect::>(); + let proto_response = + proto::ChallengeResponse::decode(body.as_slice()).map_err(|_| http::Error::Unknown)?; + + Ok(proto_response) + } + + pub fn challenge_node_aggregate( + &self, + era_id: DdcEra, + node_id: &str, + merkle_tree_node_id: Vec, + ) -> Result { + let url = format!( + "{}/activity/nodes/{}/challenge?eraId={}&merkleTreeNodeId={}", + self.base_url, + node_id, + era_id, + Self::merkle_tree_node_id_param(merkle_tree_node_id.as_slice()), + ); + let response = self.get(&url, Accept::Protobuf)?; + let body = response.body().collect::>(); + let proto_response = + proto::ChallengeResponse::decode(body.as_slice()).map_err(|_| http::Error::Unknown)?; + + Ok(proto_response) + } + + pub fn eras(&self) -> Result, http::Error> { + let url = format!("{}/activity/eras", self.base_url); + let response = self.get(&url, Accept::Any)?; + let body = response.body().collect::>(); + let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; + + Ok(json_response) + } + + pub fn traverse_bucket_sub_aggregate( + &self, + era_id: DdcEra, + bucket_id: BucketId, + node_id: &str, + merkle_tree_node_id: u32, + levels: u16, + ) -> Result { + let url = format!( + "{}/activity/buckets/{}/traverse?eraId={}&nodeId={}&merkleTreeNodeId={}&levels={}", + self.base_url, bucket_id, era_id, node_id, merkle_tree_node_id, levels, + ); + + let response = self.get(&url, Accept::Any)?; + let body = response.body().collect::>(); + let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; + + Ok(json_response) + } + + pub fn traverse_node_aggregate( + &self, + era_id: DdcEra, + node_id: &str, + merkle_tree_node_id: u32, + levels: u16, + ) -> Result { + let url = format!( + "{}/activity/nodes/{}/traverse?eraId={}&merkleTreeNodeId={}&levels={}", + self.base_url, node_id, era_id, merkle_tree_node_id, levels, + ); + + let response = self.get(&url, Accept::Any)?; + let body = response.body().collect::>(); + let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; + + Ok(json_response) + } + + fn merkle_tree_node_id_param(merkle_tree_node_id: &[u32]) -> String { + merkle_tree_node_id + .iter() + .map(|x| format!("{}", x.clone())) + .collect::>() + .join(",") + } + + fn get(&self, url: &str, accept: Accept) -> Result { + let mut maybe_response = None; + + let deadline = timestamp().add(self.timeout); + let mut error = None; + + for _ in 0..self.retries { + let mut request = http::Request::get(url).deadline(deadline); + request = match accept { + Accept::Any => request, + Accept::Protobuf => request.add_header("Accept", "application/protobuf"), + }; + + let pending = match request.send() { + Ok(p) => p, + Err(_) => { + error = Some(http::Error::IoError); + continue; + }, + }; + + match pending.try_wait(deadline) { + Ok(Ok(r)) => { + maybe_response = Some(r); + error = None; + break; + }, + Ok(Err(_)) | Err(_) => { + error = Some(http::Error::DeadlineReached); + continue; + }, + } + } + + if let Some(e) = error { + return Err(e); + } + + let response = match maybe_response { + Some(r) => r, + None => return Err(http::Error::Unknown), + }; + + if response.code != 200 { + return Err(http::Error::Unknown); + } + + Ok(response) + } +} + +enum Accept { + Any, + Protobuf, +} diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 15fbccde9..f1d3adcc1 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -37,8 +37,7 @@ use scale_info::prelude::{format, string::String}; use serde::{Deserialize, Serialize}; use sp_application_crypto::RuntimeAppPublic; use sp_runtime::{ - offchain as rt_offchain, - offchain::{http, StorageKind}, + offchain::{http, Duration, StorageKind}, traits::Hash, Percent, }; @@ -61,6 +60,14 @@ mod tests; pub mod migrations; +mod aggregator_client; + +pub mod proto { + include!(concat!(env!("OUT_DIR"), "/activity.rs")); +} + +mod signature; + #[frame_support::pallet] pub mod pallet { @@ -75,9 +82,11 @@ pub mod pallet { const STORAGE_VERSION: frame_support::traits::StorageVersion = frame_support::traits::StorageVersion::new(1); - const SUCCESS_CODE: u16 = 200; + const _SUCCESS_CODE: u16 = 200; const _BUF_SIZE: usize = 128; const RESPONSE_TIMEOUT: u64 = 20000; + pub const BUCKETS_AGGREGATES_FETCH_BATCH_SIZE: usize = 100; + pub const NODES_AGGREGATES_FETCH_BATCH_SIZE: usize = 10; #[pallet::pallet] #[pallet::storage_version(STORAGE_VERSION)] @@ -684,7 +693,7 @@ pub mod pallet { Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode, )] pub(crate) struct Proof { - pub merkle_tree_node_id: u64, + pub merkle_tree_node_id: u32, pub usage: Usage, pub path: Vec, //todo! add base64 deserialization pub leafs: Vec, @@ -768,7 +777,7 @@ pub mod pallet { Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode, )] pub(crate) struct MerkleTreeNodeResponse { - merkle_tree_node_id: u64, + merkle_tree_node_id: u32, hash: String, stored_bytes: i64, transferred_bytes: u64, @@ -1522,8 +1531,12 @@ pub mod pallet { aggregators_quorum, ); - let total_buckets_usage = - Self::get_total_usage(cluster_id, era_activity.id, buckets_sub_aggregates_groups)?; + let total_buckets_usage = Self::get_total_usage( + cluster_id, + era_activity.id, + buckets_sub_aggregates_groups, + true, + )?; let customer_activity_hashes: Vec = total_buckets_usage.clone().into_iter().map(|c| c.hash::()).collect(); @@ -1582,7 +1595,7 @@ pub mod pallet { ); let total_nodes_usage = - Self::get_total_usage(cluster_id, era_activity.id, nodes_aggregates_groups)?; + Self::get_total_usage(cluster_id, era_activity.id, nodes_aggregates_groups, true)?; let node_activity_hashes: Vec = total_nodes_usage.clone().into_iter().map(|c| c.hash::()).collect(); @@ -1654,6 +1667,7 @@ pub mod pallet { cluster_id: &ClusterId, era_id: DdcEra, consistency_groups: ConsistencyGroups, + should_challenge: bool, ) -> Result, Vec> { let mut total_usage = vec![]; let mut total_usage_keys = vec![]; @@ -1687,6 +1701,7 @@ pub mod pallet { era_id, consistency_groups, &mut total_usage_keys, + should_challenge, )?; if !verified_usage.is_empty() { @@ -1701,6 +1716,7 @@ pub mod pallet { _era_id: DdcEra, consistency_groups: ConsistencyGroups, accepted_keys: &mut Vec, + should_challenge: bool, ) -> Result, Vec> { let redundancy_factor = T::DAC_REDUNDANCY_FACTOR; let mut verified_usage: Vec = vec![]; @@ -1747,10 +1763,17 @@ pub mod pallet { defective_aggregate.hash::() ); - let is_passed = true; + let mut is_passed = true; // todo: run an intensive challenge for deviating aggregate // let is_passed = Self::_challenge_aggregate(_cluster_id, _era_id, // &defective_aggregate)?; + if should_challenge { + is_passed = Self::challenge_aggregate_proto( + _cluster_id, + _era_id, + &defective_aggregate, + )?; + } if is_passed { // we assume all aggregates are valid at the moment, so we just take the // aggregate to payouts stage @@ -1775,7 +1798,7 @@ pub mod pallet { ); let aggregate_key = aggregate.get_key(); - let merkle_node_ids = Self::_find_random_merkle_node_ids( + let merkle_node_ids = Self::find_random_merkle_node_ids( number_of_identifiers.into(), aggregate.get_number_of_leaves(), aggregate_key.clone(), @@ -1817,10 +1840,10 @@ pub mod pallet { calculated_merkle_root ); - let traverse_response = Self::_fetch_traverse_response( + let root_merkle_node = Self::_fetch_traverse_response( era_id, aggregate_key.clone(), - vec![1], + 1, 1, &aggregator.node_params, ) @@ -1833,40 +1856,88 @@ pub mod pallet { }] })?; - if let Some(root_merkle_node) = traverse_response.first() { - let mut merkle_root_buf = [0u8; _BUF_SIZE]; - let bytes = - Base64::decode(root_merkle_node.hash.clone(), &mut merkle_root_buf).unwrap(); // todo! remove unwrap - let traversed_merkle_root = ActivityHash::from(sp_core::H256::from_slice(bytes)); + let mut merkle_root_buf = [0u8; _BUF_SIZE]; + let bytes = + Base64::decode(root_merkle_node.hash.clone(), &mut merkle_root_buf).unwrap(); // todo! remove unwrap + let traversed_merkle_root = ActivityHash::from(sp_core::H256::from_slice(bytes)); + + log::info!( + "🚀 Fetched merkle root for aggregate key: {:?} traversed_merkle_root: {:?}", + aggregate_key, + traversed_merkle_root + ); + let is_matched = if calculated_merkle_root == traversed_merkle_root { log::info!( - "🚀 Fetched merkle root for aggregate key: {:?} traversed_merkle_root: {:?}", + "🚀👍 The aggregate with hash {:?} and key {:?} has passed the challenge.", + aggregate.hash::(), aggregate_key, - traversed_merkle_root ); - let is_matched = if calculated_merkle_root == traversed_merkle_root { - log::info!( - "🚀👍 The aggregate with hash {:?} and key {:?} has passed the challenge.", - aggregate.hash::(), - aggregate_key, - ); + true + } else { + log::info!( + "🚀👎 The aggregate with hash {:?} and key {:?} has not passed the challenge.", + aggregate.hash::(), + aggregate_key, + ); - true - } else { - log::info!( - "🚀👎 The aggregate with hash {:?} and key {:?} has not passed the challenge.", - aggregate.hash::(), - aggregate_key, - ); + false + }; - false - }; + Ok(is_matched) + } - Ok(is_matched) + pub(crate) fn challenge_aggregate_proto( + cluster_id: &ClusterId, + era_id: DdcEra, + aggregate: &A, + ) -> Result> { + let number_of_identifiers = T::MAX_MERKLE_NODE_IDENTIFIER; + + log::info!( + "🚀 Challenge process starts when bucket sub aggregates are not in consensus!" + ); + + let aggregate_key = aggregate.get_key(); + let merkle_node_ids = Self::find_random_merkle_node_ids( + number_of_identifiers.into(), + aggregate.get_number_of_leaves(), + aggregate_key.clone(), + ); + + log::info!( + "🚀 Merkle Node Identifiers for aggregate key: {:?} identifiers: {:?}", + aggregate_key, + merkle_node_ids + ); + + let aggregator = aggregate.get_aggregator(); + + let challenge_response = Self::_fetch_challenge_responses_proto( + cluster_id, + era_id, + aggregate_key.clone(), + merkle_node_ids.iter().map(|id| *id as u32).collect(), + aggregator.clone(), + ) + .map_err(|err| vec![err])?; + + log::info!( + "🚀 Fetched challenge response for aggregate key: {:?}, challenge_response: {:?}", + aggregate_key, + challenge_response + ); + + let are_signatures_valid = signature::Verify::verify(&challenge_response); + + if are_signatures_valid { + log::info!("👍 Valid challenge signatures for aggregate key: {:?}", aggregate_key,); } else { - Ok(false) + log::info!("👎 Invalid challenge signatures at aggregate key: {:?}", aggregate_key,); } + + Ok(are_signatures_valid) } pub(crate) fn _get_hash_from_merkle_path( @@ -1935,7 +2006,7 @@ pub mod pallet { Ok(resulting_hash) } - pub(crate) fn _find_random_merkle_node_ids( + pub(crate) fn find_random_merkle_node_ids( number_of_identifiers: usize, number_of_leaves: u64, aggregate_key: AggregateKey, @@ -3161,6 +3232,30 @@ pub mod pallet { Ok(response) } + /// Challenge node aggregate or bucket sub-aggregate. + pub(crate) fn _fetch_challenge_responses_proto( + cluster_id: &ClusterId, + era_id: DdcEra, + aggregate_key: AggregateKey, + merkle_tree_node_id: Vec, + aggregator: AggregatorInfo, + ) -> Result { + let response = Self::_fetch_challenge_response_proto( + era_id, + aggregate_key.clone(), + merkle_tree_node_id.clone(), + &aggregator.node_params, + ) + .map_err(|_| OCWError::ChallengeResponseRetrievalError { + cluster_id: *cluster_id, + era_id, + aggregate_key, + aggregator: aggregator.node_pub_key, + })?; + + Ok(response) + } + /// Fetch challenge response. /// /// Parameters: @@ -3201,7 +3296,7 @@ pub mod pallet { let response = pending.try_wait(timeout).map_err(|_| http::Error::DeadlineReached)??; - if response.code != SUCCESS_CODE { + if response.code != _SUCCESS_CODE { return Err(http::Error::Unknown); } @@ -3209,6 +3304,34 @@ pub mod pallet { serde_json::from_slice(&body).map_err(|_| http::Error::Unknown) } + /// Fetch protobuf challenge response. + pub(crate) fn _fetch_challenge_response_proto( + era_id: DdcEra, + aggregate_key: AggregateKey, + merkle_tree_node_id: Vec, + node_params: &StorageNodeParams, + ) -> Result { + let host = str::from_utf8(&node_params.host).map_err(|_| http::Error::Unknown)?; + let base_url = format!("http://{}:{}", host, node_params.http_port); + let client = aggregator_client::AggregatorClient::new( + &base_url, + Duration::from_millis(RESPONSE_TIMEOUT), + 3, + ); + + match aggregate_key { + AggregateKey::BucketSubAggregateKey(bucket_id, node_id) => client + .challenge_bucket_sub_aggregate( + era_id, + bucket_id, + &node_id, + merkle_tree_node_id, + ), + AggregateKey::NodeAggregateKey(node_id) => + client.challenge_node_aggregate(era_id, &node_id, merkle_tree_node_id), + } + } + /// Fetch traverse response. /// /// Parameters: @@ -3220,43 +3343,32 @@ pub mod pallet { pub(crate) fn _fetch_traverse_response( era_id: DdcEra, aggregate_key: AggregateKey, - merkle_node_identifiers: Vec, + merkle_tree_node_id: u32, levels: u16, node_params: &StorageNodeParams, - ) -> Result, http::Error> { - let scheme = "http"; + ) -> Result { let host = str::from_utf8(&node_params.host).map_err(|_| http::Error::Unknown)?; + let base_url = format!("http://{}:{}", host, node_params.http_port); + let client = aggregator_client::AggregatorClient::new( + &base_url, + Duration::from_millis(RESPONSE_TIMEOUT), + 3, + ); - let ids = merkle_node_identifiers - .iter() - .map(|x| format!("{}", x.clone())) - .collect::>() - .join(","); - - let url = match aggregate_key { - AggregateKey::NodeAggregateKey(node_id) => format!( - "{}://{}:{}/activity/nodes/{}/traverse?eraId={}&merkleTreeNodeId={}&levels={}", - scheme, host, node_params.http_port, node_id, era_id, ids, levels - ), - AggregateKey::BucketSubAggregateKey(bucket_id, node_id) => format!( - "{}://{}:{}/activity/buckets/{}/traverse?eraId={}&nodeId={}&merkleTreeNodeId={}&levels={}", - scheme, host, node_params.http_port, bucket_id, era_id, node_id, ids, levels - ), - }; - - let request = http::Request::get(&url); - let timeout = sp_io::offchain::timestamp() - .add(sp_runtime::offchain::Duration::from_millis(RESPONSE_TIMEOUT)); - let pending = request.deadline(timeout).send().map_err(|_| http::Error::IoError)?; - - let response = - pending.try_wait(timeout).map_err(|_| http::Error::DeadlineReached)??; - if response.code != SUCCESS_CODE { - return Err(http::Error::Unknown); - } + let response = match aggregate_key { + AggregateKey::BucketSubAggregateKey(bucket_id, node_id) => client + .traverse_bucket_sub_aggregate( + era_id, + bucket_id, + &node_id, + merkle_tree_node_id, + levels, + ), + AggregateKey::NodeAggregateKey(node_id) => + client.traverse_node_aggregate(era_id, &node_id, merkle_tree_node_id, levels), + }?; - let body = response.body().collect::>(); - serde_json::from_slice(&body).map_err(|_| http::Error::Unknown) + Ok(response) } /// Fetch processed era. @@ -3267,26 +3379,17 @@ pub mod pallet { pub(crate) fn fetch_processed_eras( node_params: &StorageNodeParams, ) -> Result, http::Error> { - let scheme = "http"; let host = str::from_utf8(&node_params.host).map_err(|_| http::Error::Unknown)?; - let url = format!("{}://{}:{}/activity/eras", scheme, host, node_params.http_port); - let request = http::Request::get(&url); - let timeout = sp_io::offchain::timestamp() - .add(sp_runtime::offchain::Duration::from_millis(RESPONSE_TIMEOUT)); - let pending = request.deadline(timeout).send().map_err(|_| http::Error::IoError)?; - - let response = - pending.try_wait(timeout).map_err(|_| http::Error::DeadlineReached)??; - if response.code != SUCCESS_CODE { - return Err(http::Error::Unknown); - } + let base_url = format!("http://{}:{}", host, node_params.http_port); + let client = aggregator_client::AggregatorClient::new( + &base_url, + Duration::from_millis(RESPONSE_TIMEOUT), + 3, + ); - let body = response.body().collect::>(); - let res: Vec = - serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; + let response = client.eras()?; - let processed_status = String::from("PROCESSED"); - Ok(res.into_iter().filter(|e| e.status == processed_status).collect::>()) + Ok(response.into_iter().filter(|e| e.status == "PROCESSED").collect::>()) } /// Fetch customer usage. /// @@ -3299,26 +3402,35 @@ pub mod pallet { era_id: DdcEra, node_params: &StorageNodeParams, ) -> Result, http::Error> { - let scheme = "http"; let host = str::from_utf8(&node_params.host).map_err(|_| http::Error::Unknown)?; - let url = format!( - "{}://{}:{}/activity/buckets?eraId={}", - scheme, host, node_params.http_port, era_id + let base_url = format!("http://{}:{}", host, node_params.http_port); + let client = aggregator_client::AggregatorClient::new( + &base_url, + Duration::from_millis(RESPONSE_TIMEOUT), + 3, ); - let request = http::Request::get(&url); - let timeout = sp_io::offchain::timestamp() - .add(sp_runtime::offchain::Duration::from_millis(RESPONSE_TIMEOUT)); - let pending = request.deadline(timeout).send().map_err(|_| http::Error::IoError)?; + let mut buckets_aggregates = Vec::new(); + let mut prev_token = None; - let response = - pending.try_wait(timeout).map_err(|_| http::Error::DeadlineReached)??; - if response.code != SUCCESS_CODE { - return Err(http::Error::Unknown); + loop { + let response = client.buckets_aggregates( + era_id, + Some(BUCKETS_AGGREGATES_FETCH_BATCH_SIZE as u32), + prev_token, + )?; + + let response_len = response.len(); + prev_token = response.last().map(|a| a.bucket_id); + + buckets_aggregates.extend(response); + + if response_len < BUCKETS_AGGREGATES_FETCH_BATCH_SIZE { + break; + } } - let body = response.body().collect::>(); - serde_json::from_slice(&body).map_err(|_| http::Error::Unknown) + Ok(buckets_aggregates) } /// Fetch node usage. @@ -3332,26 +3444,35 @@ pub mod pallet { era_id: DdcEra, node_params: &StorageNodeParams, ) -> Result, http::Error> { - let scheme = "http"; let host = str::from_utf8(&node_params.host).map_err(|_| http::Error::Unknown)?; - let url = format!( - "{}://{}:{}/activity/nodes?eraId={}", - scheme, host, node_params.http_port, era_id + let base_url = format!("http://{}:{}", host, node_params.http_port); + let client = aggregator_client::AggregatorClient::new( + &base_url, + Duration::from_millis(RESPONSE_TIMEOUT), + 3, ); - let request = http::Request::get(&url); - let timeout = sp_io::offchain::timestamp() - .add(rt_offchain::Duration::from_millis(RESPONSE_TIMEOUT)); - let pending = request.deadline(timeout).send().map_err(|_| http::Error::IoError)?; + let mut nodes_aggregates = Vec::new(); + let mut prev_token = None; - let response = - pending.try_wait(timeout).map_err(|_| http::Error::DeadlineReached)??; - if response.code != SUCCESS_CODE { - return Err(http::Error::Unknown); + loop { + let response = client.nodes_aggregates( + era_id, + Some(NODES_AGGREGATES_FETCH_BATCH_SIZE as u32), + prev_token, + )?; + + let response_len = response.len(); + prev_token = response.last().map(|a| a.node_id.clone()); + + nodes_aggregates.extend(response); + + if response_len < NODES_AGGREGATES_FETCH_BATCH_SIZE { + break; + } } - let body = response.body().collect::>(); - serde_json::from_slice(&body).map_err(|_| http::Error::Unknown) + Ok(nodes_aggregates) } /// Fetch DAC nodes of a cluster. diff --git a/pallets/ddc-verification/src/protos/activity.proto b/pallets/ddc-verification/src/protos/activity.proto new file mode 100644 index 000000000..4d1bc4993 --- /dev/null +++ b/pallets/ddc-verification/src/protos/activity.proto @@ -0,0 +1,154 @@ +// Acquired from Cerebellum-Network/ddc-storage-node at 4a3d710. +// https://github.com/Cerebellum-Network/ddc-storage-node/tree/4a3d710. +syntax = "proto3"; + +package activity; + +message ChallengeResponse { + repeated Proof proofs = 1; + + message Proof { + uint32 merkleTreeNodeId = 1; + Aggregate usage = 2; + repeated bytes path = 3; + + repeated Leaf leaves = 4; + + message Leaf { + // Repeated oneof is not supported. + // See also: https://github.com/protocolbuffers/protobuf/issues/2592. + oneof leafVariant { + Record record = 1; + RecordLink link = 2; + } + } + + message Record { + ActivityRecord record = 1; + int64 stored = 2; + uint64 delivered = 3; + } + + message RecordLink { + Link link = 1; + int64 stored = 2; + uint64 delivered = 3; + + message Link { + bytes nodeId = 1; + uint64 bucketId = 2; + bytes recordId = 3; + } + } + } +} + +message Aggregate { + int64 stored = 1; // can be negative in case amount of deleted data is more than newly stored + uint64 delivered = 2; + uint64 puts = 3; + uint64 gets = 4; +} + +message ActivityRecord { + bytes id = 1; + ActivityFulfillment upstream = 2; + repeated ActivityFulfillment downstream = 3; + uint64 timestamp = 4; + Signature signature = 5; + optional AuthToken authToken = 6; // set to authorize record in case if owner delegated access to bucket +} + +message ActivityRequest { + optional ActivityRequest parentRequest = 1; + string requestId = 2; + + enum RequestType { + REQUEST_TYPE_PUT = 0; + REQUEST_TYPE_GET = 1; + REQUEST_TYPE_DELETE = 2; + } + + enum ContentType { + CONTENT_TYPE_PIECE = 0; + CONTENT_TYPE_SEGMENT = 1; + CONTENT_TYPE_MERKLE_TREE = 2; + CONTENT_TYPE_METADATA = 3; + } + + RequestType requestType = 3; + ContentType contentType = 4; + uint64 bucketId = 5; // set only when content type is PIECE + bytes pieceCid = 6; + + uint64 offset = 7; // offset of data requested (set only when RecordType = GET) + uint64 size = 8; // size of content stored or delivered + + uint64 timestamp = 9; + Signature signature = 10; +} + +// we will get this on the server side streaming + +message ActivityAcknowledgment { + string requestId = 1; + uint64 bytesStoredOrDelivered = 2; + uint64 timestamp = 3; + Signature signature = 4; +} + +message ActivityResponse { + Status status = 1; + uint32 time = 2; // response time measured by client (start before the request sent and end after the response received) + bytes peerID = 3; + + enum Status { + UNKNOWN = 0; + OK = 1; + NOT_FOUND = 2; // server doesn't have requested resource (e.g. node could miss piece metadata) + INTERNAL = 3; // error in a server response + UNAVAILABLE = 4; // no response from server + ABORTED = 5; // request aborted by a client (e.g. storage node asked for piece metadata a node, but after some time had to initialise parallel request to other node and one of the requests can be aborted once metadata received) + } +} + +message ActivityFulfillment { + ActivityRequest request = 1; + optional ActivityAcknowledgment ack = 2; + optional ActivityResponse response = 3; +} + +message AuthToken { + Signature signature = 1; // signature signer is an issuer. issuer of first token should have an access on pallet level and subsequent tokens can skip 'issuer' and take 'subject' from previous token to verify signature + Payload payload = 2; +} + +message Payload { + optional AuthToken prev = 1; // prev token in trust chain (based on known use cases max depth can be limited to 3 or increase to 5 to support more potential use cases) + optional bytes subject = 2; // whom. every except last token should be non empty. next token should be signed by this subject + optional bool canDelegate = 3; // subject can be prohibited to delegate access to anyone else (next token should be last) + + optional uint64 bucketId = 4; // mentioned only once in trust chain (or even not mentioned at all if bucket owner decided to share access to all his buckets) + repeated Operation operations = 5; // each next token in trust chain should have less or equal privileges (e.g. token restricted to 'get' operation can't have 'put' in next token) + optional int64 expiresAt = 6; // each next token should expires earlier or at the same time as previous one (e.g. token can't have lower expiresAt than in next token) + optional bytes pieceCid = 7; // mentioned only once in trust chain (in DAG API nested pieces can be accessed by path) +} + +enum Operation { + UNKNOWN = 0; + PUT = 1; + GET = 2; + DELETE = 3; +} + +message Signature { + Algorithm algorithm = 1; + bytes signer = 2; + bytes value = 3; + + enum Algorithm { + ED_25519 = 0; + SR_25519 = 1; + } +} + diff --git a/pallets/ddc-verification/src/signature.rs b/pallets/ddc-verification/src/signature.rs new file mode 100644 index 000000000..74dce7a55 --- /dev/null +++ b/pallets/ddc-verification/src/signature.rs @@ -0,0 +1,201 @@ +use prost::Message; +use sp_core::ed25519::{Public, Signature}; +use sp_io::crypto::ed25519_verify; + +use super::*; + +pub trait Verify { + fn verify(&self) -> bool; +} + +impl Verify for proto::ActivityAcknowledgment { + fn verify(&self) -> bool { + verify_signature(self.clone()) + } +} + +impl Verify for proto::ActivityRecord { + fn verify(&self) -> bool { + if !verify_signature(self.clone()) { + return false; + } + + for downstream in &self.downstream { + if !downstream.verify() { + return false; + } + } + + if let Some(upstream) = &self.upstream { + if !upstream.verify() { + return false; + } + } + + true + } +} + +impl Verify for proto::ActivityRequest { + fn verify(&self) -> bool { + if !verify_signature(self.clone()) { + return false; + } + + // TODO(khssnv): parent requests are expected to have an invalid signature. + // if let Some(ref parent_request) = self.parent_request { + // if !parent_request.verify() { + // return false; + // } + // } + + true + } +} + +impl Verify for proto::ActivityFulfillment { + fn verify(&self) -> bool { + if let Some(request) = &self.request { + if !request.verify() { + return false; + } + } + + if let Some(ack) = &self.ack { + if !ack.verify() { + return false; + } + } + + true + } +} + +impl Verify for proto::challenge_response::proof::Record { + fn verify(&self) -> bool { + if let Some(record) = &self.record { + return record.verify(); + } + + true + } +} + +impl Verify for proto::ChallengeResponse { + fn verify(&self) -> bool { + for proof in self.proofs.iter() { + for leaf in proof.leaves.iter() { + if let Some(proto::challenge_response::proof::leaf::LeafVariant::Record(record)) = + &leaf.leaf_variant + { + if !record.verify() { + return false; + } + } + } + } + + true + } +} + +trait Signed { + fn get_signature(&self) -> Option<&proto::Signature>; + fn reset_signature(&mut self); +} + +/// Implements Signed trait for given types. +macro_rules! impl_signed { + (for $($t:ty),+) => { + $(impl Signed for $t { + fn get_signature(&self) -> Option<&proto::Signature> { + return self.signature.as_ref() + } + + fn reset_signature(&mut self) { + self.signature = None; + } + })* + } +} + +impl_signed!(for proto::ActivityAcknowledgment, proto::ActivityRecord, proto::ActivityRequest); + +fn verify_signature(mut signed: impl Clone + Message + Signed) -> bool { + let signature = match signed.get_signature() { + Some(s) => s.clone(), + None => return false, + }; + let sig = match Signature::try_from(signature.value.as_slice()) { + Ok(s) => s, + Err(_) => return false, + }; + + signed.reset_signature(); + let payload = signed.encode_to_vec(); + + let pub_key = match Public::try_from(signature.signer.as_slice()) { + Ok(p) => p, + Err(_) => return false, + }; + + ed25519_verify(&sig, payload.as_slice(), &pub_key) +} + +#[cfg(test)] +mod tests { + use sp_core::Pair; + + use super::*; + + #[test] + fn verify_signature_works() { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct SignedProtoMsg { + #[prost(string, tag = "1")] + pub foo: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub signature: ::core::option::Option, + } + impl_signed!(for SignedProtoMsg); + + let none_signature_msg = + SignedProtoMsg { foo: "none_signature_msg".to_string(), signature: None }; + assert!(!verify_signature(none_signature_msg)); + + let mut invalid_signature_msg = + SignedProtoMsg { foo: "invalid_signature_msg".to_string(), signature: None }; + let invalid_signature_msg_signer = sp_core::ed25519::Pair::generate().0; + let invalid_signature_msg_signature = + invalid_signature_msg_signer.sign(invalid_signature_msg.encode_to_vec().as_slice()); + let mut invalid_signature_msg_signature_vec = invalid_signature_msg_signature.0.to_vec(); + invalid_signature_msg_signature_vec[0] += 1; + invalid_signature_msg.signature = Some(proto::Signature { + algorithm: proto::signature::Algorithm::Ed25519 as i32, + value: invalid_signature_msg_signature_vec, + signer: invalid_signature_msg_signer.public().0.to_vec(), + }); + assert!(!verify_signature(invalid_signature_msg)); + + let mut valid_signature_msg = + SignedProtoMsg { foo: "valid_signature_msg".to_string(), signature: None }; + let valid_signature_msg_signer = sp_core::ed25519::Pair::generate().0; + let valid_signature_msg_signature = + valid_signature_msg_signer.sign(valid_signature_msg.encode_to_vec().as_slice()); + valid_signature_msg.signature = Some(proto::Signature { + algorithm: proto::signature::Algorithm::Ed25519 as i32, + value: valid_signature_msg_signature.0.to_vec(), + signer: valid_signature_msg_signer.public().0.to_vec(), + }); + assert!(verify_signature(valid_signature_msg)); + } + + #[test] + fn verify_challenge_response_works() { + let challenge_response_serialized = + include_bytes!("./test_data/challenge_response.pb").as_slice(); + let challenge_response = proto::ChallengeResponse::decode(challenge_response_serialized) + .expect("protobuf fixture decoding failed, fix the test data"); + assert!(challenge_response.verify()); + } +} diff --git a/pallets/ddc-verification/src/test_data/challenge_response.pb b/pallets/ddc-verification/src/test_data/challenge_response.pb new file mode 100644 index 000000000..d43e443f6 Binary files /dev/null and b/pallets/ddc-verification/src/test_data/challenge_response.pb differ diff --git a/pallets/ddc-verification/src/tests.rs b/pallets/ddc-verification/src/tests.rs index cf8cdf9e9..add71a6f4 100644 --- a/pallets/ddc-verification/src/tests.rs +++ b/pallets/ddc-verification/src/tests.rs @@ -3,6 +3,7 @@ use ddc_primitives::{ StorageNodePubKey, DAC_VERIFICATION_KEY_TYPE, }; use frame_support::{assert_noop, assert_ok}; +use prost::Message; use sp_core::{ offchain::{ testing::{PendingRequest, TestOffchainExt, TestTransactionPoolExt}, @@ -12,7 +13,7 @@ use sp_core::{ }; use sp_io::TestExternalities; use sp_keystore::{testing::MemoryKeystore, Keystore, KeystoreExt}; -use sp_runtime::AccountId32; +use sp_runtime::{offchain::Duration, AccountId32}; use crate::{mock::*, Error, NodeAggregateResponse, *}; @@ -136,7 +137,13 @@ fn fetch_node_aggregates_works() { // Mock HTTP request and response let pending_request = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId={}", host, port, era_id), + uri: format!( + "http://{}:{}/activity/nodes?eraId={}&limit={}", + host, + port, + era_id, + pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE + ), response: Some(nodes_activity_json.as_bytes().to_vec()), sent: true, ..Default::default() @@ -224,7 +231,13 @@ fn fetch_bucket_aggregates_works() { // Mock HTTP request and response let pending_request = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId={}", host, port, era_id), + uri: format!( + "http://{}:{}/activity/buckets?eraId={}&limit={}", + host, + port, + era_id, + pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE + ), response: Some(customers_activity_json.as_bytes().to_vec()), sent: true, ..Default::default() @@ -394,7 +407,7 @@ fn buckets_sub_aggregates_in_consensus_merged() { assert_eq!(groups.quorum.len(), 0); assert_eq!(groups.others.len(), 0); - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -518,7 +531,7 @@ fn buckets_sub_aggregates_in_quorum_merged() { assert_eq!(groups.quorum.len(), 1); // 2 consistent aggregates merged into 1 in 'quorum' assert_eq!(groups.others.len(), 1); // 1 inconsistent aggregate goes to 'others' - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -643,7 +656,7 @@ fn buckets_sub_aggregates_in_others_merged() { assert_eq!(groups.quorum.len(), 0); assert_eq!(groups.others.len(), 2); - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -768,7 +781,7 @@ fn buckets_sub_aggregates_in_others_merged_2() { assert_eq!(groups.quorum.len(), 0); assert_eq!(groups.others.len(), 2); // 2 inconsistent aggregates - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -877,7 +890,7 @@ fn nodes_aggregates_in_consensus_merged() { assert_eq!(groups.quorum.len(), 0); assert_eq!(groups.others.len(), 0); - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -980,7 +993,7 @@ fn nodes_aggregates_in_quorum_merged() { assert_eq!(groups.quorum.len(), 1); // 2 consistent aggregates merged into 1 in 'quorum' assert_eq!(groups.others.len(), 1); // 1 inconsistent aggregate goes to 'others' - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -1084,7 +1097,7 @@ fn nodes_aggregates_in_others_merged() { assert_eq!(groups.quorum.len(), 0); assert_eq!(groups.others.len(), 2); - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -1188,7 +1201,7 @@ fn nodes_aggregates_in_others_merged_2() { assert_eq!(groups.quorum.len(), 0); assert_eq!(groups.others.len(), 3); // 3 inconsistent aggregates - let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups); + let result = DdcVerification::get_total_usage(&cluster_id, era_id, groups, false); assert!(result.is_ok()); let usages = result.unwrap(); @@ -1592,45 +1605,46 @@ fn bucket_sub_aggregates_are_fetched_and_grouped() { let host5 = "178.251.228.49"; let port = 8080; - let pending_request1 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476817", host1, port), - response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa318","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]},{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa319","stored_bytes":0,"transferred_bytes":505,"number_of_puts":0,"number_of_gets":1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request1 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=476817&limit={}", host1, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa318","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]},{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa319","stored_bytes":0,"transferred_bytes":505,"number_of_puts":0,"number_of_gets":1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request2 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476817", host2, port), - response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa318","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]},{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa319","stored_bytes":0,"transferred_bytes":506,"number_of_puts":0,"number_of_gets":1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request2 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=476817&limit={}", host2, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa318","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]},{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa319","stored_bytes":0,"transferred_bytes":506,"number_of_puts":0,"number_of_gets":1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request3 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476817", host3, port), - response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa318","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]},{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa319","stored_bytes":0,"transferred_bytes":505,"number_of_puts":0,"number_of_gets":1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request3 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=476817&limit={}", host3, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa318","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]},{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa319","stored_bytes":0,"transferred_bytes":505,"number_of_puts":0,"number_of_gets":1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request4 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476817", host4, port), - response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request4 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=476817&limit={}", host4, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[]}]"#.to_vec()), + sent: true, + ..Default::default() + }; + + let pending_request5 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=476817&limit={}", host5, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa320","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request5 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476817", host5, port), - response: Some(br#"[{"bucket_id":90235,"stored_bytes":0,"transferred_bytes":38,"number_of_puts":0,"number_of_gets":1,"sub_aggregates":[{"NodeID":"0xb6186f80dce7190294665ab53860de2841383bb202c562bb8b81a624351fa320","stored_bytes":578,"transferred_bytes":578,"number_of_puts":2,"number_of_gets":0}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; offchain_state.expect_request(pending_request1); offchain_state.expect_request(pending_request2); @@ -1856,45 +1870,45 @@ fn node_aggregates_are_fetched_and_grouped() { let host5 = "178.251.228.49"; let port = 8080; - let pending_request1 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476817", host1, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request1 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=476817&limit={}", host1, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request2 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476817", host2, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 48,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request2 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=476817&limit={}", host2, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 48,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request3 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476817", host3, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request3 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=476817&limit={}", host3, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request4 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476817", host4, port), - response: Some(br#"[{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request4 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=476817&limit={}", host4, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let pending_request5 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476817", host5, port), - response: Some(br#"[{"node_id": "0xfc28d5f5bb10212077a8654f62c4f8f0b5ab985fc322a51f5a3c75943b29194b","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request5 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=476817&limit={}", host5, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0xfc28d5f5bb10212077a8654f62c4f8f0b5ab985fc322a51f5a3c75943b29194b","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97}]"#.to_vec()), + sent: true, + ..Default::default() + }; offchain_state.expect_request(pending_request1); offchain_state.expect_request(pending_request2); @@ -2636,149 +2650,149 @@ fn test_single_ocw_pallet_integration() { }; - let node_pending_request1 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host1, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request1 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host1, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request2 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host2, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request2 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host2, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request3 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host3, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request3 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host3, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request4 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host4, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request4 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host4, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request5 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host5, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request5 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host5, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request6 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host6, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request6 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host6, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request7 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host7, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request7 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host7, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request8 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host8, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request8 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host8, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let node_pending_request9 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host9, port), - response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let node_pending_request9 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616&limit={}", host9, port, pallet::NODES_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request1 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host1, port), - response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request1 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host1, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request2 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host2, port), - response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request2 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host2, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request3 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host3, port), - response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request3 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host3, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request4 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host4, port), - response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request4 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host4, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request5 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host5, port), - response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request5 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host5, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request6 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host6, port), - response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request6 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host6, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request7 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host7, port), - response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request7 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host7, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request8 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host8, port), - response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request8 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host8, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; - let bucket_pending_request9 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host9, port), - response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let bucket_pending_request9 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616&limit={}", host9, port, pallet::BUCKETS_AGGREGATES_FETCH_BATCH_SIZE), + response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), + sent: true, + ..Default::default() + }; offchain_state.expect_request(pending_request1); offchain_state.expect_request(pending_request2); @@ -2964,7 +2978,7 @@ fn test_find_random_merkle_node_ids() { let number_of_leaves = deffective_bucket_sub_aggregate.get_number_of_leaves(); - let ids = DdcVerification::_find_random_merkle_node_ids( + let ids = DdcVerification::find_random_merkle_node_ids( 3, number_of_leaves, deffective_bucket_sub_aggregate.get_key(), @@ -3022,13 +3036,13 @@ fn challenge_bucket_sub_aggregate_works() { ..Default::default() }; - let pending_request2 = PendingRequest { - method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets/123229/traverse?eraId=5757773&nodeId=0x1f50f1455f60f5774564233d321a116ca45ae3188b2200999445706d04839d72&merkleTreeNodeId=1&levels=1", host1, port), - response: Some(br#"[{"merkle_tree_node_id":2,"hash":"hkujtYgWP21CrXdRP1rhRPrYR2ooIYCnP5zwCERTePI=","stored_bytes":20913291,"transferred_bytes":20913291,"number_of_puts":61,"number_of_gets":3},{"merkle_tree_node_id":3,"hash":"ZgWwK2LgWkHpx5JlXZn/Rouq6uE9DhOnRH6EA1+QO6o=","stored_bytes":23778084,"transferred_bytes":23778084,"number_of_puts":46,"number_of_gets":2}]"#.to_vec()), - sent: true, - ..Default::default() - }; + let pending_request2 = PendingRequest { + method: "GET".to_string(), + uri: format!("http://{}:{}/activity/buckets/123229/traverse?eraId=5757773&nodeId=0x1f50f1455f60f5774564233d321a116ca45ae3188b2200999445706d04839d72&merkleTreeNodeId=1&levels=1", host1, port), + response: Some(br#"{"merkle_tree_node_id":2,"hash":"hkujtYgWP21CrXdRP1rhRPrYR2ooIYCnP5zwCERTePI=","stored_bytes":20913291,"transferred_bytes":20913291,"number_of_puts":61,"number_of_gets":3}"#.to_vec()), + sent: true, + ..Default::default() + }; offchain_state.expect_request(pending_request1); offchain_state.expect_request(pending_request2); @@ -3072,3 +3086,116 @@ fn challenge_bucket_sub_aggregate_works() { }); } + +use crate::aggregator_client::AggregatorClient; + +#[test] +fn aggregator_client_challenge_bucket_sub_aggregate_works() { + let mut ext = TestExternalities::default(); + let (offchain, offchain_state) = TestOffchainExt::new(); + + ext.register_extension(OffchainWorkerExt::new(offchain.clone())); + ext.register_extension(OffchainDbExt::new(Box::new(offchain))); + + ext.execute_with(|| { + let mut offchain_state = offchain_state.write(); + offchain_state.timestamp = Timestamp::from_unix_millis(0); + + let base_url = "http://example.com"; + let bucket_id = 1; + let era_id = 1; + let merkle_tree_node_id = "2,6"; + let node_id = "0x0ac7cb9c53594e9f538d9950c6bcf28f0c0c7b8385deea2ebe24062bc640e7be"; + + let expected_response = proto::ChallengeResponse { + proofs: vec![ + proto::challenge_response::Proof { + merkle_tree_node_id: 2, + usage: Some(proto::Aggregate { stored: 4, delivered: 3, puts: 2, gets: 1 }), + ..Default::default() + }, + proto::challenge_response::Proof { + merkle_tree_node_id: 6, + usage: Some(proto::Aggregate { stored: 8, delivered: 7, puts: 6, gets: 5 }), + ..Default::default() + }, + ], + }; + let mut expected_response_serialized = Vec::new(); + expected_response.encode(&mut expected_response_serialized).unwrap(); + + let expected = PendingRequest { + method: "GET".into(), + headers: vec![("Accept".into(), "application/protobuf".into())], + uri: format!( + "{}/activity/buckets/{}/challenge?eraId={}&nodeId={}&merkleTreeNodeId={}", + base_url, bucket_id, era_id, node_id, merkle_tree_node_id + ), + response: Some(expected_response_serialized), + sent: true, + ..Default::default() + }; + offchain_state.expect_request(expected); + drop(offchain_state); + + let client = AggregatorClient::new(base_url, Duration::from_millis(1_000), 1); + + let result = client.challenge_bucket_sub_aggregate(era_id, bucket_id, node_id, vec![2, 6]); + assert_eq!(result, Ok(expected_response)); + }) +} + +#[test] +fn aggregator_client_challenge_node_aggregate_works() { + let mut ext = TestExternalities::default(); + let (offchain, offchain_state) = TestOffchainExt::new(); + + ext.register_extension(OffchainWorkerExt::new(offchain.clone())); + ext.register_extension(OffchainDbExt::new(Box::new(offchain))); + + ext.execute_with(|| { + let mut offchain_state = offchain_state.write(); + offchain_state.timestamp = Timestamp::from_unix_millis(0); + + let base_url = "http://example.com"; + let era_id = 1; + let merkle_tree_node_id = "2,6"; + let node_id = "0x0ac7cb9c53594e9f538d9950c6bcf28f0c0c7b8385deea2ebe24062bc640e7be"; + + let expected_response = proto::ChallengeResponse { + proofs: vec![ + proto::challenge_response::Proof { + merkle_tree_node_id: 2, + usage: Some(proto::Aggregate { stored: 4, delivered: 3, puts: 2, gets: 1 }), + ..Default::default() + }, + proto::challenge_response::Proof { + merkle_tree_node_id: 6, + usage: Some(proto::Aggregate { stored: 8, delivered: 7, puts: 6, gets: 5 }), + ..Default::default() + }, + ], + }; + let mut expected_response_serialized = Vec::new(); + expected_response.encode(&mut expected_response_serialized).unwrap(); + + let expected = PendingRequest { + method: "GET".into(), + headers: vec![("Accept".into(), "application/protobuf".into())], + uri: format!( + "{}/activity/nodes/{}/challenge?eraId={}&merkleTreeNodeId={}", + base_url, node_id, era_id, merkle_tree_node_id + ), + response: Some(expected_response_serialized), + sent: true, + ..Default::default() + }; + offchain_state.expect_request(expected); + drop(offchain_state); + + let client = AggregatorClient::new(base_url, Duration::from_millis(1_000), 1); + + let result = client.challenge_node_aggregate(era_id, node_id, vec![2, 6]); + assert_eq!(result, Ok(expected_response)); + }) +} diff --git a/runtime/cere-dev/src/lib.rs b/runtime/cere-dev/src/lib.rs index 646a7c2df..b57a7cfd3 100644 --- a/runtime/cere-dev/src/lib.rs +++ b/runtime/cere-dev/src/lib.rs @@ -153,7 +153,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 61001, + spec_version: 61002, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 21, diff --git a/runtime/cere/src/lib.rs b/runtime/cere/src/lib.rs index 587b8c7d1..6521464b0 100644 --- a/runtime/cere/src/lib.rs +++ b/runtime/cere/src/lib.rs @@ -147,7 +147,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 61001, + spec_version: 61002, impl_version: 0, apis: RUNTIME_API_VERSIONS, transaction_version: 21,