Skip to content

Commit

Permalink
feat: Expose Ballista Scheduler and Executor in Python (#1148)
Browse files Browse the repository at this point in the history
* Add PyScheduler and PyExecutor

* fix builder api

* scheduler & executor support __str__ and __repr__

* update readme and requirements

* fix pyproject dependency

* expose additional configuration option

* cleanup examples

* add ability to close/stop scheduler and executor

* clippy cleanup

* concurrent_tasks can be configured
  • Loading branch information
milenkovicm authored Dec 9, 2024
1 parent 3af9ae0 commit b3cf8d1
Show file tree
Hide file tree
Showing 16 changed files with 813 additions and 51 deletions.
5 changes: 5 additions & 0 deletions ballista/scheduler/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub struct SchedulerConfig {
pub namespace: String,
/// The external hostname of the scheduler
pub external_host: String,
/// The bind host for the scheduler's gRPC service
pub bind_host: String,
/// The bind port for the scheduler's gRPC service
pub bind_port: u16,
/// The task scheduling policy for the scheduler
Expand Down Expand Up @@ -87,6 +89,7 @@ impl std::fmt::Debug for SchedulerConfig {
.field("namespace", &self.namespace)
.field("external_host", &self.external_host)
.field("bind_port", &self.bind_port)
.field("bind_host", &self.bind_host)
.field("scheduling_policy", &self.scheduling_policy)
.field("event_loop_buffer_size", &self.event_loop_buffer_size)
.field("task_distribution", &self.task_distribution)
Expand Down Expand Up @@ -137,6 +140,7 @@ impl Default for SchedulerConfig {
namespace: String::default(),
external_host: "localhost".into(),
bind_port: 50050,
bind_host: "127.0.0.1".into(),
scheduling_policy: Default::default(),
event_loop_buffer_size: 10000,
task_distribution: Default::default(),
Expand Down Expand Up @@ -326,6 +330,7 @@ impl TryFrom<Config> for SchedulerConfig {
namespace: opt.namespace,
external_host: opt.external_host,
bind_port: opt.bind_port,
bind_host: opt.bind_host,
scheduling_policy: opt.scheduler_policy,
event_loop_buffer_size: opt.event_loop_buffer_size,
task_distribution,
Expand Down
4 changes: 3 additions & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ publish = false

[dependencies]
async-trait = "0.1.77"
ballista = { path = "../ballista/client", version = "0.12.0", features = ["standalone"] }
ballista = { path = "../ballista/client", version = "0.12.0" }
ballista-core = { path = "../ballista/core", version = "0.12.0" }
ballista-executor = { path = "../ballista/executor", version = "0.12.0" }
ballista-scheduler = { path = "../ballista/scheduler", version = "0.12.0" }
datafusion = { version = "42", features = ["pyarrow", "avro"] }
datafusion-proto = { version = "42" }
datafusion-python = { version = "42" }
Expand Down
48 changes: 41 additions & 7 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,79 @@ part of the default Cargo workspace so that it doesn't cause overhead for mainta

## Creating a SessionContext

> [!IMPORTANT]
> Current approach is to support datafusion python API, there are know limitations of current approach,
> with some cases producing errors.
> We trying to come up with the best approach to support datafusion python interface.
> More details could be found at [#1142](https://github.com/apache/datafusion-ballista/issues/1142)
Creates a new context and connects to a Ballista scheduler process.

```python
from ballista import BallistaBuilder
>>> ctx = BallistaBuilder().standalone()
```

## Example SQL Usage
### Example SQL Usage

```python
>>> ctx.sql("create external table t stored as parquet location '/mnt/bigdata/tpch/sf10-parquet/lineitem.parquet'")
>>> ctx.sql("create external table t stored as parquet location './testdata/test.parquet'")
>>> df = ctx.sql("select * from t limit 5")
>>> pyarrow_batches = df.collect()
```

## Example DataFrame Usage
### Example DataFrame Usage

```python
>>> df = ctx.read_parquet('/mnt/bigdata/tpch/sf10-parquet/lineitem.parquet').limit(5)
>>> df = ctx.read_parquet('./testdata/test.parquet').limit(5)
>>> pyarrow_batches = df.collect()
```

## Creating Virtual Environment
## Scheduler and Executor

Scheduler and executors can be configured and started from python code.

To start scheduler:

```python
from ballista import BallistaScheduler

scheduler = BallistaScheduler()

scheduler.start()
scheduler.wait_for_termination()
```

For executor:

```python
from ballista import BallistaExecutor

executor = BallistaExecutor()

executor.start()
executor.wait_for_termination()
```

## Development Process

### Creating Virtual Environment

```shell
python3 -m venv venv
source venv/bin/activate
pip3 install -r requirements.txt
```

## Building
### Building

```shell
maturin develop
```

Note that you can also run `maturin develop --release` to get a release build locally.

## Testing
### Testing

```shell
python3 -m pytest
Expand Down
4 changes: 2 additions & 2 deletions python/ballista/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import pyarrow as pa

from .ballista_internal import (
BallistaBuilder,
BallistaBuilder, BallistaScheduler, BallistaExecutor
)

__version__ = importlib_metadata.version(__name__)

__all__ = [
"BallistaBuilder",
"BallistaBuilder", "BallistaScheduler", "BallistaExecutor"
]
27 changes: 27 additions & 0 deletions python/examples/client_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%
from ballista import BallistaBuilder
from datafusion.context import SessionContext

ctx: SessionContext = BallistaBuilder().remote("df://127.0.0.1:50050")

# Select 1 to verify its working
ctx.sql("SELECT 1").show()

# %%
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
# specific language governing permissions and limitations
# under the License.

# %%

from ballista import BallistaBuilder
from datafusion.context import SessionContext

# Ballista will initiate with an empty config
# set config variables with `config`
ctx: SessionContext = BallistaBuilder()\
.config("datafusion.catalog.information_schema","true")\
.config("ballista.job.name", "example ballista")\
.config("ballista.shuffle.partitions", "16")\
.standalone()

#ctx_remote: SessionContext = ballista.remote("remote_ip", 50050)

# Select 1 to verify its working
ctx.sql("SELECT 1").show()
#ctx_remote.sql("SELECT 2").show()

# %%
ctx.sql("SHOW TABLES").show()
# %%
ctx.sql("select name, value from information_schema.df_settings where name like 'ballista.job.name'").show()


# %%
31 changes: 31 additions & 0 deletions python/examples/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%
from ballista import BallistaExecutor
# %%
executor = BallistaExecutor()
# %%
executor.start()
# %%
executor
# %%
executor.wait_for_termination()
# %%
# %%
executor.close()
# %%
38 changes: 38 additions & 0 deletions python/examples/readme_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%

from ballista import BallistaBuilder
from datafusion.context import SessionContext

ctx: SessionContext = BallistaBuilder()\
.config("ballista.job.name", "Readme Example Remote")\
.config("datafusion.execution.target_partitions", "4")\
.remote("df://127.0.0.1:50050")

ctx.sql("create external table t stored as parquet location '../testdata/test.parquet'")

# %%
df = ctx.sql("select * from t limit 5")
pyarrow_batches = df.collect()
pyarrow_batches[0].to_pandas()
# %%
df = ctx.read_parquet('../testdata/test.parquet').limit(5)
pyarrow_batches = df.collect()
pyarrow_batches[0].to_pandas()
# %%
38 changes: 38 additions & 0 deletions python/examples/readme_standalone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%

from ballista import BallistaBuilder
from datafusion.context import SessionContext

ctx: SessionContext = BallistaBuilder()\
.config("ballista.job.name", "Readme Example")\
.config("datafusion.execution.target_partitions", "4")\
.standalone()

ctx.sql("create external table t stored as parquet location '../testdata/test.parquet'")

# %%
df = ctx.sql("select * from t limit 5")
pyarrow_batches = df.collect()
pyarrow_batches[0].to_pandas()
# %%
df = ctx.read_parquet('../testdata/test.parquet').limit(5)
pyarrow_batches = df.collect()
pyarrow_batches[0].to_pandas()
# %%
29 changes: 29 additions & 0 deletions python/examples/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# %%
from ballista import BallistaScheduler
# %%
scheduler = BallistaScheduler()
# %%
scheduler
# %%
scheduler.start()
# %%
scheduler.wait_for_termination()
# %%
scheduler.close()
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ classifier = [
"Programming Language :: Rust",
]
dependencies = [
"pyarrow>=11.0.0",
"pyarrow>=11.0.0", "cloudpickle"
]

[project.urls]
Expand Down
7 changes: 5 additions & 2 deletions python/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
datafusion==35.0.0
datafusion==42.0.0
pyarrow
pytest
pytest
maturin==1.5.1
cloudpickle
pandas
Loading

0 comments on commit b3cf8d1

Please sign in to comment.