From 71ba5896595c937a5b8bb7aecd7f166b81105dac Mon Sep 17 00:00:00 2001 From: Yingxin Cheng Date: Wed, 23 Jan 2019 18:40:13 +0800 Subject: [PATCH] crimson/net: port sharded-msgr to existing code Port sharded-msgr to crimson osd, monc and tests with compatible mode. Signed-off-by: Yingxin Cheng --- src/crimson/CMakeLists.txt | 3 +- src/crimson/mon/MonClient.cc | 20 ++++--- src/crimson/net/Messenger.h | 11 ++++ src/crimson/net/SocketMessenger.h | 12 ++-- src/crimson/osd/main.cc | 14 ++++- src/crimson/osd/osd.cc | 16 ++--- src/crimson/osd/osd.h | 6 +- src/test/crimson/CMakeLists.txt | 14 ++--- src/test/crimson/test_alien_echo.cc | 91 ++++++++++++++++------------- src/test/crimson/test_monc.cc | 5 +- 10 files changed, 114 insertions(+), 78 deletions(-) diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 46eae339ba7333..12622f31860464 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -124,8 +124,7 @@ set(crimson_thread_srcs thread/Throttle.cc) add_library(crimson STATIC ${crimson_auth_srcs} - # TODO: fix crimson_mon_client with the new design - # ${crimson_mon_srcs} + ${crimson_mon_srcs} ${crimson_net_srcs} ${crimson_thread_srcs} ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index ea38b70b229c78..a83d93f30cce22 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -508,13 +508,19 @@ seastar::future<> Client::reopen_session(int rank) #warning fixme auto peer = monmap.get_addrs(rank).legacy_addr(); logger().info("connecting to mon.{}", rank); - auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); - auto& mc = pending_conns.emplace_back(conn, &keyring); - return mc.authenticate( - monmap.get_epoch(), entity_name, - *auth_methods, want_keys).handle_exception([conn](auto ep) { - return (*conn)->close().then([ep = std::move(ep)] { - std::rethrow_exception(ep); + return msgr.connect(peer, CEPH_ENTITY_TYPE_MON) + .then([this] (auto xconn) { + // sharded-messenger compatible mode assumes all connections running + // on in the single shard. + ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id()); + ceph::net::ConnectionRef conn = xconn->release(); + auto& mc = pending_conns.emplace_back(conn, &keyring); + return mc.authenticate( + monmap.get_epoch(), entity_name, + *auth_methods, want_keys).handle_exception([conn](auto ep) { + return conn->close().then([ep = std::move(ep)] { + std::rethrow_exception(ep); + }); }); }).then([peer, this] { if (!is_hunting()) { diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 6bbe0199cb0c36..8ac3d036cbfd99 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -17,11 +17,16 @@ #include #include "Fwd.h" +#include "msg/Policy.h" +#include "crimson/thread/Throttle.h" class AuthAuthorizer; namespace ceph::net { +using SocketPolicy = ceph::net::Policy; +using Throttle = ceph::thread::Throttle; + class Messenger { entity_name_t my_name; entity_addrvec_t my_addrs; @@ -82,6 +87,12 @@ class Messenger { virtual void print(ostream& out) const = 0; + virtual void set_default_policy(const SocketPolicy& p) = 0; + + virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0; + + virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0; + static seastar::future> create(const entity_name_t& name, const std::string& lname, diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 4df8c9031dc1f5..12b46455b61e58 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -21,15 +21,12 @@ #include #include -#include "msg/Policy.h" #include "Messenger.h" #include "SocketConnection.h" #include "crimson/thread/Throttle.h" namespace ceph::net { -using SocketPolicy = ceph::net::Policy; - class SocketMessenger final : public Messenger, public seastar::peering_sharded_service { const int master_sid; const seastar::shard_id sid; @@ -89,11 +86,14 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ << ") " << get_myaddr(); } + void set_default_policy(const SocketPolicy& p) override; + + void set_policy(entity_type_t peer_type, const SocketPolicy& p) override; + + void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override; + public: seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me); - void set_default_policy(const SocketPolicy& p); - void set_policy(entity_type_t peer_type, const SocketPolicy& p); - void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); SocketConnectionRef lookup_conn(const entity_addr_t& addr); void accept_conn(SocketConnectionRef); diff --git a/src/crimson/osd/main.cc b/src/crimson/osd/main.cc index 1d7e9297a3f527..a543205b06e60d 100644 --- a/src/crimson/osd/main.cc +++ b/src/crimson/osd/main.cc @@ -11,6 +11,7 @@ #include "common/ceph_argparse.h" #include "crimson/common/config_proxy.h" +#include "crimson/net/Messenger.h" #include "osd.h" @@ -60,8 +61,17 @@ int main(int argc, char* argv[]) }).then([&conf_file_list] { return local_conf().parse_config_files(conf_file_list); }).then([&] { - return osd.start_single(std::stoi(local_conf()->name.get_id()), - static_cast(getpid())); + int id = std::stoi(local_conf()->name.get_id()); + auto nonce = static_cast(getpid()); + return ceph::net::Messenger::create(entity_name_t::OSD(id), + "cluster", nonce, 0) + .then([id, nonce, &osd](auto cluster_msgr) { + return ceph::net::Messenger::create(entity_name_t::OSD(id), + "client", nonce, 0) + .then([cluster_msgr, id, &osd](auto client_msgr) { + return osd.start_single(id, &cluster_msgr.get(), &client_msgr.get()); + }); + }); }).then([&] { return osd.invoke_on(0, &OSD::start); }); diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 72ac133c0d1bd3..cb0c1af00afcf7 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -4,7 +4,7 @@ #include "messages/MOSDBoot.h" #include "messages/MOSDMap.h" #include "crimson/net/Connection.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" namespace { seastar::logger& logger() { @@ -20,15 +20,15 @@ namespace { using ceph::common::local_conf; -OSD::OSD(int id, uint32_t nonce) +OSD::OSD(int id, + ceph::net::Messenger *cluster_msgr, + ceph::net::Messenger *client_msgr) : whoami{id}, - cluster_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami), - "cluster", nonce}}, - client_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami), - "client", nonce}}, + cluster_msgr{cluster_msgr}, + client_msgr{client_msgr}, monc{*client_msgr} { - for (auto msgr : {cluster_msgr.get(), client_msgr.get()}) { + for (auto msgr : {cluster_msgr, client_msgr}) { if (local_conf()->ms_crc_data) { msgr->set_crc_data(); } @@ -143,6 +143,8 @@ seastar::future<> OSD::stop() return monc.stop(); }).then([this] { return client_msgr->shutdown(); + }).then([this] { + return cluster_msgr->shutdown(); }); } diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index 9a1ada416ad47d..c02e30bc5f62de 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -26,9 +26,9 @@ class OSD : public ceph::net::Dispatcher { seastar::timer beacon_timer; const int whoami; // talk with osd - std::unique_ptr cluster_msgr; + ceph::net::Messenger* cluster_msgr; // talk with mon/mgr - std::unique_ptr client_msgr; + ceph::net::Messenger* client_msgr; ChainedDispatchers dispatchers; ceph::mon::Client monc; @@ -58,7 +58,7 @@ class OSD : public ceph::net::Dispatcher { seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; public: - OSD(int id, uint32_t nonce); + OSD(int id, ceph::net::Messenger *cluster_msgr, ceph::net::Messenger *client_msgr); ~OSD(); static seastar::future<> mkfs(uuid_d fsid, int whoami); diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index d06fd341bd1446..1e12b3e94da956 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -12,10 +12,9 @@ add_executable(unittest_seastar_messenger test_messenger.cc) add_ceph_unittest(unittest_seastar_messenger) target_link_libraries(unittest_seastar_messenger ceph-common crimson) -# TODO: fix unittest_seastar_echo with the new design -#add_executable(unittest_seastar_echo -# test_alien_echo.cc) -#target_link_libraries(unittest_seastar_echo ceph-common global crimson) +add_executable(unittest_seastar_echo + test_alien_echo.cc) +target_link_libraries(unittest_seastar_echo ceph-common global crimson) add_executable(unittest_seastar_thread_pool test_thread_pool.cc) @@ -26,10 +25,9 @@ add_executable(unittest_seastar_config test_config.cc) target_link_libraries(unittest_seastar_config crimson) -# TODO: fix unittest_seastar_monc with the new design -#add_executable(unittest_seastar_monc -# test_monc.cc) -#target_link_libraries(unittest_seastar_monc crimson) +add_executable(unittest_seastar_monc + test_monc.cc) +target_link_libraries(unittest_seastar_monc crimson) add_executable(unittest_seastar_perfcounters test_perfcounters.cc) diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 5f93f45cf58e74..d3e46cfda47f7f 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -7,7 +7,7 @@ #include "msg/Messenger.h" #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" #include "crimson/net/Config.h" #include "crimson/thread/Condition.h" #include "crimson/thread/Throttle.h" @@ -38,8 +38,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer { struct Server { ceph::thread::Throttle byte_throttler; - static constexpr int64_t server_num = 0; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0}; + ceph::net::Messenger& msgr; struct ServerDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -65,8 +64,9 @@ struct Server { new DummyAuthAuthorizer{}); } } dispatcher; - Server() - : byte_throttler(ceph::net::conf.osd_client_message_size_cap) + Server(ceph::net::Messenger& msgr) + : byte_throttler(ceph::net::conf.osd_client_message_size_cap), + msgr{msgr} { msgr.set_crc_header(); msgr.set_crc_data(); @@ -75,8 +75,7 @@ struct Server { struct Client { ceph::thread::Throttle byte_throttler; - static constexpr int64_t client_num = 1; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0}; + ceph::net::Messenger& msgr; struct ClientDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -88,8 +87,9 @@ struct Client { return seastar::now(); } } dispatcher; - Client() - : byte_throttler(ceph::net::conf.osd_client_message_size_cap) + Client(ceph::net::Messenger& msgr) + : byte_throttler(ceph::net::conf.osd_client_message_size_cap), + msgr{msgr} { msgr.set_crc_header(); msgr.set_crc_data(); @@ -275,41 +275,50 @@ seastar_echo(SeastarContext& sc, { std::cout << "seastar/"; if (role == echo_role::as_server) { - return seastar::do_with(seastar_pingpong::Server{}, - [&addr, count](auto& server) mutable { - std::cout << "server listening at " << addr << std::endl; - // bind the server - server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, - &server.byte_throttler); - server.msgr.bind(entity_addrvec_t{addr}); - return server.msgr.start(&server.dispatcher) - .then([&dispatcher=server.dispatcher, count] { - return dispatcher.on_reply.wait([&dispatcher, count] { - return dispatcher.count >= count; - }); - }).finally([&server] { - std::cout << "server shutting down" << std::endl; - return server.msgr.shutdown(); + return ceph::net::Messenger::create(entity_name_t::OSD(0), "server", 0, + seastar::engine().cpu_id()) + .then([&addr, count] (auto msgr) { + return seastar::do_with(seastar_pingpong::Server{msgr}, + [&addr, count](auto& server) mutable { + std::cout << "server listening at " << addr << std::endl; + // bind the server + server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &server.byte_throttler); + return server.msgr.bind(entity_addrvec_t{addr}) + .then([&server] { + return server.msgr.start(&server.dispatcher); + }).then([&dispatcher=server.dispatcher, count] { + return dispatcher.on_reply.wait([&dispatcher, count] { + return dispatcher.count >= count; + }); + }).finally([&server] { + std::cout << "server shutting down" << std::endl; + return server.msgr.shutdown(); + }); }); }); } else { - return seastar::do_with(seastar_pingpong::Client{}, - [&addr, count](auto& client) { - std::cout << "client sending to " << addr << std::endl; - client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, - &client.byte_throttler); - return client.msgr.start(&client.dispatcher) - .then([&] { - return client.msgr.connect(addr, entity_name_t::TYPE_OSD); - }).then([&disp=client.dispatcher, count](ceph::net::ConnectionRef conn) { - return seastar::do_until( - [&disp,count] { return disp.count >= count; }, - [&disp,conn] { return conn->send(MessageRef{new MPing(), false}) - .then([&] { return disp.on_reply.wait(); }); - }); - }).finally([&client] { - std::cout << "client shutting down" << std::endl; - return client.msgr.shutdown(); + return ceph::net::Messenger::create(entity_name_t::OSD(1), "client", 1, + seastar::engine().cpu_id()) + .then([&addr, count] (auto msgr) { + return seastar::do_with(seastar_pingpong::Client{msgr}, + [&addr, count](auto& client) { + std::cout << "client sending to " << addr << std::endl; + client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &client.byte_throttler); + return client.msgr.start(&client.dispatcher) + .then([&] { + return client.msgr.connect(addr, entity_name_t::TYPE_OSD); + }).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) { + return seastar::do_until( + [&disp,count] { return disp.count >= count; }, + [&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false}) + .then([&] { return disp.on_reply.wait(); }); + }); + }).finally([&client] { + std::cout << "client shutting down" << std::endl; + return client.msgr.shutdown(); + }); }); }); } diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index c6c5f548a02220..b5a9b9817434a8 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -25,8 +25,9 @@ static seastar::future<> test_monc() }).then([] { return ceph::common::sharded_perf_coll().start(); }).then([] { - return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0}, - [](ceph::net::Messenger& msgr) { + return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0, + seastar::engine().cpu_id()) + .then([] (ceph::net::Messenger& msgr) { auto& conf = ceph::common::local_conf(); if (conf->ms_crc_data) { msgr.set_crc_data();