Skip to content

Commit

Permalink
chore(consensus): add consensus stagnation check (#2216)
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware authored Jul 21, 2024
1 parent e23be56 commit 5c821dc
Showing 1 changed file with 102 additions and 29 deletions.
131 changes: 102 additions & 29 deletions crates/sequencing/papyrus_consensus/run_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,45 @@
import socket
from contextlib import closing


# The SECRET_KEY is used for building the BOOT_NODE_PEER_ID, so they are coupled and must be used together.
SECRET_KEY = "0xabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcdabcd"
BOOT_NODE_PEER_ID = "12D3KooWDFYi71juk6dYWo3UDvqs5gAzGDc124LSvcR5d187Tdvi"

MONITORING_PERIOD = 10


class Node:
    """A validator process plus the bookkeeping used to detect height stagnation.

    Attributes:
        validator_id: Identifier of the validator this node runs as.
        monitoring_gateway_server_port: Local port on which the node's
            monitoring gateway is scraped for metrics.
        cmd: Shell command line used to launch the node.
        process: The ``subprocess.Popen`` handle once ``start`` was called,
            otherwise ``None``.
        height_and_timestamp: ``(height, timestamp)`` where ``height`` is the
            last height observed and ``timestamp`` is the ``time.time()`` at
            which it was first observed; ``(None, None)`` until a height is
            seen.
    """

    # Upper bound (seconds) on one metrics scrape, so that a node which accepts
    # the connection but never answers cannot block the monitoring loop.
    METRICS_REQUEST_TIMEOUT = 5

    def __init__(self, validator_id, monitoring_gateway_server_port, cmd):
        self.validator_id = validator_id
        self.monitoring_gateway_server_port = monitoring_gateway_server_port
        self.cmd = cmd
        self.process = None
        self.height_and_timestamp = (None, None)  # (height, timestamp)

    def start(self):
        """Launch ``cmd`` through the shell in a new session (own process group)."""
        self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid)

    def stop(self):
        """Send SIGINT to the node's process group and wait for the shell to exit.

        Does nothing if the node was never started.
        """
        if self.process is None:
            return
        try:
            os.killpg(os.getpgid(self.process.pid), signal.SIGINT)
        except ProcessLookupError:
            # The process group is already gone (e.g. the node exited or was
            # stopped before); there is nothing left to signal.
            pass
        self.process.wait()

    @staticmethod
    def _parse_height(metrics_text):
        """Extract the ``papyrus_consensus_height`` value from a metrics dump.

        Returns the first value found as an ``int``, or ``None`` when the
        metric is absent from ``metrics_text``.
        """
        import re

        match = re.search(r"papyrus_consensus_height (\d+)", metrics_text)
        return int(match.group(1)) if match else None

    def get_height(self):
        """Scrape the node's monitoring gateway for its consensus height.

        Returns the most recently decided height, or ``None`` if the node is
        not ready (the request fails or times out) or consensus has not yet
        reached any height (the metric is missing).
        """
        import http.client
        import urllib.request

        url = f"http://localhost:{self.monitoring_gateway_server_port}/monitoring/metrics"
        try:
            with urllib.request.urlopen(url, timeout=self.METRICS_REQUEST_TIMEOUT) as response:
                metrics_text = response.read().decode("utf-8", errors="replace")
        except (OSError, http.client.HTTPException):
            # URLError/HTTPError, connection refused and timeouts are all
            # OSError subclasses; malformed responses raise HTTPException.
            return None
        return self._parse_height(metrics_text)

    def check_height(self):
        """Refresh and return ``height_and_timestamp``.

        The timestamp is only advanced when a *new* height is observed. A
        failed scrape (``None``) keeps the previous entry untouched, so a node
        that stops answering after reaching some height keeps ageing and can
        still be recognised as stagnated.

        Raises:
            AssertionError: If the observed height is lower than the
                previously recorded one.
        """
        height = self.get_height()
        last_height = self.height_and_timestamp[0]
        if height is None or height == last_height:
            return self.height_and_timestamp
        if last_height is not None and height < last_height:
            # Raised explicitly (not via `assert`) so the check survives `python -O`.
            raise AssertionError("Height should be increasing.")
        self.height_and_timestamp = (height, time.time())
        return self.height_and_timestamp


def find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
Expand All @@ -21,28 +55,45 @@ def find_free_port():
return s.getsockname()[1]


def run_command(command):
    """Run ``command`` through the shell and block until it finishes.

    Returns the ``subprocess.CompletedProcess`` of the run. Because the call
    uses ``check=True``, a non-zero exit status raises
    ``subprocess.CalledProcessError`` instead of returning.
    """
    completed = subprocess.run(command, check=True, shell=True)
    return completed


def run_parallel_commands(commands, duration):
processes = []
for command in commands:
process = subprocess.Popen(command, shell=True, preexec_fn=os.setsid)
processes.append(process)

def monitor_simulation(nodes, start_time, duration, stagnation_timeout):
    """Report whether the simulation should exit.

    Args:
        nodes: Objects exposing ``validator_id`` and ``check_height()``, the
            latter returning ``(height, last_update)``.
        start_time: ``time.time()`` value taken when the simulation started.
        duration: Maximum run time in seconds, or ``None`` for no limit.
        stagnation_timeout: Seconds a node may stay on the same height before
            it counts as stagnated.

    Returns:
        ``True`` if ``duration`` has elapsed or at least one node has
        stagnated, ``False`` otherwise. When the duration has elapsed the
        nodes are not polled at all.
    """
    now = time.time()
    elapsed = now - start_time
    if duration is not None and elapsed > duration:
        return True

    stalled_ids = []
    for node in nodes:
        height, updated_at = node.check_height()
        print(f"Node: {node.validator_id}, height: {height}")
        if height is None:
            # No height observed yet, so there is nothing to measure against.
            continue
        if now - updated_at > stagnation_timeout:
            stalled_ids.append(node.validator_id)

    if not stalled_ids:
        return False
    print(f"Nodes {stalled_ids} have stagnated. Exiting simulation.")
    return True


def run_simulation(nodes, duration, stagnation_timeout):
for node in nodes:
node.start()

start_time = time.time()
try:
time.sleep(duration)
while True:
time.sleep(MONITORING_PERIOD)
print(f"\nTime elapsed: {time.time() - start_time}s")
should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout)
if should_exit:
break
except KeyboardInterrupt:
print("\nCtrl+C pressed: Terminating subprocesses...")
print("\nTerminating subprocesses...")
finally:
for process in processes:
os.killpg(os.getpgid(process.pid), signal.SIGINT)
process.wait()
for node in nodes:
node.stop()


def peernode_command(base_layer_node_url, temp_dir, num_validators, i):
return (
def build_peernode(base_layer_node_url, temp_dir, num_validators, i):
monitoring_gateway_server_port = find_free_port()
cmd = (
f"RUST_LOG=papyrus_consensus=debug,papyrus=info "
f"target/release/papyrus_node --network.#is_none false "
f"--base_layer.node_url {base_layer_node_url} "
Expand All @@ -51,19 +102,25 @@ def peernode_command(base_layer_node_url, temp_dir, num_validators, i):
f"--consensus.num_validators {num_validators} "
f"--network.tcp_port {find_free_port()} "
f"--rpc.server_address 127.0.0.1:{find_free_port()} "
f"--monitoring_gateway.server_address 127.0.0.1:{find_free_port()} "
f"--monitoring_gateway.server_address 127.0.0.1:{monitoring_gateway_server_port} "
f"--network.bootstrap_peer_multiaddr.#is_none false "
f"--network.bootstrap_peer_multiaddr /ip4/127.0.0.1/tcp/10000/p2p/{BOOT_NODE_PEER_ID} "
f"--collect_metrics true"
# Use sed to strip special formatting characters
f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {temp_dir}/validator{i}.txt"
)
return Node(
validator_id=i,
monitoring_gateway_server_port=monitoring_gateway_server_port,
cmd=cmd,
)


def main(base_layer_node_url, num_validators, duration):
def main(base_layer_node_url, num_validators, stagnation_threshold, duration):
assert num_validators >= 2, "At least 2 validators are required for the simulation."
# Building the Papyrus Node package assuming its output will be located in the papyrus target directory.
print("Running cargo build...")
run_command("cargo build --release --package papyrus_node")
subprocess.run("cargo build --release --package papyrus_node", shell=True, check=True)

temp_dir = tempfile.mkdtemp()
print(f"Output files will be stored in: {temp_dir}")
Expand All @@ -78,8 +135,9 @@ def main(base_layer_node_url, num_validators, duration):
# 2. Validators 2+ are started next to join the network through the bootnode.
# 3. Validator 0, which is the proposer, is started last so the validators don't miss the proposals.

validator_commands = []
nodes = []
# Ensure validator 1 runs first
monitoring_gateway_server_port = find_free_port()
bootnode_command = (
f"RUST_LOG=papyrus_consensus=debug,papyrus=info "
f"target/release/papyrus_node --network.#is_none false "
Expand All @@ -88,30 +146,45 @@ def main(base_layer_node_url, num_validators, duration):
f"--storage.db_config.path_prefix {temp_dir}/data1 "
f"--consensus.#is_none false --consensus.validator_id 0x1 "
f"--consensus.num_validators {num_validators} "
f"--monitoring_gateway.server_address 127.0.0.1:{monitoring_gateway_server_port} "
f"--collect_metrics true"
# Use sed to strip special formatting characters
f"| sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {temp_dir}/validator1.txt"
)
validator_commands.append(bootnode_command)
nodes.append(
Node(
validator_id=1,
monitoring_gateway_server_port=monitoring_gateway_server_port,
cmd=bootnode_command,
)
)

# Add other validators
validator_commands.extend(
peernode_command(base_layer_node_url, temp_dir, num_validators, i)
nodes.extend(
build_peernode(base_layer_node_url, temp_dir, num_validators, i)
for i in range(2, num_validators)
)
# Ensure validator 0 runs last
validator_commands.append(peernode_command(base_layer_node_url, temp_dir, num_validators, 0))
nodes.append(build_peernode(base_layer_node_url, temp_dir, num_validators, 0))

# Run validator commands in parallel and manage duration time
print("Running validators...")
run_parallel_commands(validator_commands, duration)
run_simulation(nodes, duration, stagnation_threshold)
print("Simulation complete.")


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Run Papyrus Node simulation.")
parser.add_argument("--base_layer_node_url", required=True)
parser.add_argument("--num_validators", type=int, required=True)
parser.add_argument("--duration", type=int, required=True)
parser.add_argument(
"--stagnation_threshold",
type=int,
required=False,
default=60,
help="Time in seconds to check for height stagnation.",
)
parser.add_argument("--duration", type=int, required=False, default=None)

args = parser.parse_args()
main(args.base_layer_node_url, args.num_validators, args.duration)
main(args.base_layer_node_url, args.num_validators, args.stagnation_threshold, args.duration)

0 comments on commit 5c821dc

Please sign in to comment.