From 97d14fc34552cf0c2da7ad2cebc728458a8bf893 Mon Sep 17 00:00:00 2001 From: shivramsrivastava Date: Mon, 18 Jun 2018 13:05:53 +0000 Subject: [PATCH] Resource stats static calculation improved, changes made to gather cpu, running tasks and slots at PU to machine Affected modules: scheduling --- src/scheduling/firmament_scheduler_service.cc | 148 +++++++++--------- src/scheduling/flow/cpu_cost_model.cc | 92 +++++++---- src/scheduling/flow/cpu_cost_model.h | 10 +- src/scheduling/flow/cpu_cost_model_test.cc | 10 +- src/scheduling/flow/flow_scheduler.cc | 80 +++++++--- 5 files changed, 204 insertions(+), 136 deletions(-) diff --git a/src/scheduling/firmament_scheduler_service.cc b/src/scheduling/firmament_scheduler_service.cc index a924ff0c7..f6c254927 100644 --- a/src/scheduling/firmament_scheduler_service.cc +++ b/src/scheduling/firmament_scheduler_service.cc @@ -57,6 +57,7 @@ DEFINE_string(firmament_scheduler_service_address, "127.0.0.1", "The address of the scheduler service"); DEFINE_string(firmament_scheduler_service_port, "9090", "The port of the scheduler service"); +DECLARE_bool(resource_stats_update_based_on_resource_reservation); DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple"); DEFINE_uint64(queue_based_scheduling_time, 1, "Queue Based Schedule run time"); @@ -117,21 +118,43 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { // TODO(ionel): Implement! } - void UpdateMachineSamplesToKnowledgeBaseStatically(const TaskDescriptor* td_ptr, bool add) { + // Helper function that updates the knowledge base with resource stats samples + // based on task resource request reservation. We can use this function when + // we do not have an external dynamic resource stats provider like heapster in + // kubernetes. If add is true, then the task resource request is subtracted from + // available machine resources; else the task resource request is added back to + // available machine resources. 
+ void UpdateMachineSamplesToKnowledgeBaseStatically( + const TaskDescriptor* td_ptr, bool add) { ResourceID_t res_id = ResourceIDFromString(td_ptr->scheduled_to_resource()); ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id); ResourceStats resource_stats; CpuStats* cpu_stats = resource_stats.add_cpus_stats(); - bool have_sample = - knowledge_base_->GetLatestStatsForMachine(ResourceIDFromString(rs->mutable_topology_node()->parent_id()), &resource_stats); + bool have_sample = knowledge_base_->GetLatestStatsForMachine( + ResourceIDFromString(rs->mutable_topology_node()->parent_id()), + &resource_stats); if (have_sample) { if (add) { - cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() + td_ptr->resource_request().cpu_cores()); - resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() + td_ptr->resource_request().ram_cap()); + cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() + + td_ptr->resource_request().cpu_cores()); + resource_stats.set_mem_allocatable( + resource_stats.mem_allocatable() + + td_ptr->resource_request().ram_cap()); } else { - cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() - td_ptr->resource_request().cpu_cores()); - resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() - td_ptr->resource_request().ram_cap()); + cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() - + td_ptr->resource_request().cpu_cores()); + resource_stats.set_mem_allocatable( + resource_stats.mem_allocatable() - + td_ptr->resource_request().ram_cap()); } + double cpu_utilization = + (cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) / + (double)cpu_stats->cpu_capacity(); + cpu_stats->set_cpu_utilization(cpu_utilization); + double mem_utilization = + (resource_stats.mem_capacity() - resource_stats.mem_allocatable()) / + (double)resource_stats.mem_capacity(); + resource_stats.set_mem_utilization(mem_utilization); knowledge_base_->AddMachineSample(resource_stats); } } @@ -141,11 +164,6 @@ class 
FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { SchedulerStats sstat; vector deltas; scheduler_->ScheduleAllJobs(&sstat, &deltas); - // Extract results - if (deltas.size()) { - LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; - } - // Schedule tasks having pod affinity/anti-affinity chrono::high_resolution_clock::time_point start = chrono::high_resolution_clock::now(); @@ -158,11 +176,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { chrono::high_resolution_clock::now(); time_spent = chrono::duration_cast(end - start); } - if(deltas.size()) { - LOG(INFO) << "QueueBasedSchedule: Got " << deltas.size() - << " scheduling deltas"; + // Extract results + if (deltas.size()) { + LOG(INFO) << "Got " << deltas.size() << " scheduling deltas"; } - for (auto& d : deltas) { // LOG(INFO) << "Delta: " << d.DebugString(); SchedulingDelta* ret_delta = reply->add_deltas(); @@ -215,7 +232,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { } } - Status TaskCompleted(ServerContext* context, const TaskUID* tid_ptr, TaskCompletedResponse* reply) override { TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, tid_ptr->task_uid()); @@ -223,11 +239,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { reply->set_type(TaskReplyType::TASK_NOT_FOUND); return Status::OK; } - // TODO(jagadish): We need to remove below code once we start - // getting machine resource stats samples from poseidon i.e., heapster. - // Currently updating machine samples statically based on state of pod. 
- if (!td_ptr->scheduled_to_resource().empty()) { - UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); + if (FLAGS_resource_stats_update_based_on_resource_reservation) { + if (!td_ptr->scheduled_to_resource().empty()) { + UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); + } } JobID_t job_id = JobIDFromString(td_ptr->job_id()); JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id); @@ -261,11 +276,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { reply->set_type(TaskReplyType::TASK_NOT_FOUND); return Status::OK; } - // TODO(jagadish): We need to remove below code once we start - // getting machine resource stats samples from poseidon i.e., heapster. - // Currently updating machine samples statically based on state of pod. - if (!td_ptr->scheduled_to_resource().empty()) { - UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); + if (FLAGS_resource_stats_update_based_on_resource_reservation) { + if (!td_ptr->scheduled_to_resource().empty()) { + UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); + } } RemoveTaskFromLabelsMap(*td_ptr); scheduler_->HandleTaskFailure(td_ptr); @@ -283,11 +297,12 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { return Status::OK; } RemoveTaskFromLabelsMap(*td_ptr); - // TODO(jagadish): We need to remove below code once we start - // getting machine resource stats samples from poseidon i.e., heapster. - // Currently updating machine samples statically based on state of pod. 
- if (!td_ptr->scheduled_to_resource().empty()) { - UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); + if (FLAGS_resource_stats_update_based_on_resource_reservation) { + if (!(td_ptr->scheduled_to_resource().empty()) && + (td_ptr->state() != TaskDescriptor::COMPLETED) && + (td_ptr->state() != TaskDescriptor::FAILED)) { + UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true); + } } scheduler_->HandleTaskRemoval(td_ptr); JobID_t job_id = JobIDFromString(td_ptr->job_id()); @@ -394,8 +409,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { uint64_t* num_tasks_to_remove = FindOrNull(job_num_tasks_to_remove_, job_id); (*num_tasks_to_remove)++; - if ((*num_tasks_to_remove) >= 3800) - LOG(INFO) << "All tasks are submitted" << job_id << ", " << 3800; reply->set_type(TaskReplyType::TASK_SUBMITTED_OK); return Status::OK; } @@ -472,44 +485,35 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { scheduler_->RegisterResource(rtnd_ptr, false, true); reply->set_type(NodeReplyType::NODE_ADDED_OK); - // Add Node initial status simulation - ResourceStats resource_stats; - ResourceID_t res_id = - ResourceIDFromString(rtnd_ptr->resource_desc().uuid()); - ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id); - if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) { - reply->set_type(NodeReplyType::NODE_NOT_FOUND); - return Status::OK; + if (FLAGS_resource_stats_update_based_on_resource_reservation) { + // Add Node initial status simulation + ResourceStats resource_stats; + ResourceID_t res_id = + ResourceIDFromString(rtnd_ptr->resource_desc().uuid()); + ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id); + if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) { + reply->set_type(NodeReplyType::NODE_NOT_FOUND); + return Status::OK; + } + resource_stats.set_resource_id(rtnd_ptr->resource_desc().uuid()); + resource_stats.set_timestamp(0); + + CpuStats* cpu_stats = 
resource_stats.add_cpus_stats(); + // As some of the resources is utilized by system pods, so initializing + // utilization to 10%. + cpu_stats->set_cpu_capacity( + rtnd_ptr->resource_desc().resource_capacity().cpu_cores()); + cpu_stats->set_cpu_utilization(0.1); + cpu_stats->set_cpu_allocatable(cpu_stats->cpu_capacity() * 0.9); + resource_stats.set_mem_capacity( + rtnd_ptr->resource_desc().resource_capacity().ram_cap()); + resource_stats.set_mem_utilization(0.1); + resource_stats.set_mem_allocatable(resource_stats.mem_capacity() * 0.9); + resource_stats.set_disk_bw(0); + resource_stats.set_net_rx_bw(0); + resource_stats.set_net_tx_bw(0); + knowledge_base_->AddMachineSample(resource_stats); } - resource_stats.set_resource_id(rtnd_ptr->resource_desc().uuid()); - resource_stats.set_timestamp(0); - CpuStats* cpu_stats = resource_stats.add_cpus_stats(); - cpu_stats->set_cpu_capacity( - rtnd_ptr->resource_desc().resource_capacity().cpu_cores()); - // Assuming 80% of cpu/mem is is allocatable neglecting 20% for other - // processes in node. 
- cpu_stats->set_cpu_allocatable( - rtnd_ptr->resource_desc().resource_capacity().cpu_cores() * 0.80); - // resource_stats.cpus_stats(0).set_cpu_utilization(0.0); - // resource_stats.cpus_stats(0).set_cpu_reservation(0.0); - resource_stats.set_mem_allocatable( - rtnd_ptr->resource_desc().resource_capacity().ram_cap()); - resource_stats.set_mem_capacity( - rtnd_ptr->resource_desc().resource_capacity().ram_cap() * 0.80); - // resource_stats.set_mem_utilization(0.0); - // resource_stats.set_mem_reservation(0.0); - resource_stats.set_disk_bw(0); - resource_stats.set_net_rx_bw(0); - resource_stats.set_net_tx_bw(0); - // LOG(INFO) << "DEBUG: During node additions: CPU CAP: " << - // cpu_stats->cpu_capacity() << "\n" - // << " CPU ALLOC: " << - // cpu_stats->cpu_allocatable() << "\n" - // << " MEM CAP: " << - // resource_stats.mem_capacity() << "\n" - // << " MEM ALLOC: " << - // resource_stats.mem_allocatable(); - knowledge_base_->AddMachineSample(resource_stats); return Status::OK; } @@ -624,7 +628,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service { unordered_map> job_num_tasks_to_remove_; KnowledgeBasePopulator* kb_populator_; - //Pod affinity/anti-affinity + // Pod affinity/anti-affinity unordered_map>> labels_map_; vector affinity_antiaffinity_tasks_; diff --git a/src/scheduling/flow/cpu_cost_model.cc b/src/scheduling/flow/cpu_cost_model.cc index 2cd1e0682..1e24b4dd7 100644 --- a/src/scheduling/flow/cpu_cost_model.cc +++ b/src/scheduling/flow/cpu_cost_model.cc @@ -36,10 +36,10 @@ DECLARE_uint64(max_tasks_per_pu); namespace firmament { -CpuCostModel::CpuCostModel(shared_ptr resource_map, - shared_ptr task_map, - shared_ptr knowledge_base, - unordered_map>>* labels_map) +CpuCostModel::CpuCostModel( + shared_ptr resource_map, shared_ptr task_map, + shared_ptr knowledge_base, + unordered_map>>* labels_map) : resource_map_(resource_map), task_map_(task_map), knowledge_base_(knowledge_base), @@ -49,6 +49,20 @@ 
CpuCostModel::CpuCostModel(shared_ptr resource_map, infinity_ = omega_ * CpuMemCostVector_t::dimensions_; } +void CpuCostModel::AccumulateResourceStats(ResourceDescriptor* accumulator, + ResourceDescriptor* other) { + // Track the aggregate available resources below the machine node + ResourceVector* acc_avail = accumulator->mutable_available_resources(); + ResourceVector* other_avail = other->mutable_available_resources(); + acc_avail->set_cpu_cores(acc_avail->cpu_cores() + other_avail->cpu_cores()); + // Running/idle task count + accumulator->set_num_running_tasks_below( + accumulator->num_running_tasks_below() + + other->num_running_tasks_below()); + accumulator->set_num_slots_below(accumulator->num_slots_below() + + other->num_slots_below()); +} + ArcDescriptor CpuCostModel::TaskToUnscheduledAgg(TaskID_t task_id) { return ArcDescriptor(2560000, 1ULL, 0ULL); } @@ -121,6 +135,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, ec_index * resource_request->cpu_cores_; available_resources.ram_cap_ = rd.available_resources().ram_cap() - ec_index * resource_request->ram_cap_; + // Expressing Least Requested Priority. float cpu_fraction = ((rd.resource_capacity().cpu_cores() - available_resources.cpu_cores_) / @@ -187,7 +202,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, } } } - + // Expressing pod affinity/anti-affinity priority scores. 
if ((affinity.has_pod_affinity() && affinity.pod_affinity() @@ -225,10 +240,10 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1, } } } - + cost_vector.node_affinity_soft_cost_ = omega_ - node_affinity_normalized_score; - cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score; + cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score; Cost_t final_cost = FlattenCostVector(cost_vector); return ArcDescriptor(final_cost, 1ULL, 0ULL); } @@ -854,44 +869,54 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator, if (!accumulator->IsResourceNode()) { return accumulator; } - - if (other->resource_id_.is_nil()) { - // The other node is not a resource node. - if (other->type_ == FlowNodeType::SINK) { - accumulator->rd_ptr_->set_num_running_tasks_below(static_cast( - accumulator->rd_ptr_->current_running_tasks_size())); - accumulator->rd_ptr_->set_num_slots_below(FLAGS_max_tasks_per_pu); - } + if (accumulator->type_ == FlowNodeType::COORDINATOR) { return accumulator; } - - CHECK_NOTNULL(other->rd_ptr_); ResourceDescriptor* rd_ptr = accumulator->rd_ptr_; CHECK_NOTNULL(rd_ptr); - if (accumulator->type_ == FlowNodeType::MACHINE) { + if (accumulator->type_ == FlowNodeType::PU) { + CHECK(other->resource_id_.is_nil()); + ResourceStats latest_stats; + ResourceID_t machine_res_id = + MachineResIDForResource(accumulator->resource_id_); + bool have_sample = knowledge_base_->GetLatestStatsForMachine(machine_res_id, + &latest_stats); + if (have_sample) { + VLOG(2) << "Updating PU " << accumulator->resource_id_ << "'s " + << "resource stats!"; + // Get the CPU stats for this PU + string label = rd_ptr->friendly_name(); + uint64_t idx = label.find("PU #"); + if (idx != string::npos) { + string core_id_substr = label.substr(idx + 4, label.size() - idx - 4); + uint32_t core_id = strtoul(core_id_substr.c_str(), 0, 10); + float available_cpu_cores = + latest_stats.cpus_stats(core_id).cpu_capacity() * + (1.0 - 
latest_stats.cpus_stats(core_id).cpu_utilization()); + rd_ptr->mutable_available_resources()->set_cpu_cores( + available_cpu_cores); + } + // Running/idle task count + rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size()); + rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu); + return accumulator; + } + } else if (accumulator->type_ == FlowNodeType::MACHINE) { // Grab the latest available resource sample from the machine ResourceStats latest_stats; // Take the most recent sample for now bool have_sample = knowledge_base_->GetLatestStatsForMachine( accumulator->resource_id_, &latest_stats); if (have_sample) { - // LOG(INFO) << "DEBUG: Size of cpu stats: " << - // latest_stats.cpus_stats_size(); - // uint32_t core_id = 0; - float available_cpu_cores = latest_stats.cpus_stats(0).cpu_allocatable(); - // latest_stats.cpus_stats(core_id).cpu_capacity() * - // (1.0 - latest_stats.cpus_stats(core_id).cpu_utilization()); - // auto available_ram_cap = latest_stats.mem_capacity() * - auto available_ram_cap = latest_stats.mem_allocatable(); - // (1.0 - latest_stats.mem_utilization()); - // LOG(INFO) << "DEBUG: Stats from latest machine sample: " - // << "Available cpu: " << available_cpu_cores << "\n" - // << "Available mem: " << available_ram_cap; - rd_ptr->mutable_available_resources()->set_cpu_cores(available_cpu_cores); - rd_ptr->mutable_available_resources()->set_ram_cap(available_ram_cap); + VLOG(2) << "Updating machine " << accumulator->resource_id_ << "'s " + << "resource stats!"; + rd_ptr->mutable_available_resources()->set_ram_cap( + latest_stats.mem_capacity() * (1.0 - latest_stats.mem_utilization())); + } + if (accumulator->rd_ptr_ && other->rd_ptr_) { + AccumulateResourceStats(accumulator->rd_ptr_, other->rd_ptr_); } } - return accumulator; } @@ -902,6 +927,7 @@ void CpuCostModel::PrepareStats(FlowGraphNode* accumulator) { CHECK_NOTNULL(accumulator->rd_ptr_); accumulator->rd_ptr_->clear_num_running_tasks_below(); 
accumulator->rd_ptr_->clear_num_slots_below(); + accumulator->rd_ptr_->clear_available_resources(); // Clear maps related to priority scores. ec_to_node_priority_scores.clear(); } diff --git a/src/scheduling/flow/cpu_cost_model.h b/src/scheduling/flow/cpu_cost_model.h index ecc5ce1aa..594e634ff 100644 --- a/src/scheduling/flow/cpu_cost_model.h +++ b/src/scheduling/flow/cpu_cost_model.h @@ -37,13 +37,16 @@ namespace firmament { struct CpuMemCostVector_t { // record number of dimensions here - static const int16_t dimensions_ = 3; + static const int16_t dimensions_ = 4; uint64_t cpu_mem_cost_; uint64_t balanced_res_cost_; uint64_t node_affinity_soft_cost_; uint64_t pod_affinity_soft_cost_; CpuMemCostVector_t() - : cpu_mem_cost_(0), balanced_res_cost_(0), node_affinity_soft_cost_(0), pod_affinity_soft_cost_(0) {} + : cpu_mem_cost_(0), + balanced_res_cost_(0), + node_affinity_soft_cost_(0), + pod_affinity_soft_cost_(0) {} }; struct CpuMemResVector_t { @@ -167,6 +170,9 @@ class CpuCostModel : public CostModelInterface { FRIEND_TEST(CpuCostModelTest, GetOutgoingEquivClassPrefArcs); FRIEND_TEST(CpuCostModelTest, GetTaskEquivClasses); FRIEND_TEST(CpuCostModelTest, MachineResIDForResource); + // Load statistics accumulator helper + void AccumulateResourceStats(ResourceDescriptor* accumulator, + ResourceDescriptor* other); Cost_t FlattenCostVector(CpuMemCostVector_t cv); EquivClass_t GetMachineEC(const string& machine_name, uint64_t ec_index); ResourceID_t MachineResIDForResource(ResourceID_t res_id); diff --git a/src/scheduling/flow/cpu_cost_model_test.cc b/src/scheduling/flow/cpu_cost_model_test.cc index 93314dabc..db3dc2862 100644 --- a/src/scheduling/flow/cpu_cost_model_test.cc +++ b/src/scheduling/flow/cpu_cost_model_test.cc @@ -163,8 +163,8 @@ TEST_F(CpuCostModelTest, EquivClassToEquivClass) { // Calculate cost of arc between main EC and second machine EC. 
ArcDescriptor arc_cost2 = cost_model->EquivClassToEquivClass( (*equiv_classes)[0], machine_equiv_classes[1]); - EXPECT_EQ(1500, arc_cost1.cost_); - EXPECT_EQ(1525, arc_cost2.cost_); + EXPECT_EQ(2500, arc_cost1.cost_); + EXPECT_EQ(2525, arc_cost2.cost_); // Cost of arc between main EC and first machine EC should be less than // cost of arc between main EC and second machine EC. EXPECT_LT(arc_cost1.cost_, arc_cost2.cost_); @@ -277,8 +277,10 @@ TEST_F(CpuCostModelTest, GatherStats) { machine_test_stats.set_resource_id(to_string(res_id1)); machine_test_stats.add_cpus_stats(); CpuStats* pu_test_stats = machine_test_stats.mutable_cpus_stats(0); - pu_test_stats->set_cpu_allocatable(500.0); - machine_test_stats.set_mem_allocatable(16000); + pu_test_stats->set_cpu_utilization(0.5); + pu_test_stats->set_cpu_capacity(1000.0); + machine_test_stats.set_mem_utilization(0.5); + machine_test_stats.set_mem_capacity(32000); cost_model->knowledge_base_->AddMachineSample(machine_test_stats); // Create flow graph nodes. FlowGraphNode* machine_node = new FlowGraphNode(1); diff --git a/src/scheduling/flow/flow_scheduler.cc b/src/scheduling/flow/flow_scheduler.cc index 42f505cec..1c5fed4fe 100644 --- a/src/scheduling/flow/flow_scheduler.cc +++ b/src/scheduling/flow/flow_scheduler.cc @@ -70,6 +70,8 @@ DEFINE_bool(reschedule_tasks_upon_node_failure, true, "True if tasks that were " DECLARE_string(flow_scheduling_solver); DECLARE_bool(flowlessly_flip_algorithms); +DEFINE_bool(resource_stats_update_based_on_resource_reservation, true, + "Set this false when you have external machine stats server"); namespace firmament { namespace scheduler { @@ -203,39 +205,67 @@ uint64_t FlowScheduler::ApplySchedulingDeltas( // We should not get any NOOP deltas as they get filtered before. 
continue; } else if (delta->type() == SchedulingDelta::PLACE) { - // Resource stats simulation - ResourceStats resource_stats; - CpuStats* cpu_stats = resource_stats.add_cpus_stats(); - bool have_sample = - knowledge_base_->GetLatestStatsForMachine(ResourceIDFromString(rs->mutable_topology_node()->parent_id()), &resource_stats); - if(have_sample) { - cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() - td_ptr->resource_request().cpu_cores()); - resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() - td_ptr->resource_request().ram_cap()); - //LOG(INFO) << "DEBUG: While applying PLACE scheduling deltas after iteration: \n" - // << "CPU CAP: " << cpu_stats->cpu_capacity() << ", " << "CPU ALLOC: " << cpu_stats->cpu_allocatable() << "\n" - // << "MEM CAP: " << resource_stats.mem_capacity() << ", " << "MEM ALLOC: " << resource_stats.mem_allocatable(); - knowledge_base_->AddMachineSample(resource_stats); + // Update the knowledge base with resource stats samples based on the task's + // resource request, when we do not have an external dynamic resource + // stats provider like heapster in kubernetes. 
+ if (FLAGS_resource_stats_update_based_on_resource_reservation) { + ResourceStats resource_stats; + CpuStats* cpu_stats = resource_stats.add_cpus_stats(); + bool have_sample = knowledge_base_->GetLatestStatsForMachine( + ResourceIDFromString(rs->mutable_topology_node()->parent_id()), + &resource_stats); + if (have_sample) { + cpu_stats->set_cpu_allocatable( + cpu_stats->cpu_allocatable() - + td_ptr->resource_request().cpu_cores()); + resource_stats.set_mem_allocatable( + resource_stats.mem_allocatable() - + td_ptr->resource_request().ram_cap()); + double cpu_utilization = + (cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) / + (double)cpu_stats->cpu_capacity(); + cpu_stats->set_cpu_utilization(cpu_utilization); + double mem_utilization = (resource_stats.mem_capacity() - + resource_stats.mem_allocatable()) / + (double)resource_stats.mem_capacity(); + resource_stats.set_mem_utilization(mem_utilization); + knowledge_base_->AddMachineSample(resource_stats); + } } // Tag the job to which this task belongs as running JobDescriptor* jd = - FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id())); + FindOrNull(*job_map_, JobIDFromString(td_ptr->job_id())); if (jd->state() != JobDescriptor::RUNNING) jd->set_state(JobDescriptor::RUNNING); HandleTaskPlacement(td_ptr, rs->mutable_descriptor()); num_scheduled++; } else if (delta->type() == SchedulingDelta::PREEMPT) { - // Resource stats simulation - ResourceStats resource_stats; - CpuStats* cpu_stats = resource_stats.add_cpus_stats(); - bool have_sample = - knowledge_base_->GetLatestStatsForMachine(ResourceIDFromString(rs->mutable_topology_node()->parent_id()), &resource_stats); - if(have_sample) { - cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() + td_ptr->resource_request().cpu_cores()); - resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() + td_ptr->resource_request().ram_cap()); - //LOG(INFO) << "DEBUG: While applying PREEMPT scheduling deltas after iteration: \n" - // << "CPU CAP: " << 
cpu_stats->cpu_capacity() << ", " << "CPU ALLOC: " << cpu_stats->cpu_allocatable() << "\n" - // << "MEM CAP: " << resource_stats.mem_capacity() << ", " << "MEM ALLOC: " << resource_stats.mem_allocatable(); - knowledge_base_->AddMachineSample(resource_stats); + // Update the knowledge base with resource stats samples based on the task's + // resource request, when we do not have an external dynamic resource + // stats provider like heapster in kubernetes. + if (FLAGS_resource_stats_update_based_on_resource_reservation) { + ResourceStats resource_stats; + CpuStats* cpu_stats = resource_stats.add_cpus_stats(); + bool have_sample = knowledge_base_->GetLatestStatsForMachine( + ResourceIDFromString(rs->mutable_topology_node()->parent_id()), + &resource_stats); + if (have_sample) { + cpu_stats->set_cpu_allocatable( + cpu_stats->cpu_allocatable() + + td_ptr->resource_request().cpu_cores()); + resource_stats.set_mem_allocatable( + resource_stats.mem_allocatable() + + td_ptr->resource_request().ram_cap()); + double cpu_utilization = + (cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) / + (double)cpu_stats->cpu_capacity(); + cpu_stats->set_cpu_utilization(cpu_utilization); + double mem_utilization = (resource_stats.mem_capacity() - + resource_stats.mem_allocatable()) / + (double)resource_stats.mem_capacity(); + resource_stats.set_mem_utilization(mem_utilization); + knowledge_base_->AddMachineSample(resource_stats); + } } HandleTaskEviction(td_ptr, rs->mutable_descriptor()); } else if (delta->type() == SchedulingDelta::MIGRATE) {