Skip to content

Commit

Permalink
Merge pull request #15319 from cdapio/operation-api
Browse files Browse the repository at this point in the history
CDAP-20809: Operation Runner
  • Loading branch information
samdgupi authored Nov 9, 2023
2 parents 3a7019f + 27bcbdd commit a606282
Show file tree
Hide file tree
Showing 14 changed files with 609 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import io.cdap.cdap.internal.events.StartProgramEventSubscriber;
import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandler;
import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandlerInternal;
import io.cdap.cdap.internal.operation.guice.OperationModule;
import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory;
import io.cdap.cdap.internal.profile.ProfileService;
import io.cdap.cdap.internal.provision.ProvisionerModule;
Expand Down Expand Up @@ -198,6 +199,7 @@ public Module getInMemoryModules() {
new SourceControlModule(),
new EntityVerifierModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
BootstrapModules.getInMemoryModule(),
new AbstractModule() {
@Override
Expand Down Expand Up @@ -240,6 +242,7 @@ public Module getStandaloneModules() {
new EntityVerifierModule(),
new ProvisionerModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
BootstrapModules.getFileBasedModule(),
new AbstractModule() {
@Override
Expand Down Expand Up @@ -294,6 +297,7 @@ public Module getDistributedModules() {
new EntityVerifierModule(),
new ProvisionerModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
BootstrapModules.getFileBasedModule(),
new AbstractModule() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;

/**
* Abstract runner implementation with common functionality.
*/
public abstract class AbstractOperationRunner implements OperationRunner {

private final PullAppsOperationFactory pullOperationFactory;

AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory) {
this.pullOperationFactory = pullOperationFactory;
}

/**
* Creates a {@link LongRunningOperation} given the {@link OperationRunDetail}.
*
* @param detail {@link OperationRunDetail} for the operation
* @return {@link LongRunningOperation} for the operation type in request
*/
protected LongRunningOperation createOperation(OperationRunDetail detail)
throws IllegalStateException {
switch (detail.getRun().getType()) {
case PULL_APPS:
PullAppsRequest request = detail.getPullAppsRequest();
if (request == null) {
throw new IllegalStateException("Missing request for pull operation");
}
return pullOperationFactory.create(request);
case PUSH_APPS:
default:
throw new IllegalStateException(
String.format("Invalid operation type %s", detail.getRun().getType()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.Service.State;
import com.google.common.util.concurrent.SettableFuture;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import java.util.Collections;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* In-Memory implementation of {@link OperationController}.
*/
public class InMemoryOperationController implements
OperationController {

private final OperationRunId runId;
private final OperationStatePublisher statePublisher;
private final OperationDriver driver;
private final SettableFuture<OperationController> completionFuture = SettableFuture.create();
private static final Logger LOG = LoggerFactory.getLogger(InMemoryOperationController.class);

InMemoryOperationController(OperationRunId runId, OperationStatePublisher statePublisher,
OperationDriver driver) {
this.runId = runId;
this.driver = driver;
this.statePublisher = statePublisher;
startListen(driver);
}


@Override
public ListenableFuture<OperationController> stop() {
LOG.trace("Stopping operation {}", runId);
driver.stop();
return completionFuture;
}

@Override
public ListenableFuture<OperationController> complete() {
return completionFuture;
}

private void startListen(Service service) {
service.addListener(new ServiceListenerAdapter() {
@Override
public void running() {
statePublisher.publishRunning(runId);
}

@Override
public void terminated(Service.State from) {
if (from.equals(State.STOPPING)) {
statePublisher.publishKilled(runId);
} else {
statePublisher.publishSuccess(runId);
}
markComplete();
}

@Override
public void failed(Service.State from, Throwable failure) {
if (failure instanceof OperationException) {
statePublisher.publishFailed(runId, ((OperationException) failure).toOperationError());
} else {
statePublisher.publishFailed(runId, getOperationErrorFromThrowable(failure));
}
markComplete();
}
}, Threads.SAME_THREAD_EXECUTOR);
}

private void markComplete() {
completionFuture.set(this);
}

private OperationError getOperationErrorFromThrowable(Throwable t) {
LOG.debug("Operation {} of namespace {} failed", runId.getRun(), runId.getParent(), t);
return new OperationError(t.getMessage(), Collections.emptyList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;

/**
* Implementation of {@link OperationRunner} to run an operation in the same service.
*/
public class InMemoryOperationRunner extends AbstractOperationRunner {

private final OperationStatePublisher statePublisher;

/**
* Default constructor.
*
* @param statePublisher Publishes the current operation state.
*/
@Inject
public InMemoryOperationRunner(OperationStatePublisher statePublisher,
PullAppsOperationFactory pullOperationFactory) {
super(pullOperationFactory);
this.statePublisher = statePublisher;
}

@Override
public OperationController run(OperationRunDetail detail) throws IllegalStateException {
LongRunningOperationContext context = new LongRunningOperationContext(detail.getRunId(), statePublisher);
OperationDriver driver = new OperationDriver(createOperation(detail), context);
OperationController controller = new InMemoryOperationController(context.getRunId(),
statePublisher, driver);
driver.start();
return controller;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,45 @@

import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationType;
import java.util.Set;

/**
* Provides the context for the current operation run.
*/
public interface LongRunningOperationContext {
public class LongRunningOperationContext {

private final OperationRunId runId;
private final OperationStatePublisher statePublisher;

/**
* Get the {@link OperationRunId} for the current run.
* Default constructor.
*
* @return the current runid
* @param runId id of the current operation
* @param type type of the current operation
* @param statePublisher to publish the operation metadata
*/
OperationRunId getRunId();
public LongRunningOperationContext(OperationRunId runId, OperationStatePublisher statePublisher) {
this.runId = runId;
this.statePublisher = statePublisher;
}

/**
* Get the {@link OperationType} to be used by the runner for loading the right operation class.
* Get the {@link OperationRunId} for the current run.
*
* @return the type of the current operation
* @return the current runid
*/
OperationType getType();
public OperationRunId getRunId() {
return runId;
}

/**
* Used by the {@link LongRunningOperation} to update the resources operated on in the
* {@link io.cdap.cdap.proto.operation.OperationMeta} for the run. The input is set as we want all
* resources to be unique
*
* @param resources A set of resources to be updated.
*
*/
// TODO Add exceptions based on implementations.
void updateOperationResources(Set<OperationResource> resources);
public void updateOperationResources(Set<OperationResource> resources) {
statePublisher.publishResources(runId, resources);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationResource;
import java.util.Set;

/**
* Provides capabilities to send operation lifecycle specific messages.
*/
public class MessagingOperationStatePublisher implements OperationStatePublisher {

private final MessagingService messagingService;

@Inject
MessagingOperationStatePublisher(MessagingService messagingService) {
this.messagingService = messagingService;
}

@Override
public void publishResources(OperationRunId runId, Set<OperationResource> resources) {
// TODO(samik) implement message publish logic
}

@Override
public void publishRunning(OperationRunId runId) {
// TODO(samik) implement message publish logic
}

@Override
public void publishFailed(OperationRunId runId, OperationError error) {
// TODO(samik) implement message publish logic

}

@Override
public void publishSuccess(OperationRunId runId) {
// TODO(samik) implement message publish logic

}

@Override
public void publishKilled(OperationRunId runId) {
// TODO(samik) implement message publish logic
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@

package io.cdap.cdap.internal.operation;

import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationType;
import com.google.common.util.concurrent.ListenableFuture;

/**
* Abstract implementation of {@link LongRunningOperationContext} providing shared functionalities.
* Provides lifecycle hooks for managing the state of an operation.
*/
public abstract class AbstractLongRunningOperationContext implements LongRunningOperationContext {
public interface OperationController {

/**
* Attempt to stop the operation.
**/
ListenableFuture<OperationController> stop();

private final OperationRunId runId;
private final OperationType type;

/**
* Default constructor.
* Returns a future which can be used to block till the operation is completed.
*/
protected AbstractLongRunningOperationContext(OperationRunId runid, OperationType operationType) {
this.runId = runid;
this.type = operationType;
}
ListenableFuture<OperationController> complete();
}
Loading

0 comments on commit a606282

Please sign in to comment.