Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add metrics/tracking to remaining agents #922

Merged
merged 7 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions rust/agents/processor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
};

const AGENT_NAME: &str = "processor";
static CURRENT_NONCE: &str = "current_nonce_";

enum Flow {
Advance,
Expand Down Expand Up @@ -61,7 +62,7 @@ impl Replica {
async move {
use optics_core::Replica;

let domain = self.replica.local_domain();
let replica_domain = self.replica.local_domain();

// The basic structure of this loop is as follows:
// 1. Get the last processed index
Expand All @@ -73,7 +74,7 @@ impl Replica {
// 5. Submit the proof to the replica
let mut next_message_nonce: u32 = self
.db
.retrieve_latest_nonce(domain)?
.retrieve_keyed_decodable(CURRENT_NONCE, &replica_domain)?
.map(|n: u32| n + 1)
Comment on lines 76 to 78
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to remove pass-through methods in OpticsDB. I also felt that simple agent-unique counter stores didn't belong in OpticsDB, so we just use the basic db methods here.

.unwrap_or_default();

Expand All @@ -82,35 +83,34 @@ impl Replica {
.set(next_message_nonce as i64);

info!(
domain,
replica_domain,
nonce = next_message_nonce,
replica = self.replica.name(),
"Starting processor for {} {} at nonce {}",
domain,
"Starting processor for {}:{} at nonce {}",
self.replica.name(),
replica_domain,
next_message_nonce
);

loop {
use optics_core::Replica;
let seq_span = tracing::trace_span!(
"ReplicaProcessor",
name = self.replica.name(),
nonce = next_message_nonce,
replica_domain = self.replica.local_domain(),
replica_domain = replica_domain,
home_domain = self.home.local_domain(),
);

match self
.try_msg_by_domain_and_nonce(domain, next_message_nonce)
.try_msg_by_domain_and_nonce(replica_domain, next_message_nonce)
.instrument(seq_span)
.await
{
Ok(Flow::Advance) => {
self.db
.store_latest_nonce(domain, next_message_nonce)?;
next_message_nonce += 1;
.store_keyed_encodable(CURRENT_NONCE, &replica_domain, &next_message_nonce)?;

next_message_nonce += 1;
self.next_message_nonce
.with_label_values(&[
self.home.name(),
Expand All @@ -122,13 +122,13 @@ impl Replica {
Ok(Flow::Repeat) => {
// there was some fault, let's wait and then try again later when state may have moved
debug!(
domain,
replica_domain,
nonce = next_message_nonce,
replica = self.replica.name(),
"Failed to find message_by_nonce or proof_by_leaf_index. Processor retrying message. Replica: {}. Nonce: {}. Domain: {}.",
self.replica.name(),
next_message_nonce,
domain,
replica_domain,
);
sleep(Duration::from_secs(self.interval)).await
}
Expand Down
2 changes: 2 additions & 0 deletions rust/agents/relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ optics-core = { path = "../../optics-core" }
optics-base = { path = "../../optics-base" }
paste = "1.0.5"

prometheus = "0.12"

[dev-dependencies]
tokio-test = "0.4.0"
optics-test = { path = "../../optics-test" }
49 changes: 41 additions & 8 deletions rust/agents/relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use optics_core::Common;

use crate::settings::RelayerSettings as Settings;

const AGENT_NAME: &str = "relayer";

/// Polls the home for a signed update that builds off a replica's committed
/// root and relays it to that replica.
#[derive(Debug)]
struct UpdatePoller {
/// Interval built from a number of seconds in `new`; presumably the wait between polls — TODO confirm.
duration: Duration,
/// Home that is queried for a signed update building off the replica's committed root.
home: Arc<Homes>,
/// Replica whose committed root is read and to which signed updates are submitted.
replica: Arc<Replicas>,
/// Guard that is `try_lock`ed before submitting an update; if it is already held, the poll returns without submitting.
semaphore: Mutex<()>,
/// Counter incremented (labels: home name, replica name, agent name) each time `replica.update` returns `Ok`.
updates_relayed_count: Arc<prometheus::IntCounterVec>,
}

impl std::fmt::Display for UpdatePoller {
Expand All @@ -28,20 +31,25 @@ impl std::fmt::Display for UpdatePoller {
}

impl UpdatePoller {
fn new(home: Arc<Homes>, replica: Arc<Replicas>, duration: u64) -> Self {
fn new(
home: Arc<Homes>,
replica: Arc<Replicas>,
duration: u64,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
) -> Self {
Self {
home,
replica,
duration: Duration::from_secs(duration),
semaphore: Mutex::new(()),
updates_relayed_count,
}
}

#[tracing::instrument(err, skip(self), fields(self = %self))]
async fn poll_and_relay_update(&self) -> Result<()> {
// Get replica's current root.
let old_root = self.replica.committed_root().await?;

info!(
"Replica {} latest root is: {}",
self.replica.name(),
Expand All @@ -51,7 +59,8 @@ impl UpdatePoller {
// Check for first signed update building off of the replica's current root
let signed_update_opt = self.home.signed_update_by_old_root(old_root).await?;

// If signed update exists, update replica's current root
// If signed update exists for replica's committed root, try to
// relay
if let Some(signed_update) = signed_update_opt {
info!(
"Update for replica {}. Root {} to {}",
Expand All @@ -60,12 +69,19 @@ impl UpdatePoller {
&signed_update.update.new_root,
);

// Attempt to acquire lock for submitting tx
let lock = self.semaphore.try_lock();
if lock.is_err() {
return Ok(()); // tx in flight. just do nothing
}
// don't care if it succeeds
let _ = self.replica.update(&signed_update).await;

// Relay update and increment counters if tx successful
if self.replica.update(&signed_update).await.is_ok() {
self.updates_relayed_count
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This invocation is constant for every instance of UpdatePoller, which is why the other PR I made (for just the updater) stores an already-fully-specified IntCounter inside the UpdatePoller instead of doing it dynamically.

doing so only avoids a single hash of all the labels, though, so 🤷🏼

.inc();
}

// lock dropped here
} else {
info!(
Expand Down Expand Up @@ -93,6 +109,7 @@ impl UpdatePoller {
pub struct Relayer {
duration: u64,
core: AgentCore,
updates_relayed_count: Arc<prometheus::IntCounterVec>,
}

impl AsRef<AgentCore> for Relayer {
Expand All @@ -105,7 +122,21 @@ impl AsRef<AgentCore> for Relayer {
impl Relayer {
/// Instantiate a new relayer
pub fn new(duration: u64, core: AgentCore) -> Self {
Self { duration, core }
let updates_relayed_count = Arc::new(
core.metrics
.new_int_counter(
"updates_relayed_count",
"Number of updates relayed from given home to replica",
&["home", "replica", "agent"],
)
.expect("processor metric already registered -- should have be a singleton"),
);

Self {
duration,
core,
updates_relayed_count,
}
}
}

Expand All @@ -130,8 +161,9 @@ impl OpticsAgent for Relayer {
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> {
let replica_opt = self.replica_by_name(name);
let home = self.home();
let name = name.to_owned();
let updates_relayed_count = self.updates_relayed_count.clone();

let name = name.to_owned();
let duration = self.duration;

tokio::spawn(async move {
Expand All @@ -140,7 +172,8 @@ impl OpticsAgent for Relayer {
}
let replica = replica_opt.unwrap();

let update_poller = UpdatePoller::new(home, replica.clone(), duration);
let update_poller =
UpdatePoller::new(home, replica.clone(), duration, updates_relayed_count);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the information needed to call `updates_relayed_count.with_label_values` is right here.

update_poller.spawn().await?
})
.in_current_span()
Expand Down
19 changes: 17 additions & 2 deletions rust/agents/updater/src/submit.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
use std::sync::Arc;

use optics_base::Homes;
use optics_base::{Homes, OpticsAgent};
use optics_core::{db::OpticsDB, Common};
use prometheus::IntCounterVec;
use std::time::Duration;

use color_eyre::Result;
use tokio::{task::JoinHandle, time::sleep};
use tracing::{info, info_span, instrument::Instrumented, Instrument};

use crate::updater::Updater;

/// Submits signed updates to the home and counts the successful submissions.
pub(crate) struct UpdateSubmitter {
/// Home that signed updates are submitted to via `update`.
home: Arc<Homes>,
/// Database handle; its use is outside the visible hunk — presumably the source of the signed updates, TODO confirm.
db: OpticsDB,
/// Interval in seconds; presumably the pause between submission attempts — TODO confirm.
interval_seconds: u64,
/// Counter incremented (labels: home name, `Updater::AGENT_NAME`) after `home.update` succeeds.
submitted_update_count: IntCounterVec,
}

impl UpdateSubmitter {
pub(crate) fn new(home: Arc<Homes>, db: OpticsDB, interval_seconds: u64) -> Self {
pub(crate) fn new(
home: Arc<Homes>,
db: OpticsDB,
interval_seconds: u64,
submitted_update_count: IntCounterVec,
) -> Self {
Self {
home,
db,
interval_seconds,
submitted_update_count,
}
}

Expand All @@ -43,8 +53,13 @@ impl UpdateSubmitter {
hex_signature = %hex_signature,
"Submitting update to chain"
);

self.home.update(&signed).await?;

self.submitted_update_count
.with_label_values(&[self.home.name(), Updater::AGENT_NAME])
.inc();

// continue from local state
committed_root = signed.update.new_root;
}
Expand Down
18 changes: 17 additions & 1 deletion rust/agents/updater/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Updater {
update_pause: u64,
pub(crate) core: AgentCore,
signed_attestation_count: IntCounterVec,
submitted_update_count: IntCounterVec,
}

impl AsRef<AgentCore> for Updater {
Expand All @@ -42,12 +43,22 @@ impl Updater {
)
.expect("must be able to register agent metrics");

let submitted_update_count = core
.metrics
.new_int_counter(
"submitted_update_count",
"Number of updates successfully submitted to home",
&["network", "agent"],
)
.expect("must be able to register agent metrics");

Self {
signer: Arc::new(signer),
interval_seconds,
update_pause,
core,
signed_attestation_count,
submitted_update_count,
}
}
}
Expand Down Expand Up @@ -87,7 +98,12 @@ impl OpticsAgent for Updater {
self.signed_attestation_count.clone(),
);

let submit = UpdateSubmitter::new(self.home(), db, self.interval_seconds);
let submit = UpdateSubmitter::new(
self.home(),
db,
self.interval_seconds,
self.submitted_update_count.clone(),
);

tokio::spawn(async move {
let expected: Address = home.updater().await?.into();
Expand Down
16 changes: 0 additions & 16 deletions rust/optics-core/src/db/optics_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ static MESSAGE: &str = "message_";
static UPDATE: &str = "update_";
static UPDATE_META: &str = "update_metadata_";
static LATEST_ROOT: &str = "update_latest_root_";
static LATEST_NONCE: &str = "latest_nonce_";
static LATEST_LEAF_INDEX: &str = "latest_known_leaf_index_";
static UPDATER_PRODUCED_UPDATE: &str = "updater_produced_update_";

Expand Down Expand Up @@ -159,21 +158,6 @@ impl OpticsDB {
}
}

/// Stores the latest inspected nonce for a given replica domain
///
/// Keys --> Values:
/// - `replica_domain` --> `nonce`
pub fn store_latest_nonce(&self, replica_domain: u32, nonce: u32) -> Result<(), DbError> {
self.store_keyed_encodable(LATEST_NONCE, &replica_domain, &nonce)?;

Ok(())
}

/// Retrieves the latest inspected nonce for a given replica domain
pub fn retrieve_latest_nonce(&self, replica_domain: u32) -> Result<Option<u32>, DbError> {
self.retrieve_keyed_decodable(LATEST_NONCE, &replica_domain)
}

/// Store the latest committed root
fn store_latest_root(&self, root: H256) -> Result<(), DbError> {
debug!(root = ?root, "storing new latest root in DB");
Expand Down