From 20fdcf78985c4c2cc50e56152737d867d24fb3de Mon Sep 17 00:00:00 2001
From: nanocryk <6422796+nanocryk@users.noreply.github.com>
Date: Tue, 3 Sep 2024 12:21:45 +0200
Subject: [PATCH] spawner mock

---
 Cargo.lock                                     |   2 +
 client/service-container-chain/Cargo.toml      |   4 +
 .../src/data_preservers.rs                     | 270 +++++++++++++++++-
 client/service-container-chain/src/spawner.rs  |  38 ++-
 4 files changed, 305 insertions(+), 9 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 61d3f383c..b0e8aae28 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -17521,6 +17521,7 @@ dependencies = [
  "node-common",
  "pallet-author-noting-runtime-api",
  "pallet-data-preservers",
+ "polkadot-overseer",
  "polkadot-primitives",
  "sc-basic-authorship",
  "sc-chain-spec",
@@ -17554,6 +17555,7 @@ dependencies = [
  "substrate-prometheus-endpoint",
  "tc-consensus",
  "tokio",
+ "tokio-stream",
  "tokio-util",
 ]

diff --git a/client/service-container-chain/Cargo.toml b/client/service-container-chain/Cargo.toml
index eba3de681..181b036aa 100644
--- a/client/service-container-chain/Cargo.toml
+++ b/client/service-container-chain/Cargo.toml
@@ -82,6 +82,10 @@ cumulus-relay-chain-interface = { workspace = true }
 nimbus-consensus = { workspace = true }
 nimbus-primitives = { workspace = true }

+[dev-dependencies]
+polkadot-overseer = { workspace = true }
+tokio-stream = { workspace = true }
+
 [build-dependencies]
 substrate-build-script-utils = { workspace = true }

diff --git a/client/service-container-chain/src/data_preservers.rs b/client/service-container-chain/src/data_preservers.rs
index d69a53fc5..36d9cef0e 100644
--- a/client/service-container-chain/src/data_preservers.rs
+++ b/client/service-container-chain/src/data_preservers.rs
@@ -15,7 +15,7 @@
 // along with Tanssi. If not, see <http://www.gnu.org/licenses/>.

 use {
-    crate::spawner::{ContainerChainSpawner, TSelectSyncMode},
+    crate::spawner::{ContainerChainSpawner, Spawner, TSelectSyncMode},
     dc_orchestrator_chain_interface::{
         DataPreserverAssignment, OrchestratorChainInterface, OrchestratorChainResult,
     },
@@ -32,14 +32,11 @@ async fn try_fut<T, E>(fut: impl Future<Output = Result<T, E>>) -> Result<T, E>

 /// Watch assignments by indefinitely listening to finalized block notifications and switching to
 /// the chain the profile is assigned to.
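 /// On each finalized block the task re-reads the profile's assignment and, when it changes,
 /// switches the spawned node accordingly (stopping the old chain and spawning the new one).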
-pub async fn task_watch_assignment(
-    spawner: ContainerChainSpawner,
-    profile_id: ProfileId,
-) {
+pub async fn task_watch_assignment(spawner: impl Spawner, profile_id: ProfileId) {
     use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment;

     if let OrchestratorChainResult::Err(e) = try_fut(async move {
-        let orchestrator_chain_interface = spawner.params.orchestrator_chain_interface.clone();
+        let orchestrator_chain_interface = spawner.orchestrator_chain_interface();

         let mut current_assignment = DataPreserverAssignment::<ParaId>::NotAssigned;

@@ -100,3 +97,264 @@ pub async fn task_watch_assignment(
         log::error!("Error in data preservers assignment watching task: {e:?}");
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use {
+        super::*,
+        dc_orchestrator_chain_interface::{
+            BlockNumber, DataPreserverProfileId, OrchestratorChainError, PHash, PHeader,
+        },
+        dp_container_chain_genesis_data::ContainerChainGenesisData,
+        futures::{FutureExt, Stream, StreamExt},
+        polkadot_overseer::Handle,
+        sc_client_api::StorageProof,
+        sp_core::H256,
+        sp_runtime::offchain::storage::StorageValue,
+        std::{
+            collections::{BTreeMap, HashSet},
+            path::PathBuf,
+            pin::Pin,
+            sync::{Arc, Mutex},
+        },
+        tokio::sync::{broadcast, oneshot},
+    };
+
+    struct MockChainInterface {
+        state: Mutex<MockChainInterfaceState>,
+        notification_sender: broadcast::Sender<PHeader>,
+    }
+
+    struct MockChainInterfaceState {
+        next_block_number: BlockNumber,
+        blocks: BTreeMap<PHash, BlockAssignment>,
+    }
+
+    struct BlockAssignment {
+        assignments: BTreeMap<DataPreserverProfileId, DataPreserverAssignment<ParaId>>,
+    }
+
+    impl MockChainInterface {
+        fn new() -> Self {
+            Self {
+                state: Mutex::new(MockChainInterfaceState {
+                    next_block_number: 0,
+                    blocks: BTreeMap::new(),
+                }),

+                notification_sender: broadcast::Sender::new(100),
+            }
+        }
+
+        fn mock_block(
+            &self,
+            assignments: BTreeMap<DataPreserverProfileId, DataPreserverAssignment<ParaId>>,
+        ) {
+            let mut state = self.state.lock().unwrap();
+            state.next_block_number += 1;
+
+            let header = PHeader {
+                parent_hash: H256::zero(),
+                number: state.next_block_number,
+                state_root: H256::zero(),
+                extrinsics_root: H256::zero(),
+                digest: Default::default(),
+            };
+            let hash = header.hash();
+
+            state.blocks.insert(hash, BlockAssignment { assignments });
+
+            self.notification_sender
+                .send(header)
+                .expect("to properly send block header");
+        }
+    }
+
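+    // Only `finality_notification_stream` and `data_preserver_active_assignment` are exercised
+    // by the assignment watcher; the remaining trait methods are stubs that are never called in
+    // these tests.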
+    #[async_trait::async_trait]
+    impl OrchestratorChainInterface for MockChainInterface {
+        fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
+            unimplemented!("not used in test")
+        }
+
+        async fn get_storage_by_key(
+            &self,
+            _orchestrator_parent: PHash,
+            _key: &[u8],
+        ) -> OrchestratorChainResult<Option<Vec<u8>>> {
+            unimplemented!("not used in test")
+        }
+
+        async fn prove_read(
+            &self,
+            _orchestrator_parent: PHash,
+            _relevant_keys: &Vec<Vec<u8>>,
+        ) -> OrchestratorChainResult<StorageProof> {
+            unimplemented!("not used in test")
+        }
+
+        async fn import_notification_stream(
+            &self,
+        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+            unimplemented!("not used in test")
+        }
+
+        async fn new_best_notification_stream(
+            &self,
+        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+            unimplemented!("not used in test")
+        }
+
+        async fn finality_notification_stream(
+            &self,
+        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
+            let receiver = self.notification_sender.subscribe();
+            let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
+                .filter_map(|x| async { x.ok() });
+            let stream = Box::pin(stream);
+            Ok(stream)
+        }
+
+        async fn genesis_data(
+            &self,
+            _orchestrator_parent: PHash,
+            _para_id: ParaId,
+        ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
+            unimplemented!("not used in test")
+        }
+
+        async fn boot_nodes(
+            &self,
+            _orchestrator_parent: PHash,
+            _para_id: ParaId,
+        ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
+            unimplemented!("not used in test")
+        }
+
+        async fn latest_block_number(
+            &self,
+            _orchestrator_parent: PHash,
+            _para_id: ParaId,
+        ) -> OrchestratorChainResult<Option<BlockNumber>> {
+            unimplemented!("not used in test")
+        }
+
+        async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
+            unimplemented!("not used in test")
+        }
+
+        async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
+            unimplemented!("not used in test")
+        }
+
+        async fn data_preserver_active_assignment(
+            &self,
+            orchestrator_parent: PHash,
+            profile_id: DataPreserverProfileId,
+        ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
+            let mut state = self.state.lock().unwrap();
+            let block = state.blocks.get_mut(&orchestrator_parent).ok_or_else(|| {
+                OrchestratorChainError::GenericError("this block is not mocked".into())
+            })?;
+
+            Ok(block
+                .assignments
+                .get(&profile_id)
+                .cloned()
+                .unwrap_or(DataPreserverAssignment::NotAssigned))
+        }
+    }
+
+    #[derive(Debug, PartialEq, Eq, Hash)]
+    enum SpawnerEvent {
+        Started(ParaId, bool),
+        Stopped(ParaId, bool),
+    }
+
+    #[derive(Clone)]
+    struct MockSpawner {
+        state: Arc<Mutex<HashSet<SpawnerEvent>>>,
+        chain_interface: Arc<MockChainInterface>,
+    }
+
+    impl MockSpawner {
+        fn new() -> Self {
+            Self {
+                state: Arc::new(Mutex::new(HashSet::new())),
+                chain_interface: Arc::new(MockChainInterface::new()),
+            }
+        }
+
+        fn set_expectations(&self, events: Vec<SpawnerEvent>) {
+            let mut set = self.state.lock().unwrap();
+
+            set.clear();
+
+            for e in events {
+                set.insert(e);
+            }
+        }
+
+        fn ensure_all_events_were_emitted(&self) {
+            let set = self.state.lock().unwrap();
+
+            assert!(set.is_empty(), "Not all events were emitted: {set:?}");
+        }
+    }
+
+    impl Spawner for MockSpawner {
+        fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
+            self.chain_interface.clone()
+        }
+
+        /// Try to start a new container chain. In case of an error, this does not stop the node,
+        /// and spawning the container chain will be attempted again when the collator is
+        /// reassigned to it.
+        ///
+        /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn
+        /// fails because the chain has not stopped yet, since `stop` does not wait for the chain
+        /// to stop. So before calling `spawn`, make sure to call `wait_for_paritydb_lock` first,
+        /// like we do in `handle_update_assignment`.
+        fn spawn(
+            &self,
+            container_chain_para_id: ParaId,
+            start_collation: bool,
+        ) -> impl std::future::Future<Output = ()> + Send {
+            let mut set = self.state.lock().unwrap();
+
+            let event = SpawnerEvent::Started(container_chain_para_id, start_collation);
+
+            assert!(set.remove(&event), "Unexpected event {event:?}");
+
+            async {}
+        }
+
+        /// Stop a container chain. Prints a warning if the container chain was not running.
+        /// Returns the database path for the container chain, which can be used with
+        /// `wait_for_paritydb_lock` to ensure that the container chain has fully stopped.
+        /// The database path can be `None` if the chain was not running.
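+        /// (This mock never runs a node, so it only checks the expectation set and always
+        /// returns `None`.)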
+        fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
+            let mut set = self.state.lock().unwrap();
+
+            let event = SpawnerEvent::Stopped(container_chain_para_id, keep_db);
+
+            assert!(set.remove(&event), "Unexpected event {event:?}");
+
+            None
+        }
+    }
+
+    #[tokio::test]
+    async fn task_logic_works() {
+        let spawner = MockSpawner::new();
+
+        let profile_id = 0;
+        let para_id1 = ParaId::from(1);
+        let para_id2 = ParaId::from(2);
+
+        tokio::spawn(task_watch_assignment(spawner.clone(), profile_id));
+        // Give the watcher task time to subscribe to finality notifications; otherwise
+        // `mock_block` would send on a broadcast channel with no receivers and panic.
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+        spawner.set_expectations(vec![SpawnerEvent::Started(para_id1, false)]);
+        spawner.chain_interface.mock_block({
+            let mut map = BTreeMap::new();
+            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
+            map
+        });
+        // Let the task process the notification before checking the expectations.
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+        spawner.ensure_all_events_were_emitted();
+    }
+}

diff --git a/client/service-container-chain/src/spawner.rs b/client/service-container-chain/src/spawner.rs
index 96989c6d3..c0d4cb8c5 100644
--- a/client/service-container-chain/src/spawner.rs
+++ b/client/service-container-chain/src/spawner.rs
@@ -531,7 +531,37 @@ async fn try_spawn(
     Ok(())
 }

-impl ContainerChainSpawner {
+/// Interface for spawning and stopping container chain embedded nodes.
+pub trait Spawner {
+    /// Access to the Orchestrator Chain Interface.
+    fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface>;
+
+    /// Try to start a new container chain. In case of an error, this does not stop the node, and
+    /// spawning the container chain will be attempted again when the collator is reassigned to it.
+    ///
+    /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
+    /// because the chain has not stopped yet, since `stop` does not wait for the chain to stop.
+    /// So before calling `spawn`, make sure to call `wait_for_paritydb_lock` first, like we do in
+    /// `handle_update_assignment`.
+    fn spawn(
+        &self,
+        container_chain_para_id: ParaId,
+        start_collation: bool,
+    ) -> impl std::future::Future<Output = ()> + Send;
+
+    /// Stop a container chain. Prints a warning if the container chain was not running.
+    /// Returns the database path for the container chain, which can be used with
+    /// `wait_for_paritydb_lock` to ensure that the container chain has fully stopped.
+    /// The database path can be `None` if the chain was not running.
+    fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf>;
+}
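+
+// Usage sketch (hypothetical values), e.g. as driven by the data preservers assignment watcher:
+//     spawner.spawn(para_id, false).await;        // start the chain without collating
+//     let db_path = spawner.stop(para_id, false); // stop it, discarding its database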
+
+impl Spawner for ContainerChainSpawner {
+    /// Access to the Orchestrator Chain Interface.
+    fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
+        self.params.orchestrator_chain_interface.clone()
+    }
+
     /// Try to start a new container chain. In case of an error, this does not stop the node, and
     /// spawning the container chain will be attempted again when the collator is reassigned to it.
     ///
@@ -539,7 +569,7 @@ impl ContainerChainSpawner {
     /// because the chain has not stopped yet, since `stop` does not wait for the chain to stop.
     /// So before calling `spawn`, make sure to call `wait_for_paritydb_lock` first, like we do in
     /// `handle_update_assignment`.
-    pub async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
+    async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
         let try_spawn_params = self.params.clone();
         let state = self.state.clone();
         let state2 = state.clone();
@@ -570,7 +600,7 @@ impl ContainerChainSpawner {
     /// Returns the database path for the container chain, which can be used with
     /// `wait_for_paritydb_lock` to ensure that the container chain has fully stopped.
     /// The database path can be `None` if the chain was not running.
-    pub fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
+    fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
         let mut state = self.state.lock().expect("poison error");
         let stop_handle = state
             .spawned_container_chains
@@ -604,7 +634,9 @@
             }
         }
     }
+}

+impl ContainerChainSpawner {
     /// Receive and process `CcSpawnMsg`s indefinitely
     pub async fn rx_loop(mut self, mut rx: mpsc::UnboundedReceiver<CcSpawnMsg>, validator: bool) {
         // The node always starts as an orchestrator chain collator.