From 49f7a920ed28fae30cc319fb373ef44a57d4ee00 Mon Sep 17 00:00:00 2001 From: Lenar Fatikhov Date: Thu, 14 Dec 2023 15:47:11 -0800 Subject: [PATCH] re-land distribution scuba logging Summary: Original diff D50904038 Reviewed By: stuclar Differential Revision: D51837515 fbshipit-source-id: 2c7bbf92f199cec42b387533186e73dcd5e24cad --- mcrouter/lib/network/AccessPoint.cpp | 6 +++ mcrouter/lib/network/AccessPoint.h | 2 + mcrouter/routes/DistributionRoute.h | 73 ++++++++++++++++++++++++---- mcrouter/routes/RootRoute.h | 12 +++-- 4 files changed, 80 insertions(+), 13 deletions(-) diff --git a/mcrouter/lib/network/AccessPoint.cpp b/mcrouter/lib/network/AccessPoint.cpp index 48ae9c821..58ca88db5 100644 --- a/mcrouter/lib/network/AccessPoint.cpp +++ b/mcrouter/lib/network/AccessPoint.cpp @@ -222,5 +222,11 @@ std::string AccessPoint::toString() const { compressed_ ? "compressed" : "notcompressed"); } +const AccessPoint& AccessPoint::defaultAp() { + static const AccessPoint ap = + AccessPoint(folly::IPAddress(), 0, 0, mc_thrift_protocol); + return ap; +} + } // namespace memcache } // namespace facebook diff --git a/mcrouter/lib/network/AccessPoint.h b/mcrouter/lib/network/AccessPoint.h index cee92066c..7b4375b6d 100644 --- a/mcrouter/lib/network/AccessPoint.h +++ b/mcrouter/lib/network/AccessPoint.h @@ -46,6 +46,8 @@ static_assert( "McProtocolT must have uint8_t alignemnt"); struct AccessPoint { + static const AccessPoint& defaultAp(); + explicit AccessPoint( folly::StringPiece host = "", uint16_t port = 0, diff --git a/mcrouter/routes/DistributionRoute.h b/mcrouter/routes/DistributionRoute.h index e01817b58..594267338 100644 --- a/mcrouter/routes/DistributionRoute.h +++ b/mcrouter/routes/DistributionRoute.h @@ -27,7 +27,8 @@ struct DistributionRouteSettings { }; constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0"; -constexpr folly::StringPiece kBroadcastRolloutMessage = "DistributionRoute"; +constexpr std::string_view kBroadcastRolloutMessage = "DistributionRoute"; +constexpr std::string_view kDistributionTargetMarkerForLog = "dl_distribution"; /** * The route handle is used to route cross-region requests via DL @@ -80,7 +81,8 @@ class DistributionRoute { * If write to Axon fails, we spool to Async log with the routing prefix. */ McDeleteReply route(const McDeleteRequest& req) const { - auto& proxy = fiber_local::getSharedCtx()->proxy(); + auto& ctx = *fiber_local::getSharedCtx(); + auto& proxy = ctx.proxy(); // In mcreplay case we try to infer target region from request auto distributionRegion = FOLLY_LIKELY(!replay_) ? fiber_local::getDistributionTargetRegion() @@ -100,13 +102,24 @@ class DistributionRoute { : memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION; auto finalReq = addDeleteRequestSource(req, source); finalReq.bucketId_ref() = fmt::to_string(*bucketId); + DestinationRequestCtx dctx(nowUs()); + onBeforeDistribution(finalReq, ctx, *finalReq.bucketId_ref(), dctx); + auto axonLogRes = spoolAxonProxy( finalReq, axonCtx, *bucketId, std::move( - distributionRegion.value().empty() ? kBroadcastRolloutMessage.str() - : *distributionRegion)); + distributionRegion.value().empty() + ? std::string(kBroadcastRolloutMessage) + : *distributionRegion)); + + auto reply = axonLogRes ? createReply(DefaultReply, finalReq) + : McDeleteReply(carbon::Result::LOCAL_ERROR); + + dctx.endTime = nowUs(); + onAfterDistribution(finalReq, reply, ctx, *finalReq.bucketId_ref(), dctx); + if (axonLogRes) { proxy.stats().increment(distribution_axon_write_success_stat); } @@ -121,15 +134,17 @@ class DistributionRoute { host, true, fiber_local::getAsynclogName()); - } - if (!spoolSucceeded) { - proxy.stats().increment(distribution_async_spool_failed_stat); + if (spoolSucceeded) { + // update reply if axon failed but spool succeeded + reply = createReply(DefaultReply, finalReq); + } else { + proxy.stats().increment(distribution_async_spool_failed_stat); + } } // if spool to Axon or Asynclog succeeded and rpc is disabled, we return // default reply to the client: if (!distributedDeleteRpcEnabled_) { - return spoolSucceeded ? createReply(DefaultReply, finalReq) - : McDeleteReply(carbon::Result::LOCAL_ERROR); + return reply; } return rh_->route(req); } @@ -163,6 +178,46 @@ class DistributionRoute { return std::nullopt; } } + + void onBeforeDistribution( + const McDeleteRequest& req, + ProxyRequestContextWithInfo& ctx, + const std::string& bucketId, + const DestinationRequestCtx& dctx) const { + ctx.onBeforeRequestSent( + /*poolName*/ kDistributionTargetMarkerForLog, + /*ap*/ AccessPoint::defaultAp(), + /*strippedRoutingPrefix*/ folly::StringPiece(), + /*request*/ req, + /*requestClass*/ fiber_local::getRequestClass(), + /*startTimeUs*/ dctx.startTime, + /*bucketId*/ bucketId); + } + + void onAfterDistribution( + const McDeleteRequest& req, + const McDeleteReply& reply, + ProxyRequestContextWithInfo& ctx, + const std::string& bucketId, + const DestinationRequestCtx& dctx) const { + RpcStatsContext rpcContext; + ctx.onReplyReceived( + /*poolName*/ kDistributionTargetMarkerForLog, + /*poolIndex*/ std::nullopt, + /*ap*/ AccessPoint::defaultAp(), + /*strippedRoutingPrefix*/ folly::StringPiece(), + /*request*/ req, + /*reply*/ reply, + /*requestClass*/ fiber_local::getRequestClass(), + /*startTimeUs*/ dctx.startTime, + /*endTimeUs*/ dctx.endTime, + /*poolStatIndex*/ -1, + /*rpcStatsContext*/ rpcContext, + /*networkTransportTimeUs*/ + fiber_local::getNetworkTransportTimeUs(), + /*extraDataCallback*/ fiber_local::getExtraDataCallbacks(), + /*bucketId*/ bucketId); + } }; template diff --git a/mcrouter/routes/RootRoute.h b/mcrouter/routes/RootRoute.h index 7f948eb3e..fb72551ad 100644 --- a/mcrouter/routes/RootRoute.h +++ b/mcrouter/routes/RootRoute.h @@ -103,7 +103,7 @@ class RootRoute { McDeleteReply reply; if (enableDeleteDistribution_ && !req.key_ref()->routingPrefix().empty()) { auto routingPrefix = RoutingPrefix(req.key_ref()->routingPrefix()); - if (routingPrefix.str() != defaultRoute_.str() && + if (routingPrefix.getRegion() != defaultRoute_.getRegion() && req.key_ref()->routingPrefix() != kBroadcastPrefix) { reply = fiber_local::runWithLocals( [this, &req, &routingPrefix]() { @@ -204,8 +204,12 @@ class RootRoute { const std::vector>& rh, const Request& req) const { - if (FOLLY_LIKELY(rh.size() == 1)) { - return rh[0]->route(req); + // for deletes, we cannot assume that there's only one target + // even if it's a broadcast delete + if (!folly::IsOneOf::value) { + if (FOLLY_LIKELY(rh.size() == 1)) { + return rh[0]->route(req); + } } if (!rh.empty()) { return routeToAll(rh, req); @@ -230,7 +234,7 @@ class RootRoute { const std::vector>& rh, const McDeleteRequest& req) const { - if (enableCrossRegionDeleteRpc_) { + if (enableCrossRegionDeleteRpc_ && rh.size() > 1) { auto reqCopy = std::make_shared(req); for (size_t i = 1, e = rh.size(); i < e; ++i) { auto r = rh[i];