From 275a7e65f5420d96f3c9b352bfc272ce01be8c75 Mon Sep 17 00:00:00 2001 From: Lenar Fatikhov Date: Thu, 5 Oct 2023 12:53:02 -0700 Subject: [PATCH] add DistributionRoute as a wrapper Summary: Distribution route is used for alternative ways of distributing invalidation requests that go into different regions. Reviewed By: stuclar Differential Revision: D49022531 fbshipit-source-id: a3c7188087d933304384549e823ab5025fe83075 --- mcrouter/Makefile.am | 2 + mcrouter/McSpoolUtils.cpp | 7 +- mcrouter/routes/DistributionRoute-inl.h | 53 ++ mcrouter/routes/DistributionRoute.h | 174 +++++++ mcrouter/routes/McRouteHandleProvider-inl.h | 20 + mcrouter/routes/McRouteHandleProvider.cpp | 5 + .../routes/test/DistributionRouteTest.cpp | 486 ++++++++++++++++++ mcrouter/stat_list.h | 9 + 8 files changed, 753 insertions(+), 3 deletions(-) create mode 100644 mcrouter/routes/DistributionRoute-inl.h create mode 100644 mcrouter/routes/DistributionRoute.h create mode 100644 mcrouter/routes/test/DistributionRouteTest.cpp diff --git a/mcrouter/Makefile.am b/mcrouter/Makefile.am index a0b561286..b25494a25 100644 --- a/mcrouter/Makefile.am +++ b/mcrouter/Makefile.am @@ -147,6 +147,8 @@ libmcroutercore_a_SOURCES = \ routes/McBucketRoute-inl.h \ routes/McBucketRoute.cpp \ routes/McBucketRoute.h \ + routes/DistributionRoute-inl.h \ + routes/DistributionRoute.h \ routes/McExtraRouteHandleProvider-inl.h \ routes/McExtraRouteHandleProvider.h \ routes/McImportResolver.cpp \ diff --git a/mcrouter/McSpoolUtils.cpp b/mcrouter/McSpoolUtils.cpp index b978bd0a3..dfc5ac236 100644 --- a/mcrouter/McSpoolUtils.cpp +++ b/mcrouter/McSpoolUtils.cpp @@ -52,12 +52,13 @@ FOLLY_NOINLINE bool spoolAxonProxy( } // Run off fiber to save fiber stack for serialization auto kvPairs = folly::fibers::runInMainContext([&req, ®ion, &pool]() { - const auto& finalReq = + auto finalReq = req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource) == req.attributes_ref()->cend() - ? addDeleteRequestSource( - req, memcache::McDeleteRequestSource::FAILED_INVALIDATION) + ? std::move(addDeleteRequestSource( + req, memcache::McDeleteRequestSource::FAILED_INVALIDATION)) : req; + finalReq.key_ref()->stripRoutingPrefix(); auto serialized = invalidation::McInvalidationKvPairs::serialize< memcache::McDeleteRequest>(finalReq) .template to(); diff --git a/mcrouter/routes/DistributionRoute-inl.h b/mcrouter/routes/DistributionRoute-inl.h new file mode 100644 index 000000000..fd130596d --- /dev/null +++ b/mcrouter/routes/DistributionRoute-inl.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +#include "mcrouter/lib/fbi/cpp/ParsingUtil.h" + +namespace facebook::memcache::mcrouter { + +inline DistributionRouteSettings parseDistributionRouteSettings( + const folly::dynamic& json) { + DistributionRouteSettings settings; + settings.replay = false; + if (auto* jReplay = json.get_ptr("replay")) { + settings.replay = parseBool(*jReplay, "replay"); + } + if (auto* jDistributedDeleteRpcEnabled = + json.get_ptr("distributed_delete_rpc_enabled")) { + settings.distributedDeleteRpcEnabled = parseBool( + *jDistributedDeleteRpcEnabled, "distributed_delete_rpc_enabled"); + } + return settings; +} + +// standalone handle +template +typename RouterInfo::RouteHandlePtr makeDistributionRoute( + RouteHandleFactory& factory, + const folly::dynamic& json) { + checkLogic(json.isObject(), "DistributionRoute is not an object"); + checkLogic(json.count("child"), "DistributionRoute: no child route"); + auto settings = parseDistributionRouteSettings(json); + return makeRouteHandleWithInfo( + factory.create(json["child"]), settings); +} + +// wrapper handle +template +typename RouterInfo::RouteHandlePtr makeDistributionRoute( + typename RouterInfo::RouteHandlePtr rh, + const folly::dynamic& json) { + auto settings = parseDistributionRouteSettings(json); + return makeRouteHandleWithInfo( + std::move(rh), settings); +} + +} // namespace facebook::memcache::mcrouter diff --git a/mcrouter/routes/DistributionRoute.h b/mcrouter/routes/DistributionRoute.h new file mode 100644 index 000000000..85b84b6d3 --- /dev/null +++ b/mcrouter/routes/DistributionRoute.h @@ -0,0 +1,174 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +#include "mcrouter/McSpoolUtils.h" +#include "mcrouter/McrouterFiberContext.h" +#include "mcrouter/ProxyBase.h" +#include "mcrouter/ProxyRequestContextTyped.h" +#include "mcrouter/lib/Reply.h" +#include "mcrouter/lib/RouteHandleTraverser.h" +#include "mcrouter/lib/invalidation/McInvalidationDefs.h" +#include "mcrouter/lib/network/AccessPoint.h" +#include "mcrouter/routes/RoutingUtils.h" + +namespace facebook::memcache::mcrouter { + +struct DistributionRouteSettings { + bool distributedDeleteRpcEnabled{true}; + bool replay{false}; +}; + +constexpr std::string_view kAsynclogDistributionEndpoint = "0.0.0.0"; + +/** + * The route handle is used to route cross-region requests via DL + * + * Config: + * - distribution_delete_rpc_enabled(bool) - enable sending the request via rpc + * after it is distributed + * - replay(bool) - enable replay mode (for mcreplay) + */ +template +class DistributionRoute { + private: + using RouteHandleIf = typename RouterInfo::RouteHandleIf; + using RouteHandlePtr = typename RouterInfo::RouteHandlePtr; + + public: + DistributionRoute(RouteHandlePtr rh, DistributionRouteSettings& settings) + : rh_(std::move(rh)), + distributedDeleteRpcEnabled_(settings.distributedDeleteRpcEnabled), + replay_{settings.replay} {} + + std::string routeName() const { + return fmt::format( + "distribution|distributed_delete_rpc_enabled={}|replay={}", + distributedDeleteRpcEnabled_ ? "true" : "false", + replay_ ? "true" : "false"); + } + + template + bool traverse( + const Request& req, + const RouteHandleTraverser& t) const { + return t(*rh_, req); + } + + template + ReplyT route(const Request& req) const { + return rh_->route(req); + } + + /** + * Delete can be: + * 1. In-region rpc delete + * 1.1 With no routing prefix + * 1.2 With current region in the prefix + * 2. Cross-region rpc delete having routing prefix with another region + * 3. Broadcast delete with routing prefix = /(star)/(star)/ + * + * If distribution is enabled, we write 2 and 3 to Axon. + * If write to Axon fails, we spool to Async log with the routing prefix. + */ + McDeleteReply route(const McDeleteRequest& req) const { + auto& proxy = fiber_local::getSharedCtx()->proxy(); + // In mcreplay case we try to infer target region from request + auto distributionRegion = FOLLY_LIKELY(!replay_) + ? fiber_local::getDistributionTargetRegion() + : inferDistributionRegionForReplay(req, proxy); + + if (FOLLY_LIKELY(!distributionRegion.has_value())) { + return rh_->route(req); + } + + auto& axonCtx = fiber_local::getAxonCtx(); + auto bucketId = fiber_local::getBucketId(); + assert(axonCtx && bucketId); + + bool spoolSucceeded = false; + auto source = distributionRegion.value().empty() + ? memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION + : memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION; + auto finalReq = addDeleteRequestSource(req, source); + finalReq.bucketId_ref() = *bucketId; + auto axonLogRes = spoolAxonProxy( + proxy, finalReq, axonCtx, *bucketId, std::move(*distributionRegion)); + if (axonLogRes) { + proxy.stats().increment(distribution_axon_write_success_stat); + } + spoolSucceeded |= axonLogRes; + if (FOLLY_UNLIKELY(!axonLogRes)) { + proxy.stats().increment(distribution_axon_write_failed_stat); + const auto host = + std::make_shared(kAsynclogDistributionEndpoint); + spoolSucceeded |= spoolAsynclog( + &proxy, + finalReq, + host, + true, + fiber_local::getAsynclogName()); + } + if (!spoolSucceeded) { + proxy.stats().increment(distribution_async_spool_failed_stat); + } + // if spool to Axon or Asynclog succeeded and rpc is disabled, we return + // default reply to the client: + if (!distributedDeleteRpcEnabled_) { + return spoolSucceeded ? createReply(DefaultReply, finalReq) + : McDeleteReply(carbon::Result::LOCAL_ERROR); + } + return rh_->route(req); + } + + private: + const RouteHandlePtr rh_; + const bool distributedDeleteRpcEnabled_; + const bool replay_; + + std::optional inferDistributionRegionForReplay( + const McDeleteRequest& req, + ProxyBase& proxy) const { + auto sourceIt = + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource); + if (FOLLY_LIKELY(sourceIt == req.attributes_ref()->end())) { + return std::nullopt; + } + switch (static_cast(sourceIt->second)) { + case McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION: + if (!req.key_ref()->routingPrefix().empty()) { + auto routingPrefix = RoutingPrefix(req.key_ref()->routingPrefix()); + return routingPrefix.getRegion().str(); + } + proxy.stats().increment( + distribution_replay_xregion_directed_no_prefix_error_stat); + throw std::logic_error( + "Cross-region directed invalidation request must have routing prefix"); + case McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION: + return ""; + default: + return std::nullopt; + } + } +}; + +template +typename RouterInfo::RouteHandlePtr makeDistributionRoute( + typename RouterInfo::RouteHandlePtr rh, + const folly::dynamic& json); + +template +typename RouterInfo::RouteHandlePtr makeDistributionRoute( + RouteHandleFactory& factory, + const folly::dynamic& json); + +} // namespace facebook::memcache::mcrouter + +#include "mcrouter/routes/DistributionRoute-inl.h" diff --git a/mcrouter/routes/McRouteHandleProvider-inl.h b/mcrouter/routes/McRouteHandleProvider-inl.h index a25ecfcf3..2a8f8122e 100644 --- a/mcrouter/routes/McRouteHandleProvider-inl.h +++ b/mcrouter/routes/McRouteHandleProvider-inl.h @@ -29,6 +29,7 @@ #include "mcrouter/routes/AllFastestRouteFactory.h" #include "mcrouter/routes/AsynclogRoute.h" #include "mcrouter/routes/DestinationRoute.h" +#include "mcrouter/routes/DistributionRoute.h" #include "mcrouter/routes/ExtraRouteHandleProviderIf.h" #include "mcrouter/routes/FailoverRoute.h" #include "mcrouter/routes/HashRouteFactory.h" @@ -528,6 +529,16 @@ McRouteHandleProvider::createSRRoute( auto route = factoryFunc(factory, json, proxy_); // Track the SRRoute created so that we can save it to SRRoute map later auto srRoute = route; + bool distributionRouteEnabled = false; + + if (auto* jDistribution = json.get_ptr("distribution_enabled")) { + distributionRouteEnabled = + parseBool(*jDistribution, "distribution_enabled"); + } + + if (distributionRouteEnabled) { + route = makeDistributionRoute(std::move(route), json); + } if (auto maxOutstandingJson = json.get_ptr("max_outstanding")) { auto v = parseInt( @@ -680,9 +691,18 @@ McRouteHandleProvider::makePoolRoute( jhashWithWeights, std::move(destinations), factory.getThreadId()); auto poolRoute = route; + bool distributionRouteEnabled = false; auto asynclogName = poolJson.name; bool needAsynclog = true; if (json.isObject()) { + if (auto* jDistribution = json.get_ptr("distribution_enabled")) { + distributionRouteEnabled = + parseBool(*jDistribution, "distribution_enabled"); + } + if (distributionRouteEnabled) { + route = makeDistributionRoute(std::move(route), json); + } + if (auto jrates = json.get_ptr("rates")) { route = createRateLimitRoute(std::move(route), RateLimiter(*jrates)); } diff --git a/mcrouter/routes/McRouteHandleProvider.cpp b/mcrouter/routes/McRouteHandleProvider.cpp index e453b3fe6..772313075 100644 --- a/mcrouter/routes/McRouteHandleProvider.cpp +++ b/mcrouter/routes/McRouteHandleProvider.cpp @@ -18,6 +18,7 @@ #include "mcrouter/routes/BlackholeRoute.h" #include "mcrouter/routes/CarbonLookasideRoute.h" #include "mcrouter/routes/DevNullRoute.h" +#include "mcrouter/routes/DistributionRoute.h" #include "mcrouter/routes/ErrorRoute.h" #include "mcrouter/routes/FailoverRoute.h" #include "mcrouter/routes/FailoverWithExptimeRouteFactory.h" @@ -255,6 +256,10 @@ McRouteHandleProvider::buildRouteMap() { MemcacheRouterInfo, MemcacheCarbonLookasideHelper>}, {"DevNullRoute", &makeDevNullRoute}, + {"DistributionRoute", + [](McRouteHandleFactory& factory, const folly::dynamic& json) { + return makeDistributionRoute(factory, json); + }}, {"ErrorRoute", &makeErrorRoute}, {"FailoverWithExptimeRoute", &makeFailoverWithExptimeRoute}, diff --git a/mcrouter/routes/test/DistributionRouteTest.cpp b/mcrouter/routes/test/DistributionRouteTest.cpp new file mode 100644 index 000000000..ce3a2369b --- /dev/null +++ b/mcrouter/routes/test/DistributionRouteTest.cpp @@ -0,0 +1,486 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include + +#include "mcrouter/McrouterFiberContext.h" +#include "mcrouter/lib/invalidation/McInvalidationDefs.h" +#include "mcrouter/lib/invalidation/McInvalidationKvPairs.h" +#include "mcrouter/lib/network/gen/MemcacheMessages.h" +#include "mcrouter/lib/network/gen/MemcacheRouterInfo.h" +#include "mcrouter/routes/DistributionRoute.h" +#include "mcrouter/routes/test/RouteHandleTestUtil.h" +#include "thrift/lib/cpp2/protocol/Serializer.h" + +namespace facebook::memcache::mcrouter { + +TEST(DistributionRouteTest, getSetAreForwardedToRpc) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": true, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + TestFiberManager fm; + fm.runAll({[&]() { rh->route(McGetRequest("getReq")); }}); + EXPECT_FALSE(srHandleVec[0]->saw_keys.empty()); + EXPECT_EQ(srHandleVec[0]->saw_keys[0], "getReq"); + EXPECT_EQ("get", srHandleVec[0]->sawOperations[0]); + fm.runAll({[&]() { rh->route(McSetRequest("setReq")); }}); + EXPECT_EQ(srHandleVec[0]->saw_keys[1], "setReq"); + EXPECT_EQ("set", srHandleVec[0]->sawOperations[1]); +} + +TEST(DistributionRouteTest, getSetAreForwardedToRpc2) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": true, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + TestFiberManager fm; + fm.runAll({[&]() { rh->route(McGetRequest("getReq")); }}); + EXPECT_FALSE(srHandleVec[0]->saw_keys.empty()); + EXPECT_EQ(srHandleVec[0]->saw_keys[0], "getReq"); + EXPECT_EQ("get", srHandleVec[0]->sawOperations[0]); + fm.runAll({[&]() { rh->route(McSetRequest("setReq")); }}); + EXPECT_EQ(srHandleVec[0]->saw_keys[1], "setReq"); + EXPECT_EQ("set", srHandleVec[0]->sawOperations[1]); +} + +TEST(DistributionRouteTest, deleteForwardedToRpcIfDisabled) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": false, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals( + [&]() { rh->route(McDeleteRequest("test1")); }); + EXPECT_FALSE(srHandleVec[0]->saw_keys.empty()); + EXPECT_EQ(srHandleVec[0]->saw_keys[0], "test1"); + EXPECT_EQ("delete", srHandleVec[0]->sawOperations[0]); +} + +TEST(DistributionRouteTest, crossRegionDeleteDisabledRpc) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + struct Tmp { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + }; + auto tmp = Tmp{}; + axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { + tmp.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + tmp.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + tmp.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + tmp.serialized = payload.at(invalidation::kSerialized); + } + return true; + }; + axonCtx->allDelete = false; + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": false, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + fiber_local::setDistributionTargetRegion("georgia"); + rh->route(McDeleteRequest("/georgia/default/test1")); + }); + + // no keys routed to RPC: + EXPECT_TRUE(srHandleVec[0]->saw_keys.empty()); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_EQ(tmp.region, "georgia"); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ( + static_cast( + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), + memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); +} + +TEST(DistributionRouteTest, crossRegionDeleteEnabledRpc) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + struct Tmp { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + }; + auto tmp = Tmp{}; + axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { + tmp.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + tmp.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + tmp.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + tmp.serialized = payload.at(invalidation::kSerialized); + } + return true; + }; + axonCtx->allDelete = false; + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": true, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + fiber_local::setDistributionTargetRegion("georgia"); + rh->route(McDeleteRequest("/georgia/default/test1")); + }); + + // the key is routed to RPC: + EXPECT_FALSE(srHandleVec[0]->saw_keys.empty()); + EXPECT_EQ(srHandleVec[0]->saw_keys[0], "/georgia/default/test1"); + EXPECT_EQ("delete", srHandleVec[0]->sawOperations[0]); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_EQ(tmp.region, "georgia"); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ( + static_cast( + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), + memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); +} + +TEST(DistributionRouteTest, broadcastDeleteDisabledRpc) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + struct Tmp { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + }; + auto tmp = Tmp{}; + axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { + tmp.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + tmp.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + tmp.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + tmp.serialized = payload.at(invalidation::kSerialized); + } + return true; + }; + axonCtx->allDelete = false; + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": false, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + fiber_local::setDistributionTargetRegion(""); + rh->route(McDeleteRequest("/*/*/test1")); + }); + + // the key is not routed to RPC: + EXPECT_TRUE(srHandleVec[0]->saw_keys.empty()); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_TRUE(tmp.region.empty()); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ( + static_cast( + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), + memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION); +} + +TEST(DistributionRouteTest, broadcastDeleteEnabledRpc) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + struct Tmp { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + }; + auto tmp = Tmp{}; + axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { + tmp.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + tmp.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + tmp.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + tmp.serialized = payload.at(invalidation::kSerialized); + } + return true; + }; + axonCtx->allDelete = false; + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": true, + "replay": false + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + fiber_local::setDistributionTargetRegion(""); + rh->route(McDeleteRequest("/*/*/test1")); + }); + // the key is routed to RPC: + EXPECT_FALSE(srHandleVec[0]->saw_keys.empty()); + EXPECT_EQ(srHandleVec[0]->saw_keys[0], "/*/*/test1"); + EXPECT_EQ("delete", srHandleVec[0]->sawOperations[0]); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_TRUE(tmp.region.empty()); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ( + static_cast( + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), + memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION); +} + +TEST(DistributionRouteTest, broadcastSpooledDelete) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + struct Tmp { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + }; + auto tmp = Tmp{}; + axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { + tmp.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + tmp.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + tmp.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + tmp.serialized = payload.at(invalidation::kSerialized); + } + return true; + }; + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": false, + "replay": true + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + auto req = McDeleteRequest("/*/*/test1"); + req = addDeleteRequestSource( + req, + memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION); + rh->route(req); + }); + + // the key is not routed to RPC: + EXPECT_TRUE(srHandleVec[0]->saw_keys.empty()); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_TRUE(tmp.region.empty()); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ( + static_cast( + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), + memcache::McDeleteRequestSource::CROSS_REGION_BROADCAST_INVALIDATION); +} + +TEST(DistributionRouteTest, crossRegionDirectedSpooledDelete) { + std::vector> srHandleVec{ + std::make_shared( + GetRouteTestData(carbon::Result::FOUND, "a")), + }; + auto mockSrHandle = get_route_handles(srHandleVec)[0]; + + auto axonCtx = std::make_shared(); + struct Tmp { + uint64_t bucketId; + std::string region; + std::string pool; + std::string serialized; + }; + auto tmp = Tmp{}; + axonCtx->writeProxyFn = [&](auto bucketId, auto&& payload) { + tmp.bucketId = bucketId; + if (payload.find(invalidation::kRegion) != payload.end()) { + tmp.region = payload.at(invalidation::kRegion); + } + if (payload.find(invalidation::kPool) != payload.end()) { + tmp.pool = payload.at(invalidation::kPool); + } + if (payload.find(invalidation::kSerialized) != payload.end()) { + tmp.serialized = payload.at(invalidation::kSerialized); + } + return true; + }; + axonCtx->fallbackAsynclog = false; + axonCtx->poolFilter = "testPool"; + constexpr folly::StringPiece kDistributionRouteConfig = R"( + { + "distributed_delete_rpc_enabled": false, + "replay": true + } + )"; + + auto rh = makeDistributionRoute( + mockSrHandle, folly::parseJson(kDistributionRouteConfig)); + mockFiberContext(); + fiber_local::runWithLocals([&]() { + fiber_local::setAxonCtx(axonCtx); + fiber_local::setBucketId(1234); + auto req = McDeleteRequest("/altoona/default/test1"); + req = addDeleteRequestSource( + req, + memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); + rh->route(req); + }); + + // the key is not routed to RPC: + EXPECT_TRUE(srHandleVec[0]->saw_keys.empty()); + // spooled to axon: + EXPECT_EQ(tmp.bucketId, 1234); + EXPECT_EQ(tmp.region, "altoona"); + EXPECT_EQ(tmp.pool, "testPool"); + auto req = apache::thrift::CompactSerializer::deserialize( + tmp.serialized); + EXPECT_EQ( + static_cast( + req.attributes_ref()->find(memcache::kMcDeleteReqAttrSource)->second), + memcache::McDeleteRequestSource::CROSS_REGION_DIRECTED_INVALIDATION); +} + +} // namespace facebook::memcache::mcrouter diff --git a/mcrouter/stat_list.h b/mcrouter/stat_list.h index c7a801fa1..d22657b54 100644 --- a/mcrouter/stat_list.h +++ b/mcrouter/stat_list.h @@ -162,6 +162,15 @@ STUI(proxy_queue_full, 0, 1) STUI(proxy_queues_all_full, 0, 1) // number of request routed using McBucketRoute STUI(bucketized_routing, 0, 1) +// distribution stats +STUI(distribution_axon_write_success, 0, 1) +STUI(distribution_axon_write_failed, 0, 1) +STUI(distribution_async_spool_failed, 0, 1) +STUI(distribution_replay_no_source, 0, 1) +STUI(distribution_replay_xregion_directed, 0, 1) +STUI(distribution_replay_xregion_directed_no_prefix_error, 0, 1) +STUI(distribution_replay_xregion_broadcast, 0, 1) +STUI(distribution_replay_other, 0, 1) STAT(client_queue_notify_period, stat_double, 0, .dbl = 0.0) #undef GROUP #define GROUP ods_stats | detailed_stats