Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Refactor Node DNS Resolver to use vertx virtual threads #7189

Merged
merged 3 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void start() {
public void stop() {
LOG.info("Stopping DNSDaemon for {}", enrLink);
periodicTaskId.ifPresent(vertx::cancelTimer);
dnsResolver.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.base.Splitter;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.DnsClientOptions;
Expand All @@ -42,9 +39,8 @@

// Adapted from https://github.com/tmio/tuweni and licensed under Apache 2.0
/** Resolves a set of ENR nodes from a host name. */
public class DNSResolver implements AutoCloseable {
public class DNSResolver {
private static final Logger LOG = LoggerFactory.getLogger(DNSResolver.class);
private final ExecutorService rawTxtRecordsExecutor = Executors.newSingleThreadExecutor();
private final String enrLink;
private long seq;
private final DnsClient dnsClient;
Expand Down Expand Up @@ -118,7 +114,7 @@ public long sequence() {
private void visitTree(final ENRTreeLink link, final DNSVisitor visitor) {
Optional<DNSEntry> optionalEntry = resolveRecord(link.domainName());
if (optionalEntry.isEmpty()) {
LOG.debug("No DNS record found for {}", link.domainName());
LOG.trace("No DNS record found for {}", link.domainName());
return;
}

Expand Down Expand Up @@ -146,32 +142,30 @@ private boolean internalVisit(
final String entryName, final String domainName, final DNSVisitor visitor) {
final Optional<DNSEntry> optionalDNSEntry = resolveRecord(entryName + "." + domainName);
if (optionalDNSEntry.isEmpty()) {
LOG.debug("No DNS record found for {}", entryName + "." + domainName);
return true;
}

final DNSEntry entry = optionalDNSEntry.get();
if (entry instanceof ENRNode node) {
// TODO: this always return true because the visitor is reference to list.add
return visitor.visit(node.nodeRecord());
} else if (entry instanceof DNSEntry.ENRTree tree) {
for (String e : tree.entries()) {
// TODO: When would this ever return false?
boolean keepGoing = internalVisit(e, domainName, visitor);
if (!keepGoing) {
return false;
switch (entry) {
case ENRNode node -> {
return visitor.visit(node.nodeRecord());
}
case DNSEntry.ENRTree tree -> {
for (String e : tree.entries()) {
boolean keepGoing = internalVisit(e, domainName, visitor);
if (!keepGoing) {
return false;
}
}
}
} else if (entry instanceof ENRTreeLink link) {
visitTree(link, visitor);
} else {
LOG.debug("Unsupported type of node {}", entry);
case ENRTreeLink link -> visitTree(link, visitor);
default -> LOG.debug("Unsupported type of node {}", entry);
}
return true;
}

/**
* Resolves one DNS record associated with the given domain name.
* Maps TXT DNS record to DNSEntry.
*
* @param domainName the domain name to query
* @return the DNS entry read from the domain. Empty if no record is found.
Expand All @@ -187,51 +181,21 @@ Optional<DNSEntry> resolveRecord(final String domainName) {
* @return the first TXT entry of the DNS record. Empty if no record is found.
*/
Optional<String> resolveRawRecord(final String domainName) {
// vertx-dns is async, kotlin coroutines allows us to await, similarly Java 21 new thread
// model would also allow us to await. For now, we will use CountDownLatch to block the
// current thread until the DNS resolution is complete.
LOG.debug("Resolving TXT records on domain: {}", domainName);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Optional<String>> record = new AtomicReference<>(Optional.empty());
rawTxtRecordsExecutor.submit(
() -> {
dnsClient
.resolveTXT(domainName)
.onComplete(
ar -> {
if (ar.succeeded()) {
LOG.trace(
"TXT record resolved on domain {}. Result: {}", domainName, ar.result());
record.set(ar.result().stream().findFirst());
} else {
LOG.trace(
"TXT record not resolved on domain {}, because: {}",
domainName,
ar.cause().getMessage());
}
latch.countDown();
});
});

LOG.trace("Resolving TXT records on domain: {}", domainName);
try {
// causes the worker thread to wait. Once we move to Java 21, this can be simplified.
latch.await();
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for DNS resolution");
// Future.await parks current virtual thread and waits for the result. Any failure is
// thrown as a Throwable.
return Future.await(dnsClient.resolveTXT(domainName)).stream().findFirst();
} catch (final Throwable e) {
LOG.trace("Error while resolving TXT records on domain: {}", domainName, e);
return Optional.empty();
}

return record.get();
}

private boolean checkSignature(
final ENRTreeRoot root, final SECP256K1.PublicKey pubKey, final SECP256K1.Signature sig) {
Bytes32 hash =
final Bytes32 hash =
Hash.keccak256(Bytes.wrap(root.signedContent().getBytes(StandardCharsets.UTF_8)));
return SECP256K1.verifyHashed(hash, sig, pubKey);
}

@Override
public void close() {
rawTxtRecordsExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.devp2p.EthereumNodeRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,7 +151,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private final CountDownLatch shutdownLatch = new CountDownLatch(2);
private final Duration shutdownTimeout = Duration.ofSeconds(15);
private final Vertx vertx;
private final AtomicReference<Optional<Pair<String, DNSDaemon>>> dnsDaemonRef =
private final AtomicReference<Optional<DNSDaemon>> dnsDaemonRef =
new AtomicReference<>(Optional.empty());

/**
Expand Down Expand Up @@ -242,17 +241,16 @@ public void start() {
600000L,
config.getDnsDiscoveryServerOverride().orElse(null));

// TODO: Java 21, we can move to Virtual Thread model
// Use Java 21 virtual thread to deploy verticle
final DeploymentOptions options =
new DeploymentOptions()
.setThreadingModel(ThreadingModel.WORKER)
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
.setInstances(1)
.setWorkerPoolSize(1);

final Future<String> deployId = vertx.deployVerticle(dnsDaemon, options);
final String dnsDaemonDeployId =
deployId.toCompletionStage().toCompletableFuture().join();
dnsDaemonRef.set(Optional.of(Pair.of(dnsDaemonDeployId, dnsDaemon)));
deployId.toCompletionStage().toCompletableFuture().join();
dnsDaemonRef.set(Optional.of(dnsDaemon));
});

final int listeningPort = rlpxAgent.start().join();
Expand Down Expand Up @@ -301,7 +299,7 @@ public void stop() {

// since dnsDaemon is a vertx verticle, vertx.close will undeploy it.
// However, we can safely call stop as well.
dnsDaemonRef.get().map(Pair::getRight).ifPresent(DNSDaemon::stop);
dnsDaemonRef.get().ifPresent(DNSDaemon::stop);

peerConnectionScheduler.shutdownNow();
peerDiscoveryAgent.stop().whenComplete((res, err) -> shutdownLatch.countDown());
Expand Down Expand Up @@ -358,7 +356,7 @@ public boolean removeMaintainedConnectionPeer(final Peer peer) {

@VisibleForTesting
Optional<DNSDaemon> getDnsDaemon() {
return dnsDaemonRef.get().map(Pair::getRight);
return dnsDaemonRef.get();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.security.Security;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -38,12 +37,11 @@
class DNSDaemonTest {
private static final String holeskyEnr =
"enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.holesky.ethdisco.net";
// private static MockDNSServer mockDNSServer;
private final MockDnsServerVerticle mockDnsServerVerticle = new MockDnsServerVerticle();
private DNSDaemon dnsDaemon;

@BeforeAll
static void setup() throws IOException {
static void setup() {
Security.addProvider(new BouncyCastleProvider());
}

Expand All @@ -68,7 +66,9 @@ void testDNSDaemon(final Vertx vertx, final VertxTestContext testContext)
"localhost:" + mockDnsServerVerticle.port());

final DeploymentOptions options =
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setWorkerPoolSize(1);
new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
.setWorkerPoolSize(1);
vertx.deployVerticle(dnsDaemon, options);
}

Expand Down Expand Up @@ -109,7 +109,9 @@ void testDNSDaemonPeriodic(final Vertx vertx, final VertxTestContext testContext
"localhost:" + mockDnsServerVerticle.port());

final DeploymentOptions options =
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setWorkerPoolSize(1);
new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
.setWorkerPoolSize(1);
vertx.deployVerticle(dnsDaemon, options);
}

Expand Down
Loading