Skip to content

Commit

Permalink
Add startup logic to operation runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 14, 2023
1 parent f3affc2 commit 0b8a73e
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected void startUp() throws Exception {
transactionRunner, context -> {
Retries.runWithRetries(
() -> {
processPendingOperations(context);
processStartingOperations(context);
processRunningOperations(context);
processStoppingOperations(context);
},
Expand Down Expand Up @@ -163,12 +163,12 @@ private void remove(OperationRunId runId) {
}

// Sends STARTING notification for all STARTING operations
protected void processPendingOperations(StructuredTableContext context) throws Exception {
protected void processStartingOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.PENDING,
OperationRunStatus.STARTING,
(runDetail) -> {
LOG.debug("Retrying to start operation {}.", runDetail.getRunId());
statePublisher.publishPending(runDetail.getRunId());
statePublisher.publishStarting(runDetail.getRunId());
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void publishStopping(OperationRunId runId) {
}

@Override
public void publishPending(OperationRunId runId) {
public void publishStarting(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.InvalidFieldException;
import io.cdap.cdap.spi.data.StructuredTableContext;
Expand All @@ -33,10 +32,10 @@
public class OperationLifecycleService extends AbstractIdleService {

private final TransactionRunner transactionRunner;
private final InMemoryOperationRuntime runtime;
private final OperationRuntime runtime;

@Inject
OperationLifecycleService(TransactionRunner transactionRunner, InMemoryOperationRuntime runtime) {
OperationLifecycleService(TransactionRunner transactionRunner, OperationRuntime runtime) {
this.transactionRunner = transactionRunner;
this.runtime = runtime;
}
Expand Down Expand Up @@ -90,52 +89,40 @@ public void scanPendingOperations(Consumer<OperationRunDetail> consumer)
}

/**
* Runs a given operation.
* Runs a given operation. It is the responsibility of the caller to validate state transition.
*
* @param runId of the operation
* @throws OperationRunNotFoundException if the operation runid is not found
* @param detail {@link OperationRunDetail} of the operation
*/
public void startOperation(OperationRunId runId)
throws OperationRunNotFoundException, IOException {
TransactionRunners.run(
transactionRunner,
context -> {
OperationRunDetail detail = getOperationRunStore(context).getOperation(runId);
OperationRunStatus status = detail.getRun().getStatus();
if (!status.canTransitionTo(OperationRunStatus.RUNNING)) {
throw new IllegalStateException(
String.format("Cannot start operation %s with status %s", runId, status));
}
runtime.run(detail);
}, OperationRunNotFoundException.class, IOException.class
);
public OperationController startOperation(OperationRunDetail detail) {
return runtime.run(detail);
}


/**
* Initiate operation stop. It is the responsibility of the caller to validate state transition.
*
* <p>Looks up the {@link OperationController} for the run from the runtime and asks it to stop.
*
* @param detail runDetail of the operation
* @throws IllegalStateException if the runtime returns no controller for the operation
*/
public void stopOperation(OperationRunDetail detail) {
OperationController controller = runtime.getController(detail);
if (controller == null) {
// The runtime returned no controller for this run, so there is nothing to stop.
throw new IllegalStateException("Operation is not running");
}
controller.stop();
}


/**
* Creates an {@link OperationRunStore} backed by the given table context.
*/
private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}

@Override
protected void startUp() throws Exception {
runtime.startAndWait();
runtime.startAndWait();
}

@Override
protected void shutDown() throws Exception {
runtime.stopAndWait();
runtime.stopAndWait();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
Expand Down Expand Up @@ -195,8 +196,6 @@ protected void processMessages(
* @param messageIdBytes the raw message id in the TMS for the notification
* @param notification the {@link Notification} to process
* @param context context to get the table for operations
* @return a {@link List} of {@link Runnable} tasks to run after the transactional processing of
* the whole messages batch is completed
* @throws Exception if failed to process the given notification
*/
@VisibleForTesting
Expand All @@ -206,17 +205,13 @@ void processNotification(
StructuredTableContext context)
throws Exception {
OperationRunStore runStore = new OperationRunStore(context);
OperationNotification operationNotification;

try {
operationNotification = OperationNotification.fromNotification(
OperationNotification operationNotification = OperationNotification.fromNotification(
notification);
handleOperationEvent(operationNotification, messageIdBytes, runStore);
} catch (IllegalArgumentException | JsonSyntaxException e) {
LOG.warn("Got invalid operation notification", e);
return;
}

handleOperationEvent(operationNotification, messageIdBytes, runStore);
}

private void handleOperationEvent(
Expand All @@ -236,21 +231,28 @@ private void handleOperationEvent(
runId, runDetail.getRun().getStatus(), notification.getStatus());
return;
}

// Drop stale notifications: skip any message whose id sorts strictly before the source id
// already recorded on the run. Equal ids are not filtered out here.
byte[] existingSourceId = runDetail.getSourceId();
if (existingSourceId != null && Bytes.compareTo(messageIdBytes, existingSourceId) < 0) {
  // First placeholder is the incoming notification id, second is the id stored on the run.
  LOG.debug(
      "Notification source id '{}' is not larger than the existing source id '{}' in the existing "
          + "operation run detail.",
      Bytes.toHexString(messageIdBytes), Bytes.toHexString(existingSourceId));
  return;
}
} catch (OperationRunNotFoundException e) {
LOG.debug(String.format("Ignoring message for non existent operation %s", runId));
return;
}

switch (notification.getStatus()) {
case PENDING:
statePublisher.publishPending(runId);
break;
case STARTING:
String oldUser = SecurityRequestContext.getUserId();
try {
if (runDetail.getPrincipal() != null) {
SecurityRequestContext.setUserId(runDetail.getPrincipal());
}
lifecycleService.startOperation(runId);
lifecycleService.startOperation(runDetail);
} catch (Exception e) {
LOG.error("Failed to start operation {}", runDetail, e);
statePublisher.publishFailed(runId,
Expand All @@ -268,7 +270,11 @@ private void handleOperationEvent(
}
break;
case STOPPING:
lifecycleService.stopOperation(runDetail);
try {
lifecycleService.stopOperation(runDetail);
} catch (IllegalStateException e) {
LOG.warn("Failed to stop operation {}", runDetail.getRunId(), e);
}
runStore.updateOperationStatus(runId, OperationRunStatus.STOPPING, messageIdBytes);
break;
case KILLED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ public interface OperationStatePublisher {

void publishStopping(OperationRunId runId);

void publishPending(OperationRunId runId);
void publishStarting(OperationRunId runId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.operation.InMemoryOperationRunner;
import io.cdap.cdap.internal.operation.InMemoryOperationRuntime;
import io.cdap.cdap.internal.operation.LongRunningOperation;
import io.cdap.cdap.internal.operation.MessagingOperationStatePublisher;
import io.cdap.cdap.internal.operation.OperationRunner;
import io.cdap.cdap.internal.operation.OperationRuntime;
import io.cdap.cdap.internal.operation.OperationStatePublisher;
import io.cdap.cdap.sourcecontrol.ApplicationManager;

Expand All @@ -43,6 +45,7 @@ protected void configure() {
bind(ApplicationManager.class).to(LocalApplicationManager.class);
bind(OperationRunner.class).to(InMemoryOperationRunner.class);
bind(OperationStatePublisher.class).to(MessagingOperationStatePublisher.class);
bind(OperationRuntime.class).to(InMemoryOperationRuntime.class);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,18 @@ public void testProcessMessages() throws Exception {
);

List<OperationRunDetail> details = insertTestRuns(transactionRunner).stream()
.filter(d -> d.getRun().getStatus().equals(OperationRunStatus.PENDING)).collect(
.filter(d -> d.getRun().getStatus().equals(OperationRunStatus.STARTING)).collect(
Collectors.toList());

// happy path
Notification notification1 = new Notification(Notification.Type.OPERATION_STATUS,
ImmutableMap.<String, String>builder().put(Operation.RUN_ID_NOTIFICATION_KEY,
GSON.toJson(details.get(0).getRunId()))
ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, details.get(0).getRunId().toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build());

Notification notification2 = new Notification(Notification.Type.OPERATION_STATUS,
ImmutableMap.<String, String>builder().put(Operation.RUN_ID_NOTIFICATION_KEY,
GSON.toJson(details.get(1).getRunId()))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.PENDING.name()).build());
ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, details.get(1).getRunId().toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build());

TransactionRunners.run(transactionRunner, (context) -> {
subscriberService.processMessages(
Expand All @@ -143,36 +142,6 @@ public void testProcessMessages() throws Exception {
}, Exception.class);

Mockito.verify(mockRuntime, Mockito.times(1)).run(Mockito.any());

// if a message fail then no post-process is running
Notification notification3 = new Notification(Notification.Type.OPERATION_STATUS,
ImmutableMap.<String, String>builder().put(Operation.RUN_ID_NOTIFICATION_KEY,
GSON.toJson(details.get(2).getRunId()))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build());

Notification notification4 = new Notification(Notification.Type.OPERATION_STATUS,
ImmutableMap.<String, String>builder().put(Operation.RUN_ID_NOTIFICATION_KEY,
GSON.toJson(details.get(3).getRunId()))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.PENDING.name()).build());

// fail second state publish
Mockito.doThrow(new RuntimeException("")).when(mockStatePublisher)
.publishPending(Mockito.any());

try {
TransactionRunners.run(transactionRunner, (context) -> {
subscriberService.processMessages(
context,
ImmutableList.of(ImmutablePair.of("3", notification3),
ImmutablePair.of("4", notification4)).iterator()
);
}, Exception.class);
Assert.fail();
} catch (Exception e) {
// expected
}
// no more run method is called
Mockito.verify(mockRuntime, Mockito.times(1)).run(Mockito.any());
}

@Test
Expand Down Expand Up @@ -229,7 +198,7 @@ public void testProcessNotificationInvalidTransition() throws Exception {
Notification notification = new Notification(Notification.Type.OPERATION_STATUS,
ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(detail.getRunId()))
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.PENDING.name()).build());
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name()).build());

TransactionRunners.run(transactionRunner, context -> {
subscriberService.processNotification("1".getBytes(), notification, context);
Expand Down Expand Up @@ -257,17 +226,7 @@ public void testProcessNotification() throws Exception {

// insert a run in STARTING state
OperationRunDetail detail = insertRun(testNamespace, OperationType.PULL_APPS,
OperationRunStatus.PENDING, transactionRunner);

TransactionRunners.run(transactionRunner, context -> {
Notification notification = new Notification(Notification.Type.OPERATION_STATUS,
ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, detail.getRunId().toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.PENDING.name())
.build());
subscriberService.processNotification("1".getBytes(), notification, context);
});
Mockito.verify(mockStatePublisher, Mockito.times(1)).publishPending(detail.getRunId());
OperationRunStatus.STARTING, transactionRunner);

// process STARTING
TransactionRunners.run(transactionRunner, context -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,11 @@ public void testScanOperation() throws Exception {
public void testScanOperationByStatus() throws Exception {
TransactionRunners.run(transactionRunner, context -> {
Set<OperationRunDetail> expectedRuns = insertTestRuns(transactionRunner).stream().filter(
d -> d.getRun().getStatus().equals(OperationRunStatus.PENDING)
d -> d.getRun().getStatus().equals(OperationRunStatus.STARTING)
).collect(Collectors.toSet());
Set<OperationRunDetail> gotRuns = new HashSet<>();
OperationRunStore store = new OperationRunStore(context);
store.scanOperationByStatus(OperationRunStatus.PENDING, gotRuns::add);
store.scanOperationByStatus(OperationRunStatus.STARTING, gotRuns::add);
Assert.assertTrue(expectedRuns.containsAll(gotRuns));
}, InvalidFieldException.class, IOException.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ protected static List<OperationRunDetail> insertTestRuns(TransactionRunner trans
insertRun(
testNamespace,
OperationType.PUSH_APPS,
OperationRunStatus.PENDING,
OperationRunStatus.STARTING,
transactionRunner));
details.add(
insertRun(
Expand All @@ -118,7 +118,7 @@ protected static List<OperationRunDetail> insertTestRuns(TransactionRunner trans
insertRun(
testNamespace,
OperationType.PULL_APPS,
OperationRunStatus.PENDING,
OperationRunStatus.STARTING,
transactionRunner));
details.add(
insertRun(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
* Status of operation run.
*/
public enum OperationRunStatus {
PENDING,
STARTING,
RUNNING,
STOPPING,
Expand All @@ -48,8 +47,6 @@ public boolean canTransitionTo(OperationRunStatus status) {
return true;
}
switch (this) {
case PENDING:
return status == STARTING || status == STOPPING;
case STARTING:
// RUNNING is the happy path
// STOPPING happens if the run was manually stopped gracefully(may include a timeout)
Expand Down

0 comments on commit 0b8a73e

Please sign in to comment.