From 36ff1650098eba0bf015d006336e1ecea0590eb7 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Thu, 24 Oct 2024 23:49:22 +0200 Subject: [PATCH 1/5] feat: supporting multicluster environment in DAC validation --- pallets/ddc-clusters/src/lib.rs | 17 + pallets/ddc-customers/src/mock.rs | 4 + pallets/ddc-verification/src/lib.rs | 953 ++++++++++++++------------ pallets/ddc-verification/src/mock.rs | 8 + pallets/ddc-verification/src/tests.rs | 124 ++-- primitives/src/traits/cluster.rs | 15 + 6 files changed, 596 insertions(+), 525 deletions(-) diff --git a/pallets/ddc-clusters/src/lib.rs b/pallets/ddc-clusters/src/lib.rs index 78ecb63cf..c96f4e2d5 100644 --- a/pallets/ddc-clusters/src/lib.rs +++ b/pallets/ddc-clusters/src/lib.rs @@ -906,6 +906,13 @@ pub mod pallet { Ok(()) } + + fn get_last_validated_era(cluster_id: &ClusterId) -> Result { + let cluster = + Clusters::::try_get(cluster_id).map_err(|_| Error::::ClusterDoesNotExist)?; + + Ok(cluster.last_validated_era_id) + } } impl ClusterManager for Pallet { @@ -980,6 +987,16 @@ pub mod pallet { ) -> Result<(), DispatchError> { Self::do_validate_node(*cluster_id, node_pub_key.clone(), succeeded) } + + fn get_clusters(status: ClusterStatus) -> Result, DispatchError> { + let mut clusters_ids = Vec::new(); + for (cluster_id, cluster) in >::iter() { + if cluster.status == status { + clusters_ids.push(cluster_id); + } + } + Ok(clusters_ids) + } } impl ClusterCreator> for Pallet diff --git a/pallets/ddc-customers/src/mock.rs b/pallets/ddc-customers/src/mock.rs index ca95f5626..0c4ded84e 100644 --- a/pallets/ddc-customers/src/mock.rs +++ b/pallets/ddc-customers/src/mock.rs @@ -279,6 +279,10 @@ impl ClusterManager for TestClusterManager { ) -> Result<(), DispatchError> { unimplemented!() } + + fn get_clusters(_status: ClusterStatus) -> Result, DispatchError> { + unimplemented!() + } } pub struct TestClusterCreator; diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 632f03076..c361f7748 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -17,8 +17,8 @@ use ddc_primitives::{ ClusterManager, ClusterValidator, CustomerVisitor, NodeVisitor, PayoutVisitor, ValidatorVisitor, }, - ActivityHash, BatchIndex, ClusterId, CustomerUsage, DdcEra, MMRProof, NodeParams, NodePubKey, - NodeUsage, PayoutState, StorageNodeParams, + ActivityHash, BatchIndex, ClusterId, ClusterStatus, CustomerUsage, DdcEra, MMRProof, + NodeParams, NodePubKey, NodeUsage, PayoutState, StorageNodeParams, }; use frame_support::{ pallet_prelude::*, @@ -178,7 +178,7 @@ pub mod pallet { }, EraRetrievalError { cluster_id: ClusterId, - node_pub_key: NodePubKey, + node_pub_key: Option, validator: T::AccountId, }, PrepareEraTransactionError { @@ -332,7 +332,7 @@ pub mod pallet { }, EraRetrievalError { cluster_id: ClusterId, - node_pub_key: NodePubKey, + node_pub_key: Option, }, /// Bucket aggregate Retrieval Error. BucketAggregateRetrievalError { @@ -446,8 +446,6 @@ pub mod pallet { AlreadySignedPayoutBatch, /// Node Retrieval Error. NodeRetrievalError, - /// Cluster To Validate Retrieval Error. - ClusterToValidateRetrievalError, /// Era To Validate Retrieval Error. EraToValidateRetrievalError, /// Era Per Node Retrieval Error. @@ -542,6 +540,12 @@ pub mod pallet { pub end: i64, } + impl From for EraActivity { + fn from(era: AggregationEraResponse) -> Self { + Self { id: era.id, start: era.start, end: era.end } + } + } + pub struct CustomerBatch { pub(crate) batch_index: BatchIndex, pub(crate) payers: Vec<(T::AccountId, BucketId, CustomerUsage)>, @@ -571,6 +575,22 @@ pub mod pallet { pub(crate) number_of_gets: u64, } + /// DDC aggregation era + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Encode, Decode)] + pub(crate) struct AggregationEraResponse { + pub(crate) id: DdcEra, + pub(crate) status: String, + pub(crate) start: i64, + pub(crate) end: i64, + pub(crate) processing_time: i64, + pub(crate) nodes_total: u32, + pub(crate) nodes_processed: u32, + pub(crate) records_processed: u32, + pub(crate) records_applied: u32, + pub(crate) records_discarded: u32, + pub(crate) attempt: u32, + } + /// Bucket aggregate response from aggregator. #[derive( Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode, @@ -927,521 +947,529 @@ pub mod pallet { log::info!("πŸ‘‹ Hello from pallet-ddc-verification."); - // todo: fetch clusters from ddc-clusters and loop the whole process for each cluster - let cluster_id = unwrap_or_log_error!( - Self::get_cluster_to_validate(), - "🏭❌ Error retrieving cluster to validate" + let clusters_ids = unwrap_or_log_error!( + T::ClusterManager::get_clusters(ClusterStatus::Activated), + "🏭❌ Error retrieving clusters to validate" ); - let batch_size = T::MAX_PAYOUT_BATCH_SIZE; - let mut errors: Vec = Vec::new(); - - let dac_era_result = Self::process_dac_era(&cluster_id, None, batch_size.into()); + for cluster_id in clusters_ids { + let batch_size = T::MAX_PAYOUT_BATCH_SIZE; + let mut errors: Vec = Vec::new(); - match dac_era_result { - Ok(Some(( - era_activity, - payers_merkle_root_hash, - payees_merkle_root_hash, - payers_batch_merkle_root_hashes, - payees_batch_merkle_root_hashes, - ))) => { - log::info!( - "πŸ­πŸš€ Processing era_id: {:?} for cluster_id: {:?}", - era_activity.clone(), - cluster_id - ); + let dac_era_result = Self::process_dac_era(&cluster_id, None, batch_size.into()); - let results = signer.send_signed_transaction(|_account| { - Call::set_prepare_era_for_payout { - cluster_id, - era_activity: era_activity.clone(), - payers_merkle_root_hash, - payees_merkle_root_hash, - payers_batch_merkle_root_hashes: payers_batch_merkle_root_hashes - .clone(), - payees_batch_merkle_root_hashes: payees_batch_merkle_root_hashes - .clone(), - } - }); + match dac_era_result { + Ok(Some(( + era_activity, + payers_merkle_root_hash, + payees_merkle_root_hash, + payers_batch_merkle_root_hashes, + payees_batch_merkle_root_hashes, + ))) => { + log::info!( + "πŸ­πŸš€ Processing era_id: {:?} for cluster_id: {:?}", + era_activity.clone(), + cluster_id + ); + + let results = signer.send_signed_transaction(|_account| { + Call::set_prepare_era_for_payout { + cluster_id, + era_activity: era_activity.clone(), + payers_merkle_root_hash, + payees_merkle_root_hash, + payers_batch_merkle_root_hashes: payers_batch_merkle_root_hashes + .clone(), + payees_batch_merkle_root_hashes: payees_batch_merkle_root_hashes + .clone(), + } + }); - for (_, res) in &results { - match res { - Ok(()) => { - log::info!( + for (_, res) in &results { + match res { + Ok(()) => { + log::info!( "πŸ­β›³οΈ Merkle roots posted on-chain for cluster_id: {:?}, era: {:?}", cluster_id, era_activity.clone() ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌ Error to post merkle roots on-chain for cluster_id: {:?}, era: {:?}: {:?}", cluster_id, era_activity.clone(), e ); - // Extrinsic call failed - errors.push(OCWError::PrepareEraTransactionError { - cluster_id, - era_id: era_activity.id, - payers_merkle_root_hash, - payees_merkle_root_hash, - }); - }, + // Extrinsic call failed + errors.push(OCWError::PrepareEraTransactionError { + cluster_id, + era_id: era_activity.id, + payers_merkle_root_hash, + payees_merkle_root_hash, + }); + }, + } } - } - }, - Ok(None) => { - log::info!("πŸ­β„ΉοΈ No eras for DAC process for cluster_id: {:?}", cluster_id); - }, - Err(process_errors) => { - errors.extend(process_errors); - }, - }; + }, + Ok(None) => { + log::info!("πŸ­β„ΉοΈ No eras for DAC process for cluster_id: {:?}", cluster_id); + }, + Err(process_errors) => { + errors.extend(process_errors); + }, + }; - // todo! factor out as macro as this is repetitive - match Self::prepare_begin_billing_report(&cluster_id) { - Ok(Some((era_id, start_era, end_era))) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_begin_billing_report(&cluster_id) { + Ok(Some((era_id, start_era, end_era))) => { + log::info!( "πŸ­πŸš€ process_start_payout processed successfully for cluster_id: {:?}, era_id: {:?}, start_era: {:?}, end_era: {:?} ", cluster_id, era_id, start_era, end_era ); - let results = signer.send_signed_transaction(|_account| { - Call::begin_billing_report { cluster_id, era_id, start_era, end_era } - }); + let results = signer.send_signed_transaction(|_account| { + Call::begin_billing_report { cluster_id, era_id, start_era, end_era } + }); - for (_, res) in &results { - match res { - Ok(()) => { - log::info!( + for (_, res) in &results { + match res { + Ok(()) => { + log::info!( "πŸ­πŸ„β€ Sent begin_billing_report successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌ Error to post begin_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::BeginBillingReportTransactionError { - cluster_id, - era_id, - }); - }, + // Extrinsic call failed + errors.push(OCWError::BeginBillingReportTransactionError { + cluster_id, + era_id, + }); + }, + } } - } - }, - Ok(None) => { - log::info!("🏭❌ No era for payout for cluster_id: {:?}", cluster_id); - }, - Err(e) => { - errors.push(e); - }, - } + }, + Ok(None) => { + log::info!("🏭❌ No era for payout for cluster_id: {:?}", cluster_id); + }, + Err(e) => { + errors.push(e); + }, + } - // todo! factor out as macro as this is repetitive - match Self::prepare_begin_charging_customers(&cluster_id, batch_size.into()) { - Ok(Some((era_id, max_batch_index))) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_begin_charging_customers(&cluster_id, batch_size.into()) { + Ok(Some((era_id, max_batch_index))) => { + log::info!( "🏭🎁 prepare_begin_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::begin_charging_customers { cluster_id, era_id, max_batch_index } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + if let Some((_, res)) = signer.send_signed_transaction(|_acc| { + Call::begin_charging_customers { cluster_id, era_id, max_batch_index } + }) { + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸ­πŸš€ Sent begin_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌ Error to post begin_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::BeginChargingCustomersTransactionError { - cluster_id, - era_id, - }); - }, + // Extrinsic call failed + errors.push(OCWError::BeginChargingCustomersTransactionError { + cluster_id, + era_id, + }); + }, + } + } else { + log::error!("🏭❌ No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("🏭❌ No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::error!( - "πŸ­πŸ¦€ No era for begin_charging_customers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => errors.extend(e), - } + }, + Ok(None) => { + log::error!( + "πŸ­πŸ¦€ No era for begin_charging_customers for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => errors.extend(e), + } - // todo! factor out as macro as this is repetitive - match Self::prepare_send_charging_customers_batch(&cluster_id, batch_size.into()) { - Ok(Some((era_id, batch_payout))) => { - let payers_log: Vec<(String, BucketId, CustomerUsage)> = batch_payout - .payers - .clone() - .into_iter() - .map(|(acc_id, bucket_id, customer_usage)| { - let account_id: T::AccountIdConverter = acc_id.into(); - let account_id_32: AccountId32 = account_id.into(); - let account_ref: &[u8; 32] = account_id_32.as_ref(); - (hex::encode(account_ref), bucket_id, customer_usage) - }) - .collect(); - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_send_charging_customers_batch(&cluster_id, batch_size.into()) { + Ok(Some((era_id, batch_payout))) => { + let payers_log: Vec<(String, BucketId, CustomerUsage)> = batch_payout + .payers + .clone() + .into_iter() + .map(|(acc_id, bucket_id, customer_usage)| { + let account_id: T::AccountIdConverter = acc_id.into(); + let account_id_32: AccountId32 = account_id.into(); + let account_ref: &[u8; 32] = account_id_32.as_ref(); + (hex::encode(account_ref), bucket_id, customer_usage) + }) + .collect(); + log::info!( "🏭🎁 prepare_send_charging_customers_batch processed successfully for cluster_id: {:?}, era_id: {:?} , batch_payout: {:?}", cluster_id, era_id, payers_log ); - if let Some((_, res)) = - signer.send_signed_transaction(|_acc| Call::send_charging_customers_batch { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - payers: batch_payout.payers.clone(), - batch_proof: batch_payout.batch_proof.clone(), + if let Some((_, res)) = signer.send_signed_transaction(|_acc| { + Call::send_charging_customers_batch { + cluster_id, + era_id, + batch_index: batch_payout.batch_index, + payers: batch_payout.payers.clone(), + batch_proof: batch_payout.batch_proof.clone(), + } }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸ­πŸš€ Sent send_charging_customers_batch successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌ Error to post send_charging_customers_batch for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::SendChargingCustomersBatchTransactionError { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - }); - }, + // Extrinsic call failed + errors.push( + OCWError::SendChargingCustomersBatchTransactionError { + cluster_id, + era_id, + batch_index: batch_payout.batch_index, + }, + ); + }, + } + } else { + log::error!("🏭❌ No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("🏭❌ No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "πŸ­πŸ¦€ No era for send_charging_customers_batch for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.extend(e); - }, - } + }, + Ok(None) => { + log::info!( + "πŸ­πŸ¦€ No era for send_charging_customers_batch for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => { + errors.extend(e); + }, + } - // todo! factor out as macro as this is repetitive - match Self::prepare_end_charging_customers(&cluster_id) { - Ok(Some(era_id)) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_end_charging_customers(&cluster_id) { + Ok(Some(era_id)) => { + log::info!( "πŸ­πŸ“prepare_end_charging_customers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::end_charging_customers { cluster_id, era_id } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + if let Some((_, res)) = signer.send_signed_transaction(|_acc| { + Call::end_charging_customers { cluster_id, era_id } + }) { + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸ­πŸ“Sent end_charging_customers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌Error to post end_charging_customers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::EndChargingCustomersTransactionError { - cluster_id, - era_id, - }); - }, + // Extrinsic call failed + errors.push(OCWError::EndChargingCustomersTransactionError { + cluster_id, + era_id, + }); + }, + } + } else { + log::error!("🏭❌No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("🏭❌No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "πŸ­πŸ“No era for end_charging_customers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.push(e); - }, - } + }, + Ok(None) => { + log::info!( + "πŸ­πŸ“No era for end_charging_customers for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => { + errors.push(e); + }, + } - // todo! factor out as macro as this is repetitive - match Self::prepare_begin_rewarding_providers(&cluster_id, batch_size.into()) { - Ok(Some((era_id, max_batch_index, total_node_usage))) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_begin_rewarding_providers(&cluster_id, batch_size.into()) { + Ok(Some((era_id, max_batch_index, total_node_usage))) => { + log::info!( "πŸ­πŸ“prepare_begin_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - if let Some((_, res)) = - signer.send_signed_transaction(|_acc| Call::begin_rewarding_providers { - cluster_id, - era_id, - max_batch_index, - total_node_usage: total_node_usage.clone(), - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + if let Some((_, res)) = + signer.send_signed_transaction(|_acc| Call::begin_rewarding_providers { + cluster_id, + era_id, + max_batch_index, + total_node_usage: total_node_usage.clone(), + }) { + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸ­πŸ“Sent begin_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌Error to post begin_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::BeginRewardingProvidersTransactionError { - cluster_id, - era_id, - }); - }, + // Extrinsic call failed + errors.push( + OCWError::BeginRewardingProvidersTransactionError { + cluster_id, + era_id, + }, + ); + }, + } + } else { + log::error!("🏭❌No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("🏭❌No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "πŸ­πŸ“No era for begin_rewarding_providers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.extend(e); - }, - } + }, + Ok(None) => { + log::info!( + "πŸ­πŸ“No era for begin_rewarding_providers for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => { + errors.extend(e); + }, + } - // todo! factor out as macro as this is repetitive - match Self::prepare_send_rewarding_providers_batch(&cluster_id, batch_size.into()) { - Ok(Some((era_id, batch_payout))) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_send_rewarding_providers_batch(&cluster_id, batch_size.into()) { + Ok(Some((era_id, batch_payout))) => { + log::info!( "🎁 prepare_send_rewarding_providers_batch processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::send_rewarding_providers_batch { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - payees: batch_payout.payees.clone(), - batch_proof: batch_payout.batch_proof.clone(), - } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + if let Some((_, res)) = signer.send_signed_transaction(|_acc| { + Call::send_rewarding_providers_batch { + cluster_id, + era_id, + batch_index: batch_payout.batch_index, + payees: batch_payout.payees.clone(), + batch_proof: batch_payout.batch_proof.clone(), + } + }) { + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸš€ Sent send_rewarding_providers_batch successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "πŸ¦€ Error to post send_rewarding_providers_batch for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push( - OCWError::SendRewardingProvidersBatchTransactionError { - cluster_id, - era_id, - batch_index: batch_payout.batch_index, - }, - ); - }, + // Extrinsic call failed + errors.push( + OCWError::SendRewardingProvidersBatchTransactionError { + cluster_id, + era_id, + batch_index: batch_payout.batch_index, + }, + ); + }, + } + } else { + log::error!("πŸ¦€ No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("πŸ¦€ No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "πŸ¦€ No era for send_rewarding_providers_batch for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.extend(e); - }, - } + }, + Ok(None) => { + log::info!( + "πŸ¦€ No era for send_rewarding_providers_batch for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => { + errors.extend(e); + }, + } - // todo! factor out as macro as this is repetitive - match Self::prepare_end_rewarding_providers(&cluster_id) { - Ok(Some(era_id)) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_end_rewarding_providers(&cluster_id) { + Ok(Some(era_id)) => { + log::info!( "πŸ­πŸ“prepare_end_rewarding_providers processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::end_rewarding_providers { cluster_id, era_id } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + if let Some((_, res)) = signer.send_signed_transaction(|_acc| { + Call::end_rewarding_providers { cluster_id, era_id } + }) { + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸ­πŸ“Sent end_rewarding_providers successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌Error to post end_rewarding_providers for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::EndRewardingProvidersTransactionError { - cluster_id, - era_id, - }); - }, + // Extrinsic call failed + errors.push(OCWError::EndRewardingProvidersTransactionError { + cluster_id, + era_id, + }); + }, + } + } else { + log::error!("🏭❌No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("🏭❌No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "πŸ­πŸ“No era for end_rewarding_providers for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.push(e); - }, - } + }, + Ok(None) => { + log::info!( + "πŸ­πŸ“No era for end_rewarding_providers for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => { + errors.push(e); + }, + } - // todo! factor out as macro as this is repetitive - match Self::prepare_end_billing_report(&cluster_id) { - Ok(Some(era_id)) => { - log::info!( + // todo! factor out as macro as this is repetitive + match Self::prepare_end_billing_report(&cluster_id) { + Ok(Some(era_id)) => { + log::info!( "πŸ­πŸ“prepare_end_billing_report processed successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - if let Some((_, res)) = signer.send_signed_transaction(|_acc| { - Call::end_billing_report { cluster_id, era_id } - }) { - match res { - Ok(_) => { - // Extrinsic call succeeded - log::info!( + if let Some((_, res)) = signer.send_signed_transaction(|_acc| { + Call::end_billing_report { cluster_id, era_id } + }) { + match res { + Ok(_) => { + // Extrinsic call succeeded + log::info!( "πŸ­πŸ“Sent end_billing_report successfully for cluster_id: {:?}, era_id: {:?}", cluster_id, era_id ); - }, - Err(e) => { - log::error!( + }, + Err(e) => { + log::error!( "🏭❌Error to post end_billing_report for cluster_id: {:?}, era_id: {:?}: {:?}", cluster_id, era_id, e ); - // Extrinsic call failed - errors.push(OCWError::EndBillingReportTransactionError { - cluster_id, - era_id, - }); - }, + // Extrinsic call failed + errors.push(OCWError::EndBillingReportTransactionError { + cluster_id, + era_id, + }); + }, + } + } else { + log::error!("🏭❌No account available to sign the transaction"); + errors.push(OCWError::NoAvailableSigner); } - } else { - log::error!("🏭❌No account available to sign the transaction"); - errors.push(OCWError::NoAvailableSigner); - } - }, - Ok(None) => { - log::info!( - "πŸ­πŸ“No era for end_billing_report for cluster_id: {:?}", - cluster_id - ); - }, - Err(e) => { - errors.push(e); - }, - } + }, + Ok(None) => { + log::info!( + "πŸ­πŸ“No era for end_billing_report for cluster_id: {:?}", + cluster_id + ); + }, + Err(e) => { + errors.push(e); + }, + } - if !errors.is_empty() { - let results = signer.send_signed_transaction(|_account| { - Call::emit_consensus_errors { errors: errors.clone() } - }); + if !errors.is_empty() { + let results = signer.send_signed_transaction(|_account| { + Call::emit_consensus_errors { errors: errors.clone() } + }); - for (_, res) in &results { - match res { - Ok(()) => log::info!("βœ… Successfully submitted emit_consensus_errors tx"), - Err(_) => log::error!("🏭❌ Failed to submit emit_consensus_errors tx"), + for (_, res) in &results { + match res { + Ok(()) => { + log::info!("βœ… Successfully submitted emit_consensus_errors tx") + }, + Err(_) => log::error!("🏭❌ Failed to submit emit_consensus_errors tx"), + } } } } @@ -2892,40 +2920,65 @@ pub mod pallet { /// - `cluster_id`: cluster id of a cluster /// - `dac_nodes`: List of DAC nodes pub(crate) fn get_era_for_validation( - // todo! this needs to be rewriten - too complex and inefficient cluster_id: &ClusterId, dac_nodes: &[(NodePubKey, StorageNodeParams)], ) -> Result, OCWError> { - let current_validator_data = Self::fetch_current_validator()?; + let this_validator_data = Self::fetch_current_validator()?; + let this_validator = T::AccountId::decode(&mut &this_validator_data[..]).unwrap(); - let current_validator = T::AccountId::decode(&mut ¤t_validator_data[..]).unwrap(); + let last_validated_era_by_this_validator = + Self::get_last_validated_era(cluster_id, this_validator)? + .unwrap_or_else(DdcEra::default); - let last_validated_era = Self::get_last_validated_era(cluster_id, current_validator)? - .unwrap_or_else(DdcEra::default); + log::info!( + "πŸš€ last_validated_era_by_this_validator for cluster_id: {:?}", + last_validated_era_by_this_validator + ); - log::info!("πŸš€ last_validated_era for cluster_id: {:?}", last_validated_era); - let all_ids = Self::fetch_processed_era_for_node(cluster_id, dac_nodes)?; + let last_validated_era_for_cluster = + T::ClusterValidator::get_last_validated_era(cluster_id).map_err(|_| { + OCWError::EraRetrievalError { cluster_id: *cluster_id, node_pub_key: None } + })?; - let ids_greater_than_last_validated_era: Vec = all_ids + // we want to fetch processed eras from all available validators + let available_processed_eras = + Self::fetch_processed_era_for_nodes(cluster_id, dac_nodes)?; + + // we want to let the current validator to validate available processed/completed eras + // that are greater from the last validated era in the cluster + let processed_eras_to_validate: Vec = available_processed_eras .iter() - .flat_map(|eras| eras.iter().filter(|&ids| ids.id > last_validated_era).cloned()) + .flat_map(|eras| { + eras.iter() + .filter(|&ids| { + ids.id > last_validated_era_by_this_validator && + ids.id > last_validated_era_for_cluster + }) + .cloned() + }) .sorted() .collect::>(); - let mut grouped_data: Vec<(u32, EraActivity)> = Vec::new(); - for (key, chunk) in - &ids_greater_than_last_validated_era.into_iter().chunk_by(|elt| elt.clone()) + // we want to process only eras reported by quorum of validators + let mut processed_eras_with_quorum: Vec = vec![]; + + let quorum = T::AggregatorsQuorum::get(); + let threshold = quorum * dac_nodes.len(); + for (era_key, candidates) in + &processed_eras_to_validate.into_iter().chunk_by(|elt| elt.clone()) { - grouped_data.push((chunk.count() as u32, key)); + if candidates.count() >= threshold { + processed_eras_with_quorum.push(era_key); + } else { + log::warn!( + "⚠️ Era {:?} in cluster_id: {:?} has been reported with unmet quorum", + era_key, + cluster_id + ); + } } - let all_node_eras = grouped_data - .into_iter() - .filter(|(v, _)| *v == dac_nodes.len() as u32) - .map(|(_, id)| id) - .collect::>(); - - Ok(all_node_eras.iter().cloned().min_by_key(|n| n.id)) + Ok(processed_eras_with_quorum.iter().cloned().min_by_key(|n| n.id)) } /// Computes the consensus for a set of activities across multiple nodes within a given @@ -3045,12 +3098,6 @@ pub mod pallet { } } - /// Fetch cluster to validate. - fn get_cluster_to_validate() -> Result> { - // todo! to implement - Self::cluster_to_validate().ok_or(Error::ClusterToValidateRetrievalError) - } - /// Fetch Challenge node aggregate or bucket sub-aggregate. pub(crate) fn _fetch_challenge_responses( cluster_id: &ClusterId, @@ -3178,9 +3225,9 @@ pub mod pallet { /// Parameters: /// - `node_params`: DAC node parameters #[allow(dead_code)] - pub(crate) fn fetch_processed_era( + pub(crate) fn fetch_processed_eras( node_params: &StorageNodeParams, - ) -> Result, http::Error> { + ) -> Result, http::Error> { let scheme = "http"; let host = str::from_utf8(&node_params.host).map_err(|_| http::Error::Unknown)?; let url = format!("{}://{}:{}/activity/eras", scheme, host, node_params.http_port); @@ -3189,8 +3236,6 @@ pub mod pallet { .add(sp_runtime::offchain::Duration::from_millis(RESPONSE_TIMEOUT)); let pending = request.deadline(timeout).send().map_err(|_| http::Error::IoError)?; - // todo! filter by status == PROCESSED - let response = pending.try_wait(timeout).map_err(|_| http::Error::DeadlineReached)??; if response.code != SUCCESS_CODE { @@ -3198,8 +3243,11 @@ pub mod pallet { } let body = response.body().collect::>(); + let res: Vec = + serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?; - serde_json::from_slice(&body).map_err(|_| http::Error::Unknown) + let processed_status = String::from("PROCESSED"); + Ok(res.into_iter().filter(|e| e.status == processed_status).collect::>()) } /// Fetch customer usage. /// @@ -3320,27 +3368,32 @@ pub mod pallet { let mut nodes_aggregates = Vec::new(); for (node_pub_key, node_params) in dac_nodes { - // todo! probably shouldn't stop when some DAC is not responding as we can still - // work with others - let aggregates = Self::fetch_node_aggregates(cluster_id, era_id, node_params) - .map_err(|_| OCWError::NodeUsageRetrievalError { - cluster_id: *cluster_id, - era_id, - node_pub_key: node_pub_key.clone(), - })?; + let aggregates_res = Self::fetch_node_aggregates(cluster_id, era_id, node_params); + if aggregates_res.is_err() { + log::warn!( + "Aggregator with key {:?} is unavailable while fetching nodes aggregates", + node_pub_key + ); + // skip unavailable aggregators and continue with available ones + continue; + } + + let aggregates = aggregates_res.unwrap(); + // todo: this is tech debt that needs to be refactored, the mapping logic needs to + // be moved to payouts pallet for aggregate in aggregates.clone() { let provider_id = Self::get_node_provider_id(node_pub_key).unwrap(); - Self::store_provider_id(aggregate.node_id, provider_id); // todo! this is not good - needs to be - // moved payout pallet + Self::store_provider_id(aggregate.node_id, provider_id); } - let aggregator_info = AggregatorInfo { - node_pub_key: node_pub_key.clone(), - node_params: node_params.clone(), - }; - - nodes_aggregates.push((aggregator_info, aggregates)); + nodes_aggregates.push(( + AggregatorInfo { + node_pub_key: node_pub_key.clone(), + node_params: node_params.clone(), + }, + aggregates, + )); } Ok(nodes_aggregates) @@ -3361,21 +3414,25 @@ pub mod pallet { Vec::new(); for (node_pub_key, node_params) in dac_nodes { - // todo! probably shouldn't stop when some DAC is not responding as we can still - // work with others - let aggregates = Self::fetch_bucket_aggregates(cluster_id, era_id, node_params) - .map_err(|_| OCWError::BucketAggregatesRetrievalError { - cluster_id: *cluster_id, - era_id, - node_pub_key: node_pub_key.clone(), - })?; + let aggregates_res = Self::fetch_bucket_aggregates(cluster_id, era_id, node_params); + if aggregates_res.is_err() { + log::warn!( + "Aggregator with key {:?} is unavailable while fetching buckets aggregates", + node_pub_key + ); + // skip unavailable aggregators and continue with available ones + continue; + } - let aggregator_info = AggregatorInfo { - node_pub_key: node_pub_key.clone(), - node_params: node_params.clone(), - }; + let aggregates = aggregates_res.unwrap(); - bucket_aggregates.push((aggregator_info, aggregates)); + bucket_aggregates.push(( + AggregatorInfo { + node_pub_key: node_pub_key.clone(), + node_params: node_params.clone(), + }, + aggregates, + )); } Ok(bucket_aggregates) @@ -3386,27 +3443,31 @@ pub mod pallet { /// Parameters: /// - `cluster_id`: Cluster id /// - `node_params`: DAC node parameters - fn fetch_processed_era_for_node( - cluster_id: &ClusterId, + fn fetch_processed_era_for_nodes( + _cluster_id: &ClusterId, dac_nodes: &[(NodePubKey, StorageNodeParams)], ) -> Result>, OCWError> { - let mut eras = Vec::new(); + let mut processed_eras_by_nodes: Vec> = Vec::new(); for (node_pub_key, node_params) in dac_nodes { - // todo! probably shouldn't stop when some DAC is not responding as we can still - // work with others - - let ids = Self::fetch_processed_era(node_params).map_err(|_| { - OCWError::EraRetrievalError { - cluster_id: *cluster_id, - node_pub_key: node_pub_key.clone(), + let processed_eras_by_node = Self::fetch_processed_eras(node_params); + if processed_eras_by_node.is_err() { + log::warn!( + "Aggregator with key {:?} is unavailable while fetching eras", + node_pub_key + ); + // skip unavailable aggregators and continue with available ones + continue; + } else { + let eras = processed_eras_by_node.unwrap(); + if !eras.is_empty() { + processed_eras_by_nodes + .push(eras.into_iter().map(|e| e.into()).collect::>()); } - })?; - - eras.push(ids); + } } - Ok(eras) + Ok(processed_eras_by_nodes) } } diff --git a/pallets/ddc-verification/src/mock.rs b/pallets/ddc-verification/src/mock.rs index b5d8edfab..862990353 100644 --- a/pallets/ddc-verification/src/mock.rs +++ b/pallets/ddc-verification/src/mock.rs @@ -332,6 +332,10 @@ impl ClusterValidator for TestClusterValidator { ) -> Result<(), DispatchError> { unimplemented!() } + + fn get_last_validated_era(_cluster_id: &ClusterId) -> Result { + Ok(Default::default()) + } } pub struct MockPayoutVisitor; @@ -687,6 +691,10 @@ impl ClusterManager for TestClusterManager { ) -> Result<(), DispatchError> { unimplemented!() } + + fn get_clusters(_status: ClusterStatus) -> Result, DispatchError> { + Ok(vec![ClusterId::from([12; 20])]) + } } impl frame_system::offchain::SigningTypes for Test { diff --git a/pallets/ddc-verification/src/tests.rs b/pallets/ddc-verification/src/tests.rs index f9cb21f32..82f573130 100644 --- a/pallets/ddc-verification/src/tests.rs +++ b/pallets/ddc-verification/src/tests.rs @@ -2183,22 +2183,11 @@ fn fetch_processed_era_works() { let host = "example1.com"; let port = 80; - // Create a sample EraActivity instance - let era_activity1 = EraActivity { id: 17, start: 1, end: 2 }; - let era_activity2 = EraActivity { id: 18, start: 1, end: 2 }; - let era_activity3 = EraActivity { id: 19, start: 1, end: 2 }; - let era_activity_json = serde_json::to_string(&vec![ - era_activity1.clone(), - era_activity2.clone(), - era_activity3, - ]) - .unwrap(); - // Mock HTTP request and response let pending_request = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host, port), - response: Some(era_activity_json.as_bytes().to_vec()), + response: Some(br#"[{"id":17,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":18,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":19,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; @@ -2215,11 +2204,17 @@ fn fetch_processed_era_works() { domain: b"example2.com".to_vec(), }; - let result = Pallet::::fetch_processed_era(&node_params); + let result = Pallet::::fetch_processed_eras(&node_params); assert!(result.is_ok()); let activities = result.unwrap(); - assert_eq!(activities[0].id, era_activity1.id); - assert_eq!(activities[1].id, era_activity2.id); + + let era_activity1 = EraActivity { id: 17, start: 1, end: 2 }; + let era_activity2 = EraActivity { id: 18, start: 1, end: 2 }; + let era_activity3 = EraActivity { id: 19, start: 1, end: 2 }; + + assert_eq!(era_activity1, activities[0].clone().into()); + assert_eq!(era_activity2, activities[1].clone().into()); + assert_eq!(era_activity3, activities[2].clone().into()); }); } @@ -2248,60 +2243,32 @@ fn get_era_for_validation_works() { let host3 = "example3.com"; let host4 = "example4.com"; let port = 80; - let era_activity1 = EraActivity { id: 16, start: 1, end: 2 }; - let era_activity2 = EraActivity { id: 17, start: 1, end: 2 }; - let era_activity3 = EraActivity { id: 18, start: 1, end: 2 }; - let era_activity4 = EraActivity { id: 19, start: 1, end: 2 }; - let era_activity_json1 = serde_json::to_string(&vec![ - era_activity1.clone(), //16 - era_activity2.clone(), //17 - era_activity3.clone(), //18 - era_activity4.clone(), //19 - ]) - .unwrap(); - let era_activity_json2 = serde_json::to_string(&vec![ - era_activity1.clone(), //16 - era_activity2.clone(), //17 - era_activity3.clone(), //18 - ]) - .unwrap(); - let era_activity_json3 = serde_json::to_string(&vec![ - era_activity1.clone(), //16 - era_activity2.clone(), //17 - era_activity3.clone(), //18 - ]) - .unwrap(); - let era_activity_json4 = serde_json::to_string(&vec![ - era_activity1.clone(), //16 - era_activity2.clone(), //17 - era_activity3.clone(), //18 - ]) - .unwrap(); + let pending_request1 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host1, port), - response: Some(era_activity_json1.as_bytes().to_vec()), + response: Some(br#"[{"id":16,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":17,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":18,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":19,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request2 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host2, port), - response: Some(era_activity_json2.as_bytes().to_vec()), + response: Some(br#"[{"id":16,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":17,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":18,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request3 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host3, port), - response: Some(era_activity_json3.as_bytes().to_vec()), + response: Some(br#"[{"id":16,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":17,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":18,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request4 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host4, port), - response: Some(era_activity_json4.as_bytes().to_vec()), + response: Some(br#"[{"id":16,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":17,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":18,"status":"PROCESSED","start":1,"end":2,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; @@ -2361,7 +2328,8 @@ fn get_era_for_validation_works() { let cluster_id = ClusterId::from([12; 20]); let result = Pallet::::get_era_for_validation(&cluster_id, &dac_nodes); - assert_eq!(result.unwrap().unwrap(), era_activity1); //16 + let era_activity = EraActivity { id: 16, start: 1, end: 2 }; + assert_eq!(result.unwrap().unwrap(), era_activity); }); } @@ -2613,63 +2581,63 @@ fn test_single_ocw_pallet_integration() { let pending_request1 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host1, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request2 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host2, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request3 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host3, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request4 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host4, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request5 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host5, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request6 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host6, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request7 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host7, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request8 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host8, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; let pending_request9 = PendingRequest { method: "GET".to_string(), uri: format!("http://{}:{}/activity/eras", host9, port), - response: Some(br#"[{"id":476814,"start":0,"end":1716533999999,"processing_time_ms":0,"total_records":0,"total_buckets":0},{"id":476815,"start":1716534000000,"end":1716537599999,"processing_time_ms":2,"total_records":54,"total_buckets":2},{"id":476816,"start":1716537600000,"end":1716541199999,"processing_time_ms":10,"total_records":803,"total_buckets":29},{"id":476817,"start":1716541200000,"end":1716544799999,"processing_time_ms":11,"total_records":986,"total_buckets":28}]"#.to_vec()), + response: Some(br#"[{"id":5738616,"status":"PROCESSED","start":1721584800000,"end":1721585099999,"processing_time":15977,"nodes_total":9,"nodes_processed":9,"records_processed":0,"records_applied":0,"records_discarded":130755,"attempt":0},{"id":5738617,"status":"PROCESSED","start":1721585100000,"end":1721585399999,"processing_time":1818,"nodes_total":9,"nodes_processed":9,"records_processed":16,"records_applied":16,"records_discarded":0,"attempt":0},{"id":5738618,"status":"PROCESSED","start":1721585400000,"end":1721585699999,"processing_time":1997,"nodes_total":9,"nodes_processed":9,"records_processed":622,"records_applied":622,"records_discarded":0,"attempt":0},{"id":5738619,"status":"PROCESSED","start":1721585700000,"end":1721585999999,"processing_time":2118,"nodes_total":9,"nodes_processed":9,"records_processed":834,"records_applied":834,"records_discarded":0,"attempt":0}]"#.to_vec()), sent: true, ..Default::default() }; @@ -2677,7 +2645,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request1 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host1, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host1, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2685,7 +2653,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request2 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host2, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host2, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2693,7 +2661,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request3 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host3, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host3, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2701,7 +2669,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request4 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host4, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host4, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2709,7 +2677,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request5 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host5, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host5, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2717,7 +2685,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request6 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host6, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host6, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2725,7 +2693,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request7 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host7, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host7, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2733,7 +2701,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request8 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host8, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host8, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2741,7 +2709,7 @@ fn test_single_ocw_pallet_integration() { let node_pending_request9 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/nodes?eraId=476814", host9, port), + uri: format!("http://{}:{}/activity/nodes?eraId=5738616", host9, port), response: Some(br#"[{"node_id": "0x48594f1fd4f05135914c42b03e63b61f6a3e4c537ccee3dbac555ef6df371b7e","stored_bytes": 675613289,"transferred_bytes": 1097091579,"number_of_puts": 889,"number_of_gets": 97},{"node_id": "0x9ef98ad9c3626ba725e78d76cfcfc4b4d07e84f0388465bc7eb992e3e117234a","stored_bytes": 0, "transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]"#.to_vec()), sent: true, ..Default::default() @@ -2749,7 +2717,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request1 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host1, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host1, port), response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2757,7 +2725,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request2 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host2, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host2, port), response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2765,7 +2733,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request3 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host3, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host3, port), response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2773,7 +2741,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request4 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host4, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host4, port), response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2781,7 +2749,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request5 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host5, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host5, port), response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2789,7 +2757,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request6 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host6, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host6, port), response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2797,7 +2765,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request7 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host7, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host7, port), response: Some(br#"[{"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"bucket_id": 90235,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2805,7 +2773,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request8 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host8, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host8, port), response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2813,7 +2781,7 @@ fn test_single_ocw_pallet_integration() { let bucket_pending_request9 = PendingRequest { method: "GET".to_string(), - uri: format!("http://{}:{}/activity/buckets?eraId=476814", host9, port), + uri: format!("http://{}:{}/activity/buckets?eraId=5738616", host9, port), response: Some(br#"[{"bucket_id": 90235,"stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1,"sub_aggregates": [{"NodeID": "0xbe26b2458fb0c9df4ec26ec5ba083051402b2a3b9d4a7fe6106fe9f8b5efde2c","stored_bytes": 0,"transferred_bytes": 38,"number_of_puts": 0,"number_of_gets": 1}]}]"#.to_vec()), sent: true, ..Default::default() @@ -2848,12 +2816,10 @@ fn test_single_ocw_pallet_integration() { offchain_state.expect_request(bucket_pending_request9); drop(offchain_state); - // // Offchain worker should be triggered if block number is divided by 100 + // Offchain worker should be triggered if block number is divided by 100 let block = 500; System::set_block_number(block); - let cluster_id = ClusterId::from([12; 20]); - ClusterToValidate::::put(cluster_id); DdcVerification::offchain_worker(block); }); } diff --git a/primitives/src/traits/cluster.rs b/primitives/src/traits/cluster.rs index 563d613b2..efc95d807 100644 --- a/primitives/src/traits/cluster.rs +++ b/primitives/src/traits/cluster.rs @@ -94,6 +94,8 @@ pub trait ClusterManager: ClusterQuery { node_pub_key: &NodePubKey, succeeded: bool, ) -> Result<(), DispatchError>; + + fn get_clusters(status: ClusterStatus) -> Result, DispatchError>; } pub trait ClusterValidator { /// Updates the `last_validated_era_id` for the given cluster and emits an event indicating the @@ -113,4 +115,17 @@ pub trait ClusterValidator { /// /// Emits `ClusterEraValidated` event if the operation is successful. fn set_last_validated_era(cluster_id: &ClusterId, era_id: DdcEra) -> Result<(), DispatchError>; + + /// Retrieves the `last_validated_era_id` for the given cluster + /// update. + /// + /// # Parameters + /// + /// - `cluster_id`: A reference to the unique identifier of the cluster the + /// `last_validated_era_id` is being retrieved for + /// + /// # Returns + /// + /// Returns `Ok(DdcEra)` identifier of the last validated era in cluster + fn get_last_validated_era(cluster_id: &ClusterId) -> Result; } From 732d4a4c41283ae82d4dd225f4e1c58dc86fafc0 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Fri, 25 Oct 2024 00:33:37 +0200 Subject: [PATCH 2/5] chore: migration for deprecated storage item --- pallets/ddc-verification/src/lib.rs | 25 ++------- pallets/ddc-verification/src/migrations.rs | 61 ++++++++++++++++++++++ runtime/cere-dev/src/lib.rs | 6 +-- runtime/cere/src/lib.rs | 2 +- 4 files changed, 70 insertions(+), 24 deletions(-) create mode 100644 pallets/ddc-verification/src/migrations.rs diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index c361f7748..4d9fe9ff0 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -56,6 +56,8 @@ pub(crate) mod mock; #[cfg(test)] mod tests; +pub mod migrations; + #[frame_support::pallet] pub mod pallet { @@ -68,7 +70,7 @@ pub mod pallet { /// The current storage version. const STORAGE_VERSION: frame_support::traits::StorageVersion = - frame_support::traits::StorageVersion::new(0); + frame_support::traits::StorageVersion::new(1); const SUCCESS_CODE: u16 = 200; const _BUF_SIZE: usize = 128; @@ -482,11 +484,6 @@ pub mod pallet { EraValidation, >; - /// Cluster id storage - #[pallet::storage] - #[pallet::getter(fn cluster_to_validate)] - pub type ClusterToValidate = StorageValue<_, ClusterId>; - /// List of validators. #[pallet::storage] #[pallet::getter(fn validator_set)] @@ -4003,20 +4000,8 @@ pub mod pallet { T::ClusterValidator::set_last_validated_era(&cluster_id, era_id) } - #[pallet::call_index(11)] - #[pallet::weight(::WeightInfo::create_billing_reports())] // todo! implement weights - pub fn set_cluster_to_validate( - origin: OriginFor, - cluster_id: ClusterId, - ) -> DispatchResult { - ensure_root(origin)?; - ClusterToValidate::::put(cluster_id); - - Ok(()) - } - // todo! Need to remove this - #[pallet::call_index(12)] + #[pallet::call_index(11)] #[pallet::weight(::WeightInfo::create_billing_reports())] // todo! implement weights pub fn set_current_validator(origin: OriginFor) -> DispatchResult { let validator = ensure_signed(origin)?; @@ -4029,7 +4014,7 @@ pub mod pallet { } // todo! remove this after devnet testing - #[pallet::call_index(13)] + #[pallet::call_index(12)] #[pallet::weight(::WeightInfo::create_billing_reports())] // todo! implement weights pub fn set_era_validations( origin: OriginFor, diff --git a/pallets/ddc-verification/src/migrations.rs b/pallets/ddc-verification/src/migrations.rs new file mode 100644 index 000000000..51f1bd044 --- /dev/null +++ b/pallets/ddc-verification/src/migrations.rs @@ -0,0 +1,61 @@ +use frame_support::{migration, traits::OnRuntimeUpgrade}; +use log; + +use super::*; + +const LOG_TARGET: &str = "ddc-verification"; + +pub mod v1 { + use frame_support::pallet_prelude::*; + + use super::*; + + pub fn migrate_to_v1() -> Weight { + let on_chain_version = Pallet::::on_chain_storage_version(); + let current_version = Pallet::::current_storage_version(); + + log::info!( + target: LOG_TARGET, + "Running migration with current storage version {:?} / onchain {:?}", + current_version, + on_chain_version + ); + + if on_chain_version == 0 && current_version == 1 { + log::info!(target: LOG_TARGET, "Running migration to v1."); + + let res = migration::clear_storage_prefix( + >::name().as_bytes(), + b"ClusterToValidate", + b"", + None, + None, + ); + + log::info!( + target: LOG_TARGET, + "Cleared '{}' entries from 'ClusterToValidate' storage prefix.", + res.unique + ); + + if res.maybe_cursor.is_some() { + log::error!( + target: LOG_TARGET, + "Storage prefix 'ClusterToValidate' is not completely cleared." + ); + } + + T::DbWeight::get().reads_writes(1, res.unique.into()) + } else { + log::info!(target: LOG_TARGET, " >>> Unused migration!"); + T::DbWeight::get().reads(1) + } + } + + pub struct MigrateToV1(sp_std::marker::PhantomData); + impl OnRuntimeUpgrade for MigrateToV1 { + fn on_runtime_upgrade() -> Weight { + migrate_to_v1::() + } + } +} diff --git a/runtime/cere-dev/src/lib.rs b/runtime/cere-dev/src/lib.rs index 0d25f2199..8478712b7 100644 --- a/runtime/cere-dev/src/lib.rs +++ b/runtime/cere-dev/src/lib.rs @@ -149,10 +149,10 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { // and set impl_version to 0. If only runtime // implementation changes and behavior does not, then leave spec_version as // is and increment impl_version. - spec_version: 60001, + spec_version: 60002, impl_version: 0, apis: RUNTIME_API_VERSIONS, - transaction_version: 19, + transaction_version: 20, state_version: 0, }; @@ -1400,7 +1400,7 @@ pub type SignedPayload = generic::SignedPayload; pub type CheckedExtrinsic = generic::CheckedExtrinsic; /// Runtime migrations -type Migrations = (); +type Migrations = pallet_ddc_verification::migrations::v1::MigrateToV1; /// Executive: handles dispatch to the various modules. pub type Executive = frame_executive::Executive< diff --git a/runtime/cere/src/lib.rs b/runtime/cere/src/lib.rs index 10b4cd473..54c50ea78 100644 --- a/runtime/cere/src/lib.rs +++ b/runtime/cere/src/lib.rs @@ -1444,7 +1444,7 @@ pub mod migrations { } /// Unreleased migrations. Add new ones here: - pub type Unreleased = (UpgradeSessionKeys,); + pub type Unreleased = UpgradeSessionKeys; } /// Executive: handles dispatch to the various modules. pub type Executive = frame_executive::Executive< From 0b02661144a378a936c99e2b02c9eac81ed7c3b1 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Fri, 25 Oct 2024 11:08:12 +0200 Subject: [PATCH 3/5] fix: updating storage version --- pallets/ddc-verification/src/lib.rs | 12 +++++++----- pallets/ddc-verification/src/migrations.rs | 8 ++++++++ runtime/cere/src/lib.rs | 3 ++- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 4d9fe9ff0..4b04db4a6 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -2921,7 +2921,8 @@ pub mod pallet { dac_nodes: &[(NodePubKey, StorageNodeParams)], ) -> Result, OCWError> { let this_validator_data = Self::fetch_current_validator()?; - let this_validator = T::AccountId::decode(&mut &this_validator_data[..]).unwrap(); + let this_validator = T::AccountId::decode(&mut &this_validator_data[..]) + .map_err(|_| OCWError::FailedToFetchCurrentValidator)?; let last_validated_era_by_this_validator = Self::get_last_validated_era(cluster_id, this_validator)? @@ -2942,7 +2943,7 @@ pub mod pallet { Self::fetch_processed_era_for_nodes(cluster_id, dac_nodes)?; // we want to let the current validator to validate available processed/completed eras - // that are greater from the last validated era in the cluster + // that are greater than the last validated era in the cluster let processed_eras_to_validate: Vec = available_processed_eras .iter() .flat_map(|eras| { @@ -3375,7 +3376,7 @@ pub mod pallet { continue; } - let aggregates = aggregates_res.unwrap(); + let aggregates = aggregates_res.expect("Nodes Aggregates Response to be available"); // todo: this is tech debt that needs to be refactored, the mapping logic needs to // be moved to payouts pallet @@ -3421,7 +3422,8 @@ pub mod pallet { continue; } - let aggregates = aggregates_res.unwrap(); + let aggregates = + aggregates_res.expect("Buckets Aggregates Response to be available"); bucket_aggregates.push(( AggregatorInfo { @@ -3456,7 +3458,7 @@ pub mod pallet { // skip unavailable aggregators and continue with available ones continue; } else { - let eras = processed_eras_by_node.unwrap(); + let eras = processed_eras_by_node.expect("Era Response to be available"); if !eras.is_empty() { processed_eras_by_nodes .push(eras.into_iter().map(|e| e.into()).collect::>()); diff --git a/pallets/ddc-verification/src/migrations.rs b/pallets/ddc-verification/src/migrations.rs index 51f1bd044..c9aa04a06 100644 --- a/pallets/ddc-verification/src/migrations.rs +++ b/pallets/ddc-verification/src/migrations.rs @@ -45,6 +45,14 @@ pub mod v1 { ); } + // Update storage version. + StorageVersion::new(1).put::>(); + log::info!( + target: LOG_TARGET, + "Storage migrated to version {:?}", + current_version + ); + T::DbWeight::get().reads_writes(1, res.unique.into()) } else { log::info!(target: LOG_TARGET, " >>> Unused migration!"); diff --git a/runtime/cere/src/lib.rs b/runtime/cere/src/lib.rs index 54c50ea78..79cf8efcb 100644 --- a/runtime/cere/src/lib.rs +++ b/runtime/cere/src/lib.rs @@ -1444,7 +1444,8 @@ pub mod migrations { } /// Unreleased migrations. Add new ones here: - pub type Unreleased = UpgradeSessionKeys; + pub type Unreleased = + (UpgradeSessionKeys, pallet_ddc_verification::migrations::v1::MigrateToV1); } /// Executive: handles dispatch to the various modules. pub type Executive = frame_executive::Executive< From 492c0466cc9051d2bb44d23e6feca4e1f500ca94 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Fri, 25 Oct 2024 17:24:45 +0200 Subject: [PATCH 4/5] chore: more logging info for tracing --- Cargo.lock | 1 + pallets/ddc-verification/src/lib.rs | 66 +++++++++++++++++------------ primitives/Cargo.toml | 2 + primitives/src/lib.rs | 13 +++++- runtime/cere-dev/src/lib.rs | 2 +- 5 files changed, 54 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c55ccb9cc..184ce86c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1851,6 +1851,7 @@ dependencies = [ "blake2", "frame-support", "frame-system", + "hex", "parity-scale-codec", "polkadot-ckb-merkle-mountain-range", "scale-info", diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 4b04db4a6..1804790fd 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -917,7 +917,7 @@ pub mod pallet { let signer = Signer::::any_account(); if !signer.can_sign() { - log::error!("🚨No OCW is available."); + log::error!("🚨 No OCW is available."); return; } @@ -949,6 +949,8 @@ pub mod pallet { "🏭❌ Error retrieving clusters to validate" ); + log::info!("🎑 {:?} of 'Activated' clusters found", clusters_ids.len()); + for cluster_id in clusters_ids { let batch_size = T::MAX_PAYOUT_BATCH_SIZE; let mut errors: Vec = Vec::new(); @@ -1010,7 +1012,7 @@ pub mod pallet { } }, Ok(None) => { - log::info!("πŸ­β„ΉοΈ No eras for DAC process for cluster_id: {:?}", cluster_id); + log::info!("πŸ­β„ΉοΈ No eras for DAC process for cluster_id: {:?}", cluster_id); }, Err(process_errors) => { errors.extend(process_errors); @@ -2928,16 +2930,18 @@ pub mod pallet { Self::get_last_validated_era(cluster_id, this_validator)? .unwrap_or_else(DdcEra::default); - log::info!( - "πŸš€ last_validated_era_by_this_validator for cluster_id: {:?}", - last_validated_era_by_this_validator - ); - let last_validated_era_for_cluster = T::ClusterValidator::get_last_validated_era(cluster_id).map_err(|_| { OCWError::EraRetrievalError { cluster_id: *cluster_id, node_pub_key: None } })?; + log::info!( + "ℹ️ The last era validated by this specific validator for cluster_id: {:?} is {:?}. The last overall validated era for the cluster is {:?}", + cluster_id, + last_validated_era_by_this_validator, + last_validated_era_for_cluster + ); + // we want to fetch processed eras from all available validators let available_processed_eras = Self::fetch_processed_era_for_nodes(cluster_id, dac_nodes)?; @@ -2965,13 +2969,16 @@ pub mod pallet { for (era_key, candidates) in &processed_eras_to_validate.into_iter().chunk_by(|elt| elt.clone()) { - if candidates.count() >= threshold { + let count = candidates.count(); + if count >= threshold { processed_eras_with_quorum.push(era_key); } else { log::warn!( - "⚠️ Era {:?} in cluster_id: {:?} has been reported with unmet quorum", + "⚠️ Era {:?} in cluster_id: {:?} has been reported with unmet quorum. Desired: {:?} Actual: {:?}", era_key, - cluster_id + cluster_id, + threshold, + count ); } } @@ -3330,13 +3337,10 @@ pub mod pallet { if let Ok(NodeParams::StorageParams(storage_params)) = T::NodeVisitor::get_node_params(&node_pub_key) { - let NodePubKey::StoragePubKey(key) = node_pub_key.clone(); - let node_pub_key_ref: &[u8; 32] = key.as_ref(); - let node_pub_key_string = hex::encode(node_pub_key_ref); log::info!( - "πŸ­πŸ“Get DAC Node for cluster_id: {:?} and node_pub_key: {:?}", + "πŸ­πŸ“ Obtained DAC Node for cluster_id: {:?} and with key: {:?}", cluster_id, - node_pub_key_string + node_pub_key.get_hex() ); // Add to the results if the mode matches @@ -3365,12 +3369,14 @@ pub mod pallet { ) -> Result)>, OCWError> { let mut nodes_aggregates = Vec::new(); - for (node_pub_key, node_params) in dac_nodes { + for (node_key, node_params) in dac_nodes { let aggregates_res = Self::fetch_node_aggregates(cluster_id, era_id, node_params); if aggregates_res.is_err() { log::warn!( - "Aggregator with key {:?} is unavailable while fetching nodes aggregates", - node_pub_key + "Aggregator from cluster {:?} is unavailable while fetching nodes aggregates. Key: {:?} Host: {:?}", + cluster_id, + node_key.get_hex(), + String::from_utf8(node_params.host.clone()) ); // skip unavailable aggregators and continue with available ones continue; @@ -3381,13 +3387,13 @@ pub mod pallet { // todo: this is tech debt that needs to be refactored, the mapping logic needs to // be moved to payouts pallet for aggregate in aggregates.clone() { - let provider_id = Self::get_node_provider_id(node_pub_key).unwrap(); + let provider_id = Self::get_node_provider_id(node_key).unwrap(); Self::store_provider_id(aggregate.node_id, provider_id); } nodes_aggregates.push(( AggregatorInfo { - node_pub_key: node_pub_key.clone(), + node_pub_key: node_key.clone(), node_params: node_params.clone(), }, aggregates, @@ -3411,12 +3417,14 @@ pub mod pallet { let mut bucket_aggregates: Vec<(AggregatorInfo, Vec)> = Vec::new(); - for (node_pub_key, node_params) in dac_nodes { + for (node_key, node_params) in dac_nodes { let aggregates_res = Self::fetch_bucket_aggregates(cluster_id, era_id, node_params); if aggregates_res.is_err() { log::warn!( - "Aggregator with key {:?} is unavailable while fetching buckets aggregates", - node_pub_key + "Aggregator from cluster {:?} is unavailable while fetching nodes aggregates. Key: {:?} Host: {:?}", + cluster_id, + node_key.get_hex(), + String::from_utf8(node_params.host.clone()) ); // skip unavailable aggregators and continue with available ones continue; @@ -3427,7 +3435,7 @@ pub mod pallet { bucket_aggregates.push(( AggregatorInfo { - node_pub_key: node_pub_key.clone(), + node_pub_key: node_key.clone(), node_params: node_params.clone(), }, aggregates, @@ -3443,17 +3451,19 @@ pub mod pallet { /// - `cluster_id`: Cluster id /// - `node_params`: DAC node parameters fn fetch_processed_era_for_nodes( - _cluster_id: &ClusterId, + cluster_id: &ClusterId, dac_nodes: &[(NodePubKey, StorageNodeParams)], ) -> Result>, OCWError> { let mut processed_eras_by_nodes: Vec> = Vec::new(); - for (node_pub_key, node_params) in dac_nodes { + for (node_key, node_params) in dac_nodes { let processed_eras_by_node = Self::fetch_processed_eras(node_params); if processed_eras_by_node.is_err() { log::warn!( - "Aggregator with key {:?} is unavailable while fetching eras", - node_pub_key + "Aggregator from cluster {:?} is unavailable while fetching nodes aggregates. Key: {:?} Host: {:?}", + cluster_id, + node_key.get_hex(), + String::from_utf8(node_params.host.clone()) ); // skip unavailable aggregators and continue with available ones continue; diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index e89aebc6d..e84193961 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -13,6 +13,7 @@ blake2 = { workspace = true } codec = { workspace = true } frame-support = { workspace = true } frame-system = { workspace = true } +hex = { workspace = true } polkadot-ckb-merkle-mountain-range = { workspace = true } scale-info = { workspace = true } serde = { workspace = true } @@ -24,6 +25,7 @@ sp-std = { workspace = true } [features] default = ["std"] std = [ + "hex/std", "blake2/std", "polkadot-ckb-merkle-mountain-range/std", "codec/std", diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 894bdeb40..5213fe5ff 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -4,7 +4,10 @@ use blake2::{Blake2s256, Digest}; use codec::{Decode, Encode}; use frame_support::parameter_types; use polkadot_ckb_merkle_mountain_range::Merge; -use scale_info::{prelude::vec::Vec, TypeInfo}; +use scale_info::{ + prelude::{string::String, vec::Vec}, + TypeInfo, +}; use serde::{Deserialize, Serialize}; use sp_core::{crypto::KeyTypeId, hash::H160}; use sp_runtime::{AccountId32, Perquintill, RuntimeDebug}; @@ -123,6 +126,14 @@ pub enum NodePubKey { StoragePubKey(StorageNodePubKey), } +impl NodePubKey { + pub fn get_hex(&self) -> String { + match self { + NodePubKey::StoragePubKey(pub_key_ref) => hex::encode(pub_key_ref), + } + } +} + #[derive(Clone, Encode, Decode, RuntimeDebug, TypeInfo, PartialEq)] pub enum NodeType { Storage = 1, diff --git a/runtime/cere-dev/src/lib.rs b/runtime/cere-dev/src/lib.rs index 8478712b7..11f8a2600 100644 --- a/runtime/cere-dev/src/lib.rs +++ b/runtime/cere-dev/src/lib.rs @@ -1296,7 +1296,7 @@ impl pallet_ddc_verification::Config for Runtime { type OffchainIdentifierId = ddc_primitives::crypto::OffchainIdentifierId; type ActivityHasher = BlakeTwo256; const MAJORITY: u8 = 67; - const BLOCK_TO_START: u16 = 30; // every 100 blocks + const BLOCK_TO_START: u16 = 30; // every 30 blocks const DAC_REDUNDANCY_FACTOR: u16 = 3; type AggregatorsQuorum = MajorityOfAggregators; const MAX_PAYOUT_BATCH_SIZE: u16 = MAX_PAYOUT_BATCH_SIZE; From 33b0438a193cbf202642b02f9283d80be961d015 Mon Sep 17 00:00:00 2001 From: yahortsaryk Date: Fri, 25 Oct 2024 17:56:38 +0200 Subject: [PATCH 5/5] fix: log message --- pallets/ddc-verification/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pallets/ddc-verification/src/lib.rs b/pallets/ddc-verification/src/lib.rs index 1804790fd..158e79eb9 100644 --- a/pallets/ddc-verification/src/lib.rs +++ b/pallets/ddc-verification/src/lib.rs @@ -3421,7 +3421,7 @@ pub mod pallet { let aggregates_res = Self::fetch_bucket_aggregates(cluster_id, era_id, node_params); if aggregates_res.is_err() { log::warn!( - "Aggregator from cluster {:?} is unavailable while fetching nodes aggregates. Key: {:?} Host: {:?}", + "Aggregator from cluster {:?} is unavailable while fetching buckets aggregates. Key: {:?} Host: {:?}", cluster_id, node_key.get_hex(), String::from_utf8(node_params.host.clone()) @@ -3460,7 +3460,7 @@ pub mod pallet { let processed_eras_by_node = Self::fetch_processed_eras(node_params); if processed_eras_by_node.is_err() { log::warn!( - "Aggregator from cluster {:?} is unavailable while fetching nodes aggregates. Key: {:?} Host: {:?}", + "Aggregator from cluster {:?} is unavailable while fetching processed eras. Key: {:?} Host: {:?}", cluster_id, node_key.get_hex(), String::from_utf8(node_params.host.clone())