From b1cbad31e4eab4614e91299973db38f982f3a7e2 Mon Sep 17 00:00:00 2001 From: Yingxin Date: Thu, 22 Nov 2018 21:55:23 +0800 Subject: [PATCH 1/7] crimson/net: add basic loggings for SocketConnection Signed-off-by: Yingxin --- src/crimson/net/SocketConnection.cc | 24 ++++++++++++++++++++---- src/crimson/net/SocketConnection.h | 4 ++++ src/crimson/net/SocketMessenger.cc | 5 +++-- src/crimson/net/SocketMessenger.h | 11 ++++++++++- src/test/crimson/test_alien_echo.cc | 4 ++-- src/test/crimson/test_messenger.cc | 8 ++++---- src/test/crimson/test_monc.cc | 2 +- 7 files changed, 44 insertions(+), 14 deletions(-) diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index ef8281d5a79ad..e7e2c5b28a3b3 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -95,7 +95,7 @@ void SocketConnection::read_tags_until_next_message() return handle_keepalive2_ack() .then([this] { return stop_t::no; }); case CEPH_MSGR_TAG_CLOSE: - std::cout << "close" << std::endl; + logger().info("{} got tag close", *this); break; } return seastar::make_ready_future(stop_t::no); @@ -322,6 +322,7 @@ seastar::future<> SocketConnection::close() ceph_assert(state == state_t::connecting); close_ready = pending_dispatch.close().finally(std::move(cleanup)); } + logger().debug("{} trigger closing, was {}", *this, static_cast(state)); state = state_t::closing; return close_ready.get_future(); } @@ -543,7 +544,7 @@ SocketConnection::handle_keepalive2() return socket->read_exactly(sizeof(ceph_timespec)) .then([this] (auto buf) { k.ack.stamp = *reinterpret_cast(buf.get()); - std::cout << "keepalive2 " << k.ack.stamp.tv_sec << std::endl; + logger().info("{} keepalive2 {}", *this, k.ack.stamp.tv_sec); return socket->write_flush(make_static_packet(k.ack)); }); } @@ -555,7 +556,7 @@ SocketConnection::handle_keepalive2_ack() .then([this] (auto buf) { auto t = reinterpret_cast(buf.get()); k.ack_stamp = *t; - std::cout << "keepalive2 ack " << t->tv_sec << std::endl; + logger().info("{} keepalive2 ack {}", *this, t->tv_sec); }); } @@ -778,6 +779,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, peer_addr = _peer_addr; peer_type = _peer_type; messenger.register_conn(this); + logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); state = state_t::connecting; seastar::with_gate(pending_dispatch, [this] { return seastar::connect(peer_addr.in4_addr()) @@ -826,6 +828,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the connecting state + logger().warn("{} connecting fault: {}", *this, eptr); close(); }); }); @@ -840,6 +843,7 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, peer_addr = _peer_addr; socket.emplace(std::move(fd)); messenger.accept_conn(this); + logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); state = state_t::accepting; seastar::with_gate(pending_dispatch, [this] { // encode/send server's handshake header @@ -878,6 +882,7 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state + logger().warn("{} accepting fault: {}", *this, eptr); close(); }); }); @@ -886,6 +891,7 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, void SocketConnection::execute_open() { + logger().debug("{} trigger open, was {}", *this, static_cast(state)); state = state_t::open; seastar::with_gate(pending_dispatch, [this] { // start background processing of tags @@ -910,8 +916,9 @@ SocketConnection::execute_open() } else { throw e; } - }).handle_exception([] (std::exception_ptr eptr) { + }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the open state + logger().warn("{} open fault: {}", *this, eptr); }); }); } @@ -931,3 +938,12 @@ seastar::future<> SocketConnection::fault() } return seastar::sleep(h.backoff); } + +namespace ceph::net { + +ostream& operator<<(ostream& out, const SocketConnection& conn) { + return out << conn.messenger + << " >> [" << conn.peer_addr << "]"; +} + +} // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index effb594c14fa8..24f9d0a3e90dd 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -220,6 +220,10 @@ class SocketConnection : public Connection { std::tuple> get_out_queue() { return {out_seq, std::move(out_q)}; } + + friend ostream& operator<<(ostream& out, const SocketConnection& conn); }; +ostream& operator<<(ostream& out, const SocketConnection& conn); + } // namespace ceph::net diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 75c2870f3f8ce..6e23f9d62865d 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -22,8 +22,9 @@ using namespace ceph::net; -SocketMessenger::SocketMessenger(const entity_name_t& myname) - : Messenger{myname} +SocketMessenger::SocketMessenger(const entity_name_t& myname, + const std::string& logic_name) + : Messenger{myname}, logic_name{logic_name} {} void SocketMessenger::bind(const entity_addr_t& addr) diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index c348f5920b329..f205da861b612 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -36,12 +36,14 @@ class SocketMessenger final : public Messenger { std::set accepting_conns; using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; + const std::string logic_name; seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); public: - SocketMessenger(const entity_name_t& myname); + SocketMessenger(const entity_name_t& myname, + const std::string& logic_name); void bind(const entity_addr_t& addr) override; @@ -62,6 +64,13 @@ class SocketMessenger final : public Messenger { void unaccept_conn(SocketConnectionRef); void register_conn(SocketConnectionRef); void unregister_conn(SocketConnectionRef); + + friend ostream& operator<<(ostream& out, const SocketMessenger& msgr) { + return out << msgr.get_myname() + << "(" << msgr.logic_name + << ")[" << msgr.get_myaddr() + << "]"; + } }; } // namespace ceph::net diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 51e009145c70c..78ef81d306797 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -39,7 +39,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer { struct Server { ceph::thread::Throttle byte_throttler; static constexpr int64_t server_num = 0; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num)}; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server"}; struct ServerDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -76,7 +76,7 @@ struct Server { struct Client { ceph::thread::Throttle byte_throttler; static constexpr int64_t client_num = 1; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num)}; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client"}; struct ClientDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index c16434113e221..a2e19d4b79d0a 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -22,7 +22,7 @@ static seastar::future<> test_echo(unsigned rounds, entity_addr_t addr; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1)}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1"}; struct ServerDispatcher : ceph::net::Dispatcher { seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { @@ -38,7 +38,7 @@ static seastar::future<> test_echo(unsigned rounds, struct { unsigned rounds; std::bernoulli_distribution keepalive_dist{}; - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1"}; struct ClientDispatcher : ceph::net::Dispatcher { seastar::promise reply; unsigned count = 0u; @@ -127,7 +127,7 @@ static seastar::future<> test_concurrent_dispatch() entity_addr_t addr; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1)}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2"}; class ServerDispatcher : public ceph::net::Dispatcher { int count = 0; seastar::promise<> on_second; // satisfied on second dispatch @@ -151,7 +151,7 @@ static seastar::future<> test_concurrent_dispatch() } server; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0)}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2"}; ceph::net::Dispatcher dispatcher; } client; }; diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index a2b76421f1f65..d90d7905f7311 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -23,7 +23,7 @@ static seastar::future<> test_monc() conf->cluster = cluster; return conf.parse_config_files(conf_file_list); }).then([] { - return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0)}, + return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc"}, [](ceph::net::Messenger& msgr) { auto& conf = ceph::common::local_conf(); if (conf->ms_crc_data) { From 8674978353acd1fe1a6e124b54232387f5548d7f Mon Sep 17 00:00:00 2001 From: Yingxin Date: Mon, 17 Dec 2018 21:51:49 +0800 Subject: [PATCH 2/7] crimson/net: fix address learning during banner exchange * Don't store my_addr in `Connection`, because my_addr can be learned and thus changed. * Support nonce in SocketMessenger. * Always set nonce when set_myaddr(). * Add learned_addr() for SocketMessenger. * Add my_socket_port and peer_socket_port to show the real connecting ports of the SocketConnection. * Fix bannder exchange logic for addresses, including nonce, type, ip, port, socket_port for my_addr and peer_addr. * Add more detailed logging prefixes for SocketConnection. Signed-off-by: Yingxin --- src/crimson/mon/MonClient.cc | 4 +-- src/crimson/net/Connection.h | 5 +--- src/crimson/net/Messenger.h | 2 +- src/crimson/net/SocketConnection.cc | 38 +++++++++++++++-------------- src/crimson/net/SocketConnection.h | 8 +++++- src/crimson/net/SocketMessenger.cc | 38 ++++++++++++++++++++++++----- src/crimson/net/SocketMessenger.h | 7 +++++- src/test/crimson/test_alien_echo.cc | 4 +-- src/test/crimson/test_messenger.cc | 12 ++++++--- src/test/crimson/test_monc.cc | 2 +- 10 files changed, 80 insertions(+), 40 deletions(-) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 8817abc193dd6..058843c62a348 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -163,7 +163,7 @@ seastar::future Connection::do_auth() return reply.get_future(); }).then([this] (Ref m) { logger().info("mon {} => {} returns {}: {}", - conn->get_my_addr(), + conn->get_messenger()->get_myaddr(), conn->get_peer_addr(), *m, m->result); reply = decltype(reply){}; auto p = m->result_bl.cbegin(); @@ -360,7 +360,7 @@ seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn, Ref m) { logger().info("mon {} => {} returns {}: {}", - conn->get_my_addr(), + conn->get_messenger()->get_myaddr(), conn->get_peer_addr(), *m, m->result); auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index cc2f4eabf8577..5262a43f778d6 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -27,17 +27,14 @@ using seq_num_t = uint64_t; class Connection : public boost::intrusive_ref_counter { protected: - entity_addr_t my_addr; entity_addr_t peer_addr; peer_type_t peer_type = -1; public: - Connection(const entity_addr_t& my_addr) - : my_addr(my_addr) {} + Connection() {} virtual ~Connection() {} virtual Messenger* get_messenger() const = 0; - const entity_addr_t& get_my_addr() const { return my_addr; } const entity_addr_t& get_peer_addr() const { return peer_addr; } virtual int get_peer_type() const = 0; diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 0d8484fd2c21b..a259e4db2ef23 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -36,7 +36,7 @@ class Messenger { const entity_name_t& get_myname() const { return my_name; } const entity_addr_t& get_myaddr() const { return my_addr; } - void set_myaddr(const entity_addr_t& addr) { + virtual void set_myaddr(const entity_addr_t& addr) { my_addr = addr; } diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e7e2c5b28a3b3..e4d0275e6a4c5 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -44,10 +44,8 @@ namespace { } SocketConnection::SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr, Dispatcher& dispatcher) - : Connection(my_addr), - messenger(messenger), + : messenger(messenger), dispatcher(dispatcher), send_ready(h.promise.get_future()) { @@ -587,7 +585,7 @@ SocketConnection::handle_connect_with_existing(SocketConnectionRef existing, buf h.reply.connect_seq = existing->connect_seq() + 1; return send_connect_reply(CEPH_MSGR_TAG_RETRY_SESSION); } - } else if (peer_addr < my_addr || + } else if (peer_addr < messenger.get_myaddr() || existing->is_server_side()) { // incoming wins return replace_existing(existing, std::move(authorizer_reply)); @@ -778,6 +776,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, ceph_assert(!socket); peer_addr = _peer_addr; peer_type = _peer_type; + peer_socket_port = _peer_addr.get_port(); messenger.register_conn(this); logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); state = state_t::connecting; @@ -801,15 +800,13 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, ceph_assert(p.end()); validate_peer_addr(saddr, peer_addr); - if (my_addr != caddr) { - // take peer's address for me, but preserve my nonce - caddr.nonce = my_addr.nonce; - my_addr = caddr; - } + my_socket_port = caddr.get_port(); + messenger.learned_addr(caddr); + // encode/send client's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); + ::encode(messenger.get_myaddr(), bl, 0); h.global_seq = messenger.get_global_seq(); return socket->write_flush(std::move(bl)); }).then([=] { @@ -840,17 +837,20 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, { ceph_assert(state == state_t::none); ceph_assert(!socket); - peer_addr = _peer_addr; + peer_addr.u = _peer_addr.u; + peer_addr.set_port(0); + peer_socket_port = _peer_addr.get_port(); + my_socket_port = messenger.get_myaddr().get_port(); socket.emplace(std::move(fd)); messenger.accept_conn(this); logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); state = state_t::accepting; - seastar::with_gate(pending_dispatch, [this] { + seastar::with_gate(pending_dispatch, [this, _peer_addr] { // encode/send server's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); - ::encode(my_addr, bl, 0); - ::encode(peer_addr, bl, 0); + ::encode(messenger.get_myaddr(), bl, 0); + ::encode(_peer_addr, bl, 0); return socket->write_flush(std::move(bl)) .then([this] { // read client's handshake header and connect request @@ -861,9 +861,9 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, entity_addr_t addr; ::decode(addr, p); ceph_assert(p.end()); - if (!addr.is_blank_ip()) { - peer_addr = addr; - } + peer_addr.set_type(addr.get_type()); + peer_addr.set_port(addr.get_port()); + peer_addr.set_nonce(addr.get_nonce()); }).then([this] { return seastar::repeat([this] { return repeat_handle_connect(); @@ -943,7 +943,9 @@ namespace ceph::net { ostream& operator<<(ostream& out, const SocketConnection& conn) { return out << conn.messenger - << " >> [" << conn.peer_addr << "]"; + << " [@" << conn.my_socket_port + << " >> " << conn.peer_addr + << " @" << conn.peer_socket_port << "]"; } } // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 24f9d0a3e90dd..3503d8d24dd3c 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -40,6 +40,13 @@ class SocketConnection : public Connection { Dispatcher& dispatcher; seastar::gate pending_dispatch; + /// my_socket_port can be different from my_addr.get_port() if is + /// connector. + int my_socket_port = 0; + /// peer_socket_port can be different from peer_addr.get_port() if is + /// acceptor. + int peer_socket_port = 0; + enum class state_t { none, accepting, @@ -157,7 +164,6 @@ class SocketConnection : public Connection { public: SocketConnection(SocketMessenger& messenger, - const entity_addr_t& my_addr, Dispatcher& dispatcher); ~SocketConnection(); diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 6e23f9d62865d..b51129c661f0d 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -23,9 +23,22 @@ using namespace ceph::net; SocketMessenger::SocketMessenger(const entity_name_t& myname, - const std::string& logic_name) - : Messenger{myname}, logic_name{logic_name} -{} + const std::string& logic_name, + uint64_t nonce) + : Messenger{myname}, logic_name{logic_name}, nonce{nonce} +{ + entity_addr_t my_addr; + my_addr.set_type(entity_addr_t::TYPE_DEFAULT); + set_myaddr(my_addr); +} + +void SocketMessenger::set_myaddr(const entity_addr_t& addr) +{ + entity_addr_t my_addr = addr; + my_addr.nonce = nonce; + // TODO: propagate to all the cores of the Messenger + Messenger::set_myaddr(my_addr); +} void SocketMessenger::bind(const entity_addr_t& addr) { @@ -53,9 +66,8 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) seastar::socket_address paddr) { // allocate the connection entity_addr_t peer_addr; - peer_addr.set_type(entity_addr_t::TYPE_DEFAULT); peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); + SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); // don't wait before accepting another conn->start_accept(std::move(socket), peer_addr); }); @@ -76,7 +88,7 @@ SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& pe if (auto found = lookup_conn(peer_addr); found) { return found; } - SocketConnectionRef conn = new SocketConnection(*this, get_myaddr(), *dispatcher); + SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); conn->start_connect(peer_addr, peer_type); return conn; } @@ -99,6 +111,20 @@ seastar::future<> SocketMessenger::shutdown() }); } +void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +{ + if (!get_myaddr().is_blank_ip()) { + // already learned or binded + return; + } + + // Only learn IP address if blank. + entity_addr_t addr = get_myaddr(); + addr.u = peer_addr_for_me.u; + addr.set_port(get_myaddr().get_port()); + set_myaddr(addr); +} + void SocketMessenger::set_default_policy(const SocketPolicy& p) { policy_set.set_default(p); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index f205da861b612..eb8f70ed24817 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -37,13 +37,17 @@ class SocketMessenger final : public Messenger { using Throttle = ceph::thread::Throttle; ceph::net::PolicySet policy_set; const std::string logic_name; + const uint64_t nonce; seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); public: SocketMessenger(const entity_name_t& myname, - const std::string& logic_name); + const std::string& logic_name, + uint64_t nonce); + + void set_myaddr(const entity_addr_t& addr) override; void bind(const entity_addr_t& addr) override; @@ -55,6 +59,7 @@ class SocketMessenger final : public Messenger { seastar::future<> shutdown() override; public: + void learned_addr(const entity_addr_t &peer_addr_for_me); void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); diff --git a/src/test/crimson/test_alien_echo.cc b/src/test/crimson/test_alien_echo.cc index 78ef81d306797..046d48971b41d 100644 --- a/src/test/crimson/test_alien_echo.cc +++ b/src/test/crimson/test_alien_echo.cc @@ -39,7 +39,7 @@ struct DummyAuthAuthorizer : public AuthAuthorizer { struct Server { ceph::thread::Throttle byte_throttler; static constexpr int64_t server_num = 0; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server"}; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(server_num), "server", 0}; struct ServerDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; @@ -76,7 +76,7 @@ struct Server { struct Client { ceph::thread::Throttle byte_throttler; static constexpr int64_t client_num = 1; - ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client"}; + ceph::net::SocketMessenger msgr{entity_name_t::OSD(client_num), "client", 0}; struct ClientDispatcher : ceph::net::Dispatcher { unsigned count = 0; seastar::condition_variable on_reply; diff --git a/src/test/crimson/test_messenger.cc b/src/test/crimson/test_messenger.cc index a2e19d4b79d0a..e5a582c80b378 100644 --- a/src/test/crimson/test_messenger.cc +++ b/src/test/crimson/test_messenger.cc @@ -22,7 +22,7 @@ static seastar::future<> test_echo(unsigned rounds, entity_addr_t addr; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server1", 1}; struct ServerDispatcher : ceph::net::Dispatcher { seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, MessageRef m) override { @@ -38,7 +38,7 @@ static seastar::future<> test_echo(unsigned rounds, struct { unsigned rounds; std::bernoulli_distribution keepalive_dist{}; - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client1", 2}; struct ClientDispatcher : ceph::net::Dispatcher { seastar::promise reply; unsigned count = 0u; @@ -81,8 +81,10 @@ static seastar::future<> test_echo(unsigned rounds, return seastar::do_with(test_state{}, [rounds, keepalive_ratio] (test_state& t) { // bind the server + t.addr.set_type(entity_addr_t::TYPE_LEGACY); t.addr.set_family(AF_INET); t.addr.set_port(9010); + t.addr.set_nonce(1); t.server.messenger.bind(t.addr); t.client.rounds = rounds; @@ -127,7 +129,7 @@ static seastar::future<> test_concurrent_dispatch() entity_addr_t addr; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(1), "server2", 3}; class ServerDispatcher : public ceph::net::Dispatcher { int count = 0; seastar::promise<> on_second; // satisfied on second dispatch @@ -151,15 +153,17 @@ static seastar::future<> test_concurrent_dispatch() } server; struct { - ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2"}; + ceph::net::SocketMessenger messenger{entity_name_t::OSD(0), "client2", 4}; ceph::net::Dispatcher dispatcher; } client; }; return seastar::do_with(test_state{}, [] (test_state& t) { // bind the server + t.addr.set_type(entity_addr_t::TYPE_LEGACY); t.addr.set_family(AF_INET); t.addr.set_port(9010); + t.addr.set_nonce(3); t.server.messenger.bind(t.addr); return t.server.messenger.start(&t.server.dispatcher) diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index d90d7905f7311..17775d00cb521 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -23,7 +23,7 @@ static seastar::future<> test_monc() conf->cluster = cluster; return conf.parse_config_files(conf_file_list); }).then([] { - return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc"}, + return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0}, [](ceph::net::Messenger& msgr) { auto& conf = ceph::common::local_conf(); if (conf->ms_crc_data) { From a31b716be2494b6494686120cca6115e146135db Mon Sep 17 00:00:00 2001 From: Yingxin Date: Wed, 12 Sep 2018 16:52:04 +0800 Subject: [PATCH 3/7] [WIP] crimson/net: enable connections on all cores This is a PoC implementation towards true sharded seastar-messenger for interface discussion purposes. * Sharded Messenger: provides shared-nothing Messenger for each shard, it's interfaces are symmetric to be called, any modifications will be applied to all shards. * Sharded Dispatcher: allow connections to be dispatched, and related resources (such as Session) to be managed in its own shard. * Sharded Connection: A connection only lives at one dedicated core during its lifecycle. It's sharded by its peer_IP in this PoC, because peer port and nonce are not available when a socket is accepted. While its interfaces are safe to be called from all shards. * Replace `boost::intrusive_ptr` by seastar native smart ptrs for `Connection` and `SocketConnection`, because they need to be destructed from its original core. * Unit test: an example to establish multiple connections on both client and server sides, they runs concurrently and creates sessions that are also following shared-nothing design. Signed-off-by: Yingxin --- src/crimson/CMakeLists.txt | 3 +- src/crimson/net/Connection.h | 18 +- src/crimson/net/Dispatcher.h | 6 + src/crimson/net/Fwd.h | 8 +- src/crimson/net/Messenger.h | 15 +- src/crimson/net/SocketConnection.cc | 68 ++++-- src/crimson/net/SocketConnection.h | 13 +- src/crimson/net/SocketMessenger.cc | 98 +++++++-- src/crimson/net/SocketMessenger.h | 46 +++- src/test/crimson/CMakeLists.txt | 24 +- src/test/crimson/test_messenger_new.cc | 289 +++++++++++++++++++++++++ 11 files changed, 518 insertions(+), 70 deletions(-) create mode 100644 src/test/crimson/test_messenger_new.cc diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index a04ade8f0b5c6..1802ba5f28372 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -123,7 +123,8 @@ set(crimson_thread_srcs thread/Throttle.cc) add_library(crimson STATIC ${crimson_auth_srcs} - ${crimson_mon_srcs} + # TODO: fix crimson_mon_client with the new design + # ${crimson_mon_srcs} ${crimson_net_srcs} ${crimson_thread_srcs} ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc) diff --git a/src/crimson/net/Connection.h b/src/crimson/net/Connection.h index 5262a43f778d6..cb1a212639972 100644 --- a/src/crimson/net/Connection.h +++ b/src/crimson/net/Connection.h @@ -15,17 +15,18 @@ #pragma once #include -#include #include +#include #include "Fwd.h" namespace ceph::net { using seq_num_t = uint64_t; +using SharedPtr = seastar::shared_ptr; -class Connection : public boost::intrusive_ref_counter { +class Connection : public seastar::enable_shared_from_this { + SharedPtr priv; protected: entity_addr_t peer_addr; peer_type_t peer_type = -1; @@ -34,12 +35,18 @@ class Connection : public boost::intrusive_ref_counter is_connected() = 0; /// send a message over a connection that has completed its handshake virtual seastar::future<> send(MessageRef msg) = 0; @@ -50,6 +57,9 @@ class Connection : public boost::intrusive_ref_counter close() = 0; + + /// which shard id the connection lives + virtual seastar::shard_id shard_id() const = 0; }; } // namespace ceph::net diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index f90429cd12fb2..c0865aa2b8c2d 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include "Fwd.h" @@ -54,6 +55,11 @@ class Dispatcher { } virtual seastar::future> ms_get_authorizer(peer_type_t, bool force_new); + + // get the locol dispatcher shard if it is accessed by another core + virtual Dispatcher* get_local_shard() { + return this; + } }; } // namespace ceph::net diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 5aa04812d6021..331c69d7a3af6 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -14,7 +14,8 @@ #pragma once -#include +#include +#include #include "msg/msg_types.h" #include "msg/Message.h" @@ -27,7 +28,10 @@ namespace ceph::net { using msgr_tag_t = uint8_t; class Connection; -using ConnectionRef = boost::intrusive_ptr; +using ConnectionRef = seastar::shared_ptr; +// NOTE: ConnectionXRef should only be used in seastar world, because +// lw_shared_ptr<> is not safe to be accessed by unpinned alien threads. +using ConnectionXRef = seastar::lw_shared_ptr>; class Dispatcher; diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index a259e4db2ef23..9a9df12ee0d8f 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -36,20 +36,22 @@ class Messenger { const entity_name_t& get_myname() const { return my_name; } const entity_addr_t& get_myaddr() const { return my_addr; } - virtual void set_myaddr(const entity_addr_t& addr) { + virtual seastar::future<> set_myaddr(const entity_addr_t& addr) { my_addr = addr; + return seastar::now(); } /// bind to the given address - virtual void bind(const entity_addr_t& addr) = 0; + virtual seastar::future<> bind(const entity_addr_t& addr) = 0; /// start the messenger virtual seastar::future<> start(Dispatcher *dispatcher) = 0; /// either return an existing connection to the peer, /// or a new pending connection - virtual ConnectionRef connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type) = 0; + virtual seastar::future + connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) = 0; /// stop listenening and wait for all connections to close. safe to destruct /// after this future becomes available @@ -71,6 +73,11 @@ class Messenger { void set_crc_header() { crc_flags |= MSG_CRC_HEADER; } + + // get the local dispatcher shard if it is accessed by another core + virtual Messenger* get_local_shard() { + return this; + } }; } // namespace ceph::net diff --git a/src/crimson/net/SocketConnection.cc b/src/crimson/net/SocketConnection.cc index e4d0275e6a4c5..81af8020be335 100644 --- a/src/crimson/net/SocketConnection.cc +++ b/src/crimson/net/SocketConnection.cc @@ -49,6 +49,7 @@ SocketConnection::SocketConnection(SocketMessenger& messenger, dispatcher(dispatcher), send_ready(h.promise.get_future()) { + ceph_assert(&messenger.container().local() == &messenger); } SocketConnection::~SocketConnection() @@ -64,9 +65,32 @@ SocketConnection::get_messenger() const { return &messenger; } -bool SocketConnection::is_connected() +seastar::future SocketConnection::is_connected() { - return !send_ready.failed(); + return seastar::smp::submit_to(shard_id(), [this] { + return !send_ready.failed(); + }); +} + +seastar::future<> SocketConnection::send(MessageRef msg) +{ + return seastar::smp::submit_to(shard_id(), [this, msg=std::move(msg)] { + return do_send(std::move(msg)); + }); +} + +seastar::future<> SocketConnection::keepalive() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return do_keepalive(); + }); +} + +seastar::future<> SocketConnection::close() +{ + return seastar::smp::submit_to(shard_id(), [this] { + return do_close(); + }); } void SocketConnection::read_tags_until_next_message() @@ -263,7 +287,7 @@ seastar::future<> SocketConnection::write_message(MessageRef msg) }); } -seastar::future<> SocketConnection::send(MessageRef msg) +seastar::future<> SocketConnection::do_send(MessageRef msg) { // chain the message after the last message is sent seastar::shared_future<> f = send_ready.then( @@ -277,7 +301,7 @@ seastar::future<> SocketConnection::send(MessageRef msg) return f.get_future(); } -seastar::future<> SocketConnection::keepalive() +seastar::future<> SocketConnection::do_keepalive() { seastar::shared_future<> f = send_ready.then([this] { k.req.stamp = ceph::coarse_real_clock::to_ceph_timespec( @@ -288,7 +312,7 @@ seastar::future<> SocketConnection::keepalive() return f.get_future(); } -seastar::future<> SocketConnection::close() +seastar::future<> SocketConnection::do_close() { if (state == state_t::closing) { // already closing @@ -297,12 +321,12 @@ seastar::future<> SocketConnection::close() } // unregister_conn() drops a reference, so hold another until completion - auto cleanup = [conn = SocketConnectionRef(this)] {}; + auto cleanup = [conn_ref = shared_from_this()] {}; if (state == state_t::accepting) { - messenger.unaccept_conn(this); + messenger.unaccept_conn(seastar::static_pointer_cast(shared_from_this())); } else if (state >= state_t::connecting && state < state_t::closing) { - messenger.unregister_conn(this); + messenger.unregister_conn(seastar::static_pointer_cast(shared_from_this())); } else { // cannot happen ceph_assert(false); @@ -777,7 +801,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, peer_addr = _peer_addr; peer_type = _peer_type; peer_socket_port = _peer_addr.get_port(); - messenger.register_conn(this); + messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); logger().debug("{} trigger connecting, was {}", *this, static_cast(state)); state = state_t::connecting; seastar::with_gate(pending_dispatch, [this] { @@ -801,8 +825,8 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, validate_peer_addr(saddr, peer_addr); my_socket_port = caddr.get_port(); - messenger.learned_addr(caddr); - + return messenger.learned_addr(caddr); + }).then([this] { // encode/send client's handshake header bufferlist bl; bl.append(buffer::create_static(banner_size, banner)); @@ -820,7 +844,7 @@ SocketConnection::start_connect(const entity_addr_t& _peer_addr, fut.forward_to(std::move(h.promise)); }).then([this] { // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_connect(this); + return dispatcher.ms_handle_connect(seastar::static_pointer_cast(shared_from_this())); }).then([this] { execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { @@ -842,7 +866,7 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, peer_socket_port = _peer_addr.get_port(); my_socket_port = messenger.get_myaddr().get_port(); socket.emplace(std::move(fd)); - messenger.accept_conn(this); + messenger.accept_conn(seastar::static_pointer_cast(shared_from_this())); logger().debug("{} trigger accepting, was {}", *this, static_cast(state)); state = state_t::accepting; seastar::with_gate(pending_dispatch, [this, _peer_addr] { @@ -875,10 +899,10 @@ SocketConnection::start_accept(seastar::connected_socket&& fd, fut.forward_to(std::move(h.promise)); }).then([this] { // notify the dispatcher and allow them to reject the connection - return dispatcher.ms_handle_accept(this); + return dispatcher.ms_handle_accept(seastar::static_pointer_cast(shared_from_this())); }).then([this] { - messenger.register_conn(this); - messenger.unaccept_conn(this); + messenger.register_conn(seastar::static_pointer_cast(shared_from_this())); + messenger.unaccept_conn(seastar::static_pointer_cast(shared_from_this())); execute_open(); }).handle_exception([this] (std::exception_ptr eptr) { // TODO: handle fault in the accepting state @@ -901,7 +925,7 @@ SocketConnection::execute_open() .then([this] (MessageRef msg) { // start dispatch, ignoring exceptions from the application layer seastar::with_gate(pending_dispatch, [this, msg = std::move(msg)] { - return dispatcher.ms_dispatch(this, std::move(msg)) + return dispatcher.ms_dispatch(seastar::static_pointer_cast(shared_from_this()), std::move(msg)) .handle_exception([] (std::exception_ptr eptr) {}); }); // return immediately to start on the next message @@ -910,9 +934,9 @@ SocketConnection::execute_open() }).handle_exception_type([this] (const std::system_error& e) { if (e.code() == error::connection_aborted || e.code() == error::connection_reset) { - return dispatcher.ms_handle_reset(this); + return dispatcher.ms_handle_reset(seastar::static_pointer_cast(shared_from_this())); } else if (e.code() == error::read_eof) { - return dispatcher.ms_handle_remote_reset(this); + return dispatcher.ms_handle_remote_reset(seastar::static_pointer_cast(shared_from_this())); } else { throw e; } @@ -926,7 +950,7 @@ SocketConnection::execute_open() seastar::future<> SocketConnection::fault() { if (policy.lossy) { - messenger.unregister_conn(this); + messenger.unregister_conn(seastar::static_pointer_cast(shared_from_this())); } if (h.backoff.count()) { h.backoff += h.backoff; @@ -939,6 +963,10 @@ seastar::future<> SocketConnection::fault() return seastar::sleep(h.backoff); } +seastar::shard_id SocketConnection::shard_id() const { + return messenger.shard_id(); +} + namespace ceph::net { ostream& operator<<(ostream& out, const SocketConnection& conn) { diff --git a/src/crimson/net/SocketConnection.h b/src/crimson/net/SocketConnection.h index 3503d8d24dd3c..30d7db6a6b9cc 100644 --- a/src/crimson/net/SocketConnection.h +++ b/src/crimson/net/SocketConnection.h @@ -17,6 +17,7 @@ #include #include #include +#include #include "msg/Policy.h" #include "Connection.h" @@ -32,7 +33,7 @@ using stop_t = seastar::stop_iteration; class SocketMessenger; class SocketConnection; -using SocketConnectionRef = boost::intrusive_ptr; +using SocketConnectionRef = seastar::shared_ptr; class SocketConnection : public Connection { SocketMessenger& messenger; @@ -162,18 +163,24 @@ class SocketConnection : public Connection { void execute_open(); + seastar::future<> do_send(MessageRef msg); + seastar::future<> do_keepalive(); + seastar::future<> do_close(); + public: SocketConnection(SocketMessenger& messenger, Dispatcher& dispatcher); ~SocketConnection(); + // Connection interfaces shoud be safe to be called from any core, by seastar + // native threads. Messenger* get_messenger() const override; int get_peer_type() const override { return peer_type; } - bool is_connected() override; + seastar::future is_connected() override; seastar::future<> send(MessageRef msg) override; @@ -181,6 +188,8 @@ class SocketConnection : public Connection { seastar::future<> close() override; + seastar::shard_id shard_id() const override; + public: /// start a handshake from the client's perspective, /// only call when SocketConnection first construct diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index b51129c661f0d..3725b258fbbc6 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -15,6 +15,7 @@ #include "SocketMessenger.h" #include +#include #include "auth/Auth.h" #include "Errors.h" @@ -25,28 +26,68 @@ using namespace ceph::net; SocketMessenger::SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint64_t nonce) - : Messenger{myname}, logic_name{logic_name}, nonce{nonce} + : Messenger{myname}, + sid{seastar::engine().cpu_id()}, + logic_name{logic_name}, + nonce{nonce} { entity_addr_t my_addr; my_addr.set_type(entity_addr_t::TYPE_DEFAULT); - set_myaddr(my_addr); + my_addr.nonce = nonce; + Messenger::set_myaddr(my_addr); } -void SocketMessenger::set_myaddr(const entity_addr_t& addr) +seastar::future<> SocketMessenger::set_myaddr(const entity_addr_t& addr) { entity_addr_t my_addr = addr; my_addr.nonce = nonce; - // TODO: propagate to all the cores of the Messenger - Messenger::set_myaddr(my_addr); + return container().invoke_on_all([my_addr](auto& msgr) { + return msgr.Messenger::set_myaddr(my_addr); + }); } -void SocketMessenger::bind(const entity_addr_t& addr) +seastar::future<> SocketMessenger::bind(const entity_addr_t& addr) { - if (addr.get_family() != AF_INET) { - throw std::system_error(EAFNOSUPPORT, std::generic_category()); - } + return container().invoke_on_all([addr](auto& msgr) { + msgr.do_bind(addr); + }); +} + +seastar::future<> SocketMessenger::start(Dispatcher *disp) { + return container().invoke_on_all([disp](auto& msgr) { + return msgr.do_start(disp->get_local_shard()); + }); +} + +seastar::future +SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +{ + auto shard = locate_shard(peer_addr); + return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) { + return msgr.do_connect(peer_addr, peer_type); + }).then([](seastar::foreign_ptr&& conn) { + return seastar::make_lw_shared>(std::move(conn)); + }); +} + +seastar::future<> SocketMessenger::shutdown() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.do_shutdown(); + }).finally([this] { + return container().invoke_on_all([](auto& msgr) { + msgr.shutdown_promise.set_value(); + }); + }); +} + +void SocketMessenger::do_bind(const entity_addr_t& addr) +{ + ceph_assert(addr.get_family() == AF_INET); - set_myaddr(addr); + entity_addr_t my_addr = addr; + my_addr.nonce = nonce; + Messenger::set_myaddr(my_addr); seastar::socket_address address(addr.in4_addr()); seastar::listen_options lo; @@ -54,7 +95,7 @@ void SocketMessenger::bind(const entity_addr_t& addr) listener = seastar::listen(address, lo); } -seastar::future<> SocketMessenger::start(Dispatcher *disp) +seastar::future<> SocketMessenger::do_start(Dispatcher *disp) { dispatcher = disp; @@ -67,9 +108,12 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) // allocate the connection entity_addr_t peer_addr; peer_addr.set_sockaddr(&paddr.as_posix_sockaddr()); - SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); + auto shard = locate_shard(peer_addr); // don't wait before accepting another - conn->start_accept(std::move(socket), peer_addr); + container().invoke_on(shard, [socket = std::move(socket), peer_addr, this](auto& msgr) mutable { + SocketConnectionRef conn = seastar::make_shared(msgr, *msgr.dispatcher); + conn->start_accept(std::move(socket), peer_addr); + }); }); }).handle_exception_type([this] (const std::system_error& e) { // stop gracefully on connection_aborted @@ -82,18 +126,18 @@ seastar::future<> SocketMessenger::start(Dispatcher *disp) return seastar::now(); } -ceph::net::ConnectionRef -SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) +seastar::foreign_ptr +SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type) { if (auto found = lookup_conn(peer_addr); found) { - return found; + return seastar::make_foreign(found->shared_from_this()); } - SocketConnectionRef conn = new SocketConnection(*this, *dispatcher); + SocketConnectionRef conn = seastar::make_shared(*this, *dispatcher); conn->start_connect(peer_addr, peer_type); - return conn; + return seastar::make_foreign(conn->shared_from_this()); } -seastar::future<> SocketMessenger::shutdown() +seastar::future<> SocketMessenger::do_shutdown() { if (listener) { listener->abort_accept(); @@ -111,18 +155,18 @@ seastar::future<> SocketMessenger::shutdown() }); } -void SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) +seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) { if (!get_myaddr().is_blank_ip()) { // already learned or binded - return; + return seastar::now(); } // Only learn IP address if blank. entity_addr_t addr = get_myaddr(); addr.u = peer_addr_for_me.u; addr.set_port(get_myaddr().get_port()); - set_myaddr(addr); + return set_myaddr(addr); } void SocketMessenger::set_default_policy(const SocketPolicy& p) @@ -143,6 +187,16 @@ void SocketMessenger::set_policy_throttler(entity_type_t peer_type, policy_set.set_throttlers(peer_type, throttle, nullptr); } +seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr) +{ + ceph_assert(addr.get_family() == AF_INET); + std::size_t seed = 0; + boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr); + //boost::hash_combine(seed, addr.u.sin.sin_port); + //boost::hash_combine(seed, addr.nonce); + return seed % seastar::smp::count; +} + ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr) { if (auto found = connections.find(addr); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index eb8f70ed24817..4bc29b5304889 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -19,6 +19,7 @@ #include #include #include +#include #include "msg/Policy.h" #include "Messenger.h" @@ -29,7 +30,10 @@ namespace ceph::net { using SocketPolicy = ceph::net::Policy; -class SocketMessenger final : public Messenger { +class SocketMessenger final : public Messenger, public seastar::peering_sharded_service { + const seastar::shard_id sid; + seastar::promise<> shutdown_promise; + std::optional listener; Dispatcher *dispatcher = nullptr; std::map connections; @@ -42,24 +46,41 @@ class SocketMessenger final : public Messenger { seastar::future<> accept(seastar::connected_socket socket, seastar::socket_address paddr); + void do_bind(const entity_addr_t& addr); + seastar::future<> do_start(Dispatcher *disp); + seastar::foreign_ptr do_connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type); + seastar::future<> do_shutdown(); + // conn sharding options: + // 1. Simplest: sharded by ip only + // 2. Balanced: sharded by ip + port + nonce, + // but, need to move SocketConnection between cores. + seastar::shard_id locate_shard(const entity_addr_t& addr); + public: SocketMessenger(const entity_name_t& myname, const std::string& logic_name, uint64_t nonce); - void set_myaddr(const entity_addr_t& addr) override; + seastar::future<> set_myaddr(const entity_addr_t& addr) override; - void bind(const entity_addr_t& addr) override; + // Messenger interfaces are assumed to be called from its own shard, but its + // behavior should be symmetric when called from any shard. + seastar::future<> bind(const entity_addr_t& addr) override; seastar::future<> start(Dispatcher *dispatcher) override; - ConnectionRef connect(const entity_addr_t& peer_addr, - const entity_type_t& peer_type) override; + seastar::future connect(const entity_addr_t& peer_addr, + const entity_type_t& peer_type) override; seastar::future<> shutdown() override; + Messenger* get_local_shard() override { + return &container().local(); + } + public: - void learned_addr(const entity_addr_t &peer_addr_for_me); + seastar::future<> learned_addr(const entity_addr_t &peer_addr_for_me); void set_default_policy(const SocketPolicy& p); void set_policy(entity_type_t peer_type, const SocketPolicy& p); void set_policy_throttler(entity_type_t peer_type, Throttle* throttle); @@ -70,6 +91,19 @@ class SocketMessenger final : public Messenger { void register_conn(SocketConnectionRef); void unregister_conn(SocketConnectionRef); + // required by sharded<> + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + // can only wait once + seastar::future<> wait() { + return shutdown_promise.get_future(); + } + + seastar::shard_id shard_id() const { + return sid; + } + friend ostream& operator<<(ostream& out, const SocketMessenger& msgr) { return out << msgr.get_myname() << "(" << msgr.logic_name diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 1e12b3e94da95..3e1d00e3a7c37 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -8,13 +8,18 @@ add_executable(unittest_seastar_denc add_ceph_unittest(unittest_seastar_denc) target_link_libraries(unittest_seastar_denc crimson GTest::Main) -add_executable(unittest_seastar_messenger test_messenger.cc) -add_ceph_unittest(unittest_seastar_messenger) -target_link_libraries(unittest_seastar_messenger ceph-common crimson) +# TODO: replace unittest_seastar_messenger with unittest_seastar_messenger_new +#add_executable(unittest_seastar_messenger test_messenger.cc) +#add_ceph_unittest(unittest_seastar_messenger) +#target_link_libraries(unittest_seastar_messenger ceph-common crimson) -add_executable(unittest_seastar_echo - test_alien_echo.cc) -target_link_libraries(unittest_seastar_echo ceph-common global crimson) +add_executable(unittest_seastar_messenger_new test_messenger_new.cc) +target_link_libraries(unittest_seastar_messenger_new ceph-common crimson) + +# TODO: fix unittest_seastar_echo with the new design +#add_executable(unittest_seastar_echo +# test_alien_echo.cc) +#target_link_libraries(unittest_seastar_echo ceph-common global crimson) add_executable(unittest_seastar_thread_pool test_thread_pool.cc) @@ -25,9 +30,10 @@ add_executable(unittest_seastar_config test_config.cc) target_link_libraries(unittest_seastar_config crimson) -add_executable(unittest_seastar_monc - test_monc.cc) -target_link_libraries(unittest_seastar_monc crimson) +# TODO: fix unittest_seastar_monc with the new design +#add_executable(unittest_seastar_monc +# test_monc.cc) +#target_link_libraries(unittest_seastar_monc crimson) add_executable(unittest_seastar_perfcounters test_perfcounters.cc) diff --git a/src/test/crimson/test_messenger_new.cc b/src/test/crimson/test_messenger_new.cc new file mode 100644 index 0000000000000..79bfc5791cc17 --- /dev/null +++ b/src/test/crimson/test_messenger_new.cc @@ -0,0 +1,289 @@ +#include "messages/MPing.h" +#include "crimson/common/log.h" +#include "crimson/net/Connection.h" +#include "crimson/net/Dispatcher.h" +#include "crimson/net/SocketMessenger.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace bpo = boost::program_options; + +namespace { + +seastar::logger& logger() { + return ceph::get_logger(ceph_subsys_ms); +} + +template +seastar::future create_sharded(Args... args) { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args...).then([sharded_obj]() { + auto& ret = sharded_obj->local(); + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().finally([sharded_obj] {}); + }); + return &ret; + }); +} + +std::random_device rd; +std::default_random_engine rng{rd()}; +bool verbose = false; + +seastar::future<> test_echo(unsigned rounds, + double keepalive_ratio) +{ + struct test_state { + struct Server final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + ceph::net::Messenger *msgr = nullptr; + + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::make_ready_future<>(); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + if (verbose) { + logger().info("server got {}", *m); + } + // reply with a pong + return c->send(MessageRef{new MPing(), false}); + } + + seastar::future<> init(const entity_name_t& name, + const entity_addr_t& addr, + const std::string& lname, + const uint64_t nonce) { + return create_sharded(name, lname, nonce) + .then([this, addr](auto messenger) { + return container().invoke_on_all([messenger](auto& server) { + server.msgr = messenger->get_local_shard(); + }).then([messenger, addr] { + return messenger->bind(addr); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + }; + + struct Client final + : public ceph::net::Dispatcher, + public seastar::peering_sharded_service { + + struct PingSession : public seastar::enable_shared_from_this { + unsigned count = 0u; + }; + + unsigned rounds; + std::bernoulli_distribution keepalive_dist; + ceph::net::Messenger *msgr = nullptr; + std::map> pending_conns; + + Client(unsigned rounds, double keepalive_ratio) + : rounds(rounds), + keepalive_dist(std::bernoulli_distribution{keepalive_ratio}) {} + Dispatcher* get_local_shard() override { + return &(container().local()); + } + seastar::future<> stop() { + return seastar::now(); + } + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override { + logger().info("Conn[{}] connected to {}", &*conn, conn->get_peer_addr()); + auto session = seastar::make_shared(); + conn->set_priv(session); + return container().invoke_on_all([conn = conn.get()](auto& client) { + auto [i, added] = client.pending_conns.emplace(conn, seastar::promise<>()); + std::ignore = i; + ceph_assert(added); + }); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef c, + MessageRef m) override { + // NOTE: we have to use reinterpret_cast because + // enable_shared_from_this privately inherits shared_ptr_count_base. + auto session = reinterpret_cast(c->get_priv().get()); + ++(session->count); + if (verbose) { + logger().info("client ms_dispatch {}", session->count); + } + + if (session->count == rounds) { + logger().info("Conn[{}] finished with {} pingpongs", c.get(), session->count); + return container().invoke_on_all([conn = c.get()](auto &client) { + auto found = client.pending_conns.find(conn); + ceph_assert(found != client.pending_conns.end()); + found->second.set_value(); + }); + } else { + return seastar::now(); + } + } + + seastar::future<> init(const entity_name_t& name, + const std::string& lname, + const uint64_t nonce) { + return create_sharded(name, lname, nonce) + .then([this](auto messenger) { + return container().invoke_on_all([messenger](auto& client) { + client.msgr = messenger->get_local_shard(); + }).then([this, messenger] { + return messenger->start(this); + }); + }); + } + + seastar::future<> shutdown() { + ceph_assert(msgr); + return msgr->shutdown(); + } + + seastar::future<> dispatch_pingpong(const entity_addr_t& peer_addr, bool foreign_dispatch=true) { + return msgr->connect(peer_addr, entity_name_t::TYPE_OSD) + .then([this, foreign_dispatch](auto conn) { + if (foreign_dispatch) { + return do_dispatch_pingpong(&**conn) + .finally([this, conn] {}); + } else { + // NOTE: this could be faster if we don't switch cores in do_dispatch_pingpong(). + return container().invoke_on(conn->get()->shard_id(), [conn = &**conn](auto &client) { + return client.do_dispatch_pingpong(conn); + }).finally([this, conn] {}); + } + }); + } + + private: + seastar::future<> do_dispatch_pingpong(ceph::net::Connection* conn) { + return seastar::do_with(unsigned(0), [this, conn](auto &count) { + return seastar::do_until( + [this, &count] { return count == rounds; }, + [this, conn, &count] { + return seastar::repeat([this, conn, &count] { + if (keepalive_dist(rng)) { + return conn->keepalive() + .then([] { + return seastar::make_ready_future( + seastar::stop_iteration::no); + }); + } else { + count += 1; + return conn->send(MessageRef{new MPing(), false}) + .then([] { + return seastar::make_ready_future( + seastar::stop_iteration::yes); + }); + } + }); + }).then([this, conn] { + auto found = pending_conns.find(conn); + ceph_assert(found != pending_conns.end()); + return found->second.get_future(); + }); + }); + } + }; + }; + + typedef std::tuple, + seastar::future, + seastar::future, + seastar::future> result_tuple; + return seastar::when_all( + create_sharded(), + create_sharded(), + create_sharded(rounds, keepalive_ratio), + create_sharded(rounds, keepalive_ratio)) + .then([rounds, keepalive_ratio](result_tuple t) { + auto server1 = std::get<0>(t).get0(); + auto server2 = std::get<1>(t).get0(); + auto client1 = std::get<2>(t).get0(); + auto client2 = std::get<3>(t).get0(); + // start servers and clients + entity_addr_t addr1; + addr1.set_type(entity_addr_t::TYPE_LEGACY); + addr1.set_family(AF_INET); + addr1.set_port(9010); + entity_addr_t addr2; + addr2.set_type(entity_addr_t::TYPE_LEGACY); + addr2.set_family(AF_INET); + addr2.set_port(9011); + return seastar::when_all_succeed( + server1->init(entity_name_t::OSD(0), addr1, "server1", 1), + server2->init(entity_name_t::OSD(1), addr2, "server2", 2), + client1->init(entity_name_t::OSD(2), "client1", 3), + client2->init(entity_name_t::OSD(3), "client2", 4)) + // dispatch pingpoing + .then([client1, client2] { + entity_addr_t peer_addr1; + peer_addr1.set_type(entity_addr_t::TYPE_LEGACY); + peer_addr1.parse("127.0.0.1:9010/1", nullptr); + entity_addr_t peer_addr2; + peer_addr2.set_type(entity_addr_t::TYPE_LEGACY); + peer_addr2.parse("127.0.0.1:9011/2", nullptr); + return seastar::when_all_succeed( + client1->dispatch_pingpong(peer_addr1, true), + client1->dispatch_pingpong(peer_addr2, false), + client2->dispatch_pingpong(peer_addr1, false), + client2->dispatch_pingpong(peer_addr2, true)); + // shutdown + }).then([client1] { + logger().info("client1 shutdown..."); + return client1->shutdown(); + }).then([client2] { + logger().info("client2 shutdown..."); + return client2->shutdown(); + }).then([server1] { + logger().info("server1 shutdown..."); + return server1->shutdown(); + }).then([server2] { + logger().info("server2 shutdown..."); + return server2->shutdown(); + }); + }); +} + +} + +int main(int argc, char** argv) +{ + seastar::app_template app; + app.add_options() + ("verbose,v", bpo::value()->default_value(false), + "chatty if true") + ("rounds", bpo::value()->default_value(512), + "number of pingpong rounds") + ("keepalive-ratio", bpo::value()->default_value(0.1), + "ratio of keepalive in ping messages"); + return app.run(argc, argv, [&app] { + auto&& config = app.configuration(); + verbose = config["verbose"].as(); + auto rounds = config["rounds"].as(); + auto keepalive_ratio = config["keepalive-ratio"].as(); + return test_echo(rounds, keepalive_ratio) + .then([] { + // return test_concurrent_dispatch(); + //}).then([] { + std::cout << "All tests succeeded" << std::endl; + }).handle_exception([] (auto eptr) { + std::cout << "Test failure" << std::endl; + return seastar::make_exception_future<>(eptr); + }); + }); +} From 400ba8b99da0bfcb1304a89184ab60a2beda90bb Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Fri, 28 Dec 2018 21:32:53 +0100 Subject: [PATCH 4/7] crimson: MonClient can live in sharded environment. Signed-off-by: Radoslaw Zarzynski --- src/crimson/CMakeLists.txt | 3 +- src/crimson/mon/MonClient.cc | 56 ++++++++++++++++++++------------- src/test/crimson/CMakeLists.txt | 7 ++--- src/test/crimson/test_monc.cc | 23 ++++++++++++-- 4 files changed, 59 insertions(+), 30 deletions(-) diff --git a/src/crimson/CMakeLists.txt b/src/crimson/CMakeLists.txt index 1802ba5f28372..a04ade8f0b5c6 100644 --- a/src/crimson/CMakeLists.txt +++ b/src/crimson/CMakeLists.txt @@ -123,8 +123,7 @@ set(crimson_thread_srcs thread/Throttle.cc) add_library(crimson STATIC ${crimson_auth_srcs} - # TODO: fix crimson_mon_client with the new design - # ${crimson_mon_srcs} + ${crimson_mon_srcs} ${crimson_net_srcs} ${crimson_thread_srcs} ${CMAKE_SOURCE_DIR}/src/common/buffer_seastar.cc) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index 058843c62a348..e48151d4469f5 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -16,6 +16,7 @@ #include "crimson/common/log.h" #include "crimson/net/Connection.h" #include "crimson/net/Errors.h" +#include "crimson/net/Fwd.h" #include "crimson/net/Messenger.h" #include "messages/MAuth.h" @@ -47,7 +48,7 @@ namespace ceph::mon { class Connection { public: - Connection(ceph::net::ConnectionRef conn, + Connection(ceph::net::ConnectionXRef conn, KeyRing* keyring); seastar::future<> handle_auth_reply(Ref m); seastar::future<> authenticate(epoch_t epoch, @@ -58,7 +59,8 @@ class Connection { bool is_my_peer(const entity_addr_t& addr) const; seastar::future<> renew_tickets(); - ceph::net::ConnectionRef get_conn(); + seastar::foreign_ptr& get_conn(); + const seastar::foreign_ptr& get_conn() const; private: seastar::future<> setup_session(epoch_t epoch, @@ -72,15 +74,15 @@ class Connection { private: bool closed = false; seastar::promise> reply; - ceph::net::ConnectionRef conn; + ceph::net::ConnectionXRef conn; std::unique_ptr auth; RotatingKeyRing rotating_keyring; uint64_t global_id; }; -Connection::Connection(ceph::net::ConnectionRef conn, +Connection::Connection(ceph::net::ConnectionXRef conn, KeyRing* keyring) - : conn{conn}, + : conn{std::move(conn)}, rotating_keyring{nullptr, CEPH_ENTITY_TYPE_OSD, keyring} {} @@ -143,7 +145,7 @@ Connection::setup_session(epoch_t epoch, encode(auth_methods.get_supported_set(), m->auth_payload); encode(name, m->auth_payload); encode(global_id, m->auth_payload); - return conn->send(m); + return get_conn()->send(m); } seastar::future Connection::do_auth() @@ -158,13 +160,13 @@ seastar::future Connection::do_auth() ceph::net::error::negotiation_failure)); } logger().info("sending {}", *m); - return conn->send(m).then([this] { + return get_conn()->send(m).then([this] { logger().info("waiting"); return reply.get_future(); }).then([this] (Ref m) { logger().info("mon {} => {} returns {}: {}", - conn->get_messenger()->get_myaddr(), - conn->get_peer_addr(), *m, m->result); + get_conn()->get_messenger()->get_myaddr(), + get_conn()->get_peer_addr(), *m, m->result); reply = decltype(reply){}; auto p = m->result_bl.cbegin(); auto ret = auth->handle_response(m->result, p); @@ -182,7 +184,7 @@ Connection::authenticate(epoch_t epoch, const AuthMethodList& auth_methods, uint32_t want_keys) { - return conn->keepalive().then([epoch, auth_methods, name, this] { + return get_conn()->keepalive().then([epoch, auth_methods, name, this] { return setup_session(epoch, auth_methods, name); }).then([this] { return reply.get_future(); @@ -213,8 +215,8 @@ Connection::authenticate(epoch_t epoch, seastar::future<> Connection::close() { - if (conn && !std::exchange(closed, true)) { - return conn->close(); + if (get_conn() && !std::exchange(closed, true)) { + return get_conn()->close(); } else { return seastar::now(); } @@ -222,11 +224,16 @@ seastar::future<> Connection::close() bool Connection::is_my_peer(const entity_addr_t& addr) const { - return conn->get_peer_addr() == addr; + return get_conn()->get_peer_addr() == addr; } -ceph::net::ConnectionRef Connection::get_conn() { - return conn; +seastar::foreign_ptr& Connection::get_conn() { + return *conn; +} + +const seastar::foreign_ptr& +Connection::get_conn() const { + return *conn; } namespace { AuthMethodList create_auth_methods(uint32_t entity_type) @@ -498,13 +505,18 @@ seastar::future<> Client::reopen_session(int rank) return seastar::parallel_for_each(mons, [this](auto rank) { auto peer = monmap.get_addr(rank); logger().info("connecting to mon.{}", rank); - auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); - auto& mc = pending_conns.emplace_back(conn, &keyring); - return mc.authenticate( - monmap.get_epoch(), entity_name, - auth_methods, want_keys).handle_exception([conn](auto ep) { - return conn->close().then([ep = std::move(ep)] { - std::rethrow_exception(ep); + auto&& conn_fut = msgr.connect(peer, CEPH_ENTITY_TYPE_MON); + + return conn_fut.then([peer, this](ceph::net::ConnectionXRef conn) { + auto& mc = pending_conns.emplace_back(conn, &keyring); + return mc.authenticate( + monmap.get_epoch(), + entity_name, + auth_methods, + want_keys).handle_exception([conn](auto ep) { + return (*conn)->close().then([ep = std::move(ep)] { + std::rethrow_exception(ep); + }); }); }).then([peer, this] { if (!is_hunting()) { diff --git a/src/test/crimson/CMakeLists.txt b/src/test/crimson/CMakeLists.txt index 3e1d00e3a7c37..d244dde05c9bd 100644 --- a/src/test/crimson/CMakeLists.txt +++ b/src/test/crimson/CMakeLists.txt @@ -30,10 +30,9 @@ add_executable(unittest_seastar_config test_config.cc) target_link_libraries(unittest_seastar_config crimson) -# TODO: fix unittest_seastar_monc with the new design -#add_executable(unittest_seastar_monc -# test_monc.cc) -#target_link_libraries(unittest_seastar_monc crimson) +add_executable(unittest_seastar_monc + test_monc.cc) +target_link_libraries(unittest_seastar_monc crimson) add_executable(unittest_seastar_perfcounters test_perfcounters.cc) diff --git a/src/test/crimson/test_monc.cc b/src/test/crimson/test_monc.cc index 17775d00cb521..05319cf4aba56 100644 --- a/src/test/crimson/test_monc.cc +++ b/src/test/crimson/test_monc.cc @@ -8,6 +8,18 @@ using Config = ceph::common::ConfigProxy; using MonClient = ceph::mon::Client; +template +seastar::future> create_sharded(Args... args) { + auto sharded_obj = seastar::make_lw_shared>(); + return sharded_obj->start(args...).then([sharded_obj]() { + auto& ret = sharded_obj->local(); + seastar::engine().at_exit([sharded_obj]() { + return sharded_obj->stop().finally([sharded_obj] {}); + }); + return std::ref(ret); + }); +} + static seastar::future<> test_monc() { return ceph::common::sharded_conf().start().then([] { @@ -23,8 +35,15 @@ static seastar::future<> test_monc() conf->cluster = cluster; return conf.parse_config_files(conf_file_list); }).then([] { - return seastar::do_with(ceph::net::SocketMessenger{entity_name_t::OSD(0), "monc", 0}, - [](ceph::net::Messenger& msgr) { + ceph::common::sharded_perf_coll().start().then([] { + seastar::engine().at_exit([] { + return ceph::common::sharded_perf_coll().stop(); + }); + }); + }).then([] { + auto&& msgr_fut = \ + create_sharded(entity_name_t::OSD(0), "monc", 0); + return msgr_fut.then([](ceph::net::Messenger& msgr) { auto& conf = ceph::common::local_conf(); if (conf->ms_crc_data) { msgr.set_crc_data(); From 86e4211efd49107b07d7d1cfca1235a55adcad56 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Mon, 31 Dec 2018 18:46:47 +0100 Subject: [PATCH 5/7] crimson: bring set_crc_{data,header} to the sharded SocketMessenger. Signed-off-by: Radoslaw Zarzynski --- src/crimson/net/Messenger.h | 6 ++++-- src/crimson/net/SocketMessenger.cc | 14 ++++++++++++++ src/crimson/net/SocketMessenger.h | 3 +++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/src/crimson/net/Messenger.h b/src/crimson/net/Messenger.h index 9a9df12ee0d8f..fb8807d5c3bd6 100644 --- a/src/crimson/net/Messenger.h +++ b/src/crimson/net/Messenger.h @@ -67,11 +67,13 @@ class Messenger { uint32_t get_crc_flags() const { return crc_flags; } - void set_crc_data() { + virtual seastar::future<> set_crc_data() { crc_flags |= MSG_CRC_DATA; + return seastar::now(); } - void set_crc_header() { + virtual seastar::future<> set_crc_header() { crc_flags |= MSG_CRC_HEADER; + return seastar::now(); } // get the local dispatcher shard if it is accessed by another core diff --git a/src/crimson/net/SocketMessenger.cc b/src/crimson/net/SocketMessenger.cc index 3725b258fbbc6..b8ec87f004656 100644 --- a/src/crimson/net/SocketMessenger.cc +++ b/src/crimson/net/SocketMessenger.cc @@ -81,6 +81,20 @@ seastar::future<> SocketMessenger::shutdown() }); } +seastar::future<> SocketMessenger::set_crc_data() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.Messenger::set_crc_data(); + }); +} + +seastar::future<> SocketMessenger::set_crc_header() +{ + return container().invoke_on_all([](auto& msgr) { + return msgr.Messenger::set_crc_header(); + }); +} + void SocketMessenger::do_bind(const entity_addr_t& addr) { ceph_assert(addr.get_family() == AF_INET); diff --git a/src/crimson/net/SocketMessenger.h b/src/crimson/net/SocketMessenger.h index 4bc29b5304889..e6b5220f1ed84 100644 --- a/src/crimson/net/SocketMessenger.h +++ b/src/crimson/net/SocketMessenger.h @@ -75,6 +75,9 @@ class SocketMessenger final : public Messenger, public seastar::peering_sharded_ seastar::future<> shutdown() override; + seastar::future<> set_crc_data() override; + seastar::future<> set_crc_header() override; + Messenger* get_local_shard() override { return &container().local(); } From 4939af1054572d657c373becba5a502a418b14cf Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Tue, 1 Jan 2019 23:48:23 +0100 Subject: [PATCH 6/7] crimson: bring the ForeignDispatcher concept. Signed-off-by: Radoslaw Zarzynski --- src/crimson/net/Dispatcher.h | 134 +++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/src/crimson/net/Dispatcher.h b/src/crimson/net/Dispatcher.h index c0865aa2b8c2d..cd38f15fc22f8 100644 --- a/src/crimson/net/Dispatcher.h +++ b/src/crimson/net/Dispatcher.h @@ -62,4 +62,138 @@ class Dispatcher { } }; + +// ForeignDispatcher -- a \c Dispatcher implementation that relays calls +// to a component residing on single, preselected engine. This component +// provides Dispatcher-like interface. The difference is about making +// inter-engine traffic more safe. + +// The intended use scenario is connecting components requiring sharding +// with the solitary ones. +// +// \param DecorateeT the type of foreign quasi-Dispatcher implementation. +// CRTP allows to avoid the overhead of dynamic polymorphism. +template +class ForeignDispatcher : public Dispatcher { + const seastar::shard_id sid; + + DecorateeT& get_decoratee() { + return static_cast(*this); + } + + public: + ForeignDispatcher(const seastar::shard_id sid) + : sid(sid) { + } + + seastar::future<> ms_dispatch(ConnectionRef conn, MessageRef msg) final { + // NOTE: this still might be troublesome when modifications are performed + // remotely. To be honest, I don't know a language measure to mitigate + // the problem entirely. Even passing e.g. pointer-to-const would be not + // enough due to e.g. a `mutable` member. + return seastar::smp::submit_to(sid, + [ this, + fconn = seastar::make_foreign(conn), + fmsg = seastar::make_foreign(msg) + ]() mutable { + // we're using non-virtual call here. For more details please refer to + // https://en.cppreference.com/w/cpp/language/virtual + return get_decoratee().DecorateeT::fms_dispatch(std::move(fconn), + std::move(fmsg)); + }); + } + + seastar::future<> ms_handle_accept(ConnectionRef conn) final { + return seastar::smp::submit_to(sid, + [ this, fconn = seastar::make_foreign(conn) ]() mutable { + return get_decoratee().DecorateeT::fms_handle_accept(std::move(fconn)); + }); + } + + seastar::future<> ms_handle_connect(ConnectionRef conn) final { + return seastar::smp::submit_to(sid, + [ this, fconn = seastar::make_foreign(conn) ]() mutable { + return get_decoratee().DecorateeT::fms_handle_connect(std::move(fconn)); + }); + } + + seastar::future<> ms_handle_reset(ConnectionRef conn) final { + return seastar::smp::submit_to(sid, + [ this, fconn = seastar::make_foreign(conn) ]() mutable { + return get_decoratee().DecorateeT::fms_handle_reset(std::move(fconn)); + }); + } + + seastar::future<> ms_handle_remote_reset(ConnectionRef conn) final { + return seastar::smp::submit_to(sid, + [ this, fconn = seastar::make_foreign(conn) ]() mutable { + return get_decoratee().DecorateeT::fms_handle_remote_reset(std::move(fconn)); + }); + } + + seastar::future + ms_verify_authorizer(peer_type_t peer, + auth_proto_t proto, + ceph::bufferlist& bl) final { + return seastar::smp::submit_to(sid, + [ this, &bl, peer = std::move(peer), proto = std::move(proto) ] { + return get_decoratee().DecorateeT::fms_verify_authorizer( + std::move(peer), std::move(proto), bl); + }); + } + + seastar::future> + ms_get_authorizer(peer_type_t peer, bool force_new) final { + return seastar::smp::submit_to(sid, + [ this, peer = std::move(peer), force_new ] { + return get_decoratee().DecorateeT::fms_get_authorizer( + std::move(peer), force_new); + }); + } + + Dispatcher* get_local_shard() final { + return this; + } + +protected: + using ConnectionFRef = seastar::foreign_ptr; + using MessageFRef = seastar::foreign_ptr; + + // XXX: making the fms_ stuff `virtual` ONLY to let derivatees to use + // `override`. Dynamic dispatch WILL NOT be used! + // This only to provide empty default implementations just following + // the Dispatcher's convention about defaults. + virtual seastar::future<> fms_dispatch(ConnectionFRef, MessageFRef) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> fms_handle_accept(ConnectionFRef) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> fms_handle_connect(ConnectionFRef) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> fms_handle_reset(ConnectionFRef) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future<> fms_handle_remote_reset(ConnectionFRef) { + return seastar::make_ready_future<>(); + } + + virtual seastar::future + fms_verify_authorizer(peer_type_t, + auth_proto_t, + bufferlist&) { + return seastar::make_ready_future(0, bufferlist{}); + } + + virtual seastar::future> + fms_get_authorizer(peer_type_t, bool force_new) { + return seastar::make_ready_future>(nullptr); + } +}; + } // namespace ceph::net From d14108bf7635e1894527e99e1d9331837e9eaaa7 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Tue, 1 Jan 2019 23:53:45 +0100 Subject: [PATCH 7/7] fixup: MonClient can live in sharded environment. Signed-off-by: Radoslaw Zarzynski --- src/crimson/mon/MonClient.cc | 12 +++++++++--- src/crimson/mon/MonClient.h | 12 ++++++++---- src/crimson/net/Fwd.h | 3 ++- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/crimson/mon/MonClient.cc b/src/crimson/mon/MonClient.cc index e48151d4469f5..4fea5baa622cd 100644 --- a/src/crimson/mon/MonClient.cc +++ b/src/crimson/mon/MonClient.cc @@ -257,7 +257,8 @@ AuthMethodList create_auth_methods(uint32_t entity_type) Client::Client(const EntityName& name, ceph::net::Messenger& messenger) - : entity_name{name}, + : ForeignDispatcher(seastar::engine().cpu_id()), + entity_name{name}, auth_methods{create_auth_methods(entity_name.get_type())}, want_keys{CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | @@ -296,9 +297,11 @@ bool Client::is_hunting() const { } seastar::future<> -Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) +Client::fms_dispatch(ceph::net::ConnectionFRef conn, Client::MessageFRef m) { logger().info("ms_dispatch {}", *m); +#if 0 + // TODO: need move to MessageFRef or MessengerXRef. // we only care about these message types switch (m->get_type()) { case CEPH_MSG_MON_MAP: @@ -324,9 +327,12 @@ Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) default: return seastar::now(); } +#else + return seastar::now(); +#endif } -seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef conn) +seastar::future<> Client::fms_handle_reset(ceph::net::ConnectionFRef conn) { auto found = std::find_if(pending_conns.begin(), pending_conns.end(), [peer_addr = conn->get_peer_addr()](auto& mc) { diff --git a/src/crimson/mon/MonClient.h b/src/crimson/mon/MonClient.h index cc291f7510ad9..92824edf71d3a 100644 --- a/src/crimson/mon/MonClient.h +++ b/src/crimson/mon/MonClient.h @@ -36,7 +36,11 @@ namespace ceph::mon { class Connection; -class Client : public ceph::net::Dispatcher { +// Suppose we don't want to shard MonClient to save resources -- one +// instance is fine. Let's see how much effort is necessary to interact +// with sharded world. +class Client : public ceph::net::ForeignDispatcher { + friend ceph::net::ForeignDispatcher; const EntityName entity_name; KeyRing keyring; AuthMethodList auth_methods; @@ -80,9 +84,9 @@ class Client : public ceph::net::Dispatcher { private: void tick(); - seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, - MessageRef m) override; - seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> fms_dispatch(ceph::net::ConnectionFRef conn, + MessageFRef m) override; + seastar::future<> fms_handle_reset(ceph::net::ConnectionFRef conn) override; seastar::future<> handle_monmap(ceph::net::ConnectionRef conn, Ref m); diff --git a/src/crimson/net/Fwd.h b/src/crimson/net/Fwd.h index 331c69d7a3af6..17ffd082c880c 100644 --- a/src/crimson/net/Fwd.h +++ b/src/crimson/net/Fwd.h @@ -29,9 +29,10 @@ using msgr_tag_t = uint8_t; class Connection; using ConnectionRef = seastar::shared_ptr; +using ConnectionFRef = seastar::foreign_ptr; // NOTE: ConnectionXRef should only be used in seastar world, because // lw_shared_ptr<> is not safe to be accessed by unpinned alien threads. -using ConnectionXRef = seastar::lw_shared_ptr>; +using ConnectionXRef = seastar::lw_shared_ptr; class Dispatcher;