Skip to content

Commit

Permalink
Merge pull request #15311 from cdapio/feature/CDAP-20790-internal-rou…
Browse files Browse the repository at this point in the history
…ter-proxying

[CDAP-20790] Localize cConf as a configmap and support use of the internal router in workers
  • Loading branch information
arjan-bal authored Sep 13, 2023
2 parents 537671f + f90967a commit d9ec268
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.cdap.cdap.app.store.preview.PreviewStore;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.InternalRouter;
import io.cdap.cdap.common.conf.Constants.Preview;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.utils.DirUtils;
Expand All @@ -37,6 +39,7 @@
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.internal.app.worker.sidecar.ArtifactLocalizerTwillRunnable;
import io.cdap.cdap.master.spi.twill.DependentTwillPreparer;
import io.cdap.cdap.master.spi.twill.ExtendedTwillPreparer;
import io.cdap.cdap.master.spi.twill.SecretDisk;
import io.cdap.cdap.master.spi.twill.SecureTwillPreparer;
import io.cdap.cdap.master.spi.twill.SecurityContext;
Expand Down Expand Up @@ -167,13 +170,18 @@ public void run() {
Path runDir = Files.createTempDirectory(tmpDir, "preview");
try {
CConfiguration cConfCopy = CConfiguration.copy(cConf);
Path cConfPath = runDir.resolve("cConf.xml");
if (!cConf.getBoolean(Constants.Twill.Security.WORKER_MOUNT_SECRET)) {
// Unset the internal certificate path since certificate is stored cdap-security which
// is not going to be exposed to preview runner.
// TODO: CDAP-18768 this will break preview when certificate checking is enabled.
cConfCopy.unset(Constants.Security.SSL.INTERNAL_CERT_PATH);
}
// Enable the use of internal router in the preview runner pods if
// required.
cConfCopy.setBoolean(InternalRouter.CLIENT_ENABLED,
cConf.getBoolean(Preview.INTERNAL_ROUTER_ENABLED));

Path cConfPath = runDir.resolve("cConf.xml");
try (Writer writer = Files.newBufferedWriter(cConfPath, StandardCharsets.UTF_8)) {
cConfCopy.writeXml(writer);
}
Expand All @@ -189,8 +197,7 @@ public void run() {
.setInstances(cConf.getInt(Constants.Preview.CONTAINER_COUNT))
.build();

Optional<ResourceSpecification> artifactLocalizerResourceSpec = Optional.empty();
artifactLocalizerResourceSpec = Optional.of(
Optional<ResourceSpecification> artifactLocalizerResourceSpec = Optional.of(
ResourceSpecification.Builder.with()
.setVirtualCores(cConf.getInt(Constants.ArtifactLocalizer.CONTAINER_CORES))
.setMemory(cConf.getInt(Constants.ArtifactLocalizer.CONTAINER_MEMORY_MB),
Expand All @@ -211,6 +218,14 @@ public void run() {
configMap.put(ProgramOptionConstants.RUNTIME_NAMESPACE,
NamespaceId.SYSTEM.getNamespace());
twillPreparer.withConfiguration(Collections.unmodifiableMap(configMap));
// If internal router is enabled, we need to localize the cdap-site copy
// as a configmap so that the init container also uses the internal
// router.
if (twillPreparer instanceof ExtendedTwillPreparer) {
twillPreparer = ((ExtendedTwillPreparer) twillPreparer)
.setShouldLocalizeConfigurationAsConfigmap(
cConf.getBoolean(Preview.INTERNAL_ROUTER_ENABLED));
}

if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
String localhost = InetAddress.getLoopbackAddress().getHostName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.InternalRouter;
import io.cdap.cdap.common.conf.Constants.TaskWorker;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.utils.DirUtils;
Expand Down Expand Up @@ -154,6 +155,12 @@ public void run() {
// is not exposed (i.e. mounted in k8s) to TaskWorkerService.
CConfiguration cConfCopy = CConfiguration.copy(cConf);
cConfCopy.unset(Constants.Security.SSL.INTERNAL_CERT_PATH);

// Enable the use of internal router in the task worker pods if
// required.
cConfCopy.setBoolean(InternalRouter.CLIENT_ENABLED,
cConf.getBoolean(TaskWorker.INTERNAL_ROUTER_ENABLED));

Path cConfPath = runDir.resolve("cConf.xml");
try (Writer writer = Files.newBufferedWriter(cConfPath, StandardCharsets.UTF_8)) {
cConfCopy.writeXml(writer);
Expand Down Expand Up @@ -184,6 +191,14 @@ public void run() {
new TaskWorkerTwillApplication(cConfPath.toUri(), hConfPath.toUri(),
taskworkerResourceSpec,
artifactLocalizerResourceSpec));
// If internal router is enabled, we need to localize the cdap-site copy
// as a configmap so that the init container also uses the internal
// router.
if (twillPreparer instanceof ExtendedTwillPreparer) {
twillPreparer = ((ExtendedTwillPreparer) twillPreparer)
.setShouldLocalizeConfigurationAsConfigmap(
cConf.getBoolean(TaskWorker.INTERNAL_ROUTER_ENABLED));
}

Map<String, String> configMap = new HashMap<>();
configMap.put(ProgramOptionConstants.RUNTIME_NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.InternalRouter;
import io.cdap.cdap.common.conf.Constants.SystemWorker;
import io.cdap.cdap.common.conf.Constants.Twill.Security;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.utils.DirUtils;
import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants;
import io.cdap.cdap.master.spi.twill.ExtendedTwillPreparer;
import io.cdap.cdap.master.spi.twill.SecretDisk;
import io.cdap.cdap.master.spi.twill.SecureTwillPreparer;
import io.cdap.cdap.master.spi.twill.SecurityContext;
Expand All @@ -49,6 +52,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Launches a pool of system workers.
*/
public class SystemWorkerServiceLauncher extends AbstractScheduledService {

private static final Logger LOG = LoggerFactory.getLogger(SystemWorkerServiceLauncher.class);
Expand All @@ -61,6 +67,9 @@ public class SystemWorkerServiceLauncher extends AbstractScheduledService {

private ScheduledExecutorService executor;

/**
* Default Constructor with injected configuration and {@link TwillRunner}.
*/
@Inject
public SystemWorkerServiceLauncher(CConfiguration cConf, Configuration hConf,
SConfiguration sConf,
Expand Down Expand Up @@ -110,6 +119,9 @@ protected final ScheduledExecutorService executor() {
return executor;
}

/**
* Inner run method for the service.
*/
public void run() {
TwillController activeController = null;
for (TwillController controller : twillRunner.lookup(SystemWorkerTwillApplication.NAME)) {
Expand All @@ -130,6 +142,11 @@ public void run() {
Path runDir = Files.createTempDirectory(tmpDir, "system.worker.launcher");
try {
CConfiguration cConfCopy = CConfiguration.copy(cConf);
// Enable the use of internal router in the system worker pods if
// required.
cConfCopy.setBoolean(InternalRouter.CLIENT_ENABLED,
cConf.getBoolean(SystemWorker.INTERNAL_ROUTER_ENABLED));

Path cConfPath = runDir.resolve("cConf.xml");
try (Writer writer = Files.newBufferedWriter(cConfPath, StandardCharsets.UTF_8)) {
cConfCopy.writeXml(writer);
Expand Down Expand Up @@ -157,6 +174,14 @@ public void run() {
new SystemWorkerTwillApplication(cConfPath.toUri(), hConfPath.toUri(),
sConfPath.toUri(),
systemResourceSpec));
// If internal router is enabled, we need to localize the cdap-site copy
// as a configmap so that the init container also uses the internal
// router.
if (twillPreparer instanceof ExtendedTwillPreparer) {
twillPreparer = ((ExtendedTwillPreparer) twillPreparer)
.setShouldLocalizeConfigurationAsConfigmap(
cConf.getBoolean(SystemWorker.INTERNAL_ROUTER_ENABLED));
}

Map<String, String> configMap = new HashMap<>();
configMap.put(ProgramOptionConstants.RUNTIME_NAMESPACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ public static final class Preview {
public static final String CONTAINER_PRIORITY_CLASS_NAME = "preview.runner.container.priority.class.name";
public static final String CONTAINER_JVM_OPTS = "preview.runner.container.jvm.opts";
public static final String GCE_METADATA_HOST_ENV_VAR = "GCE_METADATA_HOST";
public static final String INTERNAL_ROUTER_ENABLED = "preview.runner.internal.router.enabled";
}

/**
Expand Down Expand Up @@ -520,6 +521,7 @@ public static final class TaskWorker {
"task.worker.systemapp.http.client.read.timeout.ms";
public static final String SYSTEMAPP_HTTP_CLIENT_CONNECTION_TIMEOUT_MS =
"task.worker.systemapp.http.client.connection.timeout.ms";
public static final String INTERNAL_ROUTER_ENABLED = "task.worker.internal.router.enabled";

/**
* Task worker http handler configuration.
Expand Down Expand Up @@ -551,6 +553,7 @@ public static final class SystemWorker {
public static final String HTTP_CLIENT_CONNECTION_TIMEOUT_MS = "system.worker.http.client.connection.timeout.ms";
public static final String TWILL_CONTROLLER_START_SECONDS = "system.worker.program.twill.controller.start.seconds";
public static final String ARTIFACT_LOCALIZER_ENABLED = "system.worker.artifact.localizer.enabled";
public static final String INTERNAL_ROUTER_ENABLED = "system.worker.internal.router.enabled";

/**
* System worker http handler configuration.
Expand Down
27 changes: 27 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3743,6 +3743,15 @@
</description>
</property>

<property>
<name>preview.runner.internal.router.enabled</name>
<value>false</value>
<description>
Whether to route requests from preview runners through the internal router service. This
is only applicable for k8s environment presently. By default, it is disabled.
</description>
</property>

<property>
<name>service.retry.policy.base.delay.ms</name>
<value>100</value>
Expand Down Expand Up @@ -5277,6 +5286,15 @@
</description>
</property>

<property>
<name>task.worker.internal.router.enabled</name>
<value>false</value>
<description>
Whether to route requests from task workers through the internal router service. This
is only applicable for k8s environment presently. By default, it is disabled.
</description>
</property>

<property>
<name>task.worker.container.jvm.opts</name>
<value>-XX:+UseG1GC -XX:+ExitOnOutOfMemoryError</value>
Expand Down Expand Up @@ -5459,6 +5477,15 @@
</description>
</property>

<property>
<name>system.worker.internal.router.enabled</name>
<value>false</value>
<description>
Whether to route requests from system workers through the internal router service. This
is only applicable for k8s environment presently. By default, it is disabled.
</description>
</property>

<property>
<name>system.worker.http.client.read.timeout.ms</name>
<value>300000</value>
Expand Down
Loading

0 comments on commit d9ec268

Please sign in to comment.