Skip to content

Commit

Permalink
Merge pull request #15411 from cdapio/operation-state-publisher
Browse files Browse the repository at this point in the history
Operation state publisher
  • Loading branch information
samdgupi authored Nov 9, 2023
2 parents c7e5433 + 26c1e1f commit 43fcc4c
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,205 @@

package io.cdap.cdap.internal.operation;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import com.google.inject.Inject;
import io.cdap.cdap.api.messaging.TopicNotFoundException;
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.api.service.ServiceUnavailableException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants.Operation;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
import io.cdap.cdap.messaging.client.StoreRequestBuilder;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.proto.Notification;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.id.TopicId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Provides capabilities to send operation lifecycle specific messages.
*/
public class MessagingOperationStatePublisher implements OperationStatePublisher {

private final MessagingService messagingService;
private final RetryStrategy retryStrategy;
private final List<TopicId> topicIds;

private static final Gson GSON = new Gson();
private static final Logger LOG = LoggerFactory.getLogger(
MessagingOperationStatePublisher.class);

/**
* Create a publisher that writes to MessagingService topics depending on the message content.
*
* @param cConf configuration containing the topic prefix and number of partitions
* @param messagingService messaging service to write messages to
*/
@Inject
MessagingOperationStatePublisher(MessagingService messagingService) {
public MessagingOperationStatePublisher(CConfiguration cConf, MessagingService messagingService) {
this(
messagingService,
cConf.get(Operation.STATUS_EVENT_TOPIC),
cConf.getInt(Operation.STATUS_EVENT_NUM_PARTITIONS),
RetryStrategies.fromConfiguration(
cConf, Operation.STATUS_RETRY_STRATEGY_PREFIX)
);
}

/**
* Create a publisher that writes to MessagingService topics depending on the message content.
*
* @param messagingService messaging service to write messages to
* @param topicPrefix prefix of the topic(s) to write to. If there is one topic, the prefix
* will be the topic name. If there is more than one topic, the topic name will be the prefix
* followed by the topic number
* @param numTopics number of topics to write to
* @param retryStrategy retry strategy to use for failures
*/
@VisibleForTesting
public MessagingOperationStatePublisher(MessagingService messagingService,
String topicPrefix, int numTopics, RetryStrategy retryStrategy) {
this.messagingService = messagingService;
this.topicIds = Collections.unmodifiableList(IntStream
.range(0, numTopics)
.mapToObj(i -> NamespaceId.SYSTEM.topic(topicPrefix + i))
.collect(Collectors.toList()));
this.retryStrategy = retryStrategy;
}

@Override
public void publishResources(OperationRunId runId, Set<OperationResource> resources) {
// TODO(samik) implement message publish logic
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name())
.put(Operation.RESOURCES_NOTIFICATION_KEY, GSON.toJson(resources));

publish(runId, propertiesBuilder.build());
}

@Override
public void publishRunning(OperationRunId runId) {
// TODO(samik) implement message publish logic
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishFailed(OperationRunId runId, OperationError error) {
// TODO(samik) implement message publish logic

ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.FAILED.name())
.put(Operation.ERROR_NOTIFICATION_KEY, GSON.toJson(error))
.put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishSuccess(OperationRunId runId) {
// TODO(samik) implement message publish logic

ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.SUCCEEDED.name())
.put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishKilled(OperationRunId runId) {
// TODO(samik) implement message publish logic
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.KILLED.name())
.put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishStopping(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STOPPING.name());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishStarting(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name());
publish(runId, propertiesBuilder.build());
}

/**
* Publish a notification to a topic.
*
* @param runId {@link OperationRunId} for the notification
* @param properties properties of the message to publish, assumed to contain the operation
* run id
*/
public void publish(OperationRunId runId, Map<String, String> properties) {
// OperationRunId is always required in a notification
Notification notification = new Notification(Notification.Type.OPERATION_STATUS, properties);

int failureCount = 0;
long startTime = -1L;
boolean done = false;
// TODO CDAP-12255 This should be refactored into a common class for publishing to TMS with a retry strategy
while (!done) {
try {
messagingService.publish(StoreRequestBuilder.of(getTopic(runId))
.addPayload(GSON.toJson(notification))
.build());
LOG.trace("Published operation status notification: {}", notification);
done = true;
} catch (IOException | AccessException e) {
throw Throwables.propagate(e);
} catch (TopicNotFoundException | ServiceUnavailableException e) {
// These exceptions are retry-able due to TMS not completely started
if (startTime < 0) {
startTime = System.currentTimeMillis();
}
long retryMillis = retryStrategy.nextRetry(++failureCount, startTime);
if (retryMillis < 0) {
LOG.error("Failed to publish messages to TMS and exceeded retry limit.", e);
throw Throwables.propagate(e);
}
LOG.debug("Failed to publish messages to TMS due to {}. Will be retried in {} ms.",
e.getMessage(), retryMillis);
try {
TimeUnit.MILLISECONDS.sleep(retryMillis);
} catch (InterruptedException e1) {
// Something explicitly stopping this thread. Simply just break and reset the interrupt flag.
LOG.warn("Publishing message to TMS interrupted.");
Thread.currentThread().interrupt();
done = true;
}
}
}
}

private TopicId getTopic(OperationRunId runId) {
if (topicIds.size() == 1) {
return topicIds.get(0);
}
return topicIds.get(Math.abs(runId.getRun().hashCode()) % topicIds.size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.cdap.cdap.common.conf.Constants.Operation;
import io.cdap.cdap.proto.Notification;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import java.lang.reflect.Type;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/**
* Encapsulates an operation notification sent through TMS.
*/
public class OperationNotification {

private final OperationRunId runId;
private final OperationRunStatus status;
@Nullable
private final Set<OperationResource> resources;
@Nullable
private final Instant endTime;
@Nullable
private final OperationError error;

private static final Gson GSON = new Gson();
private static final Type resourcesType = new TypeToken<Set<OperationResource>>() {
}.getType();

/**
* Default constructor.
*/
public OperationNotification(OperationRunId runId, OperationRunStatus status,
@Nullable Set<OperationResource> resources, Instant endTime, @Nullable OperationError error) {
this.runId = runId;
this.status = status;
this.resources = resources;
this.endTime = endTime;
this.error = error;
}

/**
* Parse {@link Notification} to generate {@link OperationNotification}.
*
* @param notification notification to parse
*/
public static OperationNotification fromNotification(Notification notification) {
Map<String, String> properties = notification.getProperties();

OperationRunId runId = GSON.fromJson(properties.get(Operation.RUN_ID_NOTIFICATION_KEY),
OperationRunId.class);
OperationRunStatus status = OperationRunStatus.valueOf(
properties.get(Operation.STATUS_NOTIFICATION_KEY));
OperationError error = GSON.fromJson(properties.get(Operation.ERROR_NOTIFICATION_KEY),
OperationError.class);
Set<OperationResource> resources = GSON.fromJson(
properties.get(Operation.RESOURCES_NOTIFICATION_KEY), resourcesType);
Instant endTime = Instant.parse(properties.get(Operation.ENDTIME_NOTIFICATION_KEY));

return new OperationNotification(runId, status, resources, endTime, error);
}

public OperationRunId getRunId() {
return runId;
}

public OperationRunStatus getStatus() {
return status;
}

@Nullable
public Set<OperationResource> getResources() {
return resources;
}

@Nullable
public Instant getEndTime() {
return endTime;
}

@Nullable
public OperationError getError() {
return error;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ public interface OperationStatePublisher {
* Publishes the current operation status as KILLED.
*/
void publishKilled(OperationRunId runId);

/**
* Publishes the current operation status as STOPPING.
*/
void publishStopping(OperationRunId runId);

/**
* Publishes the current operation status as STARTING.
*/
void publishStarting(OperationRunId runId);
}
27 changes: 27 additions & 0 deletions cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -2434,4 +2434,31 @@ public static final class InternalRouter {
public static final String CLIENT_ENABLED = "internal.router.client.enabled";
public static final String SERVER_ENABLED = "internal.router.server.enabled";
}

/**
* Constants for operations.
*/
public static final class Operation {

/**
* Topic prefix for publishing status transitioning events of operation runs to the messaging
* system.
*/
public static final String STATUS_EVENT_TOPIC = "operation.status.event.topic";
/**
* Number of topics to use for operation status events. All events related to same run should
* always go to same topic. If this value is 1, {@link #STATUS_EVENT_TOPIC} is a topic name. If
* it's more than 1, {@link #STATUS_EVENT_TOPIC} is a prefix, but bare name should still be
* subscribed to ensure any pending messages / active run events are processed properly.
*
* <p>NOTE(review): MessagingOperationStatePublisher appends the topic index to the prefix even
* when this value is 1 - confirm which naming is intended.
*/
public static final String STATUS_EVENT_NUM_PARTITIONS = "operation.status.event.topic.num.partitions";
/**
* Configuration key prefix passed to {@code RetryStrategies.fromConfiguration} to build the
* retry strategy used when publishing operation status events.
*/
public static final String STATUS_RETRY_STRATEGY_PREFIX = "operation.status.";

// Notification keys
/**
* Notification property holding the JSON-serialized operation run id.
*/
public static final String RUN_ID_NOTIFICATION_KEY = "operation.notification.run.id";
/**
* Notification property holding the name of the operation run status.
*/
public static final String STATUS_NOTIFICATION_KEY = "operation.notification.status";
/**
* Notification property holding the JSON-serialized set of operation resources. Optional.
*/
public static final String RESOURCES_NOTIFICATION_KEY = "operation.notification.resources";
/**
* Notification property holding the end time, written as {@code Instant.toString()}. The
* publisher sets it only for FAILED, SUCCEEDED and KILLED notifications.
*/
public static final String ENDTIME_NOTIFICATION_KEY = "operation.notification.endtime";
/**
* Notification property holding the JSON-serialized operation error. The publisher sets it
* only for FAILED notifications.
*/
public static final String ERROR_NOTIFICATION_KEY = "operation.notification.error";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public enum Type {
TIME,
PARTITION,
PROGRAM_STATUS,
PROGRAM_HEART_BEAT
PROGRAM_HEART_BEAT,
OPERATION_STATUS
}

private final Type notificationType;
Expand Down

0 comments on commit 43fcc4c

Please sign in to comment.