Commit

Merge pull request #6 from shivramsrivastava/dev

Improved the static resource calculation

shivramsrivastava authored Jun 19, 2018
2 parents f7f252b + 97d14fc commit 502f0b3
Showing 5 changed files with 204 additions and 136 deletions.
148 changes: 76 additions & 72 deletions src/scheduling/firmament_scheduler_service.cc
@@ -57,6 +57,7 @@ DEFINE_string(firmament_scheduler_service_address, "127.0.0.1",
"The address of the scheduler service");
DEFINE_string(firmament_scheduler_service_port, "9090",
"The port of the scheduler service");
DECLARE_bool(resource_stats_update_based_on_resource_reservation);
DEFINE_string(service_scheduler, "flow", "Scheduler to use: flow | simple");
DEFINE_uint64(queue_based_scheduling_time, 1, "Queue Based Schedule run time");
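The flag declared above is defined in another translation unit; a minimal sketch of the matching gflags definition follows (the default value and help text are assumptions, not taken from this commit):

DEFINE_bool(resource_stats_update_based_on_resource_reservation, true,
            "Derive machine resource stats statically from task resource "
            "reservations instead of an external stats provider.");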

@@ -117,21 +118,43 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
// TODO(ionel): Implement!
}

// Helper function that updates the knowledge base with resource stats
// samples based on the task's resource request (reservation). We use this
// when there is no external dynamic resource stats provider, such as
// Heapster in Kubernetes. If add is true, the task's resource request is
// added back to the available machine resources; otherwise it is
// subtracted from them.
void UpdateMachineSamplesToKnowledgeBaseStatically(
const TaskDescriptor* td_ptr, bool add) {
ResourceID_t res_id = ResourceIDFromString(td_ptr->scheduled_to_resource());
ResourceStatus* rs = FindPtrOrNull(*resource_map_, res_id);
ResourceStats resource_stats;
CpuStats* cpu_stats = resource_stats.add_cpus_stats();
bool have_sample = knowledge_base_->GetLatestStatsForMachine(
ResourceIDFromString(rs->mutable_topology_node()->parent_id()),
&resource_stats);
if (have_sample) {
if (add) {
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() +
td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(
resource_stats.mem_allocatable() +
td_ptr->resource_request().ram_cap());
} else {
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_allocatable() -
td_ptr->resource_request().cpu_cores());
resource_stats.set_mem_allocatable(
resource_stats.mem_allocatable() -
td_ptr->resource_request().ram_cap());
}
double cpu_utilization =
(cpu_stats->cpu_capacity() - cpu_stats->cpu_allocatable()) /
(double)cpu_stats->cpu_capacity();
cpu_stats->set_cpu_utilization(cpu_utilization);
double mem_utilization =
(resource_stats.mem_capacity() - resource_stats.mem_allocatable()) /
(double)resource_stats.mem_capacity();
resource_stats.set_mem_utilization(mem_utilization);
knowledge_base_->AddMachineSample(resource_stats);
}
}
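For illustration, a hypothetical worked example of the arithmetic this helper performs (the numbers are illustrative, not from the source):

// Hypothetical 8-core machine whose latest sample reports 5 allocatable
// cores. A task that reserved 2 cores completes, so add == true:
//   cpu_allocatable = 5 + 2 = 7
//   cpu_utilization = (8 - 7) / 8.0 = 0.125
// When a task is placed, the caller presumably passes add == false and the
// update runs in reverse:
//   cpu_allocatable = 7 - 2 = 5
//   cpu_utilization = (8 - 5) / 8.0 = 0.375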
@@ -141,11 +164,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
SchedulerStats sstat;
vector<SchedulingDelta> deltas;
scheduler_->ScheduleAllJobs(&sstat, &deltas);

// Schedule tasks having pod affinity/anti-affinity
chrono::high_resolution_clock::time_point start =
chrono::high_resolution_clock::now();
@@ -158,11 +176,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
chrono::high_resolution_clock::now();
time_spent = chrono::duration_cast<chrono::seconds>(end - start);
}
// Extract results
if (deltas.size()) {
LOG(INFO) << "Got " << deltas.size() << " scheduling deltas";
}

for (auto& d : deltas) {
// LOG(INFO) << "Delta: " << d.DebugString();
SchedulingDelta* ret_delta = reply->add_deltas();
@@ -215,19 +232,17 @@
}
}


Status TaskCompleted(ServerContext* context, const TaskUID* tid_ptr,
TaskCompletedResponse* reply) override {
TaskDescriptor* td_ptr = FindPtrOrNull(*task_map_, tid_ptr->task_uid());
if (td_ptr == NULL) {
reply->set_type(TaskReplyType::TASK_NOT_FOUND);
return Status::OK;
}
// TODO(jagadish): Remove the code below once we start receiving machine
// resource stats samples from Poseidon (i.e., Heapster). For now, machine
// samples are updated statically based on the pod's state.
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
}
JobID_t job_id = JobIDFromString(td_ptr->job_id());
JobDescriptor* jd_ptr = FindOrNull(*job_map_, job_id);
@@ -261,11 +276,10 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
reply->set_type(TaskReplyType::TASK_NOT_FOUND);
return Status::OK;
}
// TODO(jagadish): Remove the code below once we start receiving machine
// resource stats samples from Poseidon (i.e., Heapster). For now, machine
// samples are updated statically based on the pod's state.
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
if (!td_ptr->scheduled_to_resource().empty()) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
}
RemoveTaskFromLabelsMap(*td_ptr);
scheduler_->HandleTaskFailure(td_ptr);
@@ -283,11 +297,12 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
return Status::OK;
}
RemoveTaskFromLabelsMap(*td_ptr);
// TODO(jagadish): Remove the code below once we start receiving machine
// resource stats samples from Poseidon (i.e., Heapster). For now, machine
// samples are updated statically based on the pod's state.
if (FLAGS_resource_stats_update_based_on_resource_reservation) {
if (!(td_ptr->scheduled_to_resource().empty()) &&
(td_ptr->state() != TaskDescriptor::COMPLETED) &&
(td_ptr->state() != TaskDescriptor::FAILED)) {
UpdateMachineSamplesToKnowledgeBaseStatically(td_ptr, true);
}
}
scheduler_->HandleTaskRemoval(td_ptr);
JobID_t job_id = JobIDFromString(td_ptr->job_id());
@@ -394,8 +409,6 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
uint64_t* num_tasks_to_remove =
FindOrNull(job_num_tasks_to_remove_, job_id);
(*num_tasks_to_remove)++;
reply->set_type(TaskReplyType::TASK_SUBMITTED_OK);
return Status::OK;
}
@@ -472,44 +485,35 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
scheduler_->RegisterResource(rtnd_ptr, false, true);
reply->set_type(NodeReplyType::NODE_ADDED_OK);

if (FLAGS_resource_stats_update_based_on_resource_reservation) {
// Add Node initial status simulation
ResourceStats resource_stats;
ResourceID_t res_id =
ResourceIDFromString(rtnd_ptr->resource_desc().uuid());
ResourceStatus* rs_ptr = FindPtrOrNull(*resource_map_, res_id);
if (rs_ptr == NULL || rs_ptr->mutable_descriptor() == NULL) {
reply->set_type(NodeReplyType::NODE_NOT_FOUND);
return Status::OK;
}
resource_stats.set_resource_id(rtnd_ptr->resource_desc().uuid());
resource_stats.set_timestamp(0);

CpuStats* cpu_stats = resource_stats.add_cpus_stats();
// Since some resources are consumed by system pods, initialize
// utilization to 10%.
cpu_stats->set_cpu_capacity(
rtnd_ptr->resource_desc().resource_capacity().cpu_cores());
cpu_stats->set_cpu_utilization(0.1);
cpu_stats->set_cpu_allocatable(cpu_stats->cpu_capacity() * 0.9);
resource_stats.set_mem_capacity(
rtnd_ptr->resource_desc().resource_capacity().ram_cap());
resource_stats.set_mem_utilization(0.1);
resource_stats.set_mem_allocatable(resource_stats.mem_capacity() * 0.9);
resource_stats.set_disk_bw(0);
resource_stats.set_net_rx_bw(0);
resource_stats.set_net_tx_bw(0);
knowledge_base_->AddMachineSample(resource_stats);
}
return Status::OK;
}
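A quick numeric sketch of the sample seeded above (the machine is hypothetical; the 10% figure comes straight from the code):

// Hypothetical 8-core, 32 GB machine at registration time:
//   cpu_capacity = 8       cpu_utilization = 0.1   cpu_allocatable = 7.2
//   mem_capacity = 32 GB   mem_utilization = 0.1   mem_allocatable = 28.8 GB
// The 10% headroom approximates the resources already consumed by system
// pods before any Firmament-placed task arrives.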

@@ -624,7 +628,7 @@ class FirmamentSchedulerServiceImpl final : public FirmamentScheduler::Service {
unordered_map<JobID_t, uint64_t, boost::hash<boost::uuids::uuid>>
job_num_tasks_to_remove_;
KnowledgeBasePopulator* kb_populator_;
// Pod affinity/anti-affinity
unordered_map<string, unordered_map<string, vector<TaskID_t>>> labels_map_;
vector<TaskID_t> affinity_antiaffinity_tasks_;

92 changes: 59 additions & 33 deletions src/scheduling/flow/cpu_cost_model.cc
@@ -36,10 +36,10 @@ DECLARE_uint64(max_tasks_per_pu);

namespace firmament {

CpuCostModel::CpuCostModel(
shared_ptr<ResourceMap_t> resource_map, shared_ptr<TaskMap_t> task_map,
shared_ptr<KnowledgeBase> knowledge_base,
unordered_map<string, unordered_map<string, vector<TaskID_t>>>* labels_map)
: resource_map_(resource_map),
task_map_(task_map),
knowledge_base_(knowledge_base),
Expand All @@ -49,6 +49,20 @@ CpuCostModel::CpuCostModel(shared_ptr<ResourceMap_t> resource_map,
infinity_ = omega_ * CpuMemCostVector_t::dimensions_;
}

void CpuCostModel::AccumulateResourceStats(ResourceDescriptor* accumulator,
ResourceDescriptor* other) {
// Track the aggregate available resources below the machine node
ResourceVector* acc_avail = accumulator->mutable_available_resources();
ResourceVector* other_avail = other->mutable_available_resources();
acc_avail->set_cpu_cores(acc_avail->cpu_cores() + other_avail->cpu_cores());
// Running/idle task count
accumulator->set_num_running_tasks_below(
accumulator->num_running_tasks_below() +
other->num_running_tasks_below());
accumulator->set_num_slots_below(accumulator->num_slots_below() +
other->num_slots_below());
}
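GatherStats below invokes this helper once per machine-to-PU edge during the bottom-up statistics pass; a minimal usage sketch (the PU descriptor names are hypothetical):

// Sketch: after PrepareStats() has cleared the machine's counters, the
// gather pass over two PUs effectively performs:
//   AccumulateResourceStats(machine_rd, pu0_rd);
//   AccumulateResourceStats(machine_rd, pu1_rd);
// leaving machine_rd with the summed available cpu_cores, running-task
// count, and slot count of its children.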

ArcDescriptor CpuCostModel::TaskToUnscheduledAgg(TaskID_t task_id) {
return ArcDescriptor(2560000, 1ULL, 0ULL);
}
@@ -121,6 +135,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1,
ec_index * resource_request->cpu_cores_;
available_resources.ram_cap_ = rd.available_resources().ram_cap() -
ec_index * resource_request->ram_cap_;

// Expressing Least Requested Priority.
float cpu_fraction =
((rd.resource_capacity().cpu_cores() - available_resources.cpu_cores_) /
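The least-requested computation is cut off by the hunk boundary above; for orientation, a hypothetical worked example of the fraction it builds:

// Hypothetical: capacity of 8 cores with 6 still available after
// subtracting reservations:
//   cpu_fraction = (8 - 6) / 8.0 = 0.25
// A machine with more spare capacity yields a smaller fraction and thus a
// lower cost, steering tasks toward the least-requested machines.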
@@ -187,7 +202,7 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1,
}
}
}

// Expressing pod affinity/anti-affinity priority scores.
if ((affinity.has_pod_affinity() &&
affinity.pod_affinity()
Expand Down Expand Up @@ -225,10 +240,10 @@ ArcDescriptor CpuCostModel::EquivClassToEquivClass(EquivClass_t ec1,
}
}
}

cost_vector.node_affinity_soft_cost_ =
omega_ - node_affinity_normalized_score;
cost_vector.pod_affinity_soft_cost_ = omega_ - pod_affinity_normalized_score;
Cost_t final_cost = FlattenCostVector(cost_vector);
return ArcDescriptor(final_cost, 1ULL, 0ULL);
}
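FlattenCostVector collapses the multi-dimensional cost vector into the single arc cost carried by the ArcDescriptor triple (cost, capacity, minimum flow). A hypothetical sketch of such a flattening; the actual implementation and field names may differ:

// Hypothetical sketch only; the real FlattenCostVector may weight
// dimensions differently.
// Cost_t FlattenCostVector(const CpuMemCostVector_t& cv) {
//   Cost_t total = 0;
//   total += cv.cpu_mem_cost_;              // assumed field name
//   total += cv.node_affinity_soft_cost_;
//   total += cv.pod_affinity_soft_cost_;
//   return total;
// }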
@@ -854,44 +869,54 @@ FlowGraphNode* CpuCostModel::GatherStats(FlowGraphNode* accumulator,
if (!accumulator->IsResourceNode()) {
return accumulator;
}

if (other->resource_id_.is_nil()) {
// The other node is not a resource node.
if (other->type_ == FlowNodeType::SINK) {
accumulator->rd_ptr_->set_num_running_tasks_below(static_cast<uint64_t>(
accumulator->rd_ptr_->current_running_tasks_size()));
accumulator->rd_ptr_->set_num_slots_below(FLAGS_max_tasks_per_pu);
}
if (accumulator->type_ == FlowNodeType::COORDINATOR) {
return accumulator;
}

CHECK_NOTNULL(other->rd_ptr_);
ResourceDescriptor* rd_ptr = accumulator->rd_ptr_;
CHECK_NOTNULL(rd_ptr);
if (accumulator->type_ == FlowNodeType::PU) {
CHECK(other->resource_id_.is_nil());
ResourceStats latest_stats;
ResourceID_t machine_res_id =
MachineResIDForResource(accumulator->resource_id_);
bool have_sample = knowledge_base_->GetLatestStatsForMachine(machine_res_id,
&latest_stats);
if (have_sample) {
VLOG(2) << "Updating PU " << accumulator->resource_id_ << "'s "
<< "resource stats!";
// Get the CPU stats for this PU
string label = rd_ptr->friendly_name();
uint64_t idx = label.find("PU #");
if (idx != string::npos) {
string core_id_substr = label.substr(idx + 4, label.size() - idx - 4);
uint32_t core_id = strtoul(core_id_substr.c_str(), 0, 10);
float available_cpu_cores =
latest_stats.cpus_stats(core_id).cpu_capacity() *
(1.0 - latest_stats.cpus_stats(core_id).cpu_utilization());
rd_ptr->mutable_available_resources()->set_cpu_cores(
available_cpu_cores);
}
// Running/idle task count
rd_ptr->set_num_running_tasks_below(rd_ptr->current_running_tasks_size());
rd_ptr->set_num_slots_below(FLAGS_max_tasks_per_pu);
return accumulator;
}
} else if (accumulator->type_ == FlowNodeType::MACHINE) {
// Grab the latest available resource sample from the machine
ResourceStats latest_stats;
// Take the most recent sample for now
bool have_sample = knowledge_base_->GetLatestStatsForMachine(
accumulator->resource_id_, &latest_stats);
if (have_sample) {
// LOG(INFO) << "DEBUG: Size of cpu stats: " <<
// latest_stats.cpus_stats_size();
// uint32_t core_id = 0;
float available_cpu_cores = latest_stats.cpus_stats(0).cpu_allocatable();
// latest_stats.cpus_stats(core_id).cpu_capacity() *
// (1.0 - latest_stats.cpus_stats(core_id).cpu_utilization());
// auto available_ram_cap = latest_stats.mem_capacity() *
auto available_ram_cap = latest_stats.mem_allocatable();
// (1.0 - latest_stats.mem_utilization());
// LOG(INFO) << "DEBUG: Stats from latest machine sample: "
// << "Available cpu: " << available_cpu_cores << "\n"
// << "Available mem: " << available_ram_cap;
rd_ptr->mutable_available_resources()->set_cpu_cores(available_cpu_cores);
rd_ptr->mutable_available_resources()->set_ram_cap(available_ram_cap);
VLOG(2) << "Updating machine " << accumulator->resource_id_ << "'s "
<< "resource stats!";
}
if (accumulator->rd_ptr_ && other->rd_ptr_) {
AccumulateResourceStats(accumulator->rd_ptr_, other->rd_ptr_);
}
}

return accumulator;
}
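Taken together with PrepareStats below, each scheduling round refreshes the machine statistics roughly as follows (a sketch of the flow, not verbatim source):

// Per scheduling round (sketch):
//   1. PrepareStats(machine) clears num_running_tasks_below,
//      num_slots_below, and available_resources.
//   2. GatherStats(PU, sink) records each PU's running-task and slot
//      counts.
//   3. GatherStats(machine, PU) reads the latest (statically maintained)
//      machine sample for allocatable cpu/mem and then folds in each PU
//      via AccumulateResourceStats().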

@@ -902,6 +927,7 @@ void CpuCostModel::PrepareStats(FlowGraphNode* accumulator) {
CHECK_NOTNULL(accumulator->rd_ptr_);
accumulator->rd_ptr_->clear_num_running_tasks_below();
accumulator->rd_ptr_->clear_num_slots_below();
accumulator->rd_ptr_->clear_available_resources();
// Clear maps related to priority scores.
ec_to_node_priority_scores.clear();
}
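The newly added clear_available_resources() matters because GatherStats now accumulates child PU resources into the machine descriptor; a sketch of the double counting it prevents (hypothetical numbers):

// Without the clear, round N+1 would start from round N's accumulated
// value:
//   round N:   machine available cpu = 6   (fresh sample + accumulation)
//   round N+1: 6 (stale) + 6 (re-accumulated) = 12   // double counted
// Clearing resets the descriptor so each round's gather starts from zero.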