Skip to content

Commit

Permalink
Merge pull request #16 from shivramsrivastava/gang_scheduling_feature
Browse files Browse the repository at this point in the history
Gang scheduling feature
  • Loading branch information
shivramsrivastava authored Nov 13, 2018
2 parents 63087f7 + 21447c8 commit 04bb18e
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 21 deletions.
4 changes: 4 additions & 0 deletions src/base/job_desc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,9 @@ message JobDescriptor {
JobState state = 3;
TaskDescriptor root_task = 4;
repeated bytes output_ids = 5;
// For gang scheduling jobs.
uint64 min_number_of_tasks = 6;
uint64 scheduled_tasks_count = 7;
bool is_gang_scheduling_job = 8;
}

10 changes: 9 additions & 1 deletion src/scheduling/event_driven_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ EventDrivenScheduler::~EventDrivenScheduler() {
void EventDrivenScheduler::AddJob(JobDescriptor* jd_ptr) {
boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
InsertOrUpdate(&jobs_to_schedule_, JobIDFromString(jd_ptr->uuid()), jd_ptr);
if (jd_ptr->is_gang_scheduling_job()) {
TaskDescriptor rtd = jd_ptr->root_task();
if (rtd.has_affinity() && (rtd.affinity().has_pod_affinity()
|| rtd.affinity().has_pod_anti_affinity())) {
vector<SchedulingDelta> delta_v;
InsertIfNotPresent(&affinity_job_to_deltas_, jd_ptr, delta_v);
}
}
}

void EventDrivenScheduler::BindTaskToResource(TaskDescriptor* td_ptr,
Expand Down Expand Up @@ -658,7 +666,7 @@ void EventDrivenScheduler::LazyGraphReduction(
if (queue_based_schedule == false || one_task_runnable == true)
continue;
for (auto task_itr = affinity_antiaffinity_tasks_->begin();
task_itr != affinity_antiaffinity_tasks_->end();) {
task_itr != affinity_antiaffinity_tasks_->end(); ++task_itr) {
TaskDescriptor* tdp = FindPtrOrNull(*task_map_, *task_itr);
if (tdp) {
if ((tdp->state() == TaskDescriptor::RUNNABLE) &&
Expand Down
5 changes: 5 additions & 0 deletions src/scheduling/event_driven_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ class EventDrivenScheduler : public SchedulerInterface {
//Pod affinity/anti-affinity
bool one_task_runnable;
bool queue_based_schedule;
//Gang schedule tasks deltas
unordered_set<JobDescriptor*> delta_jobs;
// Pod affinity/anti-affinity gang schedule tasks deltas
unordered_map<JobDescriptor*, vector<SchedulingDelta>> affinity_job_to_deltas_;
unordered_set<uint64_t> affinity_delta_tasks;
};

} // namespace scheduler
Expand Down
43 changes: 27 additions & 16 deletions src/scheduling/firmament_scheduler_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,18 +191,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
// requirements.
scheduler_->ScheduleAllJobs(&sstat, &deltas);
uint64_t total_unsched_tasks_size = 0;
vector<uint64_t> unscheduled_normal_tasks;
if (FLAGS_gather_unscheduled_tasks) {
// Get unscheduled tasks of above scheduling round.
vector<uint64_t> unscheduled_normal_tasks;
cost_model_->GetUnscheduledTasks(&unscheduled_normal_tasks);
auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_normal_tasks) {
uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add();
*unsched_task_ret = unsched_task;
total_unsched_tasks_size++;
}
// Clear unscheduled tasks related maps and sets.
cost_model_->ClearUnscheduledTasksData();
}

// Schedule tasks having pod affinity/anti-affinity.
Expand All @@ -217,23 +209,42 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
TaskID_t task_id = dynamic_cast<FlowScheduler*>(scheduler_)
->GetSingleTaskTobeScheduled();
if (FLAGS_gather_unscheduled_tasks) {
if (!task_scheduled) {
if (unscheduled_affinity_tasks_set.find(task_id) ==
unscheduled_affinity_tasks_set.end()) {
unscheduled_affinity_tasks_set.insert(task_id);
unscheduled_affinity_tasks.push_back(task_id);
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, task_id);
CHECK_NOTNULL(td_ptr);
JobDescriptor* jd =
FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id()));
if (!(jd->is_gang_scheduling_job())) {
if (!task_scheduled) {
if (unscheduled_affinity_tasks_set.find(task_id) ==
unscheduled_affinity_tasks_set.end()) {
unscheduled_affinity_tasks_set.insert(task_id);
unscheduled_affinity_tasks.push_back(task_id);
}
} else {
unscheduled_affinity_tasks_set.erase(task_id);
}
} else {
unscheduled_affinity_tasks_set.erase(task_id);
}
}
clock_t stop = clock();
elapsed = (double)(stop - start) * 1000.0 / CLOCKS_PER_SEC;
}
//For pod affinity/anti-affinity gang scheduling tasks
scheduler_->UpdateGangSchedulingDeltas(&sstat, &deltas,
&unscheduled_normal_tasks,
&unscheduled_affinity_tasks_set,
&unscheduled_affinity_tasks);

// Get unscheduled tasks of above scheduling round which tried scheduling
// tasks having pod affinity/anti-affinity. And populate the same into
// reply.
if (FLAGS_gather_unscheduled_tasks) {
auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_normal_tasks) {
uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add();
*unsched_task_ret = unsched_task;
total_unsched_tasks_size++;
}
cost_model_->ClearUnscheduledTasksData();
auto unscheduled_affinity_tasks_ret = reply->mutable_unscheduled_tasks();
for (auto& unsched_task : unscheduled_affinity_tasks) {
if (unscheduled_affinity_tasks_set.find(unsched_task) !=
Expand Down
5 changes: 5 additions & 0 deletions src/scheduling/flow/flow_graph_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,11 @@ void FlowGraphManager::SchedulingDeltasForPreemptedTasks(
// a PREEMPT delta because the task has finished.
continue;
}
if (task_node->td_ptr_->state() != TaskDescriptor::RUNNING) {
// For pod affinity/antiaffinity tasks from previous
// scheduling round not satisfying gang scheduling.
continue;
}
const uint64_t* res_node_id = FindOrNull(task_mappings, task_node->id_);
if (!res_node_id) {
// The task doesn't exist in the mappings => the task has been
Expand Down
4 changes: 3 additions & 1 deletion src/scheduling/flow/flow_graph_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ class FlowGraphManager {
inline CostModelInterface* cost_model() {
return cost_model_;
}

inline const FlowGraphNode& node_for_node_id(uint64_t node_id) {
return graph_change_manager_->Node(node_id);
}
private:
FRIEND_TEST(DIMACSExporterTest, LargeGraph);
FRIEND_TEST(DIMACSExporterTest, ScalabilityTestGraphs);
Expand Down
188 changes: 188 additions & 0 deletions src/scheduling/flow/flow_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,36 @@ uint64_t FlowScheduler::ApplySchedulingDeltas(
ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id);
CHECK_NOTNULL(td_ptr);
CHECK_NOTNULL(rs);
JobDescriptor* jd = FindOrNull(*job_map_,
JobIDFromString(td_ptr->job_id()));
CHECK_NOTNULL(jd);
if (jd->is_gang_scheduling_job()) {
if (td_ptr->has_affinity()
&& (td_ptr->affinity().has_pod_affinity()
|| td_ptr->affinity().has_pod_anti_affinity())) {
if (queue_based_schedule) {
vector<SchedulingDelta>* delta_vec =
FindOrNull(affinity_job_to_deltas_, jd);
if (delta_vec) {
if (affinity_delta_tasks.find(delta->task_id())
== affinity_delta_tasks.end()) {
delta_vec->push_back(*delta);
affinity_delta_tasks.insert(delta->task_id());
}
}
}
} else {
uint64_t scheduled_tasks_count = jd->scheduled_tasks_count();
if (scheduled_tasks_count < jd->min_number_of_tasks()) {
jd->set_scheduled_tasks_count(--scheduled_tasks_count);
delta->set_type(SchedulingDelta::NOOP);
if (delta_jobs.find(jd) == delta_jobs.end()) {
delta_jobs.insert(jd);
}
continue;
}
}
}
if (delta->type() == SchedulingDelta::NOOP) {
// We should not get any NOOP deltas as they get filtered before.
continue;
Expand Down Expand Up @@ -335,6 +365,10 @@ void FlowScheduler::HandleJobCompletion(JobID_t job_id) {
void FlowScheduler::HandleJobRemoval(JobID_t job_id) {
boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
flow_graph_manager_->JobRemoved(job_id);
JobDescriptor* jdp = FindOrNull(*job_map_, job_id);
if (jdp) {
affinity_job_to_deltas_.erase(jdp);
}
// Call into superclass handler
EventDrivenScheduler::HandleJobRemoval(job_id);
}
Expand Down Expand Up @@ -370,6 +404,15 @@ void FlowScheduler::HandleTaskEviction(TaskDescriptor* td_ptr,
boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
flow_graph_manager_->TaskEvicted(td_ptr->uid(),
ResourceIDFromString(rd_ptr->uuid()));
vector<TaskID_t>::iterator it =
find(affinity_antiaffinity_tasks_->begin(),
affinity_antiaffinity_tasks_->end(), td_ptr->uid());
if (it == affinity_antiaffinity_tasks_->end()) {
affinity_antiaffinity_tasks_->push_back(*it);
}
if (FLAGS_pod_affinity_antiaffinity_symmetry) {
cost_model_->RemoveTaskFromTaskSymmetryMap(td_ptr);
}
EventDrivenScheduler::HandleTaskEviction(td_ptr, rd_ptr);
}

Expand Down Expand Up @@ -745,6 +788,20 @@ uint64_t FlowScheduler::RunSchedulingIteration(
flow_graph_manager_->NodeBindingToSchedulingDeltas(it->first, it->second,
&task_bindings_,
&deltas);
const FlowGraphNode& task_node =
flow_graph_manager_->node_for_node_id(it->first);
CHECK(task_node.IsTaskNode());
CHECK_NOTNULL(task_node.td_ptr_);
TaskDescriptor* td_ptr = task_node.td_ptr_;
JobDescriptor* jd = FindOrNull(*job_map_,
JobIDFromString(td_ptr->job_id()));
CHECK_NOTNULL(jd);
if (jd->is_gang_scheduling_job()
&& affinity_delta_tasks.find(td_ptr->uid())
== affinity_delta_tasks.end()) {
uint64_t scheduled_tasks_count = jd->scheduled_tasks_count();
jd->set_scheduled_tasks_count(++scheduled_tasks_count);
}
}
// Freeing the mappings because they're not used below.
delete task_mappings;
Expand Down Expand Up @@ -779,6 +836,9 @@ uint64_t FlowScheduler::RunSchedulingIteration(
uint64_t num_scheduled = ApplySchedulingDeltas(deltas);
if (deltas_output) {
for (auto& delta : deltas) {
if (delta->type() == SchedulingDelta::NOOP) {
continue;
}
deltas_output->push_back(*delta);
}
}
Expand All @@ -802,5 +862,133 @@ void FlowScheduler::UpdateCostModelResourceStats() {
boost::bind(&CostModelInterface::UpdateStats, cost_model_, _1, _2));
}

void FlowScheduler::AddKnowledgeBaseResourceStats(TaskDescriptor* td_ptr,
ResourceStatus* rs) {
ResourceStats resource_stats;
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
bool have_sample = knowledge_base_->GetLatestStatsForMachine(
ResourceIDFromString(rs->mutable_topology_node()->parent_id()),
&resource_stats);
if (have_sample) {
cpu_stats->set_cpu_allocatable(
cpu_stats->cpu_allocatable() +
td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(
resource_stats.mem_allocatable() +
td_ptr->resource_request().ram_cap());
resource_stats.set_ephemeral_storage_allocatable(
resource_stats.ephemeral_storage_allocatable() +
td_ptr->resource_request().ephemeral_storage());
double cpu_utilization =
(cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) /
(double)cpu_stats->cpu_capacity();
cpu_stats->set_cpu_utilization(cpu_utilization);
double mem_utilization = (resource_stats.mem_capacity() -
resource_stats.mem_allocatable()) /
(double)resource_stats.mem_capacity();
resource_stats.set_mem_utilization(mem_utilization);
double ephemeral_storage_utilization = (resource_stats.ephemeral_storage_capacity() -
resource_stats.ephemeral_storage_allocatable()) /
(double)resource_stats.ephemeral_storage_capacity();
resource_stats.set_ephemeral_storage_utilization(ephemeral_storage_utilization);
knowledge_base_->AddMachineSample(resource_stats);
}
}

void FlowScheduler::UpdateGangSchedulingDeltas(
SchedulerStats* scheduler_stats,
vector<SchedulingDelta>* deltas_output,
vector<uint64_t>* unscheduled_normal_tasks,
unordered_set<uint64_t>* unscheduled_affinity_tasks_set,
vector<uint64_t>* unscheduled_affinity_tasks) {
// update batch schedule deltas
for (auto job_ptr : delta_jobs) {
TaskDescriptor rtd = job_ptr->root_task();
for (auto td : rtd.spawned()) {
vector<uint64_t>::iterator it = find(unscheduled_normal_tasks->begin(),
unscheduled_normal_tasks->end(),
td.uid());
if (it == unscheduled_normal_tasks->end()) {
unscheduled_normal_tasks->push_back(td.uid());
}
}
delta_jobs.clear();

vector<uint64_t>::iterator rit = find(unscheduled_normal_tasks->begin(),
unscheduled_normal_tasks->end(),
rtd.uid());
if (rit == unscheduled_normal_tasks->end()) {
unscheduled_normal_tasks->push_back(rtd.uid());
}
}

// update queue schedule deltas
for (auto it = affinity_job_to_deltas_.begin();
it != affinity_job_to_deltas_.end(); ++it) {
JobDescriptor* jd_ptr = it->first;
TaskDescriptor root_td = jd_ptr->root_task();
if (!it->second.size()) {
for (auto td : root_td.spawned()) {
if (td.state() != TaskDescriptor::RUNNING) {
unscheduled_affinity_tasks_set->insert(td.uid());
unscheduled_affinity_tasks->push_back(td.uid());
}
}
if (root_td.state() != TaskDescriptor::RUNNING) {
unscheduled_affinity_tasks_set->insert(root_td.uid());
unscheduled_affinity_tasks->push_back(root_td.uid());
}
continue;
}
if (jd_ptr->scheduled_tasks_count() < jd_ptr->min_number_of_tasks()) {
for (auto delta : it->second) {
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, delta.task_id());
ResourceID_t res_id = ResourceIDFromString(delta.resource_id());
ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id);
CHECK_NOTNULL(td_ptr);
CHECK_NOTNULL(rs);
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
AddKnowledgeBaseResourceStats(td_ptr, rs);
}
HandleTaskEviction(td_ptr, rs->mutable_descriptor());
td_ptr->set_state(TaskDescriptor::CREATED);
td_ptr->clear_scheduled_to_resource();
JobID_t job_id = JobIDFromString(jd_ptr->uuid());
unordered_set<TaskID_t>* runnables_for_job =
FindOrNull(runnable_tasks_, job_id);
if (runnables_for_job) {
runnables_for_job->erase(delta.task_id());
}
vector<SchedulingDelta>::iterator it =
find_if(deltas_output->begin(), deltas_output->end(),
[&](SchedulingDelta& d){return (d.task_id() == delta.task_id());});
if (it != deltas_output->end()) {
deltas_output->erase(it);
}
}
for (auto td : root_td.spawned()) {
unscheduled_affinity_tasks_set->insert(td.uid());
unscheduled_affinity_tasks->push_back(td.uid());
}
unscheduled_affinity_tasks_set->insert(root_td.uid());
unscheduled_affinity_tasks->push_back(root_td.uid());
} else {
for (auto td : root_td.spawned()) {
if (td.state() != TaskDescriptor::RUNNING) {
unscheduled_affinity_tasks_set->insert(td.uid());
unscheduled_affinity_tasks->push_back(td.uid());
}
}
if (root_td.state() != TaskDescriptor::RUNNING) {
unscheduled_affinity_tasks_set->insert(root_td.uid());
unscheduled_affinity_tasks->push_back(root_td.uid());
}
}
jd_ptr->set_scheduled_tasks_count(0);
it->second.clear();
}
affinity_delta_tasks.clear();
}

} // namespace scheduler
} // namespace firmament
8 changes: 8 additions & 0 deletions src/scheduling/flow/flow_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ class FlowScheduler : public EventDrivenScheduler {
virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats);
virtual uint64_t ScheduleAllQueueJobs(SchedulerStats* scheduler_stats,
vector<SchedulingDelta>* deltas);
virtual void UpdateGangSchedulingDeltas(
SchedulerStats* scheduler_stats,
vector<SchedulingDelta>* deltas_output,
vector<uint64_t>* unscheduled_normal_tasks,
unordered_set<uint64_t>* unscheduled_affinity_tasks_set,
vector<uint64_t>* unscheduled_affinity_tasks);
virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats,
vector<SchedulingDelta>* deltas);
virtual uint64_t ScheduleJob(JobDescriptor* jd_ptr,
Expand Down Expand Up @@ -135,6 +141,8 @@ class FlowScheduler : public EventDrivenScheduler {
uint64_t RunSchedulingIteration(SchedulerStats* scheduler_stats,
vector<SchedulingDelta>* deltas_output, vector<JobDescriptor*>* job_vector);
void UpdateCostModelResourceStats();
void AddKnowledgeBaseResourceStats(TaskDescriptor* td_ptr,
ResourceStatus* rs);

// Pointer to the coordinator's topology manager
shared_ptr<TopologyManager> topology_manager_;
Expand Down
Loading

0 comments on commit 04bb18e

Please sign in to comment.