Skip to content

Commit

Permalink
Add pull Opereation
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Oct 26, 2023
1 parent bc7ba31 commit 3072e11
Show file tree
Hide file tree
Showing 12 changed files with 611 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class PullAppsOperation implements LongRunningOperation {
/**
* Only request is passed using AssistedInject. See {@link PullAppsOperationFactory}
*
* @param request contains apps to push
* @param request contains apps to pull
* @param runner runs git operations. The reason we do not use
* {@link io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner} rather than
* concrete implementation is because the git operations should always run inMemory.
Expand Down Expand Up @@ -88,7 +88,7 @@ public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext
);

// pull and deploy applications one at a time
scmOpRunner.pull(pullReq, response -> {
scmOpRunner.multiPull(pullReq, response -> {
appTobeDeployed.set(new ApplicationReference(context.getRunId().getNamespace(),
response.getApplicationName()));
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.internal.operation.LongRunningOperation;
import io.cdap.cdap.internal.operation.LongRunningOperationContext;
import io.cdap.cdap.internal.operation.OperationException;
import io.cdap.cdap.metadata.ApplicationDetailFetcher;
import io.cdap.cdap.proto.app.UpdateMultiSourceControlMetaReqeust;
import io.cdap.cdap.proto.app.UpdateSourceControlMetaRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.NoChangesToPushException;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner;
import io.cdap.cdap.sourcecontrol.operationrunner.MultiPushAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.PushAppResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Defines operation for doing SCM Push for connected repositories.
**/
public class PushAppsOperation implements LongRunningOperation {

private final PushAppsRequest request;

private final InMemorySourceControlOperationRunner scmOpRunner;
private final ApplicationManager applicationManager;

/**
* Only request is passed using AssistedInject. See {@link PushAppsOperationFactory}
*
* @param request contains apps to push
* @param runner runs git operations. The reason we do not use
* {@link io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner} rather than
* concrete implementation is because the git operations should always run inMemory.
* @param applicationManager provides utilities to provide app-fabric exposed
* functionalities.
*/
@Inject
PushAppsOperation(@Assisted PushAppsRequest request,
InMemorySourceControlOperationRunner runner,
ApplicationManager applicationManager, ApplicationDetailFetcher appDetailsFetcher) {
this.request = request;
this.applicationManager = applicationManager;
this.scmOpRunner = runner;
}

@Override
public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext context)
throws OperationException {
RepositoryConfig repositoryConfig = request.getConfig();
NamespaceId namespaceId = context.getRunId().getNamespaceId();
MultiPushAppOperationRequest pushReq = new MultiPushAppOperationRequest(
namespaceId,
repositoryConfig,
request.getApps(),
request.getCommitDetails()
);

List<PushAppResponse> responses = new ArrayList<>();

try {
// pull and deploy applications one at a time
responses = scmOpRunner.multiPush(pushReq, applicationManager);
context.updateOperationResources(getResources(namespaceId, responses));
} catch (SourceControlException | NoChangesToPushException e) {
throw new OperationException(
String.format("Failed to push applications: %s", e.getMessage()), Collections.emptyList()
);
}

try {
// update git metadata for the pushed application
applicationManager.updateSourceControlMeta(namespaceId, getUpdateMetaRequest(responses));
} catch (NotFoundException | BadRequestException | IOException | SourceControlException e) {
throw new OperationException(
String.format("Failed to update git metadata: %s", e.getMessage()),
Collections.emptySet()
);
}

// TODO(samik) Update this after along with the runner implementation
return Futures.immediateFuture(getResources(namespaceId, responses));
}

private UpdateMultiSourceControlMetaReqeust getUpdateMetaRequest(
List<PushAppResponse> responses) {
List<UpdateSourceControlMetaRequest> reqs = responses.stream()
.map(response -> new UpdateSourceControlMetaRequest(
response.getName(), response.getVersion(), response.getFileHash()))
.collect(Collectors.toList());
return new UpdateMultiSourceControlMetaReqeust(reqs);
}

private Set<OperationResource> getResources(NamespaceId namespaceId,
List<PushAppResponse> responses) {
return responses.stream()
.map(response -> new OperationResource(
namespaceId.app(response.getName(), response.getVersion()).toString()))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;


/**
* Factory interface for creating {@link PushAppsOperation}.
* This interface is for Guice assisted binding, hence there will be no concrete implementation of it.
*/
public interface PushAppsOperationFactory {

/**
* Returns an implementation of {@link PushAppsOperation} that operates on the given {@link
* PushAppsRequest}.
*
* @param request contains list of apps to pull
* @return a new instance of {@link PushAppsOperation}.
*/
PushAppsRequest create(PushAppsOperation request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.common.base.Objects;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.CommitMeta;
import java.util.Set;

/**
* Request type for {@link PullAppsOperation}.
*/
public class PushAppsRequest {

private final Set<String> apps;
private final RepositoryConfig config;

private final CommitMeta commitDetails;

/**
* Default Constructor.
*
* @param apps Set of apps to pull.
*/
public PushAppsRequest(Set<String> apps, RepositoryConfig config, CommitMeta commitDetails) {
this.apps = apps;
this.config = config;
this.commitDetails = commitDetails;
}

public Set<String> getApps() {
return apps;
}

public RepositoryConfig getConfig() {
return config;
}

public CommitMeta getCommitDetails() {
return commitDetails;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

PushAppsRequest that = (PushAppsRequest) o;
return Objects.equal(this.getApps(), that.getApps())
&& Objects.equal(this.getConfig(), that.getConfig())
&& Objects.equal(this.getCommitDetails(), that.getCommitDetails());
}

@Override
public int hashCode() {
return Objects.hashCode(getApps(), getConfig(), getCommitDetails());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public PullAppResponse<?> pull(PullAppOperationRequest pullRequest)
}

@Override
public void pull(MultiPullAppOperationRequest pullRequest, Consumer<PullAppResponse<?>> consumer)
public void multiPull(MultiPullAppOperationRequest pullRequest, Consumer<PullAppResponse<?>> consumer)
throws NotFoundException, AuthenticationConfigException {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand Down Expand Up @@ -211,12 +213,12 @@ public static void validateConfig(final SecureStore secureStore,
}

/**
* Commits and pushes the changes of a given file under the repository root
* Commits and pushes the changes of given files under the repository root
* path.
*
* @param commitMeta Details for the commit including author, committer and
* commit message
* @param fileChanged The relative path to repository root where the file is
* @param filesChanged The relative path to repository root where the file is
* updated
* @return the hash of the written file.
* @throws GitAPIException when the underlying git commands fail
Expand All @@ -227,8 +229,8 @@ public static void validateConfig(final SecureStore secureStore,
* @throws SourceControlException when failed to get the fileHash before
* push
*/
public String commitAndPush(final CommitMeta commitMeta,
final Path fileChanged)
public Map<Path, String> commitAndPush(final CommitMeta commitMeta,
final Set<Path> filesChanged)
throws NoChangesToPushException, GitAPIException {
validateInitialized();
final Stopwatch stopwatch = new Stopwatch().start();
Expand All @@ -240,24 +242,35 @@ public String commitAndPush(final CommitMeta commitMeta,
"No changes have been made for the applications to push.");
}

git.add().addFilepattern(fileChanged.toString()).call();
for (Path fileChanged : filesChanged) {
git.add().addFilepattern(fileChanged.toString()).call();
}

RevCommit commit = getCommitCommand(commitMeta).call();

String fileHash;
try {
fileHash = getFileHash(fileChanged, commit);
} catch (IOException e) {
throw new GitOperationException(
String.format("Failed to get fileHash for %s", fileChanged),
e);
Map<Path, String> fileHashes = new HashMap<>();
for (Path fileChanged : filesChanged) {
try {
String fileHash = getFileHash(fileChanged, commit);
fileHashes.put(fileChanged, fileHash);
if (fileHash == null) {
throw new SourceControlException(
String.format(
"Failed to get fileHash for %s, because the path is not "
+ "found in Git tree", fileChanged));
}
} catch (IOException e) {
throw new GitOperationException(
String.format("Failed to get fileHash for %s", fileChanged),
e);
}
}

if (fileHash == null) {
if (fileHashes.size() != filesChanged.size()) {
throw new SourceControlException(
String.format(
"Failed to get fileHash for %s, because the path is not "
+ "found in Git tree", fileChanged));
"Failed to get fileHash for %s because some paths are not "
+ "found in Git tree", filesChanged));
}

PushCommand pushCommand = createCommand(git::push, sourceControlConfig,
Expand All @@ -269,7 +282,7 @@ public String commitAndPush(final CommitMeta commitMeta,
if (rru.getStatus() != RemoteRefUpdate.Status.OK
&& rru.getStatus() != RemoteRefUpdate.Status.UP_TO_DATE) {
throw new GitOperationException(
String.format("Push failed for %s: %s", fileChanged,
String.format("Push failed for %s: %s", filesChanged,
rru.getStatus()));
}
}
Expand All @@ -278,7 +291,7 @@ public String commitAndPush(final CommitMeta commitMeta,
metricsContext.event(
SourceControlManagement.COMMIT_PUSH_LATENCY_MILLIS,
stopwatch.stop().elapsedTime(TimeUnit.MILLISECONDS));
return fileHash;
return fileHashes;
}

private CommitCommand getCommitCommand(final CommitMeta commitMeta) {
Expand Down
Loading

0 comments on commit 3072e11

Please sign in to comment.