From becd856ddb0fd9c54bc8d384758efb65d5b8ea09 Mon Sep 17 00:00:00 2001
From: Derek Bruening <bruening@google.com>
Date: Thu, 3 Oct 2024 20:02:30 -0400
Subject: [PATCH] i#6959: Add exit_if_fraction_inputs_left option (#7018)

Adds a new scheduler feature and CLI option exit_if_fraction_inputs_left. This
applies to -core_sharded and -core_serial modes. When an input reaches
EOF, if the number of non-EOF inputs left as a fraction of the original
inputs is equal to or less than this value then the scheduler exits
(sets all outputs to EOF) rather than finishing off the final inputs.
This helps avoid long sequences of idles during staggered endings with
fewer inputs left than cores and only a small fraction of the total
instructions left in those inputs.

The default value in scheduler_options_t and the CLI option is 0.05 (i.e., 5%),
which when tested on an large internal trace helps eliminate much of the
final idle time from the cores without losing many instructions.

Compare the numbers below for today's default with a long idle time and
so distinct differences between the "cpu busy by time" and "cpu busy by
time, ignoring idle past last instr" stats on a 39-core schedule-stats
run of a moderately large trace, with key stats and the 1st 2 cores (for
brevity) shown here:

```
  1567052521 instructions
   878027975 idles
       64.09% cpu busy by record count
       82.38% cpu busy by time
       96.81% cpu busy by time, ignoring idle past last instr
Core #0 schedule: CccccccOXHhUuuuuAaSEOGOWEWQqqqFffIiTETENWwwOWEeeeeeeACMmTQFfOWLWVvvvvFQqqqqYOWOooOWOYOYQOWO_O_W_O_W_O_W_O_WO_WO_O_O_O_O_O_OR_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_R_RY_YyyyySUuuOSISO_S_S_SOPpSOKO_KO_KCcDKWDB_B_____________________________________________
Core #1 schedule: KkLWSFUQPDddddddddXxSUSVRJWKkRNJBWUWwwTttGgRNKkkRWNTtFRWKkRNWUuuGULRFSRSYKkkkRYAYFffGSRYHRYHNWMDddddddddRYGgggggYHNWK_YAHYNnGYSNHWwwwwSWSNKSYyyWKNNWKNNGAKWGggNnNW_NNWE_E_EF__________________________________________________
```

And now with -exit_if_fraction_inputs_left 0.05, where we lose (1567052521 -
1564522227)/1567052521. = 0.16% of the instructions but drastically
reduce the tail from 14% of the time to less than 1% of the time:

```
  1564522227 instructions
   120512812 idles
       92.85% cpu busy by record count
       96.39% cpu busy by time
       97.46% cpu busy by time, ignoring idle past last instr
Core #0 schedule: CccccccOXHKYEGGETRARrrPRTVvvvRrrNWwwOOKWVRRrPBbbXUVvvvvvOWKVLWVvvJjSOWKVUuTIiiiFPpppKAaaMFfffAHOKWAaGNBOWKAPPOABCWKPWOKWPCXxxxZOWKCccJSOSWKJUYRCOWKCcSOSUKkkkOROK_O_O_O_O_O
Core #1 schedule: KkLWSMmmFLSFffffffJjWBbGBUuuuuuuuuuuBDBJJRJWKkRNJWMBKkkRNWKkRNWKkkkRNWXxxxxxZOooAaUIiTHhhhSDNnnnHZzQNnnRNWXxxxxxRNWUuuRNWKXUuXRNKRWKNXxxRWKONNHRKWONURKWXRKXRKNW_KR_KkRK_KRKR_R_R_R_R_R_R_R_R_R_R_R__R__R__R___R___R___R___R___R
```

Fixes #6959
---
 clients/drcachesim/analyzer_multi.cpp         |   2 +
 clients/drcachesim/common/options.cpp         |  12 ++
 clients/drcachesim/common/options.h           |   1 +
 clients/drcachesim/scheduler/scheduler.cpp    |  69 +++++++--
 clients/drcachesim/scheduler/scheduler.h      |  16 ++-
 .../drcachesim/tests/scheduler_unit_tests.cpp | 136 ++++++++++++++++--
 6 files changed, 208 insertions(+), 28 deletions(-)

diff --git a/clients/drcachesim/analyzer_multi.cpp b/clients/drcachesim/analyzer_multi.cpp
index beb0e526e6f..c27f231831c 100644
--- a/clients/drcachesim/analyzer_multi.cpp
+++ b/clients/drcachesim/analyzer_multi.cpp
@@ -574,6 +574,8 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::init_dynamic_schedule()
     sched_ops.rebalance_period_us = op_sched_rebalance_period_us.get_value();
     sched_ops.randomize_next_input = op_sched_randomize.get_value();
     sched_ops.honor_direct_switches = !op_sched_disable_direct_switches.get_value();
+    sched_ops.exit_if_fraction_inputs_left =
+        op_sched_exit_if_fraction_inputs_left.get_value();
 #ifdef HAS_ZIP
     if (!op_record_file.get_value().empty()) {
         record_schedule_zip_.reset(new zipfile_ostream_t(op_record_file.get_value()));
diff --git a/clients/drcachesim/common/options.cpp b/clients/drcachesim/common/options.cpp
index ecf253fd7dd..2a7d1038dc4 100644
--- a/clients/drcachesim/common/options.cpp
+++ b/clients/drcachesim/common/options.cpp
@@ -1034,6 +1034,18 @@ droption_t<uint64_t> op_sched_rebalance_period_us(
     "The period in simulated microseconds at which per-core run queues are re-balanced "
     "to redistribute load.");
 
+droption_t<double> op_sched_exit_if_fraction_inputs_left(
+    DROPTION_SCOPE_FRONTEND, "sched_exit_if_fraction_inputs_left", 0.05,
+    "Exit if non-EOF inputs left are <= this fraction of the total",
+    "Applies to -core_sharded and -core_serial.  When an input reaches EOF, if the "
+    "number of non-EOF inputs left as a fraction of the original inputs is equal to or "
+    "less than this value then the scheduler exits (sets all outputs to EOF) rather than "
+    "finishing off the final inputs.  This helps avoid long sequences of idles during "
+    "staggered endings with fewer inputs left than cores and only a small fraction of "
+    "the total instructions left in those inputs.  Since the remaining instruction "
+    "count is not considered (as it is not available), use discretion when raising "
+    "this value on uneven inputs.");
+
 // Schedule_stats options.
 droption_t<uint64_t>
     op_schedule_stats_print_every(DROPTION_SCOPE_ALL, "schedule_stats_print_every",
diff --git a/clients/drcachesim/common/options.h b/clients/drcachesim/common/options.h
index 75633a1928c..ef24824b4d0 100644
--- a/clients/drcachesim/common/options.h
+++ b/clients/drcachesim/common/options.h
@@ -221,6 +221,7 @@ extern dynamorio::droption::droption_t<bool> op_sched_infinite_timeouts;
 extern dynamorio::droption::droption_t<uint64_t> op_sched_migration_threshold_us;
 extern dynamorio::droption::droption_t<uint64_t> op_sched_rebalance_period_us;
 extern dynamorio::droption::droption_t<double> op_sched_time_units_per_us;
+extern dynamorio::droption::droption_t<double> op_sched_exit_if_fraction_inputs_left;
 extern dynamorio::droption::droption_t<uint64_t> op_schedule_stats_print_every;
 extern dynamorio::droption::droption_t<std::string> op_syscall_template_file;
 extern dynamorio::droption::droption_t<uint64_t> op_filter_stop_timestamp;
diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp
index 13e88563c86..35466c79bb6 100644
--- a/clients/drcachesim/scheduler/scheduler.cpp
+++ b/clients/drcachesim/scheduler/scheduler.cpp
@@ -725,6 +725,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::print_configuration()
            options_.rebalance_period_us);
     VPRINT(this, 1, "  %-25s : %d\n", "honor_infinite_timeouts",
            options_.honor_infinite_timeouts);
+    VPRINT(this, 1, "  %-25s : %f\n", "exit_if_fraction_inputs_left",
+           options_.exit_if_fraction_inputs_left);
 }
 
 template <typename RecordType, typename ReaderType>
@@ -1027,6 +1029,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::legacy_field_support()
         error_string_ = "block_time_max_us must be > 0";
         return STATUS_ERROR_INVALID_PARAMETER;
     }
+    if (options_.exit_if_fraction_inputs_left < 0. ||
+        options_.exit_if_fraction_inputs_left > 1.) {
+        error_string_ = "exit_if_fraction_inputs_left must be 0..1";
+        return STATUS_ERROR_INVALID_PARAMETER;
+    }
     return STATUS_SUCCESS;
 }
 
@@ -2339,7 +2346,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::advance_region_of_interest(
                         return status;
                 }
                 input.queue.push_back(create_thread_exit(input.tid));
-                mark_input_eof(input);
+                sched_type_t::stream_status_t status = mark_input_eof(input);
+                // For early EOF we still need our synthetic exit so do not return it yet.
+                if (status != sched_type_t::STATUS_OK &&
+                    status != sched_type_t::STATUS_EOF)
+                    return status;
                 return sched_type_t::STATUS_SKIPPED;
             }
         }
@@ -2466,7 +2477,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::skip_instructions(input_info_t &input,
         input.instrs_pre_read = 0;
     }
     if (*input.reader == *input.reader_end) {
-        mark_input_eof(input);
+        sched_type_t::stream_status_t status = mark_input_eof(input);
+        if (status != sched_type_t::STATUS_OK)
+            return status;
         // Raise error because the input region is out of bounds, unless the max
         // was used which we ourselves use internally for times_of_interest.
         if (skip_amount >= std::numeric_limits<uint64_t>::max() - 2) {
@@ -3114,11 +3127,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input_as_previously(
         // queued candidate record, if any.
         clear_input_queue(inputs_[index]);
         inputs_[index].queue.push_back(create_thread_exit(inputs_[index].tid));
-        mark_input_eof(inputs_[index]);
         VPRINT(this, 2, "early end for input %d\n", index);
         // We're done with this entry but we need the queued record to be read,
         // so we do not move past the entry.
         outputs_[output].record_index->fetch_add(1, std::memory_order_release);
+        sched_type_t::stream_status_t status = mark_input_eof(inputs_[index]);
+        if (status != sched_type_t::STATUS_OK)
+            return status;
         return sched_type_t::STATUS_SKIPPED;
     } else if (segment.type == schedule_record_t::SKIP) {
         std::lock_guard<mutex_dbg_owned> lock(*inputs_[index].lock);
@@ -3461,8 +3476,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::pick_next_input(output_ordinal_t outpu
         if (inputs_[index].at_eof ||
             *inputs_[index].reader == *inputs_[index].reader_end) {
             VPRINT(this, 2, "next_record[%d]: input #%d at eof\n", output, index);
-            if (!inputs_[index].at_eof)
-                mark_input_eof(inputs_[index]);
+            if (!inputs_[index].at_eof) {
+                sched_type_t::stream_status_t status = mark_input_eof(inputs_[index]);
+                if (status != sched_type_t::STATUS_OK)
+                    return status;
+            }
             index = INVALID_INPUT_ORDINAL;
             // Loop and pick next thread.
             continue;
@@ -3813,8 +3831,11 @@ scheduler_tmpl_t<RecordType, ReaderType>::next_record(output_ordinal_t output,
                 input->needs_advance = true;
             }
             if (input->at_eof || *input->reader == *input->reader_end) {
-                if (!input->at_eof)
-                    mark_input_eof(*input);
+                if (!input->at_eof) {
+                    sched_type_t::stream_status_t status = mark_input_eof(*input);
+                    if (status != sched_type_t::STATUS_OK)
+                        return status;
+                }
                 lock.unlock();
                 VPRINT(this, 5, "next_record[%d]: need new input (cur=%d eof)\n", output,
                        input->index);
@@ -4178,17 +4199,28 @@ scheduler_tmpl_t<RecordType, ReaderType>::stop_speculation(output_ordinal_t outp
 }
 
 template <typename RecordType, typename ReaderType>
-void
+typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
 scheduler_tmpl_t<RecordType, ReaderType>::mark_input_eof(input_info_t &input)
 {
     assert(input.lock->owned_by_cur_thread());
     if (input.at_eof)
-        return;
+        return sched_type_t::STATUS_OK;
     input.at_eof = true;
-    assert(live_input_count_.load(std::memory_order_acquire) > 0);
-    live_input_count_.fetch_add(-1, std::memory_order_release);
-    VPRINT(this, 2, "input %d at eof; %d live inputs left\n", input.index,
-           live_input_count_.load(std::memory_order_acquire));
+#ifndef NDEBUG
+    int old_count =
+#endif
+        live_input_count_.fetch_add(-1, std::memory_order_release);
+    assert(old_count > 0);
+    int live_inputs = live_input_count_.load(std::memory_order_acquire);
+    VPRINT(this, 2, "input %d at eof; %d live inputs left\n", input.index, live_inputs);
+    if (options_.mapping == MAP_TO_ANY_OUTPUT &&
+        live_inputs <=
+            static_cast<int>(inputs_.size() * options_.exit_if_fraction_inputs_left)) {
+        VPRINT(this, 1, "exiting early at input %d with %d live inputs left\n",
+               input.index, live_inputs);
+        return sched_type_t::STATUS_EOF;
+    }
+    return sched_type_t::STATUS_OK;
 }
 
 template <typename RecordType, typename ReaderType>
@@ -4199,8 +4231,8 @@ scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output,
     // XXX i#6831: Refactor to use subclasses or templates to specialize
     // scheduler code based on mapping options, to avoid these top-level
     // conditionals in many functions?
-    if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT ||
-        live_input_count_.load(std::memory_order_acquire) == 0 ||
+    int live_inputs = live_input_count_.load(std::memory_order_acquire);
+    if (options_.mapping == MAP_TO_CONSISTENT_OUTPUT || live_inputs == 0 ||
         // While a full schedule recorded should have each input hit either its
         // EOF or ROI end, we have a fallback to avoid hangs for possible recorded
         // schedules that end an input early deliberately without an ROI.
@@ -4209,6 +4241,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::eof_or_idle(output_ordinal_t output,
         assert(options_.mapping != MAP_AS_PREVIOUSLY || outputs_[output].at_eof);
         return sched_type_t::STATUS_EOF;
     }
+    if (options_.mapping == MAP_TO_ANY_OUTPUT &&
+        live_inputs <=
+            static_cast<int>(inputs_.size() * options_.exit_if_fraction_inputs_left)) {
+        VPRINT(this, 1, "output %d exiting early with %d live inputs left\n", output,
+               live_inputs);
+        return sched_type_t::STATUS_EOF;
+    }
     // Before going idle, try to steal work from another output.
     // We start with us+1 to avoid everyone stealing from the low-numbered outputs.
     // We only try when we first transition to idle; we rely on rebalancing after that,
diff --git a/clients/drcachesim/scheduler/scheduler.h b/clients/drcachesim/scheduler/scheduler.h
index 3a8182b83b6..70d9ba1c944 100644
--- a/clients/drcachesim/scheduler/scheduler.h
+++ b/clients/drcachesim/scheduler/scheduler.h
@@ -808,6 +808,17 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
          * (#block_time_max_us) scaled by #block_time_multiplier.
          */
         bool honor_infinite_timeouts = false;
+        /**
+         * For #MAP_TO_ANY_OUTPUT, when an input reaches EOF, if the number of non-EOF
+         * inputs left as a fraction of the original inputs is equal to or less than
+         * this value then the scheduler exits (sets all outputs to EOF) rather than
+         * finishing off the final inputs.  This helps avoid long sequences of idles
+         * during staggered endings with fewer inputs left than cores and only a small
+         * fraction of the total instructions left in those inputs.  Since the remaining
+         * instruction count is not considered (as it is not available), use discretion
+         * when raising this value on uneven inputs.
+         */
+        double exit_if_fraction_inputs_left = 0.05;
         // When adding new options, also add to print_configuration().
     };
 
@@ -2036,7 +2047,10 @@ template <typename RecordType, typename ReaderType> class scheduler_tmpl_t {
     set_output_active(output_ordinal_t output, bool active);
 
     // Caller must hold the input's lock.
-    void
+    // The return value is STATUS_EOF if a global exit is now happening (an
+    // early exit); otherwise STATUS_OK is returned on success but only a
+    // local EOF.
+    stream_status_t
     mark_input_eof(input_info_t &input);
 
     // Determines whether to exit or wait for other outputs when one output
diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp
index 7a5394054d1..e69d23a05ee 100644
--- a/clients/drcachesim/tests/scheduler_unit_tests.cpp
+++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp
@@ -1598,6 +1598,8 @@ test_synthetic_with_timestamps()
                                                scheduler_t::SCHEDULER_DEFAULTS,
                                                /*verbosity=*/3);
     sched_ops.quantum_duration_instrs = 3;
+    // Test dropping a final "_" from core0.
+    sched_ops.exit_if_fraction_inputs_left = 0.1;
     scheduler_t scheduler;
     if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
         scheduler_t::STATUS_SUCCESS)
@@ -1615,9 +1617,8 @@ test_synthetic_with_timestamps()
     // thus core0 has C,I,B,H,D and core1 has F,J,E,A,G.
     // We should interleave within each group -- except once we reach J
     // we should completely finish it.  There should be no migrations.
-    assert(
-        sched_as_string[0] ==
-        ".CC.C.II.IC.CC.I.II.CC.C.II.I..BB.B.HH.HB.BB.H.HH.BB.B.HH.H..DD.DD.DD.DD.D._");
+    assert(sched_as_string[0] ==
+           ".CC.C.II.IC.CC.I.II.CC.C.II.I..BB.B.HH.HB.BB.H.HH.BB.B.HH.H..DD.DD.DD.DD.D.");
     assert(sched_as_string[1] ==
            ".FF.F.JJ.JJ.JJ.JJ.J.F.FF.FF.F..EE.EE.EE.EE.E..AA.A.GG.GA.AA.G.GG.AA.A.GG.G.");
     // Check scheduler stats.  # switches is the # of letter transitions; # preempts
@@ -1625,7 +1626,7 @@ test_synthetic_with_timestamps()
     // appearing in between (and ignoring the last letter for an input: EOF doesn't
     // count as a preempt).
     verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/12,
-                           /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0,
+                           /*switch_input_to_idle=*/0, /*switch_idle_to_input=*/0,
                            /*switch_nop=*/2, /*preempts=*/10, /*direct_attempts=*/0,
                            /*direct_successes=*/0, /*migrations=*/0);
     verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/9,
@@ -1699,6 +1700,8 @@ test_synthetic_with_priorities()
                                                scheduler_t::SCHEDULER_DEFAULTS,
                                                /*verbosity=*/3);
     sched_ops.quantum_duration_instrs = 3;
+    // Test dropping a final "_" from core0.
+    sched_ops.exit_if_fraction_inputs_left = 0.1;
     scheduler_t scheduler;
     if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
         scheduler_t::STATUS_SUCCESS)
@@ -1711,9 +1714,8 @@ test_synthetic_with_priorities()
     // See the test_synthetic_with_timestamps() test which has our base sequence.
     // We've elevated B, E, and H to higher priorities so they go
     // first.  J remains uninterrupted due to lower timestamps.
-    assert(
-        sched_as_string[0] ==
-        ".BB.B.HH.HB.BB.H.HH.BB.B.HH.H..FF.F.JJ.JJ.JJ.JJ.J.F.FF.FF.F..DD.DD.DD.DD.D._");
+    assert(sched_as_string[0] ==
+           ".BB.B.HH.HB.BB.H.HH.BB.B.HH.H..FF.F.JJ.JJ.JJ.JJ.J.F.FF.FF.F..DD.DD.DD.DD.D.");
     assert(sched_as_string[1] ==
            ".EE.EE.EE.EE.E..CC.C.II.IC.CC.I.II.CC.C.II.I..AA.A.GG.GA.AA.G.GG.AA.A.GG.G.");
     // Check scheduler stats.  # switches is the # of letter transitions; # preempts
@@ -1721,7 +1723,7 @@ test_synthetic_with_priorities()
     // appearing in between (and ignoring the last letter for an input: EOF doesn't
     // count as a preempt).
     verify_scheduler_stats(scheduler.get_stream(0), /*switch_input_to_input=*/9,
-                           /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0,
+                           /*switch_input_to_idle=*/0, /*switch_idle_to_input=*/0,
                            /*switch_nop=*/5, /*preempts=*/10, /*direct_attempts=*/0,
                            /*direct_successes=*/0, /*migrations=*/0);
     verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/12,
@@ -2032,6 +2034,8 @@ test_synthetic_with_syscalls_multiple()
     sched_ops.time_units_per_us = 1.;
     sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
     sched_ops.block_time_multiplier = BLOCK_SCALE;
+    // Test dropping a bunch of final "_" from core1.
+    sched_ops.exit_if_fraction_inputs_left = 0.1;
     scheduler_t scheduler;
     if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
         scheduler_t::STATUS_SUCCESS)
@@ -2054,9 +2058,8 @@ test_synthetic_with_syscalls_multiple()
     // explains why the two strings are different lengths.
     assert(sched_as_string[0] ==
            "BHHHFFFJJJJJJBHHHJJJFFFFFFBHHHDDDDDDDDDB__________B__________B__________B____"
-           "______B_______B");
-    assert(sched_as_string[1] ==
-           "EECCCIIICCCIIIEECCCIIIAAAGGGEEAAAGGEEGAAEGGAG_________");
+           "______B__________B");
+    assert(sched_as_string[1] == "EECCCIIICCCIIIEECCCIIIAAAGGGEEAAAGGEEGAAEGGAG");
     // Check scheduler stats.  # switches is the # of letter transitions; # preempts
     // is the instances where the same letter appears 3 times without another letter
     // appearing in between (and ignoring the last letter for an input: EOF doesn't
@@ -2066,7 +2069,7 @@ test_synthetic_with_syscalls_multiple()
                            /*switch_nop=*/4, /*preempts=*/10, /*direct_attempts=*/0,
                            /*direct_successes=*/0, /*migrations=*/0);
     verify_scheduler_stats(scheduler.get_stream(1), /*switch_input_to_input=*/19,
-                           /*switch_input_to_idle=*/1, /*switch_idle_to_input=*/0,
+                           /*switch_input_to_idle=*/0, /*switch_idle_to_input=*/0,
                            /*switch_nop=*/3, /*preempts=*/16, /*direct_attempts=*/0,
                            /*direct_successes=*/0, /*migrations=*/0);
 }
@@ -6001,6 +6004,114 @@ test_rebalancing()
     }
 }
 
+static void
+test_exit_early()
+{
+    std::cerr << "\n----------------\nTesting exiting early\n";
+    static constexpr int NUM_INPUTS = 12;
+    static constexpr int NUM_OUTPUTS = 2;
+    static constexpr int NUM_INSTRS = 9;
+    static constexpr int QUANTUM_DURATION = 3;
+    static constexpr memref_tid_t TID_BASE = 100;
+    static constexpr uint64_t TIMESTAMP = 101;
+    static constexpr uint64_t BLOCK_LATENCY = 1500;
+    std::vector<trace_entry_t> inputs[NUM_INPUTS];
+    for (int i = 0; i < NUM_INPUTS; i++) {
+        memref_tid_t tid = TID_BASE + i;
+        inputs[i].push_back(make_thread(tid));
+        inputs[i].push_back(make_pid(1));
+        inputs[i].push_back(make_version(TRACE_ENTRY_VERSION));
+        inputs[i].push_back(make_timestamp(TIMESTAMP)); // All the same time priority.
+        for (int j = 0; j < NUM_INSTRS; j++) {
+            inputs[i].push_back(make_instr(42 + j * 4));
+            // One input has a long blocking syscall toward the end.
+            if (i == 0 && j == NUM_INSTRS - 2) {
+                inputs[i].push_back(make_timestamp(TIMESTAMP));
+                inputs[i].push_back(make_marker(TRACE_MARKER_TYPE_SYSCALL, 42));
+                inputs[i].push_back(
+                    make_marker(TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL, 0));
+                inputs[i].push_back(make_timestamp(TIMESTAMP + BLOCK_LATENCY));
+            }
+        }
+        inputs[i].push_back(make_exit(tid));
+    }
+    {
+        // Run without any early exit.
+        std::vector<scheduler_t::input_workload_t> sched_inputs;
+        for (int i = 0; i < NUM_INPUTS; i++) {
+            std::vector<scheduler_t::input_reader_t> readers;
+            readers.emplace_back(
+                std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
+                std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_BASE + i);
+            sched_inputs.emplace_back(std::move(readers));
+        }
+        scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
+                                                   scheduler_t::DEPENDENCY_IGNORE,
+                                                   scheduler_t::SCHEDULER_DEFAULTS,
+                                                   /*verbosity=*/2);
+        // We use our mock's time==instruction count for a deterministic result.
+        sched_ops.time_units_per_us = 1.;
+        sched_ops.quantum_duration_instrs = QUANTUM_DURATION;
+        sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
+        sched_ops.exit_if_fraction_inputs_left = 0.;
+        scheduler_t scheduler;
+        if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
+            scheduler_t::STATUS_SUCCESS)
+            assert(false);
+        std::vector<std::string> sched_as_string =
+            run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
+        for (int i = 0; i < NUM_OUTPUTS; i++) {
+            std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
+        }
+        // We have a long idle wait just to execute A's final instruction.
+        static const char *const CORE0_SCHED_STRING =
+            "..AAA..CCC..EEE..GGG..III..KKKAAACCCEEEGGGIIIKKKAA....CCC.EEE.GGG.III.KKK.__"
+            "_________________________________________________________________A.";
+        static const char *const CORE1_SCHED_STRING =
+            "..BBB..DDD..FFF..HHH..JJJ..LLLBBBDDDFFFHHHJJJLLLBBB.DDD.FFF.HHH.JJJ.LLL.____"
+            "___________________________________________________________________";
+        assert(sched_as_string[0] == CORE0_SCHED_STRING);
+        assert(sched_as_string[1] == CORE1_SCHED_STRING);
+    }
+    {
+        // Run with any early exit.
+        std::vector<scheduler_t::input_workload_t> sched_inputs;
+        for (int i = 0; i < NUM_INPUTS; i++) {
+            std::vector<scheduler_t::input_reader_t> readers;
+            readers.emplace_back(
+                std::unique_ptr<mock_reader_t>(new mock_reader_t(inputs[i])),
+                std::unique_ptr<mock_reader_t>(new mock_reader_t()), TID_BASE + i);
+            sched_inputs.emplace_back(std::move(readers));
+        }
+        scheduler_t::scheduler_options_t sched_ops(scheduler_t::MAP_TO_ANY_OUTPUT,
+                                                   scheduler_t::DEPENDENCY_IGNORE,
+                                                   scheduler_t::SCHEDULER_DEFAULTS,
+                                                   /*verbosity=*/2);
+        // We use our mock's time==instruction count for a deterministic result.
+        sched_ops.time_units_per_us = 1.;
+        sched_ops.quantum_duration_instrs = QUANTUM_DURATION;
+        sched_ops.blocking_switch_threshold = BLOCK_LATENCY;
+        // NUM_INPUTS=11 * 0.1 = 1.1 so we'll exit with 1 input left.
+        sched_ops.exit_if_fraction_inputs_left = 0.1;
+        scheduler_t scheduler;
+        if (scheduler.init(sched_inputs, NUM_OUTPUTS, std::move(sched_ops)) !=
+            scheduler_t::STATUS_SUCCESS)
+            assert(false);
+        std::vector<std::string> sched_as_string =
+            run_lockstep_simulation(scheduler, NUM_OUTPUTS, TID_BASE, /*send_time=*/true);
+        for (int i = 0; i < NUM_OUTPUTS; i++) {
+            std::cerr << "cpu #" << i << " schedule: " << sched_as_string[i] << "\n";
+        }
+        // Now we exit after K and never execute the 9th A.
+        static const char *const CORE0_SCHED_STRING =
+            "..AAA..CCC..EEE..GGG..III..KKKAAACCCEEEGGGIIIKKKAA....CCC.EEE.GGG.III.KKK.";
+        static const char *const CORE1_SCHED_STRING =
+            "..BBB..DDD..FFF..HHH..JJJ..LLLBBBDDDFFFHHHJJJLLLBBB.DDD.FFF.HHH.JJJ.LLL.__";
+        assert(sched_as_string[0] == CORE0_SCHED_STRING);
+        assert(sched_as_string[1] == CORE1_SCHED_STRING);
+    }
+}
+
 int
 test_main(int argc, const char *argv[])
 {
@@ -6042,6 +6153,7 @@ test_main(int argc, const char *argv[])
     test_random_schedule();
     test_record_scheduler();
     test_rebalancing();
+    test_exit_early();
 
     dr_standalone_exit();
     return 0;