Skip to content

Commit

Permalink
Merge pull request #15421 from cdapio/operation-notification-subscriber
Browse files Browse the repository at this point in the history
Add Operation notification consumer
  • Loading branch information
samdgupi authored Nov 15, 2023
2 parents d068939 + 8e74aa4 commit 815f131
Show file tree
Hide file tree
Showing 19 changed files with 1,001 additions and 47 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.operation;


import com.google.inject.Inject;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* {@link OperationRuntime} implementation when operations are running inside same service.
*/
public class InMemoryOperationRuntime implements OperationRuntime {

private final CConfiguration cConf;
private final ConcurrentHashMap<OperationRunId, OperationController> controllers;
private final OperationRunner runner;
private final OperationStatePublisher statePublisher;
private final TransactionRunner transactionRunner;

private static final Logger LOG = LoggerFactory.getLogger(InMemoryOperationRuntime.class);

@Inject
InMemoryOperationRuntime(CConfiguration cConf, OperationRunner runner,
OperationStatePublisher statePublisher,
TransactionRunner transactionRunner) {
this.cConf = cConf;
this.runner = runner;
this.statePublisher = statePublisher;
this.transactionRunner = transactionRunner;
this.controllers = new ConcurrentHashMap<>();
}

/**
* Runs an operation given the detail.
*
* @param runDetail detail of the run
* @return {@link OperationController} for the run
*/
public OperationController run(OperationRunDetail runDetail) {
return controllers.computeIfAbsent(
runDetail.getRunId(),
runId -> {
try {
OperationController controller = runner.run(runDetail);
LOG.debug("Added controller for {}", runId);
controller.complete().addListener(() -> remove(runId), Threads.SAME_THREAD_EXECUTOR);
return controller;
} catch (IllegalStateException e) {
statePublisher.publishFailed(runDetail.getRunId(),
new OperationError(e.getMessage(), Collections.emptyList())
);
}
return null;
}
);
}

/**
* Get controller for a given {@link OperationRunId}.
*/
@Nullable
public OperationController getController(OperationRunDetail detail) {
return controllers.computeIfAbsent(detail.getRunId(), runId -> {
// TODO(samik) create controller from detail after 6.10 release
return null;
});
}

private void remove(OperationRunId runId) {
OperationController controller = controllers.remove(runId);
if (controller != null) {
LOG.debug("Controller removed for {}", runId);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public MessagingOperationStatePublisher(MessagingService messagingService,
@Override
public void publishResources(OperationRunId runId, Set<OperationResource> resources) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name())
.put(Operation.RESOURCES_NOTIFICATION_KEY, GSON.toJson(resources));

Expand All @@ -114,15 +114,15 @@ public void publishResources(OperationRunId runId, Set<OperationResource> resour
@Override
public void publishRunning(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.RUNNING.name());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishFailed(OperationRunId runId, OperationError error) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.FAILED.name())
.put(Operation.ERROR_NOTIFICATION_KEY, GSON.toJson(error))
.put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString());
Expand All @@ -132,7 +132,7 @@ public void publishFailed(OperationRunId runId, OperationError error) {
@Override
public void publishSuccess(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.SUCCEEDED.name())
.put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString());
publish(runId, propertiesBuilder.build());
Expand All @@ -141,7 +141,7 @@ public void publishSuccess(OperationRunId runId) {
@Override
public void publishKilled(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.KILLED.name())
.put(Operation.ENDTIME_NOTIFICATION_KEY, Instant.now().toString());
publish(runId, propertiesBuilder.build());
Expand All @@ -150,15 +150,15 @@ public void publishKilled(OperationRunId runId) {
@Override
public void publishStopping(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STOPPING.name());
publish(runId, propertiesBuilder.build());
}

@Override
public void publishStarting(OperationRunId runId) {
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.<String, String>builder()
.put(Operation.RUN_ID_NOTIFICATION_KEY, GSON.toJson(runId))
.put(Operation.RUN_ID_NOTIFICATION_KEY, runId.toString())
.put(Operation.STATUS_NOTIFICATION_KEY, OperationRunStatus.STARTING.name());
publish(runId, propertiesBuilder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,30 @@
package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.InvalidFieldException;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.util.Collections;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Service that manages lifecycle of Operation.
*/
public class OperationLifecycleManager {

private final TransactionRunner transactionRunner;
private final OperationRuntime runtime;
private static final Logger LOG = LoggerFactory.getLogger(OperationLifecycleManager.class);


@Inject
OperationLifecycleManager(TransactionRunner transactionRunner) {
OperationLifecycleManager(TransactionRunner transactionRunner, OperationRuntime runtime) {
this.transactionRunner = transactionRunner;
this.runtime = runtime;
}

/**
Expand Down Expand Up @@ -72,19 +78,49 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
return currentLimit == 0;
}

/**
* Runs a given operation. It is the responsibility of the caller to validate state transition.
*
* @param detail {@link OperationRunDetail} of the operation
*/
public OperationController startOperation(OperationRunDetail detail) {
return runtime.run(detail);
}


/**
* Scan all pending operations. Needed for try running all pending operation during startup.
* Initiate operation stop. It is the responsibility of the caller to validate state transition.
*
* @param consumer {@link Consumer} to process each scanned run
* @param detail {@link OperationRunDetail} of the operation
* @throws IllegalStateException in case the operation is not running. It is the
* responsibility of the caller to handle the same
*/
public void scanPendingOperations(Consumer<OperationRunDetail> consumer)
throws IOException, InvalidFieldException {
TransactionRunners.run(transactionRunner, context -> {
getOperationRunStore(context).scanOperationByStatus(OperationRunStatus.PENDING, consumer);
}, IOException.class, InvalidFieldException.class);
public void stopOperation(OperationRunDetail detail) {
OperationController controller = runtime.getController(detail);
if (controller == null) {
throw new IllegalStateException("Operation is not running");
}
controller.stop();
}

/**
* Checks if the operation is running. If not sends a failure notification
* Called after service restart.
*
* @param detail {@link OperationRunDetail} of the operation
*/
public void isRunning(OperationRunDetail detail, OperationStatePublisher statePublisher) {
// If there is no active controller that means the operation is not running and should be failed
if (runtime.getController(detail) == null) {
LOG.debug("No running operation for {}, sending failure notification", detail.getRunId());
statePublisher.publishFailed(detail.getRunId(),
new OperationError("Failed after service restart as operation is not running",
Collections.emptyList()));
return;
}
}


private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class OperationNotification {

private final OperationRunId runId;
private final OperationRunStatus status;

@Nullable
private final Set<OperationResource> resources;
@Nullable
Expand All @@ -51,8 +52,10 @@ public class OperationNotification {
/**
* Default constructor.
*/
public OperationNotification(OperationRunId runId, OperationRunStatus status,
@Nullable Set<OperationResource> resources, Instant endTime, @Nullable OperationError error) {
OperationNotification(OperationRunId runId, OperationRunStatus status,
@Nullable Set<OperationResource> resources, Instant endTime,
@Nullable OperationError error
) {
this.runId = runId;
this.status = status;
this.resources = resources;
Expand All @@ -68,15 +71,34 @@ public OperationNotification(OperationRunId runId, OperationRunStatus status,
public static OperationNotification fromNotification(Notification notification) {
Map<String, String> properties = notification.getProperties();

OperationRunId runId = GSON.fromJson(properties.get(Operation.RUN_ID_NOTIFICATION_KEY),
OperationRunId.class);
if (!properties.containsKey(Operation.RUN_ID_NOTIFICATION_KEY)) {
throw new IllegalArgumentException("Notification missing operation run id");
}

if (!properties.containsKey(Operation.STATUS_NOTIFICATION_KEY)) {
throw new IllegalArgumentException("Notification missing operation status");
}

OperationError error = null;
if (properties.containsKey(Operation.ERROR_NOTIFICATION_KEY)) {
error = GSON.fromJson(properties.get(Operation.ERROR_NOTIFICATION_KEY),
OperationError.class);
}

Set<OperationResource> resources = null;
if (properties.containsKey(Operation.RESOURCES_NOTIFICATION_KEY)) {
resources = GSON.fromJson(
properties.get(Operation.RESOURCES_NOTIFICATION_KEY), resourcesType);
}

Instant endTime = null;
if (properties.containsKey(Operation.ENDTIME_NOTIFICATION_KEY)) {
endTime = Instant.parse(properties.get(Operation.ENDTIME_NOTIFICATION_KEY));
}

OperationRunId runId = OperationRunId.fromString(properties.get(Operation.RUN_ID_NOTIFICATION_KEY));
OperationRunStatus status = OperationRunStatus.valueOf(
properties.get(Operation.STATUS_NOTIFICATION_KEY));
OperationError error = GSON.fromJson(properties.get(Operation.ERROR_NOTIFICATION_KEY),
OperationError.class);
Set<OperationResource> resources = GSON.fromJson(
properties.get(Operation.RESOURCES_NOTIFICATION_KEY), resourcesType);
Instant endTime = Instant.parse(properties.get(Operation.ENDTIME_NOTIFICATION_KEY));

return new OperationNotification(runId, status, resources, endTime, error);
}
Expand Down
Loading

0 comments on commit 815f131

Please sign in to comment.