From 5c821dc0d5a1b2a64b1b617ce6cc83942cc2010b Mon Sep 17 00:00:00 2001 From: asmaa-starkware <163830216+asmaastarkware@users.noreply.github.com> Date: Sun, 21 Jul 2024 14:56:50 +0300 Subject: [PATCH] chore(consensus): add consensus stagnation check (#2216) --- .../papyrus_consensus/run_consensus.py | 131 ++++++++++++++---- 1 file changed, 102 insertions(+), 29 deletions(-) diff --git a/crates/sequencing/papyrus_consensus/run_consensus.py b/crates/sequencing/papyrus_consensus/run_consensus.py index bec3a1c039..d084a99411 100644 --- a/crates/sequencing/papyrus_consensus/run_consensus.py +++ b/crates/sequencing/papyrus_consensus/run_consensus.py @@ -7,11 +7,45 @@ import socket from contextlib import closing - # The SECRET_KEY is used for building the BOOT_NODE_PEER_ID, so they are coupled and must be used together. SECRET_KEY = "0xabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcd" BOOT_NODE_PEER_ID = "12D3KooWDFYi71juk6dYWo3UDvqs5gAzGDc124LSvcR5d187Tdvi" +MONITORING_PERIOD = 10 + + +class Node: + def __init__(self, validator_id, monitoring_gateway_server_port, cmd): + self.validator_id = validator_id + self.monitoring_gateway_server_port = monitoring_gateway_server_port + self.cmd = cmd + self.process = None + self.height_and_timestamp = (None, None) # (height, timestamp) + + def start(self): + self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid) + + def stop(self): + if self.process: + os.killpg(os.getpgid(self.process.pid), signal.SIGINT) + self.process.wait() + + def get_height(self): + port = self.monitoring_gateway_server_port + command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP 'papyrus_consensus_height \\K\\d+'" + result = subprocess.run(command, shell=True, capture_output=True, text=True) + # returns the most recently decided height, or None if node is not ready or consensus has not yet reached any height. + return int(result.stdout) if result.stdout else None + + def check_height(self): + height = self.get_height() + if self.height_and_timestamp[0] != height: + if self.height_and_timestamp[0] is not None and height is not None: + assert height > self.height_and_timestamp[0] , "Height should be increasing." + self.height_and_timestamp = (height, time.time()) + + return self.height_and_timestamp + def find_free_port(): with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: @@ -21,28 +55,45 @@ def find_free_port(): return s.getsockname()[1] -def run_command(command): - return subprocess.run(command, shell=True, check=True) - - -def run_parallel_commands(commands, duration): - processes = [] - for command in commands: - process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid) - processes.append(process) - +# Returns if the simulation should exit. +def monitor_simulation(nodes, start_time, duration, stagnation_timeout): + curr_time = time.time() + if duration is not None and duration < (curr_time - start_time): + return True + stagnated_nodes = [] + for node in nodes: + (height, last_update) = node.check_height() + print(f"Node: {node.validator_id}, height: {height}") + if height is not None and (curr_time - last_update) > stagnation_timeout: + stagnated_nodes.append(node.validator_id) + if stagnated_nodes: + print(f"Nodes {stagnated_nodes} have stagnated. Exiting simulation.") + return True + return False + + +def run_simulation(nodes, duration, stagnation_timeout): + for node in nodes: + node.start() + + start_time = time.time() try: - time.sleep(duration) + while True: + time.sleep(MONITORING_PERIOD) + print(f"\nTime elapsed: {time.time() - start_time}s") + should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout) + if should_exit: + break except KeyboardInterrupt: - print("\nCtrl+C pressed: Terminating subprocesses...") + print("\nTerminating subprocesses...") finally: - for process in processes: - os.killpg(os.getpgid(process.pid), signal.SIGINT) - process.wait() + for node in nodes: + node.stop() -def peernode_command(base_layer_node_url, temp_dir, num_validators, i): - return ( +def build_peernode(base_layer_node_url, temp_dir, num_validators, i): + monitoring_gateway_server_port = find_free_port() + cmd = ( f"RUST_LOG=papyrus_consensus=debug,papyrus=info " f"target/release/papyrus_node --network.#is_none false " f"--base_layer.node_url {base_layer_node_url} " @@ -51,19 +102,25 @@ def peernode_command(base_layer_node_url, temp_dir, num_validators, i): f"--consensus.num_validators {num_validators} " f"--network.tcp_port {find_free_port()} " f"--rpc.server_address 127.0.0.1:{find_free_port()} " - f"--monitoring_gateway.server_address 127.0.0.1:{find_free_port()} " + f"--monitoring_gateway.server_address 127.0.0.1:{monitoring_gateway_server_port} " f"--network.bootstrap_peer_multiaddr.#is_none false " f"--network.bootstrap_peer_multiaddr /ip4/127.0.0.1/tcp/10000/p2p/{BOOT_NODE_PEER_ID} " + f"--collect_metrics true" # Use sed to strip special formatting characters f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {temp_dir}/validator{i}.txt" ) + return Node( + validator_id=i, + monitoring_gateway_server_port=monitoring_gateway_server_port, + cmd=cmd, + ) -def main(base_layer_node_url, num_validators, duration): +def main(base_layer_node_url, num_validators, stagnation_threshold, duration): assert num_validators >= 2, "At least 2 validators are required for the simulation." # Building the Papyrus Node package assuming its output will be located in the papyrus target directory. print("Running cargo build...") - run_command("cargo build --release --package papyrus_node") + subprocess.run("cargo build --release --package papyrus_node", shell=True, check=True) temp_dir = tempfile.mkdtemp() print(f"Output files will be stored in: {temp_dir}") @@ -78,8 +135,9 @@ def main(base_layer_node_url, num_validators, duration): # 2. Validators 2+ are started next to join the network through the bootnode. # 3. Validator 0, which is the proposer, is started last so the validators don't miss the proposals. - validator_commands = [] + nodes = [] # Ensure validator 1 runs first + monitoring_gateway_server_port = find_free_port() bootnode_command = ( f"RUST_LOG=papyrus_consensus=debug,papyrus=info " f"target/release/papyrus_node --network.#is_none false " @@ -88,22 +146,30 @@ def main(base_layer_node_url, num_validators, duration): f"--storage.db_config.path_prefix {temp_dir}/data1 " f"--consensus.#is_none false --consensus.validator_id 0x1 " f"--consensus.num_validators {num_validators} " + f"--monitoring_gateway.server_address 127.0.0.1:{monitoring_gateway_server_port} " + f"--collect_metrics true" # Use sed to strip special formatting characters f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {temp_dir}/validator1.txt" ) - validator_commands.append(bootnode_command) + nodes.append( + Node( + validator_id=1, + monitoring_gateway_server_port=monitoring_gateway_server_port, + cmd=bootnode_command, + ) + ) # Add other validators - validator_commands.extend( - peernode_command(base_layer_node_url, temp_dir, num_validators, i) + nodes.extend( + build_peernode(base_layer_node_url, temp_dir, num_validators, i) for i in range(2, num_validators) ) # Ensure validator 0 runs last - validator_commands.append(peernode_command(base_layer_node_url, temp_dir, num_validators, 0)) + nodes.append(build_peernode(base_layer_node_url, temp_dir, num_validators, 0)) # Run validator commands in parallel and manage duration time print("Running validators...") - run_parallel_commands(validator_commands, duration) + run_simulation(nodes, duration, stagnation_threshold) print("Simulation complete.") @@ -111,7 +177,14 @@ def main(base_layer_node_url, num_validators, duration): parser = argparse.ArgumentParser(description="Run Papyrus Node simulation.") parser.add_argument("--base_layer_node_url", required=True) parser.add_argument("--num_validators", type=int, required=True) - parser.add_argument("--duration", type=int, required=True) + parser.add_argument( + "--stagnation_threshold", + type=int, + required=False, + default=60, + help="Time in seconds to check for height stagnation.", + ) + parser.add_argument("--duration", type=int, required=False, default=None) args = parser.parse_args() - main(args.base_layer_node_url, args.num_validators, args.duration) + main(args.base_layer_node_url, args.num_validators, args.stagnation_threshold, args.duration)