Skip to content

Commit

Permalink
feat(consensus): add metric for sync in consensus and update the simu…
Browse files Browse the repository at this point in the history
…lation script (#490)
  • Loading branch information
matan-starkware authored Aug 18, 2024
1 parent 240c1f5 commit bc4fc7d
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 12 deletions.
3 changes: 3 additions & 0 deletions crates/papyrus_common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,6 @@ pub static COLLECT_PROFILING_METRICS: OnceLock<bool> = OnceLock::new();

/// The height consensus is currently working on.
pub const PAPYRUS_CONSENSUS_HEIGHT: &str = "papyrus_consensus_height";

/// The number of times consensus has progressed due to the sync protocol.
pub const PAPYRUS_CONSENSUS_SYNC_COUNT: &str = "papyrus_consensus_sync_count";
21 changes: 12 additions & 9 deletions crates/sequencing/papyrus_consensus/run_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self, validator_id, monitoring_gateway_server_port, cmd):
self.cmd = cmd
self.process = None
self.height_and_timestamp = (None, None) # (height, timestamp)
self.sync_count = None

def start(self):
self.process = subprocess.Popen(self.cmd, shell=True, preexec_fn=os.setsid)
Expand All @@ -31,15 +32,17 @@ def stop(self):
os.killpg(os.getpgid(self.process.pid), signal.SIGINT)
self.process.wait()

def get_height(self):
def get_metric(self, metric: str):
port = self.monitoring_gateway_server_port
command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP 'papyrus_consensus_height \\K\\d+'"
command = f"curl -s -X GET http://localhost:{port}/monitoring/metrics | grep -oP '{metric} \\K\\d+'"
result = subprocess.run(command, shell=True, capture_output=True, text=True)
# returns the latest decided height, or None if consensus has not yet started.
return int(result.stdout) if result.stdout else None

def check_height(self):
height = self.get_height()
# Check the node's metrics and return the height and timestamp.
def check_node(self):
self.sync_count = self.get_metric("papyrus_consensus_sync_count")

height = self.get_metric("papyrus_consensus_height")
if self.height_and_timestamp[0] != height:
if self.height_and_timestamp[0] is not None and height is not None:
assert height > self.height_and_timestamp[0], "Height should be increasing."
Expand Down Expand Up @@ -89,8 +92,8 @@ def monitor_simulation(nodes, start_time, duration, stagnation_timeout):
return True
stagnated_nodes = []
for node in nodes:
(height, last_update) = node.check_height()
print(f"Node: {node.validator_id}, height: {height}")
(height, last_update) = node.check_node()
print(f"Node: {node.validator_id}, height: {height}, sync_count: {node.sync_count}")
if height is not None and (curr_time - last_update) > stagnation_timeout:
stagnated_nodes.append(node.validator_id)
if stagnated_nodes:
Expand All @@ -107,7 +110,8 @@ def run_simulation(nodes, duration, stagnation_timeout):
try:
while True:
time.sleep(MONITORING_PERIOD)
print(f"\nTime elapsed: {time.time() - start_time}s")
elapsed = round(time.time() - start_time)
print(f"\nTime elapsed: {elapsed}s")
should_exit = monitor_simulation(nodes, start_time, duration, stagnation_timeout)
if should_exit:
break
Expand Down Expand Up @@ -147,7 +151,6 @@ def build_node(data_dir, logs_dir, i, papryus_args):
f"--network.secret_key {SECRET_KEY} "
+ f"2>&1 | sed -r 's/\\x1B\\[[0-9;]*[mK]//g' > {logs_dir}/validator{i}.txt"
)

else:
cmd += (
f"--network.bootstrap_peer_multiaddr.#is_none false "
Expand Down
3 changes: 2 additions & 1 deletion crates/sequencing/papyrus_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;

use futures::channel::{mpsc, oneshot};
use futures::{Stream, StreamExt};
use papyrus_common::metrics::PAPYRUS_CONSENSUS_HEIGHT;
use papyrus_common::metrics::{PAPYRUS_CONSENSUS_HEIGHT, PAPYRUS_CONSENSUS_SYNC_COUNT};
use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::converters::ProtobufConversionError;
Expand Down Expand Up @@ -69,6 +69,7 @@ where
current_height = current_height.unchecked_next();
},
sync_height = sync_height(current_height, &mut sync_receiver) => {
metrics::increment_counter!(PAPYRUS_CONSENSUS_SYNC_COUNT);
current_height = sync_height?.unchecked_next();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use papyrus_network::network_manager::ReportSender;
use papyrus_protobuf::consensus::ConsensusMessage;
use papyrus_protobuf::converters::ProtobufConversionError;
use starknet_api::block::BlockHash;
use tracing::{debug, instrument};

/// Receiver used to help run simulations of consensus. It has 2 goals in mind:
/// 1. Simulate network failures.
Expand Down Expand Up @@ -66,13 +67,15 @@ where
///
/// Applies `drop_probability` followed by `invalid_probability`. So the probability of an
/// invalid message is `(1- drop_probability) * invalid_probability`.
#[instrument(skip(self), level = "debug")]
pub fn filter_msg(&mut self, mut msg: ConsensusMessage) -> Option<ConsensusMessage> {
if !matches!(msg, ConsensusMessage::Proposal(_)) {
// TODO(matan): Add support for dropping/invalidating votes.
return Some(msg);
}

if self.should_drop_msg(&msg) {
debug!("Dropping message");
return None;
}

Expand Down Expand Up @@ -109,6 +112,7 @@ where
}

fn invalidate_msg(&mut self, msg: &mut ConsensusMessage) {
debug!("Invalidating message");
// TODO(matan): Allow for invalid votes based on signature/sender_id.
if let ConsensusMessage::Proposal(ref mut proposal) = msg {
proposal.block_hash = BlockHash(proposal.block_hash.0 + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ impl<BlockT: ConsensusBlock> SingleHeightConsensus<BlockT> {
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<Option<Decision<BlockT>>, ConsensusError> {
debug!(
"Received proposal: proposal_height={}, proposer={:?}",
init.height.0, init.proposer
"Received proposal: height={}, round={}, proposer={:?}",
init.height.0, init.round, init.proposer
);
let proposer_id = context.proposer(&self.validators, self.height);
if init.height != self.height {
Expand Down

0 comments on commit bc4fc7d

Please sign in to comment.