Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crimson, WIP, experiment: bring MonClient to the sharded world #1

Open
wants to merge 7 commits into
base: wip-seastar-msgr-accept-all
Choose a base branch
from
Open
70 changes: 44 additions & 26 deletions src/crimson/mon/MonClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "crimson/common/log.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Errors.h"
#include "crimson/net/Fwd.h"
#include "crimson/net/Messenger.h"

#include "messages/MAuth.h"
Expand Down Expand Up @@ -47,7 +48,7 @@ namespace ceph::mon {

class Connection {
public:
Connection(ceph::net::ConnectionRef conn,
Connection(ceph::net::ConnectionXRef conn,
KeyRing* keyring);
seastar::future<> handle_auth_reply(Ref<MAuthReply> m);
seastar::future<> authenticate(epoch_t epoch,
Expand All @@ -58,7 +59,8 @@ class Connection {
bool is_my_peer(const entity_addr_t& addr) const;

seastar::future<> renew_tickets();
ceph::net::ConnectionRef get_conn();
seastar::foreign_ptr<ceph::net::ConnectionRef>& get_conn();
const seastar::foreign_ptr<ceph::net::ConnectionRef>& get_conn() const;

private:
seastar::future<> setup_session(epoch_t epoch,
Expand All @@ -72,15 +74,15 @@ class Connection {
private:
bool closed = false;
seastar::promise<Ref<MAuthReply>> reply;
ceph::net::ConnectionRef conn;
ceph::net::ConnectionXRef conn;
std::unique_ptr<AuthClientHandler> auth;
RotatingKeyRing rotating_keyring;
uint64_t global_id;
};

Connection::Connection(ceph::net::ConnectionRef conn,
Connection::Connection(ceph::net::ConnectionXRef conn,
KeyRing* keyring)
: conn{conn},
: conn{std::move(conn)},
rotating_keyring{nullptr, CEPH_ENTITY_TYPE_OSD, keyring}
{}

Expand Down Expand Up @@ -143,7 +145,7 @@ Connection::setup_session(epoch_t epoch,
encode(auth_methods.get_supported_set(), m->auth_payload);
encode(name, m->auth_payload);
encode(global_id, m->auth_payload);
return conn->send(m);
return get_conn()->send(m);
}

seastar::future<bool> Connection::do_auth()
Expand All @@ -158,13 +160,13 @@ seastar::future<bool> Connection::do_auth()
ceph::net::error::negotiation_failure));
}
logger().info("sending {}", *m);
return conn->send(m).then([this] {
return get_conn()->send(m).then([this] {
logger().info("waiting");
return reply.get_future();
}).then([this] (Ref<MAuthReply> m) {
logger().info("mon {} => {} returns {}: {}",
conn->get_my_addr(),
conn->get_peer_addr(), *m, m->result);
get_conn()->get_messenger()->get_myaddr(),
get_conn()->get_peer_addr(), *m, m->result);
reply = decltype(reply){};
auto p = m->result_bl.cbegin();
auto ret = auth->handle_response(m->result, p);
Expand All @@ -182,7 +184,7 @@ Connection::authenticate(epoch_t epoch,
const AuthMethodList& auth_methods,
uint32_t want_keys)
{
return conn->keepalive().then([epoch, auth_methods, name, this] {
return get_conn()->keepalive().then([epoch, auth_methods, name, this] {
return setup_session(epoch, auth_methods, name);
}).then([this] {
return reply.get_future();
Expand Down Expand Up @@ -213,20 +215,25 @@ Connection::authenticate(epoch_t epoch,

seastar::future<> Connection::close()
{
if (conn && !std::exchange(closed, true)) {
return conn->close();
if (get_conn() && !std::exchange(closed, true)) {
return get_conn()->close();
} else {
return seastar::now();
}
}

bool Connection::is_my_peer(const entity_addr_t& addr) const
{
return conn->get_peer_addr() == addr;
return get_conn()->get_peer_addr() == addr;
}

ceph::net::ConnectionRef Connection::get_conn() {
return conn;
seastar::foreign_ptr<ceph::net::ConnectionRef>& Connection::get_conn() {
return *conn;
}

const seastar::foreign_ptr<ceph::net::ConnectionRef>&
Connection::get_conn() const {
return *conn;
}
namespace {
AuthMethodList create_auth_methods(uint32_t entity_type)
Expand All @@ -250,7 +257,8 @@ AuthMethodList create_auth_methods(uint32_t entity_type)

Client::Client(const EntityName& name,
ceph::net::Messenger& messenger)
: entity_name{name},
: ForeignDispatcher(seastar::engine().cpu_id()),
entity_name{name},
auth_methods{create_auth_methods(entity_name.get_type())},
want_keys{CEPH_ENTITY_TYPE_MON |
CEPH_ENTITY_TYPE_OSD |
Expand Down Expand Up @@ -289,9 +297,11 @@ bool Client::is_hunting() const {
}

seastar::future<>
Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
Client::fms_dispatch(ceph::net::ConnectionFRef conn, Client::MessageFRef m)
{
logger().info("ms_dispatch {}", *m);
#if 0
// TODO: need move to MessageFRef or MessengerXRef.
// we only care about these message types
switch (m->get_type()) {
case CEPH_MSG_MON_MAP:
Expand All @@ -317,9 +327,12 @@ Client::ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m)
default:
return seastar::now();
}
#else
return seastar::now();
#endif
}

seastar::future<> Client::ms_handle_reset(ceph::net::ConnectionRef conn)
seastar::future<> Client::fms_handle_reset(ceph::net::ConnectionFRef conn)
{
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
Expand Down Expand Up @@ -360,7 +373,7 @@ seastar::future<> Client::handle_auth_reply(ceph::net::ConnectionRef conn,
Ref<MAuthReply> m)
{
logger().info("mon {} => {} returns {}: {}",
conn->get_my_addr(),
conn->get_messenger()->get_myaddr(),
conn->get_peer_addr(), *m, m->result);
auto found = std::find_if(pending_conns.begin(), pending_conns.end(),
[peer_addr = conn->get_peer_addr()](auto& mc) {
Expand Down Expand Up @@ -498,13 +511,18 @@ seastar::future<> Client::reopen_session(int rank)
return seastar::parallel_for_each(mons, [this](auto rank) {
auto peer = monmap.get_addr(rank);
logger().info("connecting to mon.{}", rank);
auto conn = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);
auto& mc = pending_conns.emplace_back(conn, &keyring);
return mc.authenticate(
monmap.get_epoch(), entity_name,
auth_methods, want_keys).handle_exception([conn](auto ep) {
return conn->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
auto&& conn_fut = msgr.connect(peer, CEPH_ENTITY_TYPE_MON);

return conn_fut.then([peer, this](ceph::net::ConnectionXRef conn) {
auto& mc = pending_conns.emplace_back(conn, &keyring);
return mc.authenticate(
monmap.get_epoch(),
entity_name,
auth_methods,
want_keys).handle_exception([conn](auto ep) {
return (*conn)->close().then([ep = std::move(ep)] {
std::rethrow_exception(ep);
});
});
}).then([peer, this] {
if (!is_hunting()) {
Expand Down
12 changes: 8 additions & 4 deletions src/crimson/mon/MonClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ namespace ceph::mon {

class Connection;

class Client : public ceph::net::Dispatcher {
// Suppose we don't want to shard MonClient to save resources -- one
// instance is fine. Let's see how much effort is necessary to interact
// with sharded world.
class Client : public ceph::net::ForeignDispatcher<Client> {
friend ceph::net::ForeignDispatcher<Client>;
const EntityName entity_name;
KeyRing keyring;
AuthMethodList auth_methods;
Expand Down Expand Up @@ -80,9 +84,9 @@ class Client : public ceph::net::Dispatcher {
private:
void tick();

seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn,
MessageRef m) override;
seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override;
seastar::future<> fms_dispatch(ceph::net::ConnectionFRef conn,
MessageFRef m) override;
seastar::future<> fms_handle_reset(ceph::net::ConnectionFRef conn) override;

seastar::future<> handle_monmap(ceph::net::ConnectionRef conn,
Ref<MMonMap> m);
Expand Down
23 changes: 15 additions & 8 deletions src/crimson/net/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,38 @@
#pragma once

#include <queue>
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>

#include "Fwd.h"

namespace ceph::net {

using seq_num_t = uint64_t;
using SharedPtr = seastar::shared_ptr<seastar::shared_ptr_count_base>;

class Connection : public boost::intrusive_ref_counter<Connection,
boost::thread_unsafe_counter> {
class Connection : public seastar::enable_shared_from_this<Connection> {
SharedPtr priv;
protected:
entity_addr_t my_addr;
entity_addr_t peer_addr;
peer_type_t peer_type = -1;

public:
Connection(const entity_addr_t& my_addr)
: my_addr(my_addr) {}
Connection() {}
virtual ~Connection() {}

void set_priv(const SharedPtr& o) {
priv = o;
}
SharedPtr get_priv() {
return priv;
}
virtual Messenger* get_messenger() const = 0;
const entity_addr_t& get_my_addr() const { return my_addr; }
const entity_addr_t& get_peer_addr() const { return peer_addr; }
virtual int get_peer_type() const = 0;

/// true if the handshake has completed and no errors have been encountered
virtual bool is_connected() = 0;
virtual seastar::future<bool> is_connected() = 0;

/// send a message over a connection that has completed its handshake
virtual seastar::future<> send(MessageRef msg) = 0;
Expand All @@ -53,6 +57,9 @@ class Connection : public boost::intrusive_ref_counter<Connection,

/// close the connection and cancel any any pending futures from read/send
virtual seastar::future<> close() = 0;

/// which shard id the connection lives
virtual seastar::shard_id shard_id() const = 0;
};

} // namespace ceph::net
Loading