diff --git a/bench/yakushima.cpp b/bench/yakushima.cpp index d8c0af5..0b73951 100644 --- a/bench/yakushima.cpp +++ b/bench/yakushima.cpp @@ -170,8 +170,14 @@ void parallel_build_tree() { } } +struct alignas(CACHE_LINE_SIZE) workarea { + std::size_t res = 0; + bool exhaust = false; + std::chrono::system_clock::time_point w_stop; +}; + void get_worker(const size_t thid, char& ready, const bool& start, - const bool& quit, std::size_t& res) { + const bool& quit, workarea& work) { // init work Xoroshiro128Plus rnd; FastZipf zipf(&rnd, FLAGS_get_skew, FLAGS_initial_record); @@ -204,11 +210,11 @@ void get_worker(const size_t thid, char& ready, const bool& start, performance_tools::get_watch().set_point(1); #endif leave(token); - res = local_res; + work.res = local_res; } void scan_worker(const size_t thid, char& ready, const bool& start, - const bool& quit, std::size_t& res) { + const bool& quit, workarea& work) { // init work Xoroshiro128Plus rnd; FastZipf zipf(&rnd, FLAGS_get_skew, FLAGS_initial_record); @@ -243,11 +249,11 @@ void scan_worker(const size_t thid, char& ready, const bool& start, performance_tools::get_watch().set_point(1); #endif leave(token); - res = local_res; + work.res = local_res; } void remove_worker(const size_t thid, char& ready, const bool& start, - const bool& quit, std::size_t& res) { + bool& quit, workarea& work) { // this function can be used in Linux environment only. #ifdef YAKUSHIMA_LINUX set_thread_affinity(static_cast(thid)); @@ -278,8 +284,11 @@ void remove_worker(const size_t thid, char& ready, const bool& start, ++local_res; if (loadAcquireN(quit)) { break; } if (i == right_edge - 1) { - LOG(FATAL) << "This experiments fails. Please set less duration or " + LOG(ERROR) << "This experiments fails. Please set less duration or " "larger initial record."; + work.exhaust = true; + work.w_stop = std::chrono::system_clock::now(); + storeReleaseN(quit, true); } } #ifdef PERFORMANCE_TOOLS @@ -287,10 +296,10 @@ void remove_worker(const size_t thid, char& ready, const bool& start, #endif leave(token); - res = local_res; + work.res = local_res; } void put_worker(const size_t thid, char& ready, const bool& start, - const bool& quit, std::size_t& res) { + const bool& quit, workarea& work) { // this function can be used in Linux environment only. #ifdef YAKUSHIMA_LINUX set_thread_affinity(static_cast(thid)); @@ -328,13 +337,13 @@ void put_worker(const size_t thid, char& ready, const bool& start, #endif leave(token); - res = local_res; + work.res = local_res; } static void invoke_leader() try { alignas(CACHE_LINE_SIZE) bool start = false; alignas(CACHE_LINE_SIZE) bool quit = false; - alignas(CACHE_LINE_SIZE) std::vector res(FLAGS_thread); + alignas(CACHE_LINE_SIZE) std::vector work(FLAGS_thread); LOG(INFO) << "[start] init masstree database."; init(); @@ -360,22 +369,23 @@ static void invoke_leader() try { for (size_t i = 0; i < FLAGS_thread; ++i) { if (FLAGS_instruction == "get") { thv.emplace_back(get_worker, i, std::ref(readys[i]), - std::ref(start), std::ref(quit), std::ref(res[i])); + std::ref(start), std::ref(quit), std::ref(work[i])); } else if (FLAGS_instruction == "put") { thv.emplace_back(put_worker, i, std::ref(readys[i]), - std::ref(start), std::ref(quit), std::ref(res[i])); + std::ref(start), std::ref(quit), std::ref(work[i])); } else if (FLAGS_instruction == "remove") { thv.emplace_back(remove_worker, i, std::ref(readys[i]), - std::ref(start), std::ref(quit), std::ref(res[i])); + std::ref(start), std::ref(quit), std::ref(work[i])); } else if (FLAGS_instruction == "scan") { thv.emplace_back(scan_worker, i, std::ref(readys[i]), - std::ref(start), std::ref(quit), std::ref(res[i])); + std::ref(start), std::ref(quit), std::ref(work[i])); } else { LOG(FATAL) << "invalid instruction type."; } } waitForReady(readys); + std::chrono::system_clock::time_point w_start{std::chrono::system_clock::now()}; storeReleaseN(start, true); LOG(INFO) << "[start] measurement."; for (size_t i = 0; i < FLAGS_duration; ++i) { sleepMs(1000); } @@ -391,11 +401,17 @@ static void invoke_leader() try { */ std::uint64_t fin_res{0}; for (std::uint64_t i = 0; i < FLAGS_thread; ++i) { - if ((UINT64_MAX - fin_res) < res[i]) { + if ((UINT64_MAX - fin_res) < work[i].res) { LOG(FATAL) << "experimental setting is bad, which leads to overflow."; } - fin_res += res[i]; + fin_res += work[i].res; + if (work[i].exhaust) { + auto ms = std::chrono::duration_cast(work[i].w_stop - w_start).count(); + LOG(FATAL) << "a worker has exhausted its records in " << (static_cast(ms) / 1000.0) << "sec. " + << "Please set -duration less than this, " + << "or set -initial_record larger than " << (FLAGS_initial_record * FLAGS_duration * 1000 / ms); + } } std::cout << "throughput[ops/s]: " << fin_res / FLAGS_duration << std::endl; displayRusageRUMaxrss();