Skip to content

Commit

Permalink
add DistributionRoute as a wrapper
Browse files Browse the repository at this point in the history
Summary: Distribution route is used for alternative ways of distributing invalidation requests that go into different regions.

Reviewed By: stuclar

Differential Revision: D49022531

fbshipit-source-id: a3c7188087d933304384549e823ab5025fe83075
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Oct 5, 2023
1 parent 9982d1d commit 275a7e6
Show file tree
Hide file tree
Showing 8 changed files with 753 additions and 3 deletions.
2 changes: 2 additions & 0 deletions mcrouter/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ libmcroutercore_a_SOURCES = \
routes/McBucketRoute-inl.h \
routes/McBucketRoute.cpp \
routes/McBucketRoute.h \
routes/DistributionRoute-inl.h \
routes/DistributionRoute.h \
routes/McExtraRouteHandleProvider-inl.h \
routes/McExtraRouteHandleProvider.h \
routes/McImportResolver.cpp \
Expand Down
7 changes: 4 additions & 3 deletions mcrouter/McSpoolUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ FOLLY_NOINLINE bool spoolAxonProxy(
}
// Run off fiber to save fiber stack for serialization
auto kvPairs = folly::fibers::runInMainContext([&req, &region, &pool]() {
const auto& finalReq =
auto finalReq =
req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource) ==
req.attributes_ref()->cend()
? addDeleteRequestSource(
req, memcache::McDeleteRequestSource::FAILED_INVALIDATION)
? std::move(addDeleteRequestSource(
req, memcache::McDeleteRequestSource::FAILED_INVALIDATION))
: req;
finalReq.key_ref()->stripRoutingPrefix();
auto serialized = invalidation::McInvalidationKvPairs::serialize<
memcache::McDeleteRequest>(finalReq)
.template to<std::string>();
Expand Down
53 changes: 53 additions & 0 deletions mcrouter/routes/DistributionRoute-inl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/dynamic.h>

#include "mcrouter/lib/fbi/cpp/ParsingUtil.h"

namespace facebook::memcache::mcrouter {

inline DistributionRouteSettings parseDistributionRouteSettings(
const folly::dynamic& json) {
DistributionRouteSettings settings;
settings.replay = false;
if (auto* jReplay = json.get_ptr("replay")) {
settings.replay = parseBool(*jReplay, "replay");
}
if (auto* jDistributedDeleteRpcEnabled =
json.get_ptr("distributed_delete_rpc_enabled")) {
settings.distributedDeleteRpcEnabled = parseBool(
*jDistributedDeleteRpcEnabled, "distributed_delete_rpc_enabled");
}
return settings;
}

// standalone handle
template <class RouterInfo>
typename RouterInfo::RouteHandlePtr makeDistributionRoute(
RouteHandleFactory<typename RouterInfo::RouteHandleIf>& factory,
const folly::dynamic& json) {
checkLogic(json.isObject(), "DistributionRoute is not an object");
checkLogic(json.count("child"), "DistributionRoute: no child route");
auto settings = parseDistributionRouteSettings(json);
return makeRouteHandleWithInfo<RouterInfo, DistributionRoute>(
factory.create(json["child"]), settings);
}

// wrapper handle
template <class RouterInfo>
typename RouterInfo::RouteHandlePtr makeDistributionRoute(
typename RouterInfo::RouteHandlePtr rh,
const folly::dynamic& json) {
auto settings = parseDistributionRouteSettings(json);
return makeRouteHandleWithInfo<RouterInfo, DistributionRoute>(
std::move(rh), settings);
}

} // namespace facebook::memcache::mcrouter
174 changes: 174 additions & 0 deletions mcrouter/routes/DistributionRoute.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <cstddef>

#include "mcrouter/McSpoolUtils.h"
#include "mcrouter/McrouterFiberContext.h"
#include "mcrouter/ProxyBase.h"
#include "mcrouter/ProxyRequestContextTyped.h"
#include "mcrouter/lib/Reply.h"
#include "mcrouter/lib/RouteHandleTraverser.h"
#include "mcrouter/lib/invalidation/McInvalidationDefs.h"
#include "mcrouter/lib/network/AccessPoint.h"
#include "mcrouter/routes/RoutingUtils.h"

namespace facebook::memcache::mcrouter {

struct DistributionRouteSettings {
bool distributedDeleteRpcEnabled{true};
bool replay{false};
};

constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0";

/**
* The route handle is used to route cross-region requests via DL
*
* Config:
* - distribution_delete_rpc_enabled(bool) - enable sending the request via rpc
* after it is distributed
* - replay(bool) - enable replay mode (for mcreplay)
*/
template <class RouterInfo>
class DistributionRoute {
private:
using RouteHandleIf = typename RouterInfo::RouteHandleIf;
using RouteHandlePtr = typename RouterInfo::RouteHandlePtr;

public:
DistributionRoute(RouteHandlePtr rh, DistributionRouteSettings& settings)
: rh_(std::move(rh)),
distributedDeleteRpcEnabled_(settings.distributedDeleteRpcEnabled),
replay_{settings.replay} {}

std::string routeName() const {
return fmt::format(
"distribution|distributed_delete_rpc_enabled={}|replay={}",
distributedDeleteRpcEnabled_ ? "true" : "false",
replay_ ? "true" : "false");
}

template <class Request>
bool traverse(
const Request& req,
const RouteHandleTraverser<RouteHandleIf>& t) const {
return t(*rh_, req);
}

template <class Request>
ReplyT<Request> route(const Request& req) const {
return rh_->route(req);
}

/**
* Delete can be:
* 1. In-region rpc delete
* 1.1 With no routing prefix
* 1.2 With current region in the prefix
* 2. Cross-region rpc delete having routing prefix with another region
* 3. Broadcast delete with routing prefix = /(star)/(star)/
*
* If distribution is enabled, we write 2 and 3 to Axon.
* If write to Axon fails, we spool to Async log with the routing prefix.
*/
McDeleteReply route(const McDeleteRequest& req) const {
auto& proxy = fiber_local<RouterInfo>::getSharedCtx()->proxy();
// In mcreplay case we try to infer target region from request
auto distributionRegion = FOLLY_LIKELY(!replay_)
? fiber_local<RouterInfo>::getDistributionTargetRegion()
: inferDistributionRegionForReplay(req, proxy);

if (FOLLY_LIKELY(!distributionRegion.has_value())) {
return rh_->route(req);
}

auto& axonCtx = fiber_local<RouterInfo>::getAxonCtx();
auto bucketId = fiber_local<RouterInfo>::getBucketId();
assert(axonCtx && bucketId);

bool spoolSucceeded = false;
auto source = distributionRegion.value().empty()
? memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION
: memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION;
auto finalReq = addDeleteRequestSource(req, source);
finalReq.bucketId_ref() = *bucketId;
auto axonLogRes = spoolAxonProxy(
proxy, finalReq, axonCtx, *bucketId, std::move(*distributionRegion));
if (axonLogRes) {
proxy.stats().increment(distribution_axon_write_success_stat);
}
spoolSucceeded |= axonLogRes;
if (FOLLY_UNLIKELY(!axonLogRes)) {
proxy.stats().increment(distribution_axon_write_failed_stat);
const auto host =
std::make_shared<AccessPoint>(kAsynclogDistributionEndpoint);
spoolSucceeded |= spoolAsynclog(
&proxy,
finalReq,
host,
true,
fiber_local<RouterInfo>::getAsynclogName());
}
if (!spoolSucceeded) {
proxy.stats().increment(distribution_async_spool_failed_stat);
}
// if spool to Axon or Asynclog succeeded and rpc is disabled, we return
// default reply to the client:
if (!distributedDeleteRpcEnabled_) {
return spoolSucceeded ? createReply(DefaultReply, finalReq)
: McDeleteReply(carbon::Result::LOCAL_ERROR);
}
return rh_->route(req);
}

private:
const RouteHandlePtr rh_;
const bool distributedDeleteRpcEnabled_;
const bool replay_;

std::optional<std::string> inferDistributionRegionForReplay(
const McDeleteRequest& req,
ProxyBase& proxy) const {
auto sourceIt =
req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource);
if (FOLLY_LIKELY(sourceIt == req.attributes_ref()->end())) {
return std::nullopt;
}
switch (static_cast<McDeleteRequestSource>(sourceIt->second)) {
case McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION:
if (!req.key_ref()->routingPrefix().empty()) {
auto routingPrefix = RoutingPrefix(req.key_ref()->routingPrefix());
return routingPrefix.getRegion().str();
}
proxy.stats().increment(
distribution_replay_xregion_directed_no_prefix_error_stat);
throw std::logic_error(
"Cross-region directed invalidation request must have routing prefix");
case McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION:
return "";
default:
return std::nullopt;
}
}
};

template <class RouterInfo>
typename RouterInfo::RouteHandlePtr makeDistributionRoute(
typename RouterInfo::RouteHandlePtr rh,
const folly::dynamic& json);

template <class RouterInfo>
typename RouterInfo::RouteHandlePtr makeDistributionRoute(
RouteHandleFactory<typename RouterInfo::RouteHandleIf>& factory,
const folly::dynamic& json);

} // namespace facebook::memcache::mcrouter

#include "mcrouter/routes/DistributionRoute-inl.h"
20 changes: 20 additions & 0 deletions mcrouter/routes/McRouteHandleProvider-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mcrouter/routes/AllFastestRouteFactory.h"
#include "mcrouter/routes/AsynclogRoute.h"
#include "mcrouter/routes/DestinationRoute.h"
#include "mcrouter/routes/DistributionRoute.h"
#include "mcrouter/routes/ExtraRouteHandleProviderIf.h"
#include "mcrouter/routes/FailoverRoute.h"
#include "mcrouter/routes/HashRouteFactory.h"
Expand Down Expand Up @@ -528,6 +529,16 @@ McRouteHandleProvider<RouterInfo>::createSRRoute(
auto route = factoryFunc(factory, json, proxy_);
// Track the SRRoute created so that we can save it to SRRoute map later
auto srRoute = route;
bool distributionRouteEnabled = false;

if (auto* jDistribution = json.get_ptr("distribution_enabled")) {
distributionRouteEnabled =
parseBool(*jDistribution, "distribution_enabled");
}

if (distributionRouteEnabled) {
route = makeDistributionRoute<RouterInfo>(std::move(route), json);
}

if (auto maxOutstandingJson = json.get_ptr("max_outstanding")) {
auto v = parseInt(
Expand Down Expand Up @@ -680,9 +691,18 @@ McRouteHandleProvider<RouterInfo>::makePoolRoute(
jhashWithWeights, std::move(destinations), factory.getThreadId());
auto poolRoute = route;

bool distributionRouteEnabled = false;
auto asynclogName = poolJson.name;
bool needAsynclog = true;
if (json.isObject()) {
if (auto* jDistribution = json.get_ptr("distribution_enabled")) {
distributionRouteEnabled =
parseBool(*jDistribution, "distribution_enabled");
}
if (distributionRouteEnabled) {
route = makeDistributionRoute<RouterInfo>(std::move(route), json);
}

if (auto jrates = json.get_ptr("rates")) {
route = createRateLimitRoute(std::move(route), RateLimiter(*jrates));
}
Expand Down
5 changes: 5 additions & 0 deletions mcrouter/routes/McRouteHandleProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "mcrouter/routes/BlackholeRoute.h"
#include "mcrouter/routes/CarbonLookasideRoute.h"
#include "mcrouter/routes/DevNullRoute.h"
#include "mcrouter/routes/DistributionRoute.h"
#include "mcrouter/routes/ErrorRoute.h"
#include "mcrouter/routes/FailoverRoute.h"
#include "mcrouter/routes/FailoverWithExptimeRouteFactory.h"
Expand Down Expand Up @@ -255,6 +256,10 @@ McRouteHandleProvider<MemcacheRouterInfo>::buildRouteMap() {
MemcacheRouterInfo,
MemcacheCarbonLookasideHelper>},
{"DevNullRoute", &makeDevNullRoute<MemcacheRouterInfo>},
{"DistributionRoute",
[](McRouteHandleFactory& factory, const folly::dynamic& json) {
return makeDistributionRoute<MemcacheRouterInfo>(factory, json);
}},
{"ErrorRoute", &makeErrorRoute<MemcacheRouterInfo>},
{"FailoverWithExptimeRoute",
&makeFailoverWithExptimeRoute<MemcacheRouterInfo>},
Expand Down
Loading

0 comments on commit 275a7e6

Please sign in to comment.