-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: add metrics/tracking to remaining agents #922
Changes from all commits
3f13a8b
e47cfa0
3c99c60
80a3656
6ce3922
f0bf089
993e2f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,12 +9,15 @@ use optics_core::Common; | |
|
||
use crate::settings::RelayerSettings as Settings; | ||
|
||
const AGENT_NAME: &str = "relayer"; | ||
|
||
/// Polls a home for a signed update that builds off a replica's committed
/// root and relays it to that replica.
#[derive(Debug)] | ||
struct UpdatePoller {
// Built from a number of seconds in `new` (`Duration::from_secs`).
duration: Duration,
// Source of signed updates (`signed_update_by_old_root`).
home: Arc<Homes>,
// Destination whose committed root is read and to which updates are submitted.
replica: Arc<Replicas>,
// Guards update submission: `poll_and_relay_update` does a `try_lock` and
// returns early when the lock is already held (a tx is in flight).
semaphore: Mutex<()>, | ||
// Counter labelled by home, replica and agent name; incremented when
// `replica.update` returns `Ok`.
updates_relayed_count: Arc<prometheus::IntCounterVec>, | ||
} | ||
|
||
impl std::fmt::Display for UpdatePoller { | ||
|
@@ -28,20 +31,25 @@ impl std::fmt::Display for UpdatePoller { | |
} | ||
|
||
impl UpdatePoller { | ||
fn new(home: Arc<Homes>, replica: Arc<Replicas>, duration: u64) -> Self { | ||
fn new( | ||
home: Arc<Homes>, | ||
replica: Arc<Replicas>, | ||
duration: u64, | ||
updates_relayed_count: Arc<prometheus::IntCounterVec>, | ||
) -> Self { | ||
Self { | ||
home, | ||
replica, | ||
duration: Duration::from_secs(duration), | ||
semaphore: Mutex::new(()), | ||
updates_relayed_count, | ||
} | ||
} | ||
|
||
#[tracing::instrument(err, skip(self), fields(self = %self))] | ||
async fn poll_and_relay_update(&self) -> Result<()> { | ||
// Get replica's current root. | ||
let old_root = self.replica.committed_root().await?; | ||
|
||
info!( | ||
"Replica {} latest root is: {}", | ||
self.replica.name(), | ||
|
@@ -51,7 +59,8 @@ impl UpdatePoller { | |
// Check for first signed update building off of the replica's current root | ||
let signed_update_opt = self.home.signed_update_by_old_root(old_root).await?; | ||
|
||
// If signed update exists, update replica's current root | ||
// If signed update exists for replica's committed root, try to | ||
// relay | ||
if let Some(signed_update) = signed_update_opt { | ||
info!( | ||
"Update for replica {}. Root {} to {}", | ||
|
@@ -60,12 +69,19 @@ impl UpdatePoller { | |
&signed_update.update.new_root, | ||
); | ||
|
||
// Attempt to acquire lock for submitting tx | ||
let lock = self.semaphore.try_lock(); | ||
if lock.is_err() { | ||
return Ok(()); // tx in flight. just do nothing | ||
} | ||
// don't care if it succeeds | ||
let _ = self.replica.update(&signed_update).await; | ||
|
||
// Relay update and increment counters if tx successful | ||
if self.replica.update(&signed_update).await.is_ok() { | ||
self.updates_relayed_count | ||
.with_label_values(&[self.home.name(), self.replica.name(), AGENT_NAME]) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This invocation is constant for every instance of UpdatePoller, which is why, in the other PR I made for just the updater, it stores an already-fully-specified counter. Doing so only avoids a single hash of all the labels, though, so 🤷🏼 |
||
.inc(); | ||
} | ||
|
||
// lock dropped here | ||
} else { | ||
info!( | ||
|
@@ -93,6 +109,7 @@ impl UpdatePoller { | |
pub struct Relayer { | ||
// Forwarded unchanged to each `UpdatePoller::new`, which converts it with
// `Duration::from_secs`.
duration: u64, | ||
// Shared agent state; its `metrics` handle is used in `new` to register
// the counter below.
core: AgentCore, | ||
// Counter with labels `home`, `replica`, `agent`; cloned into every
// spawned `UpdatePoller`.
updates_relayed_count: Arc<prometheus::IntCounterVec>, | ||
} | ||
|
||
impl AsRef<AgentCore> for Relayer { | ||
|
@@ -105,7 +122,21 @@ impl AsRef<AgentCore> for Relayer { | |
impl Relayer { | ||
/// Instantiate a new relayer | ||
pub fn new(duration: u64, core: AgentCore) -> Self { | ||
Self { duration, core } | ||
let updates_relayed_count = Arc::new( | ||
core.metrics | ||
.new_int_counter( | ||
"updates_relayed_count", | ||
"Number of updates relayed from given home to replica", | ||
&["home", "replica", "agent"], | ||
) | ||
.expect("processor metric already registered -- should have be a singleton"), | ||
); | ||
|
||
Self { | ||
duration, | ||
core, | ||
updates_relayed_count, | ||
} | ||
} | ||
} | ||
|
||
|
@@ -130,8 +161,9 @@ impl OpticsAgent for Relayer { | |
fn run(&self, name: &str) -> Instrumented<JoinHandle<Result<()>>> { | ||
let replica_opt = self.replica_by_name(name); | ||
let home = self.home(); | ||
let name = name.to_owned(); | ||
let updates_relayed_count = self.updates_relayed_count.clone(); | ||
|
||
let name = name.to_owned(); | ||
let duration = self.duration; | ||
|
||
tokio::spawn(async move { | ||
|
@@ -140,7 +172,8 @@ impl OpticsAgent for Relayer { | |
} | ||
let replica = replica_opt.unwrap(); | ||
|
||
let update_poller = UpdatePoller::new(home, replica.clone(), duration); | ||
let update_poller = | ||
UpdatePoller::new(home, replica.clone(), duration, updates_relayed_count); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all the information needed to do |
||
update_poller.spawn().await? | ||
}) | ||
.in_current_span() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to remove pass-through methods in OpticsDB. I also felt that simple agent-unique counter stores didn't belong in OpticsDB, so we just use basic db methods here.