Skip to content

Commit

Permalink
added new run state machine
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 16, 2023
1 parent e0c4c25 commit 46bd91e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushAppRequest;
import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
Expand All @@ -50,7 +51,6 @@
import io.cdap.http.HttpResponder;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
Expand Down Expand Up @@ -225,7 +225,7 @@ public void pushApps(FullHttpRequest request, HttpResponder responder,
throw new BadRequestException("Please specify commit message in the request body.");
}

OperationMeta operationMeta = sourceControlService.pushApps(namespace, appsRequest);
OperationRun operationMeta = sourceControlService.pushApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
}

Expand Down Expand Up @@ -269,8 +269,8 @@ public void pullApps(FullHttpRequest request, HttpResponder responder,
throw new BadRequestException("Invalid request body.");
}

OperationMeta operationMeta = sourceControlService.pullApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
OperationRun operationRun = sourceControlService.pullApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationRun));
}

private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import io.cdap.cdap.api.artifact.ArtifactSummary;
import io.cdap.cdap.api.security.store.SecureStore;
Expand All @@ -27,12 +26,10 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.internal.operation.OperationException;
import io.cdap.cdap.internal.operation.OperationLifecycleService;
import io.cdap.cdap.internal.operation.SynchronousLongRunningOperationContext;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.artifact.AppRequest;
Expand All @@ -41,8 +38,7 @@
import io.cdap.cdap.proto.id.KerberosPrincipalId;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.security.NamespacePermission;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
Expand Down Expand Up @@ -76,7 +72,6 @@
import java.io.IOException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -351,17 +346,16 @@ public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundExc
* @throws ExecutionException when the push operation execution fails
* @throws OperationException for any exception occuring in the operation logic
*/
public OperationMeta pushApps(NamespaceId namespace, PushMultipleAppsRequest request)
throws NoChangesToPushException, NotFoundException, InterruptedException,
ExecutionException, OperationException {
public OperationRun pushApps(NamespaceId namespace, PushMultipleAppsRequest request)
throws NotFoundException, IOException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.WRITE_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
String principal = authenticationContext.getPrincipal().getName();
CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(), request.getCommitMessage())
;
CommitMeta commitMeta = new CommitMeta(principal, principal, System.currentTimeMillis(),
request.getCommitMessage());
PushAppsRequest pushOpRequest = new PushAppsRequest(new HashSet<>(request.getApps()), repoConfig, commitMeta);
operationLifecycleService.startOperation()
return operationLifecycleService.createPushOperation(namespace.getNamespace(), RunIds.generate().getId(), pushOpRequest, principal);
}

/**
Expand All @@ -376,24 +370,14 @@ public OperationMeta pushApps(NamespaceId namespace, PushMultipleAppsRequest req
* @throws ExecutionException when the push operation execution fails
* @throws OperationException for any exception occuring in the operation logic
*/
public OperationMeta pullApps(NamespaceId namespace, PullMultipleAppsRequest request)
throws NotFoundException, InterruptedException,
ExecutionException, OperationException {
public OperationRun pullApps(NamespaceId namespace, PullMultipleAppsRequest request)
throws NotFoundException, IOException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
PullAppsOperation pullOp = pullAppsOperationFactory.create(new PullAppsRequest(
new HashSet<>(request.getApps()),
repoConfig
));

SynchronousLongRunningOperationContext operationContext = new SynchronousLongRunningOperationContext(
namespace.getNamespace(),
OperationType.PULL_APPS
);
ListenableFuture<Set<OperationResource>> result = pullOp.run(operationContext);
result.get();
String principal = authenticationContext.getPrincipal().getName();
PullAppsRequest pullOpRequest = new PullAppsRequest(new HashSet<>(request.getApps()), repoConfig);

return operationContext.getOperationMeta();
return operationLifecycleService.createPullOperation(namespace.getNamespace(), RunIds.generate().getId(), pullOpRequest, principal);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@
package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.function.Consumer;
import org.slf4j.Logger;
Expand All @@ -34,13 +42,15 @@ public class OperationLifecycleManager {

private final TransactionRunner transactionRunner;
private final OperationRuntime runtime;
private final OperationStatePublisher statePublisher;
private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class);


@Inject
OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) {
this.transactionRunner = transactionRunner;
this.runtime = runtime;
this.statePublisher = statePublisher;
}

/**
Expand Down Expand Up @@ -87,6 +97,52 @@ public OperationController startOperation(OperationRunDetail detail) {
return runtime.run(detail);
}

public OperationRun createPushOperation(String namespace, String runId, PushAppsRequest request, String principal) throws IOException {
OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build();
OperationRunId operationRunId = new OperationRunId(namespace, runId);
OperationRun run = OperationRun.builder()
.setRunId(runId)
.setMetadata(meta)
.setStatus(OperationRunStatus.STARTING)
.setType(OperationType.PUSH_APPS)
.build();
OperationRunDetail detail = OperationRunDetail.builder()
.setRun(run)
.setPrincipal(principal)
.setPushAppsRequest(request)
.setRunId(operationRunId)
.build();
TransactionRunners.run(transactionRunner, context -> {
getOperationRunStore(context).createOperationRun(operationRunId, detail);
}, IOException.class);

statePublisher.publishStarting(operationRunId);
return run;
}

public OperationRun createPullOperation(String namespace, String runId, PullAppsRequest request, String principal) throws IOException {
OperationMeta meta = OperationMeta.builder().setCreateTime(Instant.now()).build();
OperationRunId operationRunId = new OperationRunId(namespace, runId);
OperationRun run = OperationRun.builder()
.setRunId(runId)
.setMetadata(meta)
.setStatus(OperationRunStatus.STARTING)
.setType(OperationType.PULL_APPS)
.build();
OperationRunDetail detail = OperationRunDetail.builder()
.setRun(run)
.setPrincipal(principal)
.setPullAppsRequest(request)
.setRunId(operationRunId)
.build();
TransactionRunners.run(transactionRunner, context -> {
getOperationRunStore(context).createOperationRun(operationRunId, detail);
}, IOException.class);

statePublisher.publishStarting(operationRunId);
return run;
}


/**
* Initiate operation stop. It is the responsibility of the caller to validate state transition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationRun;
import java.util.Arrays;
Expand Down Expand Up @@ -57,15 +58,20 @@ public class OperationRunDetail {
@Nullable
private final PullAppsRequest pullAppsRequest;

@SerializedName("pushAppsRequest")
@Nullable
private final PushAppsRequest pushAppsRequest;

protected OperationRunDetail(
OperationRunId runId, OperationRun run,
byte[] sourceId, @Nullable String principal,
@Nullable PullAppsRequest pullAppsRequest) {
@Nullable byte[] sourceId, @Nullable String principal,
@Nullable PullAppsRequest pullAppsRequest, @Nullable PushAppsRequest pushAppsRequest) {
this.runId = runId;
this.run = run;
this.sourceId = sourceId;
this.principal = principal;
this.pullAppsRequest = pullAppsRequest;
this.pushAppsRequest = pushAppsRequest;
}

@Nullable
Expand All @@ -82,6 +88,10 @@ public PullAppsRequest getPullAppsRequest() {
return pullAppsRequest;
}

public PushAppsRequest getPushAppsRequest() {
return pushAppsRequest;
}

public OperationRun getRun() {
return run;
}
Expand Down Expand Up @@ -138,6 +148,7 @@ public static class Builder {
protected byte[] sourceId;
protected String principal;
protected PullAppsRequest pullAppsRequest;
protected PushAppsRequest pushAppsRequest;

protected Builder() {
}
Expand All @@ -148,6 +159,7 @@ protected Builder(OperationRunDetail detail) {
run = detail.getRun();
runId = detail.getRunId();
pullAppsRequest = detail.getPullAppsRequest();
pushAppsRequest = detail.getPushAppsRequest();
}

public Builder setSourceId(byte[] sourceId) {
Expand Down Expand Up @@ -176,16 +188,18 @@ public Builder setPullAppsRequest(PullAppsRequest pullAppsRequest) {
return this;
}

public Builder setPushAppsRequest(PushAppsRequest pushAppsRequest) {
this.pushAppsRequest = pushAppsRequest;
return this;
}

/**
* Validates input and returns a OperationRunDetail.
*/
public OperationRunDetail build() {
if (runId == null) {
throw new IllegalArgumentException("run id must be specified.");
}
if (sourceId == null) {
throw new IllegalArgumentException("Operation run source id must be specified.");
}
if (run == null) {
throw new IllegalArgumentException("Operation run must be specified.");
}
Expand All @@ -194,12 +208,12 @@ public OperationRunDetail build() {
throw new IllegalArgumentException("Exactly one request type can be non-null");
}

return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest);
return new OperationRunDetail(runId, run, sourceId, principal, pullAppsRequest, pushAppsRequest);
}

private boolean validateRequests() {
// validate only one of the request is non-null
return Stream.of(pullAppsRequest).filter(java.util.Objects::nonNull).count() == 1;
return Stream.of(pullAppsRequest, pullAppsRequest).filter(java.util.Objects::nonNull).count() == 1;
}

}
Expand Down

0 comments on commit 46bd91e

Please sign in to comment.