Skip to content

Commit

Permalink
Add startup logic to operation runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 14, 2023
1 parent f3affc2 commit d45b12a
Show file tree
Hide file tree
Showing 12 changed files with 59 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ protected void startUp() throws Exception {
transactionRunner, context -> {
Retries.runWithRetries(
() -> {
processPendingOperations(context);
processStartingOperations(context);
processRunningOperations(context);
processStoppingOperations(context);
},
Expand Down Expand Up @@ -163,12 +163,12 @@ private void remove(OperationRunId runId) {
}

// Sends PENDING notification for all PENDING operations
protected void processPendingOperations(StructuredTableContext context) throws Exception {
protected void processStartingOperations(StructuredTableContext context) throws Exception {
new OperationRunStore(context).scanOperationByStatus(
OperationRunStatus.PENDING,
OperationRunStatus.STARTING,
(runDetail) -> {
LOG.debug("Retrying to start operation {}.", runDetail.getRunId());
statePublisher.publishPending(runDetail.getRunId());
statePublisher.publishStarting(runDetail.getRunId());
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public void publishStopping(OperationRunId runId) {
}

@Override
public void publishPending(OperationRunId runId) {
public void publishStarting(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.InvalidFieldException;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
Expand All @@ -33,10 +30,10 @@
public class OperationLifecycleService extends AbstractIdleService {

private final TransactionRunner transactionRunner;
private final InMemoryOperationRuntime runtime;
private final OperationRuntime runtime;

@Inject
OperationLifecycleService(TransactionRunner transactionRunner, InMemoryOperationRuntime runtime) {
OperationLifecycleService(TransactionRunner transactionRunner, OperationRuntime runtime) {
this.transactionRunner = transactionRunner;
this.runtime = runtime;
}
Expand Down Expand Up @@ -76,66 +73,41 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
return currentLimit == 0;
}


/**
* Scan all pending operations. Needed for try running all pending operation during startup.
* Runs a given operation. It is the responsibility of the caller to validate state transition.
*
* @param consumer {@link Consumer} to process each scanned run
* @param detail {@link OperationRunDetail} of the operation
*/
public void scanPendingOperations(Consumer<OperationRunDetail> consumer)
throws IOException, InvalidFieldException {
TransactionRunners.run(transactionRunner, context -> {
getOperationRunStore(context).scanOperationByStatus(OperationRunStatus.PENDING, consumer);
}, IOException.class, InvalidFieldException.class);
}

/**
* Runs a given operation.
*
* @param runId of the operation
* @throws OperationRunNotFoundException if the operation runid is not found
*/
public void startOperation(OperationRunId runId)
throws OperationRunNotFoundException, IOException {
TransactionRunners.run(
transactionRunner,
context -> {
OperationRunDetail detail = getOperationRunStore(context).getOperation(runId);
OperationRunStatus status = detail.getRun().getStatus();
if (!status.canTransitionTo(OperationRunStatus.RUNNING)) {
throw new IllegalStateException(
String.format("Cannot start operation %s with status %s", runId, status));
}
runtime.run(detail);
}, OperationRunNotFoundException.class, IOException.class
);
public OperationController startOperation(OperationRunDetail detail) {
return runtime.run(detail);
}


/**
* Initiate operation stop. It is the responsibility of the caller to validate state transition.
*
* @param detail runDetail of the operation
* @throws OperationRunNotFoundException if the operation run is not present
* @throws IllegalStateException if the operation cannot be stopped due to invalid state
* transition
*/
public void stopOperation(OperationRunDetail detail) {
OperationController controller = runtime.getController(detail);
if (controller == null) {
throw new IllegalStateException("Operation is not running");
}
controller.stop();
}


private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}

@Override
protected void startUp() throws Exception {
runtime.startAndWait();
runtime.startAndWait();
}

@Override
protected void shutDown() throws Exception {
runtime.stopAndWait();
runtime.stopAndWait();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.metrics.MetricsCollectionService;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
Expand Down Expand Up @@ -195,8 +196,6 @@ protected void processMessages(
* @param messageIdBytes the raw message id in the TMS for the notification
* @param notification the {@link Notification} to process
* @param context context to get the table for operations
* @return a {@link List} of {@link Runnable} tasks to run after the transactional processing of
* the whole messages batch is completed
* @throws Exception if failed to process the given notification
*/
@VisibleForTesting
Expand All @@ -206,17 +205,13 @@ void processNotification(
StructuredTableContext context)
throws Exception {
OperationRunStore runStore = new OperationRunStore(context);
OperationNotification operationNotification;

try {
operationNotification = OperationNotification.fromNotification(
OperationNotification operationNotification = OperationNotification.fromNotification(
notification);
handleOperationEvent(operationNotification, messageIdBytes, runStore);
} catch (IllegalArgumentException | JsonSyntaxException e) {
LOG.warn("Got invalid operation notification", e);
return;
}

handleOperationEvent(operationNotification, messageIdBytes, runStore);
}

private void handleOperationEvent(
Expand All @@ -236,21 +231,28 @@ private void handleOperationEvent(
runId, runDetail.getRun().getStatus(), notification.getStatus());
return;
}

byte[] existingSourceId = runDetail.getSourceId();
if (existingSourceId != null && Bytes.compareTo(messageIdBytes, existingSourceId) < 0) {
LOG.debug(
"Notification source id '{}' is not larger than the existing source id '{}' in the existing "
+ "operation run detail.",
Bytes.toHexString(runDetail.getSourceId()), Bytes.toHexString(existingSourceId));
return;
}
} catch (OperationRunNotFoundException e) {
LOG.debug(String.format("Ignoring message for non existent operation %s", runId));
return;
}

switch (notification.getStatus()) {
case PENDING:
statePublisher.publishPending(runId);
break;
case STARTING:
String oldUser = SecurityRequestContext.getUserId();
try {
if (runDetail.getPrincipal() != null) {
SecurityRequestContext.setUserId(runDetail.getPrincipal());
}
lifecycleService.startOperation(runId);
lifecycleService.startOperation(runDetail);
} catch (Exception e) {
LOG.error("Failed to start operation {}", runDetail, e);
statePublisher.publishFailed(runId,
Expand All @@ -268,7 +270,11 @@ private void handleOperationEvent(
}
break;
case STOPPING:
lifecycleService.stopOperation(runDetail);
try {
lifecycleService.stopOperation(runDetail);
} catch (IllegalStateException e) {
LOG.warn("Failed to stop operation {}", runDetail.getRunId(), e);
}
runStore.updateOperationStatus(runId, OperationRunStatus.STOPPING, messageIdBytes);
break;
case KILLED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ public interface OperationStatePublisher {

void publishStopping(OperationRunId runId);

void publishPending(OperationRunId runId);
void publishStarting(OperationRunId runId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory;
import io.cdap.cdap.internal.operation.InMemoryOperationRunner;
import io.cdap.cdap.internal.operation.InMemoryOperationRuntime;
import io.cdap.cdap.internal.operation.LongRunningOperation;
import io.cdap.cdap.internal.operation.MessagingOperationStatePublisher;
import io.cdap.cdap.internal.operation.OperationRunner;
import io.cdap.cdap.internal.operation.OperationRuntime;
import io.cdap.cdap.internal.operation.OperationStatePublisher;
import io.cdap.cdap.sourcecontrol.ApplicationManager;

Expand All @@ -43,6 +45,7 @@ protected void configure() {
bind(ApplicationManager.class).to(LocalApplicationManager.class);
bind(OperationRunner.class).to(InMemoryOperationRunner.class);
bind(OperationStatePublisher.class).to(MessagingOperationStatePublisher.class);
bind(OperationRuntime.class).to(InMemoryOperationRuntime.class);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class InMemoryOperationRunnerTest {
private static final OperationRunId runId = new OperationRunId("namespace", "run");
private static final OperationRun run = OperationRun.builder()
.setRunId(runId.getRun())
.setStatus(OperationRunStatus.PENDING)
.setStatus(OperationRunStatus.STARTING)
.setType(OperationType.PULL_APPS)
.setMetadata(
OperationMeta.builder().setCreateTime(Instant.now()).build())
Expand Down

This file was deleted.

Loading

0 comments on commit d45b12a

Please sign in to comment.