Skip to content

Commit

Permalink
crimson/net: compatible mode of crimson-msgr
Browse files Browse the repository at this point in the history
Added a compatible mode with master_sid to support single-core
dispatcher.

Signed-off-by: Yingxin Cheng <[email protected]>
  • Loading branch information
cyx1231st committed Jan 23, 2019
1 parent c69afcb commit 5d91690
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/crimson/mon/MonClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ seastar::future<> Client::reopen_session(int rank)
return mc.authenticate(
monmap.get_epoch(), entity_name,
*auth_methods, want_keys).handle_exception([conn](auto ep) {
return conn->close().then([ep = std::move(ep)] {
return (*conn)->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
});
}).then([peer, this] {
Expand Down
5 changes: 3 additions & 2 deletions src/crimson/net/Messenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ namespace ceph::net {
seastar::future<std::reference_wrapper<Messenger>>
Messenger::create(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce)
const uint64_t nonce,
const int master_sid)
{
return create_sharded<SocketMessenger>(name, lname, nonce)
return create_sharded<SocketMessenger>(name, lname, nonce, master_sid)
.then([](Messenger &msgr) {
return std::ref(msgr);
});
Expand Down
5 changes: 4 additions & 1 deletion src/crimson/net/Messenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ class Messenger {
virtual void print(ostream& out) const = 0;

static seastar::future<std::reference_wrapper<Messenger>>
create(const entity_name_t& name, const std::string& lname, const uint64_t nonce);
create(const entity_name_t& name,
const std::string& lname,
const uint64_t nonce,
const int master_sid=-1);
};

inline ostream& operator<<(ostream& out, const Messenger& msgr) {
Expand Down
10 changes: 9 additions & 1 deletion src/crimson/net/SocketMessenger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ namespace {

SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce)
uint32_t nonce,
int master_sid)
: Messenger{myname},
master_sid{master_sid},
sid{seastar::engine().cpu_id()},
logic_name{logic_name},
nonce{nonce}
Expand Down Expand Up @@ -198,6 +200,9 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
{
ceph_assert(addr.get_family() == AF_INET);
if (master_sid >= 0) {
return master_sid;
}
std::size_t seed = 0;
boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
//boost::hash_combine(seed, addr.u.sin.sin_port);
Expand Down Expand Up @@ -227,6 +232,9 @@ void SocketMessenger::unaccept_conn(SocketConnectionRef conn)

void SocketMessenger::register_conn(SocketConnectionRef conn)
{
if (master_sid >= 0) {
ceph_assert(static_cast<int>(sid) == master_sid);
}
auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
std::ignore = i;
ceph_assert(added);
Expand Down
9 changes: 6 additions & 3 deletions src/crimson/net/SocketMessenger.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace ceph::net {
using SocketPolicy = ceph::net::Policy<ceph::thread::Throttle>;

class SocketMessenger final : public Messenger, public seastar::peering_sharded_service<SocketMessenger> {
const int master_sid;
const seastar::shard_id sid;
seastar::promise<> shutdown_promise;

Expand All @@ -53,15 +54,17 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_
const entity_type_t& peer_type);
seastar::future<> do_shutdown();
// conn sharding options:
// 1. Simplest: sharded by ip only
// 2. Balanced: sharded by ip + port + nonce,
// 0. Compatible (master_sid >= 0): place all connections to one master shard
// 1. Simplest (master_sid < 0): sharded by ip only
// 2. Balanced (not implemented): sharded by ip + port + nonce,
// but, need to move SocketConnection between cores.
seastar::shard_id locate_shard(const entity_addr_t& addr);

public:
SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce);
uint32_t nonce,
int master_sid);

seastar::future<> set_myaddrs(const entity_addrvec_t& addr) override;

Expand Down

0 comments on commit 5d91690

Please sign in to comment.