diff --git a/src/test/crimson/perf_crimson_msgr.cc b/src/test/crimson/perf_crimson_msgr.cc index f197384505f478..fc6a202e83a8c0 100644 --- a/src/test/crimson/perf_crimson_msgr.cc +++ b/src/test/crimson/perf_crimson_msgr.cc @@ -32,28 +32,35 @@ seastar::logger& logger() { return ceph::get_logger(ceph_subsys_ms); } - enum class perf_mode_t { both, client, server }; -static std::random_device rd; -static std::default_random_engine rng{rd()}; - static seastar::future<> run(unsigned rounds, - double keepalive_ratio, - int bs, - int depth, + unsigned jobs, + unsigned bs, + unsigned depth, std::string addr, - perf_mode_t mode) + perf_mode_t mode, + unsigned core) { struct test_state { struct Server final : public ceph::net::Dispatcher, public seastar::peering_sharded_service { ceph::net::Messenger *msgr = nullptr; + const seastar::shard_id sid; + const seastar::shard_id msgr_sid; + std::string lname; + + Server(unsigned msgr_core) + : sid{seastar::engine().cpu_id()}, + msgr_sid{msgr_core} { + lname = "server#"; + lname += std::to_string(sid); + } Dispatcher* get_local_shard() override { return &(container().local()); @@ -66,29 +73,30 @@ static seastar::future<> run(unsigned rounds, ceph_assert(m->get_type() == CEPH_MSG_OSD_OP); // reply Ref req = boost::static_pointer_cast(m); - req->finish_decode(); return c->send(MessageRef{ new MOSDOpReply(req.get(), 0, 0, 0, false), false }); } - seastar::future<> init(const entity_name_t& name, - const std::string& lname, - const uint64_t nonce, - const entity_addr_t& addr) { - auto&& fut = ceph::net::Messenger::create(name, lname, nonce, 1); - return fut.then([this, addr](ceph::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& server) { - server.msgr = messenger->get_local_shard(); - server.msgr->set_crc_header(); - }).then([messenger, addr] { - return messenger->bind(entity_addrvec_t{addr}); - }).then([this, messenger] { - return messenger->start(this); - }); - }); + seastar::future<> init(const entity_addr_t& addr) { + return container().invoke_on(msgr_sid, [addr] (auto& server) { + // server msgr is always with nonce 0 + auto&& fut = ceph::net::Messenger::create(entity_name_t::OSD(server.sid), server.lname, 0, server.sid); + return fut.then([&server, addr](ceph::net::Messenger *messenger) { + return server.container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + }).then([messenger, addr] { + return messenger->bind(entity_addrvec_t{addr}); + }).then([&server, messenger] { + return messenger->start(&server); + }); + }); + }); } seastar::future<> shutdown() { - ceph_assert(msgr); - return msgr->shutdown(); + logger().info("\n{} shutdown...", lname); + return container().invoke_on(msgr_sid, [] (auto& server) { + ceph_assert(server.msgr); + return server.msgr->shutdown(); + }); } }; @@ -97,38 +105,42 @@ static seastar::future<> run(unsigned rounds, public seastar::peering_sharded_service { struct PingSession : public seastar::enable_shared_from_this { - unsigned count = 0u; + unsigned received_count = 0u; + mono_time connecting_time; mono_time connected_time; + mono_time start_time; mono_time finish_time; + seastar::promise<> done; }; using PingSessionRef = seastar::shared_ptr; - unsigned rounds; - std::bernoulli_distribution keepalive_dist; + const seastar::shard_id sid; + std::string lname; + + const unsigned jobs; + const unsigned rounds; ceph::net::Messenger *msgr = nullptr; - std::map> pending_conns; - std::map sessions; - int msg_len; + const unsigned msg_len; bufferlist msg_data; seastar::semaphore depth; - Client(unsigned rounds, double keepalive_ratio, int msg_len, int depth) - : rounds(rounds), - keepalive_dist(std::bernoulli_distribution{keepalive_ratio}), - depth(depth) { + unsigned sent_count = 0u; + ceph::net::ConnectionRef active_conn = nullptr; + PingSessionRef active_session = nullptr; + + Client(unsigned jobs, unsigned rounds, unsigned msg_len, unsigned depth) + : sid{seastar::engine().cpu_id()}, + jobs{jobs}, + rounds{rounds/jobs}, + msg_len{msg_len}, + depth{depth} { + lname = "client#"; + lname += std::to_string(sid); bufferptr ptr(msg_len); memset(ptr.c_str(), 0, msg_len); msg_data.append(ptr); } - PingSessionRef find_session(ceph::net::ConnectionRef c) { - auto found = sessions.find(c); - if (found == sessions.end()) { - ceph_assert(false); - } - return found->second; - } - Dispatcher* get_local_shard() override { return &(container().local()); } @@ -136,80 +148,95 @@ static seastar::future<> run(unsigned rounds, return seastar::now(); } seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { - logger().info("{}: connected to {}", *conn, conn->get_peer_addr()); - auto session = seastar::make_shared(); - auto [i, added] = sessions.emplace(conn, session); - std::ignore = i; - ceph_assert(added); - session->connected_time = mono_clock::now(); + logger().info("{}: connected", *conn); + active_session = seastar::make_shared(); + active_session->connected_time = mono_clock::now(); return seastar::now(); } seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { ceph_assert(m->get_type() == CEPH_MSG_OSD_OPREPLY); depth.signal(1); - auto session = find_session(c); - ++(session->count); + ceph_assert(active_session); + ++(active_session->received_count); - if (session->count == rounds) { - logger().info("{}: finished receiving {} OPREPLYs", *c.get(), session->count); - session->finish_time = mono_clock::now(); - return container().invoke_on_all([conn = c.get()](auto &client) { - auto found = client.pending_conns.find(conn); - ceph_assert(found != client.pending_conns.end()); - found->second.set_value(); - }); - } else { - return seastar::now(); + if (active_session->received_count == rounds) { + logger().info("{}: finished receiving {} OPREPLYs", *c, active_session->received_count); + active_session->finish_time = mono_clock::now(); + active_session->done.set_value(); } + return seastar::now(); } - seastar::future<> init(const entity_name_t& name, - const std::string& lname, - const uint64_t nonce) { - return ceph::net::Messenger::create(name, lname, nonce, 2) - .then([this](ceph::net::Messenger *messenger) { - return container().invoke_on_all([messenger](auto& client) { - client.msgr = messenger->get_local_shard(); - client.msgr->set_crc_header(); - }).then([this, messenger] { - return messenger->start(this); - }); - }); + // should start messenger at this shard? + bool is_active() { + ceph_assert(seastar::engine().cpu_id() == sid); + return sid != 0 && sid <= jobs; + } + + seastar::future<> init() { + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + return ceph::net::Messenger::create(entity_name_t::OSD(client.sid), client.lname, client.sid, client.sid) + .then([&client] (ceph::net::Messenger *messenger) { + client.msgr = messenger; + return client.msgr->start(&client); + }); + } + return seastar::now(); + }); } seastar::future<> shutdown() { - ceph_assert(msgr); - return msgr->shutdown(); + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + logger().info("\n{} shutdown...", client.lname); + ceph_assert(client.msgr); + return client.msgr->shutdown(); + } + return seastar::now(); + }); } - seastar::future<> dispatch_messages(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { - mono_time start_time = mono_clock::now(); - return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) - .then([this, foreign_dispatch, start_time](auto conn) { - return seastar::futurize_apply([this, conn, foreign_dispatch] { - if (foreign_dispatch) { - return do_dispatch_messages(&**conn); - } else { - // NOTE: this could be faster if we don't switch cores in do_dispatch_messages(). - return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { - return client.do_dispatch_messages(conn); - }); - } - }).finally([this, conn, start_time] { - return container().invoke_on(conn->get()->shard_id(), [conn, start_time](auto &client) { - auto session = client.find_session((*conn)->shared_from_this()); - std::chrono::duration dur_handshake = session->connected_time - start_time; - std::chrono::duration dur_messaging = session->finish_time - session->connected_time; - logger().info("{}: handshake {}, messaging {}", - **conn, dur_handshake.count(), dur_messaging.count()); - }); - }); + seastar::future<> dispatch_messages(const entity_addr_t& peer_addr) { + return container().invoke_on_all([peer_addr] (auto& client) { + // start clients in active cores (#1 ~ #jobs) + if (client.is_active()) { + mono_time start_time = mono_clock::now(); + return client.msgr->connect(peer_addr, entity_name_t::TYPE_OSD) + .then([&client] (auto conn) { + client.active_conn = conn->release(); + // make sure handshake won't heart the performance + return seastar::sleep(1s); + }).then([&client, start_time] { + if (!client.active_session) { + logger().error("\n{} not connected after 1s!\n", client.lname); + ceph_assert(false); + } + client.active_session->connecting_time = start_time; + }); + } + return seastar::now(); + }).then([this] { + logger().info("\nstart sending {} MOSDOps from {} clients", + jobs * rounds, jobs); + mono_time start_time = mono_clock::now(); + return container().invoke_on_all([] (auto& client) { + if (client.is_active()) { + return client.do_dispatch_messages(client.active_conn.get()); + } + return seastar::now(); + }).then([this, start_time] { + std::chrono::duration dur_messaging = mono_clock::now() - start_time; + logger().info("\nSummary:\n clients: {}\n MOSDOps: {}\n total time: {}s\n", + jobs, jobs * rounds, dur_messaging.count()); }); + }); } private: seastar::future<> send_msg(ceph::net::Connection* conn) { + ceph_assert(seastar::engine().cpu_id() == sid); return depth.wait(1).then([this, conn] { const static pg_t pgid; const static object_locator_t oloc; @@ -225,91 +252,83 @@ static seastar::future<> run(unsigned rounds, } seastar::future<> do_dispatch_messages(ceph::net::Connection* conn) { - return container().invoke_on_all([conn](auto& client) { - auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); - std::ignore = i; - ceph_assert(added); - }).then([this, conn] { - return seastar::do_with(0u, 0u, - [this, conn](auto &count_ping, auto &count_keepalive) { - return seastar::do_until( - [this, conn, &count_ping, &count_keepalive] { - bool stop = (count_ping == rounds); - if (stop) { - logger().info("{}: finished sending {} OSDOPs with {} keepalives", - *conn, count_ping, count_keepalive); - } - return stop; - }, - [this, conn, &count_ping, &count_keepalive] { - return seastar::repeat([this, conn, &count_ping, &count_keepalive] { - if (keepalive_dist(rng)) { - return conn->keepalive() - .then([&count_keepalive] { - count_keepalive += 1; - return seastar::make_ready_future( - seastar::stop_iteration::no); - }); - } else { - return send_msg(conn) - .then([&count_ping] { - count_ping += 1; - return seastar::make_ready_future( - seastar::stop_iteration::yes); - }); - } - }); - }).then([this, conn] { - auto found = pending_conns.find(conn); - return found->second.get_future(); - }); - }); - }); + ceph_assert(seastar::engine().cpu_id() == sid); + ceph_assert(sent_count == 0); + active_session->start_time = mono_clock::now(); + return seastar::do_until( + [this, conn] { + bool stop = (sent_count == rounds); + if (stop) { + logger().info("{}: finished sending {} OSDOPs", + *conn, sent_count); + } + return stop; + }, + [this, conn] { + sent_count += 1; + return send_msg(conn); + } + ).then([this] { + return active_session->done.get_future(); + }).then([this] { + std::chrono::duration dur_conn = active_session->connected_time - active_session->connecting_time; + std::chrono::duration dur_msg = mono_clock::now() - active_session->start_time; + logger().info("\n{}:\n messages: {}\n connect time: {}s\n messaging time: {}s\n", + lname, active_session->received_count, dur_conn.count(), dur_msg.count()); + }); } }; }; return seastar::when_all_succeed( - ceph::net::create_sharded(), - ceph::net::create_sharded(rounds, keepalive_ratio, bs, depth)) - .then([rounds, keepalive_ratio, addr, mode](test_state::Server *server, - test_state::Client *client) { + ceph::net::create_sharded(core), + ceph::net::create_sharded(jobs, rounds, bs, depth)) + .then([=](test_state::Server *server, + test_state::Client *client) { entity_addr_t target_addr; target_addr.parse(addr.c_str(), nullptr); target_addr.set_type(entity_addr_t::TYPE_LEGACY); if (mode == perf_mode_t::both) { + logger().info("\nperf settings:\n mode=server+client\n server addr={}\n server core={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n", + addr, core, rounds, jobs, bs, depth); + ceph_assert(seastar::smp::count >= std::max(1+jobs, 1+core)); + ceph_assert(core == 0 || core > jobs); + ceph_assert(jobs > 0); return seastar::when_all_succeed( - server->init(entity_name_t::OSD(0), "server", 0, target_addr), - client->init(entity_name_t::OSD(1), "client", 0)) - // dispatch pingpoing + server->init(target_addr), + client->init()) + // dispatch ops .then([client, target_addr] { - return client->dispatch_messages(target_addr, false); + return client->dispatch_messages(target_addr); // shutdown }).finally([client] { - logger().info("client shutdown..."); return client->shutdown(); }).finally([server] { - logger().info("server shutdown..."); return server->shutdown(); }); } else if (mode == perf_mode_t::client) { - return client->init(entity_name_t::OSD(1), "client", 0) - // dispatch pingpoing + logger().info("\nperf settings:\n mode=client\n server addr={}\n rounds={}\n client jobs={}\n bs={}\n depth={}\n", + addr, rounds, jobs, bs, depth); + ceph_assert(seastar::smp::count >= 1+jobs); + ceph_assert(jobs > 0); + return client->init() + // dispatch ops .then([client, target_addr] { - return client->dispatch_messages(target_addr, false); + return client->dispatch_messages(target_addr); // shutdown }).finally([client] { - logger().info("client shutdown..."); return client->shutdown(); }); } else { // mode == perf_mode_t::server - return server->init(entity_name_t::OSD(0), "server", 0, target_addr) - // dispatch pingpoing + ceph_assert(seastar::smp::count >= 1+core); + logger().info("\nperf settings:\n mode=server\n server addr={}\n server core={}\n", + addr, core); + return server->init(target_addr) + // dispatch ops .then([server] { return server->msgr->wait(); // shutdown }).finally([server] { - logger().info("server shutdown..."); return server->shutdown(); }); } @@ -323,34 +342,35 @@ int main(int argc, char** argv) seastar::app_template app; app.add_options() ("addr", bpo::value()->default_value("0.0.0.0:9010"), - "start server") - ("mode", bpo::value()->default_value(0), + "server address") + ("core", bpo::value()->default_value(0), + "server running core") + ("mode", bpo::value()->default_value(0), "0: both, 1:client, 2:server") ("rounds", bpo::value()->default_value(65536), "number of messaging rounds") - ("keepalive-ratio", bpo::value()->default_value(0), - "ratio of keepalive in ping messages") - ("bs", bpo::value()->default_value(4096), + ("jobs", bpo::value()->default_value(1), + "number of jobs (client messengers)") + ("bs", bpo::value()->default_value(4096), "block size") - ("depth", bpo::value()->default_value(512), + ("depth", bpo::value()->default_value(512), "io depth"); return app.run(argc, argv, [&app] { auto&& config = app.configuration(); auto rounds = config["rounds"].as(); - auto keepalive_ratio = config["keepalive-ratio"].as(); - auto bs = config["bs"].as(); - auto depth = config["depth"].as(); + auto jobs = config["jobs"].as(); + auto bs = config["bs"].as(); + auto depth = config["depth"].as(); auto addr = config["addr"].as(); - auto mode = config["mode"].as(); - logger().info("\nsettings:\n addr={}\n mode={}\n rounds={}\n keepalive-ratio={}\n bs={}\n depth={}", - addr, mode, rounds, keepalive_ratio, bs, depth); - ceph_assert(mode >= 0 && mode <= 2); + auto core = config["core"].as(); + auto mode = config["mode"].as(); + ceph_assert(mode <= 2); auto _mode = static_cast(mode); - return run(rounds, keepalive_ratio, bs, depth, addr, _mode) + return run(rounds, jobs, bs, depth, addr, _mode, core) .then([] { - std::cout << "successful" << std::endl; + logger().info("\nsuccessful!\n"); }).handle_exception([] (auto eptr) { - std::cout << "failed" << std::endl; + logger().info("\nfailed!\n"); return seastar::make_exception_future<>(eptr); }); });