Skip to content

Commit

Permalink
Merge branch 'push-operation' into multi-push-operation
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Oct 27, 2023
2 parents 8973afc + 3072e11 commit 2b6800b
Show file tree
Hide file tree
Showing 12 changed files with 389 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class PullAppsOperation implements LongRunningOperation {
/**
* Only request is passed using AssistedInject. See {@link PullAppsOperationFactory}
*
* @param request contains apps to push
* @param request contains apps to pull
* @param runner runs git operations. The reason we do not use
* {@link io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner} rather than
* concrete implementation is because the git operations should always run inMemory.
Expand Down Expand Up @@ -88,7 +88,7 @@ public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext
);

// pull and deploy applications one at a time
scmOpRunner.pull(pullReq, response -> {
scmOpRunner.multiPull(pullReq, response -> {
appTobeDeployed.set(new ApplicationReference(context.getRunId().getNamespace(),
response.getApplicationName()));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.internal.operation.LongRunningOperation;
import io.cdap.cdap.internal.operation.LongRunningOperationContext;
import io.cdap.cdap.internal.operation.OperationException;
import io.cdap.cdap.proto.ApplicationDetail;
import io.cdap.cdap.metadata.ApplicationDetailFetcher;
import io.cdap.cdap.proto.app.UpdateMultiSourceControlMetaReqeust;
import io.cdap.cdap.proto.app.UpdateSourceControlMetaRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
Expand All @@ -48,6 +50,7 @@
public class PushAppsOperation implements LongRunningOperation {

private final PushAppsRequest request;

private final InMemorySourceControlOperationRunner scmOpRunner;
private final ApplicationManager applicationManager;

Expand All @@ -64,7 +67,7 @@ public class PushAppsOperation implements LongRunningOperation {
@Inject
PushAppsOperation(@Assisted PushAppsRequest request,
InMemorySourceControlOperationRunner runner,
ApplicationManager applicationManager) {
ApplicationManager applicationManager, ApplicationDetailFetcher appDetailsFetcher) {
this.request = request;
this.applicationManager = applicationManager;
this.scmOpRunner = runner;
Expand All @@ -73,55 +76,55 @@ public class PushAppsOperation implements LongRunningOperation {
@Override
public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext context)
throws OperationException {
try {
RepositoryConfig repositoryConfig = request.getConfig();
List<ApplicationDetail> apps = new ArrayList<>();
for (String app : request.getApps()) {
apps.add(applicationManager.get(request.getNamespace().appReference(app)));
}
RepositoryConfig repositoryConfig = request.getConfig();
NamespaceId namespaceId = context.getRunId().getNamespaceId();
MultiPushAppOperationRequest pushReq = new MultiPushAppOperationRequest(
namespaceId,
repositoryConfig,
request.getApps(),
request.getCommitDetails()
);

List<PushAppResponse> responses = new ArrayList<>();

MultiPushAppOperationRequest pushReq = new MultiPushAppOperationRequest(
repositoryConfig,
context.getRunId().getNamespaceId(),
apps,
request.getCommitDetails()
try {
// pull and deploy applications one at a time
responses = scmOpRunner.multiPush(pushReq, applicationManager);
context.updateOperationResources(getResources(namespaceId, responses));
} catch (SourceControlException | NoChangesToPushException e) {
throw new OperationException(
String.format("Failed to push applications: %s", e.getMessage()), Collections.emptyList()
);
}

scmOpRunner.push(pushReq, responses -> {
List<UpdateSourceControlMetaRequest> updateGitMetaRequests = new ArrayList<>();
for (PushAppResponse response : responses) {
updateGitMetaRequests.add(new UpdateSourceControlMetaRequest(
response.getName(),
response.getVersion(),
response.getFileHash()
));
}
UpdateMultiSourceControlMetaReqeust multiScmMetaUpdateReq = new UpdateMultiSourceControlMetaReqeust(
updateGitMetaRequests
);
try {
applicationManager.updateSourceControlMeta(
context.getRunId().getNamespaceId(),
multiScmMetaUpdateReq
);
} catch (Exception e) {
throw new SourceControlException(e);
}
context.updateOperationResources(getResources());
});
} catch (
NotFoundException | SourceControlException | IOException | NoChangesToPushException e) {
try {
// update git metadata for the pushed application
applicationManager.updateSourceControlMeta(namespaceId, getUpdateMetaRequest(responses));
} catch (NotFoundException | BadRequestException | IOException | SourceControlException e) {
throw new OperationException(
"Failed to push applications. " + e.getMessage(),
Collections.emptyList());
String.format("Failed to update git metadata: %s", e.getMessage()),
Collections.emptySet()
);
}

return Futures.immediateFuture(getResources());
// TODO(samik) Update this after along with the runner implementation
return Futures.immediateFuture(getResources(namespaceId, responses));
}

private UpdateMultiSourceControlMetaReqeust getUpdateMetaRequest(
List<PushAppResponse> responses) {
List<UpdateSourceControlMetaRequest> reqs = responses.stream()
.map(response -> new UpdateSourceControlMetaRequest(
response.getName(), response.getVersion(), response.getFileHash()))
.collect(Collectors.toList());
return new UpdateMultiSourceControlMetaReqeust(reqs);
}

private Set<OperationResource> getResources() {
return request.getApps().stream()
.map(OperationResource::new)
private Set<OperationResource> getResources(NamespaceId namespaceId,
List<PushAppResponse> responses) {
return responses.stream()
.map(response -> new OperationResource(
namespaceId.app(response.getName(), response.getVersion()).toString()))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ public interface PushAppsOperationFactory {
* Returns an implementation of {@link PushAppsOperation} that operates on the given {@link
* PushAppsRequest}.
*
* @param request contains list of apps to push
* @param request contains list of apps to pull
* @return a new instance of {@link PushAppsOperation}.
*/
PushAppsOperation create(PushAppsRequest request);
PushAppsRequest create(PushAppsOperation request);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,41 +17,31 @@
package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.common.base.Objects;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.CommitMeta;
import java.util.Set;

/**
* Request type for {@link PushAppsOperation}.
* Request type for {@link PullAppsOperation}.
*/
public class PushAppsRequest {

private final NamespaceId namespace;
private final Set<String> apps;
private final RepositoryConfig config;

private final CommitMeta commitDetails;

/**
* Default Constructor.
*
* @param apps Set of apps to push.
* @param apps Set of apps to pull.
*/
public PushAppsRequest(
NamespaceId namespace,
Set<String> apps,
RepositoryConfig config,
CommitMeta commitDetails) {
this.namespace = namespace;
public PushAppsRequest(Set<String> apps, RepositoryConfig config, CommitMeta commitDetails) {
this.apps = apps;
this.config = config;
this.commitDetails = commitDetails;
}

public NamespaceId getNamespace() {
return namespace;
}

public Set<String> getApps() {
return apps;
}
Expand All @@ -75,13 +65,12 @@ public boolean equals(Object o) {

PushAppsRequest that = (PushAppsRequest) o;
return Objects.equal(this.getApps(), that.getApps())
&& Objects.equal(this.getNamespace(), that.getNamespace())
&& Objects.equal(this.getConfig(), that.getConfig())
&& Objects.equal(this.getCommitDetails(), that.getCommitDetails());
}

@Override
public int hashCode() {
return Objects.hashCode(getNamespace(), getApps(), getConfig(), getCommitDetails());
return Objects.hashCode(getApps(), getConfig(), getCommitDetails());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ public PullAppResponse<?> pull(PullAppOperationRequest pullRequest)
}

@Override
public void pull(MultiPullAppOperationRequest pullRequest, Consumer<PullAppResponse<?>> consumer)
public void multiPull(MultiPullAppOperationRequest pullRequest, Consumer<PullAppResponse<?>> consumer)
throws NotFoundException, AuthenticationConfigException {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.jgit.api.CloneCommand;
Expand Down Expand Up @@ -213,12 +213,12 @@ public static void validateConfig(final SecureStore secureStore,
}

/**
* Commits and pushes the changes of a given file under the repository root
* Commits and pushes the changes of given files under the repository root
* path.
*
* @param commitMeta Details for the commit including author, committer and
* commit message
* @param fileChanged The relative path to repository root where the file is
* @param filesChanged The relative path to repository root where the file is
* updated
* @return the hash of the written file.
* @throws GitAPIException when the underlying git commands fail
Expand All @@ -229,33 +229,8 @@ public static void validateConfig(final SecureStore secureStore,
* @throws SourceControlException when failed to get the fileHash before
* push
*/
public String commitAndPush(final CommitMeta commitMeta,
final Path fileChanged)
throws NoChangesToPushException, GitAPIException {

return commitAndPush(commitMeta, Collections.singletonList(fileChanged)).get(0);
}

/**
* Commits and pushes the changes to a list of given files under the repository root
* path.
*
* @param commitMeta Details for the commit including author, committer and
* commit message
* @param filesChanged The relative paths to repository root where the files are
* updated
* @return the list of hashes of the written files.
*
* @throws GitAPIException when the underlying git commands fail
* @throws NoChangesToPushException when there's no file changes for the
* commit
* @throws GitOperationException when failed to get filehash due to IOException
* or the {@link PushResult} status is not OK
* @throws SourceControlException when failed to get the fileHash before
* push
*/
public List<String> commitAndPush(final CommitMeta commitMeta,
final List<Path> filesChanged)
public Map<Path, String> commitAndPush(final CommitMeta commitMeta,
final Set<Path> filesChanged)
throws NoChangesToPushException, GitAPIException {
validateInitialized();
final Stopwatch stopwatch = new Stopwatch().start();
Expand All @@ -273,11 +248,11 @@ public List<String> commitAndPush(final CommitMeta commitMeta,

RevCommit commit = getCommitCommand(commitMeta).call();

List<String> fileHashes = new ArrayList<>();
Map<Path, String> fileHashes = new HashMap<>();
for (Path fileChanged : filesChanged) {
try {
String fileHash = getFileHash(fileChanged, commit);
fileHashes.add(fileHash);
fileHashes.put(fileChanged, fileHash);
if (fileHash == null) {
throw new SourceControlException(
String.format(
Expand All @@ -291,6 +266,13 @@ public List<String> commitAndPush(final CommitMeta commitMeta,
}
}

if (fileHashes.size() != filesChanged.size()) {
throw new SourceControlException(
String.format(
"Failed to get fileHash for %s because some paths are not "
+ "found in Git tree", filesChanged));
}

PushCommand pushCommand = createCommand(git::push, sourceControlConfig,
credentialsProvider);
Iterable<PushResult> pushResults = pushCommand.call();
Expand All @@ -300,9 +282,7 @@ public List<String> commitAndPush(final CommitMeta commitMeta,
if (rru.getStatus() != RemoteRefUpdate.Status.OK
&& rru.getStatus() != RemoteRefUpdate.Status.UP_TO_DATE) {
throw new GitOperationException(
String.format("Push failed for %s: %s",
String.join(", ", filesChanged.stream().map(path -> path.toString()).collect(
Collectors.toList())),
String.format("Push failed for %s: %s", filesChanged,
rru.getStatus()));
}
}
Expand Down
Loading

0 comments on commit 2b6800b

Please sign in to comment.