Skip to content

Commit

Permalink
Merge pull request #15312 from cdapio/CDAP-20795
Browse files Browse the repository at this point in the history
[CDAP-20795] Create NamespaceWorkloadIdentity and GcpWorkloadIdentityHandler
  • Loading branch information
itsankit-google authored Oct 4, 2023
2 parents 23d301f + b66bbb9 commit d23c4a5
Show file tree
Hide file tree
Showing 19 changed files with 873 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import com.google.inject.util.Modules;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.deploy.Configurator;
import io.cdap.cdap.app.deploy.Manager;
import io.cdap.cdap.app.deploy.ManagerFactory;
Expand All @@ -45,11 +46,13 @@
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.guice.RemoteAuthenticatorModules;
import io.cdap.cdap.common.runtime.RuntimeModule;
import io.cdap.cdap.common.utils.Networks;
import io.cdap.cdap.config.guice.ConfigStoreModule;
import io.cdap.cdap.data.security.DefaultSecretStore;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.gateway.handlers.AppLifecycleHttpHandler;
import io.cdap.cdap.gateway.handlers.AppLifecycleHttpHandlerInternal;
import io.cdap.cdap.gateway.handlers.AppStateHandler;
Expand Down Expand Up @@ -131,6 +134,8 @@
import io.cdap.cdap.internal.events.SparkProgramStatusMetricsProvider;
import io.cdap.cdap.internal.events.StartProgramEventReaderExtensionProvider;
import io.cdap.cdap.internal.events.StartProgramEventSubscriber;
import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandler;
import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandlerInternal;
import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory;
import io.cdap.cdap.internal.profile.ProfileService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
Expand Down Expand Up @@ -186,7 +191,7 @@ public AppFabricServiceRuntimeModule(CConfiguration cConf) {

@Override
public Module getInMemoryModules() {
return Modules.combine(new AppFabricServiceModule(),
return Modules.combine(new AppFabricServiceModule(cConf),
new CapabilityModule(),
new NamespaceAdminModule().getInMemoryModules(),
new ConfigStoreModule(),
Expand Down Expand Up @@ -227,7 +232,7 @@ protected void configure() {
@Override
public Module getStandaloneModules() {

return Modules.combine(new AppFabricServiceModule(),
return Modules.combine(new AppFabricServiceModule(cConf),
new CapabilityModule(),
new NamespaceAdminModule().getStandaloneModules(),
new ConfigStoreModule(),
Expand Down Expand Up @@ -281,7 +286,7 @@ protected void configure() {
@Override
public Module getDistributedModules() {

return Modules.combine(new AppFabricServiceModule(ImpersonationHandler.class),
return Modules.combine(new AppFabricServiceModule(cConf, ImpersonationHandler.class),
new CapabilityModule(),
new NamespaceAdminModule().getDistributedModules(),
new ConfigStoreModule(),
Expand Down Expand Up @@ -326,9 +331,12 @@ protected void configure() {
private static final class AppFabricServiceModule extends AbstractModule {

private final List<Class<? extends HttpHandler>> handlerClasses;
private final CConfiguration cConf;

private AppFabricServiceModule(Class<? extends HttpHandler>... handlerClasses) {
private AppFabricServiceModule(CConfiguration cConf,
Class<? extends HttpHandler>... handlerClasses) {
this.handlerClasses = ImmutableList.copyOf(handlerClasses);
this.cConf = cConf;
}

@Override
Expand Down Expand Up @@ -456,6 +464,12 @@ protected void configure() {
handlerBinder.addBinding().to(CredentialProviderHttpHandler.class);
handlerBinder.addBinding().to(CredentialProviderHttpHandlerInternal.class);

FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
handlerBinder.addBinding().to(GcpWorkloadIdentityHttpHandler.class);
handlerBinder.addBinding().to(GcpWorkloadIdentityHttpHandlerInternal.class);
}

for (Class<? extends HttpHandler> handlerClass : handlerClasses) {
handlerBinder.addBinding().to(handlerClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ public synchronized void updateProperties(NamespaceId namespaceId, NamespaceMeta
builder.setDescription(namespaceMeta.getDescription());
}

if (Strings.isNullOrEmpty(existingMeta.getIdentity())) {
builder.setIdentity(getIdentity(namespaceId));
}

NamespaceConfig config = namespaceMeta.getConfig();
if (config != null && !Strings.isNullOrEmpty(config.getSchedulerQueueName())) {
builder.setSchedulerQueueName(config.getSchedulerQueueName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,28 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.SConfiguration;
import io.cdap.cdap.common.discovery.ResolvingDiscoverable;
import io.cdap.cdap.common.discovery.URIScheme;
import io.cdap.cdap.common.feature.DefaultFeatureFlagsProvider;
import io.cdap.cdap.common.http.CommonNettyHttpServiceFactory;
import io.cdap.cdap.common.logging.LoggingContextAccessor;
import io.cdap.cdap.common.logging.ServiceLoggingContext;
import io.cdap.cdap.common.metrics.MetricsReporterHook;
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
import io.cdap.cdap.internal.namespace.credential.NamespaceCredentialProviderService;
import io.cdap.cdap.internal.provision.ProvisioningService;
import io.cdap.cdap.internal.sysapp.SystemAppManagementService;
import io.cdap.cdap.proto.id.NamespaceId;
Expand Down Expand Up @@ -80,6 +86,8 @@ public class AppFabricServer extends AbstractIdleService {
private final ProgramRunStatusMonitorService programRunStatusMonitorService;
private final RunRecordMonitorService runRecordCounterService;
private final CoreSchedulerService coreSchedulerService;
private final CredentialProviderService credentialProviderService;
private final NamespaceCredentialProviderService namespaceCredentialProviderService;
private final ProvisioningService provisioningService;
private final BootstrapService bootstrapService;
private final SystemAppManagementService systemAppManagementService;
Expand Down Expand Up @@ -113,7 +121,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
@Named("appfabric.services.names") Set<String> servicesNames,
@Named("appfabric.handler.hooks") Set<String> handlerHookNames,
CoreSchedulerService coreSchedulerService,
ProvisioningService provisioningService,
CredentialProviderService credentialProviderService,
NamespaceCredentialProviderService namespaceCredentialProviderService, ProvisioningService provisioningService,
BootstrapService bootstrapService,
SystemAppManagementService systemAppManagementService,
TransactionRunner transactionRunner,
Expand All @@ -138,6 +147,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.programRunStatusMonitorService = programRunStatusMonitorService;
this.sslEnabled = cConf.getBoolean(Constants.Security.SSL.INTERNAL_ENABLED);
this.coreSchedulerService = coreSchedulerService;
this.credentialProviderService = credentialProviderService;
this.namespaceCredentialProviderService = namespaceCredentialProviderService;
this.provisioningService = provisioningService;
this.bootstrapService = bootstrapService;
this.systemAppManagementService = systemAppManagementService;
Expand All @@ -158,23 +169,28 @@ protected void startUp() throws Exception {
new ServiceLoggingContext(NamespaceId.SYSTEM.getNamespace(),
Constants.Logging.COMPONENT_NAME,
Constants.Service.APP_FABRIC_HTTP));
Futures.allAsList(
ImmutableList.of(
provisioningService.start(),
applicationLifecycleService.start(),
bootstrapService.start(),
programRuntimeService.start(),
programNotificationSubscriberService.start(),
programStopSubscriberService.start(),
runRecordCorrectorService.start(),
programRunStatusMonitorService.start(),
coreSchedulerService.start(),
runRecordCounterService.start(),
runRecordTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start()
)
).get();
List<ListenableFuture<State>> futuresList = new ArrayList<>();
FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
futuresList.add(namespaceCredentialProviderService.start());
}
futuresList.addAll(ImmutableList.of(
provisioningService.start(),
applicationLifecycleService.start(),
bootstrapService.start(),
programRuntimeService.start(),
programNotificationSubscriberService.start(),
programStopSubscriberService.start(),
runRecordCorrectorService.start(),
programRunStatusMonitorService.start(),
coreSchedulerService.start(),
credentialProviderService.start(),
runRecordCounterService.start(),
runRecordTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start()
));
Futures.allAsList(futuresList).get();

// Create handler hooks
List<MetricsReporterHook> handlerHooks = handlerHookNames.stream()
Expand Down Expand Up @@ -231,6 +247,8 @@ protected void shutDown() throws Exception {
runRecordTimeToLiveService.stopAndWait();
sourceControlOperationRunner.stopAndWait();
repositoryCleanupService.stopAndWait();
credentialProviderService.stopAndWait();
namespaceCredentialProviderService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.gateway.handlers.util.AbstractAppFabricHttpHandler;
import io.cdap.cdap.internal.credential.RemoteCredentialProviderService;
import io.cdap.cdap.internal.namespace.credential.RemoteNamespaceCredentialProvider;
import io.cdap.cdap.proto.BasicThrowable;
import io.cdap.cdap.proto.codec.BasicThrowableCodec;
import io.cdap.cdap.proto.credential.CredentialProvider;
import io.cdap.cdap.proto.credential.CredentialProvisioningException;
import io.cdap.cdap.proto.credential.NamespaceCredentialProvider;
import io.cdap.cdap.proto.credential.NotFoundException;
import io.cdap.cdap.proto.credential.ProvisionedCredential;
import io.cdap.cdap.proto.security.GcpMetadataTaskContext;
Expand Down Expand Up @@ -64,16 +64,14 @@
@Singleton
@Path("/")
public class GcpMetadataHttpHandlerInternal extends AbstractAppFabricHttpHandler {

public static final String GCP_CREDENTIAL_IDENTITY_NAME = "default";
protected static final String METADATA_FLAVOR_HEADER_KEY = "Metadata-Flavor";
protected static final String METADATA_FLAVOR_HEADER_VALUE = "Google";
private static final Logger LOG = LoggerFactory.getLogger(GcpMetadataHttpHandlerInternal.class);
private static final Gson GSON = new GsonBuilder().registerTypeAdapter(BasicThrowable.class,
new BasicThrowableCodec()).create();
private final CConfiguration cConf;
private final String metadataServiceTokenEndpoint;
private final CredentialProvider credentialProvider;
private final NamespaceCredentialProvider credentialProvider;
private final GcpWorkloadIdentityInternalAuthenticator gcpWorkloadIdentityInternalAuthenticator;
private GcpMetadataTaskContext gcpMetadataTaskContext;

Expand All @@ -89,7 +87,7 @@ public GcpMetadataHttpHandlerInternal(CConfiguration cConf,
Constants.TaskWorker.METADATA_SERVICE_END_POINT);
this.gcpWorkloadIdentityInternalAuthenticator =
new GcpWorkloadIdentityInternalAuthenticator(gcpMetadataTaskContext);
this.credentialProvider = new RemoteCredentialProviderService(remoteClientFactory,
this.credentialProvider = new RemoteNamespaceCredentialProvider(remoteClientFactory,
this.gcpWorkloadIdentityInternalAuthenticator);
}

Expand Down Expand Up @@ -187,8 +185,7 @@ public void token(HttpRequest request, HttpResponder responder,
private GcpTokenResponse fetchTokenFromCredentialProvider(String scopes) throws NotFoundException,
IOException, CredentialProvisioningException {
ProvisionedCredential provisionedCredential =
this.credentialProvider.provision(gcpMetadataTaskContext.getNamespace(),
GCP_CREDENTIAL_IDENTITY_NAME, scopes);
this.credentialProvider.provision(gcpMetadataTaskContext.getNamespace(), scopes);
return new GcpTokenResponse("Bearer", provisionedCredential.get(),
Duration.between(Instant.now(), provisionedCredential.getExpiration()).getSeconds());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.google.common.util.concurrent.AbstractIdleService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.namespace.NamespaceQueryAdmin;
import io.cdap.cdap.common.namespace.NamespaceAdmin;
import io.cdap.cdap.proto.NamespaceMeta;
import io.cdap.cdap.proto.credential.CredentialIdentity;
import io.cdap.cdap.proto.credential.CredentialProfile;
Expand Down Expand Up @@ -51,25 +51,26 @@ public class DefaultCredentialProviderService extends AbstractIdleService
private final Map<String, CredentialProvider> credentialProviders;
private final CredentialIdentityManager credentialIdentityManager;
private final CredentialProfileManager credentialProfileManager;
private final NamespaceQueryAdmin namespaceQueryAdmin;
private final NamespaceAdmin namespaceAdmin;

@Inject
DefaultCredentialProviderService(CConfiguration cConf,
ContextAccessEnforcer contextAccessEnforcer,
CredentialProviderLoader credentialProviderLoader,
CredentialIdentityManager credentialIdentityManager,
CredentialProfileManager credentialProfileManager,
NamespaceQueryAdmin namespaceQueryAdmin) {
NamespaceAdmin namespaceAdmin) {
this.cConf = cConf;
this.contextAccessEnforcer = contextAccessEnforcer;
this.credentialProviders = credentialProviderLoader.loadCredentialProviders();
this.credentialIdentityManager = credentialIdentityManager;
this.credentialProfileManager = credentialProfileManager;
this.namespaceQueryAdmin = namespaceQueryAdmin;
this.namespaceAdmin = namespaceAdmin;
}

@Override
protected void startUp() throws Exception {

for (CredentialProvider provider : credentialProviders.values()) {
provider.initialize(new DefaultCredentialProviderContext(cConf, provider.getName()));
}
Expand Down Expand Up @@ -100,7 +101,7 @@ public ProvisionedCredential provision(String namespace, String identityName,
contextAccessEnforcer.enforce(identityId, StandardPermission.USE);
NamespaceMeta namespaceMeta;
try {
namespaceMeta = namespaceQueryAdmin.get(new NamespaceId(namespace));
namespaceMeta = namespaceAdmin.get(new NamespaceId(namespace));
} catch (Exception e) {
throw new IOException(String.format("Failed to get namespace '%s' metadata",
namespace), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.internal.credential;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.cdap.cdap.api.retry.Idempotency;
Expand All @@ -28,6 +27,7 @@
import io.cdap.cdap.proto.NamespaceMeta;
import io.cdap.cdap.proto.codec.BasicThrowableCodec;
import io.cdap.cdap.proto.credential.CredentialIdentity;
import io.cdap.cdap.proto.credential.CredentialProvider;
import io.cdap.cdap.proto.credential.CredentialProvisioningException;
import io.cdap.cdap.proto.credential.IdentityValidationException;
import io.cdap.cdap.proto.credential.NotFoundException;
Expand All @@ -39,45 +39,28 @@
import joptsimple.internal.Strings;

/**
* Remote implementation for {@link CredentialProviderService} used in
* Remote implementation for {@link CredentialProvider} used in
* {@link io.cdap.cdap.common.conf.Constants.ArtifactLocalizer}.
*/
public class RemoteCredentialProviderService extends AbstractIdleService
implements CredentialProviderService {
public class RemoteCredentialProvider implements CredentialProvider {
private static final Gson GSON = new GsonBuilder().registerTypeAdapter(BasicThrowable.class,
new BasicThrowableCodec()).create();
private final RemoteClient remoteClient;

/**
* Construct the {@link RemoteCredentialProviderService}.
* Construct the {@link RemoteCredentialProvider}.
*
* @param remoteClientFactory A factory to create {@link RemoteClient}.
* @param internalAuthenticator An authenticator to propagate internal identity headers.
*/
public RemoteCredentialProviderService(RemoteClientFactory remoteClientFactory,
public RemoteCredentialProvider(RemoteClientFactory remoteClientFactory,
InternalAuthenticator internalAuthenticator) {

this.remoteClient = remoteClientFactory.createRemoteClient(Constants.Service.APP_FABRIC_HTTP,
RemoteClientFactory.NO_VERIFY_HTTP_REQUEST_CONFIG, Constants.Gateway.INTERNAL_API_VERSION_3,
internalAuthenticator);
}

/**
* Start the service.
*/
@Override
protected void startUp() throws Exception {

}

/**
* Stop the service.
*/
@Override
protected void shutDown() throws Exception {

}

/**
* Provisions a short-lived credential for the provided identity using the provided identity.
*
Expand Down Expand Up @@ -105,6 +88,14 @@ public ProvisionedCredential provision(String namespace, String identityName,
throw new NotFoundException(String.format("Credential Identity %s Not Found.",
identityName));
}

if (response.getResponseCode() != HttpResponseStatus.OK.code()) {
throw new CredentialProvisioningException(String.format(
"Failed to provision credential with response code: %s and error: %s",
response.getResponseCode(),
response.getResponseBodyAsString()));
}

return GSON.fromJson(response.getResponseBodyAsString(), ProvisionedCredential.class);
}

Expand Down
Loading

0 comments on commit d23c4a5

Please sign in to comment.