Skip to content

Commit

Permalink
verbose-message for delete exhaust
Browse files Browse the repository at this point in the history
  • Loading branch information
ban-nobuhiro committed Aug 12, 2024
1 parent a562439 commit 1952a33
Showing 1 changed file with 32 additions and 16 deletions.
48 changes: 32 additions & 16 deletions bench/yakushima.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,14 @@ void parallel_build_tree() {
}
}

struct alignas(CACHE_LINE_SIZE) workarea {
std::size_t res = 0;
bool exhaust = false;
std::chrono::system_clock::time_point w_stop;
};

void get_worker(const size_t thid, char& ready, const bool& start,
const bool& quit, std::size_t& res) {
const bool& quit, workarea& work) {
// init work
Xoroshiro128Plus rnd;
FastZipf zipf(&rnd, FLAGS_get_skew, FLAGS_initial_record);
Expand Down Expand Up @@ -204,11 +210,11 @@ void get_worker(const size_t thid, char& ready, const bool& start,
performance_tools::get_watch().set_point(1);
#endif
leave(token);
res = local_res;
work.res = local_res;
}

void scan_worker(const size_t thid, char& ready, const bool& start,
const bool& quit, std::size_t& res) {
const bool& quit, workarea& work) {
// init work
Xoroshiro128Plus rnd;
FastZipf zipf(&rnd, FLAGS_get_skew, FLAGS_initial_record);
Expand Down Expand Up @@ -243,11 +249,11 @@ void scan_worker(const size_t thid, char& ready, const bool& start,
performance_tools::get_watch().set_point(1);
#endif
leave(token);
res = local_res;
work.res = local_res;
}

void remove_worker(const size_t thid, char& ready, const bool& start,
const bool& quit, std::size_t& res) {
bool& quit, workarea& work) {
// this function can be used in Linux environment only.
#ifdef YAKUSHIMA_LINUX
set_thread_affinity(static_cast<const int>(thid));
Expand Down Expand Up @@ -278,19 +284,22 @@ void remove_worker(const size_t thid, char& ready, const bool& start,
++local_res;
if (loadAcquireN(quit)) { break; }
if (i == right_edge - 1) {
LOG(FATAL) << "This experiments fails. Please set less duration or "
LOG(ERROR) << "This experiments fails. Please set less duration or "
"larger initial record.";
work.exhaust = true;
work.w_stop = std::chrono::system_clock::now();
storeReleaseN(quit, true);
}
}
#ifdef PERFORMANCE_TOOLS
performance_tools::get_watch().set_point(1, thid);
#endif

leave(token);
res = local_res;
work.res = local_res;
}
void put_worker(const size_t thid, char& ready, const bool& start,
const bool& quit, std::size_t& res) {
const bool& quit, workarea& work) {
// this function can be used in Linux environment only.
#ifdef YAKUSHIMA_LINUX
set_thread_affinity(static_cast<const int>(thid));
Expand Down Expand Up @@ -328,13 +337,13 @@ void put_worker(const size_t thid, char& ready, const bool& start,
#endif

leave(token);
res = local_res;
work.res = local_res;
}

static void invoke_leader() try {
alignas(CACHE_LINE_SIZE) bool start = false;
alignas(CACHE_LINE_SIZE) bool quit = false;
alignas(CACHE_LINE_SIZE) std::vector<std::size_t> res(FLAGS_thread);
alignas(CACHE_LINE_SIZE) std::vector<workarea> work(FLAGS_thread);

LOG(INFO) << "[start] init masstree database.";
init();
Expand All @@ -360,22 +369,23 @@ static void invoke_leader() try {
for (size_t i = 0; i < FLAGS_thread; ++i) {
if (FLAGS_instruction == "get") {
thv.emplace_back(get_worker, i, std::ref(readys[i]),
std::ref(start), std::ref(quit), std::ref(res[i]));
std::ref(start), std::ref(quit), std::ref(work[i]));
} else if (FLAGS_instruction == "put") {
thv.emplace_back(put_worker, i, std::ref(readys[i]),
std::ref(start), std::ref(quit), std::ref(res[i]));
std::ref(start), std::ref(quit), std::ref(work[i]));
} else if (FLAGS_instruction == "remove") {
thv.emplace_back(remove_worker, i, std::ref(readys[i]),
std::ref(start), std::ref(quit), std::ref(res[i]));
std::ref(start), std::ref(quit), std::ref(work[i]));
} else if (FLAGS_instruction == "scan") {
thv.emplace_back(scan_worker, i, std::ref(readys[i]),
std::ref(start), std::ref(quit), std::ref(res[i]));
std::ref(start), std::ref(quit), std::ref(work[i]));
} else {
LOG(FATAL) << "invalid instruction type.";
}
}

waitForReady(readys);
std::chrono::system_clock::time_point w_start{std::chrono::system_clock::now()};
storeReleaseN(start, true);
LOG(INFO) << "[start] measurement.";
for (size_t i = 0; i < FLAGS_duration; ++i) { sleepMs(1000); }
Expand All @@ -391,11 +401,17 @@ static void invoke_leader() try {
*/
std::uint64_t fin_res{0};
for (std::uint64_t i = 0; i < FLAGS_thread; ++i) {
if ((UINT64_MAX - fin_res) < res[i]) {
if ((UINT64_MAX - fin_res) < work[i].res) {
LOG(FATAL)
<< "experimental setting is bad, which leads to overflow.";
}
fin_res += res[i];
fin_res += work[i].res;
if (work[i].exhaust) {
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(work[i].w_stop - w_start).count();
LOG(FATAL) << "a worker has exhausted its records in " << (static_cast<double>(ms) / 1000.0) << "sec. "
<< "Please set -duration less than this, "
<< "or set -initial_record larger than " << (FLAGS_initial_record * FLAGS_duration * 1000 / ms);
}
}
std::cout << "throughput[ops/s]: " << fin_res / FLAGS_duration << std::endl;
displayRusageRUMaxrss();
Expand Down

0 comments on commit 1952a33

Please sign in to comment.