Skip to content

Commit

Permalink
add resume
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 15, 2023
1 parent 01a12f1 commit 86c847c
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,11 @@
package io.cdap.cdap.internal.operation;


import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
Expand All @@ -40,7 +32,7 @@
/**
* {@link OperationRuntime} implementation when operations are running inside same service.
*/
public class InMemoryOperationRuntime extends AbstractIdleService implements OperationRuntime {
public class InMemoryOperationRuntime implements OperationRuntime {

private final CConfiguration cConf;
private final ConcurrentHashMap<OperationRunId, OperationController> controllers;
Expand Down Expand Up @@ -92,75 +84,16 @@ public OperationController run(OperationRunDetail runDetail) {
@Nullable
public OperationController getController(OperationRunDetail detail) {
return controllers.computeIfAbsent(detail.getRunId(), runId -> {
// TODO(samik) fetch from store for remote operations
// TODO(samik) create controller from detail after 6.10 release
return null;
});
}

@Override
protected void startUp() throws Exception {
RetryStrategy retryStrategy =
RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);

TransactionRunners.run(
transactionRunner, context -> {
Retries.runWithRetries(
() -> {
processStartingOperations(context);
processRunningOperations(context);
processStoppingOperations(context);
},
retryStrategy,
e -> true
);
}
);
}

@Override
protected void shutDown() throws Exception {
// no-op
}

private void remove(OperationRunId runId) {
OperationController controller = controllers.remove(runId);
if (controller != null) {
LOG.debug("Controller removed for {}", runId);
}
}

// Sends PENDING notification for all PENDING operations
protected void processStartingOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.STARTING,
(runDetail) -> {
LOG.debug("Retrying to start operation {}.", runDetail.getRunId());
statePublisher.publishStarting(runDetail.getRunId());
}
);
}

// Sends FAILED notification for all RUNNING operations
// TODO(samik) check if we can add RESUME capability post 6.10
protected void processRunningOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.RUNNING,
(runDetail) -> {
LOG.debug("Mark operation {} as failed due to service restart.", runDetail.getRunId());
statePublisher.publishFailed(runDetail.getRunId(),
new OperationError("Failed due to service restart", Collections.emptyList()));
}
);
}

// Sends STOPPED notification for all STOPPING operations
protected void processStoppingOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.STOPPING,
(runDetail) -> {
LOG.debug("Mark operation {} as stopped due to service restart.", runDetail.getRunId());
statePublisher.publishKilled(runDetail.getRunId());
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,29 @@

package io.cdap.cdap.internal.operation;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Service that manages lifecycle of Operation.
*/
public class OperationLifecycleService extends AbstractIdleService {
public class OperationLifecycleManager {

private final TransactionRunner transactionRunner;
private final OperationRuntime runtime;
private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class);


@Inject
OperationLifecycleService(TransactionRunner transactionRunner, OperationRuntime runtime) {
OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) {
this.transactionRunner = transactionRunner;
this.runtime = runtime;
}
Expand Down Expand Up @@ -86,7 +91,7 @@ public OperationController startOperation(OperationRunDetail detail) {
/**
* Initiate operation stop. It is the responsibility of the caller to validate state transition.
*
* @param detail runDetail of the operation
* @param detail {@link OperationRunDetail} of the operation
* @throws IllegalStateException in case the operation is not running. It is the
* responsibility of the caller to handle the same
*/
Expand All @@ -98,18 +103,24 @@ public void stopOperation(OperationRunDetail detail) {
controller.stop();
}


private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
/**
* Resume the operation. Called after service restart.
*
* @param detail {@link OperationRunDetail} of the operation
*/
public void resumeOperation(OperationRunDetail detail, OperationStatePublisher statePublisher) {
// If there is no active controller that means the operation is not running and should be failed
if (runtime.getController(detail) == null) {
LOG.debug("No running operation for {}, sending failure notification", detail.getRunId());
statePublisher.publishFailed(detail.getRunId(),
new OperationError("Cannot resume after service restart as operation is not running",
Collections.emptyList()));
return;
}
}

@Override
protected void startUp() throws Exception {
runtime.startAndWait();
}

@Override
protected void shutDown() throws Exception {
runtime.stopAndWait();
private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
package io.cdap.cdap.internal.operation;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.utils.ImmutablePair;
import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter;
import io.cdap.cdap.internal.app.services.AbstractNotificationSubscriberService;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.messaging.spi.MessagingService;
Expand Down Expand Up @@ -54,14 +51,11 @@ class OperationNotificationSingleTopicSubscriberService
extends AbstractNotificationSubscriberService {

private static final Logger LOG =
LoggerFactory.getLogger(OperationNotificationSubscriberService.class);

private static final Gson GSON =
ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create();
LoggerFactory.getLogger(OperationNotificationSingleTopicSubscriberService.class);

private final OperationStatePublisher statePublisher;

private final OperationLifecycleService lifecycleService;
private final OperationLifecycleManager lifecycleService;

OperationNotificationSingleTopicSubscriberService(
MessagingService messagingService,
Expand All @@ -71,7 +65,7 @@ class OperationNotificationSingleTopicSubscriberService
TransactionRunner transactionRunner,
String name,
String topicName,
OperationLifecycleService lifecycleService) {
OperationLifecycleManager lifecycleManager) {
super(
name,
cConf,
Expand All @@ -83,7 +77,7 @@ class OperationNotificationSingleTopicSubscriberService
transactionRunner,
cConf.getInt(Constants.Operation.STATUS_EVENT_TX_SIZE));
this.statePublisher = statePublisher;
this.lifecycleService = lifecycleService;
this.lifecycleService = lifecycleManager;
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,35 @@
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.service.Retries;
import io.cdap.cdap.common.service.RetryStrategies;
import io.cdap.cdap.common.service.RetryStrategy;
import io.cdap.cdap.messaging.spi.MessagingService;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.twill.internal.CompositeService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Service that creates children services, each to handle a single partition of operation status
* events topic.
*/
public class OperationNotificationSubscriberService extends AbstractIdleService {
private static final Logger LOG =
LoggerFactory.getLogger(OperationNotificationSubscriberService.class);

private final MessagingService messagingService;
private final CConfiguration cConf;
private final OperationStatePublisher statePublisher;
private final TransactionRunner transactionRunner;
private final MetricsCollectionService metricsCollectionService;
private final OperationLifecycleService lifecycleService;
private final OperationLifecycleManager lifecycleManager;
private Service delegate;

@Inject
Expand All @@ -50,20 +60,35 @@ public class OperationNotificationSubscriberService extends AbstractIdleService
MetricsCollectionService metricsCollectionService,
OperationStatePublisher statePublisher,
TransactionRunner transactionRunner,
OperationLifecycleService lifecycleService) {
OperationLifecycleManager lifecycleManager) {

this.messagingService = messagingService;
this.cConf = cConf;
this.metricsCollectionService = metricsCollectionService;
this.statePublisher = statePublisher;
this.transactionRunner = transactionRunner;
this.lifecycleService = lifecycleService;
this.lifecycleManager = lifecycleManager;
}

@Override
protected void startUp() throws Exception {
// First start the lifecycle service to process the existing entries
lifecycleService.startAndWait();
// first process the existing active runs before starting to listen
RetryStrategy retryStrategy =
RetryStrategies.fromConfiguration(cConf, Constants.Service.RUNTIME_MONITOR_RETRY_PREFIX);

TransactionRunners.run(
transactionRunner, context -> {
Retries.runWithRetries(
() -> {
processStartingOperations(context);
processRunningOperations(context);
processStoppingOperations(context);
},
retryStrategy,
e -> true
);
}
);

List<Service> children = new ArrayList<>();
String topicPrefix = cConf.get(Constants.Operation.STATUS_EVENT_TOPIC);
Expand All @@ -75,6 +100,37 @@ protected void startUp() throws Exception {
delegate.startAndWait();
}

// Sends STARTING notification for all STARTING operations
private void processStartingOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.STARTING,
(runDetail) -> {
LOG.debug("Retrying to start operation {}.", runDetail.getRunId());
statePublisher.publishStarting(runDetail.getRunId());
}
);
}

// Try to resume run for all RUNNING operations
private void processRunningOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.RUNNING,
(runDetail) -> {
lifecycleManager.resumeOperation(runDetail, statePublisher);
}
);
}

// Sends STOPPING notification for all STOPPING operations
private void processStoppingOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.STOPPING,
(runDetail) -> {
LOG.debug("Retrying to stop operation {}.", runDetail.getRunId());
statePublisher.publishStopping(runDetail.getRunId());
}
);
}

@Override
protected void shutDown() throws Exception {
Expand All @@ -91,7 +147,7 @@ private OperationNotificationSingleTopicSubscriberService createChildService(
transactionRunner,
name,
topicName,
lifecycleService
lifecycleManager
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@

package io.cdap.cdap.internal.operation;

import com.google.common.util.concurrent.Service;
import io.cdap.cdap.proto.id.OperationRunId;

/**
* Provides management of running operations. Stores {@link OperationController} in memory.
*/
public interface OperationRuntime extends Service {
public interface OperationRuntime {

/**
* Get controller for a given {@link OperationRunId}.
Expand All @@ -33,9 +32,8 @@ public interface OperationRuntime extends Service {
/**
* Runs an operation given the detail.
*
* @param runDetail detail of the run
* @param detail detail of the run
* @return {@link OperationController} for the run
*/
OperationController run(OperationRunDetail runDetail);

OperationController run(OperationRunDetail detail);
}
Loading

0 comments on commit 86c847c

Please sign in to comment.