From a00f520737489cf1124f238124dfaabbec3b73f4 Mon Sep 17 00:00:00 2001 From: samik Date: Mon, 20 Nov 2023 09:41:22 +0530 Subject: [PATCH] Add APIs to do multi pull/push --- .../SourceControlManagementHttpHandler.java | 84 ++++++- .../SourceControlManagementService.java | 168 ++++++++++---- .../operation/OperationLifecycleManager.java | 98 ++++++++- .../operation/OperationRunDetail.java | 5 +- .../internal/operation/OperationRunStore.java | 43 ++++ .../app/services/http/AppFabricTestBase.java | 101 +++++---- ...urceControlManagementHttpHandlerTests.java | 132 ++++++++++- .../OperationLifecycleManagerTest.java | 206 ++++++++++-------- .../internal/operation/OperationTestBase.java | 7 +- .../PullMultipleAppsRequest.java | 34 +++ .../PushMultipleAppsRequest.java | 40 ++++ 11 files changed, 722 insertions(+), 196 deletions(-) create mode 100644 cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java create mode 100644 cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java index 82622b1e7c3e..114f64914f0a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java @@ -34,7 +34,11 @@ import io.cdap.cdap.proto.ApplicationRecord; import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.PushAppRequest; +import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigRequest; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigValidationException; @@ -47,7 +51,6 @@ import io.cdap.http.HttpResponder; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; - import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -183,6 +186,49 @@ public void pushApp(FullHttpRequest request, HttpResponder responder, } } + /** + * Pushes application configs of requested applications to linked repository in Json format. + * It expects a post body that has a list of application ids and an optional commit message + * E.g. + * + *
+   * {@code
+   * {
+   *   "appIds": ["app_id_1", "app_id_2"],
+   *   "commitMessage": "pushed application XYZ"
+   * }
+   * }
+   *
+   * 
+ * The response will be a {@link OperationMeta} object, which encapsulates the application name, + * version and fileHash. + */ + @POST + @Path("/apps/push") + public void pushApps(FullHttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId) throws Exception { + checkSourceControlMultiFeatureFlag(); + PushMultipleAppsRequest appsRequest; + try { + appsRequest = parseBody(request, PushMultipleAppsRequest.class); + } catch (JsonSyntaxException e) { + throw new BadRequestException(String.format("Invalid request body: %s", e.getMessage())); + } + + if (appsRequest == null) { + throw new BadRequestException("Invalid request body."); + } + + if (Strings.isNullOrEmpty(appsRequest.getCommitMessage())) { + throw new BadRequestException("Please specify commit message in the request body."); + } + + NamespaceId namespace = validateNamespaceId(namespaceId); + + OperationRun operationMeta = sourceControlService.pushApps(namespace, appsRequest); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta)); + } + /** * Pull the requested application from linked repository and deploy in current namespace. */ @@ -202,6 +248,31 @@ public void pullApp(FullHttpRequest request, HttpResponder responder, } } + /** + * Pull the requested applications from linked repository and deploy in current namespace. + */ + @POST + @Path("/apps/pull") + public void pullApps(FullHttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId) throws Exception { + checkSourceControlMultiFeatureFlag(); + NamespaceId namespace = validateNamespaceId(namespaceId); + + PullMultipleAppsRequest appsRequest; + try { + appsRequest = parseBody(request, PullMultipleAppsRequest.class); + } catch (JsonSyntaxException e) { + throw new BadRequestException("Invalid request body.", e); + } + + if (appsRequest == null) { + throw new BadRequestException("Invalid request body."); + } + + OperationRun operationRun = sourceControlService.pullApps(namespace, appsRequest); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationRun)); + } + private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException { PushAppRequest appRequest; try { @@ -217,16 +288,19 @@ private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws return appRequest; } - /** - * - * throws {@link ForbiddenException} if the feature is disabled - */ private void checkSourceControlFeatureFlag() throws ForbiddenException { if (!Feature.SOURCE_CONTROL_MANAGEMENT_GIT.isEnabled(featureFlagsProvider)) { throw new ForbiddenException("Source Control Management feature is not enabled."); } } + private void checkSourceControlMultiFeatureFlag() throws ForbiddenException { + checkSourceControlFeatureFlag(); + if (!Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.isEnabled(featureFlagsProvider)) { + throw new ForbiddenException("Source Control Management for multiple apps feature is not enabled."); + } + } + private NamespaceId validateNamespaceId(String namespaceId) throws BadRequestException { try { return new NamespaceId(namespaceId); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java index 9e24ecac06b2..80e2f5cbbf17 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java @@ -23,9 +23,13 @@ import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.RepositoryNotFoundException; +import io.cdap.cdap.common.TooManyRequestsException; import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; +import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.proto.ApplicationDetail; import io.cdap.cdap.proto.ApplicationRecord; import io.cdap.cdap.proto.artifact.AppRequest; @@ -33,8 +37,11 @@ import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.KerberosPrincipalId; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.security.NamespacePermission; import io.cdap.cdap.proto.security.StandardPermission; +import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; +import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; @@ -62,15 +69,17 @@ import io.cdap.cdap.store.NamespaceTable; import io.cdap.cdap.store.RepositoryTable; import java.io.IOException; +import java.util.HashSet; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Service that manages source control for repositories and applications. - * It exposes repository CRUD apis and source control tasks that do pull/pull/list applications in linked repository. + * Service that manages source control for repositories and applications. It exposes repository CRUD + * apis and source control tasks that do pull/pull/list applications in linked repository. */ public class SourceControlManagementService { + private final AccessEnforcer accessEnforcer; private final AuthenticationContext authenticationContext; private final TransactionRunner transactionRunner; @@ -79,6 +88,7 @@ public class SourceControlManagementService { private final SourceControlOperationRunner sourceControlOperationRunner; private final ApplicationLifecycleService appLifecycleService; private final Store store; + private final OperationLifecycleManager operationLifecycleManager; private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class); @@ -87,13 +97,14 @@ public class SourceControlManagementService { */ @Inject public SourceControlManagementService(CConfiguration cConf, - SecureStore secureStore, - TransactionRunner transactionRunner, - AccessEnforcer accessEnforcer, - AuthenticationContext authenticationContext, - SourceControlOperationRunner sourceControlOperationRunner, - ApplicationLifecycleService applicationLifecycleService, - Store store) { + SecureStore secureStore, + TransactionRunner transactionRunner, + AccessEnforcer accessEnforcer, + AuthenticationContext authenticationContext, + SourceControlOperationRunner sourceControlOperationRunner, + ApplicationLifecycleService applicationLifecycleService, + Store store, + OperationLifecycleManager operationLifecycleManager) { this.cConf = cConf; this.secureStore = secureStore; this.transactionRunner = transactionRunner; @@ -102,13 +113,16 @@ public SourceControlManagementService(CConfiguration cConf, this.sourceControlOperationRunner = sourceControlOperationRunner; this.appLifecycleService = applicationLifecycleService; this.store = store; + this.operationLifecycleManager = operationLifecycleManager; } - private RepositoryTable getRepositoryTable(StructuredTableContext context) throws TableNotFoundException { + private RepositoryTable getRepositoryTable(StructuredTableContext context) + throws TableNotFoundException { return new RepositoryTable(context); } - private NamespaceTable getNamespaceTable(StructuredTableContext context) throws TableNotFoundException { + private NamespaceTable getNamespaceTable(StructuredTableContext context) + throws TableNotFoundException { return new NamespaceTable(context); } @@ -121,7 +135,7 @@ private NamespaceTable getNamespaceTable(StructuredTableContext context) throws * @throws NamespaceNotFoundException if the namespace is non-existent */ public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repository) - throws NamespaceNotFoundException { + throws NamespaceNotFoundException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.UPDATE_REPOSITORY_METADATA); @@ -155,9 +169,11 @@ public void deleteRepository(NamespaceId namespace) { /** * Return the repository config for the given namespace. * - * @throws RepositoryNotFoundException if repository config is not available for the namespace + * @throws RepositoryNotFoundException if repository config is not available for the + * namespace */ - public RepositoryMeta getRepositoryMeta(NamespaceId namespace) throws RepositoryNotFoundException { + public RepositoryMeta getRepositoryMeta(NamespaceId namespace) + throws RepositoryNotFoundException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), StandardPermission.GET); return TransactionRunners.run(transactionRunner, context -> { @@ -177,10 +193,11 @@ public RepositoryMeta getRepositoryMeta(NamespaceId namespace) throws Repository * @throws RemoteRepositoryValidationException if validation of remote repository fails */ public void validateRepository(NamespaceId namespace, RepositoryConfig repoConfig) - throws RemoteRepositoryValidationException { + throws RemoteRepositoryValidationException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.UPDATE_REPOSITORY_METADATA); - RepositoryManager.validateConfig(secureStore, new SourceControlConfig(namespace, repoConfig, cConf)); + RepositoryManager.validateConfig(secureStore, + new SourceControlConfig(namespace, repoConfig, cConf)); } /** @@ -189,27 +206,31 @@ public void validateRepository(NamespaceId namespace, RepositoryConfig repoConfi * @param appRef {@link ApplicationReference} * @param commitMessage enforced commit message from user * @return {@link PushAppResponse} - * @throws NotFoundException if the application is not found or the repository config is not found - * @throws IOException if {@link ApplicationLifecycleService} fails to get the adminOwner store + * @throws NotFoundException if the application is not found or the repository config is not + * found + * @throws IOException if {@link ApplicationLifecycleService} fails to get the adminOwner + * store * @throws SourceControlException if {@link SourceControlOperationRunner} fails to push * @throws AuthenticationConfigException if the repository configuration authentication fails - * @throws NoChangesToPushException if there's no change of the application between namespace and linked repository + * @throws NoChangesToPushException if there's no change of the application between namespace + * and linked repository */ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage) - throws NotFoundException, IOException, NoChangesToPushException, AuthenticationConfigException { + throws NotFoundException, IOException, NoChangesToPushException, AuthenticationConfigException { accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(), NamespacePermission.WRITE_REPOSITORY); // TODO: CDAP-20396 RepositoryConfig is currently only accessible from the service layer // Need to fix it and avoid passing it in RepositoryManagerFactory RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig(); - + // AppLifecycleService already enforces ApplicationDetail Access ApplicationDetail appDetail = appLifecycleService.getLatestAppDetail(appRef, false); String committer = authenticationContext.getPrincipal().getName(); // TODO CDAP-20371 revisit and put correct Author and Committer, for now they are the same - CommitMeta commitMeta = new CommitMeta(committer, committer, System.currentTimeMillis(), commitMessage); + CommitMeta commitMeta = new CommitMeta(committer, committer, System.currentTimeMillis(), + commitMessage); LOG.info("Start to push app {} in namespace {} to linked repository by user {}", appRef.getApplication(), @@ -217,14 +238,14 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage appLifecycleService.decodeUserId(authenticationContext)); PushAppResponse pushResponse = sourceControlOperationRunner.push( - new PushAppOperationRequest(appRef.getParent(), repoConfig, appDetail, commitMeta) + new PushAppOperationRequest(appRef.getParent(), repoConfig, appDetail, commitMeta) ); LOG.info("Successfully pushed app {} in namespace {} to linked repository by user {}", appRef.getApplication(), appRef.getParent(), appLifecycleService.decodeUserId(authenticationContext)); - + SourceControlMeta sourceControlMeta = new SourceControlMeta(pushResponse.getFileHash()); ApplicationId appId = appRef.app(appDetail.getAppVersion()); store.setAppSourceControlMeta(appId, sourceControlMeta); @@ -239,7 +260,8 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage * @return {@link ApplicationRecord} of the deployed application. * @throws Exception when {@link ApplicationLifecycleService} fails to deploy. * @throws NoChangesToPullException if the fileHashes are the same - * @throws NotFoundException if the repository config is not found or the application in repository is not found + * @throws NotFoundException if the repository config is not found or the application in + * repository is not found * @throws SourceControlException if unexpected errors happen when pulling the application. * @throws AuthenticationConfigException if the repository configuration authentication fails */ @@ -251,11 +273,12 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep accessEnforcer.enforce(appId, authenticationContext.getPrincipal(), StandardPermission.CREATE); accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(), NamespacePermission.READ_REPOSITORY); - + PullAppResponse pullResponse = pullAndValidateApplication(appRef); AppRequest appRequest = pullResponse.getAppRequest(); - SourceControlMeta sourceControlMeta = new SourceControlMeta(pullResponse.getApplicationFileHash()); + SourceControlMeta sourceControlMeta = new SourceControlMeta( + pullResponse.getApplicationFileHash()); LOG.info("Start to deploy app {} in namespace {} by user {}", appId.getApplication(), @@ -263,34 +286,40 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep appLifecycleService.decodeUserId(authenticationContext)); ApplicationWithPrograms app = appLifecycleService.deployApp(appId, appRequest, - sourceControlMeta, x -> { }, false); - - LOG.info("Successfully deployed app {} in namespace {} from artifact {} with configuration {} and " - + "principal {}", app.getApplicationId().getApplication(), app.getApplicationId().getNamespace(), - app.getArtifactId(), appRequest.getConfig(), app.getOwnerPrincipal() + sourceControlMeta, x -> { + }, false); + + LOG.info( + "Successfully deployed app {} in namespace {} from artifact {} with configuration {} and " + + "principal {}", app.getApplicationId().getApplication(), + app.getApplicationId().getNamespace(), + app.getArtifactId(), appRequest.getConfig(), app.getOwnerPrincipal() ); return new ApplicationRecord( - ArtifactSummary.from(app.getArtifactId().toApiArtifactId()), - app.getApplicationId().getApplication(), - app.getApplicationId().getVersion(), - app.getSpecification().getDescription(), - Optional.ofNullable(app.getOwnerPrincipal()).map(KerberosPrincipalId::getPrincipal).orElse(null), - app.getChangeDetail(), app.getSourceControlMeta()); + ArtifactSummary.from(app.getArtifactId().toApiArtifactId()), + app.getApplicationId().getApplication(), + app.getApplicationId().getVersion(), + app.getSpecification().getDescription(), + Optional.ofNullable(app.getOwnerPrincipal()).map(KerberosPrincipalId::getPrincipal) + .orElse(null), + app.getChangeDetail(), app.getSourceControlMeta()); } /** - * Pull the application from repository, look up the fileHash in store and compare it with the cone in repository. + * Pull the application from repository, look up the fileHash in store and compare it with the + * cone in repository. * * @param appRef {@link ApplicationReference} to fetch the application with * @return {@link PullAppResponse} * @throws NoChangesToPullException if the fileHashes are the same - * @throws NotFoundException if the repository config is not found or the application in repository is not found + * @throws NotFoundException if the repository config is not found or the application in + * repository is not found * @throws SourceControlException if unexpected errors happen when pulling the application. * @throws AuthenticationConfigException if the repository configuration authentication fails */ private PullAppResponse pullAndValidateApplication(ApplicationReference appRef) - throws NoChangesToPullException, NotFoundException, AuthenticationConfigException { + throws NoChangesToPullException, NotFoundException, AuthenticationConfigException { RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig(); SourceControlMeta latestMeta = store.getAppSourceControlMeta(appRef); PullAppResponse pullResponse = sourceControlOperationRunner.pull( @@ -298,8 +327,9 @@ private PullAppResponse pullAndValidateApplication(ApplicationReference appRe if (latestMeta != null && latestMeta.getFileHash().equals(pullResponse.getApplicationFileHash())) { - throw new NoChangesToPullException(String.format("Pipeline deployment was not successful because there is " - + "no new change for the pulled application: %s", appRef)); + throw new NoChangesToPullException( + String.format("Pipeline deployment was not successful because there is " + + "no new change for the pulled application: %s", appRef)); } return pullResponse; } @@ -308,15 +338,61 @@ private PullAppResponse pullAndValidateApplication(ApplicationReference appRe * The method to list all applications found in linked repository. * * @return {@link RepositoryAppsResponse} - * @throws RepositoryNotFoundException if the repository config is not found + * @throws RepositoryNotFoundException if the repository config is not found * @throws AuthenticationConfigException if git auth config is not found - * @throws SourceControlException if {@link SourceControlOperationRunner} fails to list applications + * @throws SourceControlException if {@link SourceControlOperationRunner} fails to list + * applications */ public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundException, - AuthenticationConfigException { + AuthenticationConfigException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.READ_REPOSITORY); RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); return sourceControlOperationRunner.list(new NamespaceRepository(namespace, repoConfig)); } + + /** + * The method to push multiple applications in the same namespace to the linked repository. + * + * @param namespace {@link NamespaceId} from where the apps are to be pushed + * @param request {@link PushMultipleAppsRequest} containing the appIds and the commit + * message + * @return {@link OperationRun} of the operation to push the apps + * @throws NotFoundException when the repository or any of the apps are not found + */ + public OperationRun pushApps(NamespaceId namespace, PushMultipleAppsRequest request) + throws NotFoundException, IOException, TooManyRequestsException { + accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), + NamespacePermission.WRITE_REPOSITORY); + RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); + String principal = authenticationContext.getPrincipal().getName(); + CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(), + request.getCommitMessage()); + PushAppsRequest pushOpRequest = new PushAppsRequest(new HashSet<>(request.getApps()), + repoConfig, commitMeta); + return operationLifecycleManager.createPushOperation(namespace.getNamespace(), + RunIds.generate().getId(), pushOpRequest, principal); + } + + /** + * The method to pull multiple applications from the linked repository and deploy them in current + * namespace. + * + * @param namespace {@link NamespaceId} from where the apps are to be pushed + * @param request {@link PullMultipleAppsRequest} containing the appIds + * @return {@link OperationRun} of the operation to push the apps + * @throws NotFoundException when the repository or any of the apps are not found + */ + public OperationRun pullApps(NamespaceId namespace, PullMultipleAppsRequest request) + throws NotFoundException, IOException, TooManyRequestsException { + accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), + NamespacePermission.READ_REPOSITORY); + RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); + String principal = authenticationContext.getPrincipal().getName(); + PullAppsRequest pullOpRequest = new PullAppsRequest(new HashSet<>(request.getApps()), + repoConfig); + + return operationLifecycleManager.createPullOperation(namespace.getNamespace(), + RunIds.generate().getId(), pullOpRequest, principal); + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java index 87f75810de6c..35a0d9160e95 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java @@ -18,13 +18,20 @@ import com.google.inject.Inject; import io.cdap.cdap.common.BadRequestException; +import io.cdap.cdap.common.TooManyRequestsException; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.function.Consumer; import org.slf4j.Logger; @@ -67,15 +74,15 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, while (currentLimit > 0) { ScanOperationRunsRequest batchRequest = ScanOperationRunsRequest .builder(request) - .setScanAfter(lastKey) - .setLimit(Math.min(txBatchSize, currentLimit)) - .build(); + .setScanAfter(lastKey) + .setLimit(Math.min(txBatchSize, currentLimit)) + .build(); request = batchRequest; lastKey = TransactionRunners.run(transactionRunner, context -> { - return getOperationRunStore(context).scanOperations(batchRequest, consumer); - }, IOException.class, OperationRunNotFoundException.class); + return getOperationRunStore(context).scanOperations(batchRequest, consumer); + }, IOException.class, OperationRunNotFoundException.class); if (lastKey == null) { break; @@ -105,7 +112,7 @@ public OperationRunDetail getOperationRun(OperationRunId runId) } /** - * Validates state transition and sends STOPPING notification for an operation + * Validates state transition and sends STOPPING notification for an operation. * * @param runId {@link OperationRunId} of the operation */ @@ -129,6 +136,63 @@ public OperationController startOperation(OperationRunDetail detail) { return runtime.run(detail); } + /** + * Create a new pull operation. Inserts the run in DB and then send TMS message. + */ + public OperationRun createPushOperation(String namespace, String runId, PushAppsRequest request, + String principal) + throws IOException, TooManyRequestsException { + OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build(); + OperationRunId operationRunId = new OperationRunId(namespace, runId); + OperationRun run = OperationRun.builder() + .setRunId(runId) + .setMetadata(meta) + .setStatus(OperationRunStatus.STARTING) + .setType(OperationType.PUSH_APPS) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setRun(run) + .setPrincipal(principal) + .setPushAppsRequest(request) + .setRunId(operationRunId) + .build(); + TransactionRunners.run(transactionRunner, context -> { + validateOnlyOneGitOperationRunning(namespace, context); + getOperationRunStore(context).createOperationRun(operationRunId, detail); + statePublisher.publishStarting(operationRunId); + }, TooManyRequestsException.class, IOException.class); + return run; + } + + /** + * Create a new pull operation. Inserts the run in DB and then send TMS message. + */ + public OperationRun createPullOperation(String namespace, String runId, PullAppsRequest request, + String principal) + throws IOException, TooManyRequestsException { + OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build(); + OperationRunId operationRunId = new OperationRunId(namespace, runId); + OperationRun run = OperationRun.builder() + .setRunId(runId) + .setMetadata(meta) + .setStatus(OperationRunStatus.STARTING) + .setType(OperationType.PULL_APPS) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setRun(run) + .setPrincipal(principal) + .setPullAppsRequest(request) + .setRunId(operationRunId) + .build(); + TransactionRunners.run(transactionRunner, context -> { + validateOnlyOneGitOperationRunning(namespace, context); + getOperationRunStore(context).createOperationRun(operationRunId, detail); + statePublisher.publishStarting(operationRunId); + }, TooManyRequestsException.class, IOException.class); + + return run; + } + /** * Initiate operation stop. It is the responsibility of the caller to validate state transition. @@ -146,8 +210,8 @@ public void stopOperation(OperationRunDetail detail) { } /** - * Checks if the operation is running. If not sends a failure notification - * Called after service restart. + * Checks if the operation is running. If not sends a failure notification Called after service + * restart. * * @param detail {@link OperationRunDetail} of the operation */ @@ -162,6 +226,24 @@ public void isRunning(OperationRunDetail detail, OperationStatePublisher statePu } } + // Validate only one multi git operation running at a time + private void validateOnlyOneGitOperationRunning(String namespaceId, + StructuredTableContext context) + throws TooManyRequestsException, OperationRunNotFoundException, IOException { + OperationRunStore store = getOperationRunStore(context); + OperationRunDetail existing = store.getLatestActiveOperation(namespaceId, + OperationType.PULL_APPS); + if (existing != null) { + throw new TooManyRequestsException( + String.format("Already running a bulk pull operation %s", existing.getRun())); + } + existing = store.getLatestActiveOperation(namespaceId, OperationType.PUSH_APPS); + if (existing != null) { + throw new TooManyRequestsException( + String.format("Already running a bulk push operation %s", existing.getRun())); + } + } + private OperationRunStore getOperationRunStore(StructuredTableContext context) { return new OperationRunStore(context); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java index 407ce6d3a1a4..ae3cbaf039da 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java @@ -64,9 +64,8 @@ public class OperationRunDetail { protected OperationRunDetail( OperationRunId runId, OperationRun run, - byte[] sourceId, @Nullable String principal, - @Nullable PullAppsRequest pullAppsRequest, - @Nullable PushAppsRequest pushAppsRequest) { + @Nullable byte[] sourceId, @Nullable String principal, + @Nullable PullAppsRequest pullAppsRequest, @Nullable PushAppsRequest pushAppsRequest) { this.runId = runId; this.run = run; this.sourceId = sourceId; diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java index aeeee7e8a978..702aa0e93659 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java @@ -27,6 +27,7 @@ import io.cdap.cdap.proto.operation.OperationResource; import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.cdap.spi.data.SortOrder; import io.cdap.cdap.spi.data.StructuredRow; import io.cdap.cdap.spi.data.StructuredTable; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -260,6 +262,43 @@ public void scanOperationByStatus(OperationRunStatus status, } } + /** + * Get the latest active operation of a given type in a namespace. + * + * @param namespaceId to serach in + * @param type {@link OperationType} to search + */ + public OperationRunDetail getLatestActiveOperation(String namespaceId, OperationType type) + throws OperationRunNotFoundException, IOException { + AtomicReference latestRun = new AtomicReference<>(); + // get STARTING + ScanOperationRunsRequest request = ScanOperationRunsRequest.builder() + .setNamespace(namespaceId).setFilter(new OperationRunFilter(type, OperationRunStatus.STARTING)) + .setLimit(1).build(); + scanOperations(request, latestRun::set); + + // get RUNNING + request = ScanOperationRunsRequest.builder() + .setNamespace(namespaceId).setFilter(new OperationRunFilter(type, OperationRunStatus.RUNNING)) + .setLimit(1).build(); + scanOperations(request, detail -> { + if (latestRun.get() != null && isRunLater(detail.getRun(), latestRun.get().getRun())) { + latestRun.set(detail); + } + }); + + // get STOPPING + request = ScanOperationRunsRequest.builder() + .setNamespace(namespaceId).setFilter(new OperationRunFilter(type, OperationRunStatus.STOPPING)) + .setLimit(1).build(); + scanOperations(request, detail -> { + if (latestRun.get() != null && isRunLater(detail.getRun(), latestRun.get().getRun())) { + latestRun.set(detail); + } + }); + return latestRun.get(); + } + private List> getRangeFields(OperationRunId runId) throws IOException, OperationRunNotFoundException { List> fields = new ArrayList<>(); @@ -342,6 +381,10 @@ private StructuredTable getOperationRunsTable(StructuredTableContext context) { return context.getTable(StoreDefinition.OperationRunsStore.OPERATION_RUNS); } +private boolean isRunLater(OperationRun run1, OperationRun run2) { + return run1.getMetadata().getCreateTime().isAfter(run2.getMetadata().getCreateTime()); +} + @VisibleForTesting // USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS void clearData() throws IOException { diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java index abf6a5a1c8ef..4807c5599f84 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/AppFabricTestBase.java @@ -109,7 +109,9 @@ import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.profile.Profile; +import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.PushAppRequest; +import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; import io.cdap.cdap.runtime.spi.profile.ProfileStatus; import io.cdap.cdap.scheduler.CoreSchedulerService; import io.cdap.cdap.scheduler.Scheduler; @@ -517,6 +519,16 @@ private HttpResponse addArtifact(Id.Artifact artifactId, InputSupplier appRequest) + throws Exception { + String deployPath = getVersionedInternalApiPath( + "apps/" + appId.getId() + "?skipMarkingLatest=true", appId.getNamespaceId()); + return executeDeploy(HttpRequest.put(getEndPoint(deployPath).toURL()), appRequest); + } + /** * Deploys an application. */ @@ -547,13 +559,6 @@ protected HttpResponse deploy(Id.Application appId, return executeDeploy(HttpRequest.put(getEndPoint(deployPath).toURL()), appRequest); } - protected HttpResponse deployWithoutMarkingLatest(Id.Application appId, AppRequest appRequest) - throws Exception { - String deployPath = getVersionedInternalApiPath( - "apps/" + appId.getId() + "?skipMarkingLatest=true", appId.getNamespaceId()); - return executeDeploy(HttpRequest.put(getEndPoint(deployPath).toURL()), appRequest); - } - protected HttpResponse deploy(ApplicationId appId, AppRequest appRequest) throws Exception { String deployPath = getVersionedApiPath(String.format("apps/%s/versions/%s/create", appId.getApplication(), appId.getVersion()), @@ -709,6 +714,13 @@ protected ApplicationDetail getAppDetails(String namespace, String appName) thro return readResponse(response, ApplicationDetail.class); } + protected ApplicationDetail getAppDetails(String namespace, String appName, String appVersion) throws Exception { + HttpResponse response = getAppResponse(namespace, appName, appVersion); + assertResponseCode(200, response); + Assert.assertEquals("application/json", getFirstHeaderValue(response, HttpHeaderNames.CONTENT_TYPE.toString())); + return readResponse(response, ApplicationDetail.class); + } + protected HttpResponse getAppResponse(String namespace, String appName) throws Exception { return doGet(getVersionedApiPath(String.format("apps/%s", appName), Constants.Gateway.API_VERSION_3_TOKEN, namespace)); @@ -729,13 +741,6 @@ protected Set getAppVersions(String namespace, String appName) throws Ex return readResponse(response, SET_STRING_TYPE); } - protected ApplicationDetail getAppDetails(String namespace, String appName, String appVersion) throws Exception { - HttpResponse response = getAppResponse(namespace, appName, appVersion); - assertResponseCode(200, response); - Assert.assertEquals("application/json", getFirstHeaderValue(response, HttpHeaderNames.CONTENT_TYPE.toString())); - return readResponse(response, ApplicationDetail.class); - } - /** * Checks the given schedule states. */ @@ -1402,6 +1407,15 @@ protected List getProgramRuns(Id.Program program, ProgramRunStatus st return GSON.fromJson(response.getResponseBodyAsString(), LIST_RUN_RECORD_TYPE); } + protected List getProgramRuns(ProgramId program, ProgramRunStatus status) throws Exception { + String path = String.format("apps/%s/versions/%s/%s/%s/runs?status=%s", program.getApplication(), + program.getVersion(), program.getType().getCategoryName(), program.getProgram(), + status.toString()); + HttpResponse response = doGet(getVersionedApiPath(path, program.getNamespace())); + assertResponseCode(200, response); + return GSON.fromJson(response.getResponseBodyAsString(), LIST_RUN_RECORD_TYPE); + } + protected int getProgramRunRecord(Id.Program program, String runId) throws Exception { String path = String.format("apps/%s/%s/%s/runs/%s", program.getApplicationId(), program.getType().getCategoryName(), program.getId(), runId); @@ -1438,27 +1452,18 @@ protected void assertProgramRuns(final ProgramId program, final ProgramRunStatus Tasks.waitFor(true, () -> getProgramRuns(program, status).size() == expected, 15, TimeUnit.SECONDS); } - protected List getProgramRuns(ProgramId program, ProgramRunStatus status) throws Exception { - String path = String.format("apps/%s/versions/%s/%s/%s/runs?status=%s", program.getApplication(), - program.getVersion(), program.getType().getCategoryName(), program.getProgram(), - status.toString()); - HttpResponse response = doGet(getVersionedApiPath(path, program.getNamespace())); - assertResponseCode(200, response); - return GSON.fromJson(response.getResponseBodyAsString(), LIST_RUN_RECORD_TYPE); - } - protected HttpResponse createNamespace(String id) throws Exception { return doPut(String.format("%s/namespaces/%s", Constants.Gateway.API_VERSION_3, id), null); } - protected HttpResponse deleteNamespace(String name) throws Exception { - return doDelete(String.format("%s/unrecoverable/namespaces/%s", Constants.Gateway.API_VERSION_3, name)); - } - protected HttpResponse createNamespace(String metadata, String id) throws Exception { return doPut(String.format("%s/namespaces/%s", Constants.Gateway.API_VERSION_3, id), metadata); } + protected HttpResponse deleteNamespace(String name) throws Exception { + return doDelete(String.format("%s/unrecoverable/namespaces/%s", Constants.Gateway.API_VERSION_3, name)); + } + protected HttpResponse listAllNamespaces() throws Exception { return doGet(String.format("%s/namespaces", Constants.Gateway.API_VERSION_3)); } @@ -1497,12 +1502,26 @@ protected HttpResponse pushApplication(ApplicationReference appRef, String commi appRef.getNamespace(), appRef.getApplication()), GSON.toJson(request)); } + protected HttpResponse pushApplications(String namespace, List apps, String commitMessage) + throws Exception { + PushMultipleAppsRequest request = new PushMultipleAppsRequest(apps, commitMessage); + return doPost(String.format("%s/namespaces/%s/repository/apps/push", + Constants.Gateway.API_VERSION_3, namespace), GSON.toJson(request)); + } + protected HttpResponse pullApplication(ApplicationReference appRef) throws Exception { return doPost(String.format("%s/namespaces/%s/repository/apps/%s/pull", Constants.Gateway.API_VERSION_3, appRef.getNamespace(), appRef.getApplication())); } + protected HttpResponse pullApplications(String namespace, List apps) + throws Exception { + PullMultipleAppsRequest request = new PullMultipleAppsRequest(apps); + return doPost(String.format("%s/namespaces/%s/repository/apps/pull", + Constants.Gateway.API_VERSION_3, namespace), GSON.toJson(request)); + } + protected HttpResponse listApplicationsFromRepository(String namespace) throws Exception { return doGet(String.format("%s/namespaces/%s/repository/apps", Constants.Gateway.API_VERSION_3, namespace)); } @@ -1565,6 +1584,20 @@ protected File buildAppArtifact(Class cls, String name) throws IOException { return buildAppArtifact(cls, name, new Manifest()); } + private File buildAppArtifact(Class cls, String name, Manifest manifest) throws IOException { + if (!name.endsWith(".jar")) { + name += ".jar"; + } + File destination = new File(tmpFolder.newFolder(), name); + return buildAppArtifact(cls, manifest, destination); + } + + protected File buildAppArtifact(Class cls, Manifest manifest, File destination) throws IOException { + Location appJar = AppJarHelper.createDeploymentJar(locationFactory, cls, manifest); + Locations.linkOrCopyOverwrite(appJar, destination); + return destination; + } + /** * If the configuration has authorization enabled, e.g. with * {@link io.cdap.cdap.internal.AppFabricTestHelper#enableAuthorization(CConfiguration, TemporaryFolder)}, allows @@ -1580,20 +1613,6 @@ protected void doAs(String user, Retries.Runnable actio } } - private File buildAppArtifact(Class cls, String name, Manifest manifest) throws IOException { - if (!name.endsWith(".jar")) { - name += ".jar"; - } - File destination = new File(tmpFolder.newFolder(), name); - return buildAppArtifact(cls, manifest, destination); - } - - protected File buildAppArtifact(Class cls, Manifest manifest, File destination) throws IOException { - Location appJar = AppJarHelper.createDeploymentJar(locationFactory, cls, manifest); - Locations.linkOrCopyOverwrite(appJar, destination); - return destination; - } - protected DatasetMeta getDatasetMeta(DatasetId datasetId) throws UnauthorizedException, UnauthenticatedException, NotFoundException, IOException { return datasetClient.get(datasetId); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java index 209c86887d8e..42366e6371e7 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/http/handlers/SourceControlManagementHttpHandlerTests.java @@ -28,14 +28,20 @@ import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.id.Id; +import io.cdap.cdap.common.id.Id.Namespace; import io.cdap.cdap.features.Feature; import io.cdap.cdap.gateway.handlers.SourceControlManagementHttpHandler; import io.cdap.cdap.internal.app.services.ApplicationLifecycleService; import io.cdap.cdap.internal.app.services.SourceControlManagementService; import io.cdap.cdap.internal.app.services.http.AppFabricTestBase; +import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.metadata.MetadataSubscriberService; import io.cdap.cdap.proto.ApplicationRecord; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.cdap.proto.sourcecontrol.AuthConfig; import io.cdap.cdap.proto.sourcecontrol.AuthType; import io.cdap.cdap.proto.sourcecontrol.PatConfig; @@ -61,6 +67,7 @@ import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.common.http.HttpResponse; +import java.time.Instant; import java.util.Arrays; import java.util.Map; import javax.annotation.Nullable; @@ -119,17 +126,20 @@ public SourceControlManagementService provideSourceControlManagementService( AuthenticationContext authenticationContext, SourceControlOperationRunner sourceControlRunner, ApplicationLifecycleService applicationLifecycleService, - Store store) { + Store store, OperationLifecycleManager manager) { + return Mockito.spy(new SourceControlManagementService(cConf, secureStore, transactionRunner, accessEnforcer, authenticationContext, sourceControlRunner, applicationLifecycleService, - store)); + store, manager)); } }); } private static void setScmFeatureFlag(boolean flag) { cConf.setBoolean(FEATURE_FLAG_PREFIX + Feature.SOURCE_CONTROL_MANAGEMENT_GIT.getFeatureFlagString(), flag); + cConf.setBoolean(FEATURE_FLAG_PREFIX + + Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.getFeatureFlagString(), flag); } private void assertResponseCode(int expected, HttpResponse response) { @@ -497,6 +507,124 @@ public void testListAppsNotFound() throws Exception { assertResponseCode(404, response); } + @Test + public void testPushAppsSucceeds() throws Exception { + String commitMessage = "push two apps"; + OperationRun expectedResponse = OperationRun.builder().setRunId("1") + .setStatus(OperationRunStatus.STARTING).setType(OperationType.PUSH_APPS).setMetadata( + OperationMeta.builder().setCreateTime(Instant.now()).build() + ).build(); + + Mockito.doReturn(expectedResponse).when(sourceControlService) + .pushApps(Mockito.any(), Mockito.any()); + HttpResponse response = pushApplications(Namespace.DEFAULT.getId(), + Arrays.asList("appToPush1", "appToPush2"), commitMessage); + + assertResponseCode(200, response); + OperationMeta result = readResponse(response, OperationRun.class); + Assert.assertEquals(result, expectedResponse); + } + + @Test + public void testPushAppsInvalidRequest() throws Exception { + // Push empty commit message + String commitMessage = ""; + HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2"), commitMessage); + + // Assert the response + assertResponseCode(400, response); + Assert.assertEquals(response.getResponseBodyAsString(), + "Please specify commit message in the request body."); + } + + @Test + public void testPushAppsNotFound() throws Exception { + String commitMessage = "push two apps"; + Mockito.doThrow(new NotFoundException("apps not found")).when(sourceControlService) + .pushApps(Mockito.any(), Mockito.any()); + HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2"), commitMessage); + + assertResponseCode(404, response); + Assert.assertEquals(response.getResponseBodyAsString(), "apps not found"); + } + + @Test + public void testPushAppsSourceControlException() throws Exception { + String commitMessage = "push two apps"; + Mockito.doThrow(new SourceControlException("Failed to push apps.")).when(sourceControlService) + .pushApps(Mockito.any(), Mockito.any()); + HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2"), commitMessage); + + assertResponseCode(500, response); + Assert.assertTrue(response.getResponseBodyAsString().contains("Failed to push apps.")); + } + + @Test + public void testPushAppsInvalidAuthenticationConfig() throws Exception { + String commitMessage = "push two apps"; + Mockito.doThrow(new AuthenticationConfigException("Repository config not valid")).when(sourceControlService) + .pushApps(Mockito.any(), Mockito.any()); + HttpResponse response = pushApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2"), commitMessage); + + assertResponseCode(500, response); + Assert.assertTrue(response.getResponseBodyAsString().contains("Repository config not valid")); + } + + @Test + public void testPullAppsSucceeds() throws Exception { + OperationRun expectedResponse = OperationRun.builder().setRunId("2") + .setStatus(OperationRunStatus.STARTING).setType(OperationType.PULL_APPS).setMetadata( + OperationMeta.builder().setCreateTime(Instant.now()).build() + ).build(); + + Mockito.doReturn(expectedResponse).when(sourceControlService) + .pullApps(Mockito.any(), Mockito.any()); + HttpResponse response = pullApplications(Namespace.DEFAULT.getId(), + Arrays.asList("appToPush1", "appToPush2")); + + assertResponseCode(200, response); + OperationMeta result = readResponse(response, OperationRun.class); + Assert.assertEquals(result, expectedResponse); + } + + @Test + public void testPullAppsNotFound() throws Exception { + Mockito.doThrow(new NotFoundException("apps not found")).when(sourceControlService) + .pullApps(Mockito.any(), Mockito.any()); + HttpResponse response = pullApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2")); + + // Assert the app is not found + assertResponseCode(404, response); + Assert.assertEquals(response.getResponseBodyAsString(), "apps not found"); + } + + @Test + public void testPullAppsSourceControlException() throws Exception { + Mockito.doThrow(new SourceControlException("Failed to pull apps.")).when(sourceControlService) + .pullApps(Mockito.any(), Mockito.any()); + HttpResponse response = pullApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2")); + + assertResponseCode(500, response); + Assert.assertTrue(response.getResponseBodyAsString().contains("Failed to pull apps.")); + } + + @Test + public void testPullAppsInvalidAuthenticationConfig() throws Exception { + Mockito.doThrow(new AuthenticationConfigException("Repository config not valid.")).when(sourceControlService) + .pullApps(Mockito.any(), Mockito.any()); + HttpResponse response = pullApplications(NamespaceId.DEFAULT.getNamespace(), + Arrays.asList("appToPush1", "appToPush2")); + + assertResponseCode(500, response); + Assert.assertTrue(response.getResponseBodyAsString().contains("Repository config not valid.")); + } + private String buildRepoRequestString(Provider provider, String link, String defaultBranch, AuthConfig authConfig, @Nullable String pathPrefix) { Map patJsonMap = ImmutableMap.of( diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java index 72fc992b6801..6a6efdd61ff8 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java @@ -22,6 +22,7 @@ import com.google.inject.Injector; import com.google.inject.Scopes; import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.TooManyRequestsException; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants.AppFabric; import io.cdap.cdap.common.guice.ConfigModule; @@ -52,12 +53,11 @@ import org.mockito.Mockito; public class OperationLifecycleManagerTest extends OperationTestBase { + protected static TransactionRunner transactionRunner; - private static final String testNamespace = "test"; - private static OperationLifecycleManager operationLifecycleManager; private static int batchSize; - @ClassRule + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); private static EmbeddedPostgres postgres; @@ -92,9 +92,6 @@ protected void configure() { }); transactionRunner = injector.getInstance(TransactionRunner.class); - operationLifecycleManager = - new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class), - null); StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE); } @@ -106,6 +103,8 @@ public static void afterClass() { @Test public void testScanOperations() throws Exception { + OperationLifecycleManager manager = + new OperationLifecycleManager(transactionRunner, null, null); List insertedRuns = insertTestRuns(transactionRunner); // get a filtered list of testNamespace runs List testNamespaceRuns = @@ -113,98 +112,127 @@ public void testScanOperations() throws Exception { .filter(detail -> detail.getRunId().getNamespace().equals(testNamespace)) .collect(Collectors.toList()); - TransactionRunners.run( - transactionRunner, - context -> { - List gotRuns = new ArrayList<>(); - List expectedRuns; - ScanOperationRunsRequest request; - - // verify the scan without filters picks all runs for testNamespace - request = ScanOperationRunsRequest.builder().setNamespace(testNamespace).build(); - operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add); - expectedRuns = testNamespaceRuns; - Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); - - // verify limit - gotRuns.clear(); - request = - ScanOperationRunsRequest.builder().setNamespace(testNamespace).setLimit(2).build(); - operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add); - expectedRuns = testNamespaceRuns.stream().limit(2).collect(Collectors.toList()); - Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); - - // verify the scan with type filter - gotRuns.clear(); - request = - ScanOperationRunsRequest.builder() - .setNamespace(testNamespace) - .setFilter(new OperationRunFilter(OperationType.PUSH_APPS, null)) - .build(); - operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add); - expectedRuns = - testNamespaceRuns.stream() - .filter(detail -> detail.getRun().getType().equals(OperationType.PUSH_APPS)) - .collect(Collectors.toList()); - Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); - - // verify the scan with status filter and limit - gotRuns.clear(); - request = - ScanOperationRunsRequest.builder() - .setNamespace(testNamespace) - .setLimit(2) - .setFilter( - new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED)) - .build(); - operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add); - expectedRuns = - testNamespaceRuns.stream() - .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS)) - .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) - .limit(2) - .collect(Collectors.toList()); - Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); - - // verify the scan with status filter - gotRuns.clear(); - request = - ScanOperationRunsRequest.builder() - .setNamespace(testNamespace) - .setFilter( - new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED)) - .build(); - operationLifecycleManager.scanOperations(request, batchSize, gotRuns::add); - expectedRuns = - testNamespaceRuns.stream() - .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS)) - .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) - .collect(Collectors.toList()); - Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); - }); + List gotRuns = new ArrayList<>(); + List expectedRuns; + ScanOperationRunsRequest request; + + // verify the scan without filters picks all runs for testNamespace + request = ScanOperationRunsRequest.builder().setNamespace(testNamespace).build(); + manager.scanOperations(request, batchSize, gotRuns::add); + expectedRuns = testNamespaceRuns; + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify limit + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder().setNamespace(testNamespace).setLimit(2).build(); + manager.scanOperations(request, batchSize, gotRuns::add); + expectedRuns = testNamespaceRuns.stream().limit(2).collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with type filter + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setFilter(new OperationRunFilter(OperationType.PUSH_APPS, null)) + .build(); + manager.scanOperations(request, batchSize, gotRuns::add); + expectedRuns = + testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals(OperationType.PUSH_APPS)) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with status filter and limit + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setLimit(2) + .setFilter( + new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED)) + .build(); + manager.scanOperations(request, batchSize, gotRuns::add); + expectedRuns = + testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS)) + .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) + .limit(2) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with status filter + gotRuns.clear(); + request = + ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setFilter( + new OperationRunFilter(OperationType.PULL_APPS, OperationRunStatus.FAILED)) + .build(); + manager.scanOperations(request, batchSize, gotRuns::add); + expectedRuns = + testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals(OperationType.PULL_APPS)) + .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); } @Test public void testGetOperation() throws Exception { + OperationLifecycleManager manager = + new OperationLifecycleManager(transactionRunner, null, null); OperationRunDetail expectedDetail = insertRun( testNamespace, OperationType.PUSH_APPS, OperationRunStatus.RUNNING, transactionRunner); String testId = expectedDetail.getRun().getId(); OperationRunId runId = new OperationRunId(testNamespace, testId); - TransactionRunners.run( - transactionRunner, - context -> { - OperationRunDetail gotDetail = operationLifecycleManager.getOperationRun(runId); - Assert.assertEquals(expectedDetail, gotDetail); - try { - operationLifecycleManager.getOperationRun( - new OperationRunId(Namespace.DEFAULT.getId(), testId)); - Assert.fail("Found unexpected run in default namespace"); - } catch (OperationRunNotFoundException e) { - // expected - } - }, - Exception.class); + OperationRunDetail gotDetail = manager.getOperationRun(runId); + Assert.assertEquals(expectedDetail, gotDetail); + try { + manager.getOperationRun( + new OperationRunId(Namespace.DEFAULT.getId(), testId)); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + } + + @Test + public void testCreatePushOperation() throws Exception { + OperationStatePublisher publisher = Mockito.mock(OperationStatePublisher.class); + OperationLifecycleManager manager = + new OperationLifecycleManager(transactionRunner, null, publisher); + // happy path + manager.createPushOperation(testNamespace, "1", testPushRequest, "test"); + Mockito.verify(publisher).publishStarting(new OperationRunId(testNamespace, "1")); + + // test two run at a time fails + try { + manager.createPushOperation(testNamespace, "2", testPushRequest, "test"); + Assert.fail("Expected exception"); + } catch (TooManyRequestsException e) { + // expected + } + } + + @Test + public void testCreatePullOperation() throws Exception { + OperationStatePublisher publisher = Mockito.mock(OperationStatePublisher.class); + OperationLifecycleManager manager = + new OperationLifecycleManager(transactionRunner, null, publisher); + // happy path + manager.createPullOperation(testNamespace, "1", testPullRequest, "test"); + Mockito.verify(publisher).publishStarting(new OperationRunId(testNamespace, "1")); + + // test two run at a time fails + try { + manager.createPullOperation(testNamespace, "2", testPullRequest, "test"); + Assert.fail("Expected exception"); + } catch (TooManyRequestsException e) { + // expected + } } } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java index 0e0dad18ab74..3fb825da7d07 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java @@ -20,6 +20,7 @@ import io.cdap.cdap.common.id.Id.Namespace; import io.cdap.cdap.internal.AppFabricTestHelper; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationMeta; import io.cdap.cdap.proto.operation.OperationRun; @@ -40,7 +41,9 @@ public class OperationTestBase { private static final AtomicInteger sourceId = new AtomicInteger(); private static final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis()); protected static final String testNamespace = "test"; - private static final PullAppsRequest input = new PullAppsRequest(Collections.emptySet(), null); + protected static final PullAppsRequest testPullRequest = new PullAppsRequest(Collections.emptySet(), null); + protected static final PushAppsRequest testPushRequest = new PushAppsRequest(Collections.emptySet(), null, null); + protected static OperationRunDetail getRun(OperationRunId runId, TransactionRunner transactionRunner) @@ -75,7 +78,7 @@ protected static OperationRunDetail insertRun( .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) .setRunId(runId) .setRun(run) - .setPullAppsRequest(input) + .setPullAppsRequest(testPullRequest) .build(); TransactionRunners.run( transactionRunner, diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java new file mode 100644 index 000000000000..9db83e09aca5 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PullMultipleAppsRequest.java @@ -0,0 +1,34 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.sourcecontrol; + +import java.util.List; + +/** + * The request class to push multiple applications (in the same namespace) to linked git repository. + */ +public class PullMultipleAppsRequest { + private final List apps; + + public PullMultipleAppsRequest(List apps) { + this.apps = apps; + } + + public List getApps() { + return apps; + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java new file mode 100644 index 000000000000..c81155dda223 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/PushMultipleAppsRequest.java @@ -0,0 +1,40 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.sourcecontrol; + +import java.util.List; + +/** + * The request class to push multiple applications (in the same namespace) to linked git repository. + */ +public class PushMultipleAppsRequest { + private final String commitMessage; + private final List apps; + + public PushMultipleAppsRequest(List apps, String commitMessage) { + this.apps = apps; + this.commitMessage = commitMessage; + } + + public String getCommitMessage() { + return commitMessage; + } + + public List getApps() { + return apps; + } +}