Skip to content

Commit

Permalink
Updated the LRO interface to update resources instead
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 6, 2023
1 parent e5850f8 commit f0dfb67
Show file tree
Hide file tree
Showing 21 changed files with 367 additions and 560 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import io.cdap.cdap.internal.events.StartProgramEventSubscriber;
import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandler;
import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandlerInternal;
import io.cdap.cdap.internal.operation.guice.OperationModule;
import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory;
import io.cdap.cdap.internal.profile.ProfileService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
Expand Down Expand Up @@ -198,6 +199,7 @@ public Module getInMemoryModules() {
new SourceControlModule(),
new EntityVerifierModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
BootstrapModules.getInMemoryModule(),
new AbstractModule() {
@Override
Expand Down Expand Up @@ -240,6 +242,7 @@ public Module getStandaloneModules() {
new EntityVerifierModule(),
new ProvisionerModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
BootstrapModules.getFileBasedModule(),
new AbstractModule() {
@Override
Expand Down Expand Up @@ -294,6 +297,7 @@ public Module getDistributedModules() {
new EntityVerifierModule(),
new ProvisionerModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
BootstrapModules.getFileBasedModule(),
new AbstractModule() {
@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,39 @@

package io.cdap.cdap.internal.operation;

import avro.shaded.com.google.common.collect.ImmutableMap;
import io.cdap.cdap.common.operation.LongRunningOperation;
import io.cdap.cdap.common.operation.LongRunningOperationRequest;
import io.cdap.cdap.proto.operationrun.OperationType;
import java.util.Map;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;

/**
* Abstract runner implementation with common functionality like class loading.
*
* @param <T> The type of the operation request
* Abstract runner implementation with common functionality.
*/
public abstract class AbstractOperationRunner<T> implements OperationRunner<T> {
public abstract class AbstractOperationRunner implements OperationRunner {

private final ImmutableMap<OperationType, LongRunningOperation> operations;
private final PullAppsOperationFactory pullOperationFactory;

protected AbstractOperationRunner(Map<OperationType, LongRunningOperation> operations) {
this.operations = ImmutableMap.copyOf(operations);
AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory) {
this.pullOperationFactory = pullOperationFactory;
}


/**
* Converts an operation type to an operation class.
*
* @param request {@link LongRunningOperationRequest} for the operation
* @param detail {@link OperationRunDetail} for the operation
* @return {@link LongRunningOperation} for the operation type in request
* @throws OperationTypeNotSupportedException when the type is not mapped.
*/
public LongRunningOperation<T> getOperation(LongRunningOperationRequest<T> request)
throws OperationTypeNotSupportedException, InvalidRequestTypeException {
LongRunningOperation operation = operations.get(request.getOperationType());
if (operation == null) {
throw new OperationTypeNotSupportedException(request.getOperationType());
}
if (request.getOperationRequest().getClass()
.isAssignableFrom(operation.getRequestType().getClass())) {
throw new InvalidRequestTypeException(request.getOperationType(),
request.getOperationRequest().getClass().getName());
protected LongRunningOperation getOperation(OperationRunDetail detail)
throws OperationTypeNotSupportedException {
switch (detail.getRun().getType()) {
case PULL_APPS:
PullAppsRequest request = detail.getPullAppsRequest();
if (request == null) {
throw new RuntimeException("Missing request for pull operation");
}
return pullOperationFactory.create(request);
case PUSH_APPS:
default:
throw new OperationTypeNotSupportedException(detail.getRun().getType());
}
return (LongRunningOperation<T>) operation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.SettableFuture;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryOperationController implements

Check warning on line 34 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationController.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.MissingJavadocTypeCheck

Missing a Javadoc comment.
OperationController {

private final OperationRunId runId;
private final OperationStatePublisher statePublisher;
private final OperationDriver driver;
private final SettableFuture<OperationController> completionFuture = SettableFuture.create();
private final Lock lock = new ReentrantLock();
private final AtomicReference<OperationRunStatus> status = new AtomicReference<>();

private static final Logger LOG = LoggerFactory.getLogger(InMemoryOperationController.class);

InMemoryOperationController(OperationRunId runId, OperationStatePublisher statePublisher,
OperationDriver driver) {
this.runId = runId;
this.driver = driver;
this.statePublisher = statePublisher;
this.status.set(OperationRunStatus.PENDING);
startListen(driver);
}


@Override
public void stop() {
//TODO(samik) verify state before calling stop
lock.lock();
try {
this.status.set(OperationRunStatus.STOPPING);
driver.stopAndWait();
complete();
} catch (Exception e) {
LOG.warn("Exception when stopping operation {}", runId, e);
} finally {
lock.unlock();
}
}

@Override
public ListenableFuture<OperationController> completionFuture() {
return completionFuture;
}

private void startListen(Service service) {
service.addListener(new ServiceListenerAdapter() {
@Override
public void running() {
status.set(OperationRunStatus.RUNNING);
statePublisher.publishRunning(runId);
}

@Override
public void terminated(Service.State from) {
if (status.get().equals(OperationRunStatus.STOPPING)) {
status.set(OperationRunStatus.KILLED);
} else {
status.set(OperationRunStatus.SUCCEEDED);
statePublisher.publishSuccess(runId);
}
complete();
}

@Override
public void failed(Service.State from, Throwable failure) {
status.set(OperationRunStatus.FAILED);
if (failure instanceof OperationException) {
statePublisher.publishFailed(runId, ((OperationException) failure).toOperationError());
} else {
statePublisher.publishFailed(runId, getOperationErrorFromThrowable(failure));
}
complete();
}
}, Threads.SAME_THREAD_EXECUTOR);
}

private void complete() {
completionFuture.set(this);
}

private OperationError getOperationErrorFromThrowable(Throwable t) {
LOG.debug("Operation {} of namespace {} failed", runId.getRun(), runId.getParent(), t);
return new OperationError(t.getMessage(), Collections.emptyList());
}
}
Loading

0 comments on commit f0dfb67

Please sign in to comment.