Skip to content

Commit

Permalink
feature: add metrics/tracking to remaining agents (#922)
Browse files Browse the repository at this point in the history
* feature: add num relayed messages metrics/tracking to relayer

* refactor: remove processor-specific nonce storage method from optics db

* fix: use countervec for relayer instead of guage

* feature: updater uses intcounter vec for submitted update count

* fix: remove db from relayer since it now uses countervec with no rocksdb

* fix: updates relayed not messages oops

* fix: update submitter takes the correct countervec
  • Loading branch information
luketchang authored Nov 1, 2021
1 parent 397b088 commit f007a29
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 39 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions rust/agents/processor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
};

const AGENT_NAME: &str = "processor";
static CURRENT_NONCE: &str = "current_nonce_";

enum Flow {
Advance,
Expand Down Expand Up @@ -61,7 +62,7 @@ impl Replica {
async move {
use optics_core::Replica;

let domain = self.replica.local_domain();
let replica_domain = self.replica.local_domain();

// The basic structure of this loop is as follows:
// 1. Get the last processed index
Expand All @@ -73,7 +74,7 @@ impl Replica {
// 5. Submit the proof to the replica
let mut next_message_nonce: u32 = self
.db
.retrieve_latest_nonce(domain)?
.retrieve_keyed_decodable(CURRENT_NONCE, &replica_domain)?
.map(|n: u32| n + 1)
.unwrap_or_default();

Expand All @@ -82,35 +83,34 @@ impl Replica {
.set(next_message_nonce as i64);

info!(
domain,
replica_domain,
nonce = next_message_nonce,
replica = self.replica.name(),
"Starting processor for {} {} at nonce {}",
domain,
"Starting processor for {}:{} at nonce {}",
self.replica.name(),
replica_domain,
next_message_nonce
);

loop {
use optics_core::Replica;
let seq_span = tracing::trace_span!(
"ReplicaProcessor",
name = self.replica.name(),
nonce = next_message_nonce,
replica_domain = self.replica.local_domain(),
replica_domain = replica_domain,
home_domain = self.home.local_domain(),
);

match self
.try_msg_by_domain_and_nonce(domain, next_message_nonce)
.try_msg_by_domain_and_nonce(replica_domain, next_message_nonce)
.instrument(seq_span)
.await
{
Ok(Flow::Advance) => {
self.db
.store_latest_nonce(domain, next_message_nonce)?;
next_message_nonce += 1;
.store_keyed_encodable(CURRENT_NONCE, &replica_domain, &next_message_nonce)?;

next_message_nonce += 1;
self.next_message_nonce
.with_label_values(&[
self.home.name(),
Expand All @@ -122,13 +122,13 @@ impl Replica {
Ok(Flow::Repeat) => {
// there was some fault, let's wait and then try again later when state may have moved
debug!(
domain,
replica_domain,
nonce = next_message_nonce,
replica = self.replica.name(),
"Failed to find message_by_nonce or proof_by_leaf_index. Processor retrying message. Replica: {}. Nonce: {}. Domain: {}.",
self.replica.name(),
next_message_nonce,
domain,
replica_domain,
);
sleep(Duration::from_secs(self.interval)).await
}
Expand Down
2 changes: 2 additions & 0 deletions rust/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ optics-core = { path = "../../optics-core" }
optics-base = { path = "../../optics-base" }
paste = "1.0.5"

prometheus = "0.12"

[dev-dependencies]
tokio-test = "0.4.0"
optics-test = { path = "../../optics-test" }
49 changes: 41 additions & 8 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use optics_core::Common;

use crate::settings::RelayerSettings as Settings;

const AGENT_NAME: &str = "relayer";

#[derive(Debug)]
struct UpdatePoller {
duration: Duration,
home: Arc<Homes>,
replica: Arc<Replicas>,
semaphore: Mutex<()>,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
}

impl std::fmt::Display for UpdatePoller {
Expand All @@ -28,20 +31,25 @@ impl std::fmt::Display for UpdatePoller {
}

impl UpdatePoller {
fn new(home: Arc<Homes>, replica: Arc<Replicas>, duration: u64) -> Self {
fn new(
home: Arc<Homes>,
replica: Arc<Replicas>,
duration: u64,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
) -> Self {
Self {
home,
replica,
duration: Duration::from_secs(duration),
semaphore: Mutex::new(()),
updates_relayed_count,
}
}

#[tracing::instrument(err, skip(self), fields(self = %self))]
async fn poll_and_relay_update(&self) -> Result<()> {
// Get replica's current root.
let old_root = self.replica.committed_root().await?;

info!(
"Replica {} latest root is: {}",
self.replica.name(),
Expand All @@ -51,7 +59,8 @@ impl UpdatePoller {
// Check for first signed update building off of the replica's current root
let signed_update_opt = self.home.signed_update_by_old_root(old_root).await?;

// If signed update exists, update replica's current root
// If signed update exists for replica's committed root, try to
// relay
if let Some(signed_update) = signed_update_opt {
info!(
"Update for replica {}. Root {} to {}",
Expand All @@ -60,12 +69,19 @@ impl UpdatePoller {
&signed_update.update.new_root,
);

// Attempt to acquire lock for submitting tx
let lock = self.semaphore.try_lock();
if lock.is_err() {
return Ok(()); // tx in flight. just do nothing
}
// don't care if it succeeds
let _ = self.replica.update(&signed_update).await;

// Relay update and increment counters if tx successful
if self.replica.update(&signed_update).await.is_ok() {
self.updates_relayed_count
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME])
.inc();
}

// lock dropped here
} else {
info!(
Expand Down Expand Up @@ -93,6 +109,7 @@ impl UpdatePoller {
pub struct Relayer {
duration: u64,
core: AgentCore,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
}

impl AsRef<AgentCore> for Relayer {
Expand All @@ -105,7 +122,21 @@ impl AsRef<AgentCore> for Relayer {
impl Relayer {
/// Instantiate a new relayer
pub fn new(duration: u64, core: AgentCore) -> Self {
Self { duration, core }
let updates_relayed_count = Arc::new(
core.metrics
.new_int_counter(
"updates_relayed_count",
"Number of updates relayed from given home to replica",
&["home", "replica", "agent"],
)
.expect("processor metric already registered -- should have be a singleton"),
);

Self {
duration,
core,
updates_relayed_count,
}
}
}

Expand All @@ -130,8 +161,9 @@ impl OpticsAgent for Relayer {
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let replica_opt = self.replica_by_name(name);
let home = self.home();
let name = name.to_owned();
let updates_relayed_count = self.updates_relayed_count.clone();

let name = name.to_owned();
let duration = self.duration;

tokio::spawn(async move {
Expand All @@ -140,7 +172,8 @@ impl OpticsAgent for Relayer {
}
let replica = replica_opt.unwrap();

let update_poller = UpdatePoller::new(home, replica.clone(), duration);
let update_poller =
UpdatePoller::new(home, replica.clone(), duration, updates_relayed_count);
update_poller.spawn().await?
})
.in_current_span()
Expand Down
19 changes: 17 additions & 2 deletions rust/agents/updater/src/submit.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
use std::sync::Arc;

use optics_base::Homes;
use optics_base::{Homes, OpticsAgent};
use optics_core::{db::OpticsDB, Common};
use prometheus::IntCounterVec;
use std::time::Duration;

use color_eyre::Result;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{info, info_span, instrument::Instrumented, Instrument};

use crate::updater::Updater;

pub(crate) struct UpdateSubmitter {
home: Arc<Homes>,
db: OpticsDB,
interval_seconds: u64,
submitted_update_count: IntCounterVec,
}

impl UpdateSubmitter {
pub(crate) fn new(home: Arc<Homes>, db: OpticsDB, interval_seconds: u64) -> Self {
pub(crate) fn new(
home: Arc<Homes>,
db: OpticsDB,
interval_seconds: u64,
submitted_update_count: IntCounterVec,
) -> Self {
Self {
home,
db,
interval_seconds,
submitted_update_count,
}
}

Expand All @@ -43,8 +53,13 @@ impl UpdateSubmitter {
hex_signature = %hex_signature,
"Submitting update to chain"
);

self.home.update(&signed).await?;

self.submitted_update_count
.with_label_values(&[self.home.name(), Updater::AGENT_NAME])
.inc();

// continue from local state
committed_root = signed.update.new_root;
}
Expand Down
18 changes: 17 additions & 1 deletion rust/agents/updater/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Updater {
update_pause: u64,
pub(crate) core: AgentCore,
signed_attestation_count: IntCounterVec,
submitted_update_count: IntCounterVec,
}

impl AsRef<AgentCore> for Updater {
Expand All @@ -42,12 +43,22 @@ impl Updater {
)
.expect("must be able to register agent metrics");

let submitted_update_count = core
.metrics
.new_int_counter(
"submitted_update_count",
"Number of updates successfully submitted to home",
&["network", "agent"],
)
.expect("must be able to register agent metrics");

Self {
signer: Arc::new(signer),
interval_seconds,
update_pause,
core,
signed_attestation_count,
submitted_update_count,
}
}
}
Expand Down Expand Up @@ -87,7 +98,12 @@ impl OpticsAgent for Updater {
self.signed_attestation_count.clone(),
);

let submit = UpdateSubmitter::new(self.home(), db, self.interval_seconds);
let submit = UpdateSubmitter::new(
self.home(),
db,
self.interval_seconds,
self.submitted_update_count.clone(),
);

tokio::spawn(async move {
let expected: Address = home.updater().await?.into();
Expand Down
16 changes: 0 additions & 16 deletions rust/optics-core/src/db/optics_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ static MESSAGE: &str = "message_";
static UPDATE: &str = "update_";
static UPDATE_META: &str = "update_metadata_";
static LATEST_ROOT: &str = "update_latest_root_";
static LATEST_NONCE: &str = "latest_nonce_";
static LATEST_LEAF_INDEX: &str = "latest_known_leaf_index_";
static UPDATER_PRODUCED_UPDATE: &str = "updater_produced_update_";

Expand Down Expand Up @@ -159,21 +158,6 @@ impl OpticsDB {
}
}

/// Stores the latest inspected nonce for a given replica domain
///
/// Keys --> Values:
/// - `replica_domain` --> `nonce`
pub fn store_latest_nonce(&self, replica_domain: u32, nonce: u32) -> Result<(), DbError> {
self.store_keyed_encodable(LATEST_NONCE, &replica_domain, &nonce)?;

Ok(())
}

/// Retrieves the latest inspected nonce for a given replica domain
pub fn retrieve_latest_nonce(&self, replica_domain: u32) -> Result<Option<u32>, DbError> {
self.retrieve_keyed_decodable(LATEST_NONCE, &replica_domain)
}

/// Store the latest committed
fn store_latest_root(&self, root: H256) -> Result<(), DbError> {
debug!(root = ?root, "storing new latest root in DB");
Expand Down

0 comments on commit f007a29

Please sign in to comment.