Skip to content

Commit

Permalink
scheduler & executor support __str__ and __repr__
Browse files Browse the repository at this point in the history
  • Loading branch information
milenkovicm committed Dec 8, 2024
1 parent aadfd3e commit 47650bf
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 6 deletions.
5 changes: 5 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct SchedulerConfig {
pub namespace: String,
/// The external hostname of the scheduler
pub external_host: String,
/// The bind host for the scheduler's gRPC service
pub bind_host: String,
/// The bind port for the scheduler's gRPC service
pub bind_port: u16,
/// The task scheduling policy for the scheduler
Expand Down Expand Up @@ -87,6 +89,7 @@ impl std::fmt::Debug for SchedulerConfig {
.field("namespace", &self.namespace)
.field("external_host", &self.external_host)
.field("bind_port", &self.bind_port)
.field("bind_host", &self.bind_host)
.field("scheduling_policy", &self.scheduling_policy)
.field("event_loop_buffer_size", &self.event_loop_buffer_size)
.field("task_distribution", &self.task_distribution)
Expand Down Expand Up @@ -137,6 +140,7 @@ impl Default for SchedulerConfig {
namespace: String::default(),
external_host: "localhost".into(),
bind_port: 50050,
bind_host: "127.0.0.1".into(),
scheduling_policy: Default::default(),
event_loop_buffer_size: 10000,
task_distribution: Default::default(),
Expand Down Expand Up @@ -326,6 +330,7 @@ impl TryFrom<Config> for SchedulerConfig {
namespace: opt.namespace,
external_host: opt.external_host,
bind_port: opt.bind_port,
bind_host: opt.bind_host,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
task_distribution,
Expand Down
2 changes: 2 additions & 0 deletions python/examples/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
# %%
executor.start()
# %%
executor
# %%
executor.wait_for_termination()
# %%
# %%
2 changes: 2 additions & 0 deletions python/examples/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
# %%
scheduler = BallistaScheduler()
# %%
scheduler
# %%
scheduler.start()
# %%
scheduler.wait_for_termination()
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ classifier = [
"Programming Language :: Rust",
]
dependencies = [
"pyarrow>=11.0.0",
"pyarrow>=11.0.0", cloudpickle
]

[project.urls]
Expand Down
68 changes: 65 additions & 3 deletions python/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ impl PyScheduler {
.map_err(|e| to_pyerr(e))?;

let config = self.config.clone();
let addr = format!("0.0.0.0:{}", config.bind_port);
let addr = addr.parse()?;
let address = format!("{}:{}", config.bind_host, config.bind_port);
let address = address.parse()?;
let handle = spawn_feature(py, async move {
start_server(cluster, addr, Arc::new(config)).await.unwrap();
start_server(cluster, address, Arc::new(config))
.await
.unwrap();
});
self.handle = Some(handle);

Expand All @@ -89,6 +91,31 @@ impl PyScheduler {
None => Ok(()),
}
}
#[classattr]
pub fn version() -> &'static str {
ballista_core::BALLISTA_VERSION
}

pub fn __str__(&self) -> String {
match self.handle {
Some(_) => format!(
"listening address={}:{}",
self.config.bind_host, self.config.bind_port,
),
None => format!(
"configured address={}:{}",
self.config.bind_host, self.config.bind_port,
),
}
}

pub fn __repr__(&self) -> String {
format!(
"BallistaScheduler(config={:?}, listening= {})",
self.config,
self.handle.is_some()
)
}
}

#[pyclass(name = "BallistaExecutor", module = "ballista", subclass)]
Expand Down Expand Up @@ -166,4 +193,39 @@ impl PyExecutor {
None => Ok(()),
}
}

#[classattr]
pub fn version() -> &'static str {
ballista_core::BALLISTA_VERSION
}

pub fn __str__(&self) -> String {
match self.handle {
Some(_) => format!(
"listening address={}:{}, scheduler={}:{}",
self.config.bind_host,
self.config.port,
self.config.scheduler_host,
self.config.scheduler_port
),
None => format!(
"configured address={}:{}, scheduler={}:{}",
self.config.bind_host,
self.config.port,
self.config.scheduler_host,
self.config.scheduler_port
),
}
}

pub fn __repr__(&self) -> String {
format!(
"BallistaExecutor(address={}:{}, scheduler={}:{}, listening={})",
self.config.bind_host,
self.config.port,
self.config.scheduler_host,
self.config.scheduler_port,
self.handle.is_some()
)
}
}
15 changes: 13 additions & 2 deletions python/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use pyo3::types::{PyAnyMethods, PyBytes, PyBytesMethods};
use pyo3::{PyObject, PyResult, Python};
use std::fmt::Debug;
use std::sync::Arc;

static MODULE: &str = "cloudpickle";
Expand Down Expand Up @@ -59,7 +60,6 @@ impl CloudPickle {
}
}

#[derive(Debug)]
pub struct PyLogicalCodec {
inner: BallistaLogicalExtensionCodec,
cloudpickle: CloudPickle,
Expand All @@ -74,6 +74,12 @@ impl PyLogicalCodec {
}
}

impl Debug for PyLogicalCodec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PyLogicalCodec").finish()
}
}

impl LogicalExtensionCodec for PyLogicalCodec {
fn try_decode(
&self,
Expand Down Expand Up @@ -185,12 +191,17 @@ impl LogicalExtensionCodec for PyLogicalCodec {
}
}

#[derive(Debug)]
pub struct PyPhysicalCodec {
inner: BallistaPhysicalExtensionCodec,
cloudpickle: CloudPickle,
}

impl Debug for PyPhysicalCodec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PyPhysicalCodec").finish()
}
}

impl PyPhysicalCodec {
pub fn try_new(py: Python<'_>) -> PyResult<Self> {
Ok(Self {
Expand Down
5 changes: 5 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,9 @@ impl PyBallistaBuilder {

Ok(ctx.into())
}

#[classattr]
pub fn version() -> &'static str {
ballista_core::BALLISTA_VERSION
}
}

0 comments on commit 47650bf

Please sign in to comment.