Skip to content

Commit

Permalink
Wrap up tpch loader implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
1ntEgr8 committed Sep 23, 2024
1 parent 9f6aa8b commit be66704
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 43 deletions.
9 changes: 7 additions & 2 deletions configs/tpch_test.conf → configs/tpch_replay.conf
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
--scheduler=EDF
--scheduler_runtime=0
--enforce_deadlines
--min_deadline_variance=25
--max_deadline_variance=50

# Execution mode configs.
--execution_mode=replay
--replay_trace=tpch

# Release time config.
--override_arrival_period=5
--override_num_invocation=10
--override_release_policy=gamma
--override_gamma_coefficient=1
--override_poisson_arrival_rate=0.04
--override_num_invocation=50

# TPCH flags
--tpch_query_dag_spec=profiles/workload/tpch/queries.yaml
--worker_profile_path=profiles/workers/tpch_cluster.yaml
68 changes: 41 additions & 27 deletions data/tpch_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
import random
from pathlib import Path

from typing import Any, Dict, List, Optional
from pathlib import Path
Expand All @@ -8,8 +8,6 @@
import numpy as np
import yaml

from more_itertools import before_and_after

from utils import EventTime
from workload import (
Workload,
Expand All @@ -25,14 +23,6 @@
from .base_workload_loader import BaseWorkloadLoader


"""
- [ ] Release policy based on workload
- [ ] Fix current time setting
- [ ] Configure deadline variance
- [ ] Configure release policy
"""


class TpchLoader(BaseWorkloadLoader):
"""Loads the TPCH trace from the provided file
Expand All @@ -45,36 +35,46 @@ def __init__(self, path: str, flags: "absl.flags") -> None:
self._flags = flags
self._rng_seed = flags.random_seed
self._rng = random.Random(self._rng_seed)
self._loop_timeout = flags.loop_timeout
self._num_queries = flags.tpch_num_queries
self._dataset_size = flags.tpch_dataset_size
if flags.workload_profile_path:
self._workload_profile_path = str(
Path(flags.workload_profile_path) / f"{self._dataset_size}g"
)
if flags.workload_update_interval > 0:
self._workload_update_interval = flags.workload_update_interval
else:
self._workload_profile_path = "./profiles/workload/tpch/decima/2g"
self._workload_update_interval = EventTime(10, EventTime.Unit.US)
self._workload_update_interval = EventTime(sys.maxsize, EventTime.Unit.US)
release_policy = self._get_release_policy()
self._release_times = release_policy.get_release_times(
completion_time=EventTime(self._flags.loop_timeout, EventTime.Unit.US)
)
self._current_release_pointer = 0

# Set up query name to job graph mapping

with open(path, "r") as f:
workload_data = yaml.safe_load(f)

if flags.workload_profile_path:
workload_profile_path = str(
            Path(flags.workload_profile_path) / f"{flags.tpch_dataset_size}g"
)
else:
workload_profile_path = "./profiles/workload/tpch/decima/2g"

job_graphs = {}
for query in workload_data["graphs"]:
query_name = query["name"]
graph = query["graph"]
job_graph = TpchLoader.make_job_graph(
query_name=query_name,
graph=graph,
profile_path=self._workload_profile_path,
profile_path=workload_profile_path,
deadline_variance=(
int(flags.min_deadline_variance),
int(flags.max_deadline_variance),
)
)
job_graphs[query_name] = job_graph

self._job_graphs = job_graphs

# Initialize workload
self._workload = Workload.empty(flags)

def _get_release_policy(self):
Expand Down Expand Up @@ -134,12 +134,11 @@ def _get_release_policy(self):

@staticmethod
def make_job_graph(
query_name: str, graph: List[Dict[str, Any]], profile_path: str
query_name: str, graph: List[Dict[str, Any]], profile_path: str, deadline_variance=(0,0),
) -> JobGraph:
job_graph = JobGraph(
name=query_name,
# TODO: make configurable
deadline_variance=(10, 50),
deadline_variance=deadline_variance,
)

query_num = int(query_name[1:])
Expand Down Expand Up @@ -248,9 +247,24 @@ def pre_process_task_duration(task_duration):
return stage_info

def get_next_workload(self, current_time: EventTime) -> Optional[Workload]:
if len(self._release_times) == 0:
to_release = []
while (
self._current_release_pointer < len(self._release_times)
and self._release_times[self._current_release_pointer]
<= current_time + self._workload_update_interval
):
to_release.append(
self._release_times[self._current_release_pointer]
)
self._current_release_pointer += 1

if (
self._current_release_pointer >= len(self._release_times)
and len(to_release) == 0
):
# Nothing left to release
return None
to_release, self._release_times = before_and_after(lambda t: t <= current_time + self._workload_update_interval, self._release_times)

for t in to_release:
query_num = self._rng.randint(1, len(self._job_graphs))
query_name = f"Q{query_num}"
Expand All @@ -260,7 +274,7 @@ def get_next_workload(self, current_time: EventTime) -> Optional[Workload]:
_flags=self._flags,
)
self._workload.add_task_graph(task_graph)
self._release_times = list(self._release_times)

return self._workload


Expand Down
6 changes: 6 additions & 0 deletions profiles/workers/tpch_cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- name: WorkerPool_1
workers:
- name: Worker_1_1
resources:
- name: Slot
quantity: 500
1 change: 0 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ cplex
pre-commit
black
isort
more-itertools
26 changes: 13 additions & 13 deletions simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1539,19 +1539,19 @@ def __handle_update_workload(self, event: Event) -> None:
len(releasable_tasks),
)

# Add the TaskGraphRelease events into the system.
for task_graph_name, task_graph in self._workload.task_graphs.items():
event = Event(
event_type=EventType.TASK_GRAPH_RELEASE,
time=task_graph.release_time,
task_graph=task_graph_name,
)
self._event_queue.add_event(event)
self._logger.info(
"[%s] Added %s to the event queue.",
self._simulator_time.to(EventTime.Unit.US).time,
event,
)
# # Add the TaskGraphRelease events into the system.
# for task_graph_name, task_graph in self._workload.task_graphs.items():
# event = Event(
# event_type=EventType.TASK_GRAPH_RELEASE,
# time=task_graph.release_time,
# task_graph=task_graph_name,
# )
# self._event_queue.add_event(event)
# self._logger.info(
# "[%s] Added %s to the event queue.",
# self._simulator_time.to(EventTime.Unit.US).time,
# event,
# )

max_release_time = self._simulator_time
for task in releasable_tasks:
Expand Down

0 comments on commit be66704

Please sign in to comment.