Merge pull request #14436 from rgacogne/ddist19-xsk-backend-race
dnsdist-1.9.x: Backport 14429 - Fix a race in the XSK/AF_XDP backend handling code
rgacogne authored Jul 12, 2024
2 parents 84a7c7c + 372d776 commit 7c7f674
Showing 7 changed files with 143 additions and 126 deletions.
10 changes: 6 additions & 4 deletions pdns/dnsdist-lua.cc
@@ -809,10 +809,11 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly, socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
@@ -863,10 +864,11 @@ static void setupLuaConfig(LuaContext& luaCtx, bool client, bool configCheck)
       std::shared_ptr<XskSocket> socket;
       parseXskVars(vars, socket);
       if (socket) {
-        udpCS->xskInfo = XskWorker::create();
-        udpCS->xskInfo->sharedEmptyFrameOffset = socket->sharedEmptyFrameOffset;
+        udpCS->xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, socket->sharedEmptyFrameOffset);
         socket->addWorker(udpCS->xskInfo);
         socket->addWorkerRoute(udpCS->xskInfo, loc);
+        udpCS->xskInfoResponder = XskWorker::create(XskWorker::Type::OutgoingOnly, socket->sharedEmptyFrameOffset);
+        socket->addWorker(udpCS->xskInfoResponder);
         vinfolog("Enabling XSK in %s mode for incoming UDP packets to %s", socket->getXDPMode(), loc.toStringWithPort());
       }
 #endif /* HAVE_XSK */
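
The change here is twofold: sharedEmptyFrameOffset is now handed to XskWorker::create() instead of being assigned after the worker has already been created, and each listener gets a second, send-only worker (xskInfoResponder) that is registered with the socket via addWorker() but, unlike xskInfo, never routed with addWorkerRoute(), so it only carries outgoing frames. Below is a compilable sketch of the factory shape this implies; the Type enumerators come from this diff, while the member names and the offsets type are stand-ins, not the actual dnsdist header:

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Stand-in for the real shared-offsets type, which this diff does not show.
using SharedFrameOffsets = std::shared_ptr<std::vector<uint64_t>>;

class XskWorker
{
public:
  // Bidirectional: receives queries and sends responses (xskInfo).
  // OutgoingOnly: only sends responses (xskInfoResponder).
  enum class Type : uint8_t { Bidirectional, OutgoingOnly };

  XskWorker(Type type, SharedFrameOffsets offsets) :
    d_sharedEmptyFrameOffset(std::move(offsets)), d_type(type)
  {
  }

  // Taking the offsets in create() means the worker is fully initialized
  // before addWorker()/addWorkerRoute() can make it visible to other
  // threads, closing the set-after-publish window of the old two-step code.
  static std::shared_ptr<XskWorker> create(Type type, const SharedFrameOffsets& offsets)
  {
    return std::make_shared<XskWorker>(type, offsets);
  }

private:
  SharedFrameOffsets d_sharedEmptyFrameOffset;
  Type d_type;
};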
4 changes: 2 additions & 2 deletions pdns/dnsdist.cc
@@ -861,9 +861,9 @@ void responderThread(std::shared_ptr<DownstreamState> dss)
         continue;
       }
 
-      if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfo) {
+      if (processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids)) && ids->isXSK() && ids->cs->xskInfoResponder) {
 #ifdef HAVE_XSK
-        auto& xskInfo = ids->cs->xskInfo;
+        auto& xskInfo = ids->cs->xskInfoResponder;
         auto xskPacket = xskInfo->getEmptyFrame();
         if (!xskPacket) {
           continue;
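
responderThread() now sends XSK responses through the dedicated xskInfoResponder worker instead of the listener's xskInfo. The worker queues are drained with consume_all() and guarded by a lock only under TSAN (see the dnsdist-xsk.cc hunks below), which suggests single-producer/single-consumer lock-free queues; with the old shared worker, both this responder thread and the XSK client thread could push frames through the same worker, breaking the single-producer contract. A small stand-alone illustration of that contract, using boost::lockfree::spsc_queue as a stand-in for the real queue type (which this diff does not name):

#include <boost/lockfree/spsc_queue.hpp>
#include <thread>

int main()
{
  boost::lockfree::spsc_queue<int, boost::lockfree::capacity<1024>> queue;

  // Fine: exactly one producer thread and one consumer thread.
  std::thread producer([&queue] {
    for (int i = 0; i < 1000; ++i) {
      while (!queue.push(i)) { /* queue full, retry */ }
    }
  });
  std::thread consumer([&queue] {
    int value = -1;
    while (value != 999) {
      queue.pop(value); // leaves 'value' untouched when the queue is empty
    }
  });
  producer.join();
  consumer.join();

  // NOT fine (the situation a dedicated responder worker avoids): a second
  // thread also calling queue.push() concurrently is undefined behavior
  // for an SPSC queue.
  return 0;
}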
1 change: 1 addition & 0 deletions pdns/dnsdist.hh
@@ -516,6 +516,7 @@ struct ClientState
   std::shared_ptr<DOH3Frontend> doh3Frontend{nullptr};
   std::shared_ptr<BPFFilter> d_filter{nullptr};
   std::shared_ptr<XskWorker> xskInfo{nullptr};
+  std::shared_ptr<XskWorker> xskInfoResponder{nullptr};
   size_t d_maxInFlightQueriesPerConn{1};
   size_t d_tcpConcurrentConnectionsLimit{0};
   int udpFD{-1};
3 changes: 1 addition & 2 deletions pdns/dnsdistdist/dnsdist-backend.cc
@@ -905,10 +905,9 @@ void DownstreamState::registerXsk(std::vector<std::shared_ptr<XskSocket>>& xsks)
   d_config.sourceMACAddr = d_xskSockets.at(0)->getSourceMACAddress();
 
   for (auto& xsk : d_xskSockets) {
-    auto xskInfo = XskWorker::create();
+    auto xskInfo = XskWorker::create(XskWorker::Type::Bidirectional, xsk->sharedEmptyFrameOffset);
     d_xskInfos.push_back(xskInfo);
     xsk->addWorker(xskInfo);
-    xskInfo->sharedEmptyFrameOffset = xsk->sharedEmptyFrameOffset;
   }
   reconnect(false);
 }
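
The same pattern is fixed on the backend side: in the old code the worker was pushed into d_xskInfos and registered via addWorker() before sharedEmptyFrameOffset was assigned, so another thread could pick the worker up half-initialized. A minimal stand-alone sketch of the ordering problem and its fix; the types here are stand-ins, not the real dnsdist classes:

#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

using SharedFrameOffsets = std::shared_ptr<std::vector<uint64_t>>;

struct XskWorker
{
  enum class Type { Bidirectional, OutgoingOnly };
  SharedFrameOffsets sharedEmptyFrameOffset;

  static std::shared_ptr<XskWorker> create() { return std::make_shared<XskWorker>(); }
  static std::shared_ptr<XskWorker> create(Type /*type*/, SharedFrameOffsets offsets)
  {
    auto worker = std::make_shared<XskWorker>();
    worker->sharedEmptyFrameOffset = std::move(offsets);
    return worker;
  }
};

struct XskSocket
{
  SharedFrameOffsets sharedEmptyFrameOffset = std::make_shared<std::vector<uint64_t>>();
  // In the real code this hands the worker to the XskRouter thread.
  void addWorker(const std::shared_ptr<XskWorker>& /*worker*/) {}
};

int main()
{
  XskSocket xsk;

  // Old pattern: the worker is published before it is fully initialized.
  auto racy = XskWorker::create();
  xsk.addWorker(racy);                                       // visible to other threads here
  racy->sharedEmptyFrameOffset = xsk.sharedEmptyFrameOffset; // written after publication: a race

  // New pattern: the offsets travel through create(), so the worker is
  // never observable in a half-initialized state.
  auto fixed = XskWorker::create(XskWorker::Type::Bidirectional, xsk.sharedEmptyFrameOffset);
  xsk.addWorker(fixed);
  return 0;
}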
26 changes: 5 additions & 21 deletions pdns/dnsdistdist/dnsdist-xsk.cc
@@ -48,11 +48,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs
     if ((pollfds[0].revents & POLLIN) != 0) {
       needNotify = true;
       xskInfo->cleanSocketNotification();
-#if defined(__SANITIZE_THREAD__)
-      xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-      xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+      xskInfo->processIncomingFrames([&](XskPacket& packet) {
         if (packet.getDataLen() < sizeof(dnsheader)) {
           xskInfo->markAsFree(packet);
           return;
@@ -77,7 +73,7 @@ void XskResponderThread(std::shared_ptr<DownstreamState> dss, std::shared_ptr<Xs
         }
         if (!processResponderPacket(dss, response, *localRespRuleActions, *localCacheInsertedRespRuleActions, std::move(*ids))) {
           xskInfo->markAsFree(packet);
-          infolog("XSK packet pushed to queue because processResponderPacket failed");
+          vinfolog("XSK packet dropped because processResponderPacket failed");
           return;
         }
         if (response.size() > packet.getCapacity()) {
@@ -171,11 +167,7 @@ void XskRouter(std::shared_ptr<XskSocket> xsk)
       if ((fds.at(fdIndex).revents & POLLIN) != 0) {
         ready--;
         const auto& info = xsk->getWorkerByDescriptor(fds.at(fdIndex).fd);
-#if defined(__SANITIZE_THREAD__)
-        info->outgoingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-        info->outgoingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+        info->processOutgoingFrames([&](XskPacket& packet) {
           if ((packet.getFlags() & XskPacket::UPDATE) == 0) {
             xsk->markAsFree(packet);
             return;
@@ -207,18 +199,10 @@ void XskClientThread(ClientState* clientState)
   LocalHolders holders;
 
   for (;;) {
-#if defined(__SANITIZE_THREAD__)
-    while (xskInfo->incomingPacketsQueue.lock()->read_available() == 0U) {
-#else
-    while (xskInfo->incomingPacketsQueue.read_available() == 0U) {
-#endif
+    while (!xskInfo->hasIncomingFrames()) {
       xskInfo->waitForXskSocket();
     }
-#if defined(__SANITIZE_THREAD__)
-    xskInfo->incomingPacketsQueue.lock()->consume_all([&](XskPacket& packet) {
-#else
-    xskInfo->incomingPacketsQueue.consume_all([&](XskPacket& packet) {
-#endif
+    xskInfo->processIncomingFrames([&](XskPacket& packet) {
       if (XskProcessQuery(*clientState, holders, packet)) {
         packet.updatePacket();
         xskInfo->pushToSendQueue(packet);
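
All four hunks in this file replace the same repeated pattern: open-coded access to the workers' queues, wrapped in '#if defined(__SANITIZE_THREAD__)' so that TSAN builds go through a lock the sanitizer can reason about. Centralizing that in named helpers (processIncomingFrames(), processOutgoingFrames(), hasIncomingFrames()) removes the copy-paste and keeps the locking decision in one place. A sketch of what those helpers plausibly look like; the queue type, the guarded wrapper, and the class layout are assumptions reconstructed from the removed lines, not the real dnsdist code:

#include <boost/lockfree/spsc_queue.hpp>
#include <functional>
#include <mutex>

struct XskPacket {}; // stand-in for the real packet class

using PacketQueue = boost::lockfree::spsc_queue<XskPacket, boost::lockfree::capacity<512>>;

#if defined(__SANITIZE_THREAD__)
// Minimal stand-in for the mutex-guarded holder implied by the removed
// '.lock()' calls: lock() returns a proxy that holds the mutex for the
// duration of the queue access.
class GuardedQueue
{
public:
  class Proxy
  {
  public:
    Proxy(PacketQueue& queue, std::mutex& mutex) : d_lock(mutex), d_queue(queue) {}
    PacketQueue* operator->() { return &d_queue; }

  private:
    std::lock_guard<std::mutex> d_lock;
    PacketQueue& d_queue;
  };

  Proxy lock() { return Proxy(d_queue, d_mutex); }

private:
  PacketQueue d_queue;
  std::mutex d_mutex;
};
#endif

class XskWorker
{
public:
  // Drain every queued incoming frame through the supplied callback.
  void processIncomingFrames(const std::function<void(XskPacket&)>& callback)
  {
#if defined(__SANITIZE_THREAD__)
    incomingPacketsQueue.lock()->consume_all(callback);
#else
    incomingPacketsQueue.consume_all(callback);
#endif
  }

  // True if at least one incoming frame is waiting.
  bool hasIncomingFrames()
  {
#if defined(__SANITIZE_THREAD__)
    return incomingPacketsQueue.lock()->read_available() != 0U;
#else
    return incomingPacketsQueue.read_available() != 0U;
#endif
  }

private:
#if defined(__SANITIZE_THREAD__)
  GuardedQueue incomingPacketsQueue;
#else
  PacketQueue incomingPacketsQueue;
#endif
};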