Skip to content

Commit

Permalink
CDAP-20809-add-operation-interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Oct 9, 2023
1 parent 60d338b commit 795d7da
Show file tree
Hide file tree
Showing 30 changed files with 978 additions and 1,119 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.operation;

import avro.shaded.com.google.common.collect.ImmutableMap;
import io.cdap.cdap.common.operation.LongRunningOperation;
import io.cdap.cdap.common.operation.LongRunningOperationRequest;
import io.cdap.cdap.proto.operationrun.OperationType;
import java.util.Map;

/**
 * Base {@link OperationRunner} that looks up the {@link LongRunningOperation} registered for a
 * request's {@link OperationType} and checks that the request payload fits that operation.
 *
 * @param <T> The type of the operation request
 */
public abstract class AbstractOperationRunner<T> implements OperationRunner<T> {

  private final ImmutableMap<OperationType, LongRunningOperation> operations;

  /**
   * Creates a runner backed by an immutable copy of the given operation mapping.
   *
   * @param operations mapping from operation type to the operation registered for it
   */
  protected AbstractOperationRunner(Map<OperationType, LongRunningOperation> operations) {
    this.operations = ImmutableMap.copyOf(operations);
  }

  /**
   * Converts an operation type to an operation class.
   *
   * @param request {@link LongRunningOperationRequest} for the operation
   * @return {@link LongRunningOperation} for the operation type in request
   * @throws OperationTypeNotSupportedException when the type is not mapped.
   * @throws InvalidRequestTypeException when the request payload is {@code null} or is not an
   *     instance of the request type declared by the mapped operation.
   */
  public LongRunningOperation<T> getOperation(LongRunningOperationRequest<T> request)
      throws OperationTypeNotSupportedException, InvalidRequestTypeException {
    OperationType type = request.getOperationType();
    LongRunningOperation operation = operations.get(type);
    if (operation == null) {
      throw new OperationTypeNotSupportedException(type);
    }

    Object payload = request.getOperationRequest();
    // The declared return type of getRequestType() is not visible from this file, so the value is
    // inspected as an Object. The payload is validated only when the operation describes its
    // request type with a Class; any other kind of descriptor is accepted without validation.
    // NOTE(review): confirm what getRequestType() returns and extend the check if needed.
    Object expectedType = operation.getRequestType();
    if (expectedType instanceof Class && !((Class<?>) expectedType).isInstance(payload)) {
      String payloadClass = payload == null ? "null" : payload.getClass().getName();
      throw new InvalidRequestTypeException(type, payloadClass);
    }

    // The operations map is raw, so the compiler cannot prove the type argument. The cast relies
    // on the payload check above.
    @SuppressWarnings("unchecked")
    LongRunningOperation<T> typedOperation = (LongRunningOperation<T>) operation;
    return typedOperation;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.operation;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import io.cdap.cdap.common.operation.LongRunningOperation;
import io.cdap.cdap.common.operation.LongRunningOperationRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operationrun.OperationError;
import io.cdap.cdap.proto.operationrun.OperationMeta;
import io.cdap.cdap.proto.operationrun.OperationType;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link OperationRunner} to run an operation in the same service.
*/
public class InMemoryOperationRunner<T> extends AbstractOperationRunner<T> implements
AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(InMemoryOperationRunner.class);

private final OperationStatePublisher statePublisher;
private final Lock lock;
private final ExecutorService executor;
private LongRunningOperation<T> currentOperation;

/**
* Default constructor.
*/
@Inject
public InMemoryOperationRunner(Map<OperationType, LongRunningOperation> operations,
OperationStatePublisher statePublisher) {
super(operations);
this.statePublisher = statePublisher;
this.lock = new ReentrantLock();
this.executor = Executors.newSingleThreadExecutor(
Threads.createDaemonThreadFactory("operation-runner-%d")
);
}

// TODO(samik) It is currently rudimentary implementation to unblock the state management
// Handle states, publish failures and heartbeat messages
@Override
public void run(LongRunningOperationRequest<T> request)
throws OperationTypeNotSupportedException, InvalidRequestTypeException {
lock.lock();
try {
if (currentOperation != null) {
throw new IllegalStateException("Operation already running");
}
currentOperation = getOperation(request);
executor.execute(() -> {
runInternal(request);
});
statePublisher.publishRunning();
} finally {
lock.unlock();
}
}

private void runInternal(LongRunningOperationRequest<T> request) {
try {
ListenableFuture<OperationError> operationTask = currentOperation.run(request.getOperationRequest(), this::sendMetaUpdateMessage);
Futures.addCallback(
operationTask,
new FutureCallback<OperationError>() {
@Override
public void onSuccess(@Nullable OperationError result) {
if (result != null) {
statePublisher.publishFailed(result);
} else {
statePublisher.publishSuccess();
}
}

@Override
public void onFailure(Throwable t) {
statePublisher.publishFailed(getOperationErrorFromThrowable(request, t));
}
}
);
} catch (Exception e) {
statePublisher.publishFailed(getOperationErrorFromThrowable(request, e));
}
}

// TODO(samik): Implement graceful shutdown
@Override
public void stop(long timeout, TimeUnit timeoutUnit) throws Exception {
close();
}

private void sendMetaUpdateMessage(OperationMeta meta) {
statePublisher.publishMetaUpdate(meta);
}

private OperationError getOperationErrorFromThrowable(LongRunningOperationRequest<T> request,
Throwable t) {
OperationRunId runId = request.getRunId();
LOG.error("Operation {} of namespace {} failed: {}", runId.getRun(), runId.getParent(),
t.getMessage());
return new OperationError(t.getMessage(), Collections.emptyList());
}

@Override
public void close() throws Exception {
lock.lock();
try {
if (currentOperation == null) {
throw new IllegalStateException("No operation is running");
}
executor.shutdownNow();
statePublisher.publishStopped();
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.operation;

import io.cdap.cdap.proto.operationrun.OperationType;

/**
 * Signals that the payload carried by a
 * {@link io.cdap.cdap.common.operation.LongRunningOperationRequest} is not valid for the
 * operation type the request asks for.
 */
public class InvalidRequestTypeException extends Exception {

  /**
   * Creates the exception with a message naming the payload class and the operation type.
   *
   * @param type the requested operation type
   * @param requestClass name of the class of the rejected request payload
   */
  public InvalidRequestTypeException(OperationType type, String requestClass) {
    super(describe(type, requestClass));
  }

  /** Builds the exception message from the operation type and the payload class name. */
  private static String describe(OperationType type, String requestClass) {
    String typeName = type.toString();
    return String.format("Request class %s is not applicable operation type %s", requestClass,
        typeName);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.operation;

import io.cdap.cdap.messaging.MessagingService;
import io.cdap.cdap.proto.operationrun.OperationError;
import io.cdap.cdap.proto.operationrun.OperationMeta;

/**
 * Provides capabilities to send operation lifecycle specific messages.
 *
 * <p>Publishing is not implemented yet: every publish method is currently a no-op and the
 * supplied {@link MessagingService} is only stored.
 */
public class MessagingOperationStatePublisher implements OperationStatePublisher {

  private final MessagingService messagingService;

  /**
   * Creates a publisher that holds on to the given messaging service.
   *
   * @param messagingService the service kept for the pending publish implementation
   */
  public MessagingOperationStatePublisher(MessagingService messagingService) {
    this.messagingService = messagingService;
  }

  @Override
  public void publishMetaUpdate(OperationMeta meta) {
    // TODO(samik) implement message publish logic
  }

  @Override
  public void publishRunning() {
    // TODO(samik) implement message publish logic
  }

  @Override
  public void publishFailed(OperationError error) {
    // TODO(samik) implement message publish logic
  }

  @Override
  public void publishSuccess() {
    // TODO(samik) implement message publish logic
  }

  @Override
  public void publishStopped() {
    // TODO(samik) implement message publish logic
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.operation;

import com.google.common.util.concurrent.ListenableFuture;
import io.cdap.cdap.common.operation.LongRunningOperationRequest;
import java.util.concurrent.TimeUnit;

/**
* Interface representing a runner for long-running operations (LRO). A runner will run only one
* operation at a time.
*
* @param <T> The type of the operation request
*/
public interface OperationRunner<T> {

/**
* Run an operation in asynchronous mode.
*
* @param request Request for the operation run
* @throws OperationTypeNotSupportedException when the operation type in the request is not
*     supported
* @throws InvalidRequestTypeException when the operation request is not valid for the requested
*     operation type
*/
void run(LongRunningOperationRequest<T> request)
throws OperationTypeNotSupportedException, InvalidRequestTypeException;

/**
* Attempt to stop the operation gracefully within the given timeout. Depending on the
* implementation, if the timeout is reached, the running operation will be force killed.
*
* <p>This method returns no value: no {@link ListenableFuture} or other handle is handed back,
* and this interface does not specify whether the call blocks until the operation has stopped.
*
* @param timeout the maximum time that it allows the operation to stop gracefully
* @param timeoutUnit the {@link TimeUnit} for the {@code timeout}
* @throws Exception if stopping the operation fails
**/
void stop(long timeout, TimeUnit timeoutUnit) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright © 2023 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.internal.app.operation;

import io.cdap.cdap.proto.operationrun.OperationError;
import io.cdap.cdap.proto.operationrun.OperationMeta;

/**
* Publishes operation state messages.
*/
public interface OperationStatePublisher {

/**
* Publishes a message with the current metadata. The operation status should be RUNNING.
*
* @param meta Current metadata for the operation.
*/
void publishMetaUpdate(OperationMeta meta);

/**
* Publishes the current operation status as RUNNING.
*/
void publishRunning();

/**
* Publishes the current operation status as FAILED.
*
* @param error The error to publish along with the FAILED status.
*/
void publishFailed(OperationError error);

/**
* Publishes the current operation status as SUCCEEDED.
*/
void publishSuccess();

/**
* Publishes the current operation status as STOPPED.
*/
void publishStopped();
}
Loading

0 comments on commit 795d7da

Please sign in to comment.