Skip to content

Commit

Permalink
i#6822 unscheduled: Change infinite to max timeout in scheduler (#7016)
Browse files Browse the repository at this point in the history
Adds a new drmemtrace scheduler option
scheduler_options_t.honor_infinite_timeouts and a CLI parameter
-sched_infinite_timeouts, both off by default. If turned on, these match
the previous behavior.

Changes the default behavior to use the (scaled per the scale parameter)
maximum block timeout for indefinitely-unscheduled cases, rather than
using an infinite timeout. This avoids waiting a long time for things
like background threads that do nothing but wait the entire duration of
a trace.

Adds unit test variants for both infinite and max-timeout.

Tested on a large application where this did not noticeably decrease the
successful number of direct switches, but did reduce the idle time which
was too high previously.

Issue: #6822
  • Loading branch information
derekbruening authored Oct 2, 2024
1 parent 85b547d commit 81d5748
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 6 deletions.
1 change: 1 addition & 0 deletions clients/drcachesim/analyzer_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,7 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::init_dynamic_schedule()
sched_ops.blocking_switch_threshold = op_sched_blocking_switch_us.get_value();
sched_ops.block_time_multiplier = op_sched_block_scale.get_value();
sched_ops.block_time_max_us = op_sched_block_max_us.get_value();
sched_ops.honor_infinite_timeouts = op_sched_infinite_timeouts.get_value();
sched_ops.migration_threshold_us = op_sched_migration_threshold_us.get_value();
sched_ops.rebalance_period_us = op_sched_rebalance_period_us.get_value();
sched_ops.randomize_next_input = op_sched_randomize.get_value();
Expand Down
8 changes: 8 additions & 0 deletions clients/drcachesim/common/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,14 @@ droption_t<bool> op_sched_disable_direct_switches(
"switch being determined by latency and the next input in the queue. The "
"TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH markers are not removed from the trace.");

droption_t<bool> op_sched_infinite_timeouts(
DROPTION_SCOPE_FRONTEND, "sched_infinite_timeouts", false,
"Whether unscheduled-indefinitely means never scheduled",
"Applies to -core_sharded and -core_serial. Determines whether an "
"unscheduled-indefinitely input really is never scheduled (set to true), or instead "
"is treated as blocked for the maximum time (scaled by the regular block scale) "
"(set to false).");

droption_t<double> op_sched_time_units_per_us(
DROPTION_SCOPE_ALL, "sched_time_units_per_us", 100.,
"Time units per simulated microsecond",
Expand Down
1 change: 1 addition & 0 deletions clients/drcachesim/common/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ extern dynamorio::droption::droption_t<std::string> op_cpu_schedule_file;
extern dynamorio::droption::droption_t<std::string> op_sched_switch_file;
extern dynamorio::droption::droption_t<bool> op_sched_randomize;
extern dynamorio::droption::droption_t<bool> op_sched_disable_direct_switches;
extern dynamorio::droption::droption_t<bool> op_sched_infinite_timeouts;
extern dynamorio::droption::droption_t<uint64_t> op_sched_migration_threshold_us;
extern dynamorio::droption::droption_t<uint64_t> op_sched_rebalance_period_us;
extern dynamorio::droption::droption_t<double> op_sched_time_units_per_us;
Expand Down
40 changes: 36 additions & 4 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1850,7 +1850,15 @@ scheduler_tmpl_t<RecordType, ReaderType>::process_next_initial_record(
}
} else if (marker_type == TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE) {
if (options_.honor_direct_switches && options_.mapping != MAP_AS_PREVIOUSLY) {
VPRINT(this, 2, "Input %d starting unscheduled\n", input.index);
input.unscheduled = true;
if (!options_.honor_infinite_timeouts) {
input.blocked_time = scale_blocked_time(options_.block_time_max_us);
// Clamp at 1 since 0 means an infinite timeout for unscheduled=true.
if (input.blocked_time == 0)
input.blocked_time = 1;
// blocked_start_time will be set when we first pop this off a queue.
}
// Ignore this marker during regular processing.
input.skip_next_unscheduled = true;
}
Expand Down Expand Up @@ -2627,13 +2635,23 @@ scheduler_tmpl_t<RecordType, ReaderType>::pop_from_ready_queue_hold_locks(
// control points we only check for being unblocked when an input
// would be chosen to run. We thus keep blocked inputs in the ready queue.
if (res->blocked_time > 0) {
assert(cur_time > 0);
--outputs_[from_output].ready_queue.num_blocked;
if (!options_.honor_infinite_timeouts) {
// cur_time can be 0 at initialization time.
if (res->blocked_start_time == 0 && cur_time > 0) {
// This was a start-unscheduled input: we didn't have a valid
// time at initialization.
res->blocked_start_time = cur_time;
}
} else
assert(cur_time > 0);
}
if (res->blocked_time > 0 &&
// XXX i#6966: We have seen wall-clock time go backward, which
// underflows here and then always unblocks the input.
cur_time - res->blocked_start_time < res->blocked_time) {
// cur_time can be 0 at initialization time.
(cur_time == 0 ||
// XXX i#6966: We have seen wall-clock time go backward, which
// underflows here and then always unblocks the input.
cur_time - res->blocked_start_time < res->blocked_time)) {
VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n",
res->index,
res->blocked_time - (cur_time - res->blocked_start_time));
Expand Down Expand Up @@ -3473,6 +3491,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::process_marker(input_info_t &input,
break;
}
input.unscheduled = true;
if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) {
// As our scheduling is imperfect we do not risk things being blocked
// indefinitely: we instead have a timeout, but the maximum value.
input.syscall_timeout_arg = options_.block_time_max_us;
if (input.syscall_timeout_arg == 0)
input.syscall_timeout_arg = 1;
}
if (input.syscall_timeout_arg > 0) {
input.blocked_time = scale_blocked_time(input.syscall_timeout_arg);
// Clamp at 1 since 0 means an infinite timeout for unscheduled=true.
Expand Down Expand Up @@ -3504,6 +3529,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::process_marker(input_info_t &input,
}
// Trigger a switch either indefinitely or until timeout.
input.unscheduled = true;
if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) {
// As our scheduling is imperfect we do not risk things being blocked
// indefinitely: we instead have a timeout, but the maximum value.
input.syscall_timeout_arg = options_.block_time_max_us;
if (input.syscall_timeout_arg == 0)
input.syscall_timeout_arg = 1;
}
if (input.syscall_timeout_arg > 0) {
input.blocked_time = scale_blocked_time(input.syscall_timeout_arg);
// Clamp at 1 since 0 means an infinite timeout for unscheduled=true.
Expand Down
6 changes: 6 additions & 0 deletions clients/drcachesim/scheduler/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,12 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
* parameter to next_record().
*/
uint64_t rebalance_period_us = 50000;
/**
* Determines whether an unscheduled-indefinitely input really is unscheduled for
* an infinite time, or instead is treated as blocked for the maxiumim time
* (#block_time_max_us) scaled by #block_time_multiplier.
*/
bool honor_infinite_timeouts = false;
};

/**
Expand Down
117 changes: 115 additions & 2 deletions clients/drcachesim/tests/scheduler_unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4836,7 +4836,7 @@ test_unscheduled_fallback()
make_exit(TID_C),
};
{
// Test the defaults with direct switches enabled.
// Test with direct switches enabled and infinite timeouts.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
Expand Down Expand Up @@ -4871,6 +4871,47 @@ test_unscheduled_fallback()
sched_ops.block_time_multiplier = BLOCK_SCALE;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
sched_ops.rebalance_period_us = BLOCK_TIME_MAX;
sched_ops.honor_infinite_timeouts = true;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
}
{
// Test disabling infinite timeouts.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_C)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_C);
// Here we see much shorter idle time before A and B finish.
static const char *const CORE0_SCHED_STRING =
"...AA.........B........CC.....__A....._____A._________B......B...._____BBBB."
"___________C.";

std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
sched_ops.quantum_duration_us = QUANTUM_DURATION;
// We use our mock's time==instruction count for a deterministic result.
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.time_units_per_us = 1.;
sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
sched_ops.block_time_multiplier = BLOCK_SCALE;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
sched_ops.rebalance_period_us = BLOCK_TIME_MAX;
sched_ops.honor_infinite_timeouts = false;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
Expand Down Expand Up @@ -4974,7 +5015,7 @@ test_unscheduled_initially()
make_exit(TID_B),
};
{
// Test the defaults with direct switches enabled.
// Test with infinite timeouts and direct switches enabled.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
Expand All @@ -4995,6 +5036,41 @@ test_unscheduled_initially()
sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
sched_ops.block_time_multiplier = BLOCK_SCALE;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
sched_ops.honor_infinite_timeouts = true;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
}
{
// Test without infinite timeouts.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_B)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_B);
// We have a medium idle period before A becomes scheduleable.
static const char *const CORE0_SCHED_STRING =
"...B....._____.....A.__________________________________B....B.";

std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.time_units_per_us = 1.;
sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
sched_ops.block_time_multiplier = BLOCK_SCALE;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
sched_ops.honor_infinite_timeouts = false;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
Expand Down Expand Up @@ -5230,6 +5306,7 @@ test_unscheduled_no_alternative()
std::cerr << "\n----------------\nTesting unscheduled no alternative (i#6959)\n";
static constexpr int NUM_OUTPUTS = 1;
static constexpr uint64_t REBALANCE_PERIOD_US = 50;
static constexpr uint64_t BLOCK_TIME_MAX = 200;
static constexpr memref_tid_t TID_A = 100;
std::vector<trace_entry_t> refs_A = {
make_thread(TID_A),
Expand All @@ -5249,6 +5326,7 @@ test_unscheduled_no_alternative()
make_exit(TID_A),
};
{
// Test infinite timeouts.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
Expand All @@ -5265,6 +5343,38 @@ test_unscheduled_no_alternative()
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.time_units_per_us = 1.;
sched_ops.rebalance_period_us = REBALANCE_PERIOD_US;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
sched_ops.honor_infinite_timeouts = true;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
assert(false);
std::vector<std::string> sched_as_string =
run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_A, /*send_time=*/true);
for (int i = 0; i < NUM_OUTPUTS; i++) {
std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
}
assert(sched_as_string[0] == CORE0_SCHED_STRING);
}
{
// Test finite timeouts.
std::vector<scheduler_t::input_reader_t> readers;
readers.emplace_back(std::unique_ptr<mock_reader_t>(new mock_reader_t(refs_A)),
std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_A);
static const char *const CORE0_SCHED_STRING = "...A......____________________A.";

std::vector<scheduler_t::input_workload_t> sched_inputs;
sched_inputs.emplace_back(std::move(readers));
scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
scheduler_t::DEPENDENCY_TIMESTAMPS,
scheduler_t::SCHEDULER_DEFAULTS,
/*verbosity=*/3);
// We use our mock's time==instruction count for a deterministic result.
sched_ops.quantum_unit = scheduler_t::QUANTUM_TIME;
sched_ops.time_units_per_us = 1.;
sched_ops.rebalance_period_us = REBALANCE_PERIOD_US;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
sched_ops.honor_infinite_timeouts = false;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
Expand Down Expand Up @@ -5785,6 +5895,8 @@ test_rebalancing()
static constexpr int NUM_INSTRS = QUANTUM_DURATION * 3;
static constexpr int REBALANCE_PERIOD = NUM_OUTPUTS * 20 * NUM_INPUTS_UNSCHED;
static constexpr int MIGRATION_THRESHOLD = QUANTUM_DURATION;
// Keep unscheduled for longer.
static constexpr uint64_t BLOCK_TIME_MAX = 250000;
static constexpr memref_tid_t TID_BASE = 100;
static constexpr memref_tid_t TID_A = TID_BASE + 0;
static constexpr memref_tid_t TID_B = TID_BASE + 1;
Expand Down Expand Up @@ -5865,6 +5977,7 @@ test_rebalancing()
sched_ops.block_time_multiplier = BLOCK_SCALE;
sched_ops.migration_threshold_us = MIGRATION_THRESHOLD;
sched_ops.rebalance_period_us = REBALANCE_PERIOD;
sched_ops.block_time_max_us = BLOCK_TIME_MAX;
scheduler_t scheduler;
if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
scheduler_t::STATUS_SUCCESS)
Expand Down

0 comments on commit 81d5748

Please sign in to comment.