Skip to content

Commit

Permalink
crimson/net: port sharded-msgr to existing code
Browse files Browse the repository at this point in the history
Port sharded-msgr to crimson osd, monc and tests with compatible mode.

Signed-off-by: Yingxin Cheng <[email protected]>
  • Loading branch information
cyx1231st committed Jan 23, 2019
1 parent 5d91690 commit 71ba589
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 78 deletions.
3 changes: 1 addition & 2 deletions src/crimson/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,7 @@ set(crimson_thread_srcs
thread/Throttle.cc)
add_library(crimson STATIC
${crimson_auth_srcs}
# TODO: fix crimson_mon_client with the new design
# ${crimson_mon_srcs}
${crimson_mon_srcs}
${crimson_net_srcs}
${crimson_thread_srcs}
${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc)
Expand Down
20 changes: 13 additions & 7 deletions src/crimson/mon/MonClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -508,13 +508,19 @@ seastar::future<> Client::reopen_session(int rank)
#warning fixme
auto peer = monmap.get_addrs(rank).legacy_addr();
logger().info("connecting to mon.{}", rank);
auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
auto& mc = pending_conns.emplace_back(conn, &keyring);
return mc.authenticate(
monmap.get_epoch(), entity_name,
*auth_methods, want_keys).handle_exception([conn](auto ep) {
return (*conn)->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
return msgr.connect(peer, CEPH_ENTITY_TYPE_MON)
.then([this] (auto xconn) {
// sharded-messenger compatible mode assumes all connections running
// on in the single shard.
ceph_assert((*xconn)->shard_id() == seastar::engine().cpu_id());
ceph::net::ConnectionRef conn = xconn->release();
auto& mc = pending_conns.emplace_back(conn, &keyring);
return mc.authenticate(
monmap.get_epoch(), entity_name,
*auth_methods, want_keys).handle_exception([conn](auto ep) {
return conn->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
});
});
}).then([peer, this] {
if (!is_hunting()) {
Expand Down
11 changes: 11 additions & 0 deletions src/crimson/net/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
#include <seastar/core/future.hh>

#include "Fwd.h"
#include "msg/Policy.h"
#include "crimson/thread/Throttle.h"

class AuthAuthorizer;

namespace ceph::net {

using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;
using Throttle = ceph::thread::Throttle;

class Messenger {
entity_name_t my_name;
entity_addrvec_t my_addrs;
Expand Down Expand Up @@ -82,6 +87,12 @@ class Messenger {

virtual void print(ostream& out) const = 0;

virtual void set_default_policy(const SocketPolicy& p) = 0;

virtual void set_policy(entity_type_t peer_type, const SocketPolicy& p) = 0;

virtual void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) = 0;

static seastar::future<std::reference_wrapper<Messenger>>
create(const entity_name_t& name,
const std::string& lname,
Expand Down
12 changes: 6 additions & 6 deletions src/crimson/net/SocketMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
#include <seastar/core/reactor.hh>
#include <seastar/core/sharded.hh>

#include "msg/Policy.h"
#include "Messenger.h"
#include "SocketConnection.h"
#include "crimson/thread/Throttle.h"

namespace ceph::net {

using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;

class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
const int master_sid;
const seastar::shard_id sid;
Expand Down Expand Up @@ -89,11 +86,14 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
<< ") " << get_myaddr();
}

void set_default_policy(const SocketPolicy& p) override;

void set_policy(entity_type_t peer_type, const SocketPolicy& p) override;

void set_policy_throttler(entity_type_t peer_type, Throttle* throttle) override;

public:
seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me);
void set_default_policy(const SocketPolicy& p);
void set_policy(entity_type_t peer_type, const SocketPolicy& p);
void set_policy_throttler(entity_type_t peer_type, Throttle* throttle);

SocketConnectionRef lookup_conn(const entity_addr_t& addr);
void accept_conn(SocketConnectionRef);
Expand Down
14 changes: 12 additions & 2 deletions src/crimson/osd/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "common/ceph_argparse.h"
#include "crimson/common/config_proxy.h"
#include "crimson/net/Messenger.h"

#include "osd.h"

Expand Down Expand Up @@ -60,8 +61,17 @@ int main(int argc, char* argv[])
}).then([&conf_file_list] {
return local_conf().parse_config_files(conf_file_list);
}).then([&] {
return osd.start_single(std::stoi(local_conf()->name.get_id()),
static_cast<uint32_t>(getpid()));
int id = std::stoi(local_conf()->name.get_id());
auto nonce = static_cast<uint32_t>(getpid());
return ceph::net::Messenger::create(entity_name_t::OSD(id),
"cluster", nonce, 0)
.then([id, nonce, &osd](auto cluster_msgr) {
return ceph::net::Messenger::create(entity_name_t::OSD(id),
"client", nonce, 0)
.then([cluster_msgr, id, &osd](auto client_msgr) {
return osd.start_single(id, &cluster_msgr.get(), &client_msgr.get());
});
});
}).then([&] {
return osd.invoke_on(0, &OSD::start);
});
Expand Down
16 changes: 9 additions & 7 deletions src/crimson/osd/osd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include "messages/MOSDBoot.h"
#include "messages/MOSDMap.h"
#include "crimson/net/Connection.h"
#include "crimson/net/SocketMessenger.h"
#include "crimson/net/Messenger.h"

namespace {
seastar::logger& logger() {
Expand All @@ -20,15 +20,15 @@ namespace {

using ceph::common::local_conf;

OSD::OSD(int id, uint32_t nonce)
OSD::OSD(int id,
ceph::net::Messenger *cluster_msgr,
ceph::net::Messenger *client_msgr)
: whoami{id},
cluster_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
"cluster", nonce}},
client_msgr{new ceph::net::SocketMessenger{entity_name_t::OSD(whoami),
"client", nonce}},
cluster_msgr{cluster_msgr},
client_msgr{client_msgr},
monc{*client_msgr}
{
for (auto msgr : {cluster_msgr.get(), client_msgr.get()}) {
for (auto msgr : {cluster_msgr, client_msgr}) {
if (local_conf()->ms_crc_data) {
msgr->set_crc_data();
}
Expand Down Expand Up @@ -143,6 +143,8 @@ seastar::future<> OSD::stop()
return monc.stop();
}).then([this] {
return client_msgr->shutdown();
}).then([this] {
return cluster_msgr->shutdown();
});
}

Expand Down
6 changes: 3 additions & 3 deletions src/crimson/osd/osd.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ class OSD : public ceph::net::Dispatcher {
seastar::timer<seastar::lowres_clock> beacon_timer;
const int whoami;
// talk with osd
std::unique_ptr<ceph::net::Messenger> cluster_msgr;
ceph::net::Messenger* cluster_msgr;
// talk with mon/mgr
std::unique_ptr<ceph::net::Messenger> client_msgr;
ceph::net::Messenger* client_msgr;
ChainedDispatchers dispatchers;
ceph::mon::Client monc;

Expand Down Expand Up @@ -58,7 +58,7 @@ class OSD : public ceph::net::Dispatcher {
seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override;

public:
OSD(int id, uint32_t nonce);
OSD(int id, ceph::net::Messenger *cluster_msgr, ceph::net::Messenger *client_msgr);
~OSD();

static seastar::future<> mkfs(uuid_d fsid, int whoami);
Expand Down
14 changes: 6 additions & 8 deletions src/test/crimson/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ add_executable(unittest_seastar_messenger test_messenger.cc)
add_ceph_unittest(unittest_seastar_messenger)
target_link_libraries(unittest_seastar_messenger ceph-common crimson)

# TODO: fix unittest_seastar_echo with the new design
#add_executable(unittest_seastar_echo
# test_alien_echo.cc)
#target_link_libraries(unittest_seastar_echo ceph-common global crimson)
add_executable(unittest_seastar_echo
test_alien_echo.cc)
target_link_libraries(unittest_seastar_echo ceph-common global crimson)

add_executable(unittest_seastar_thread_pool
test_thread_pool.cc)
Expand All @@ -26,10 +25,9 @@ add_executable(unittest_seastar_config
test_config.cc)
target_link_libraries(unittest_seastar_config crimson)

# TODO: fix unittest_seastar_monc with the new design
#add_executable(unittest_seastar_monc
# test_monc.cc)
#target_link_libraries(unittest_seastar_monc crimson)
add_executable(unittest_seastar_monc
test_monc.cc)
target_link_libraries(unittest_seastar_monc crimson)

add_executable(unittest_seastar_perfcounters
test_perfcounters.cc)
Expand Down
91 changes: 50 additions & 41 deletions src/test/crimson/test_alien_echo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "msg/Messenger.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Dispatcher.h"
#include "crimson/net/SocketMessenger.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Config.h"
#include "crimson/thread/Condition.h"
#include "crimson/thread/Throttle.h"
Expand Down Expand Up @@ -38,8 +38,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer {

struct Server {
ceph::thread::Throttle byte_throttler;
static constexpr int64_t server_num = 0;
ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0};
ceph::net::Messenger& msgr;
struct ServerDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
Expand All @@ -65,8 +64,9 @@ struct Server {
new DummyAuthAuthorizer{});
}
} dispatcher;
Server()
: byte_throttler(ceph::net::conf.osd_client_message_size_cap)
Server(ceph::net::Messenger& msgr)
: byte_throttler(ceph::net::conf.osd_client_message_size_cap),
msgr{msgr}
{
msgr.set_crc_header();
msgr.set_crc_data();
Expand All @@ -75,8 +75,7 @@ struct Server {

struct Client {
ceph::thread::Throttle byte_throttler;
static constexpr int64_t client_num = 1;
ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0};
ceph::net::Messenger& msgr;
struct ClientDispatcher : ceph::net::Dispatcher {
unsigned count = 0;
seastar::condition_variable on_reply;
Expand All @@ -88,8 +87,9 @@ struct Client {
return seastar::now();
}
} dispatcher;
Client()
: byte_throttler(ceph::net::conf.osd_client_message_size_cap)
Client(ceph::net::Messenger& msgr)
: byte_throttler(ceph::net::conf.osd_client_message_size_cap),
msgr{msgr}
{
msgr.set_crc_header();
msgr.set_crc_data();
Expand Down Expand Up @@ -275,41 +275,50 @@ seastar_echo(SeastarContext& sc,
{
std::cout << "seastar/";
if (role == echo_role::as_server) {
return seastar::do_with(seastar_pingpong::Server{},
[&addr, count](auto& server) mutable {
std::cout << "server listening at " << addr << std::endl;
// bind the server
server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
&server.byte_throttler);
server.msgr.bind(entity_addrvec_t{addr});
return server.msgr.start(&server.dispatcher)
.then([&dispatcher=server.dispatcher, count] {
return dispatcher.on_reply.wait([&dispatcher, count] {
return dispatcher.count >= count;
});
}).finally([&server] {
std::cout << "server shutting down" << std::endl;
return server.msgr.shutdown();
return ceph::net::Messenger::create(entity_name_t::OSD(0), "server", 0,
seastar::engine().cpu_id())
.then([&addr, count] (auto msgr) {
return seastar::do_with(seastar_pingpong::Server{msgr},
[&addr, count](auto& server) mutable {
std::cout << "server listening at " << addr << std::endl;
// bind the server
server.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
&server.byte_throttler);
return server.msgr.bind(entity_addrvec_t{addr})
.then([&server] {
return server.msgr.start(&server.dispatcher);
}).then([&dispatcher=server.dispatcher, count] {
return dispatcher.on_reply.wait([&dispatcher, count] {
return dispatcher.count >= count;
});
}).finally([&server] {
std::cout << "server shutting down" << std::endl;
return server.msgr.shutdown();
});
});
});
} else {
return seastar::do_with(seastar_pingpong::Client{},
[&addr, count](auto& client) {
std::cout << "client sending to " << addr << std::endl;
client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
&client.byte_throttler);
return client.msgr.start(&client.dispatcher)
.then([&] {
return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
}).then([&disp=client.dispatcher, count](ceph::net::ConnectionRef conn) {
return seastar::do_until(
[&disp,count] { return disp.count >= count; },
[&disp,conn] { return conn->send(MessageRef{new MPing(), false})
.then([&] { return disp.on_reply.wait(); });
});
}).finally([&client] {
std::cout << "client shutting down" << std::endl;
return client.msgr.shutdown();
return ceph::net::Messenger::create(entity_name_t::OSD(1), "client", 1,
seastar::engine().cpu_id())
.then([&addr, count] (auto msgr) {
return seastar::do_with(seastar_pingpong::Client{msgr},
[&addr, count](auto& client) {
std::cout << "client sending to " << addr << std::endl;
client.msgr.set_policy_throttler(entity_name_t::TYPE_OSD,
&client.byte_throttler);
return client.msgr.start(&client.dispatcher)
.then([&] {
return client.msgr.connect(addr, entity_name_t::TYPE_OSD);
}).then([&disp=client.dispatcher, count](ceph::net::ConnectionXRef conn) {
return seastar::do_until(
[&disp,count] { return disp.count >= count; },
[&disp,conn] { return (*conn)->send(MessageRef{new MPing(), false})
.then([&] { return disp.on_reply.wait(); });
});
}).finally([&client] {
std::cout << "client shutting down" << std::endl;
return client.msgr.shutdown();
});
});
});
}
Expand Down
5 changes: 3 additions & 2 deletions src/test/crimson/test_monc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ static seastar::future<> test_monc()
}).then([] {
return ceph::common::sharded_perf_coll().start();
}).then([] {
return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0},
[](ceph::net::Messenger& msgr) {
return ceph::net::Messenger::create(entity_name_t::OSD(0), "monc", 0,
seastar::engine().cpu_id())
.then([] (ceph::net::Messenger& msgr) {
auto& conf = ceph::common::local_conf();
if (conf->ms_crc_data) {
msgr.set_crc_data();
Expand Down

0 comments on commit 71ba589

Please sign in to comment.