diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java index 816a5e1a97bb..4c98c3caf79e 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java @@ -16,12 +16,39 @@ package io.cdap.cdap.internal.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.gson.Gson; import com.google.inject.Inject; +import io.cdap.cdap.api.messaging.TopicNotFoundException; +import io.cdap.cdap.api.security.AccessException; +import io.cdap.cdap.api.service.ServiceUnavailableException; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants.Operation; +import io.cdap.cdap.common.service.RetryStrategies; +import io.cdap.cdap.common.service.RetryStrategy; +import io.cdap.cdap.messaging.client.StoreRequestBuilder; import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.id.TopicId; import io.cdap.cdap.proto.operation.OperationError; import io.cdap.cdap.proto.operation.OperationResource; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import java.io.IOException; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Provides capabilities to send operation lifecycle specific messages. @@ -29,36 +56,165 @@ public class MessagingOperationStatePublisher implements OperationStatePublisher { private final MessagingService messagingService; + private final RetryStrategy retryStrategy; + private final List topicIds; + + private static final Gson GSON = new Gson(); + private static final Logger LOG = LoggerFactory.getLogger( + MessagingOperationStatePublisher.class); + /** + * Create a publisher that writes to MessagingService topics depending on the message content. + * + * @param cConf configuration containing the topic prefix and number of partitions + * @param messagingService messaging service to write messages to + */ @Inject - MessagingOperationStatePublisher(MessagingService messagingService) { + public MessagingOperationStatePublisher(CConfiguration cConf, MessagingService messagingService) { + this( + messagingService, + cConf.get(Operation.STATUS_EVENT_TOPIC), + cConf.getInt(Operation.STATUS_EVENT_NUM_PARTITIONS), + RetryStrategies.fromConfiguration( + cConf, Operation.STATUS_RETRY_STRATEGY_PREFIX) + ); + } + + /** + * Create a publisher that writes to MessagingService topics depending on the message content. + * + * @param messagingService messaging service to write messages to + * @param topicPrefix prefix of the topic(s) to write to. If there is one topic, the prefix + * will be the topic name. If there is more than one topic, the topic name will be the prefix + * followed by the topic number + * @param numTopics number of topics to write to + * @param retryStrategy retry strategy to use for failures + */ + @VisibleForTesting + public MessagingOperationStatePublisher(MessagingService messagingService, + String topicPrefix, int numTopics, RetryStrategy retryStrategy) { this.messagingService = messagingService; + this.topicIds = Collections.unmodifiableList(IntStream + .range(0, numTopics) + .mapToObj(i -> NamespaceId.SYSTEM.topic(topicPrefix + i)) + .collect(Collectors.toList())); + this.retryStrategy = retryStrategy; } @Override public void publishResources(OperationRunId runId, Set resources) { - // TODO(samik) implement message publish logic + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name()) + .put(Operation.RESOURCES_NOTIFICATION_KEY, GSON.toJson(resources)); + + publish(runId, propertiesBuilder.build()); } @Override public void publishRunning(OperationRunId runId) { - // TODO(samik) implement message publish logic + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name()); + publish(runId, propertiesBuilder.build()); } @Override public void publishFailed(OperationRunId runId, OperationError error) { - // TODO(samik) implement message publish logic - + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.FAILED.name()) + .put(Operation.ERROR_NOTIFICATION_KEY, GSON.toJson(error)) + .put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString()); + publish(runId, propertiesBuilder.build()); } @Override public void publishSuccess(OperationRunId runId) { - // TODO(samik) implement message publish logic - + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.SUCCEEDED.name()) + .put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString()); + publish(runId, propertiesBuilder.build()); } @Override public void publishKilled(OperationRunId runId) { - // TODO(samik) implement message publish logic + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.KILLED.name()) + .put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString()); + publish(runId, propertiesBuilder.build()); + } + + @Override + public void publishStopping(OperationRunId runId) { + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STOPPING.name()); + publish(runId, propertiesBuilder.build()); + } + + @Override + public void publishStarting(OperationRunId runId) { + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder() + .put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId)) + .put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()); + publish(runId, propertiesBuilder.build()); + } + + /** + * Publish a notification to a topic. + * + * @param runId {@link OperationRunId} for the notification + * @param properties properties of the message to publish, assumed to contain the operation + * run id + */ + public void publish(OperationRunId runId, Map properties) { + // OperationRunId is always required in a notification + Notification notification = new Notification(Notification.Type.OPERATION_STATUS, properties); + + int failureCount = 0; + long startTime = -1L; + boolean done = false; + // TODO CDAP-12255 This should be refactored into a common class for publishing to TMS with a retry strategy + while (!done) { + try { + messagingService.publish(StoreRequestBuilder.of(getTopic(runId)) + .addPayload(GSON.toJson(notification)) + .build()); + LOG.trace("Published operation status notification: {}", notification); + done = true; + } catch (IOException | AccessException e) { + throw Throwables.propagate(e); + } catch (TopicNotFoundException | ServiceUnavailableException e) { + // These exceptions are retry-able due to TMS not completely started + if (startTime < 0) { + startTime = System.currentTimeMillis(); + } + long retryMillis = retryStrategy.nextRetry(++failureCount, startTime); + if (retryMillis < 0) { + LOG.error("Failed to publish messages to TMS and exceeded retry limit.", e); + throw Throwables.propagate(e); + } + LOG.debug("Failed to publish messages to TMS due to {}. Will be retried in {} ms.", + e.getMessage(), retryMillis); + try { + TimeUnit.MILLISECONDS.sleep(retryMillis); + } catch (InterruptedException e1) { + // Something explicitly stopping this thread. Simply just break and reset the interrupt flag. + LOG.warn("Publishing message to TMS interrupted."); + Thread.currentThread().interrupt(); + done = true; + } + } + } + } + + private TopicId getTopic(OperationRunId runId) { + if (topicIds.size() == 1) { + return topicIds.get(0); + } + return topicIds.get(Math.abs(runId.getRun().hashCode()) % topicIds.size()); } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java new file mode 100644 index 000000000000..c4ae62ad7c81 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotification.java @@ -0,0 +1,106 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import io.cdap.cdap.common.conf.Constants.Operation; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationResource; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import java.lang.reflect.Type; +import java.time.Instant; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + +/** + * Encapsulates an operation notification sent through TMS. + */ +public class OperationNotification { + + private final OperationRunId runId; + private final OperationRunStatus status; + @Nullable + private final Set resources; + @Nullable + private final Instant endTime; + @Nullable + private final OperationError error; + + private static final Gson GSON = new Gson(); + private static final Type resourcesType = new TypeToken>() { + }.getType(); + + /** + * Default constructor. + */ + public OperationNotification(OperationRunId runId, OperationRunStatus status, + @Nullable Set resources, Instant endTime, @Nullable OperationError error) { + this.runId = runId; + this.status = status; + this.resources = resources; + this.endTime = endTime; + this.error = error; + } + + /** + * Parse {@link Notification} to generate {@link OperationNotification}. + * + * @param notification notification to parse + */ + public static OperationNotification fromNotification(Notification notification) { + Map properties = notification.getProperties(); + + OperationRunId runId = GSON.fromJson(properties.get(Operation.RUN_ID_NOTIFICATION_KEY), + OperationRunId.class); + OperationRunStatus status = OperationRunStatus.valueOf( + properties.get(Operation.STATUS_NOTIFICATION_KEY)); + OperationError error = GSON.fromJson(properties.get(Operation.ERROR_NOTIFICATION_KEY), + OperationError.class); + Set resources = GSON.fromJson( + properties.get(Operation.RESOURCES_NOTIFICATION_KEY), resourcesType); + Instant endTime = Instant.parse(properties.get(Operation.ENDTIME_NOTIFICATION_KEY)); + + return new OperationNotification(runId, status, resources, endTime, error); + } + + public OperationRunId getRunId() { + return runId; + } + + public OperationRunStatus getStatus() { + return status; + } + + @Nullable + public Set getResources() { + return resources; + } + + @Nullable + public Instant getEndTime() { + return endTime; + } + + @Nullable + public OperationError getError() { + return error; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java index ba967d02e400..f8970ede3cf7 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java @@ -53,4 +53,8 @@ public interface OperationStatePublisher { * Publishes the current operation status as KILLED. */ void publishKilled(OperationRunId runId); + + void publishStopping(OperationRunId runId); + + void publishStarting(OperationRunId runId); } diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index f3a5dc32a179..fbe6897b6c32 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -2432,4 +2432,31 @@ public static final class InternalRouter { public static final String CLIENT_ENABLED = "internal.router.client.enabled"; public static final String SERVER_ENABLED = "internal.router.server.enabled"; } + + /** + * Constants for operations. + */ + public static final class Operation { + + /** + * Topic prefix for publishing status transitioning events of operation runs to the messaging + * system. + */ + public static final String STATUS_EVENT_TOPIC = "operation.status.event.topic"; + /** + * Number of topics to use for operation status events. All events related to same run should + * always go to same topic. If this value is 1, {@link #STATUS_EVENT_TOPIC} is a topic name. If + * it's more than 1, {@link #STATUS_EVENT_TOPIC} is a prefix, but bare name should still be + * subscribed to ensure any pending messages / active run events are processed properly. + */ + public static final String STATUS_EVENT_NUM_PARTITIONS = "operation.status.event.topic.num.partitions"; + public static final String STATUS_RETRY_STRATEGY_PREFIX = "operation.status."; + + // Notification keys + public static final String RUN_ID_NOTIFICATION_KEY = "operation.notification.run.id"; + public static final String STATUS_NOTIFICATION_KEY = "operation.notification.status"; + public static final String RESOURCES_NOTIFICATION_KEY = "operation.notification.resources"; + public static final String ENDTIME_NOTIFICATION_KEY = "operation.notification.endtime"; + public static final String ERROR_NOTIFICATION_KEY = "operation.notification.error"; + } } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/Notification.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/Notification.java index c9a5e803d439..9f3fb676e6c3 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/Notification.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/Notification.java @@ -45,7 +45,8 @@ public enum Type { TIME, PARTITION, PROGRAM_STATUS, - PROGRAM_HEART_BEAT + PROGRAM_HEART_BEAT, + OPERATION_STATUS } private final Type notificationType;