From 46bd91e6e747c4394ddfbdc590d935164175e1c8 Mon Sep 17 00:00:00 2001 From: samik Date: Wed, 15 Nov 2023 00:58:00 +0530 Subject: [PATCH] added new run state machine --- .../SourceControlManagementHttpHandler.java | 8 +-- .../SourceControlManagementService.java | 38 ++++--------- .../operation/OperationLifecycleService.java | 56 +++++++++++++++++++ .../operation/OperationRunDetail.java | 28 +++++++--- 4 files changed, 92 insertions(+), 38 deletions(-) diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java index 745b6aa0f4d9..0883649e9d03 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java @@ -35,6 +35,7 @@ import io.cdap.cdap.proto.id.ApplicationReference; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; import io.cdap.cdap.proto.sourcecontrol.PushAppRequest; import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest; @@ -50,7 +51,6 @@ import io.cdap.http.HttpResponder; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; - import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -225,7 +225,7 @@ public void pushApps(FullHttpRequest request, HttpResponder responder, throw new BadRequestException("Please specify commit message in the request body."); } - OperationMeta operationMeta = sourceControlService.pushApps(namespace, appsRequest); + OperationRun operationMeta = sourceControlService.pushApps(namespace, appsRequest); responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta)); } @@ -269,8 +269,8 @@ public void pullApps(FullHttpRequest request, HttpResponder responder, throw new BadRequestException("Invalid request body."); } - OperationMeta operationMeta = sourceControlService.pullApps(namespace, appsRequest); - responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta)); + OperationRun operationRun = sourceControlService.pullApps(namespace, appsRequest); + responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationRun)); } private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException { diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java index 8cf12eb675cb..3fefad7fc66a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/SourceControlManagementService.java @@ -16,7 +16,6 @@ package io.cdap.cdap.internal.app.services; -import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import io.cdap.cdap.api.artifact.ArtifactSummary; import io.cdap.cdap.api.security.store.SecureStore; @@ -27,12 +26,10 @@ import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms; -import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; import io.cdap.cdap.internal.operation.OperationException; import io.cdap.cdap.internal.operation.OperationLifecycleService; -import io.cdap.cdap.internal.operation.SynchronousLongRunningOperationContext; import io.cdap.cdap.proto.ApplicationDetail; import io.cdap.cdap.proto.ApplicationRecord; import io.cdap.cdap.proto.artifact.AppRequest; @@ -41,8 +38,7 @@ import io.cdap.cdap.proto.id.KerberosPrincipalId; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.operation.OperationMeta; -import io.cdap.cdap.proto.operation.OperationResource; -import io.cdap.cdap.proto.operation.OperationType; +import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.security.NamespacePermission; import io.cdap.cdap.proto.security.StandardPermission; import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest; @@ -76,7 +72,6 @@ import java.io.IOException; import java.util.HashSet; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -351,17 +346,16 @@ public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundExc * @throws ExecutionException when the push operation execution fails * @throws OperationException for any exception occuring in the operation logic */ - public OperationMeta pushApps(NamespaceId namespace, PushMultipleAppsRequest request) - throws NoChangesToPushException, NotFoundException, InterruptedException, - ExecutionException, OperationException { + public OperationRun pushApps(NamespaceId namespace, PushMultipleAppsRequest request) + throws NotFoundException, IOException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.WRITE_REPOSITORY); RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); String principal = authenticationContext.getPrincipal().getName(); - CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(), request.getCommitMessage()) - ; + CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(), + request.getCommitMessage()); PushAppsRequest pushOpRequest = new PushAppsRequest(new HashSet<>(request.getApps()), repoConfig, commitMeta); - operationLifecycleService.startOperation() + return operationLifecycleService.createPushOperation(namespace.getNamespace(), RunIds.generate().getId(), pushOpRequest, principal); } /** @@ -376,24 +370,14 @@ public OperationMeta pushApps(NamespaceId namespace, PushMultipleAppsRequest req * @throws ExecutionException when the push operation execution fails * @throws OperationException for any exception occuring in the operation logic */ - public OperationMeta pullApps(NamespaceId namespace, PullMultipleAppsRequest request) - throws NotFoundException, InterruptedException, - ExecutionException, OperationException { + public OperationRun pullApps(NamespaceId namespace, PullMultipleAppsRequest request) + throws NotFoundException, IOException { accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(), NamespacePermission.READ_REPOSITORY); RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig(); - PullAppsOperation pullOp = pullAppsOperationFactory.create(new PullAppsRequest( - new HashSet<>(request.getApps()), - repoConfig - )); - - SynchronousLongRunningOperationContext operationContext = new SynchronousLongRunningOperationContext( - namespace.getNamespace(), - OperationType.PULL_APPS - ); - ListenableFuture> result = pullOp.run(operationContext); - result.get(); + String principal = authenticationContext.getPrincipal().getName(); + PullAppsRequest pullOpRequest = new PullAppsRequest(new HashSet<>(request.getApps()), repoConfig); - return operationContext.getOperationMeta(); + return operationLifecycleService.createPullOperation(namespace.getNamespace(), RunIds.generate().getId(), pullOpRequest, principal); } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleService.java index febad517f029..f8cdaca10a3a 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleService.java @@ -17,11 +17,19 @@ package io.cdap.cdap.internal.operation; import com.google.inject.Inject; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; import io.cdap.cdap.proto.operation.OperationError; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.function.Consumer; import org.slf4j.Logger; @@ -34,6 +42,7 @@ public class OperationLifecycleManager { private final TransactionRunner transactionRunner; private final OperationRuntime runtime; + private final OperationStatePublisher statePublisher; private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class); @@ -41,6 +50,7 @@ public class OperationLifecycleManager { OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) { this.transactionRunner = transactionRunner; this.runtime = runtime; + this.statePublisher = statePublisher; } /** @@ -87,6 +97,52 @@ public OperationController startOperation(OperationRunDetail detail) { return runtime.run(detail); } + public OperationRun createPushOperation(String namespace, String runId, PushAppsRequest request, String principal) throws IOException { + OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build(); + OperationRunId operationRunId = new OperationRunId(namespace, runId); + OperationRun run = OperationRun.builder() + .setRunId(runId) + .setMetadata(meta) + .setStatus(OperationRunStatus.STARTING) + .setType(OperationType.PUSH_APPS) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setRun(run) + .setPrincipal(principal) + .setPushAppsRequest(request) + .setRunId(operationRunId) + .build(); + TransactionRunners.run(transactionRunner, context -> { + getOperationRunStore(context).createOperationRun(operationRunId, detail); + }, IOException.class); + + statePublisher.publishStarting(operationRunId); + return run; + } + + public OperationRun createPullOperation(String namespace, String runId, PullAppsRequest request, String principal) throws IOException { + OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build(); + OperationRunId operationRunId = new OperationRunId(namespace, runId); + OperationRun run = OperationRun.builder() + .setRunId(runId) + .setMetadata(meta) + .setStatus(OperationRunStatus.STARTING) + .setType(OperationType.PULL_APPS) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setRun(run) + .setPrincipal(principal) + .setPullAppsRequest(request) + .setRunId(operationRunId) + .build(); + TransactionRunners.run(transactionRunner, context -> { + getOperationRunStore(context).createOperationRun(operationRunId, detail); + }, IOException.class); + + statePublisher.publishStarting(operationRunId); + return run; + } + /** * Initiate operation stop. It is the responsibility of the caller to validate state transition. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java index e2780790bd24..2fa8e757c422 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java @@ -19,6 +19,7 @@ import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationRun; import java.util.Arrays; @@ -57,15 +58,20 @@ public class OperationRunDetail { @Nullable private final PullAppsRequest pullAppsRequest; + @SerializedName("pushAppsRequest") + @Nullable + private final PushAppsRequest pushAppsRequest; + protected OperationRunDetail( OperationRunId runId, OperationRun run, - byte[] sourceId, @Nullable String principal, - @Nullable PullAppsRequest pullAppsRequest) { + @Nullable byte[] sourceId, @Nullable String principal, + @Nullable PullAppsRequest pullAppsRequest, @Nullable PushAppsRequest pushAppsRequest) { this.runId = runId; this.run = run; this.sourceId = sourceId; this.principal = principal; this.pullAppsRequest = pullAppsRequest; + this.pushAppsRequest = pushAppsRequest; } @Nullable @@ -82,6 +88,10 @@ public PullAppsRequest getPullAppsRequest() { return pullAppsRequest; } + public PushAppsRequest getPushAppsRequest() { + return pushAppsRequest; + } + public OperationRun getRun() { return run; } @@ -138,6 +148,7 @@ public static class Builder { protected byte[] sourceId; protected String principal; protected PullAppsRequest pullAppsRequest; + protected PushAppsRequest pushAppsRequest; protected Builder() { } @@ -148,6 +159,7 @@ protected Builder(OperationRunDetail detail) { run = detail.getRun(); runId = detail.getRunId(); pullAppsRequest = detail.getPullAppsRequest(); + pushAppsRequest = detail.getPushAppsRequest(); } public Builder setSourceId(byte[] sourceId) { @@ -176,6 +188,11 @@ public Builder setPullAppsRequest(PullAppsRequest pullAppsRequest) { return this; } + public Builder setPushAppsRequest(PushAppsRequest pushAppsRequest) { + this.pushAppsRequest = pushAppsRequest; + return this; + } + /** * Validates input and returns a OperationRunDetail. */ @@ -183,9 +200,6 @@ public OperationRunDetail build() { if (runId == null) { throw new IllegalArgumentException("run id must be specified."); } - if (sourceId == null) { - throw new IllegalArgumentException("Operation run source id must be specified."); - } if (run == null) { throw new IllegalArgumentException("Operation run must be specified."); } @@ -194,12 +208,12 @@ public OperationRunDetail build() { throw new IllegalArgumentException("Exactly one request type can be non-null"); } - return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest); + return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest, pushAppsRequest); } private boolean validateRequests() { // validate only one of the request is non-null - return Stream.of(pullAppsRequest).filter(java.util.Objects::nonNull).count() == 1; + return Stream.of(pullAppsRequest, pullAppsRequest).filter(java.util.Objects::nonNull).count() == 1; } }