From 8e74aa4f3aa90e995f734d69b11b505ae64b42c5 Mon Sep 17 00:00:00 2001 From: samik Date: Thu, 16 Nov 2023 01:14:07 +0530 Subject: [PATCH] Add operation notification subscriber --- .../operation/InMemoryOperationRuntime.java | 99 ++++++ .../MessagingOperationStatePublisher.java | 14 +- .../operation/OperationLifecycleManager.java | 56 ++- .../operation/OperationNotification.java | 40 ++- ...ificationSingleTopicSubscriberService.java | 222 ++++++++++++ ...perationNotificationSubscriberService.java | 155 +++++++++ .../internal/operation/OperationRuntime.java | 39 +++ .../operation/ScanOperationRunsRequest.java | 5 - .../operation/guice/OperationModule.java | 3 + .../InMemoryOperationRunnerTest.java | 2 +- ...ationSingleTopicSubscriberServiceTest.java | 327 ++++++++++++++++++ .../operation/OperationRunStoreTest.java | 11 +- .../internal/operation/OperationTestBase.java | 12 +- .../operation/SqlOperationRunsStoreTest.java | 5 +- .../io/cdap/cdap/common/conf/Constants.java | 9 + .../src/main/resources/cdap-default.xml | 41 +++ .../io/cdap/cdap/proto/id/OperationRunId.java | 2 +- .../cdap/proto/operation/OperationError.java | 3 +- .../proto/operation/OperationRunStatus.java | 3 - 19 files changed, 1001 insertions(+), 47 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRuntime.java create mode 100644 cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java new file mode 100644 index 000000000000..d07a2ed547bc --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java @@ -0,0 +1,99 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + + +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; +import org.apache.twill.common.Threads; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link OperationRuntime} implementation when operations are running inside same service. + */ +public class InMemoryOperationRuntime implements OperationRuntime { + + private final CConfiguration cConf; + private final ConcurrentHashMap controllers; + private final OperationRunner runner; + private final OperationStatePublisher statePublisher; + private final TransactionRunner transactionRunner; + + private static final Logger LOG = LoggerFactory.getLogger(InMemoryOperationRuntime.class); + + @Inject + InMemoryOperationRuntime(CConfiguration cConf, OperationRunner runner, + OperationStatePublisher statePublisher, + TransactionRunner transactionRunner) { + this.cConf = cConf; + this.runner = runner; + this.statePublisher = statePublisher; + this.transactionRunner = transactionRunner; + this.controllers = new ConcurrentHashMap<>(); + } + + /** + * Runs an operation given the detail. + * + * @param runDetail detail of the run + * @return {@link OperationController} for the run + */ + public OperationController run(OperationRunDetail runDetail) { + return controllers.computeIfAbsent( + runDetail.getRunId(), + runId -> { + try { + OperationController controller = runner.run(runDetail); + LOG.debug("Added controller for {}", runId); + controller.complete().addListener(() -> remove(runId), Threads.SAME_THREAD_EXECUTOR); + return controller; + } catch (IllegalStateException e) { + statePublisher.publishFailed(runDetail.getRunId(), + new OperationError(e.getMessage(), Collections.emptyList()) + ); + } + return null; + } + ); + } + + /** + * Get controller for a given {@link OperationRunId}. + */ + @Nullable + public OperationController getController(OperationRunDetail detail) { + return controllers.computeIfAbsent(detail.getRunId(), runId -> { + // TODO(samik) create controller from detail after 6.10 release + return null; + }); + } + + private void remove(OperationRunId runId) { + OperationController controller = controllers.remove(runId); + if (controller != null) { + LOG.debug("Controller removed for {}", runId); + } + } + +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java index 4c98c3caf79e..8f24bb74d542 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java @@ -104,7 +104,7 @@ public MessagingOperationStatePublisher(MessagingService messagingService, @Override public void publishResources(OperationRunId runId, Set resources) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name()) .put(Operation.RESOURCES_NOTIFICATION_KEY, GSON.toJson(resources)); @@ -114,7 +114,7 @@ public void publishResources(OperationRunId runId, Set resour @Override public void publishRunning(OperationRunId runId) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name()); publish(runId, propertiesBuilder.build()); } @@ -122,7 +122,7 @@ public void publishRunning(OperationRunId runId) { @Override public void publishFailed(OperationRunId runId, OperationError error) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.FAILED.name()) .put(Operation.ERROR_NOTIFICATION_KEY, GSON.toJson(error)) .put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString()); @@ -132,7 +132,7 @@ public void publishFailed(OperationRunId runId, OperationError error) { @Override public void publishSuccess(OperationRunId runId) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.SUCCEEDED.name()) .put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString()); publish(runId, propertiesBuilder.build()); @@ -141,7 +141,7 @@ public void publishSuccess(OperationRunId runId) { @Override public void publishKilled(OperationRunId runId) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.KILLED.name()) .put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString()); publish(runId, propertiesBuilder.build()); @@ -150,7 +150,7 @@ public void publishKilled(OperationRunId runId) { @Override public void publishStopping(OperationRunId runId) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STOPPING.name()); publish(runId, propertiesBuilder.build()); } @@ -158,7 +158,7 @@ public void publishStopping(OperationRunId runId) { @Override public void publishStarting(OperationRunId runId) { ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() - .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString()) .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()); publish(runId, propertiesBuilder.build()); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java index 8dbe59425cdc..febad517f029 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java @@ -17,13 +17,15 @@ package io.cdap.cdap.internal.operation; import com.google.inject.Inject; -import io.cdap.cdap.proto.operation.OperationRunStatus; -import io.cdap.cdap.spi.data.InvalidFieldException; +import io.cdap.cdap.proto.operation.OperationError; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.io.IOException; +import java.util.Collections; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Service that manages lifecycle of Operation. @@ -31,10 +33,14 @@ public class OperationLifecycleManager { private final TransactionRunner transactionRunner; + private final OperationRuntime runtime; + private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class); + @Inject - OperationLifecycleManager(TransactionRunner transactionRunner) { + OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) { this.transactionRunner = transactionRunner; + this.runtime = runtime; } /** @@ -72,19 +78,49 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, return currentLimit == 0; } + /** + * Runs a given operation. It is the responsibility of the caller to validate state transition. + * + * @param detail {@link OperationRunDetail} of the operation + */ + public OperationController startOperation(OperationRunDetail detail) { + return runtime.run(detail); + } + /** - * Scan all pending operations. Needed for try running all pending operation during startup. + * Initiate operation stop. It is the responsibility of the caller to validate state transition. * - * @param consumer {@link Consumer} to process each scanned run + * @param detail {@link OperationRunDetail} of the operation + * @throws IllegalStateException in case the operation is not running. It is the + * responsibility of the caller to handle the same */ - public void scanPendingOperations(Consumer consumer) - throws IOException, InvalidFieldException { - TransactionRunners.run(transactionRunner, context -> { - getOperationRunStore(context).scanOperationByStatus(OperationRunStatus.PENDING, consumer); - }, IOException.class, InvalidFieldException.class); + public void stopOperation(OperationRunDetail detail) { + OperationController controller = runtime.getController(detail); + if (controller == null) { + throw new IllegalStateException("Operation is not running"); + } + controller.stop(); } + /** + * Checks if the operation is running. If not sends a failure notification + * Called after service restart. + * + * @param detail {@link OperationRunDetail} of the operation + */ + public void isRunning(OperationRunDetail detail, OperationStatePublisher statePublisher) { + // If there is no active controller that means the operation is not running and should be failed + if (runtime.getController(detail) == null) { + LOG.debug("No running operation for {}, sending failure notification", detail.getRunId()); + statePublisher.publishFailed(detail.getRunId(), + new OperationError("Failed after service restart as operation is not running", + Collections.emptyList())); + return; + } + } + + private OperationRunStore getOperationRunStore(StructuredTableContext context) { return new OperationRunStore(context); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java index c4ae62ad7c81..93b3837c99b0 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java @@ -37,6 +37,7 @@ public class OperationNotification { private final OperationRunId runId; private final OperationRunStatus status; + @Nullable private final Set resources; @Nullable @@ -51,8 +52,10 @@ public class OperationNotification { /** * Default constructor. */ - public OperationNotification(OperationRunId runId, OperationRunStatus status, - @Nullable Set resources, Instant endTime, @Nullable OperationError error) { + OperationNotification(OperationRunId runId, OperationRunStatus status, + @Nullable Set resources, Instant endTime, + @Nullable OperationError error + ) { this.runId = runId; this.status = status; this.resources = resources; @@ -68,15 +71,34 @@ public OperationNotification(OperationRunId runId, OperationRunStatus status, public static OperationNotification fromNotification(Notification notification) { Map properties = notification.getProperties(); - OperationRunId runId = GSON.fromJson(properties.get(Operation.RUN_ID_NOTIFICATION_KEY), - OperationRunId.class); + if (!properties.containsKey(Operation.RUN_ID_NOTIFICATION_KEY)) { + throw new IllegalArgumentException("Notification missing operation run id"); + } + + if (!properties.containsKey(Operation.STATUS_NOTIFICATION_KEY)) { + throw new IllegalArgumentException("Notification missing operation status"); + } + + OperationError error = null; + if (properties.containsKey(Operation.ERROR_NOTIFICATION_KEY)) { + error = GSON.fromJson(properties.get(Operation.ERROR_NOTIFICATION_KEY), + OperationError.class); + } + + Set resources = null; + if (properties.containsKey(Operation.RESOURCES_NOTIFICATION_KEY)) { + resources = GSON.fromJson( + properties.get(Operation.RESOURCES_NOTIFICATION_KEY), resourcesType); + } + + Instant endTime = null; + if (properties.containsKey(Operation.ENDTIME_NOTIFICATION_KEY)) { + endTime = Instant.parse(properties.get(Operation.ENDTIME_NOTIFICATION_KEY)); + } + + OperationRunId runId = OperationRunId.fromString(properties.get(Operation.RUN_ID_NOTIFICATION_KEY)); OperationRunStatus status = OperationRunStatus.valueOf( properties.get(Operation.STATUS_NOTIFICATION_KEY)); - OperationError error = GSON.fromJson(properties.get(Operation.ERROR_NOTIFICATION_KEY), - OperationError.class); - Set resources = GSON.fromJson( - properties.get(Operation.RESOURCES_NOTIFICATION_KEY), resourcesType); - Instant endTime = Instant.parse(properties.get(Operation.ENDTIME_NOTIFICATION_KEY)); return new OperationNotification(runId, status, resources, endTime, error); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java new file mode 100644 index 000000000000..c239c33c34a2 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java @@ -0,0 +1,222 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.JsonSyntaxException; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.utils.ImmutablePair; +import io.cdap.cdap.internal.app.services.AbstractNotificationSubscriberService; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.security.spi.authentication.SecurityRequestContext; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.TableNotFoundException; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Iterator; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service that receives program status notifications from a single topic and persists to the store. + * No transactions should be started in any of the overrided methods since they are already wrapped + * in a transaction. + */ +class OperationNotificationSingleTopicSubscriberService + extends AbstractNotificationSubscriberService { + + private static final Logger LOG = + LoggerFactory.getLogger(OperationNotificationSingleTopicSubscriberService.class); + + private final OperationStatePublisher statePublisher; + + private final OperationLifecycleManager lifecycleService; + + OperationNotificationSingleTopicSubscriberService( + MessagingService messagingService, + CConfiguration cConf, + MetricsCollectionService metricsCollectionService, + OperationStatePublisher statePublisher, + TransactionRunner transactionRunner, + String name, + String topicName, + OperationLifecycleManager lifecycleManager) { + super( + name, + cConf, + topicName, + cConf.getInt(Constants.Operation.STATUS_EVENT_FETCH_SIZE), + cConf.getLong(Constants.Operation.STATUS_EVENT_POLL_DELAY_MILLIS), + messagingService, + metricsCollectionService, + transactionRunner, + cConf.getInt(Constants.Operation.STATUS_EVENT_TX_SIZE)); + this.statePublisher = statePublisher; + this.lifecycleService = lifecycleManager; + } + + @Nullable + @Override + protected String loadMessageId(StructuredTableContext context) + throws IOException, TableNotFoundException { + return getAppMetadataStore(context).retrieveSubscriberState(getTopicId().getTopic(), + "operation.state.reader"); + } + + @Override + protected void storeMessageId(StructuredTableContext context, String messageId) + throws IOException, TableNotFoundException { + getAppMetadataStore(context).persistSubscriberState(getTopicId().getTopic(), + "operation.state.reader", messageId); + } + + @Override + protected void processMessages( + StructuredTableContext structuredTableContext, + Iterator> messages) + throws Exception { + while (messages.hasNext()) { + ImmutablePair messagePair = messages.next(); + processNotification( + messagePair.getFirst().getBytes(StandardCharsets.UTF_8), + messagePair.getSecond(), + structuredTableContext); + } + } + + /** + * Process a {@link Notification} received from TMS. + * + * @param messageIdBytes the raw message id in the TMS for the notification + * @param notification the {@link Notification} to process + * @param context context to get the table for operations + * @throws Exception if failed to process the given notification + */ + @VisibleForTesting + void processNotification( + byte[] messageIdBytes, + Notification notification, + StructuredTableContext context) + throws Exception { + OperationRunStore runStore = new OperationRunStore(context); + try { + OperationNotification operationNotification = OperationNotification.fromNotification( + notification); + handleOperationEvent(operationNotification, messageIdBytes, runStore); + } catch (IllegalArgumentException | JsonSyntaxException e) { + LOG.warn("Got invalid operation notification", e); + } + } + + private void handleOperationEvent( + OperationNotification notification, + byte[] messageIdBytes, + OperationRunStore runStore + ) throws Exception { + LOG.debug("Processing operation status notification: {}", notification); + OperationRunId runId = notification.getRunId(); + OperationRunDetail runDetail; + try { + runDetail = runStore.getOperation(runId); + if (!runDetail.getRun().getStatus().canTransitionTo(notification.getStatus())) { + LOG.debug( + "Ignoring unexpected request to transition operation {} from {} state to " + + "{} state.", + runId, runDetail.getRun().getStatus(), notification.getStatus()); + return; + } + + byte[] existingSourceId = runDetail.getSourceId(); + if (existingSourceId != null && Bytes.compareTo(messageIdBytes, existingSourceId) < 0) { + LOG.debug( + "Notification source id '{}' is not larger than the existing source id '{}' in the existing " + + "operation run detail.", + Bytes.toHexString(runDetail.getSourceId()), Bytes.toHexString(existingSourceId)); + return; + } + } catch (OperationRunNotFoundException e) { + LOG.debug(String.format("Ignoring message for non existent operation %s", runId)); + return; + } + + switch (notification.getStatus()) { + case STARTING: + String oldUser = SecurityRequestContext.getUserId(); + try { + if (runDetail.getPrincipal() != null) { + SecurityRequestContext.setUserId(runDetail.getPrincipal()); + } + lifecycleService.startOperation(runDetail); + } catch (Exception e) { + LOG.error("Failed to start operation {}", runDetail, e); + statePublisher.publishFailed(runId, + new OperationError(e.getMessage(), Collections.emptyList())); + } finally { + SecurityRequestContext.setUserId(oldUser); + } + runStore.updateOperationStatus(runId, OperationRunStatus.STARTING, messageIdBytes); + break; + case RUNNING: + case SUCCEEDED: + runStore.updateOperationStatus(runId, notification.getStatus(), messageIdBytes); + if (notification.getResources() != null) { + runStore.updateOperationResources(runId, notification.getResources(), messageIdBytes); + } + break; + case STOPPING: + try { + lifecycleService.stopOperation(runDetail); + } catch (IllegalStateException e) { + LOG.warn("Failed to stop operation {}", runDetail.getRunId(), e); + } + runStore.updateOperationStatus(runId, OperationRunStatus.STOPPING, messageIdBytes); + break; + case KILLED: + runStore.updateOperationStatus(runId, OperationRunStatus.KILLED, messageIdBytes); + break; + case FAILED: + runStore.failOperationRun(runId, notification.getError(), notification.getEndTime(), + messageIdBytes); + break; + default: + // This should not happen + LOG.error( + "Unsupported status {} for operation {}, {}", + notification.getStatus(), + runId, + notification); + } + } + + /** + * Returns an instance of {@link AppMetadataStore}. + */ + private AppMetadataStore getAppMetadataStore(StructuredTableContext context) { + return AppMetadataStore.create(context); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java new file mode 100644 index 000000000000..04f9dd5eecf9 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java @@ -0,0 +1,155 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.Service; +import com.google.inject.Inject; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.service.Retries; +import io.cdap.cdap.common.service.RetryStrategies; +import io.cdap.cdap.common.service.RetryStrategy; +import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; +import org.apache.twill.internal.CompositeService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service that creates children services, each to handle a single partition of operation status + * events topic. + */ +public class OperationNotificationSubscriberService extends AbstractIdleService { + private static final Logger LOG = + LoggerFactory.getLogger(OperationNotificationSubscriberService.class); + + private final MessagingService messagingService; + private final CConfiguration cConf; + private final OperationStatePublisher statePublisher; + private final TransactionRunner transactionRunner; + private final MetricsCollectionService metricsCollectionService; + private final OperationLifecycleManager lifecycleManager; + private Service delegate; + + @Inject + OperationNotificationSubscriberService( + MessagingService messagingService, + CConfiguration cConf, + MetricsCollectionService metricsCollectionService, + OperationStatePublisher statePublisher, + TransactionRunner transactionRunner, + OperationLifecycleManager lifecycleManager) { + + this.messagingService = messagingService; + this.cConf = cConf; + this.metricsCollectionService = metricsCollectionService; + this.statePublisher = statePublisher; + this.transactionRunner = transactionRunner; + this.lifecycleManager = lifecycleManager; + } + + @Override + protected void startUp() throws Exception { + // first process the existing active runs before starting to listen + RetryStrategy retryStrategy = + RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX); + + TransactionRunners.run( + transactionRunner, context -> { + Retries.runWithRetries( + () -> { + processStartingOperations(context); + processRunningOperations(context); + processStoppingOperations(context); + }, + retryStrategy, + e -> true + ); + } + ); + + List children = new ArrayList<>(); + String topicPrefix = cConf.get(Constants.Operation.STATUS_EVENT_TOPIC); + int numPartitions = cConf.getInt(Constants.Operation.STATUS_EVENT_NUM_PARTITIONS); + IntStream.range(0, numPartitions) + .forEach(i -> children.add(createChildService("operation.status." + i, topicPrefix + i))); + delegate = new CompositeService(children); + + delegate.startAndWait(); + } + + // Sends STARTING notification for all STARTING operations + private void processStartingOperations(StructuredTableContext context) throws Exception { + new OperationRunStore(context).scanOperationByStatus( + OperationRunStatus.STARTING, + (runDetail) -> { + LOG.debug("Retrying to start operation {}.", runDetail.getRunId()); + statePublisher.publishStarting(runDetail.getRunId()); + } + ); + } + + // Try to resume run for all RUNNING operations + private void processRunningOperations(StructuredTableContext context) throws Exception { + new OperationRunStore(context).scanOperationByStatus( + OperationRunStatus.RUNNING, + (runDetail) -> { + lifecycleManager.isRunning(runDetail, statePublisher); + } + ); + } + + // Sends STOPPING notification for all STOPPING operations + private void processStoppingOperations(StructuredTableContext context) throws Exception { + new OperationRunStore(context).scanOperationByStatus( + OperationRunStatus.STOPPING, + (runDetail) -> { + LOG.debug("Retrying to stop operation {}.", runDetail.getRunId()); + statePublisher.publishStopping(runDetail.getRunId()); + } + ); + } + + @Override + protected void shutDown() throws Exception { + delegate.stopAndWait(); + } + + private OperationNotificationSingleTopicSubscriberService createChildService( + String name, String topicName) { + return new OperationNotificationSingleTopicSubscriberService( + messagingService, + cConf, + metricsCollectionService, + statePublisher, + transactionRunner, + name, + topicName, + lifecycleManager + ); + } +} + + diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRuntime.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRuntime.java new file mode 100644 index 000000000000..f363263f19dd --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRuntime.java @@ -0,0 +1,39 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import io.cdap.cdap.proto.id.OperationRunId; + +/** + * Provides management of running operations. Stores {@link OperationController} in memory. + */ +public interface OperationRuntime { + + /** + * Get controller for a given {@link OperationRunId}. + * If the operation is not running it will return null + */ + OperationController getController(OperationRunDetail detail); + + /** + * Runs an operation given the detail. + * + * @param detail detail of the run + * @return {@link OperationController} for the run + */ + OperationController run(OperationRunDetail detail); +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java index e35c09c378c4..6e78293959eb 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java @@ -97,8 +97,6 @@ public static class Builder { private String namespace; @Nullable - private String scanToRunId; - @Nullable private String scanAfterRunId; @Nullable private OperationRunFilter filter; @@ -151,9 +149,6 @@ public Builder setLimit(int limit) { * return new {@link ScanOperationRunsRequest}. */ public ScanOperationRunsRequest build() { - if (namespace == null) { - throw new IllegalArgumentException("namespace must be specified."); - } if (filter == null) { filter = OperationRunFilter.emptyFilter(); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java index dcc694ad58d8..a019958545ad 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java @@ -22,9 +22,11 @@ import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation; import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; import io.cdap.cdap.internal.operation.InMemoryOperationRunner; +import io.cdap.cdap.internal.operation.InMemoryOperationRuntime; import io.cdap.cdap.internal.operation.LongRunningOperation; import io.cdap.cdap.internal.operation.MessagingOperationStatePublisher; import io.cdap.cdap.internal.operation.OperationRunner; +import io.cdap.cdap.internal.operation.OperationRuntime; import io.cdap.cdap.internal.operation.OperationStatePublisher; import io.cdap.cdap.sourcecontrol.ApplicationManager; @@ -43,6 +45,7 @@ protected void configure() { bind(ApplicationManager.class).to(LocalApplicationManager.class); bind(OperationRunner.class).to(InMemoryOperationRunner.class); bind(OperationStatePublisher.class).to(MessagingOperationStatePublisher.class); + bind(OperationRuntime.class).to(InMemoryOperationRuntime.class); } } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java index 19c0ebe6cfb5..ff457a5a2a87 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java @@ -49,7 +49,7 @@ public class InMemoryOperationRunnerTest { private static final OperationRunId runId = new OperationRunId("namespace", "run"); private static final OperationRun run = OperationRun.builder() .setRunId(runId.getRun()) - .setStatus(OperationRunStatus.PENDING) + .setStatus(OperationRunStatus.STARTING) .setType(OperationType.PULL_APPS) .setMetadata( OperationMeta.builder().setCreateTime(Instant.now()).build()) diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java new file mode 100644 index 000000000000..da6b1e67e332 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberServiceTest.java @@ -0,0 +1,327 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Closeables; +import com.google.gson.Gson; +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.Operation; +import io.cdap.cdap.common.guice.ConfigModule; +import io.cdap.cdap.common.guice.LocalLocationModule; +import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.common.utils.ImmutablePair; +import io.cdap.cdap.data.runtime.StorageModule; +import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationResource; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; +import io.cdap.cdap.spi.data.StructuredTableAdmin; +import io.cdap.cdap.spi.data.TableAlreadyExistsException; +import io.cdap.cdap.spi.data.sql.PostgresInstantiator; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import io.cdap.cdap.store.StoreDefinition; +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +public class OperationNotificationSingleTopicSubscriberServiceTest extends OperationTestBase { + + private static CConfiguration cConf; + private static TransactionRunner transactionRunner; + private static EmbeddedPostgres pg; + + private static final Gson GSON = new Gson(); + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + @BeforeClass + public static void beforeClass() throws IOException, TableAlreadyExistsException { + cConf = CConfiguration.create(); + cConf.set(Operation.STATUS_EVENT_FETCH_SIZE, "3"); + cConf.set(Operation.STATUS_EVENT_POLL_DELAY_MILLIS, "1"); + cConf.set(Operation.STATUS_EVENT_TX_SIZE, "1"); + // addRetryStrategy(cConf, "system.notification."); + pg = PostgresInstantiator.createAndStart(cConf, TEMP_FOLDER.newFolder()); + Injector injector = Guice.createInjector( + new ConfigModule(cConf), + new LocalLocationModule(), + new SystemDatasetRuntimeModule().getInMemoryModules(), + new StorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class) + .in(Scopes.SINGLETON); + } + } + ); + + transactionRunner = injector.getInstance(TransactionRunner.class); + StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); + } + + @AfterClass + public static void afterClass() { + Closeables.closeQuietly(pg); + } + + @Test + public void testProcessMessages() throws Exception { + MessagingService mockMsgService = Mockito.mock(MessagingService.class); + OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); + InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); + OperationLifecycleManager lifecycleManager = + new OperationLifecycleManager(transactionRunner, mockRuntime); + OperationNotificationSingleTopicSubscriberService subscriberService = + new OperationNotificationSingleTopicSubscriberService( + mockMsgService, + cConf, + Mockito.mock(MetricsCollectionService.class), + mockStatePublisher, + transactionRunner, + "name", + "topic", + lifecycleManager + ); + + List details = insertTestRuns(transactionRunner).stream() + .filter(d -> d.getRun().getStatus().equals(OperationRunStatus.STARTING)).collect( + Collectors.toList()); + + Notification notification1 = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, details.get(0).getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build()); + + Notification notification2 = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, details.get(1).getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build()); + + TransactionRunners.run(transactionRunner, (context) -> { + subscriberService.processMessages( + context, + ImmutableList.of(ImmutablePair.of("1", notification1), + ImmutablePair.of("2", notification2)).iterator() + ); + }, Exception.class); + + Mockito.verify(mockRuntime, Mockito.times(2)).run(Mockito.any()); + } + + @Test + public void testProcessNotificationInvalidOperation() { + MessagingService mockMsgService = Mockito.mock(MessagingService.class); + OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); + InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); + OperationLifecycleManager lifecycleManager = new OperationLifecycleManager(transactionRunner, + mockRuntime); + OperationNotificationSingleTopicSubscriberService subscriberService = + new OperationNotificationSingleTopicSubscriberService( + mockMsgService, + cConf, + Mockito.mock(MetricsCollectionService.class), + mockStatePublisher, + transactionRunner, + "name", + "topic", + lifecycleManager + ); + + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder().put(Operation.RUN_ID_NOTIFICATION_KEY, + GSON.toJson(new OperationRunId(testNamespace, "1"))).build()); + + TransactionRunners.run(transactionRunner, context -> { + subscriberService.processNotification(null, notification, context); + }); + // no exception should be thrown + } + + @Test + public void testProcessNotificationInvalidTransition() throws Exception { + MessagingService mockMsgService = Mockito.mock(MessagingService.class); + OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); + InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); + OperationLifecycleManager lifecycleManager = + new OperationLifecycleManager(transactionRunner, mockRuntime); + OperationNotificationSingleTopicSubscriberService subscriberService = + new OperationNotificationSingleTopicSubscriberService( + mockMsgService, + cConf, + Mockito.mock(MetricsCollectionService.class), + mockStatePublisher, + transactionRunner, + "name", + "topic", + lifecycleManager + ); + + OperationRunDetail detail = insertRun(testNamespace, OperationType.PULL_APPS, + OperationRunStatus.RUNNING, transactionRunner); + + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(detail.getRunId())) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build()); + + TransactionRunners.run(transactionRunner, context -> { + subscriberService.processNotification("1".getBytes(), notification, context); + }); + } + + @Test + public void testProcessNotification() throws Exception { + MessagingService mockMsgService = Mockito.mock(MessagingService.class); + OperationStatePublisher mockStatePublisher = Mockito.mock(OperationStatePublisher.class); + InMemoryOperationRuntime mockRuntime = Mockito.mock(InMemoryOperationRuntime.class); + OperationLifecycleManager lifecycleManager = new OperationLifecycleManager(transactionRunner, + mockRuntime); + OperationNotificationSingleTopicSubscriberService subscriberService = + new OperationNotificationSingleTopicSubscriberService( + mockMsgService, + cConf, + Mockito.mock(MetricsCollectionService.class), + mockStatePublisher, + transactionRunner, + "name", + "topic", + lifecycleManager + ); + + // process PENDING + OperationRunDetail detail = insertRun(testNamespace, OperationType.PULL_APPS, + OperationRunStatus.STARTING, transactionRunner); + + // process STARTING + TransactionRunners.run(transactionRunner, context -> { + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, detail.getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()) + .build()); + subscriberService.processNotification("1".getBytes(), notification, context); + }); + + Mockito.verify(mockRuntime, Mockito.times(1)).run(detail); + Assert.assertEquals(getRun(detail.getRunId(), transactionRunner).getRun().getStatus(), + OperationRunStatus.STARTING); + + // process RUNNING + Set resources = ImmutableSet.of(new OperationResource("1")); + TransactionRunners.run(transactionRunner, context -> { + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, detail.getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name()) + .put(Operation.RESOURCES_NOTIFICATION_KEY, GSON.toJson(resources)) + .build()); + subscriberService.processNotification("1".getBytes(), notification, context); + }); + + OperationRun run = getRun(detail.getRunId(), transactionRunner).getRun(); + Assert.assertEquals(run.getStatus(), OperationRunStatus.RUNNING); + Assert.assertEquals(run.getMetadata().getResources(), resources); + + // process SUCCEEDED + Set resources2 = ImmutableSet.of(new OperationResource("2")); + TransactionRunners.run(transactionRunner, context -> { + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, detail.getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name()) + .put(Operation.RESOURCES_NOTIFICATION_KEY, GSON.toJson(resources2)).build()); + subscriberService.processNotification("1".getBytes(), notification, context); + }); + + run = getRun(detail.getRunId(), transactionRunner).getRun(); + Assert.assertEquals(run.getStatus(), OperationRunStatus.RUNNING); + Assert.assertEquals(run.getMetadata().getResources(), resources2); + + // process FAILED + OperationRunDetail detail2 = insertRun(testNamespace, OperationType.PULL_APPS, + OperationRunStatus.RUNNING, transactionRunner); + OperationError error = new OperationError("test", null); + TransactionRunners.run(transactionRunner, context -> { + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, detail2.getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.FAILED.name()) + .put(Operation.ERROR_NOTIFICATION_KEY, GSON.toJson(error)).build()); + subscriberService.processNotification("1".getBytes(), notification, context); + }); + + run = getRun(detail2.getRunId(), transactionRunner).getRun(); + Assert.assertEquals(run.getStatus(), OperationRunStatus.FAILED); + Assert.assertEquals(run.getError(), error); + + // process STOPPING + OperationRunDetail detail3 = insertRun(testNamespace, OperationType.PULL_APPS, + OperationRunStatus.RUNNING, transactionRunner); + OperationController controller = Mockito.mock(OperationController.class); + Mockito.when(mockRuntime.getController(detail3)).thenReturn(controller); + TransactionRunners.run(transactionRunner, context -> { + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, detail3.getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STOPPING.name()) + .build()); + subscriberService.processNotification("1".getBytes(), notification, context); + }); + + run = getRun(detail3.getRunId(), transactionRunner).getRun(); + Assert.assertEquals(run.getStatus(), OperationRunStatus.STOPPING); + Mockito.verify(controller, Mockito.times(1)).stop(); + + // process KILLED + TransactionRunners.run(transactionRunner, context -> { + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, + ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, detail3.getRunId().toString()) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.KILLED.name()) + .build()); + subscriberService.processNotification("1".getBytes(), notification, context); + }); + + run = getRun(detail3.getRunId(), transactionRunner).getRun(); + Assert.assertEquals(run.getStatus(), OperationRunStatus.KILLED); + } +} + diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java index 4bc1af5cb798..f44132da99ba 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java @@ -160,8 +160,7 @@ public void testFailOperation() throws Exception { TransactionRunners.run(transactionRunner, context -> { OperationRunStore store = new OperationRunStore(context); - OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation(runId); - Assert.assertEquals(expectedDetail, gotDetail); + Assert.assertEquals(expectedDetail, store.getOperation(runId)); OperationError error = new OperationError("operation failed", Collections.emptyList()); OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) @@ -177,8 +176,7 @@ public void testFailOperation() throws Exception { .build(); store.failOperationRun(runId, error, updatedRun.getMetadata().getEndTime(), updatedDetail.getSourceId()); - gotDetail = store.getOperation(runId); - Assert.assertEquals(updatedDetail, gotDetail); + Assert.assertEquals(updatedDetail, store.getOperation(runId)); try { store.failOperationRun( @@ -253,11 +251,12 @@ public void testScanOperation() throws Exception { public void testScanOperationByStatus() throws Exception { TransactionRunners.run(transactionRunner, context -> { Set expectedRuns = insertTestRuns(transactionRunner).stream().filter( - d -> d.getRun().getStatus().equals(OperationRunStatus.PENDING) + d -> d.getRun().getStatus().equals(OperationRunStatus.STARTING) ).collect(Collectors.toSet()); Set gotRuns = new HashSet<>(); OperationRunStore store = new OperationRunStore(context); - store.scanOperationByStatus(OperationRunStatus.PENDING, gotRuns::add); + store.scanOperationByStatus(OperationRunStatus.STARTING, gotRuns::add); + Assert.assertEquals(expectedRuns.size(), gotRuns.size()); Assert.assertTrue(expectedRuns.containsAll(gotRuns)); }, InvalidFieldException.class, IOException.class); } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java index 404d57fa2260..0e0dad18ab74 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java @@ -42,6 +42,14 @@ public class OperationTestBase { protected static final String testNamespace = "test"; private static final PullAppsRequest input = new PullAppsRequest(Collections.emptySet(), null); + protected static OperationRunDetail getRun(OperationRunId runId, + TransactionRunner transactionRunner) + throws OperationRunNotFoundException, IOException { + return TransactionRunners.run(transactionRunner, context -> { + return new OperationRunStore(context).getOperation(runId); + }, OperationRunNotFoundException.class, IOException.class); + } + protected static OperationRunDetail insertRun( String namespace, OperationType type, @@ -91,7 +99,7 @@ protected static List insertTestRuns(TransactionRunner trans insertRun( testNamespace, OperationType.PUSH_APPS, - OperationRunStatus.PENDING, + OperationRunStatus.STARTING, transactionRunner)); details.add( insertRun( @@ -109,7 +117,7 @@ protected static List insertTestRuns(TransactionRunner trans insertRun( testNamespace, OperationType.PULL_APPS, - OperationRunStatus.PENDING, + OperationRunStatus.STARTING, transactionRunner)); details.add( insertRun( diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/SqlOperationRunsStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/SqlOperationRunsStoreTest.java index e2c54d7288b0..25b5df7aea83 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/SqlOperationRunsStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/SqlOperationRunsStoreTest.java @@ -16,6 +16,7 @@ package io.cdap.cdap.internal.operation; +import com.google.common.io.Closeables; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; @@ -69,7 +70,7 @@ protected void configure() { } @AfterClass - public static void afterClass() throws IOException { - pg.close(); + public static void afterClass() { + Closeables.closeQuietly(pg); } } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 9341e9c9d043..352447b40761 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2452,6 +2452,14 @@ public static final class Operation { * subscribed to ensure any pending messages / active run events are processed properly. */ public static final String STATUS_EVENT_NUM_PARTITIONS = "operation.status.event.topic.num.partitions"; + public static final String STATUS_EVENT_FETCH_SIZE = "operation.status.event.fetch.size"; + public static final String STATUS_EVENT_TX_SIZE = "operation.status.event.tx.size"; + public static final String STATUS_EVENT_POLL_DELAY_MILLIS = "operatopn.status.event.poll.delay.millis"; + /** + * Topic name for publishing program status recording events to the messaging system. + */ + public static final String STATUS_RECORD_EVENT_TOPIC = "operation.status.record.event.topic"; + public static final String INIT_BATCH_SIZE = "operation.initialize.batch.size"; public static final String STATUS_RETRY_STRATEGY_PREFIX = "operation.status."; // Notification keys @@ -2460,5 +2468,6 @@ public static final class Operation { public static final String RESOURCES_NOTIFICATION_KEY = "operation.notification.resources"; public static final String ENDTIME_NOTIFICATION_KEY = "operation.notification.endtime"; public static final String ERROR_NOTIFICATION_KEY = "operation.notification.error"; + public static final String USER_ID_NOTIFICATION_KEY = "userId"; } } diff --git a/cdap-common/src/main/resources/cdap-default.xml b/cdap-common/src/main/resources/cdap-default.xml index 4296d9e03986..0b33354be333 100644 --- a/cdap-common/src/main/resources/cdap-default.xml +++ b/cdap-common/src/main/resources/cdap-default.xml @@ -4382,6 +4382,47 @@ + + operation.status.retry.policy.base.delay.ms + 1000 + + The base delay between retries in milliseconds + + + + + operation.status.retry.policy.max.delay.ms + 3000 + + The maximum delay between retries in milliseconds + + + + + operation.status.retry.policy.max.retries + 1000 + + The maximum number of retries to attempt before aborting + + + + + operation.status.retry.policy.max.time.secs + 600 + + The maximum elapsed time in seconds before retries are aborted + + + + + operation.status.retry.policy.type + fixed.delay + + The type of retry policy for programs. Allowed options: + "none", "fixed.delay", or "exponential.backoff". + + + diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java index 17392c70008f..bc44f72e6b93 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java @@ -80,7 +80,7 @@ public int hashCode() { @SuppressWarnings("unused") public static OperationRunId fromIdParts(Iterable idString) { Iterator iterator = idString.iterator(); - return new OperationRunId(next(iterator, "namespace"), next(iterator, "run")); + return new OperationRunId(next(iterator, "namespace"), nextAndEnd(iterator, "run")); } @Override diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationError.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationError.java index 0d8757297a2d..4623f3661f0e 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationError.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationError.java @@ -51,7 +51,8 @@ public boolean equals(Object o) { OperationError that = (OperationError) o; - return this.details.equals(that.details); + return Objects.equals(this.message, that.message) + && Objects.equals(this.details, that.details); } @Override diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java index 38ceabb75eb2..b28c153d8d81 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java @@ -24,7 +24,6 @@ * Status of operation run. */ public enum OperationRunStatus { - PENDING, STARTING, RUNNING, STOPPING, @@ -48,8 +47,6 @@ public boolean canTransitionTo(OperationRunStatus status) { return true; } switch (this) { - case PENDING: - return status == STARTING || status == STOPPING; case STARTING: // RUNNING is the happy path // STOPPING happens if the run was manually stopped gracefully(may include a timeout)