feat(e2e): use ContainedNode
Co-authored-by: Parsa Ghadimi <[email protected]>
matthias-wright and qti3e committed May 9, 2024
1 parent 76adbdb commit aa38f87
Showing 7 changed files with 60 additions and 58 deletions.
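
In short, the e2e harness drops its custom Container wrapper and drives each node through ContainedNode from lightning_node. A minimal sketch of the new lifecycle, assembled only from the calls visible in the diff below (the run wrapper itself is just for illustration):

    use lightning_final_bindings::FinalTypes;
    use lightning_interfaces::fdi::MultiThreadedProvider;
    use lightning_node::ContainedNode;
    use lightning_utils::config::TomlConfigProvider;

    async fn run(config: TomlConfigProvider<FinalTypes>) -> anyhow::Result<()> {
        // Hand the node its config through a provider instead of spawning a Container.
        let provider = MultiThreadedProvider::default();
        provider.insert(config);
        let node = ContainedNode::<FinalTypes>::new(provider, None);

        // spawn() returns a handle that resolves once startup has completed.
        let handle = node.spawn();
        handle.await.unwrap()?;

        // Shutdown is asynchronous now: it returns a future that must be awaited.
        node.shutdown().await;
        Ok(())
    }

This is also why the test changes below now .await the swarm shutdown instead of calling it synchronously.
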
63 changes: 25 additions & 38 deletions core/e2e/src/containerized_node.rs
@@ -1,20 +1,18 @@
use std::sync::Mutex;

use fleek_crypto::AccountOwnerSecretKey;
use futures::Future;
use lightning_blockstore::blockstore::Blockstore;
use lightning_final_bindings::FinalTypes;
use lightning_interfaces::fdi::MultiThreadedProvider;
use lightning_interfaces::prelude::*;
use lightning_interfaces::types::Staking;
use lightning_node::ContainedNode;
use lightning_rpc::Rpc;
use lightning_utils::config::TomlConfigProvider;

use crate::container::Container;

pub struct ContainerizedNode {
config: TomlConfigProvider<FinalTypes>,
owner_secret_key: AccountOwnerSecretKey,
container: Mutex<Option<Container<FinalTypes>>>,
runtime_type: RuntimeType,
node: ContainedNode<FinalTypes>,
index: usize,
genesis_stake: Staking,
is_genesis_committee: bool,
@@ -28,11 +26,13 @@ impl ContainerizedNode {
is_genesis_committee: bool,
genesis_stake: Staking,
) -> Self {
let provider = MultiThreadedProvider::default();
provider.insert(config.clone());
let node = ContainedNode::<FinalTypes>::new(provider, None);
Self {
config,
owner_secret_key,
container: Default::default(),
runtime_type: RuntimeType::MultiThreaded,
node,
index,
genesis_stake,
is_genesis_committee,
@@ -41,20 +41,19 @@ impl ContainerizedNode {

pub async fn start(&self) -> anyhow::Result<()> {
// This function has to return a result in order to use try_join_all in swarm.rs
*self.container.lock().unwrap() =
Some(Container::spawn(self.index, self.config.clone(), self.runtime_type).await);
let handle = self.node.spawn();
handle.await.unwrap()?;

Ok(())
}

pub fn shutdown(&self) {
if let Some(container) = &mut self.container.lock().unwrap().take() {
container.shutdown();
}
pub fn shutdown(self) -> impl Future<Output = ()> {
self.node.shutdown()
}

pub fn is_running(&self) -> bool {
self.container.lock().unwrap().is_some()
}
//pub fn is_running(&self) -> bool {
// self.node.lock().unwrap().is_some()
//}

pub fn get_rpc_address(&self) -> String {
let config = self.config.get::<Rpc<FinalTypes>>();
@@ -73,36 +72,24 @@ impl ContainerizedNode {
self.genesis_stake.clone()
}

pub fn take_syncronizer(&self) -> Option<fdi::Ref<c!(FinalTypes::SyncronizerInterface)>> {
let container = self.container.lock().unwrap().take();
if let Some(mut container) = container {
let syncronizer = container.take_ckpt_rx();
*self.container.lock().unwrap() = Some(container);
syncronizer
} else {
*self.container.lock().unwrap() = None;
None
}
pub fn take_syncronizer(&self) -> fdi::Ref<c!(FinalTypes::SyncronizerInterface)> {
self.node
.provider()
.get::<<FinalTypes as Collection>::SyncronizerInterface>()
}

pub fn take_blockstore(&self) -> Option<Blockstore<FinalTypes>> {
let container = self.container.lock().unwrap().take();
if let Some(mut container) = container {
let blockstore = container.take_blockstore();
*self.container.lock().unwrap() = Some(container);
blockstore
} else {
*self.container.lock().unwrap() = None;
None
}
pub fn take_blockstore(&self) -> Blockstore<FinalTypes> {
self.node
.provider()
.get::<<FinalTypes as Collection>::BlockstoreInterface>()
.clone()
}

pub fn is_genesis_committee(&self) -> bool {
self.is_genesis_committee
}
}

#[derive(Clone, Copy, Debug)]
pub enum RuntimeType {
SingleThreaded,
MultiThreaded,
25 changes: 17 additions & 8 deletions core/e2e/src/swarm.rs
@@ -53,7 +53,10 @@ pub struct Swarm {

impl Drop for Swarm {
fn drop(&mut self) {
self.shutdown_internal();
for (_, node) in self.nodes.drain() {
drop(node.shutdown());
}
self.cleanup();
}
}

@@ -89,8 +92,15 @@ impl Swarm {
Ok(())
}

pub fn shutdown(mut self) {
self.shutdown_internal();
pub async fn shutdown(mut self) {
let mut handles = Vec::new();
for (_, node) in self.nodes.drain() {
handles.push(tokio::spawn(node.shutdown()));
}
for handle in handles {
handle.await.unwrap();
}
self.cleanup();
}

pub fn get_rpc_addresses(&self) -> HashMap<NodePublicKey, String> {
@@ -127,7 +137,7 @@ impl Swarm {
&self,
) -> Vec<(
NodePublicKey,
Option<fdi::Ref<c!(FinalTypes::SyncronizerInterface)>>,
fdi::Ref<c!(FinalTypes::SyncronizerInterface)>,
)> {
self.nodes
.iter()
@@ -136,19 +146,18 @@ impl Swarm {
.collect()
}

pub fn get_blockstores(&self) -> Vec<Option<Blockstore<FinalTypes>>> {
pub fn get_blockstores(&self) -> Vec<Blockstore<FinalTypes>> {
self.nodes
.values()
.map(|node| node.take_blockstore())
.collect()
}

pub fn get_blockstore(&self, node: &NodePublicKey) -> Option<Blockstore<FinalTypes>> {
self.nodes.get(node).and_then(|node| node.take_blockstore())
self.nodes.get(node).map(|node| node.take_blockstore())
}

fn shutdown_internal(&mut self) {
self.nodes.values().for_each(|node| node.shutdown());
fn cleanup(&mut self) {
if self.directory.exists() {
fs::remove_dir_all(&self.directory).expect("Failed to clean up swarm directory.");
}
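
The swarm now shuts its nodes down concurrently: each node's shutdown future is spawned as its own task and the handles are awaited, while the Drop impl simply drops the futures because Drop cannot be async. A small standalone sketch of that pattern, assuming only tokio (the generic shutdown_all helper is illustrative, not part of the crate):

    use std::future::Future;

    async fn shutdown_all<F>(shutdown_futures: Vec<F>)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // Spawn every shutdown future so the nodes wind down in parallel.
        let mut handles = Vec::new();
        for fut in shutdown_futures {
            handles.push(tokio::spawn(fut));
        }
        // Then wait for each task to finish before cleaning up shared state.
        for handle in handles {
            handle.await.unwrap();
        }
    }
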
2 changes: 1 addition & 1 deletion core/e2e/tests/checkpoint.rs
@@ -82,6 +82,6 @@ async fn e2e_checkpoint() -> Result<()> {
// TODO(matthias): read the block stores of all the nodes and make sure they all stored the
// checkpoint

swarm.shutdown();
swarm.shutdown().await;
Ok(())
}
10 changes: 8 additions & 2 deletions core/e2e/tests/epoch_change.rs
@@ -78,7 +78,7 @@ async fn e2e_epoch_change_all_nodes_on_committee() -> Result<()> {
assert_eq!(epoch, 1);
}

swarm.shutdown();
swarm.shutdown().await;
Ok(())
}

@@ -147,6 +147,8 @@ async fn e2e_epoch_change_with_edge_node() -> Result<()> {
.expect("Failed to parse response.");
assert_eq!(epoch, 1);
}

swarm.shutdown().await;
Ok(())
}

@@ -188,12 +190,14 @@ async fn e2e_committee_change() -> Result<()> {

// Make sure all nodes still all have the same committee.
compare_committee(swarm.get_rpc_addresses(), committee_size as usize).await;

swarm.shutdown().await;
Ok(())
}

#[tokio::test]
#[serial]
async fn test_staking_auction() -> Result<()> {
async fn e2e_test_staking_auction() -> Result<()> {
logging::setup();

// Start epoch now and let it end in 40 seconds.
@@ -328,6 +332,8 @@ async fn test_staking_auction() -> Result<()> {
} else {
assert!(!current_committee.contains(low_stake_nodes[1]));
}

swarm.shutdown().await;
Ok(())
}

2 changes: 1 addition & 1 deletion core/e2e/tests/pinger.rs
@@ -83,6 +83,6 @@ async fn e2e_detect_offline_node() -> Result<()> {
assert_eq!(node_info.participation, Participation::False);
}

swarm.shutdown();
swarm.shutdown().await;
Ok(())
}
8 changes: 4 additions & 4 deletions core/e2e/tests/syncronizer.rs
@@ -27,7 +27,7 @@ async fn e2e_syncronize_state() -> Result<()> {
fs::remove_dir_all(&path).expect("Failed to clean up swarm directory before test.");
}
let swarm = Swarm::builder()
.with_directory(path)
.with_directory(path.clone())
.with_min_port(10600)
.with_num_nodes(5)
.with_committee_size(4)
@@ -64,8 +64,7 @@ async fn e2e_syncronize_state() -> Result<()> {

// Get the checkpoint receivers from the syncronizer for the node that is not on the genesis
// committee.
let (pubkey, syncro) = swarm.get_non_genesis_committee_syncronizer().pop().unwrap();
let syncronizer = syncro.unwrap();
let (pubkey, syncronizer) = swarm.get_non_genesis_committee_syncronizer().pop().unwrap();

// Wait for the syncronizer to detect that we are behind and send the checkpoint hash.
let ckpt_hash = syncronizer.next_checkpoint_hash().await;
@@ -79,6 +78,7 @@
let hash = blake3::hash(&checkpoint);
assert_eq!(hash, ckpt_hash);

swarm.shutdown();
swarm.shutdown().await;

Ok(())
}
8 changes: 4 additions & 4 deletions core/node/src/lib.rs
@@ -42,7 +42,7 @@ impl<C: Collection> ContainedNode<C> {
shutdown.install_ctrlc_handlers();

// Get the trigger permit from the shutdown controller to be passed into each thread.
let permit = shutdown.permit();
//let permit = shutdown.permit();

// Create the tokio runtime.
let worker_id = AtomicUsize::new(0);
@@ -53,10 +53,10 @@
format!("{node_name}#{id}")
})
.on_thread_start(move || {
let permit = permit.clone();
//let permit = permit.clone();
thread_local_panic_hook::update_hook(move |prev, info| {
tracing::error!("Uncaught panic in detected in worker. Shutting node down.");
permit.trigger_shutdown();
tracing::error!("Uncaught panic detected in worker.");
//permit.trigger_shutdown();
// bubble up and call the previous panic handler.
prev(info);
});
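
The panic hook installed on each worker thread now only logs the panic; the shutdown trigger is commented out rather than removed. As a rough stand-in, the same log-then-delegate behavior using the standard library's global hook (not the thread_local_panic_hook crate used above) would look like this:

    fn install_logging_panic_hook() {
        // Keep the previously installed hook so default panic reporting still runs.
        let prev = std::panic::take_hook();
        std::panic::set_hook(Box::new(move |info| {
            tracing::error!("Uncaught panic detected in worker.");
            // The shutdown trigger that used to live here is disabled by this commit;
            // the hook only logs and then defers to the previous handler.
            prev(info);
        }));
    }
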
