Skip to content

Commit

Permalink
[lighthouse] detect unhealthy participants via heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
d4l3k committed Jan 10, 2025
1 parent 5dd6f38 commit 529233a
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 37 deletions.
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl Manager {
bind: String,
store_addr: String,
world_size: u64,
heartbeat_interval: Duration,
) -> PyResult<Self> {
py.allow_threads(move || {
let runtime = Runtime::new()?;
Expand All @@ -55,6 +56,7 @@ impl Manager {
bind,
store_addr,
world_size,
heartbeat_interval,
))
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
let handle = runtime.spawn(manager.clone().run());
Expand Down Expand Up @@ -224,16 +226,19 @@ struct Lighthouse {

#[pymethods]
impl Lighthouse {
#[pyo3(signature = (bind, min_replicas, join_timeout_ms=None, quorum_tick_ms=None, heartbeat_timeout_ms=None))]
#[new]
fn new(
py: Python<'_>,
bind: String,
min_replicas: u64,
join_timeout_ms: Option<u64>,
quorum_tick_ms: Option<u64>,
heartbeat_timeout_ms: Option<u64>,
) -> PyResult<Self> {
let join_timeout_ms = join_timeout_ms.unwrap_or(100);
let quorum_tick_ms = quorum_tick_ms.unwrap_or(100);
let heartbeat_timeout_ms = heartbeat_timeout_ms.unwrap_or(5000);

py.allow_threads(move || {
let rt = Runtime::new()?;
Expand All @@ -244,6 +249,7 @@ impl Lighthouse {
min_replicas: min_replicas,
join_timeout_ms: join_timeout_ms,
quorum_tick_ms: quorum_tick_ms,
heartbeat_timeout_ms: heartbeat_timeout_ms,
}))
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;

Expand Down
Loading

0 comments on commit 529233a

Please sign in to comment.