Skip to content

Commit

Permalink
re-land distribution scuba logging
Browse files Browse the repository at this point in the history
Summary: Original diff D50904038

Reviewed By: stuclar

Differential Revision: D51837515

fbshipit-source-id: 2c7bbf92f199cec42b387533186e73dcd5e24cad
  • Loading branch information
Lenar Fatikhov authored and facebook-github-bot committed Dec 14, 2023
1 parent de94b86 commit 49f7a92
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 13 deletions.
6 changes: 6 additions & 0 deletions mcrouter/lib/network/AccessPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,11 @@ std::string AccessPoint::toString() const {
compressed_ ? "compressed" : "notcompressed");
}

const AccessPoint& AccessPoint::defaultAp() {
static const AccessPoint ap =
AccessPoint(folly::IPAddress(), 0, 0, mc_thrift_protocol);
return ap;
}

} // namespace memcache
} // namespace facebook
2 changes: 2 additions & 0 deletions mcrouter/lib/network/AccessPoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ static_assert(
"McProtocolT must have uint8_t alignemnt");

struct AccessPoint {
static const AccessPoint& defaultAp();

explicit AccessPoint(
folly::StringPiece host = "",
uint16_t port = 0,
Expand Down
73 changes: 64 additions & 9 deletions mcrouter/routes/DistributionRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ struct DistributionRouteSettings {
};

constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0";
constexpr folly::StringPiece kBroadcastRolloutMessage = "DistributionRoute";
constexpr std::string_view kBroadcastRolloutMessage = "DistributionRoute";
constexpr std::string_view kDistributionTargetMarkerForLog = "dl_distribution";

/**
* The route handle is used to route cross-region requests via DL
Expand Down Expand Up @@ -80,7 +81,8 @@ class DistributionRoute {
* If write to Axon fails, we spool to Async log with the routing prefix.
*/
McDeleteReply route(const McDeleteRequest& req) const {
auto& proxy = fiber_local<RouterInfo>::getSharedCtx()->proxy();
auto& ctx = *fiber_local<RouterInfo>::getSharedCtx();
auto& proxy = ctx.proxy();
// In mcreplay case we try to infer target region from request
auto distributionRegion = FOLLY_LIKELY(!replay_)
? fiber_local<RouterInfo>::getDistributionTargetRegion()
Expand All @@ -100,13 +102,24 @@ class DistributionRoute {
: memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION;
auto finalReq = addDeleteRequestSource(req, source);
finalReq.bucketId_ref() = fmt::to_string(*bucketId);
DestinationRequestCtx dctx(nowUs());
onBeforeDistribution(finalReq, ctx, *finalReq.bucketId_ref(), dctx);

auto axonLogRes = spoolAxonProxy(
finalReq,
axonCtx,
*bucketId,
std::move(
distributionRegion.value().empty() ? kBroadcastRolloutMessage.str()
: *distributionRegion));
distributionRegion.value().empty()
? std::string(kBroadcastRolloutMessage)
: *distributionRegion));

auto reply = axonLogRes ? createReply(DefaultReply, finalReq)
: McDeleteReply(carbon::Result::LOCAL_ERROR);

dctx.endTime = nowUs();
onAfterDistribution(finalReq, reply, ctx, *finalReq.bucketId_ref(), dctx);

if (axonLogRes) {
proxy.stats().increment(distribution_axon_write_success_stat);
}
Expand All @@ -121,15 +134,17 @@ class DistributionRoute {
host,
true,
fiber_local<RouterInfo>::getAsynclogName());
}
if (!spoolSucceeded) {
proxy.stats().increment(distribution_async_spool_failed_stat);
if (spoolSucceeded) {
// update reply if axon failed but spool succeeded
reply = createReply(DefaultReply, finalReq);
} else {
proxy.stats().increment(distribution_async_spool_failed_stat);
}
}
// if spool to Axon or Asynclog succeeded and rpc is disabled, we return
// default reply to the client:
if (!distributedDeleteRpcEnabled_) {
return spoolSucceeded ? createReply(DefaultReply, finalReq)
: McDeleteReply(carbon::Result::LOCAL_ERROR);
return reply;
}
return rh_->route(req);
}
Expand Down Expand Up @@ -163,6 +178,46 @@ class DistributionRoute {
return std::nullopt;
}
}

void onBeforeDistribution(
const McDeleteRequest& req,
ProxyRequestContextWithInfo<RouterInfo>& ctx,
const std::string& bucketId,
const DestinationRequestCtx& dctx) const {
ctx.onBeforeRequestSent(
/*poolName*/ kDistributionTargetMarkerForLog,
/*ap*/ AccessPoint::defaultAp(),
/*strippedRoutingPrefix*/ folly::StringPiece(),
/*request*/ req,
/*requestClass*/ fiber_local<RouterInfo>::getRequestClass(),
/*startTimeUs*/ dctx.startTime,
/*bucketId*/ bucketId);
}

void onAfterDistribution(
const McDeleteRequest& req,
const McDeleteReply& reply,
ProxyRequestContextWithInfo<RouterInfo>& ctx,
const std::string& bucketId,
const DestinationRequestCtx& dctx) const {
RpcStatsContext rpcContext;
ctx.onReplyReceived(
/*poolName*/ kDistributionTargetMarkerForLog,
/*poolIndex*/ std::nullopt,
/*ap*/ AccessPoint::defaultAp(),
/*strippedRoutingPrefix*/ folly::StringPiece(),
/*request*/ req,
/*reply*/ reply,
/*requestClass*/ fiber_local<RouterInfo>::getRequestClass(),
/*startTimeUs*/ dctx.startTime,
/*endTimeUs*/ dctx.endTime,
/*poolStatIndex*/ -1,
/*rpcStatsContext*/ rpcContext,
/*networkTransportTimeUs*/
fiber_local<RouterInfo>::getNetworkTransportTimeUs(),
/*extraDataCallback*/ fiber_local<RouterInfo>::getExtraDataCallbacks(),
/*bucketId*/ bucketId);
}
};

template <class RouterInfo>
Expand Down
12 changes: 8 additions & 4 deletions mcrouter/routes/RootRoute.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class RootRoute {
McDeleteReply reply;
if (enableDeleteDistribution_ && !req.key_ref()->routingPrefix().empty()) {
auto routingPrefix = RoutingPrefix(req.key_ref()->routingPrefix());
if (routingPrefix.str() != defaultRoute_.str() &&
if (routingPrefix.getRegion() != defaultRoute_.getRegion() &&
req.key_ref()->routingPrefix() != kBroadcastPrefix) {
reply = fiber_local<RouterInfo>::runWithLocals(
[this, &req, &routingPrefix]() {
Expand Down Expand Up @@ -204,8 +204,12 @@ class RootRoute {
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const Request& req) const {
if (FOLLY_LIKELY(rh.size() == 1)) {
return rh[0]->route(req);
// for deletes, we cannot assume that there's only one target
// even if it's a broadcast delete
if (!folly::IsOneOf<Request, McDeleteRequest>::value) {
if (FOLLY_LIKELY(rh.size() == 1)) {
return rh[0]->route(req);
}
}
if (!rh.empty()) {
return routeToAll(rh, req);
Expand All @@ -230,7 +234,7 @@ class RootRoute {
const std::vector<std::shared_ptr<typename RouterInfo::RouteHandleIf>>&
rh,
const McDeleteRequest& req) const {
if (enableCrossRegionDeleteRpc_) {
if (enableCrossRegionDeleteRpc_ && rh.size() > 1) {
auto reqCopy = std::make_shared<const McDeleteRequest>(req);
for (size_t i = 1, e = rh.size(); i < e; ++i) {
auto r = rh[i];
Expand Down

0 comments on commit 49f7a92

Please sign in to comment.