Skip to content

Commit

Permalink
CDAP-20841: Add stop operation api
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 21, 2023
1 parent 135cdb5 commit 3444460
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.cdap.cdap.internal.operation.OperationLifecycleManager;
import io.cdap.cdap.internal.operation.OperationRunFilter;
import io.cdap.cdap.internal.operation.OperationRunNotFoundException;
import io.cdap.cdap.internal.operation.OperationStatePublisher;
import io.cdap.cdap.internal.operation.ScanOperationRunsRequest;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.OperationRunId;
Expand All @@ -53,33 +54,39 @@
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;

/** The {@link HttpHandler} for handling REST calls to operation endpoints. */
/**
* The {@link HttpHandler} for handling REST calls to operation endpoints.
*/
@Path(Constants.Gateway.API_VERSION_3 + "/namespaces/{namespace-id}/operations")
public class OperationHttpHandler extends AbstractAppFabricHttpHandler {

private final OperationLifecycleManager operationLifecycleManager;
private final int batchSize;
private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(\"?)(\\w+)=(\\w+)(\"?)");
private static final String FILTER_SPLITTER = "AND";
private final FeatureFlagsProvider featureFlagsProvider;
private static final Gson GSON = new Gson();
private final OperationLifecycleManager operationLifecycleManager;
private final int batchSize;
public static final String OPERATIONS_LIST_PAGINATED_KEY = "operations";

@Inject
OperationHttpHandler(CConfiguration cConf, OperationLifecycleManager operationLifecycleManager) {
OperationHttpHandler(CConfiguration cConf, OperationLifecycleManager operationLifecycleManager,
OperationStatePublisher statePublisher) {
this.batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE);
this.operationLifecycleManager = operationLifecycleManager;
this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
}

// TODO(CDAP-20881) : Add RBAC check

/**
* API to fetch all running operations in a namespace.
*
* @param namespaceId Namespace to fetch runs from
* @param pageToken the token identifier for the current page requested in a paginated request
* @param pageToken the token identifier for the current page requested in a paginated
* request
* @param pageSize the number of application details returned in a paginated request
* @param filter optional filters in EBNF grammar. Currently Only one status and one type filter
* is supported with AND expression.
* @param filter optional filters in EBNF grammar. Currently Only one status and one type
* filter is supported with AND expression.
*/
@GET
@Path("/")
Expand All @@ -92,14 +99,15 @@ public void scanOperations(
@QueryParam("filter") String filter)
throws BadRequestException, IOException, ForbiddenException {
checkSourceControlMultiAppFeatureFlag();
validateNamespaceId(namespaceId);
NamespaceId namespace = validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(
GSON,
responder,
OPERATIONS_LIST_PAGINATED_KEY,
jsonListResponder -> {
AtomicReference<OperationRun> lastRun = new AtomicReference<>();
ScanOperationRunsRequest scanRequest = getScanRequest(namespaceId, pageToken, pageSize, filter);
ScanOperationRunsRequest scanRequest = getScanRequest(namespaceId, pageToken, pageSize,
filter);
boolean pageLimitReached = false;
try {
pageLimitReached =
Expand Down Expand Up @@ -138,7 +146,7 @@ public void getOperationRun(
checkSourceControlMultiAppFeatureFlag();
validateNamespaceId(namespaceId);
if (runId == null || runId.isEmpty()) {
throw new BadRequestException("Path parameter runId cannot be empty");
throw new BadRequestException("runId cannot be empty");
}
responder.sendJson(
HttpResponseStatus.OK,
Expand All @@ -158,11 +166,14 @@ public void getOperationRun(
@Path("/{id}/stop")
public void failOperation(FullHttpRequest request, HttpResponder responder,
@PathParam("namespace-id") String namespaceId,
@PathParam("id") String runId) {
// // TODO(samik, CDAP-20814) send the message to stop the operation
responder.sendString(HttpResponseStatus.OK,
String.format("Updated status for operation run %s in namespace '%s'.", runId,
namespaceId));
@PathParam("id") String runId)
throws OperationRunNotFoundException, IOException, BadRequestException, ForbiddenException {
checkSourceControlMultiAppFeatureFlag();
validateNamespaceId(namespaceId);
if (runId == null || runId.isEmpty()) {
throw new BadRequestException("runId cannot be empty");
}
operationLifecycleManager.sendStopNotification(new OperationRunId(namespaceId, runId));
}

private ScanOperationRunsRequest getScanRequest(
Expand Down Expand Up @@ -218,7 +229,8 @@ private OperationRunFilter getFilter(String filterStr) throws IllegalArgumentExc
* @param input The input string containing key-value pairs.
* @param splitter The string used to split key-value pairs.
* @return A {@code Map<String, String>} containing the parsed key-value pairs.
* @throws IllegalArgumentException If the input does not match the expected key=val pair pattern.
* @throws IllegalArgumentException If the input does not match the expected key=val pair
* pattern.
*/
private static ImmutableMap<String, String> parseKeyValStr(String input, String splitter) {
ImmutableMap.Builder<String, String> keyValMap = ImmutableMap.<String, String>builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
Expand All @@ -35,13 +37,17 @@ public class OperationLifecycleManager {

private final TransactionRunner transactionRunner;
private final OperationRuntime runtime;
private final OperationStatePublisher statePublisher;

private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class);


@Inject
OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) {
OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime,
OperationStatePublisher statePublisher) {
this.transactionRunner = transactionRunner;
this.runtime = runtime;
this.statePublisher = statePublisher;
}

/**
Expand Down Expand Up @@ -98,6 +104,22 @@ public OperationRunDetail getOperationRun(OperationRunId runId)
OperationRunNotFoundException.class);
}

/**

Check warning on line 107 in cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.javadoc.SummaryJavadocCheck

First sentence of Javadoc is missing an ending period.
* Validates state transition and sends STOPPING notification for an operation
*
* @param runId {@link OperationRunId} of the operation
*/
public void sendStopNotification(OperationRunId runId)
throws OperationRunNotFoundException, IOException, BadRequestException {
// validate the run exists
OperationRunDetail runDetail = getOperationRun(runId);
if (!runDetail.getRun().getStatus().canTransitionTo(OperationRunStatus.STOPPING)) {
throw new BadRequestException(String.format("Operation run in %s state cannot to be stopped",
runDetail.getRun().getStatus()));
}
statePublisher.publishStopping(runId);
}

/**
* Runs a given operation. It is the responsibility of the caller to validate state transition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ protected void configure() {

transactionRunner = injector.getInstance(TransactionRunner.class);
operationLifecycleManager =
new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class));
new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class),
null);
StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class));
batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testProcessMessages() throws Exception {
OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class);
InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class);
OperationLifecycleManager lifecycleManager =
new OperationLifecycleManager(transactionRunner, mockRuntime);
new OperationLifecycleManager(transactionRunner, mockRuntime, mockStatePublisher);
OperationNotificationSingleTopicSubscriberService subscriberService =
new OperationNotificationSingleTopicSubscriberService(
mockMsgService,
Expand Down Expand Up @@ -152,7 +152,7 @@ public void testProcessNotificationInvalidOperation() {
OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class);
InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class);
OperationLifecycleManager lifecycleManager = new OperationLifecycleManager(transactionRunner,
mockRuntime);
mockRuntime, mockStatePublisher);
OperationNotificationSingleTopicSubscriberService subscriberService =
new OperationNotificationSingleTopicSubscriberService(
mockMsgService,
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testProcessNotificationInvalidTransition() throws Exception {
OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class);
InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class);
OperationLifecycleManager lifecycleManager =
new OperationLifecycleManager(transactionRunner, mockRuntime);
new OperationLifecycleManager(transactionRunner, mockRuntime, mockStatePublisher);
OperationNotificationSingleTopicSubscriberService subscriberService =
new OperationNotificationSingleTopicSubscriberService(
mockMsgService,
Expand Down Expand Up @@ -213,7 +213,7 @@ public void testProcessNotification() throws Exception {
OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class);
InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class);
OperationLifecycleManager lifecycleManager = new OperationLifecycleManager(transactionRunner,
mockRuntime);
mockRuntime, mockStatePublisher);
OperationNotificationSingleTopicSubscriberService subscriberService =
new OperationNotificationSingleTopicSubscriberService(
mockMsgService,
Expand Down

0 comments on commit 3444460

Please sign in to comment.