Skip to content

Commit

Permalink
add multi push and pull apis
Browse files Browse the repository at this point in the history
  • Loading branch information
GnsP committed Oct 31, 2023
1 parent 9db1dba commit 2918ba1
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushAppRequest;
import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigRequest;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfigValidationException;
Expand Down Expand Up @@ -183,6 +186,49 @@ public void pushApp(FullHttpRequest request, HttpResponder responder,
}
}

/**
* Pushes application configs of requested applications to linked repository in Json format.
* It expects a post body that has a list of application ids and an optional commit message
* E.g.
*
* <pre>
* {@code
* {
* "appIds": ["app_id_1", "app_id_2"],
* "commitMessage": "pushed application XYZ"
* }
* }
*
* </pre>
* The response will be a {@link PushAppsResponse} object, which encapsulates the application name,
* version and fileHash.
*/
@POST
@Path("/apps/push")
public void pushApps(FullHttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId) throws Exception {
checkSourceControlFeatureFlag();
NamespaceId namespace = validateNamespaceId(namespaceId);

PushMultipleAppsRequest appsRequest;
try {
appsRequest = parseBody(request, PushMultipleAppsRequest.class);
} catch (JsonSyntaxException e) {
throw new BadRequestException("Invalid request body.", e);
}

if (appsRequest == null) {
throw new BadRequestException("Invalid request body.");
}

if (Strings.isNullOrEmpty(appsRequest.getCommitMessage())) {
throw new BadRequestException("Please specify commit message in the request body.");
}

OperationMeta operationMeta = sourceControlService.pushApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
}

/**
* Pull the requested application from linked repository and deploy in current namespace.
*/
Expand All @@ -202,6 +248,31 @@ public void pullApp(FullHttpRequest request, HttpResponder responder,
}
}

/**
* Pull the requested applications from linked repository and deploy in current namespace.
*/
@POST
@Path("/apps/pull")
public void pullApps(FullHttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId) throws Exception {
checkSourceControlFeatureFlag();
NamespaceId namespace = validateNamespaceId(namespaceId);

PullMultipleAppsRequest appsRequest;
try {
appsRequest = parseBody(request, PullMultipleAppsRequest.class);
} catch (JsonSyntaxException e) {
throw new BadRequestException("Invalid request body.", e);
}

if (appsRequest == null) {
throw new BadRequestException("Invalid request body.");
}

OperationMeta operationMeta = sourceControlService.pullApps(namespace, appsRequest);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(operationMeta));
}

private PushAppRequest validateAndGetAppsRequest(FullHttpRequest request) throws BadRequestException {
PushAppRequest appRequest;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.internal.app.services;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import io.cdap.cdap.api.artifact.ArtifactSummary;
import io.cdap.cdap.api.security.store.SecureStore;
Expand All @@ -26,15 +27,28 @@
import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.internal.app.deploy.pipeline.ApplicationWithPrograms;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsRequest;
import io.cdap.cdap.internal.operation.OperationException;
import io.cdap.cdap.internal.operation.SynchronousLongRunningOperationContext;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.artifact.AppRequest;
import io.cdap.cdap.proto.id.ApplicationId;
import io.cdap.cdap.proto.id.ApplicationReference;
import io.cdap.cdap.proto.id.KerberosPrincipalId;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.proto.security.NamespacePermission;
import io.cdap.cdap.proto.security.StandardPermission;
import io.cdap.cdap.proto.sourcecontrol.PullMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.PushMultipleAppsRequest;
import io.cdap.cdap.proto.sourcecontrol.RemoteRepositoryValidationException;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.proto.sourcecontrol.RepositoryMeta;
Expand Down Expand Up @@ -62,7 +76,10 @@
import io.cdap.cdap.store.NamespaceTable;
import io.cdap.cdap.store.RepositoryTable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -79,6 +96,8 @@ public class SourceControlManagementService {
private final SourceControlOperationRunner sourceControlOperationRunner;
private final ApplicationLifecycleService appLifecycleService;
private final Store store;
private final PushAppsOperationFactory pushAppsOperationFactory;
private final PullAppsOperationFactory pullAppsOperationFactory;
private static final Logger LOG = LoggerFactory.getLogger(SourceControlManagementService.class);


Expand All @@ -93,7 +112,8 @@ public SourceControlManagementService(CConfiguration cConf,
AuthenticationContext authenticationContext,
SourceControlOperationRunner sourceControlOperationRunner,
ApplicationLifecycleService applicationLifecycleService,
Store store) {
Store store, PushAppsOperationFactory pushAppsOperationFactory,
PullAppsOperationFactory pullAppsOperationFactory) {
this.cConf = cConf;
this.secureStore = secureStore;
this.transactionRunner = transactionRunner;
Expand All @@ -102,6 +122,8 @@ public SourceControlManagementService(CConfiguration cConf,
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.appLifecycleService = applicationLifecycleService;
this.store = store;
this.pushAppsOperationFactory = pushAppsOperationFactory;
this.pullAppsOperationFactory = pullAppsOperationFactory;
}

private RepositoryTable getRepositoryTable(StructuredTableContext context) throws TableNotFoundException {
Expand Down Expand Up @@ -203,7 +225,7 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage
// TODO: CDAP-20396 RepositoryConfig is currently only accessible from the service layer
// Need to fix it and avoid passing it in RepositoryManagerFactory
RepositoryConfig repoConfig = getRepositoryMeta(appRef.getParent()).getConfig();

// AppLifecycleService already enforces ApplicationDetail Access
ApplicationDetail appDetail = appLifecycleService.getLatestAppDetail(appRef, false);

Expand All @@ -224,7 +246,7 @@ public PushAppResponse pushApp(ApplicationReference appRef, String commitMessage
appRef.getApplication(),
appRef.getParent(),
appLifecycleService.decodeUserId(authenticationContext));

SourceControlMeta sourceControlMeta = new SourceControlMeta(pushResponse.getFileHash());
ApplicationId appId = appRef.app(appDetail.getAppVersion());
store.setAppSourceControlMeta(appId, sourceControlMeta);
Expand All @@ -251,7 +273,7 @@ public ApplicationRecord pullAndDeploy(ApplicationReference appRef) throws Excep
accessEnforcer.enforce(appId, authenticationContext.getPrincipal(), StandardPermission.CREATE);
accessEnforcer.enforce(appRef.getParent(), authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);

PullAppResponse<?> pullResponse = pullAndValidateApplication(appRef);

AppRequest<?> appRequest = pullResponse.getAppRequest();
Expand Down Expand Up @@ -319,4 +341,74 @@ public RepositoryAppsResponse listApps(NamespaceId namespace) throws NotFoundExc
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
return sourceControlOperationRunner.list(new NamespaceRepository(namespace, repoConfig));
}

/**
* The method to push multiple applications in the same namespace to the linked repository.
*
* @param namespace {@link NamespaceId} from where the apps are to be pushed
* @param request {@link PushMultipleAppsRequest} containing the appIds and the commit message
*
* @return {@link OperationMeta} of the operation to push the apps
* @throws NoChangesToPushException when none of the apps have changed since last commit
* @throws NotFoundException when the repository or any of the apps are not found
* @throws InterruptedException when the push operation is inturrupted
* @throws ExecutionException when the push operation execution fails
* @throws OperationException for any exception occuring in the operation logic
*/
public OperationMeta pushApps(NamespaceId namespace, PushMultipleAppsRequest request)
throws NoChangesToPushException, NotFoundException, InterruptedException,
ExecutionException, OperationException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.WRITE_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
String committer = authenticationContext.getPrincipal().getName();
PushAppsOperation pushOp = pushAppsOperationFactory.create(new PushAppsRequest(
new HashSet<>(request.getApps()),
repoConfig,
new CommitMeta(committer, committer, System.currentTimeMillis(), request.getCommitMessage())
));

SynchronousLongRunningOperationContext operationContext = new SynchronousLongRunningOperationContext(
namespace.getNamespace(),
OperationType.PUSH_APPS
);
ListenableFuture<Set<OperationResource>> result = pushOp.run(operationContext);
result.get();

return operationContext.getOperationMeta();
}

/**
* The method to pull multiple applications from the linked repository and deploy them in current namespace.
*
* @param namespace {@link NamespaceId} from where the apps are to be pushed
* @param request {@link PullMultipleAppsRequest} containing the appIds
*
* @return {@link OperationMeta} of the operation to push the apps
* @throws NotFoundException when the repository or any of the apps are not found
* @throws InterruptedException when the push operation is inturrupted
* @throws ExecutionException when the push operation execution fails
* @throws OperationException for any exception occuring in the operation logic
*/
public OperationMeta pullApps(NamespaceId namespace, PullMultipleAppsRequest request)
throws NotFoundException, InterruptedException,
ExecutionException, OperationException {
accessEnforcer.enforce(namespace, authenticationContext.getPrincipal(),
NamespacePermission.READ_REPOSITORY);
RepositoryConfig repoConfig = getRepositoryMeta(namespace).getConfig();
String committer = authenticationContext.getPrincipal().getName();
PullAppsOperation pullOp = pullAppsOperationFactory.create(new PullAppsRequest(
new HashSet<>(request.getApps()),
repoConfig
));

SynchronousLongRunningOperationContext operationContext = new SynchronousLongRunningOperationContext(
namespace.getNamespace(),
OperationType.PULL_APPS
);
ListenableFuture<Set<OperationResource>> result = pullOp.run(operationContext);
result.get();

return operationContext.getOperationMeta();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package io.cdap.cdap.internal.app.sourcecontrol;


/**
* Factory interface for creating {@link PushAppsOperation}.
* This interface is for Guice assisted binding, hence there will be no concrete implementation of it.
Expand All @@ -27,7 +26,7 @@ public interface PushAppsOperationFactory {
* Returns an implementation of {@link PushAppsOperation} that operates on the given {@link
* PushAppsRequest}.
*
* @param request contains list of apps to pull
* @param request contains list of apps to push
* @return a new instance of {@link PushAppsOperation}.
*/
PushAppsOperation create(PushAppsRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ protected AbstractLongRunningOperationContext(OperationRunId runid, OperationTyp
this.runId = runid;
this.type = operationType;
}

@Override
public OperationRunId getRunId() {
return runId;
}

@Override
public OperationType getType() {
return type;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationType;
import java.time.Instant;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class SynchronousLongRunningOperationContext extends AbstractLongRunningOperationContext {
private OperationMeta operationMeta;

public SynchronousLongRunningOperationContext(String namespace, OperationType operationType) {
super(new OperationRunId(namespace, UUID.randomUUID().toString()), operationType);
this.operationMeta = new OperationMeta(new HashSet<>(), Instant.now(), null);
}

@Override
public void updateOperationResources(Set<OperationResource> resources) {
operationMeta = new OperationMeta(resources, operationMeta.getCreateTime(), Instant.now());
}

public OperationMeta getOperationMeta() {
return operationMeta;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import io.cdap.cdap.internal.app.services.ApplicationLifecycleService;
import io.cdap.cdap.internal.app.services.SourceControlManagementService;
import io.cdap.cdap.internal.app.services.http.AppFabricTestBase;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PushAppsOperationFactory;
import io.cdap.cdap.metadata.MetadataSubscriberService;
import io.cdap.cdap.proto.ApplicationRecord;
import io.cdap.cdap.proto.id.NamespaceId;
Expand Down Expand Up @@ -119,11 +121,13 @@ public SourceControlManagementService provideSourceControlManagementService(
AuthenticationContext authenticationContext,
SourceControlOperationRunner sourceControlRunner,
ApplicationLifecycleService applicationLifecycleService,
Store store) {
Store store, PushAppsOperationFactory pushAppsOpFactory, PullAppsOperationFactory pullAppsOpFactory) {

return Mockito.spy(new SourceControlManagementService(cConf, secureStore, transactionRunner,
accessEnforcer, authenticationContext,
sourceControlRunner, applicationLifecycleService,
store));
store, pushAppsOpFactory,
pullAppsOpFactory));
}
});
}
Expand Down
Loading

0 comments on commit 2918ba1

Please sign in to comment.