Skip to content

Commit

Permalink
Merge pull request #15385 from cdapio/multi-push-operation
Browse files Browse the repository at this point in the history
Multi push operation
  • Loading branch information
samdgupi authored Nov 9, 2023
2 parents a606282 + 21bfc91 commit 4b8c345
Show file tree
Hide file tree
Showing 15 changed files with 994 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public class PullAppsOperation implements LongRunningOperation {
/**
* Only request is passed using AssistedInject. See {@link PullAppsOperationFactory}
*
* @param request contains apps to push
* @param request contains apps to pull
* @param runner runs git operations. The reason we do not use
* {@link io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner} rather than
* concrete implementation is because the git operations should always run inMemory.
Expand Down Expand Up @@ -88,7 +88,7 @@ public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext
);

// pull and deploy applications one at a time
scmOpRunner.pull(pullReq, response -> {
scmOpRunner.multiPull(pullReq, response -> {
appTobeDeployed.set(new ApplicationReference(context.getRunId().getNamespace(),
response.getApplicationName()));
try {
Expand All @@ -106,7 +106,8 @@ public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext
"Failed to deploy applications",
appTobeDeployed.get() != null ? ImmutableList.of(
new OperationResourceScopedError(appTobeDeployed.get().toString(), e.getMessage()))
: Collections.emptyList()
: Collections.emptyList(),
e
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.internal.operation.LongRunningOperation;
import io.cdap.cdap.internal.operation.LongRunningOperationContext;
import io.cdap.cdap.internal.operation.OperationException;
import io.cdap.cdap.proto.app.UpdateMultiSourceControlMetaReqeust;
import io.cdap.cdap.proto.app.UpdateSourceControlMetaRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.NoChangesToPushException;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner;
import io.cdap.cdap.sourcecontrol.operationrunner.MultiPushAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.PushAppResponse;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Defines operation for doing SCM Push for connected repositories.
**/
public class PushAppsOperation implements LongRunningOperation {

private final PushAppsRequest request;

private final InMemorySourceControlOperationRunner scmOpRunner;
private final ApplicationManager applicationManager;

/**
* Only request is passed using AssistedInject. See {@link PushAppsOperationFactory}
*
* @param request contains apps to push
* @param runner runs git operations. The reason we do not use
* {@link io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner} rather than
* concrete implementation is because the git operations should always run inMemory.
* @param applicationManager provides utilities to provide app-fabric exposed
* functionalities.
*/
@Inject
PushAppsOperation(@Assisted PushAppsRequest request,
InMemorySourceControlOperationRunner runner,
ApplicationManager applicationManager) {
this.request = request;
this.applicationManager = applicationManager;
this.scmOpRunner = runner;
}

@Override
public ListenableFuture<Set<OperationResource>> run(LongRunningOperationContext context)
throws OperationException {
RepositoryConfig repositoryConfig = request.getConfig();
NamespaceId namespaceId = context.getRunId().getNamespaceId();
MultiPushAppOperationRequest pushReq = new MultiPushAppOperationRequest(
namespaceId,
repositoryConfig,
request.getApps(),
request.getCommitDetails()
);

Collection<PushAppResponse> responses;

try {
responses = scmOpRunner.multiPush(pushReq, applicationManager);
context.updateOperationResources(getResources(namespaceId, responses));
} catch (SourceControlException | NoChangesToPushException e) {
throw new OperationException("Failed to push applications.", Collections.emptyList(), e);
}

try {
// update git metadata for the pushed application
applicationManager.updateSourceControlMeta(namespaceId, getUpdateMetaRequest(responses));
} catch (NotFoundException | BadRequestException | IOException | SourceControlException e) {
throw new OperationException("Failed to update git metadata.", Collections.emptySet(), e);
}

// TODO(samik) Update this after along with the runner implementation
return Futures.immediateFuture(getResources(namespaceId, responses));
}

private UpdateMultiSourceControlMetaReqeust getUpdateMetaRequest(
Collection<PushAppResponse> responses) {
List<UpdateSourceControlMetaRequest> reqs = responses.stream()
.map(response -> new UpdateSourceControlMetaRequest(
response.getName(), response.getVersion(), response.getFileHash()))
.collect(Collectors.toList());
return new UpdateMultiSourceControlMetaReqeust(reqs);
}

private Set<OperationResource> getResources(NamespaceId namespaceId,
Collection<PushAppResponse> responses) {
return responses.stream()
.map(response -> new OperationResource(
namespaceId.app(response.getName(), response.getVersion()).toString()))
.collect(Collectors.toSet());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;


/**
* Factory interface for creating {@link PushAppsOperation}.
* This interface is for Guice assisted binding, hence there will be no concrete implementation of it.
*/
public interface PushAppsOperationFactory {

/**
* Returns an implementation of {@link PushAppsOperation} that operates on the given {@link
* PushAppsRequest}.
*
* @param request contains list of apps to pull
* @return a new instance of {@link PushAppsOperation}.
*/
PushAppsOperation create(PushAppsRequest request);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.sourcecontrol;

import com.google.common.base.Objects;
import io.cdap.cdap.proto.sourcecontrol.RepositoryConfig;
import io.cdap.cdap.sourcecontrol.CommitMeta;
import java.util.Set;

/**
* Request type for {@link PullAppsOperation}.
*/
public class PushAppsRequest {

private final Set<String> apps;
private final RepositoryConfig config;

private final CommitMeta commitDetails;

/**
* Default Constructor.
*
* @param apps Set of apps to push.
*/
public PushAppsRequest(Set<String> apps, RepositoryConfig config, CommitMeta commitDetails) {
this.apps = apps;
this.config = config;
this.commitDetails = commitDetails;
}

public Set<String> getApps() {
return apps;
}

public RepositoryConfig getConfig() {
return config;
}

public CommitMeta getCommitDetails() {
return commitDetails;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

PushAppsRequest that = (PushAppsRequest) o;
return Objects.equal(this.getApps(), that.getApps())
&& Objects.equal(this.getConfig(), that.getConfig())
&& Objects.equal(this.getCommitDetails(), that.getCommitDetails());
}

@Override
public int hashCode() {
return Objects.hashCode(getApps(), getConfig(), getCommitDetails());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public OperationException(String message, Collection<OperationResourceScopedErro
this.errors = errors;
}

public OperationException(String message, Collection<OperationResourceScopedError> errors, Throwable cause) {
super(message, cause);
this.errors = errors;
}

public OperationError toOperationError() {
return new OperationError(getMessage(), errors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@
import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta;
import io.cdap.cdap.security.impersonation.CurrentUGIProvider;
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.AuthenticationConfigException;
import io.cdap.cdap.sourcecontrol.NoChangesToPullException;
import io.cdap.cdap.sourcecontrol.NoChangesToPushException;
import io.cdap.cdap.sourcecontrol.SourceControlException;
import io.cdap.cdap.sourcecontrol.operationrunner.MultiPullAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.MultiPushAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.NamespaceRepository;
import io.cdap.cdap.sourcecontrol.operationrunner.PullAppOperationRequest;
import io.cdap.cdap.sourcecontrol.operationrunner.PullAppResponse;
Expand Down Expand Up @@ -495,14 +497,21 @@ public PushAppResponse push(PushAppOperationRequest pushAppOperationRequest)
return null;
}

@Override
public List<PushAppResponse> multiPush(MultiPushAppOperationRequest pushRequest,
ApplicationManager appManager)
throws NoChangesToPushException, AuthenticationConfigException {
return null;
}

@Override
public PullAppResponse<?> pull(PullAppOperationRequest pullRequest)
throws NotFoundException, AuthenticationConfigException {
return null;
}

@Override
public void pull(MultiPullAppOperationRequest pullRequest, Consumer<PullAppResponse<?>> consumer)
public void multiPull(MultiPullAppOperationRequest pullRequest, Consumer<PullAppResponse<?>> consumer)
throws NotFoundException, AuthenticationConfigException {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -75,10 +74,11 @@ public void setUp() throws Exception {
RepositoryManager mockRepositoryManager = Mockito.mock(RepositoryManager.class);
Mockito.doReturn(mockRepositoryManager).when(mockRepositoryManagerFactory)
.create(Mockito.any(), Mockito.any());
Mockito.doReturn(Paths.get("")).when(mockRepositoryManager).getRepositoryRoot();
Mockito.doReturn(repositoryBase.getRoot().toPath()).when(mockRepositoryManager)
.getRepositoryRoot();
Mockito.doReturn(repositoryBase.getRoot().toPath()).when(mockRepositoryManager).getBasePath();

Path rootPath = repositoryBase.getRoot().toPath();
Mockito.doReturn(rootPath).when(mockRepositoryManager).getRepositoryRoot();
Mockito.doReturn(rootPath).when(mockRepositoryManager).getBasePath();

Mockito.doReturn("testHash")
.when(mockRepositoryManager)
.getFileHash(Mockito.any(), Mockito.any());
Expand Down
Loading

0 comments on commit 4b8c345

Please sign in to comment.