i#6938 sched migrate: Print blocked times in queues
Improves diagnostics by augmenting the all-runqueue printing:

+ It now constructs its many-line string in memory and then prints
  it all at once, to make the output more atomic (see the sketch below).

+ It includes the remaining blocked times for blocked inputs.

+ It is moved from pop_from_ready_queue(), where the popped input is in
  flux, to pick_next_input(), where the current running input is valid.

+ It is printed more frequently.

Also prints the size of the unscheduled queue when moving its entries to
the ready queues during rebalancing.

Issue: #6938
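
The first bullet is the usual build-then-print pattern: accumulate the
multi-line report in a std::ostringstream and emit it with a single print
call, so lines printed concurrently by other threads cannot land in the
middle of the block. Below is a minimal standalone sketch of that pattern;
print_snapshot() and the output_snapshot_t fields are illustrative
stand-ins, not the scheduler's actual types or API.

#include <cstddef>
#include <iostream>
#include <sstream>
#include <vector>

// Illustrative stand-in for the per-output state being reported.
struct output_snapshot_t {
    int running_input;
    std::size_t queue_size;
    int num_blocked;
};

void
print_snapshot(const std::vector<output_snapshot_t> &outputs)
{
    // Build the whole report in memory first.
    std::ostringstream ostr;
    ostr << "Queue snapshot:\n";
    for (std::size_t i = 0; i < outputs.size(); ++i) {
        ostr << "  out #" << i << ": running #" << outputs[i].running_input << "; "
             << outputs[i].queue_size << " in queue; " << outputs[i].num_blocked
             << " blocked\n";
    }
    // One call emits the block, so concurrent printing from other threads
    // cannot interleave between our lines.
    std::cerr << ostr.str();
}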
derekbruening committed Oct 3, 2024
1 parent 81d5748 commit 2acb4a3
Showing 1 changed file with 38 additions and 14 deletions.
52 changes: 38 additions & 14 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -899,7 +899,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
}
}
}

VPRINT(this, 1, "%zu inputs\n", inputs_.size());

live_input_count_.store(static_cast<int>(inputs_.size()), std::memory_order_release);

res = read_switch_sequences();
@@ -2765,13 +2767,6 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue(
}
status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input);
}
VDO(this, 1, {
static int global_heartbeat;
// We are ok with races as the cadence is approximate.
if (++global_heartbeat % 100000 == 0) {
print_queue_stats();
}
});
return status;
}

@@ -3111,6 +3106,13 @@ typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t output,
uint64_t blocked_time)
{
VDO(this, 1, {
static int global_heartbeat;
// We are ok with races as the cadence is approximate.
if (++global_heartbeat % 10000 == 0) {
print_queue_stats();
}
});

sched_type_t::stream_status_t res = sched_type_t::STATUS_OK;
const input_ordinal_t prev_index = outputs_[output].cur_input;
@@ -4240,15 +4242,34 @@ scheduler_tmpl_t<RecordType, ReaderType>::print_queue_stats()
unsched_size = unscheduled_priority_.queue.size();
}
int live = live_input_count_.load(std::memory_order_acquire);
VPRINT(this, 1, "inputs: %zd scheduleable, %zd unscheduled, %zd eof\n",
live - unsched_size, unsched_size, inputs_.size() - live);
// Make our multi-line output more atomic.
std::ostringstream ostr;
ostr << "Queue snapshot: inputs: " << live - unsched_size << " scheduleable, "
<< unsched_size << " unscheduled, " << inputs_.size() - live << " eof\n";
for (unsigned int i = 0; i < outputs_.size(); ++i) {
auto lock = acquire_scoped_output_lock_if_necessary(i);
VPRINT(this, 1, " out #%d: running #%d; %zd in queue; %d blocked\n", i,
// XXX: Reading this is racy; we're ok with that.
outputs_[i].cur_input, outputs_[i].ready_queue.queue.size(),
outputs_[i].ready_queue.num_blocked);
uint64_t cur_time = get_output_time(i);
ostr << " out #" << i << " @" << cur_time << ": running #"
<< outputs_[i].cur_input << "; " << outputs_[i].ready_queue.queue.size()
<< " in queue; " << outputs_[i].ready_queue.num_blocked << " blocked\n";
std::set<input_info_t *> readd;
input_info_t *res = nullptr;
while (!outputs_[i].ready_queue.queue.empty()) {
res = outputs_[i].ready_queue.queue.top();
readd.insert(res);
outputs_[i].ready_queue.queue.pop();
std::lock_guard<mutex_dbg_owned> input_lock(*res->lock);
if (res->blocked_time > 0) {
ostr << " " << res->index << " still blocked for "
<< res->blocked_time - (cur_time - res->blocked_start_time) << "\n";
}
}
// Re-add the ones we skipped, but without changing their counters so we preserve
// the prior FIFO order.
for (input_info_t *add : readd)
outputs_[i].ready_queue.queue.push(add);
}
VPRINT(this, 1, "%s\n", ostr.str().c_str());
}

template <typename RecordType, typename ReaderType>
@@ -4288,7 +4309,10 @@ scheduler_tmpl_t<RecordType, ReaderType>::rebalance_queues(
}
if (live_input_count_.load(std::memory_order_acquire) ==
static_cast<int>(unsched_size)) {
VPRINT(this, 1, "rebalancing moving entire unscheduled queue to ready_queues\n");
VPRINT(
this, 1,
"rebalancing moving entire unscheduled queue (%zu entries) to ready_queues\n",
unsched_size);
{
std::lock_guard<mutex_dbg_owned> unsched_lock(*unscheduled_priority_.lock);
while (!unscheduled_priority_.queue.empty()) {

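A side note on the blocked-time arithmetic added in print_queue_stats()
above: an input that started blocking at blocked_start_time for
blocked_time units still has blocked_time - (cur_time - blocked_start_time)
units left at time cur_time. The standalone sketch below mirrors that
expression; remaining_blocked() is a hypothetical helper, and the clamp to
zero is added here only to keep the unsigned subtraction safe (the
scheduler prints the value only while the input is still marked blocked).

#include <cstdint>
#include <iostream>

// Hypothetical helper mirroring the remaining-blocked-time expression.
static uint64_t
remaining_blocked(uint64_t blocked_time, uint64_t blocked_start_time, uint64_t cur_time)
{
    uint64_t elapsed = cur_time - blocked_start_time;
    // Clamp to zero so the unsigned subtraction cannot wrap.
    return elapsed >= blocked_time ? 0 : blocked_time - elapsed;
}

int
main()
{
    // Blocked for 500 units starting at time 1000: at time 1300, 200 remain.
    std::cout << remaining_blocked(500, 1000, 1300) << "\n";
    return 0;
}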