diff --git a/src/base/job_desc.proto b/src/base/job_desc.proto
index dcd950437..4d5a1db9b 100644
--- a/src/base/job_desc.proto
+++ b/src/base/job_desc.proto
@@ -25,5 +25,9 @@ message JobDescriptor {
   JobState state = 3;
   TaskDescriptor root_task = 4;
   repeated bytes output_ids = 5;
+  // For gang scheduling jobs.
+  uint64 min_number_of_tasks = 6;
+  uint64 scheduled_tasks_count = 7;
+  bool is_gang_scheduling_job = 8;
 }
diff --git a/src/scheduling/event_driven_scheduler.cc b/src/scheduling/event_driven_scheduler.cc
index 1097450f0..d503138ae 100644
--- a/src/scheduling/event_driven_scheduler.cc
+++ b/src/scheduling/event_driven_scheduler.cc
@@ -101,6 +101,14 @@ EventDrivenScheduler::~EventDrivenScheduler() {
 void EventDrivenScheduler::AddJob(JobDescriptor* jd_ptr) {
   boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
   InsertOrUpdate(&jobs_to_schedule_, JobIDFromString(jd_ptr->uuid()), jd_ptr);
+  if (jd_ptr->is_gang_scheduling_job()) {
+    TaskDescriptor rtd = jd_ptr->root_task();
+    if (rtd.has_affinity() && (rtd.affinity().has_pod_affinity()
+        || rtd.affinity().has_pod_anti_affinity())) {
+      vector<SchedulingDelta> delta_v;
+      InsertIfNotPresent(&affinity_job_to_deltas_, jd_ptr, delta_v);
+    }
+  }
 }
 
 void EventDrivenScheduler::BindTaskToResource(TaskDescriptor* td_ptr,
@@ -658,7 +666,7 @@ void EventDrivenScheduler::LazyGraphReduction(
     if (queue_based_schedule == false || one_task_runnable == true)
       continue;
     for (auto task_itr = affinity_antiaffinity_tasks_->begin();
-         task_itr != affinity_antiaffinity_tasks_->end();) {
+         task_itr != affinity_antiaffinity_tasks_->end(); ++task_itr) {
       TaskDescriptor* tdp = FindPtrOrNull(*task_map_, *task_itr);
       if (tdp) {
         if ((tdp->state() == TaskDescriptor::RUNNABLE) &&
diff --git a/src/scheduling/event_driven_scheduler.h b/src/scheduling/event_driven_scheduler.h
index 477531404..70734ea50 100644
--- a/src/scheduling/event_driven_scheduler.h
+++ b/src/scheduling/event_driven_scheduler.h
@@ -189,6 +189,11 @@ class EventDrivenScheduler : public SchedulerInterface {
   //Pod affinity/anti-affinity
   bool one_task_runnable;
   bool queue_based_schedule;
+  // Gang schedule tasks deltas
+  unordered_set<JobDescriptor*> delta_jobs;
+  // Pod affinity/anti-affinity gang schedule tasks deltas
+  unordered_map<JobDescriptor*, vector<SchedulingDelta>> affinity_job_to_deltas_;
+  unordered_set<TaskID_t> affinity_delta_tasks;
 };
 
 } // namespace scheduler
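
The three JobDescriptor fields added above drive the all-or-nothing logic in the rest of the patch: is_gang_scheduling_job opts a job in, min_number_of_tasks is the gang threshold, and scheduled_tasks_count is the scheduler's running tally for the current round. A minimal sketch of how a submitter might populate them; the generated header path and the MakeGangJob helper are illustrative assumptions, not part of this patch:

    #include <cstdint>
    #include "base/job_desc.pb.h"  // assumed location of the generated header

    firmament::JobDescriptor MakeGangJob(uint64_t min_tasks) {
      firmament::JobDescriptor jd;
      jd.set_is_gang_scheduling_job(true);
      // Deltas for this job only take effect once at least min_tasks of its
      // tasks can be placed in the same scheduling round.
      jd.set_min_number_of_tasks(min_tasks);
      jd.set_scheduled_tasks_count(0);  // bookkeeping owned by the scheduler
      return jd;
    }
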
diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc
index 6e56e36e9..287449821 100644
--- a/src/scheduling/firmament_scheduler_service.cc
+++ b/src/scheduling/firmament_scheduler_service.cc
@@ -191,18 +191,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
     // requirements.
     scheduler_->ScheduleAllJobs(&sstat, &deltas);
     uint64_t total_unsched_tasks_size = 0;
+    vector<uint64_t> unscheduled_normal_tasks;
     if (FLAGS_gather_unscheduled_tasks) {
       // Get unscheduled tasks of above scheduling round.
-      vector<uint64_t> unscheduled_normal_tasks;
       cost_model_->GetUnscheduledTasks(&unscheduled_normal_tasks);
-      auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks();
-      for (auto& unsched_task : unscheduled_normal_tasks) {
-        uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add();
-        *unsched_task_ret = unsched_task;
-        total_unsched_tasks_size++;
-      }
-      // Clear unscheduled tasks related maps and sets.
-      cost_model_->ClearUnscheduledTasksData();
     }
     // Schedule tasks having pod affinity/anti-affinity.
@@ -217,23 +209,42 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
       TaskID_t task_id = dynamic_cast<FlowScheduler*>(scheduler_)
                              ->GetSingleTaskTobeScheduled();
       if (FLAGS_gather_unscheduled_tasks) {
-        if (!task_scheduled) {
-          if (unscheduled_affinity_tasks_set.find(task_id) ==
-              unscheduled_affinity_tasks_set.end()) {
-            unscheduled_affinity_tasks_set.insert(task_id);
-            unscheduled_affinity_tasks.push_back(task_id);
+        TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, task_id);
+        CHECK_NOTNULL(td_ptr);
+        JobDescriptor* jd = CHECK_NOTNULL(
+            FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id())));
+        if (!jd->is_gang_scheduling_job()) {
+          if (!task_scheduled) {
+            if (unscheduled_affinity_tasks_set.find(task_id) ==
+                unscheduled_affinity_tasks_set.end()) {
+              unscheduled_affinity_tasks_set.insert(task_id);
+              unscheduled_affinity_tasks.push_back(task_id);
+            }
+          } else {
+            unscheduled_affinity_tasks_set.erase(task_id);
           }
-        } else {
-          unscheduled_affinity_tasks_set.erase(task_id);
         }
       }
       clock_t stop = clock();
       elapsed = (double)(stop - start) * 1000.0 / CLOCKS_PER_SEC;
     }
+    // For pod affinity/anti-affinity gang scheduling tasks.
+    scheduler_->UpdateGangSchedulingDeltas(&sstat, &deltas,
+                                           &unscheduled_normal_tasks,
+                                           &unscheduled_affinity_tasks_set,
+                                           &unscheduled_affinity_tasks);
     // Get unscheduled tasks of above scheduling round which tried scheduling
     // tasks having pod affinity/anti-affinity. And populate the same into
     // reply.
     if (FLAGS_gather_unscheduled_tasks) {
+      auto unscheduled_normal_tasks_ret = reply->mutable_unscheduled_tasks();
+      for (auto& unsched_task : unscheduled_normal_tasks) {
+        uint64_t* unsched_task_ret = unscheduled_normal_tasks_ret->Add();
+        *unsched_task_ret = unsched_task;
+        total_unsched_tasks_size++;
+      }
+      // Clear unscheduled tasks related maps and sets.
+      cost_model_->ClearUnscheduledTasksData();
       auto unscheduled_affinity_tasks_ret = reply->mutable_unscheduled_tasks();
       for (auto& unsched_task : unscheduled_affinity_tasks) {
         if (unscheduled_affinity_tasks_set.find(unsched_task) !=
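
Two things change in the service: unscheduled tasks are now copied into the gRPC reply only after UpdateGangSchedulingDeltas has run, so tasks vetoed by an unsatisfied gang are reported in the same round, and gang-scheduled jobs are excluded from the per-round affinity bookkeeping. The set-plus-vector idiom the service uses for its unscheduled-task lists is worth isolating; a self-contained sketch, with the patch's explicit find-then-insert collapsed into insert().second:

    #include <cstdint>
    #include <iostream>
    #include <unordered_set>
    #include <vector>

    using TaskID_t = uint64_t;  // matches Firmament's typedef

    // The set gives O(1) membership tests; the vector preserves the order in
    // which tasks were first reported, which is what goes into the reply.
    void RecordUnscheduled(TaskID_t task_id,
                           std::unordered_set<TaskID_t>* seen,
                           std::vector<TaskID_t>* ordered) {
      if (seen->insert(task_id).second) {
        ordered->push_back(task_id);
      }
    }

    int main() {
      std::unordered_set<TaskID_t> seen;
      std::vector<TaskID_t> ordered;
      for (TaskID_t id : {7, 3, 7, 9, 3}) {
        RecordUnscheduled(id, &seen, &ordered);
      }
      for (TaskID_t id : ordered) std::cout << id << " ";  // prints: 7 3 9
      std::cout << "\n";
      return 0;
    }
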
diff --git a/src/scheduling/flow/flow_graph_manager.cc b/src/scheduling/flow/flow_graph_manager.cc
index 283cb32fc..91e55819e 100644
--- a/src/scheduling/flow/flow_graph_manager.cc
+++ b/src/scheduling/flow/flow_graph_manager.cc
@@ -327,6 +327,11 @@ void FlowGraphManager::SchedulingDeltasForPreemptedTasks(
       // a PREEMPT delta because the task has finished.
       continue;
     }
+    if (task_node->td_ptr_->state() != TaskDescriptor::RUNNING) {
+      // Skip pod affinity/anti-affinity tasks left over from a previous
+      // scheduling round that did not satisfy gang scheduling.
+      continue;
+    }
     const uint64_t* res_node_id = FindOrNull(task_mappings, task_node->id_);
     if (!res_node_id) {
       // The task doesn't exist in the mappings => the task has been
diff --git a/src/scheduling/flow/flow_graph_manager.h b/src/scheduling/flow/flow_graph_manager.h
index 3dfa533a7..350ddc482 100644
--- a/src/scheduling/flow/flow_graph_manager.h
+++ b/src/scheduling/flow/flow_graph_manager.h
@@ -143,7 +143,9 @@ class FlowGraphManager {
   inline CostModelInterface* cost_model() {
     return cost_model_;
   }
-
+  inline const FlowGraphNode& node_for_node_id(uint64_t node_id) {
+    return graph_change_manager_->Node(node_id);
+  }
  private:
   FRIEND_TEST(DIMACSExporterTest, LargeGraph);
   FRIEND_TEST(DIMACSExporterTest, ScalabilityTestGraphs);
diff --git a/src/scheduling/flow/flow_scheduler.cc b/src/scheduling/flow/flow_scheduler.cc
index 6816af53b..57b192de8 100644
--- a/src/scheduling/flow/flow_scheduler.cc
+++ b/src/scheduling/flow/flow_scheduler.cc
@@ -202,6 +202,36 @@ uint64_t FlowScheduler::ApplySchedulingDeltas(
     ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id);
     CHECK_NOTNULL(td_ptr);
     CHECK_NOTNULL(rs);
+    JobDescriptor* jd = FindOrNull(*job_map_,
+                                   JobIDFromString(td_ptr->job_id()));
+    CHECK_NOTNULL(jd);
+    if (jd->is_gang_scheduling_job()) {
+      if (td_ptr->has_affinity()
+          && (td_ptr->affinity().has_pod_affinity()
+              || td_ptr->affinity().has_pod_anti_affinity())) {
+        if (queue_based_schedule) {
+          vector<SchedulingDelta>* delta_vec =
+              FindOrNull(affinity_job_to_deltas_, jd);
+          if (delta_vec) {
+            if (affinity_delta_tasks.find(delta->task_id())
+                == affinity_delta_tasks.end()) {
+              delta_vec->push_back(*delta);
+              affinity_delta_tasks.insert(delta->task_id());
+            }
+          }
+        }
+      } else {
+        uint64_t scheduled_tasks_count = jd->scheduled_tasks_count();
+        if (scheduled_tasks_count < jd->min_number_of_tasks()) {
+          jd->set_scheduled_tasks_count(--scheduled_tasks_count);
+          delta->set_type(SchedulingDelta::NOOP);
+          if (delta_jobs.find(jd) == delta_jobs.end()) {
+            delta_jobs.insert(jd);
+          }
+          continue;
+        }
+      }
+    }
     if (delta->type() == SchedulingDelta::NOOP) {
       // We should not get any NOOP deltas as they get filtered before.
       continue;
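
The non-affinity branch above is the heart of the gang check: RunSchedulingIteration (further down) optimistically bumps scheduled_tasks_count once per placed task, and ApplySchedulingDeltas winds the count back and downgrades the delta to a NOOP whenever the tally is still short of min_number_of_tasks. A toy model of that increment/veto pairing; ToyJob and ApplyOrVeto are illustrative names, not Firmament APIs:

    #include <cstdint>
    #include <iostream>

    struct ToyJob {
      uint64_t min_number_of_tasks;
      uint64_t scheduled_tasks_count;  // bumped optimistically per placement
    };

    // Returns true if the delta may be applied; false means the caller turns
    // it into a NOOP and the optimistic increment is undone.
    bool ApplyOrVeto(ToyJob* jd) {
      if (jd->scheduled_tasks_count < jd->min_number_of_tasks) {
        --jd->scheduled_tasks_count;
        return false;
      }
      return true;
    }

    int main() {
      ToyJob jd{3, 2};  // gang of 3, but only 2 tasks were placeable
      std::cout << ApplyOrVeto(&jd) << ApplyOrVeto(&jd) << "\n";  // 00
      std::cout << jd.scheduled_tasks_count << "\n";              // 0
      return 0;
    }
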
@@ -335,6 +365,10 @@ void FlowScheduler::HandleJobCompletion(JobID_t job_id) {
 void FlowScheduler::HandleJobRemoval(JobID_t job_id) {
   boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
   flow_graph_manager_->JobRemoved(job_id);
+  JobDescriptor* jdp = FindOrNull(*job_map_, job_id);
+  if (jdp) {
+    affinity_job_to_deltas_.erase(jdp);
+  }
   // Call into superclass handler
   EventDrivenScheduler::HandleJobRemoval(job_id);
 }
@@ -370,6 +404,15 @@ void FlowScheduler::HandleTaskEviction(TaskDescriptor* td_ptr,
   boost::lock_guard<boost::recursive_mutex> lock(scheduling_lock_);
   flow_graph_manager_->TaskEvicted(td_ptr->uid(),
                                    ResourceIDFromString(rd_ptr->uuid()));
+  vector<TaskID_t>::iterator it =
+      find(affinity_antiaffinity_tasks_->begin(),
+           affinity_antiaffinity_tasks_->end(), td_ptr->uid());
+  if (it == affinity_antiaffinity_tasks_->end()) {
+    affinity_antiaffinity_tasks_->push_back(td_ptr->uid());
+  }
+  if (FLAGS_pod_affinity_antiaffinity_symmetry) {
+    cost_model_->RemoveTaskFromTaskSymmetryMap(td_ptr);
+  }
   EventDrivenScheduler::HandleTaskEviction(td_ptr, rd_ptr);
 }
@@ -745,6 +788,20 @@ uint64_t FlowScheduler::RunSchedulingIteration(
       flow_graph_manager_->NodeBindingToSchedulingDeltas(it->first, it->second,
                                                          &task_bindings_,
                                                          &deltas);
+      const FlowGraphNode& task_node =
+          flow_graph_manager_->node_for_node_id(it->first);
+      CHECK(task_node.IsTaskNode());
+      CHECK_NOTNULL(task_node.td_ptr_);
+      TaskDescriptor* td_ptr = task_node.td_ptr_;
+      JobDescriptor* jd = FindOrNull(*job_map_,
+                                     JobIDFromString(td_ptr->job_id()));
+      CHECK_NOTNULL(jd);
+      if (jd->is_gang_scheduling_job()
+          && affinity_delta_tasks.find(td_ptr->uid())
+                 == affinity_delta_tasks.end()) {
+        uint64_t scheduled_tasks_count = jd->scheduled_tasks_count();
+        jd->set_scheduled_tasks_count(++scheduled_tasks_count);
+      }
     }
     // Freeing the mappings because they're not used below.
     delete task_mappings;
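
HandleTaskEviction now re-queues an evicted pod (anti-)affinity task for future queue-based rounds, appending its id only when std::find comes back empty-handed. A minimal standalone version of that append-if-absent step; note that when find() returns end(), the iterator must never be dereferenced, and it is the id itself that gets pushed:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    using TaskID_t = uint64_t;

    // Append tid only if it is not already queued. std::find returning end()
    // signals absence; the value pushed is tid, never *it.
    void RequeueTask(std::vector<TaskID_t>* tasks, TaskID_t tid) {
      if (std::find(tasks->begin(), tasks->end(), tid) == tasks->end()) {
        tasks->push_back(tid);
      }
    }

    int main() {
      std::vector<TaskID_t> queue{1, 2};
      RequeueTask(&queue, 2);  // already present: no change
      RequeueTask(&queue, 5);  // absent: appended
      return queue.size() == 3 ? 0 : 1;
    }
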
@@ -779,6 +836,9 @@ uint64_t FlowScheduler::RunSchedulingIteration(
   uint64_t num_scheduled = ApplySchedulingDeltas(deltas);
   if (deltas_output) {
     for (auto& delta : deltas) {
+      if (delta->type() == SchedulingDelta::NOOP) {
+        continue;
+      }
       deltas_output->push_back(*delta);
     }
   }
@@ -802,5 +862,133 @@ void FlowScheduler::UpdateCostModelResourceStats() {
       boost::bind(&CostModelInterface::UpdateStats, cost_model_, _1, _2));
 }
 
+void FlowScheduler::AddKnowledgeBaseResourceStats(TaskDescriptor* td_ptr,
+                                                  ResourceStatus* rs) {
+  ResourceStats resource_stats;
+  CpuStats* cpu_stats = resource_stats.add_cpus_stats();
+  bool have_sample = knowledge_base_->GetLatestStatsForMachine(
+      ResourceIDFromString(rs->mutable_topology_node()->parent_id()),
+      &resource_stats);
+  if (have_sample) {
+    cpu_stats->set_cpu_allocatable(
+        cpu_stats->cpu_allocatable() +
+        td_ptr->resource_request().cpu_cores());
+    resource_stats.set_mem_allocatable(
+        resource_stats.mem_allocatable() +
+        td_ptr->resource_request().ram_cap());
+    resource_stats.set_ephemeral_storage_allocatable(
+        resource_stats.ephemeral_storage_allocatable() +
+        td_ptr->resource_request().ephemeral_storage());
+    double cpu_utilization =
+        (cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) /
+        (double)cpu_stats->cpu_capacity();
+    cpu_stats->set_cpu_utilization(cpu_utilization);
+    double mem_utilization =
+        (resource_stats.mem_capacity() - resource_stats.mem_allocatable()) /
+        (double)resource_stats.mem_capacity();
+    resource_stats.set_mem_utilization(mem_utilization);
+    double ephemeral_storage_utilization =
+        (resource_stats.ephemeral_storage_capacity() -
+         resource_stats.ephemeral_storage_allocatable()) /
+        (double)resource_stats.ephemeral_storage_capacity();
+    resource_stats.set_ephemeral_storage_utilization(
+        ephemeral_storage_utilization);
+    knowledge_base_->AddMachineSample(resource_stats);
+  }
+}
+
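
AddKnowledgeBaseResourceStats runs on the rollback path: when a gang-vetoed task is about to be evicted, its reservation is handed back to the machine's allocatable pool and each utilization ratio is recomputed as (capacity - allocatable) / capacity. A worked instance of the CPU case, with made-up numbers:

    #include <iostream>

    int main() {
      double cpu_capacity = 8.0;     // cores on the machine
      double cpu_allocatable = 3.0;  // free cores before the eviction
      double task_request = 2.0;     // cores the vetoed task had reserved

      cpu_allocatable += task_request;  // reservation returned to the pool
      double cpu_utilization =
          (cpu_capacity - cpu_allocatable) / cpu_capacity;
      std::cout << cpu_utilization << "\n";  // (8 - 5) / 8 = 0.375
      return 0;
    }
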
+void FlowScheduler::UpdateGangSchedulingDeltas(
+    SchedulerStats* scheduler_stats,
+    vector<SchedulingDelta>* deltas_output,
+    vector<uint64_t>* unscheduled_normal_tasks,
+    unordered_set<TaskID_t>* unscheduled_affinity_tasks_set,
+    vector<uint64_t>* unscheduled_affinity_tasks) {
+  // Update batch schedule deltas.
+  for (auto job_ptr : delta_jobs) {
+    TaskDescriptor rtd = job_ptr->root_task();
+    for (auto td : rtd.spawned()) {
+      vector<uint64_t>::iterator it =
+          find(unscheduled_normal_tasks->begin(),
+               unscheduled_normal_tasks->end(), td.uid());
+      if (it == unscheduled_normal_tasks->end()) {
+        unscheduled_normal_tasks->push_back(td.uid());
+      }
+    }
+    vector<uint64_t>::iterator rit =
+        find(unscheduled_normal_tasks->begin(),
+             unscheduled_normal_tasks->end(), rtd.uid());
+    if (rit == unscheduled_normal_tasks->end()) {
+      unscheduled_normal_tasks->push_back(rtd.uid());
+    }
+  }
+  // Clearing inside the loop above would invalidate its iterators.
+  delta_jobs.clear();
+
+  // Update queue schedule deltas.
+  for (auto it = affinity_job_to_deltas_.begin();
+       it != affinity_job_to_deltas_.end(); ++it) {
+    JobDescriptor* jd_ptr = it->first;
+    TaskDescriptor root_td = jd_ptr->root_task();
+    if (it->second.empty()) {
+      for (auto td : root_td.spawned()) {
+        if (td.state() != TaskDescriptor::RUNNING) {
+          unscheduled_affinity_tasks_set->insert(td.uid());
+          unscheduled_affinity_tasks->push_back(td.uid());
+        }
+      }
+      if (root_td.state() != TaskDescriptor::RUNNING) {
+        unscheduled_affinity_tasks_set->insert(root_td.uid());
+        unscheduled_affinity_tasks->push_back(root_td.uid());
+      }
+      continue;
+    }
+    if (jd_ptr->scheduled_tasks_count() < jd_ptr->min_number_of_tasks()) {
+      // Gang not satisfied: evict this round's placements and report the
+      // whole gang as unscheduled.
+      for (auto delta : it->second) {
+        TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, delta.task_id());
+        ResourceID_t res_id = ResourceIDFromString(delta.resource_id());
+        ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id);
+        CHECK_NOTNULL(td_ptr);
+        CHECK_NOTNULL(rs);
+        if (FLAGS_resource_stats_update_based_on_resource_reservation) {
+          AddKnowledgeBaseResourceStats(td_ptr, rs);
+        }
+        HandleTaskEviction(td_ptr, rs->mutable_descriptor());
+        td_ptr->set_state(TaskDescriptor::CREATED);
+        td_ptr->clear_scheduled_to_resource();
+        JobID_t job_id = JobIDFromString(jd_ptr->uuid());
+        unordered_set<TaskID_t>* runnables_for_job =
+            FindOrNull(runnable_tasks_, job_id);
+        if (runnables_for_job) {
+          runnables_for_job->erase(delta.task_id());
+        }
+        vector<SchedulingDelta>::iterator delta_it = find_if(
+            deltas_output->begin(), deltas_output->end(),
+            [&](SchedulingDelta& d) { return d.task_id() == delta.task_id(); });
+        if (delta_it != deltas_output->end()) {
+          deltas_output->erase(delta_it);
+        }
+      }
+      for (auto td : root_td.spawned()) {
+        unscheduled_affinity_tasks_set->insert(td.uid());
+        unscheduled_affinity_tasks->push_back(td.uid());
+      }
+      unscheduled_affinity_tasks_set->insert(root_td.uid());
+      unscheduled_affinity_tasks->push_back(root_td.uid());
+    } else {
+      for (auto td : root_td.spawned()) {
+        if (td.state() != TaskDescriptor::RUNNING) {
+          unscheduled_affinity_tasks_set->insert(td.uid());
+          unscheduled_affinity_tasks->push_back(td.uid());
+        }
+      }
+      if (root_td.state() != TaskDescriptor::RUNNING) {
+        unscheduled_affinity_tasks_set->insert(root_td.uid());
+        unscheduled_affinity_tasks->push_back(root_td.uid());
+      }
+    }
+    jd_ptr->set_scheduled_tasks_count(0);
+    it->second.clear();
+  }
+  affinity_delta_tasks.clear();
+}
+
 } // namespace scheduler
 } // namespace firmament
diff --git a/src/scheduling/flow/flow_scheduler.h b/src/scheduling/flow/flow_scheduler.h
index 7da026039..9cf309c30 100644
--- a/src/scheduling/flow/flow_scheduler.h
+++ b/src/scheduling/flow/flow_scheduler.h
@@ -93,6 +93,12 @@ class FlowScheduler : public EventDrivenScheduler {
   virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats);
   virtual uint64_t ScheduleAllQueueJobs(SchedulerStats* scheduler_stats,
                                         vector<SchedulingDelta>* deltas);
+  virtual void UpdateGangSchedulingDeltas(
+      SchedulerStats* scheduler_stats,
+      vector<SchedulingDelta>* deltas_output,
+      vector<uint64_t>* unscheduled_normal_tasks,
+      unordered_set<TaskID_t>* unscheduled_affinity_tasks_set,
+      vector<uint64_t>* unscheduled_affinity_tasks);
   virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats,
                                    vector<SchedulingDelta>* deltas);
   virtual uint64_t ScheduleJob(JobDescriptor* jd_ptr,
@@ -135,6 +141,8 @@ class FlowScheduler : public EventDrivenScheduler {
   uint64_t RunSchedulingIteration(SchedulerStats* scheduler_stats,
                                   vector<SchedulingDelta>* deltas_output,
                                   vector<JobDescriptor*>* job_vector);
   void UpdateCostModelResourceStats();
+  void AddKnowledgeBaseResourceStats(TaskDescriptor* td_ptr,
+                                     ResourceStatus* rs);
   // Pointer to the coordinator's topology manager
   shared_ptr<TopologyManager> topology_manager_;
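
When a queue-scheduled gang falls short of its threshold, UpdateGangSchedulingDeltas above evicts every placement it had buffered for the job and strips the matching deltas out of the round's output, so callers never observe a half-placed gang. A toy version of that strip step, with SchedulingDelta reduced to a bare task id:

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    using TaskID_t = uint64_t;

    // Remove the first delta belonging to each gang task from the round's
    // output; mirrors the find_if/erase in UpdateGangSchedulingDeltas.
    void RollBackGang(std::vector<TaskID_t>* deltas_output,
                      const std::vector<TaskID_t>& gang_task_ids) {
      for (TaskID_t tid : gang_task_ids) {
        auto it = std::find(deltas_output->begin(), deltas_output->end(), tid);
        if (it != deltas_output->end()) {
          deltas_output->erase(it);
        }
      }
    }

    int main() {
      std::vector<TaskID_t> out{10, 11, 20, 21};  // this round's placements
      RollBackGang(&out, {20, 21, 22});           // gang of 3, only 2 placed
      return out.size() == 2 ? 0 : 1;             // 10 and 11 survive
    }
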
diff --git a/src/scheduling/scheduler_interface.h b/src/scheduling/scheduler_interface.h
index 81610eef6..0c6c2897b 100644
--- a/src/scheduling/scheduler_interface.h
+++ b/src/scheduling/scheduler_interface.h
@@ -247,14 +247,25 @@ class SchedulerInterface : public PrintableInterface {
   virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats) = 0;
   virtual uint64_t ScheduleAllJobs(SchedulerStats* scheduler_stats,
                                    vector<SchedulingDelta>* deltas) = 0;
- /**
+  /**
   * Runs a scheduling iteration for all active queue based jobs.
   * @return the number of tasks scheduled
   */
  virtual uint64_t ScheduleAllQueueJobs(SchedulerStats* scheduler_stats,
                                        vector<SchedulingDelta>* deltas) {
-  return 0;
-}
+    return 0;
+  }
+
+  /**
+   * Updates deltas for pod affinity/anti-affinity gang
+   * scheduling tasks.
+   */
+  virtual void UpdateGangSchedulingDeltas(
+      SchedulerStats* scheduler_stats,
+      vector<SchedulingDelta>* deltas_output,
+      vector<uint64_t>* unscheduled_normal_tasks,
+      unordered_set<TaskID_t>* unscheduled_affinity_tasks_set,
+      vector<uint64_t>* unscheduled_affinity_tasks) {}
 
   /**
    * Schedules all runnable tasks in a job.