Skip to content

Commit

Permalink
RootRoute changes to accommodate distribution cases
Browse files Browse the repository at this point in the history
Summary: Distribution now can be set up for broadcast and cross-region directed deletes. This requires a DistributionRoute below the local routing tree branch to work.

Reviewed By: stuclar

Differential Revision: D49598728

fbshipit-source-id: a7786a9fa57a5b29e07666d84f27b1bddef89d2f
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Oct 2, 2023
1 parent ddb03f7 commit f2a2e51
Show file tree
Hide file tree
Showing 7 changed files with 541 additions and 43 deletions.
15 changes: 13 additions & 2 deletions mcrouter/McrouterFiberContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,16 @@ class fiber_local {
std::bitset<NUM_FLAGS> featureFlags;
int32_t selectedIndex{-1};
uint32_t failoverCount{0};
int64_t accumulatedBeforeReqInjectedLatencyUs{0};
int64_t accumulatedAfterReqInjectedLatencyUs{0};
std::optional<uint64_t> bucketId;
std::optional<std::string> distributionTargetRegion;
RequestClass requestClass;
folly::StringPiece asynclogName;
int64_t networkTransportTimeUs{0};
ServerLoad load{0};
std::vector<ExtraDataCallbackT> extraDataCallbacks;
std::shared_ptr<AxonContext> axonCtx{nullptr};
int64_t accumulatedBeforeReqInjectedLatencyUs{0};
int64_t accumulatedAfterReqInjectedLatencyUs{0};
};

static auto makeGuardHelperBase(McrouterFiberContext&& tmp) {
Expand Down Expand Up @@ -367,6 +368,16 @@ class fiber_local {
static std::optional<uint64_t> getBucketId() {
return folly::fibers::local<McrouterFiberContext>().bucketId;
}

static void setDistributionTargetRegion(std::string region) {
folly::fibers::local<McrouterFiberContext>().distributionTargetRegion =
region;
}

static std::optional<std::string> getDistributionTargetRegion() {
return folly::fibers::local<McrouterFiberContext>()
.distributionTargetRegion;
}
};

} // namespace mcrouter
Expand Down
25 changes: 19 additions & 6 deletions mcrouter/ProxyConfig-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,28 @@ ProxyConfig<RouterInfo>::ProxyConfig(
partialConfigs_ = provider.releasePartialConfigs();
}
accessPoints_ = provider.releaseAccessPoints();
bool disableBroadcastDeleteRpc = false;
if (auto* jDisableBroadcastDeleteRpc =
json.get_ptr("disable_broadcast_delete_rpc")) {
disableBroadcastDeleteRpc =
parseBool(*jDisableBroadcastDeleteRpc, "disable_broadcast_delete_rpc");
bool enableCrossRegionDeleteRpc = true;
if (auto* jEnableCrossRegionDeleteRpc =
json.get_ptr("enable_cross_region_delete_rpc")) {
enableCrossRegionDeleteRpc = parseBool(
*jEnableCrossRegionDeleteRpc, "enable_cross_region_delete_rpc");
}

bool enableDeleteDistribution = false;
if (auto* jEnableDeleteDistribution =
json.get_ptr("enable_delete_distribution")) {
enableDeleteDistribution =
parseBool(*jEnableDeleteDistribution, "enable_delete_distribution");
}
checkLogic(
enableDeleteDistribution || enableCrossRegionDeleteRpc,
"ProxyConfig: cannot disable cross-region delete rpc if distribution is disabled");

proxyRoute_ = std::make_shared<ProxyRoute<RouterInfo>>(
proxy, routeSelectors, disableBroadcastDeleteRpc);
proxy,
routeSelectors,
enableDeleteDistribution,
enableCrossRegionDeleteRpc);
serviceInfo_ = std::make_shared<ServiceInfo<RouterInfo>>(proxy, *this);
}

Expand Down
14 changes: 14 additions & 0 deletions mcrouter/lib/test/RouteHandleTestUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ struct TestHandleImpl {

std::vector<uint64_t> sawQueryTags;

std::vector<std::string> distributionRegionInFiber;

bool isTko;

bool isPaused;
Expand Down Expand Up @@ -302,6 +304,17 @@ struct RecordingRoute {
std::enable_if_t<!HasBucketId<Request>::value, void> recordBucketId(
const Request&) const {}

template <class Request>
void recordDistributionTargetRegion(const Request&) const {
if (mcrouter::fiber_local<MemcacheRouterInfo>::getDistributionTargetRegion()
.has_value()) {
h_->distributionRegionInFiber.push_back(
mcrouter::fiber_local<
MemcacheRouterInfo>::getDistributionTargetRegion()
.value());
}
}

template <class Request>
bool traverse(const Request& req, const RouteHandleTraverser<RouteHandleIf>&)
const {
Expand Down Expand Up @@ -351,6 +364,7 @@ struct RecordingRoute {
h_->sawFlags.push_back(getFlagsIfExist(req));
h_->sawQueryTags.push_back(getQueryTagsIfExists(req));
recordBucketId(req);
recordDistributionTargetRegion(req);
if (carbon::GetLike<Request>::value) {
reply.result_ref() = h_->resultGenerator_.hasValue()
? (*h_->resultGenerator_)(req.key_ref()->fullKey().str())
Expand Down
8 changes: 5 additions & 3 deletions mcrouter/routes/ProxyRoute-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ template <class RouterInfo>
ProxyRoute<RouterInfo>::ProxyRoute(
Proxy<RouterInfo>& proxy,
const RouteSelectorMap<typename RouterInfo::RouteHandleIf>& routeSelectors,
bool disableBroadcastDeleteRpc)
bool enableDeleteDistribution,
bool enableCrossRegionDeleteRpc)
: proxy_(proxy),
root_(makeRouteHandle<typename RouterInfo::RouteHandleIf, RootRoute>(
root_(makeRouteHandleWithInfo<RouterInfo, RootRoute>(
proxy_,
routeSelectors,
disableBroadcastDeleteRpc)) {
enableDeleteDistribution,
enableCrossRegionDeleteRpc)) {
if (proxy_.getRouterOptions().big_value_split_threshold != 0) {
root_ = detail::wrapWithBigValueRoute<RouterInfo>(
std::move(root_), proxy_.getRouterOptions());
Expand Down
3 changes: 2 additions & 1 deletion mcrouter/routes/ProxyRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class ProxyRoute {
Proxy<RouterInfo>& proxy,
const RouteSelectorMap<typename RouterInfo::RouteHandleIf>&
routeSelectors,
bool disableBroadcastDeleteRpc = false);
bool enableDeleteDistribution = false,
bool enableCrossRegionDeleteRpc = true);

template <class Request>
bool traverse(
Expand Down
132 changes: 101 additions & 31 deletions mcrouter/routes/RootRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace facebook {
namespace memcache {
namespace mcrouter {

template <class RouteHandleIf>
template <class RouterInfo>
class RootRoute {
public:
static std::string routeName() {
Expand All @@ -35,20 +35,24 @@ class RootRoute {

RootRoute(
ProxyBase& proxy,
const RouteSelectorMap<RouteHandleIf>& routeSelectors,
bool disableBroadcastDeleteRpc = false)
const RouteSelectorMap<typename RouterInfo::RouteHandleIf>&
routeSelectors,
bool enableDeleteDistribution = false,
bool enableCrossRegionDeleteRpc = true)
: opts_(proxy.getRouterOptions()),
rhMap_(
routeSelectors,
opts_.default_route,
opts_.send_invalid_route_to_default,
opts_.enable_route_policy_v2),
disableBroadcastDeleteRpc_(disableBroadcastDeleteRpc) {}
defaultRoute_(opts_.default_route),
enableDeleteDistribution_(enableDeleteDistribution),
enableCrossRegionDeleteRpc_(enableCrossRegionDeleteRpc) {}

template <class Request>
bool traverse(
const Request& req,
const RouteHandleTraverser<RouteHandleIf>& t) const {
const RouteHandleTraverser<typename RouterInfo::RouteHandleIf>& t) const {
const auto* rhPtr = rhMap_.getTargetsForKeyFast(
req.key_ref()->routingPrefix(), req.key_ref()->routingKey());
if (FOLLY_LIKELY(rhPtr != nullptr)) {
Expand Down Expand Up @@ -76,31 +80,71 @@ class RootRoute {
run in the background.
This is a good default for /star/star/ requests. */
const auto* rhPtr = rhMap_.getTargetsForKeyFast(
req.key_ref()->routingPrefix(), req.key_ref()->routingKey());

auto reply = FOLLY_UNLIKELY(rhPtr == nullptr)
? routeImpl(
rhMap_.getTargetsForKeySlow(
req.key_ref()->routingPrefix(), req.key_ref()->routingKey()),
req)
: routeImpl(*rhPtr, req);

auto reply = getTargetsAndRoute(req.key_ref()->routingPrefix(), req);
if (isErrorResult(*reply.result_ref()) && opts_.group_remote_errors) {
reply = ReplyT<Request>(carbon::Result::REMOTE_ERROR);
}

return reply;
}

McDeleteReply route(const McDeleteRequest& req) const {
// If distribution is enabled, route deletes to the default route where
// DistributionRoute will route cross region.
//
// NOTE: if enableCrossRegionDeleteRpc flag is not set it defaults to true,
// the distribution step will be followed by a duplicating RPC step, and the
// return value will be the reply returned by the RPC step.
McDeleteReply reply;
if (enableDeleteDistribution_ && !req.key_ref()->routingPrefix().empty()) {
auto routingPrefix = RoutingPrefix(req.key_ref()->routingPrefix());
if (routingPrefix.str() != defaultRoute_.str() &&
req.key_ref()->routingPrefix() != kBroadcastPrefix) {
reply = fiber_local<RouterInfo>::runWithLocals(
[this, &req, &routingPrefix]() {
fiber_local<RouterInfo>::setDistributionTargetRegion(
routingPrefix.getRegion().str());
return getTargetsAndRoute(defaultRoute_, req);
});

if (!enableCrossRegionDeleteRpc_) {
return reply;
}
}
}
reply = getTargetsAndRoute(req.key_ref()->routingPrefix(), req);
if (isErrorResult(*reply.result_ref()) && opts_.group_remote_errors) {
reply = McDeleteReply(carbon::Result::REMOTE_ERROR);
}
return reply;
}

private:
const McrouterOptions& opts_;
RouteHandleMap<RouteHandleIf> rhMap_;
bool disableBroadcastDeleteRpc_;
RouteHandleMap<typename RouterInfo::RouteHandleIf> rhMap_;
RoutingPrefix defaultRoute_;
bool enableDeleteDistribution_;
bool enableCrossRegionDeleteRpc_;

template <class Request>
FOLLY_ALWAYS_INLINE ReplyT<Request> getTargetsAndRoute(
folly::StringPiece routingPrefix,
const Request& req) const {
const auto* rhPtr =
rhMap_.getTargetsForKeyFast(routingPrefix, req.key_ref()->routingKey());

return UNLIKELY(rhPtr == nullptr)
? routeImpl(
rhMap_.getTargetsForKeySlow(
routingPrefix, req.key_ref()->routingKey()),
req)
: routeImpl(*rhPtr, req);
}

template <class Request>
ReplyT<Request> routeImpl(
const std::vector<std::shared_ptr<RouteHandleIf>>& rh,
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const Request& req,
carbon::GetLikeT<Request> = 0) const {
auto reply = doRoute(rh, req);
Expand All @@ -123,7 +167,8 @@ class RootRoute {

template <class Request>
ReplyT<Request> routeImpl(
const std::vector<std::shared_ptr<RouteHandleIf>>& rh,
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const Request& req,
carbon::ArithmeticLikeT<Request> = 0) const {
auto reply = opts_.allow_only_gets ? createReply(DefaultReply, req)
Expand All @@ -137,7 +182,8 @@ class RootRoute {

template <class Request>
ReplyT<Request> routeImpl(
const std::vector<std::shared_ptr<RouteHandleIf>>& rh,
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const Request& req,
carbon::OtherThanT<Request, carbon::GetLike<>, carbon::ArithmeticLike<>> =
0) const {
Expand All @@ -150,29 +196,53 @@ class RootRoute {

template <class Request>
ReplyT<Request> doRoute(
const std::vector<std::shared_ptr<RouteHandleIf>>& rh,
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const Request& req) const {
if (FOLLY_LIKELY(rh.size() == 1)) {
return rh[0]->route(req);
}
if (!rh.empty()) {
// Broadcast delete via Distribution, route only
// to the first (local) route handle
if constexpr (folly::IsOneOf<Request, McDeleteRequest>::value) {
if (disableBroadcastDeleteRpc_ &&
req.key_ref()->routingPrefix() == kBroadcastPrefix) {
return rh[0]->route(req);
}
}
return routeToAll(rh, req);
}
return createReply<Request>(ErrorReply);
}

auto reqCopy = std::make_shared<const Request>(req);
template <class Request>
ReplyT<Request> routeToAll(
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const Request& req) const {
auto reqCopy = std::make_shared<const Request>(req);
for (size_t i = 1, e = rh.size(); i < e; ++i) {
auto r = rh[i];
folly::fibers::addTask([r, reqCopy]() { r->route(*reqCopy); });
}
return rh[0]->route(req);
}

McDeleteReply routeToAll(
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const McDeleteRequest& req) const {
if (enableCrossRegionDeleteRpc_) {
auto reqCopy = std::make_shared<const McDeleteRequest>(req);
for (size_t i = 1, e = rh.size(); i < e; ++i) {
auto r = rh[i];
folly::fibers::addTask([r, reqCopy]() { r->route(*reqCopy); });
}
}
if (enableDeleteDistribution_ &&
req.key_ref()->routingPrefix() == kBroadcastPrefix) {
return fiber_local<RouterInfo>::runWithLocals([&req, &rh]() {
// DistributionRoute will read empty string as "broadcast", i.e.
// distribute to all regions:
fiber_local<RouterInfo>::setDistributionTargetRegion("");
return rh[0]->route(req);
});
} else {
return rh[0]->route(req);
}
return createReply<Request>(ErrorReply);
}
};
} // namespace mcrouter
Expand Down
Loading

0 comments on commit f2a2e51

Please sign in to comment.