diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java index 0dcfd78dc4d..f1062b62648 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/OperationHttpHandler.java @@ -31,6 +31,7 @@ import io.cdap.cdap.internal.operation.OperationLifecycleManager; import io.cdap.cdap.internal.operation.OperationRunFilter; import io.cdap.cdap.internal.operation.OperationRunNotFoundException; +import io.cdap.cdap.internal.operation.OperationStatePublisher; import io.cdap.cdap.internal.operation.ScanOperationRunsRequest; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.OperationRunId; @@ -53,33 +54,39 @@ import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; -/** The {@link HttpHandler} for handling REST calls to operation endpoints. */ +/** + * The {@link HttpHandler} for handling REST calls to operation endpoints. + */ @Path(Constants.Gateway.API_VERSION_3 + "/namespaces/{namespace-id}/operations") public class OperationHttpHandler extends AbstractAppFabricHttpHandler { + + private final OperationLifecycleManager operationLifecycleManager; + private final int batchSize; private static final Pattern KEY_VALUE_PATTERN = Pattern.compile("(\"?)(\\w+)=(\\w+)(\"?)"); private static final String FILTER_SPLITTER = "AND"; private final FeatureFlagsProvider featureFlagsProvider; private static final Gson GSON = new Gson(); - private final OperationLifecycleManager operationLifecycleManager; - private final int batchSize; public static final String OPERATIONS_LIST_PAGINATED_KEY = "operations"; @Inject - OperationHttpHandler(CConfiguration cConf, OperationLifecycleManager operationLifecycleManager) { + OperationHttpHandler(CConfiguration cConf, OperationLifecycleManager operationLifecycleManager, + OperationStatePublisher statePublisher) { this.batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE); this.operationLifecycleManager = operationLifecycleManager; this.featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf); } // TODO(CDAP-20881) : Add RBAC check + /** * API to fetch all running operations in a namespace. * * @param namespaceId Namespace to fetch runs from - * @param pageToken the token identifier for the current page requested in a paginated request + * @param pageToken the token identifier for the current page requested in a paginated + * request * @param pageSize the number of application details returned in a paginated request - * @param filter optional filters in EBNF grammar. Currently Only one status and one type filter - * is supported with AND expression. + * @param filter optional filters in EBNF grammar. Currently Only one status and one type + * filter is supported with AND expression. */ @GET @Path("/") @@ -92,14 +99,15 @@ public void scanOperations( @QueryParam("filter") String filter) throws BadRequestException, IOException, ForbiddenException { checkSourceControlMultiAppFeatureFlag(); - validateNamespaceId(namespaceId); + NamespaceId namespace = validateNamespaceId(namespaceId); JsonPaginatedListResponder.respond( GSON, responder, OPERATIONS_LIST_PAGINATED_KEY, jsonListResponder -> { AtomicReference lastRun = new AtomicReference<>(); - ScanOperationRunsRequest scanRequest = getScanRequest(namespaceId, pageToken, pageSize, filter); + ScanOperationRunsRequest scanRequest = getScanRequest(namespaceId, pageToken, pageSize, + filter); boolean pageLimitReached = false; try { pageLimitReached = @@ -138,7 +146,7 @@ public void getOperationRun( checkSourceControlMultiAppFeatureFlag(); validateNamespaceId(namespaceId); if (runId == null || runId.isEmpty()) { - throw new BadRequestException("Path parameter runId cannot be empty"); + throw new BadRequestException("runId cannot be empty"); } responder.sendJson( HttpResponseStatus.OK, @@ -158,11 +166,14 @@ public void getOperationRun( @Path("/{id}/stop") public void failOperation(FullHttpRequest request, HttpResponder responder, @PathParam("namespace-id") String namespaceId, - @PathParam("id") String runId) { - // // TODO(samik, CDAP-20814) send the message to stop the operation - responder.sendString(HttpResponseStatus.OK, - String.format("Updated status for operation run %s in namespace '%s'.", runId, - namespaceId)); + @PathParam("id") String runId) + throws OperationRunNotFoundException, IOException, BadRequestException, ForbiddenException { + checkSourceControlMultiAppFeatureFlag(); + validateNamespaceId(namespaceId); + if (runId == null || runId.isEmpty()) { + throw new BadRequestException("runId cannot be empty"); + } + operationLifecycleManager.sendStopNotification(new OperationRunId(namespaceId, runId)); } private ScanOperationRunsRequest getScanRequest( @@ -218,7 +229,8 @@ private OperationRunFilter getFilter(String filterStr) throws IllegalArgumentExc * @param input The input string containing key-value pairs. * @param splitter The string used to split key-value pairs. * @return A {@code Map} containing the parsed key-value pairs. - * @throws IllegalArgumentException If the input does not match the expected key=val pair pattern. + * @throws IllegalArgumentException If the input does not match the expected key=val pair + * pattern. */ private static ImmutableMap parseKeyValStr(String input, String splitter) { ImmutableMap.Builder keyValMap = ImmutableMap.builder(); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java index 4e9c7f64f15..87f75810de6 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java @@ -17,8 +17,10 @@ package io.cdap.cdap.internal.operation; import com.google.inject.Inject; +import io.cdap.cdap.common.BadRequestException; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationRunStatus; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; @@ -35,13 +37,17 @@ public class OperationLifecycleManager { private final TransactionRunner transactionRunner; private final OperationRuntime runtime; + private final OperationStatePublisher statePublisher; + private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class); @Inject - OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) { + OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime, + OperationStatePublisher statePublisher) { this.transactionRunner = transactionRunner; this.runtime = runtime; + this.statePublisher = statePublisher; } /** @@ -98,6 +104,22 @@ public OperationRunDetail getOperationRun(OperationRunId runId) OperationRunNotFoundException.class); } + /** + * Validates state transition and sends STOPPING notification for an operation + * + * @param runId {@link OperationRunId} of the operation + */ + public void sendStopNotification(OperationRunId runId) + throws OperationRunNotFoundException, IOException, BadRequestException { + // validate the run exists + OperationRunDetail runDetail = getOperationRun(runId); + if (!runDetail.getRun().getStatus().canTransitionTo(OperationRunStatus.STOPPING)) { + throw new BadRequestException(String.format("Operation run in %s state cannot to be stopped", + runDetail.getRun().getStatus())); + } + statePublisher.publishStopping(runId); + } + /** * Runs a given operation. It is the responsibility of the caller to validate state transition. * diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java index fd75fe460e9..72fc992b680 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java @@ -93,7 +93,8 @@ protected void configure() { transactionRunner = injector.getInstance(TransactionRunner.class); operationLifecycleManager = - new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class)); + new OperationLifecycleManager(transactionRunner, Mockito.mock(OperationRuntime.class), + null); StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); batchSize = cConf.getInt(AppFabric.STREAMING_BATCH_SIZE); } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java index da6b1e67e33..1b7498eb9fe 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java @@ -108,7 +108,7 @@ public void testProcessMessages() throws Exception { OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); OperationLifecycleManager lifecycleManager = - new OperationLifecycleManager(transactionRunner, mockRuntime); + new OperationLifecycleManager(transactionRunner, mockRuntime, mockStatePublisher); OperationNotificationSingleTopicSubscriberService subscriberService = new OperationNotificationSingleTopicSubscriberService( mockMsgService, @@ -152,7 +152,7 @@ public void testProcessNotificationInvalidOperation() { OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); OperationLifecycleManager lifecycleManager = new OperationLifecycleManager(transactionRunner, - mockRuntime); + mockRuntime, mockStatePublisher); OperationNotificationSingleTopicSubscriberService subscriberService = new OperationNotificationSingleTopicSubscriberService( mockMsgService, @@ -181,7 +181,7 @@ public void testProcessNotificationInvalidTransition() throws Exception { OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); OperationLifecycleManager lifecycleManager = - new OperationLifecycleManager(transactionRunner, mockRuntime); + new OperationLifecycleManager(transactionRunner, mockRuntime, mockStatePublisher); OperationNotificationSingleTopicSubscriberService subscriberService = new OperationNotificationSingleTopicSubscriberService( mockMsgService, @@ -213,7 +213,7 @@ public void testProcessNotification() throws Exception { OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); OperationLifecycleManager lifecycleManager = new OperationLifecycleManager(transactionRunner, - mockRuntime); + mockRuntime, mockStatePublisher); OperationNotificationSingleTopicSubscriberService subscriberService = new OperationNotificationSingleTopicSubscriberService( mockMsgService,