Skip to content

Commit

Permalink
add multi push and pull apis
Browse files Browse the repository at this point in the history
  • Loading branch information
GnsP committed Nov 1, 2023
1 parent b970e5f commit eae55a2
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -157,6 +159,7 @@
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.impersonation.UnsupportedUGIProvider;
import io.cdap.cdap.security.store.SecureStoreHandler;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.spi.events.StartProgramEvent;
import io.cdap.http.HttpHandler;
Expand Down Expand Up @@ -366,6 +369,12 @@ protected void configure() {
.build(Key.get(ConfiguratorFactory.class,
Names.named(AppFabric.FACTORY_IMPLEMENTATION_REMOTE)))
);

bind(ApplicationManager.class).to(
io.cdap.cdap.internal.app.sourcecontrol.LocalApplicationManager.class);
install(new FactoryModuleBuilder().build(PullAppsOperationFactory.class));
install(new FactoryModuleBuilder().build(PushAppsOperationFactory.class));

// Used in InMemoryProgramRunDispatcher, TetheringClientHandler
install(RemoteAuthenticatorModules.getDefaultModule(
TetheringAgentService.REMOTE_TETHERING_AUTHENTICATOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushAppRequest;
import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigRequest;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigValidationException;
Expand Down Expand Up @@ -183,6 +186,49 @@ public void pushApp(FullHttpRequest request, HttpResponder responder,
}
}

/**
* Pushes application configs of requested applications to linked repository in Json format.
* It expects a post body that has a list of application ids and an optional commit message
* E.g.
*
* <pre>
* {@code
* {
* "appIds": ["app_id_1", "app_id_2"],
* "commitMessage": "pushed application XYZ"
* }
* }
*
* </pre>
* The response will be a {@link OperationMeta} object, which encapsulates the application name,
* version and fileHash.
*/
@POST
@Path("/apps/push")
public void pushApps(FullHttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId) throws Exception {
checkSourceControlMultiFeatureFlag();
NamespaceId namespace = validateNamespaceId(namespaceId);

Check warning on line 211 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.VariableDeclarationUsageDistanceCheck

Distance between variable 'namespace' declaration and its first usage is 4, but allowed 3. Consider making that variable final if you still need to store its value in advance (before method calls that might have side effects on the original value).

PushMultipleAppsRequest appsRequest;
try {
appsRequest = parseBody(request, PushMultipleAppsRequest.class);
} catch (JsonSyntaxException e) {
throw new BadRequestException("Invalid request body.", e);
}

if (appsRequest == null) {
throw new BadRequestException("Invalid request body.");
}

if (Strings.isNullOrEmpty(appsRequest.getCommitMessage())) {
throw new BadRequestException("Please specify commit message in the request body.");
}

OperationMeta operationMeta = sourceControlService.pushApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
}

/**
* Pull the requested application from linked repository and deploy in current namespace.
*/
Expand All @@ -202,6 +248,31 @@ public void pullApp(FullHttpRequest request, HttpResponder responder,
}
}

/**
* Pull the requested applications from linked repository and deploy in current namespace.
*/
@POST
@Path("/apps/pull")
public void pullApps(FullHttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId) throws Exception {
checkSourceControlMultiFeatureFlag();
NamespaceId namespace = validateNamespaceId(namespaceId);

PullMultipleAppsRequest appsRequest;
try {
appsRequest = parseBody(request, PullMultipleAppsRequest.class);
} catch (JsonSyntaxException e) {
throw new BadRequestException("Invalid request body.", e);
}

if (appsRequest == null) {
throw new BadRequestException("Invalid request body.");
}

OperationMeta operationMeta = sourceControlService.pullApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
}

private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException {
PushAppRequest appRequest;
try {
Expand All @@ -227,6 +298,13 @@ private void checkSourceControlFeatureFlag() throws ForbiddenException {
}
}

private void checkSourceControlMultiFeatureFlag() throws ForbiddenException {
checkSourceControlFeatureFlag();
if (!Feature.SOURCE_CONTROL_MANAGEMENT_MULTIPLE_APPS.isEnabled(featureFlagsProvider)) {
throw new ForbiddenException("Source Control Management for multiple apps feature is not enabled.");
}
}

private NamespaceId validateNamespaceId(String namespaceId) throws BadRequestException {
try {
return new NamespaceId(namespaceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import io.cdap.cdap.api.artifact.ArtifactSummary;
import io.cdap.cdap.api.security.store.SecureStore;
Expand All @@ -26,15 +27,28 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.internal.operation.OperationException;
import io.cdap.cdap.internal.operation.SynchronousLongRunningOperationContext;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.artifact.AppRequest;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.KerberosPrincipalId;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.proto.security.NamespacePermission;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta;
Expand Down Expand Up @@ -62,7 +76,10 @@
import io.cdap.cdap.store.NamespaceTable;
import io.cdap.cdap.store.RepositoryTable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -79,6 +96,8 @@ public class SourceControlManagementService {
private final SourceControlOperationRunner sourceControlOperationRunner;
private final ApplicationLifecycleService appLifecycleService;
private final Store store;
private final PushAppsOperationFactory pushAppsOperationFactory;
private final PullAppsOperationFactory pullAppsOperationFactory;
private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class);


Expand All @@ -93,7 +112,8 @@ public SourceControlManagementService(CConfiguration cConf,
AuthenticationContext authenticationContext,
SourceControlOperationRunner sourceControlOperationRunner,
ApplicationLifecycleService applicationLifecycleService,
Store store) {
Store store, PushAppsOperationFactory pushAppsOperationFactory,
PullAppsOperationFactory pullAppsOperationFactory) {
this.cConf = cConf;
this.secureStore = secureStore;
this.transactionRunner = transactionRunner;
Expand All @@ -102,6 +122,8 @@ public SourceControlManagementService(CConfiguration cConf,
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.appLifecycleService = applicationLifecycleService;
this.store = store;
this.pushAppsOperationFactory = pushAppsOperationFactory;
this.pullAppsOperationFactory = pullAppsOperationFactory;
}

private RepositoryTable getRepositoryTable(StructuredTableContext context) throws TableNotFoundException {
Expand Down Expand Up @@ -203,7 +225,7 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage
// TODO: CDAP-20396 RepositoryConfig is currently only accessible from the service layer
// Need to fix it and avoid passing it in RepositoryManagerFactory
RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig();

// AppLifecycleService already enforces ApplicationDetail Access
ApplicationDetail appDetail = appLifecycleService.getLatestAppDetail(appRef, false);

Expand All @@ -224,7 +246,7 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage
appRef.getApplication(),
appRef.getParent(),
appLifecycleService.decodeUserId(authenticationContext));

SourceControlMeta sourceControlMeta = new SourceControlMeta(pushResponse.getFileHash());
ApplicationId appId = appRef.app(appDetail.getAppVersion());
store.setAppSourceControlMeta(appId, sourceControlMeta);
Expand All @@ -251,7 +273,7 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep
accessEnforcer.enforce(appId, authenticationContext.getPrincipal(), StandardPermission.CREATE);
accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);

PullAppResponse<?> pullResponse = pullAndValidateApplication(appRef);

AppRequest<?> appRequest = pullResponse.getAppRequest();
Expand Down Expand Up @@ -319,4 +341,73 @@ public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundExc
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
return sourceControlOperationRunner.list(new NamespaceRepository(namespace, repoConfig));
}

/**
* The method to push multiple applications in the same namespace to the linked repository.
*
* @param namespace {@link NamespaceId} from where the apps are to be pushed
* @param request {@link PushMultipleAppsRequest} containing the appIds and the commit message
*
* @return {@link OperationMeta} of the operation to push the apps
* @throws NoChangesToPushException when none of the apps have changed since last commit
* @throws NotFoundException when the repository or any of the apps are not found
* @throws InterruptedException when the push operation is inturrupted
* @throws ExecutionException when the push operation execution fails
* @throws OperationException for any exception occuring in the operation logic
*/
public OperationMeta pushApps(NamespaceId namespace, PushMultipleAppsRequest request)
throws NoChangesToPushException, NotFoundException, InterruptedException,
ExecutionException, OperationException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.WRITE_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
String committer = authenticationContext.getPrincipal().getName();
PushAppsOperation pushOp = pushAppsOperationFactory.create(new PushAppsRequest(
new HashSet<>(request.getApps()),
repoConfig,
new CommitMeta(committer, committer, System.currentTimeMillis(), request.getCommitMessage())
));

SynchronousLongRunningOperationContext operationContext = new SynchronousLongRunningOperationContext(
namespace.getNamespace(),
OperationType.PUSH_APPS
);
ListenableFuture<Set<OperationResource>> result = pushOp.run(operationContext);
result.get();

return operationContext.getOperationMeta();
}

/**
* The method to pull multiple applications from the linked repository and deploy them in current namespace.
*
* @param namespace {@link NamespaceId} from where the apps are to be pushed
* @param request {@link PullMultipleAppsRequest} containing the appIds
*
* @return {@link OperationMeta} of the operation to push the apps
* @throws NotFoundException when the repository or any of the apps are not found
* @throws InterruptedException when the push operation is inturrupted
* @throws ExecutionException when the push operation execution fails
* @throws OperationException for any exception occuring in the operation logic
*/
public OperationMeta pullApps(NamespaceId namespace, PullMultipleAppsRequest request)
throws NotFoundException, InterruptedException,
ExecutionException, OperationException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
PullAppsOperation pullOp = pullAppsOperationFactory.create(new PullAppsRequest(
new HashSet<>(request.getApps()),
repoConfig
));

SynchronousLongRunningOperationContext operationContext = new SynchronousLongRunningOperationContext(
namespace.getNamespace(),
OperationType.PULL_APPS
);
ListenableFuture<Set<OperationResource>> result = pullOp.run(operationContext);
result.get();

return operationContext.getOperationMeta();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.internal.app.sourcecontrol;


/**
* Factory interface for creating {@link PushAppsOperation}.
* This interface is for Guice assisted binding, hence there will be no concrete implementation of it.
Expand All @@ -27,7 +26,7 @@ public interface PushAppsOperationFactory {
* Returns an implementation of {@link PushAppsOperation} that operates on the given {@link
* PushAppsRequest}.
*
* @param request contains list of apps to pull
* @param request contains list of apps to push
* @return a new instance of {@link PushAppsOperation}.
*/
PushAppsOperation create(PushAppsRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ protected AbstractLongRunningOperationContext(OperationRunId runid, OperationTyp
this.runId = runid;
this.type = operationType;
}

@Override
public OperationRunId getRunId() {
return runId;
}

@Override
public OperationType getType() {
return type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationType;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class SynchronousLongRunningOperationContext extends AbstractLongRunningOperationContext {

Check warning on line 28 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/SynchronousLongRunningOperationContext.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocTypeCheck

Missing a Javadoc comment.
private OperationMeta operationMeta;

public SynchronousLongRunningOperationContext(String namespace, OperationType operationType) {
super(new OperationRunId(namespace, UUID.randomUUID().toString()), operationType);
this.operationMeta = new OperationMeta(new HashSet<>(), Instant.now(), null);
}

@Override
public void updateOperationResources(Set<OperationResource> resources) {
operationMeta = new OperationMeta(resources, operationMeta.getCreateTime(), Instant.now());
}

public OperationMeta getOperationMeta() {
return operationMeta;
}
}
Loading

0 comments on commit eae55a2

Please sign in to comment.