diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java index 82622b1e7c3e..2fc26e7e698e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java @@ -34,7 +34,11 @@ import io.cdap.cdap.proto.ApplicationRecord; import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.PushAppRequest; +import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigRequest; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigValidationException; @@ -47,7 +51,6 @@ import io.cdap.http.HttpResponder; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; - import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -183,6 +186,49 @@ public void pushApp(FullHttpRequest request, HttpResponder responder, } } + /** + * Pushes application configs of requested applications to linked repository in Json format. + * It expects a post body that has a list of application ids and an optional commit message + * E.g. + * + *
+ * {@code + * { + * "appIds": ["app_id_1", "app_id_2"], + * "commitMessage": "pushed application XYZ" + * } + * } + * + *+ * The response will be a {@link OperationMeta} object, which encapsulates the application name, + * version and fileHash. + */ + @POST + @Path("/apps/push") + public void pushApps(FullHttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId) throws Exception { + checkSourceControlMultiFeatureFlag(); + NamespaceId namespace = validateNamespaceId(namespaceId); + + PushMultipleAppsRequest appsRequest; + try { + appsRequest = parseBody(request, PushMultipleAppsRequest.class); + } catch (JsonSyntaxException e) { + throw new BadRequestException("Invalid request body.", e); + } + + if (appsRequest == null) { + throw new BadRequestException("Invalid request body."); + } + + if (Strings.isNullOrEmpty(appsRequest.getCommitMessage())) { + throw new BadRequestException("Please specify commit message in the request body."); + } + + OperationRun operationMeta = sourceControlService.pushApps(namespace, appsRequest); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta)); + } + /** * Pull the requested application from linked repository and deploy in current namespace. */ @@ -202,6 +248,31 @@ public void pullApp(FullHttpRequest request, HttpResponder responder, } } + /** + * Pull the requested applications from linked repository and deploy in current namespace. + */ + @POST + @Path("/apps/pull") + public void pullApps(FullHttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespaceId) throws Exception { + checkSourceControlMultiFeatureFlag(); + NamespaceId namespace = validateNamespaceId(namespaceId); + + PullMultipleAppsRequest appsRequest; + try { + appsRequest = parseBody(request, PullMultipleAppsRequest.class); + } catch (JsonSyntaxException e) { + throw new BadRequestException("Invalid request body.", e); + } + + if (appsRequest == null) { + throw new BadRequestException("Invalid request body."); + } + + OperationRun operationRun = sourceControlService.pullApps(namespace, appsRequest); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationRun)); + } + private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException { PushAppRequest appRequest; try { @@ -217,16 +288,19 @@ private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws return appRequest; } - /** - * - * throws {@link ForbiddenException} if the feature is disabled - */ private void checkSourceControlFeatureFlag() throws ForbiddenException { if (!Feature.SOURCE_CONTROL_MANAGEMENT_GIT.isEnabled(featureFlagsProvider)) { throw new ForbiddenException("Source Control Management feature is not enabled."); } } + private void checkSourceControlMultiFeatureFlag() throws ForbiddenException { + checkSourceControlFeatureFlag(); + if (!Feature.SOURCE_CONTROL_MANAGEMENT_MULTI_APP.isEnabled(featureFlagsProvider)) { + throw new ForbiddenException("Source Control Management for multiple apps feature is not enabled."); + } + } + private NamespaceId validateNamespaceId(String namespaceId) throws BadRequestException { try { return new NamespaceId(namespaceId); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java index 9e24ecac06b2..80e2f5cbbf17 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java @@ -23,9 +23,13 @@ import io.cdap.cdap.common.NamespaceNotFoundException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.RepositoryNotFoundException; +import io.cdap.cdap.common.TooManyRequestsException; import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; +import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.proto.ApplicationDetail; import io.cdap.cdap.proto.ApplicationRecord; import io.cdap.cdap.proto.artifact.AppRequest; @@ -33,8 +37,11 @@ import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.KerberosPrincipalId; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.security.NamespacePermission; import io.cdap.cdap.proto.security.StandardPermission; +import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; +import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException; import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig; import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta; @@ -62,15 +69,17 @@ import io.cdap.cdap.store.NamespaceTable; import io.cdap.cdap.store.RepositoryTable; import java.io.IOException; +import java.util.HashSet; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Service that manages source control for repositories and applications. - * It exposes repository CRUD apis and source control tasks that do pull/pull/list applications in linked repository. + * Service that manages source control for repositories and applications. It exposes repository CRUD + * apis and source control tasks that do pull/pull/list applications in linked repository. */ public class SourceControlManagementService { + private final AccessEnforcer accessEnforcer; private final AuthenticationContext authenticationContext; private final TransactionRunner transactionRunner; @@ -79,6 +88,7 @@ public class SourceControlManagementService { private final SourceControlOperationRunner sourceControlOperationRunner; private final ApplicationLifecycleService appLifecycleService; private final Store store; + private final OperationLifecycleManager operationLifecycleManager; private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class); @@ -87,13 +97,14 @@ public class SourceControlManagementService { */ @Inject public SourceControlManagementService(CConfiguration cConf, - SecureStore secureStore, - TransactionRunner transactionRunner, - AccessEnforcer accessEnforcer, - AuthenticationContext authenticationContext, - SourceControlOperationRunner sourceControlOperationRunner, - ApplicationLifecycleService applicationLifecycleService, - Store store) { + SecureStore secureStore, + TransactionRunner transactionRunner, + AccessEnforcer accessEnforcer, + AuthenticationContext authenticationContext, + SourceControlOperationRunner sourceControlOperationRunner, + ApplicationLifecycleService applicationLifecycleService, + Store store, + OperationLifecycleManager operationLifecycleManager) { this.cConf = cConf; this.secureStore = secureStore; this.transactionRunner = transactionRunner; @@ -102,13 +113,16 @@ public SourceControlManagementService(CConfiguration cConf, this.sourceControlOperationRunner = sourceControlOperationRunner; this.appLifecycleService = applicationLifecycleService; this.store = store; + this.operationLifecycleManager = operationLifecycleManager; } - private RepositoryTable getRepositoryTable(StructuredTableContext context) throws TableNotFoundException { + private RepositoryTable getRepositoryTable(StructuredTableContext context) + throws TableNotFoundException { return new RepositoryTable(context); } - private NamespaceTable getNamespaceTable(StructuredTableContext context) throws TableNotFoundException { + private NamespaceTable getNamespaceTable(StructuredTableContext context) + throws TableNotFoundException { return new NamespaceTable(context); } @@ -121,7 +135,7 @@ private NamespaceTable getNamespaceTable(StructuredTableContext context) throws * @throws NamespaceNotFoundException if the namespace is non-existent */ public RepositoryMeta setRepository(NamespaceId namespace, RepositoryConfig repository) - throws NamespaceNotFoundException { + throws NamespaceNotFoundException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.UPDATE_REPOSITORY_METADATA); @@ -155,9 +169,11 @@ public void deleteRepository(NamespaceId namespace) { /** * Return the repository config for the given namespace. * - * @throws RepositoryNotFoundException if repository config is not available for the namespace + * @throws RepositoryNotFoundException if repository config is not available for the + * namespace */ - public RepositoryMeta getRepositoryMeta(NamespaceId namespace) throws RepositoryNotFoundException { + public RepositoryMeta getRepositoryMeta(NamespaceId namespace) + throws RepositoryNotFoundException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), StandardPermission.GET); return TransactionRunners.run(transactionRunner, context -> { @@ -177,10 +193,11 @@ public RepositoryMeta getRepositoryMeta(NamespaceId namespace) throws Repository * @throws RemoteRepositoryValidationException if validation of remote repository fails */ public void validateRepository(NamespaceId namespace, RepositoryConfig repoConfig) - throws RemoteRepositoryValidationException { + throws RemoteRepositoryValidationException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.UPDATE_REPOSITORY_METADATA); - RepositoryManager.validateConfig(secureStore, new SourceControlConfig(namespace, repoConfig, cConf)); + RepositoryManager.validateConfig(secureStore, + new SourceControlConfig(namespace, repoConfig, cConf)); } /** @@ -189,27 +206,31 @@ public void validateRepository(NamespaceId namespace, RepositoryConfig repoConfi * @param appRef {@link ApplicationReference} * @param commitMessage enforced commit message from user * @return {@link PushAppResponse} - * @throws NotFoundException if the application is not found or the repository config is not found - * @throws IOException if {@link ApplicationLifecycleService} fails to get the adminOwner store + * @throws NotFoundException if the application is not found or the repository config is not + * found + * @throws IOException if {@link ApplicationLifecycleService} fails to get the adminOwner + * store * @throws SourceControlException if {@link SourceControlOperationRunner} fails to push * @throws AuthenticationConfigException if the repository configuration authentication fails - * @throws NoChangesToPushException if there's no change of the application between namespace and linked repository + * @throws NoChangesToPushException if there's no change of the application between namespace + * and linked repository */ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage) - throws NotFoundException, IOException, NoChangesToPushException, AuthenticationConfigException { + throws NotFoundException, IOException, NoChangesToPushException, AuthenticationConfigException { accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(), NamespacePermission.WRITE_REPOSITORY); // TODO: CDAP-20396 RepositoryConfig is currently only accessible from the service layer // Need to fix it and avoid passing it in RepositoryManagerFactory RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig(); - + // AppLifecycleService already enforces ApplicationDetail Access ApplicationDetail appDetail = appLifecycleService.getLatestAppDetail(appRef, false); String committer = authenticationContext.getPrincipal().getName(); // TODO CDAP-20371 revisit and put correct Author and Committer, for now they are the same - CommitMeta commitMeta = new CommitMeta(committer, committer, System.currentTimeMillis(), commitMessage); + CommitMeta commitMeta = new CommitMeta(committer, committer, System.currentTimeMillis(), + commitMessage); LOG.info("Start to push app {} in namespace {} to linked repository by user {}", appRef.getApplication(), @@ -217,14 +238,14 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage appLifecycleService.decodeUserId(authenticationContext)); PushAppResponse pushResponse = sourceControlOperationRunner.push( - new PushAppOperationRequest(appRef.getParent(), repoConfig, appDetail, commitMeta) + new PushAppOperationRequest(appRef.getParent(), repoConfig, appDetail, commitMeta) ); LOG.info("Successfully pushed app {} in namespace {} to linked repository by user {}", appRef.getApplication(), appRef.getParent(), appLifecycleService.decodeUserId(authenticationContext)); - + SourceControlMeta sourceControlMeta = new SourceControlMeta(pushResponse.getFileHash()); ApplicationId appId = appRef.app(appDetail.getAppVersion()); store.setAppSourceControlMeta(appId, sourceControlMeta); @@ -239,7 +260,8 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage * @return {@link ApplicationRecord} of the deployed application. * @throws Exception when {@link ApplicationLifecycleService} fails to deploy. * @throws NoChangesToPullException if the fileHashes are the same - * @throws NotFoundException if the repository config is not found or the application in repository is not found + * @throws NotFoundException if the repository config is not found or the application in + * repository is not found * @throws SourceControlException if unexpected errors happen when pulling the application. * @throws AuthenticationConfigException if the repository configuration authentication fails */ @@ -251,11 +273,12 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep accessEnforcer.enforce(appId, authenticationContext.getPrincipal(), StandardPermission.CREATE); accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(), NamespacePermission.READ_REPOSITORY); - + PullAppResponse> pullResponse = pullAndValidateApplication(appRef); AppRequest> appRequest = pullResponse.getAppRequest(); - SourceControlMeta sourceControlMeta = new SourceControlMeta(pullResponse.getApplicationFileHash()); + SourceControlMeta sourceControlMeta = new SourceControlMeta( + pullResponse.getApplicationFileHash()); LOG.info("Start to deploy app {} in namespace {} by user {}", appId.getApplication(), @@ -263,34 +286,40 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep appLifecycleService.decodeUserId(authenticationContext)); ApplicationWithPrograms app = appLifecycleService.deployApp(appId, appRequest, - sourceControlMeta, x -> { }, false); - - LOG.info("Successfully deployed app {} in namespace {} from artifact {} with configuration {} and " - + "principal {}", app.getApplicationId().getApplication(), app.getApplicationId().getNamespace(), - app.getArtifactId(), appRequest.getConfig(), app.getOwnerPrincipal() + sourceControlMeta, x -> { + }, false); + + LOG.info( + "Successfully deployed app {} in namespace {} from artifact {} with configuration {} and " + + "principal {}", app.getApplicationId().getApplication(), + app.getApplicationId().getNamespace(), + app.getArtifactId(), appRequest.getConfig(), app.getOwnerPrincipal() ); return new ApplicationRecord( - ArtifactSummary.from(app.getArtifactId().toApiArtifactId()), - app.getApplicationId().getApplication(), - app.getApplicationId().getVersion(), - app.getSpecification().getDescription(), - Optional.ofNullable(app.getOwnerPrincipal()).map(KerberosPrincipalId::getPrincipal).orElse(null), - app.getChangeDetail(), app.getSourceControlMeta()); + ArtifactSummary.from(app.getArtifactId().toApiArtifactId()), + app.getApplicationId().getApplication(), + app.getApplicationId().getVersion(), + app.getSpecification().getDescription(), + Optional.ofNullable(app.getOwnerPrincipal()).map(KerberosPrincipalId::getPrincipal) + .orElse(null), + app.getChangeDetail(), app.getSourceControlMeta()); } /** - * Pull the application from repository, look up the fileHash in store and compare it with the cone in repository. + * Pull the application from repository, look up the fileHash in store and compare it with the + * cone in repository. * * @param appRef {@link ApplicationReference} to fetch the application with * @return {@link PullAppResponse} * @throws NoChangesToPullException if the fileHashes are the same - * @throws NotFoundException if the repository config is not found or the application in repository is not found + * @throws NotFoundException if the repository config is not found or the application in + * repository is not found * @throws SourceControlException if unexpected errors happen when pulling the application. * @throws AuthenticationConfigException if the repository configuration authentication fails */ private PullAppResponse> pullAndValidateApplication(ApplicationReference appRef) - throws NoChangesToPullException, NotFoundException, AuthenticationConfigException { + throws NoChangesToPullException, NotFoundException, AuthenticationConfigException { RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig(); SourceControlMeta latestMeta = store.getAppSourceControlMeta(appRef); PullAppResponse> pullResponse = sourceControlOperationRunner.pull( @@ -298,8 +327,9 @@ private PullAppResponse> pullAndValidateApplication(ApplicationReference appRe if (latestMeta != null && latestMeta.getFileHash().equals(pullResponse.getApplicationFileHash())) { - throw new NoChangesToPullException(String.format("Pipeline deployment was not successful because there is " - + "no new change for the pulled application: %s", appRef)); + throw new NoChangesToPullException( + String.format("Pipeline deployment was not successful because there is " + + "no new change for the pulled application: %s", appRef)); } return pullResponse; } @@ -308,15 +338,61 @@ private PullAppResponse> pullAndValidateApplication(ApplicationReference appRe * The method to list all applications found in linked repository. * * @return {@link RepositoryAppsResponse} - * @throws RepositoryNotFoundException if the repository config is not found + * @throws RepositoryNotFoundException if the repository config is not found * @throws AuthenticationConfigException if git auth config is not found - * @throws SourceControlException if {@link SourceControlOperationRunner} fails to list applications + * @throws SourceControlException if {@link SourceControlOperationRunner} fails to list + * applications */ public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundException, - AuthenticationConfigException { + AuthenticationConfigException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.READ_REPOSITORY); RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); return sourceControlOperationRunner.list(new NamespaceRepository(namespace, repoConfig)); } + + /** + * The method to push multiple applications in the same namespace to the linked repository. + * + * @param namespace {@link NamespaceId} from where the apps are to be pushed + * @param request {@link PushMultipleAppsRequest} containing the appIds and the commit + * message + * @return {@link OperationRun} of the operation to push the apps + * @throws NotFoundException when the repository or any of the apps are not found + */ + public OperationRun pushApps(NamespaceId namespace, PushMultipleAppsRequest request) + throws NotFoundException, IOException, TooManyRequestsException { + accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), + NamespacePermission.WRITE_REPOSITORY); + RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); + String principal = authenticationContext.getPrincipal().getName(); + CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(), + request.getCommitMessage()); + PushAppsRequest pushOpRequest = new PushAppsRequest(new HashSet<>(request.getApps()), + repoConfig, commitMeta); + return operationLifecycleManager.createPushOperation(namespace.getNamespace(), + RunIds.generate().getId(), pushOpRequest, principal); + } + + /** + * The method to pull multiple applications from the linked repository and deploy them in current + * namespace. + * + * @param namespace {@link NamespaceId} from where the apps are to be pushed + * @param request {@link PullMultipleAppsRequest} containing the appIds + * @return {@link OperationRun} of the operation to push the apps + * @throws NotFoundException when the repository or any of the apps are not found + */ + public OperationRun pullApps(NamespaceId namespace, PullMultipleAppsRequest request) + throws NotFoundException, IOException, TooManyRequestsException { + accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), + NamespacePermission.READ_REPOSITORY); + RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); + String principal = authenticationContext.getPrincipal().getName(); + PullAppsRequest pullOpRequest = new PullAppsRequest(new HashSet<>(request.getApps()), + repoConfig); + + return operationLifecycleManager.createPullOperation(namespace.getNamespace(), + RunIds.generate().getId(), pullOpRequest, principal); + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java index 4e9c7f64f156..f6176ea5b88c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java @@ -17,12 +17,20 @@ package io.cdap.cdap.internal.operation; import com.google.inject.Inject; +import io.cdap.cdap.common.TooManyRequestsException; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.function.Consumer; import org.slf4j.Logger; @@ -35,13 +43,16 @@ public class OperationLifecycleManager { private final TransactionRunner transactionRunner; private final OperationRuntime runtime; + private final OperationStatePublisher statePublisher; private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class); @Inject - OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) { + OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime, + OperationStatePublisher statePublisher) { this.transactionRunner = transactionRunner; this.runtime = runtime; + this.statePublisher = statePublisher; } /** @@ -61,15 +72,15 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, while (currentLimit > 0) { ScanOperationRunsRequest batchRequest = ScanOperationRunsRequest .builder(request) - .setScanAfter(lastKey) - .setLimit(Math.min(txBatchSize, currentLimit)) - .build(); + .setScanAfter(lastKey) + .setLimit(Math.min(txBatchSize, currentLimit)) + .build(); request = batchRequest; lastKey = TransactionRunners.run(transactionRunner, context -> { - return getOperationRunStore(context).scanOperations(batchRequest, consumer); - }, IOException.class, OperationRunNotFoundException.class); + return getOperationRunStore(context).scanOperations(batchRequest, consumer); + }, IOException.class, OperationRunNotFoundException.class); if (lastKey == null) { break; @@ -107,6 +118,61 @@ public OperationController startOperation(OperationRunDetail detail) { return runtime.run(detail); } + /** + * Create a new pull operation. Inserts the run in DB and then send TMS message. + */ + public OperationRun createPushOperation(String namespace, String runId, PushAppsRequest request, String principal) + throws IOException, TooManyRequestsException { + OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build(); + OperationRunId operationRunId = new OperationRunId(namespace, runId); + OperationRun run = OperationRun.builder() + .setRunId(runId) + .setMetadata(meta) + .setStatus(OperationRunStatus.STARTING) + .setType(OperationType.PUSH_APPS) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setRun(run) + .setPrincipal(principal) + .setPushAppsRequest(request) + .setRunId(operationRunId) + .build(); + TransactionRunners.run(transactionRunner, context -> { + validateOnlyOneGitOperationRunning(namespace, context); + getOperationRunStore(context).createOperationRun(operationRunId, detail); + statePublisher.publishStarting(operationRunId); + }, TooManyRequestsException.class, IOException.class); + return run; + } + + /** + * Create a new pull operation. Inserts the run in DB and then send TMS message. + */ + public OperationRun createPullOperation(String namespace, String runId, PullAppsRequest request, String principal) + throws IOException, TooManyRequestsException { + OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build(); + OperationRunId operationRunId = new OperationRunId(namespace, runId); + OperationRun run = OperationRun.builder() + .setRunId(runId) + .setMetadata(meta) + .setStatus(OperationRunStatus.STARTING) + .setType(OperationType.PULL_APPS) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setRun(run) + .setPrincipal(principal) + .setPullAppsRequest(request) + .setRunId(operationRunId) + .build(); + TransactionRunners.run(transactionRunner, context -> { + validateOnlyOneGitOperationRunning(namespace, context); + getOperationRunStore(context).createOperationRun(operationRunId, detail); + statePublisher.publishStarting(operationRunId); + }, TooManyRequestsException.class, IOException.class); + + return run; + } + /** * Initiate operation stop. It is the responsibility of the caller to validate state transition. @@ -140,6 +206,20 @@ public void isRunning(OperationRunDetail detail, OperationStatePublisher statePu } } + // Validate only one multi git operation running at a time + private void validateOnlyOneGitOperationRunning(String namespaceId, StructuredTableContext context) + throws TooManyRequestsException, OperationRunNotFoundException, IOException { + OperationRunStore store = getOperationRunStore(context); + OperationRunDetail existing = store.getLatestActiveOperation(namespaceId, OperationType.PULL_APPS); + if (existing != null) { + throw new TooManyRequestsException(String.format("Already running a bulk pull operation %s", existing.getRun())); + } + existing = store.getLatestActiveOperation(namespaceId, OperationType.PUSH_APPS); + if (existing != null) { + throw new TooManyRequestsException(String.format("Already running a bulk push operation %s", existing.getRun())); + } + } + private OperationRunStore getOperationRunStore(StructuredTableContext context) { return new OperationRunStore(context); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java index 27863fccc09c..ae3cbaf039da 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java @@ -58,15 +58,20 @@ public class OperationRunDetail { @Nullable private final PullAppsRequest pullAppsRequest; + @SerializedName("pushAppsRequest") + @Nullable + private final PushAppsRequest pushAppsRequest; + protected OperationRunDetail( OperationRunId runId, OperationRun run, - byte[] sourceId, @Nullable String principal, - @Nullable PullAppsRequest pullAppsRequest) { + @Nullable byte[] sourceId, @Nullable String principal, + @Nullable PullAppsRequest pullAppsRequest, @Nullable PushAppsRequest pushAppsRequest) { this.runId = runId; this.run = run; this.sourceId = sourceId; this.principal = principal; this.pullAppsRequest = pullAppsRequest; + this.pushAppsRequest = pushAppsRequest; } @Nullable diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java index aeeee7e8a978..1fd427136a58 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java @@ -27,6 +27,7 @@ import io.cdap.cdap.proto.operation.OperationResource; import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.cdap.spi.data.SortOrder; import io.cdap.cdap.spi.data.StructuredRow; import io.cdap.cdap.spi.data.StructuredTable; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -260,6 +262,43 @@ public void scanOperationByStatus(OperationRunStatus status, } } + /** + * Get the latest active operation of a given type in a namespace. + * + * @param namespaceId to serach in + * @param type {@link OperationType} to search + */ + public OperationRunDetail getLatestActiveOperation(String namespaceId, OperationType type) + throws OperationRunNotFoundException, IOException { + AtomicReference