diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 3294d634f6e46..5d42f60f24bb0 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -125,8 +125,7 @@ set(crimson_thread_srcs thread/Throttle.cc) add_library(crimson STATIC ${crimson_auth_srcs} - # TODO: fix crimson_mon_client with the new design - # ${crimson_mon_srcs} + ${crimson_mon_srcs} ${crimson_net_srcs} ${crimson_thread_srcs} ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 0ce65a5cfc79a..4b61270c271f2 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -512,13 +512,19 @@ seastar::future<> Client::reopen_session(int rank) #warning fixme auto peer = monmap.get_addrs(rank).legacy_addr(); logger().info("connecting to mon.{}", rank); - auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); - auto& mc = pending_conns.emplace_back(conn, &keyring); - return mc.authenticate( - monmap.get_epoch(), entity_name, - *auth_methods, want_keys).handle_exception([conn](auto ep) { - return conn->close().then([ep = std::move(ep)] { - std::rethrow_exception(ep); + return msgr.connect(peer, CEPH_ENTITY_TYPE_MON) + .then([this] (auto xconn) { + // sharded-messenger compatible mode assumes all connections running + // in one shard. + ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id()); + ceph::net::ConnectionRef conn = xconn->release(); + auto& mc = pending_conns.emplace_back(conn, &keyring); + return mc.authenticate( + monmap.get_epoch(), entity_name, + *auth_methods, want_keys).handle_exception([conn](auto ep) { + return conn->close().then([ep = std::move(ep)] { + std::rethrow_exception(ep); + }); }); }).then([peer, this] { if (!is_hunting()) { diff --git a/src/crimson/osd/heartbeat.cc b/src/crimson/osd/heartbeat.cc index c075ad6867dab..0d818b24f54d9 100644 --- a/src/crimson/osd/heartbeat.cc +++ b/src/crimson/osd/heartbeat.cc @@ -7,7 +7,7 @@ #include "crimson/common/config_proxy.h" #include "crimson/net/Connection.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" #include "crimson/osd/osdmap_service.h" #include "crimson/mon/MonClient.h" @@ -31,10 +31,8 @@ Heartbeat::Heartbeat(int whoami, uint32_t nonce, const OSDMapService& service, ceph::mon::Client& monc) - : front_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami), - "hb_front", nonce}}, - back_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami), - "hb_back", nonce}}, + : whoami{whoami}, + nonce{nonce}, service{service}, monc{monc}, timer{[this] {send_heartbeats();}} @@ -48,17 +46,31 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs, for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) { addr.set_port(0); } - front_msgr->try_bind(front_addrs, - local_conf()->ms_bind_port_min, - local_conf()->ms_bind_port_max); - back_msgr->try_bind(front_addrs, - local_conf()->ms_bind_port_min, - local_conf()->ms_bind_port_max); - return seastar::when_all_succeed(front_msgr->start(this), - back_msgr->start(this)).then([this] { - timer.arm_periodic( - std::chrono::seconds(local_conf()->osd_heartbeat_interval)); - }); + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_front", + nonce, + seastar::engine().cpu_id()) + .then([this, front_addrs] (auto msgr) { + front_msgr = msgr; + return front_msgr->try_bind(front_addrs, + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max); + }).then([this] { return front_msgr->start(this); }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "hb_back", + nonce, + seastar::engine().cpu_id()) + .then([this, back_addrs] (auto msgr) { + back_msgr = msgr; + return back_msgr->try_bind(back_addrs, + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max); + }).then([this] { return back_msgr->start(this); })) + .then([this] { + timer.arm_periodic( + std::chrono::seconds(local_conf()->osd_heartbeat_interval)); + }); } seastar::future<> Heartbeat::stop() @@ -77,24 +89,29 @@ const entity_addrvec_t& Heartbeat::get_back_addrs() const return back_msgr->get_myaddrs(); } -void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) +seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch) { auto found = peers.find(peer); if (found == peers.end()) { logger().info("add_peer({})", peer); - PeerInfo info; auto osdmap = service.get_map(); // TODO: msgr v2 - info.con_front = - front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(), - CEPH_ENTITY_TYPE_OSD); - info.con_back = - back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(), - CEPH_ENTITY_TYPE_OSD); - info.epoch = epoch; - peers.emplace(peer, std::move(info)); + return seastar::when_all_succeed( + front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD), + back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(), + CEPH_ENTITY_TYPE_OSD)) + .then([this, peer, epoch] (auto xcon_front, auto xcon_back) { + PeerInfo info; + // sharded-messenger compatible mode + info.con_front = xcon_front->release(); + info.con_back = xcon_back->release(); + info.epoch = epoch; + peers.emplace(peer, std::move(info)); + }); } else { found->second.epoch = epoch; + return seastar::now(); } } diff --git a/src/crimson/osd/heartbeat.h b/src/crimson/osd/heartbeat.h index 916acba2c9a41..209914e78d643 100644 --- a/src/crimson/osd/heartbeat.h +++ b/src/crimson/osd/heartbeat.h @@ -31,7 +31,7 @@ class Heartbeat : public ceph::net::Dispatcher { entity_addrvec_t back); seastar::future<> stop(); - void add_peer(osd_id_t peer, epoch_t epoch); + seastar::future<> add_peer(osd_id_t peer, epoch_t epoch); seastar::future<> update_peers(int whoami); seastar::future<> remove_peer(osd_id_t peer); @@ -64,8 +64,10 @@ class Heartbeat : public ceph::net::Dispatcher { void add_reporter_peers(int whoami); private: - std::unique_ptr front_msgr; - std::unique_ptr back_msgr; + const int whoami; + const uint32_t nonce; + ceph::net::Messenger* front_msgr = nullptr; + ceph::net::Messenger* back_msgr = nullptr; const OSDMapService& service; ceph::mon::Client& monc; diff --git a/src/crimson/osd/osd.cc b/src/crimson/osd/osd.cc index 24f7de8bbd5d3..2b8d8a8d99550 100644 --- a/src/crimson/osd/osd.cc +++ b/src/crimson/osd/osd.cc @@ -7,7 +7,7 @@ #include "messages/MOSDBoot.h" #include "messages/MOSDMap.h" #include "crimson/net/Connection.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" #include "crimson/os/cyan_collection.h" #include "crimson/os/cyan_object.h" #include "crimson/os/cyan_store.h" @@ -35,29 +35,8 @@ using ceph::os::CyanStore; OSD::OSD(int id, uint32_t nonce) : whoami{id}, - cluster_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami), - "cluster", nonce}}, - public_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami), - "client", nonce}}, - monc{*public_msgr}, - heartbeat{new Heartbeat{whoami, nonce, *this, monc}}, - heartbeat_timer{[this] { update_heartbeat_peers(); }} -{ - for (auto msgr : {cluster_msgr.get(), public_msgr.get()}) { - if (local_conf()->ms_crc_data) { - msgr->set_crc_data(); - } - if (local_conf()->ms_crc_header) { - msgr->set_crc_header(); - } - } - dispatchers.push_front(this); - dispatchers.push_front(&monc); - osdmaps[0] = seastar::make_lw_shared(); - beacon_timer.set_callback([this] { - send_beacon(); - }); -} + nonce{nonce} +{} OSD::~OSD() = default; @@ -131,9 +110,38 @@ namespace { seastar::future<> OSD::start() { logger().info("start"); - const auto data_path = local_conf().get_val("osd_data"); - store = std::make_unique(data_path); - return store->mount().then([this] { + + return seastar::when_all_succeed( + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "cluster", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { cluster_msgr = msgr; }), + ceph::net::Messenger::create(entity_name_t::OSD(whoami), + "client", + nonce, + seastar::engine().cpu_id()) + .then([this] (auto msgr) { public_msgr = msgr; })) + .then([this] { + monc.reset(new ceph::mon::Client{*public_msgr}); + heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc}); + + for (auto msgr : {cluster_msgr, public_msgr}) { + if (local_conf()->ms_crc_data) { + msgr->set_crc_data(); + } + if (local_conf()->ms_crc_header) { + msgr->set_crc_header(); + } + } + dispatchers.push_front(this); + dispatchers.push_front(monc.get()); + osdmaps[0] = seastar::make_lw_shared(); + + const auto data_path = local_conf().get_val("osd_data"); + store = std::make_unique(data_path); + return store->mount(); + }).then([this] { meta_coll = make_unique(store->open_collection(coll_t::meta()), store.get()); return meta_coll->load_superblock(); @@ -144,25 +152,28 @@ seastar::future<> OSD::start() osdmap = std::move(map); return load_pgs(); }).then([this] { - cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), - local_conf()->ms_bind_port_min, - local_conf()->ms_bind_port_max); - public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), - local_conf()->ms_bind_port_min, - local_conf()->ms_bind_port_max); - return seastar::when_all_succeed(cluster_msgr->start(&dispatchers), - public_msgr->start(&dispatchers)); + return seastar::when_all_succeed( + cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return cluster_msgr->start(&dispatchers); }), + public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC), + local_conf()->ms_bind_port_min, + local_conf()->ms_bind_port_max) + .then([this] { return public_msgr->start(&dispatchers); })); }).then([this] { - return monc.start(); + return monc->start(); }).then([this] { - monc.sub_want("osd_pg_creates", last_pg_create_epoch, 0); - monc.sub_want("mgrmap", 0, 0); - monc.sub_want("osdmap", 0, 0); - return monc.renew_subs(); + monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0); + monc->sub_want("mgrmap", 0, 0); + monc->sub_want("osdmap", 0, 0); + return monc->renew_subs(); }).then([this] { return heartbeat->start(public_msgr->get_myaddrs(), cluster_msgr->get_myaddrs()); }).then([this] { + beacon_timer.set_callback([this] { send_beacon(); }); + heartbeat_timer.set_callback([this] { update_heartbeat_peers(); }); return start_boot(); }); } @@ -170,7 +181,7 @@ seastar::future<> OSD::start() seastar::future<> OSD::start_boot() { state.set_preboot(); - return monc.get_version("osdmap").then([this](version_t newest, version_t oldest) { + return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) { return _preboot(newest, oldest); }); } @@ -222,7 +233,7 @@ seastar::future<> OSD::_send_boot() heartbeat->get_front_addrs(), cluster_msgr->get_myaddrs(), CEPH_FEATURES_ALL); - return monc.send_message(m); + return monc->send_message(m); } seastar::future<> OSD::stop() @@ -232,9 +243,11 @@ seastar::future<> OSD::stop() return gate.close().then([this] { return heartbeat->stop(); }).then([this] { - return monc.stop(); + return monc->stop(); }).then([this] { return public_msgr->shutdown(); + }).then([this] { + return cluster_msgr->shutdown(); }); } @@ -402,9 +415,9 @@ seastar::future<> OSD::store_maps(ceph::os::Transaction& t, seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request) { logger().info("{}({})", __func__, epoch); - if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || + if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) || force_request) { - return monc.renew_subs(); + return monc->renew_subs(); } else { return seastar::now(); } @@ -455,7 +468,7 @@ seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn, [=](auto& t) { return store_maps(t, start, m).then([=, &t] { // even if this map isn't from a mon, we may have satisfied our subscription - monc.sub_got("osdmap", last); + monc->sub_got("osdmap", last); if (!superblock.oldest_map || skip_maps) { superblock.oldest_map = first; } @@ -583,7 +596,7 @@ seastar::future<> OSD::send_beacon() epoch_t min_last_epoch_clean = osdmap->get_epoch(); auto m = make_message(osdmap->get_epoch(), min_last_epoch_clean); - return monc.send_message(m); + return monc->send_message(m); } void OSD::update_heartbeat_peers() diff --git a/src/crimson/osd/osd.h b/src/crimson/osd/osd.h index fc66666fba005..249cf9b351e04 100644 --- a/src/crimson/osd/osd.h +++ b/src/crimson/osd/osd.h @@ -40,12 +40,13 @@ class OSD : public ceph::net::Dispatcher, seastar::gate gate; seastar::timer beacon_timer; const int whoami; + const uint32_t nonce; // talk with osd - std::unique_ptr cluster_msgr; + ceph::net::Messenger* cluster_msgr = nullptr; // talk with client/mon/mgr - std::unique_ptr public_msgr; + ceph::net::Messenger* public_msgr = nullptr; ChainedDispatchers dispatchers; - ceph::mon::Client monc; + std::unique_ptr monc; std::unique_ptr heartbeat; seastar::timer heartbeat_timer; diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index d06fd341bd144..1e12b3e94da95 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -12,10 +12,9 @@ add_executable(unittest_seastar_messenger test_messenger.cc) add_ceph_unittest(unittest_seastar_messenger) target_link_libraries(unittest_seastar_messenger ceph-common crimson) -# TODO: fix unittest_seastar_echo with the new design -#add_executable(unittest_seastar_echo -# test_alien_echo.cc) -#target_link_libraries(unittest_seastar_echo ceph-common global crimson) +add_executable(unittest_seastar_echo + test_alien_echo.cc) +target_link_libraries(unittest_seastar_echo ceph-common global crimson) add_executable(unittest_seastar_thread_pool test_thread_pool.cc) @@ -26,10 +25,9 @@ add_executable(unittest_seastar_config test_config.cc) target_link_libraries(unittest_seastar_config crimson) -# TODO: fix unittest_seastar_monc with the new design -#add_executable(unittest_seastar_monc -# test_monc.cc) -#target_link_libraries(unittest_seastar_monc crimson) +add_executable(unittest_seastar_monc + test_monc.cc) +target_link_libraries(unittest_seastar_monc crimson) add_executable(unittest_seastar_perfcounters test_perfcounters.cc) diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index d58b2c94e6d80..74b6a9af70611 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -7,7 +7,7 @@ #include "msg/Messenger.h" #include "crimson/net/Connection.h" #include "crimson/net/Dispatcher.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" #include "crimson/net/Config.h" #include "crimson/thread/Condition.h" #include "crimson/thread/Throttle.h" @@ -39,8 +39,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer { struct Server { ceph::thread::Throttle byte_throttler; - static constexpr int64_t server_num = 0; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0}; + ceph::net::Messenger& msgr; struct ServerDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -66,8 +65,9 @@ struct Server { new DummyAuthAuthorizer{}); } } dispatcher; - Server() - : byte_throttler(ceph::net::conf.osd_client_message_size_cap) + Server(ceph::net::Messenger& msgr) + : byte_throttler(ceph::net::conf.osd_client_message_size_cap), + msgr{msgr} { msgr.set_crc_header(); msgr.set_crc_data(); @@ -76,8 +76,7 @@ struct Server { struct Client { ceph::thread::Throttle byte_throttler; - static constexpr int64_t client_num = 1; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0}; + ceph::net::Messenger& msgr; struct ClientDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -89,8 +88,9 @@ struct Client { return seastar::now(); } } dispatcher; - Client() - : byte_throttler(ceph::net::conf.osd_client_message_size_cap) + Client(ceph::net::Messenger& msgr) + : byte_throttler(ceph::net::conf.osd_client_message_size_cap), + msgr{msgr} { msgr.set_crc_header(); msgr.set_crc_data(); @@ -276,41 +276,50 @@ seastar_echo(SeastarContext& sc, { std::cout << "seastar/"; if (role == echo_role::as_server) { - return seastar::do_with(seastar_pingpong::Server{}, - [&addr, count](auto& server) mutable { - std::cout << "server listening at " << addr << std::endl; - // bind the server - server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, - &server.byte_throttler); - server.msgr.bind(entity_addrvec_t{addr}); - return server.msgr.start(&server.dispatcher) - .then([&dispatcher=server.dispatcher, count] { - return dispatcher.on_reply.wait([&dispatcher, count] { - return dispatcher.count >= count; - }); - }).finally([&server] { - std::cout << "server shutting down" << std::endl; - return server.msgr.shutdown(); + return ceph::net::Messenger::create(entity_name_t::OSD(0), "server", 0, + seastar::engine().cpu_id()) + .then([&addr, count] (auto msgr) { + return seastar::do_with(seastar_pingpong::Server{*msgr}, + [&addr, count](auto& server) mutable { + std::cout << "server listening at " << addr << std::endl; + // bind the server + server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &server.byte_throttler); + return server.msgr.bind(entity_addrvec_t{addr}) + .then([&server] { + return server.msgr.start(&server.dispatcher); + }).then([&dispatcher=server.dispatcher, count] { + return dispatcher.on_reply.wait([&dispatcher, count] { + return dispatcher.count >= count; + }); + }).finally([&server] { + std::cout << "server shutting down" << std::endl; + return server.msgr.shutdown(); + }); }); }); } else { - return seastar::do_with(seastar_pingpong::Client{}, - [&addr, count](auto& client) { - std::cout << "client sending to " << addr << std::endl; - client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, - &client.byte_throttler); - return client.msgr.start(&client.dispatcher) - .then([&] { - return client.msgr.connect(addr, entity_name_t::TYPE_OSD); - }).then([&disp=client.dispatcher, count](ceph::net::ConnectionRef conn) { - return seastar::do_until( - [&disp,count] { return disp.count >= count; }, - [&disp,conn] { return conn->send(MessageRef{new MPing(), false}) - .then([&] { return disp.on_reply.wait(); }); - }); - }).finally([&client] { - std::cout << "client shutting down" << std::endl; - return client.msgr.shutdown(); + return ceph::net::Messenger::create(entity_name_t::OSD(1), "client", 1, + seastar::engine().cpu_id()) + .then([&addr, count] (auto msgr) { + return seastar::do_with(seastar_pingpong::Client{*msgr}, + [&addr, count](auto& client) { + std::cout << "client sending to " << addr << std::endl; + client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD, + &client.byte_throttler); + return client.msgr.start(&client.dispatcher) + .then([&] { + return client.msgr.connect(addr, entity_name_t::TYPE_OSD); + }).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) { + return seastar::do_until( + [&disp,count] { return disp.count >= count; }, + [&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false}) + .then([&] { return disp.on_reply.wait(); }); + }); + }).finally([&client] { + std::cout << "client shutting down" << std::endl; + return client.msgr.shutdown(); + }); }); }); } diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index c6c5f548a0222..671aa644ff717 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -3,7 +3,7 @@ #include "crimson/common/config_proxy.h" #include "crimson/mon/MonClient.h" #include "crimson/net/Connection.h" -#include "crimson/net/SocketMessenger.h" +#include "crimson/net/Messenger.h" using Config = ceph::common::ConfigProxy; using MonClient = ceph::mon::Client; @@ -25,26 +25,27 @@ static seastar::future<> test_monc() }).then([] { return ceph::common::sharded_perf_coll().start(); }).then([] { - return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0}, - [](ceph::net::Messenger& msgr) { + return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0, + seastar::engine().cpu_id()) + .then([] (ceph::net::Messenger *msgr) { auto& conf = ceph::common::local_conf(); if (conf->ms_crc_data) { - msgr.set_crc_data(); + msgr->set_crc_data(); } if (conf->ms_crc_header) { - msgr.set_crc_header(); + msgr->set_crc_header(); } - return seastar::do_with(MonClient{msgr}, - [&msgr](auto& monc) { - return msgr.start(&monc).then([&monc] { + return seastar::do_with(MonClient{*msgr}, + [msgr](auto& monc) { + return msgr->start(&monc).then([&monc] { return seastar::with_timeout( seastar::lowres_clock::now() + std::chrono::seconds{10}, monc.start()); }).then([&monc] { return monc.stop(); }); - }).finally([&msgr] { - return msgr.shutdown(); + }).finally([msgr] { + return msgr->shutdown(); }); }); }).finally([] {