Commit 97d14fc

Resource stats static calculation improved; changes made to gather CPU, running tasks, and slots from PU up to machine

Affected modules: scheduling
shivramsrivastava committed Jun 19, 2018
1 parent c45c413 commit 97d14fc
Showing 5 changed files with 204 additions and 136 deletions.
148 changes: 76 additions & 72 deletions src/scheduling/firmament_scheduler_service.cc
@@ -57,6 +57,7 @@ DEFINE_string(firmament_scheduler_service_address, "127.0.0.1",
"The address of the scheduler service");
DEFINE_string(firmament_scheduler_service_port, "9090",
"The port of the scheduler service");
DECLARE_bool(resource_stats_update_based_on_resource_reservation);
DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple");
DEFINE_uint64(queue_based_scheduling_time, 1, "Queue Based Schedule run time");

@@ -117,21 +118,43 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
// TODO(ionel): Implement!
}

void UpdateMachineSamplesToKnowledgeBaseStatically(const TaskDescriptor* td_ptr, bool add) {
// Helper function that updates the knowledge base with resource stats
// samples based on the task's resource request reservation. We can use
// this function when we do not have an external dynamic resource stats
// provider like Heapster in Kubernetes. If add is true, the task's resource
// request is added back to the available machine resources; otherwise it is
// subtracted from them.
void UpdateMachineSamplesToKnowledgeBaseStatically(
const TaskDescriptor* td_ptr, bool add) {
ResourceID_t res_id = ResourceIDFromString(td_ptr->scheduled_to_resource());
ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id);
ResourceStats resource_stats;
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
bool have_sample =
knowledge_base_->GetLatestStatsForMachine(ResourceIDFromString(rs->mutable_topology_node()->parent_id()), &resource_stats);
bool have_sample = knowledge_base_->GetLatestStatsForMachine(
ResourceIDFromString(rs->mutable_topology_node()->parent_id()),
&resource_stats);
if (have_sample) {
if (add) {
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() + td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() + td_ptr->resource_request().ram_cap());
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() +
td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(
resource_stats.mem_allocatable() +
td_ptr->resource_request().ram_cap());
} else {
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() - td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(resource_stats.mem_allocatable() - td_ptr->resource_request().ram_cap());
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() -
td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(
resource_stats.mem_allocatable() -
td_ptr->resource_request().ram_cap());
}
double cpu_utilization =
(cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) /
(double)cpu_stats->cpu_capacity();
cpu_stats->set_cpu_utilization(cpu_utilization);
double mem_utilization =
(resource_stats.mem_capacity() - resource_stats.mem_allocatable()) /
(double)resource_stats.mem_capacity();
resource_stats.set_mem_utilization(mem_utilization);
knowledge_base_->AddMachineSample(resource_stats);
}
}
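For context, the helper above amounts to moving a task's request into or out of the machine's allocatable pool and then recomputing utilization from capacity and allocatable. A minimal standalone sketch of that arithmetic, with plain structs standing in for the protobuf types and illustrative values:

#include <iostream>

struct MachineSample {
  double cpu_capacity;
  double cpu_allocatable;
  double mem_capacity;
  double mem_allocatable;
};

// Mirrors UpdateMachineSamplesToKnowledgeBaseStatically: when a task
// releases its reservation (add == true), its request is returned to the
// allocatable pool; when it takes a reservation, it is subtracted.
void UpdateSample(MachineSample* s, double req_cpu, double req_ram, bool add) {
  double sign = add ? 1.0 : -1.0;
  s->cpu_allocatable += sign * req_cpu;
  s->mem_allocatable += sign * req_ram;
}

double Utilization(double capacity, double allocatable) {
  return (capacity - allocatable) / capacity;
}

int main() {
  MachineSample s{8.0, 7.2, 16384.0, 14745.6};   // node seeded at 10% utilization
  UpdateSample(&s, 2.0, 4096.0, /*add=*/false);  // a task reserves 2 cores, 4 GB
  std::cout << "cpu util: " << Utilization(s.cpu_capacity, s.cpu_allocatable)
            << "\n";  // (8.0 - 5.2) / 8.0 = 0.35
  return 0;
}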
@@ -141,11 +164,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
SchedulerStats sstat;
vector<SchedulingDelta> deltas;
scheduler_->ScheduleAllJobs(&sstat, &deltas);
// Extract results
if (deltas.size()) {
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
}

// Schedule tasks having pod affinity/anti-affinity
chrono::high_resolution_clock::time_point start =
chrono::high_resolution_clock::now();
Expand All @@ -158,11 +176,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
chrono::high_resolution_clock::now();
time_spent = chrono::duration_cast<chrono::seconds>(end - start);
}
if(deltas.size()) {
LOG(INFO) << "QueueBasedSchedule: Got " << deltas.size()
<< " scheduling deltas";
// Extract results
if (deltas.size()) {
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
}

for (auto& d : deltas) {
// LOG(INFO) << "Delta: " << d.DebugString();
SchedulingDelta* ret_delta = reply->add_deltas();
@@ -215,19 +232,17 @@ }
}
}


Status TaskCompleted(ServerContext* context, const TaskUID* tid_ptr,
TaskCompletedResponse* reply) override {
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, tid_ptr->task_uid());
if (td_ptr == NULL) {
reply->set_type(TaskReplyType::TASK_NOT_FOUND);
return Status::OK;
}
// TODO(jagadish): We need to remove the code below once we start
// getting machine resource stats samples from Poseidon, i.e., Heapster.
// Currently we update machine samples statically based on pod state.
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
}
JobID_t job_id = JobIDFromString(td_ptr->job_id());
JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id);
@@ -261,11 +276,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
reply->set_type(TaskReplyType::TASK_NOT_FOUND);
return Status::OK;
}
// TODO(jagadish): We need to remove the code below once we start
// getting machine resource stats samples from Poseidon, i.e., Heapster.
// Currently we update machine samples statically based on pod state.
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
}
RemoveTaskFromLabelsMap(*td_ptr);
scheduler_->HandleTaskFailure(td_ptr);
@@ -283,11 +297,12 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
return Status::OK;
}
RemoveTaskFromLabelsMap(*td_ptr);
// TODO(jagadish): We need to remove the code below once we start
// getting machine resource stats samples from Poseidon, i.e., Heapster.
// Currently we update machine samples statically based on pod state.
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
if (!(td_ptr->scheduled_to_resource().empty()) &&
(td_ptr->state() != TaskDescriptor::COMPLETED) &&
(td_ptr->state() != TaskDescriptor::FAILED)) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
}
scheduler_->HandleTaskRemoval(td_ptr);
JobID_t job_id = JobIDFromString(td_ptr->job_id());
@@ -394,8 +409,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
uint64_t* num_tasks_to_remove =
FindOrNull(job_num_tasks_to_remove_, job_id);
(*num_tasks_to_remove)++;
if ((*num_tasks_to_remove) >= 3800)
LOG(INFO) << "All tasks are submitted" << job_id << ", " << 3800;
reply->set_type(TaskReplyType::TASK_SUBMITTED_OK);
return Status::OK;
}
@@ -472,44 +485,35 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
scheduler_->RegisterResource(rtnd_ptr, false, true);
reply->set_type(NodeReplyType::NODE_ADDED_OK);

// Add Node initial status simulation
ResourceStats resource_stats;
ResourceID_t res_id =
ResourceIDFromString(rtnd_ptr->resource_desc().uuid());
ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id);
if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) {
reply->set_type(NodeReplyType::NODE_NOT_FOUND);
return Status::OK;
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
// Add Node initial status simulation
ResourceStats resource_stats;
ResourceID_t res_id =
ResourceIDFromString(rtnd_ptr->resource_desc().uuid());
ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id);
if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) {
reply->set_type(NodeReplyType::NODE_NOT_FOUND);
return Status::OK;
}
resource_stats.set_resource_id(rtnd_ptr->resource_desc().uuid());
resource_stats.set_timestamp(0);

CpuStats* cpu_stats = resource_stats.add_cpus_stats();
// Since some resources are utilized by system pods, initialize
// utilization to 10%.
cpu_stats->set_cpu_capacity(
rtnd_ptr->resource_desc().resource_capacity().cpu_cores());
cpu_stats->set_cpu_utilization(0.1);
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_capacity() * 0.9);
resource_stats.set_mem_capacity(
rtnd_ptr->resource_desc().resource_capacity().ram_cap());
resource_stats.set_mem_utilization(0.1);
resource_stats.set_mem_allocatable(resource_stats.mem_capacity() * 0.9);
resource_stats.set_disk_bw(0);
resource_stats.set_net_rx_bw(0);
resource_stats.set_net_tx_bw(0);
knowledge_base_->AddMachineSample(resource_stats);
}
resource_stats.set_resource_id(rtnd_ptr->resource_desc().uuid());
resource_stats.set_timestamp(0);
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
cpu_stats->set_cpu_capacity(
rtnd_ptr->resource_desc().resource_capacity().cpu_cores());
// Assuming 80% of CPU/mem is allocatable, neglecting 20% for other
// processes on the node.
cpu_stats->set_cpu_allocatable(
rtnd_ptr->resource_desc().resource_capacity().cpu_cores() * 0.80);
// resource_stats.cpus_stats(0).set_cpu_utilization(0.0);
// resource_stats.cpus_stats(0).set_cpu_reservation(0.0);
resource_stats.set_mem_allocatable(
rtnd_ptr->resource_desc().resource_capacity().ram_cap());
resource_stats.set_mem_capacity(
rtnd_ptr->resource_desc().resource_capacity().ram_cap() * 0.80);
// resource_stats.set_mem_utilization(0.0);
// resource_stats.set_mem_reservation(0.0);
resource_stats.set_disk_bw(0);
resource_stats.set_net_rx_bw(0);
resource_stats.set_net_tx_bw(0);
// LOG(INFO) << "DEBUG: During node additions: CPU CAP: " <<
// cpu_stats->cpu_capacity() << "\n"
// << " CPU ALLOC: " <<
// cpu_stats->cpu_allocatable() << "\n"
// << " MEM CAP: " <<
// resource_stats.mem_capacity() << "\n"
// << " MEM ALLOC: " <<
// resource_stats.mem_allocatable();
knowledge_base_->AddMachineSample(resource_stats);
return Status::OK;
}
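The node-addition path above seeds the knowledge base with a synthetic first sample: utilization starts at 10% (the share assumed taken by system pods) and allocatable is the remaining 90% of capacity. A small sketch of that initialization, using illustrative capacities in place of the values read from the resource topology descriptor:

#include <cstdio>

int main() {
  const double kSystemReserved = 0.1;  // assumed share used by system pods
  double cpu_capacity = 16.0;          // cores reported for the node
  double mem_capacity = 65536.0;       // RAM (MB) reported for the node

  // Mirrors the node-addition path: utilization starts at 10% and
  // allocatable is the remaining 90% of capacity.
  double cpu_allocatable = cpu_capacity * (1.0 - kSystemReserved);
  double mem_allocatable = mem_capacity * (1.0 - kSystemReserved);

  std::printf("cpu: %.1f of %.1f cores allocatable\n", cpu_allocatable,
              cpu_capacity);
  std::printf("mem: %.0f of %.0f MB allocatable\n", mem_allocatable,
              mem_capacity);
  return 0;
}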

@@ -624,7 +628,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
unordered_map<JobID_t, uint64_t, boost::hash<boost::uuids::uuid>>
job_num_tasks_to_remove_;
KnowledgeBasePopulator* kb_populator_;
//Pod affinity/anti-affinity
// Pod affinity/anti-affinity
unordered_map<string, unordered_map<string, vector<TaskID_t>>> labels_map_;
vector<TaskID_t> affinity_antiaffinity_tasks_;

92 changes: 59 additions & 33 deletions src/scheduling/flow/cpu_cost_model.cc
@@ -36,10 +36,10 @@ DECLARE_uint64(max_tasks_per_pu);

namespace firmament {

CpuCostModel::CpuCostModel(shared_ptr<ResourceMap_t> resource_map,
shared_ptr<TaskMap_t> task_map,
shared_ptr<KnowledgeBase> knowledge_base,
unordered_map<string, unordered_map<string, vector<TaskID_t>>>* labels_map)
CpuCostModel::CpuCostModel(
shared_ptr<ResourceMap_t> resource_map, shared_ptr<TaskMap_t> task_map,
shared_ptr<KnowledgeBase> knowledge_base,
unordered_map<string, unordered_map<string, vector<TaskID_t>>>* labels_map)
: resource_map_(resource_map),
task_map_(task_map),
knowledge_base_(knowledge_base),
@@ -49,6 +49,20 @@ CpuCostModel::CpuCostModel(shared_ptr<ResourceMap_t> resource_map,
infinity_ = omega_ * CpuMemCostVector_t::dimensions_;
}

void CpuCostModel::AccumulateResourceStats(ResourceDescriptor* accumulator,
ResourceDescriptor* other) {
// Track the aggregate available resources below the machine node
ResourceVector* acc_avail = accumulator->mutable_available_resources();
ResourceVector* other_avail = other->mutable_available_resources();
acc_avail->set_cpu_cores(acc_avail->cpu_cores() + other_avail->cpu_cores());
// Running/idle task count
accumulator->set_num_running_tasks_below(
accumulator->num_running_tasks_below() +
other->num_running_tasks_below());
accumulator->set_num_slots_below(accumulator->num_slots_below() +
other->num_slots_below());
}
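AccumulateResourceStats is the upward gather step the commit message refers to: per-PU available CPU, running-task, and slot counts are summed into the machine-level descriptor. A minimal sketch of how repeated accumulation rolls three PUs up to one machine, with a plain struct standing in for ResourceDescriptor:

#include <cstdint>
#include <iostream>
#include <vector>

// Plain stand-in for the ResourceDescriptor fields the gather pass touches.
struct Descriptor {
  double available_cpu_cores = 0;
  std::uint64_t num_running_tasks_below = 0;
  std::uint64_t num_slots_below = 0;
};

// Mirrors AccumulateResourceStats: each PU's figures are summed into the
// machine-level accumulator.
void Accumulate(Descriptor* acc, const Descriptor& other) {
  acc->available_cpu_cores += other.available_cpu_cores;
  acc->num_running_tasks_below += other.num_running_tasks_below;
  acc->num_slots_below += other.num_slots_below;
}

int main() {
  std::vector<Descriptor> pus = {{0.6, 2, 10}, {0.9, 1, 10}, {0.3, 3, 10}};
  Descriptor machine;
  for (const auto& pu : pus) Accumulate(&machine, pu);
  std::cout << machine.available_cpu_cores << " cores free, "
            << machine.num_running_tasks_below << " running tasks, "
            << machine.num_slots_below << " slots\n";  // 1.8, 6, 30
  return 0;
}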

ArcDescriptor CpuCostModel::TaskToUnscheduledAgg(TaskID_t task_id) {
return ArcDescriptor(2560000, 1ULL, 0ULL);
}
@@ -121,6 +135,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1,
ec_index * resource_request->cpu_cores_;
available_resources.ram_cap_ = rd.available_resources().ram_cap() -
ec_index * resource_request->ram_cap_;

// Expressing Least Requested Priority.
float cpu_fraction =
((rd.resource_capacity().cpu_cores() - available_resources.cpu_cores_) /
@@ -187,7 +202,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1,
}
}
}

// Expressing pod affinity/anti-affinity priority scores.
if ((affinity.has_pod_affinity() &&
affinity.pod_affinity()
@@ -225,10 +240,10 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1,
}
}
}

cost_vector.node_affinity_soft_cost_ =
omega_ - node_affinity_normalized_score;
cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score;
cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score;
Cost_t final_cost = FlattenCostVector(cost_vector);
return ArcDescriptor(final_cost, 1ULL, 0ULL);
}
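The "Least Requested Priority" expression above scores machines by their unused resource fraction. Only the fraction computation is visible in this diff; the 0-10 scaling and the CPU/memory averaging in the sketch below follow the common Kubernetes convention and are assumptions, not code from this commit:

#include <iostream>

// Least-requested score: the larger the unused fraction of a resource, the
// higher the score. Scaling to 0..10 and averaging CPU with memory follow
// the usual Kubernetes convention and are assumptions here.
double LeastRequestedScore(double cpu_capacity, double cpu_available,
                           double mem_capacity, double mem_available) {
  double cpu_fraction = (cpu_capacity - cpu_available) / cpu_capacity;
  double mem_fraction = (mem_capacity - mem_available) / mem_capacity;
  return ((1.0 - cpu_fraction) * 10.0 + (1.0 - mem_fraction) * 10.0) / 2.0;
}

int main() {
  // A half-loaded machine scores 5; an idle one scores 10.
  std::cout << LeastRequestedScore(8.0, 4.0, 16384.0, 8192.0) << "\n";   // 5
  std::cout << LeastRequestedScore(8.0, 8.0, 16384.0, 16384.0) << "\n";  // 10
  return 0;
}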
@@ -854,44 +869,54 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
if (!accumulator->IsResourceNode()) {
return accumulator;
}

if (other->resource_id_.is_nil()) {
// The other node is not a resource node.
if (other->type_ == FlowNodeType::SINK) {
accumulator->rd_ptr_->set_num_running_tasks_below(static_cast<uint64_t>(
accumulator->rd_ptr_->current_running_tasks_size()));
accumulator->rd_ptr_->set_num_slots_below(FLAGS_max_tasks_per_pu);
}
if (accumulator->type_ == FlowNodeType::COORDINATOR) {
return accumulator;
}

CHECK_NOTNULL(other->rd_ptr_);
ResourceDescriptor* rd_ptr = accumulator->rd_ptr_;
CHECK_NOTNULL(rd_ptr);
if (accumulator->type_ == FlowNodeType::MACHINE) {
if (accumulator->type_ == FlowNodeType::PU) {
CHECK(other->resource_id_.is_nil());
ResourceStats latest_stats;
ResourceID_t machine_res_id =
MachineResIDForResource(accumulator->resource_id_);
bool have_sample = knowledge_base_->GetLatestStatsForMachine(machine_res_id,
&latest_stats);
if (have_sample) {
VLOG(2) << "Updating PU " << accumulator->resource_id_ << "'s "
<< "resource stats!";
// Get the CPU stats for this PU
string label = rd_ptr->friendly_name();
uint64_t idx = label.find("PU #");
if (idx != string::npos) {
string core_id_substr = label.substr(idx + 4, label.size() - idx - 4);
uint32_t core_id = strtoul(core_id_substr.c_str(), 0, 10);
float available_cpu_cores =
latest_stats.cpus_stats(core_id).cpu_capacity() *
(1.0 - latest_stats.cpus_stats(core_id).cpu_utilization());
rd_ptr->mutable_available_resources()->set_cpu_cores(
available_cpu_cores);
}
// Running/idle task count
rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size());
rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu);
return accumulator;
}
} else if (accumulator->type_ == FlowNodeType::MACHINE) {
// Grab the latest available resource sample from the machine
ResourceStats latest_stats;
// Take the most recent sample for now
bool have_sample = knowledge_base_->GetLatestStatsForMachine(
accumulator->resource_id_, &latest_stats);
if (have_sample) {
// LOG(INFO) << "DEBUG: Size of cpu stats: " <<
// latest_stats.cpus_stats_size();
// uint32_t core_id = 0;
float available_cpu_cores = latest_stats.cpus_stats(0).cpu_allocatable();
// latest_stats.cpus_stats(core_id).cpu_capacity() *
// (1.0 - latest_stats.cpus_stats(core_id).cpu_utilization());
// auto available_ram_cap = latest_stats.mem_capacity() *
auto available_ram_cap = latest_stats.mem_allocatable();
// (1.0 - latest_stats.mem_utilization());
// LOG(INFO) << "DEBUG: Stats from latest machine sample: "
// << "Available cpu: " << available_cpu_cores << "\n"
// << "Available mem: " << available_ram_cap;
rd_ptr->mutable_available_resources()->set_cpu_cores(available_cpu_cores);
rd_ptr->mutable_available_resources()->set_ram_cap(available_ram_cap);
VLOG(2) << "Updating machine " << accumulator->resource_id_ << "'s "
<< "resource stats!";
rd_ptr->mutable_available_resources()->set_ram_cap(
latest_stats.mem_capacity() * (1.0 - latest_stats.mem_utilization()));
}
if (accumulator->rd_ptr_ && other->rd_ptr_) {
AccumulateResourceStats(accumulator->rd_ptr_, other->rd_ptr_);
}
}

return accumulator;
}
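The new PU branch in GatherStats recovers the core id from the resource's friendly name, which embeds it as "PU #<n>", and uses it to index the machine sample's per-CPU stats. A small sketch of that parsing step; the helper name here is hypothetical, as the in-tree code does this inline:

#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <string>

// Hypothetical helper mirroring the inline parsing in GatherStats: the PU's
// friendly name embeds the core id as "PU #<n>", which indexes into the
// machine sample's per-CPU stats.
bool CoreIdFromFriendlyName(const std::string& label, std::uint32_t* core_id) {
  std::size_t idx = label.find("PU #");
  if (idx == std::string::npos) return false;
  *core_id = std::strtoul(label.substr(idx + 4).c_str(), nullptr, 10);
  return true;
}

int main() {
  std::uint32_t core_id = 0;
  if (CoreIdFromFriendlyName("Socket #0 Core #2 PU #5", &core_id)) {
    std::cout << "core id: " << core_id << "\n";  // prints 5
  }
  return 0;
}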

@@ -902,6 +927,7 @@ void CpuCostModel::PrepareStats(FlowGraphNode* accumulator) {
CHECK_NOTNULL(accumulator->rd_ptr_);
accumulator->rd_ptr_->clear_num_running_tasks_below();
accumulator->rd_ptr_->clear_num_slots_below();
accumulator->rd_ptr_->clear_available_resources();
// Clear maps related to priority scores.
ec_to_node_priority_scores.clear();
}
[Diff for the remaining three changed files not shown]
