Skip to content

Commit

Permalink
crimson/net: port sharded-msgr to existing code
Browse files Browse the repository at this point in the history
Port sharded-msgr to crimson osd, monc, heartbeat and tests with
compatible mode.

Signed-off-by: Yingxin Cheng <[email protected]>
  • Loading branch information
cyx1231st committed Feb 12, 2019
1 parent f239774 commit 6760779
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 147 deletions.
3 changes: 1 addition & 2 deletions src/crimson/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ set(crimson_thread_srcs
thread/Throttle.cc)
add_library(crimson STATIC
${crimson_auth_srcs}
# TODO: fix crimson_mon_client with the new design
# ${crimson_mon_srcs}
${crimson_mon_srcs}
${crimson_net_srcs}
${crimson_thread_srcs}
${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc)
Expand Down
20 changes: 13 additions & 7 deletions src/crimson/mon/MonClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -512,13 +512,19 @@ seastar::future<> Client::reopen_session(int rank)
#warning fixme
auto peer = monmap.get_addrs(rank).legacy_addr();
logger().info("connecting to mon.{}", rank);
auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
auto& mc = pending_conns.emplace_back(conn, &keyring);
return mc.authenticate(
monmap.get_epoch(), entity_name,
*auth_methods, want_keys).handle_exception([conn](auto ep) {
return conn->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
return msgr.connect(peer, CEPH_ENTITY_TYPE_MON)
.then([this] (auto xconn) {
// sharded-messenger compatible mode assumes all connections running
// in one shard.
ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
ceph::net::ConnectionRef conn = xconn->release();
auto& mc = pending_conns.emplace_back(conn, &keyring);
return mc.authenticate(
monmap.get_epoch(), entity_name,
*auth_methods, want_keys).handle_exception([conn](auto ep) {
return conn->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
});
});
}).then([peer, this] {
if (!is_hunting()) {
Expand Down
69 changes: 43 additions & 26 deletions src/crimson/osd/heartbeat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#include "crimson/common/config_proxy.h"
#include "crimson/net/Connection.h"
#include "crimson/net/SocketMessenger.h"
#include "crimson/net/Messenger.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/mon/MonClient.h"

Expand All @@ -31,10 +31,8 @@ Heartbeat::Heartbeat(int whoami,
uint32_t nonce,
const OSDMapService& service,
ceph::mon::Client& monc)
: front_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
"hb_front", nonce}},
back_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
"hb_back", nonce}},
: whoami{whoami},
nonce{nonce},
service{service},
monc{monc},
timer{[this] {send_heartbeats();}}
Expand All @@ -48,17 +46,31 @@ seastar::future<> Heartbeat::start(entity_addrvec_t front_addrs,
for (auto& addr : boost::join(front_addrs.v, back_addrs.v)) {
addr.set_port(0);
}
front_msgr->try_bind(front_addrs,
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max);
back_msgr->try_bind(front_addrs,
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max);
return seastar::when_all_succeed(front_msgr->start(this),
back_msgr->start(this)).then([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
});
return seastar::when_all_succeed(
ceph::net::Messenger::create(entity_name_t::OSD(whoami),
"hb_front",
nonce,
seastar::engine().cpu_id())
.then([this, front_addrs] (auto msgr) {
front_msgr = msgr;
return front_msgr->try_bind(front_addrs,
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max);
}).then([this] { return front_msgr->start(this); }),
ceph::net::Messenger::create(entity_name_t::OSD(whoami),
"hb_back",
nonce,
seastar::engine().cpu_id())
.then([this, back_addrs] (auto msgr) {
back_msgr = msgr;
return back_msgr->try_bind(back_addrs,
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max);
}).then([this] { return back_msgr->start(this); }))
.then([this] {
timer.arm_periodic(
std::chrono::seconds(local_conf()->osd_heartbeat_interval));
});
}

seastar::future<> Heartbeat::stop()
Expand All @@ -77,24 +89,29 @@ const entity_addrvec_t& Heartbeat::get_back_addrs() const
return back_msgr->get_myaddrs();
}

void Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
seastar::future<> Heartbeat::add_peer(osd_id_t peer, epoch_t epoch)
{
auto found = peers.find(peer);
if (found == peers.end()) {
logger().info("add_peer({})", peer);
PeerInfo info;
auto osdmap = service.get_map();
// TODO: msgr v2
info.con_front =
front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
CEPH_ENTITY_TYPE_OSD);
info.con_back =
back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
CEPH_ENTITY_TYPE_OSD);
info.epoch = epoch;
peers.emplace(peer, std::move(info));
return seastar::when_all_succeed(
front_msgr->connect(osdmap->get_hb_front_addrs(peer).legacy_addr(),
CEPH_ENTITY_TYPE_OSD),
back_msgr->connect(osdmap->get_hb_back_addrs(peer).legacy_addr(),
CEPH_ENTITY_TYPE_OSD))
.then([this, peer, epoch] (auto xcon_front, auto xcon_back) {
PeerInfo info;
// sharded-messenger compatible mode
info.con_front = xcon_front->release();
info.con_back = xcon_back->release();
info.epoch = epoch;
peers.emplace(peer, std::move(info));
});
} else {
found->second.epoch = epoch;
return seastar::now();
}
}

Expand Down
8 changes: 5 additions & 3 deletions src/crimson/osd/heartbeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Heartbeat : public ceph::net::Dispatcher {
entity_addrvec_t back);
seastar::future<> stop();

void add_peer(osd_id_t peer, epoch_t epoch);
seastar::future<> add_peer(osd_id_t peer, epoch_t epoch);
seastar::future<> update_peers(int whoami);
seastar::future<> remove_peer(osd_id_t peer);

Expand Down Expand Up @@ -64,8 +64,10 @@ class Heartbeat : public ceph::net::Dispatcher {
void add_reporter_peers(int whoami);

private:
std::unique_ptr<ceph::net::Messenger> front_msgr;
std::unique_ptr<ceph::net::Messenger> back_msgr;
const int whoami;
const uint32_t nonce;
ceph::net::Messenger* front_msgr = nullptr;
ceph::net::Messenger* back_msgr = nullptr;
const OSDMapService& service;
ceph::mon::Client& monc;

Expand Down
107 changes: 60 additions & 47 deletions src/crimson/osd/osd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
#include "crimson/net/Connection.h"
#include "crimson/net/SocketMessenger.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyan_collection.h"
#include "crimson/os/cyan_object.h"
#include "crimson/os/cyan_store.h"
Expand Down Expand Up @@ -35,29 +35,8 @@ using ceph::os::CyanStore;

OSD::OSD(int id, uint32_t nonce)
: whoami{id},
cluster_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
"cluster", nonce}},
public_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
"client", nonce}},
monc{*public_msgr},
heartbeat{new Heartbeat{whoami, nonce, *this, monc}},
heartbeat_timer{[this] { update_heartbeat_peers(); }}
{
for (auto msgr : {cluster_msgr.get(), public_msgr.get()}) {
if (local_conf()->ms_crc_data) {
msgr->set_crc_data();
}
if (local_conf()->ms_crc_header) {
msgr->set_crc_header();
}
}
dispatchers.push_front(this);
dispatchers.push_front(&monc);
osdmaps[0] = seastar::make_lw_shared<OSDMap>();
beacon_timer.set_callback([this] {
send_beacon();
});
}
nonce{nonce}
{}

OSD::~OSD() = default;

Expand Down Expand Up @@ -131,9 +110,38 @@ namespace {
seastar::future<> OSD::start()
{
logger().info("start");
const auto data_path = local_conf().get_val<std::string>("osd_data");
store = std::make_unique<ceph::os::CyanStore>(data_path);
return store->mount().then([this] {

return seastar::when_all_succeed(
ceph::net::Messenger::create(entity_name_t::OSD(whoami),
"cluster",
nonce,
seastar::engine().cpu_id())
.then([this] (auto msgr) { cluster_msgr = msgr; }),
ceph::net::Messenger::create(entity_name_t::OSD(whoami),
"client",
nonce,
seastar::engine().cpu_id())
.then([this] (auto msgr) { public_msgr = msgr; }))
.then([this] {
monc.reset(new ceph::mon::Client{*public_msgr});
heartbeat.reset(new Heartbeat{whoami, nonce, *this, *monc});

for (auto msgr : {cluster_msgr, public_msgr}) {
if (local_conf()->ms_crc_data) {
msgr->set_crc_data();
}
if (local_conf()->ms_crc_header) {
msgr->set_crc_header();
}
}
dispatchers.push_front(this);
dispatchers.push_front(monc.get());
osdmaps[0] = seastar::make_lw_shared<OSDMap>();

const auto data_path = local_conf().get_val<std::string>("osd_data");
store = std::make_unique<ceph::os::CyanStore>(data_path);
return store->mount();
}).then([this] {
meta_coll = make_unique<OSDMeta>(store->open_collection(coll_t::meta()),
store.get());
return meta_coll->load_superblock();
Expand All @@ -144,33 +152,36 @@ seastar::future<> OSD::start()
osdmap = std::move(map);
return load_pgs();
}).then([this] {
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max);
public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max);
return seastar::when_all_succeed(cluster_msgr->start(&dispatchers),
public_msgr->start(&dispatchers));
return seastar::when_all_succeed(
cluster_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_CLUSTER),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
.then([this] { return cluster_msgr->start(&dispatchers); }),
public_msgr->try_bind(pick_addresses(CEPH_PICK_ADDRESS_PUBLIC),
local_conf()->ms_bind_port_min,
local_conf()->ms_bind_port_max)
.then([this] { return public_msgr->start(&dispatchers); }));
}).then([this] {
return monc.start();
return monc->start();
}).then([this] {
monc.sub_want("osd_pg_creates", last_pg_create_epoch, 0);
monc.sub_want("mgrmap", 0, 0);
monc.sub_want("osdmap", 0, 0);
return monc.renew_subs();
monc->sub_want("osd_pg_creates", last_pg_create_epoch, 0);
monc->sub_want("mgrmap", 0, 0);
monc->sub_want("osdmap", 0, 0);
return monc->renew_subs();
}).then([this] {
return heartbeat->start(public_msgr->get_myaddrs(),
cluster_msgr->get_myaddrs());
}).then([this] {
beacon_timer.set_callback([this] { send_beacon(); });
heartbeat_timer.set_callback([this] { update_heartbeat_peers(); });
return start_boot();
});
}

seastar::future<> OSD::start_boot()
{
state.set_preboot();
return monc.get_version("osdmap").then([this](version_t newest, version_t oldest) {
return monc->get_version("osdmap").then([this](version_t newest, version_t oldest) {
return _preboot(newest, oldest);
});
}
Expand Down Expand Up @@ -222,7 +233,7 @@ seastar::future<> OSD::_send_boot()
heartbeat->get_front_addrs(),
cluster_msgr->get_myaddrs(),
CEPH_FEATURES_ALL);
return monc.send_message(m);
return monc->send_message(m);
}

seastar::future<> OSD::stop()
Expand All @@ -232,9 +243,11 @@ seastar::future<> OSD::stop()
return gate.close().then([this] {
return heartbeat->stop();
}).then([this] {
return monc.stop();
return monc->stop();
}).then([this] {
return public_msgr->shutdown();
}).then([this] {
return cluster_msgr->shutdown();
});
}

Expand Down Expand Up @@ -402,9 +415,9 @@ seastar::future<> OSD::store_maps(ceph::os::Transaction& t,
seastar::future<> OSD::osdmap_subscribe(version_t epoch, bool force_request)
{
logger().info("{}({})", __func__, epoch);
if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
if (monc->sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
force_request) {
return monc.renew_subs();
return monc->renew_subs();
} else {
return seastar::now();
}
Expand Down Expand Up @@ -455,7 +468,7 @@ seastar::future<> OSD::handle_osd_map(ceph::net::ConnectionRef conn,
[=](auto& t) {
return store_maps(t, start, m).then([=, &t] {
// even if this map isn't from a mon, we may have satisfied our subscription
monc.sub_got("osdmap", last);
monc->sub_got("osdmap", last);
if (!superblock.oldest_map || skip_maps) {
superblock.oldest_map = first;
}
Expand Down Expand Up @@ -583,7 +596,7 @@ seastar::future<> OSD::send_beacon()
epoch_t min_last_epoch_clean = osdmap->get_epoch();
auto m = make_message<MOSDBeacon>(osdmap->get_epoch(),
min_last_epoch_clean);
return monc.send_message(m);
return monc->send_message(m);
}

void OSD::update_heartbeat_peers()
Expand Down
7 changes: 4 additions & 3 deletions src/crimson/osd/osd.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ class OSD : public ceph::net::Dispatcher,
seastar::gate gate;
seastar::timer<seastar::lowres_clock> beacon_timer;
const int whoami;
const uint32_t nonce;
// talk with osd
std::unique_ptr<ceph::net::Messenger> cluster_msgr;
ceph::net::Messenger* cluster_msgr = nullptr;
// talk with client/mon/mgr
std::unique_ptr<ceph::net::Messenger> public_msgr;
ceph::net::Messenger* public_msgr = nullptr;
ChainedDispatchers dispatchers;
ceph::mon::Client monc;
std::unique_ptr<ceph::mon::Client> monc;

std::unique_ptr<Heartbeat> heartbeat;
seastar::timer<seastar::lowres_clock> heartbeat_timer;
Expand Down
14 changes: 6 additions & 8 deletions src/test/crimson/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ add_executable(unittest_seastar_messenger test_messenger.cc)
add_ceph_unittest(unittest_seastar_messenger)
target_link_libraries(unittest_seastar_messenger ceph-common crimson)

# TODO: fix unittest_seastar_echo with the new design
#add_executable(unittest_seastar_echo
# test_alien_echo.cc)
#target_link_libraries(unittest_seastar_echo ceph-common global crimson)
add_executable(unittest_seastar_echo
test_alien_echo.cc)
target_link_libraries(unittest_seastar_echo ceph-common global crimson)

add_executable(unittest_seastar_thread_pool
test_thread_pool.cc)
Expand All @@ -26,10 +25,9 @@ add_executable(unittest_seastar_config
test_config.cc)
target_link_libraries(unittest_seastar_config crimson)

# TODO: fix unittest_seastar_monc with the new design
#add_executable(unittest_seastar_monc
# test_monc.cc)
#target_link_libraries(unittest_seastar_monc crimson)
add_executable(unittest_seastar_monc
test_monc.cc)
target_link_libraries(unittest_seastar_monc crimson)

add_executable(unittest_seastar_perfcounters
test_perfcounters.cc)
Expand Down
Loading

0 comments on commit 6760779

Please sign in to comment.