Update README for running benchmarks in k8s (apache#39)
andygrove authored Nov 3, 2024
1 parent 74ec45c commit 8ee46ab
Showing 3 changed files with 120 additions and 5 deletions.
6 changes: 6 additions & 0 deletions tpch/Dockerfile
@@ -0,0 +1,6 @@
FROM apache/datafusion-ray

RUN sudo apt update && \
    sudo apt install -y git

RUN git clone https://github.com/apache/datafusion-benchmarks.git
97 changes: 96 additions & 1 deletion tpch/README.md
@@ -21,8 +21,103 @@

## Running Benchmarks

### Standalone Ray Cluster

Data and queries must be available on all nodes of the Ray cluster.

```shell
RAY_ADDRESS='http://ray-cluster-ip-address:8265' ray job submit --working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --data /path/to/data --queries /path/to/tpch/queries --concurrency 4
```

### Kubernetes

Create a Docker image containing the TPC-H queries and push it to a Docker registry that is accessible from the k8s cluster.

```shell
docker build -t YOURREPO/datafusion-ray-tpch .
```
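Then push the image so that the k8s nodes can pull it (`YOURREPO` is a placeholder for your registry/namespace):

```shell
docker push YOURREPO/datafusion-ray-tpch
```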

If the data files are local to the k8s nodes, then create a persistent volume and persistent volume claim.

Create a `pv.yaml` with the following content and run `kubectl apply -f pv.yaml`.

```yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: ray-pv
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: "/mnt/bigdata" # Adjust the path as needed
```

Create a `pvc.yaml` with the following content and run `kubectl apply -f pvc.yaml`.

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: ray-pvc
spec:
  storageClassName: manual # Should match the PV's storageClassName if static
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
```
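After applying both manifests, it is worth confirming that the claim has bound to the volume before creating the cluster. A quick check, using the names above:

```shell
kubectl get pv ray-pv
kubectl get pvc ray-pvc
# Both should report STATUS "Bound"
```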

Create the Ray cluster using the custom image.

Create a `ray-cluster.yaml` with the following content and run `kubectl apply -f ray-cluster.yaml`.

```yaml
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: datafusion-ray-cluster
spec:
  headGroupSpec:
    rayStartParams:
      num-cpus: "1"
    template:
      spec:
        containers:
          - name: ray-head
            image: YOURREPO/datafusion-ray-tpch:latest
            volumeMounts:
              - mountPath: /mnt/bigdata # Mount path inside the container
                name: ray-storage
        volumes:
          - name: ray-storage
            persistentVolumeClaim:
              claimName: ray-pvc # Reference the PVC name here
  workerGroupSpecs:
    - replicas: 2
      groupName: "datafusion-ray"
      rayStartParams:
        num-cpus: "4"
      template:
        spec:
          containers:
            - name: ray-worker
              image: YOURREPO/datafusion-ray-tpch:latest
              volumeMounts:
                - mountPath: /mnt/bigdata
                  name: ray-storage
          volumes:
            - name: ray-storage
              persistentVolumeClaim:
                claimName: ray-pvc
```
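Note that the `RayCluster` resource assumes the KubeRay operator is installed in the cluster. Once applied, check that the head and worker pods are running, and optionally forward the Ray dashboard/job submission port. The head service name shown here follows the KubeRay `<cluster-name>-head-svc` convention and may differ in your deployment:

```shell
kubectl get pods
# Forward port 8265 (Ray dashboard and job submission endpoint)
kubectl port-forward service/datafusion-ray-cluster-head-svc 8265:8265
```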

Run the benchmarks:

```shell
ray job submit --working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --queries /home/ray/datafusion-benchmarks/tpch/queries/ --data /mnt/bigdata/tpch/sf100
```
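If you are submitting from outside the cluster rather than from the head pod, set `RAY_ADDRESS` to wherever port 8265 is reachable; the localhost address below assumes a port-forward or similar tunnel is in place:

```shell
RAY_ADDRESS='http://127.0.0.1:8265' ray job submit --working-dir `pwd` -- python3 tpcbench.py --benchmark tpch --queries /home/ray/datafusion-benchmarks/tpch/queries/ --data /mnt/bigdata/tpch/sf100
```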
22 changes: 18 additions & 4 deletions tpch/tpcbench.py
@@ -17,6 +17,7 @@

import argparse
import ray
from datafusion import SessionContext, SessionConfig, RuntimeConfig
from datafusion_ray import DatafusionRayContext
from datetime import datetime
import json
@@ -41,22 +41,42 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int):
    # use ray job submit
    ray.init()

    runtime = RuntimeConfig()
    config = (
        SessionConfig()
        .with_target_partitions(concurrency)
        .set("datafusion.execution.parquet.pushdown_filters", "true")
    )
    df_ctx = SessionContext(config, runtime)

    ray_ctx = DatafusionRayContext(df_ctx)

    for table in table_names:
        path = f"{data_path}/{table}.parquet"
        print(f"Registering table {table} using path {path}")
        df_ctx.register_parquet(table, path)

    results = {
        'engine': 'datafusion-python',
        'benchmark': benchmark,
        'data_path': data_path,
        'query_path': query_path,
        'concurrency': concurrency,
    }

    for query in range(1, num_queries + 1):

        # read text file
        path = f"{query_path}/q{query}.sql"
        print(f"Reading query {query} using path {path}")
@@ -70,7 +81,7 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int):
            sql = sql.strip()
            if len(sql) > 0:
                print(f"Executing: {sql}")
                rows = ray_ctx.sql(sql)

                print(f"Query {query} returned {len(rows)} rows")
        end_time = time.time()
@@ -86,6 +97,9 @@
    with open(results_path, "w") as f:
        f.write(str)

    # write results to stdout
    print(str)

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DataFusion benchmark derived from TPC-H / TPC-DS")
parser.add_argument("--benchmark", required=True, help="Benchmark to run (tpch or tpcds)")
