diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java index 4dfdaf0f9873..a6f666e95c17 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/app/guice/AppFabricServiceRuntimeModule.java @@ -136,6 +136,7 @@ import io.cdap.cdap.internal.events.StartProgramEventSubscriber; import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandler; import io.cdap.cdap.internal.namespace.credential.handler.GcpWorkloadIdentityHttpHandlerInternal; +import io.cdap.cdap.internal.operation.guice.OperationModule; import io.cdap.cdap.internal.pipeline.SynchronousPipelineFactory; import io.cdap.cdap.internal.profile.ProfileService; import io.cdap.cdap.internal.provision.ProvisionerModule; @@ -198,6 +199,7 @@ public Module getInMemoryModules() { new SourceControlModule(), new EntityVerifierModule(), new MasterCredentialProviderModule(), + new OperationModule(), BootstrapModules.getInMemoryModule(), new AbstractModule() { @Override @@ -240,6 +242,7 @@ public Module getStandaloneModules() { new EntityVerifierModule(), new ProvisionerModule(), new MasterCredentialProviderModule(), + new OperationModule(), BootstrapModules.getFileBasedModule(), new AbstractModule() { @Override @@ -294,6 +297,7 @@ public Module getDistributedModules() { new EntityVerifierModule(), new ProvisionerModule(), new MasterCredentialProviderModule(), + new OperationModule(), BootstrapModules.getFileBasedModule(), new AbstractModule() { @Override diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java new file mode 100644 index 000000000000..5970f4f432eb --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractOperationRunner.java @@ -0,0 +1,54 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; + +/** + * Abstract runner implementation with common functionality. + */ +public abstract class AbstractOperationRunner implements OperationRunner { + + private final PullAppsOperationFactory pullOperationFactory; + + AbstractOperationRunner(PullAppsOperationFactory pullOperationFactory) { + this.pullOperationFactory = pullOperationFactory; + } + + /** + * Creates a {@link LongRunningOperation} given the {@link OperationRunDetail}. + * + * @param detail {@link OperationRunDetail} for the operation + * @return {@link LongRunningOperation} for the operation type in request + */ + protected LongRunningOperation createOperation(OperationRunDetail detail) + throws IllegalStateException { + switch (detail.getRun().getType()) { + case PULL_APPS: + PullAppsRequest request = detail.getPullAppsRequest(); + if (request == null) { + throw new IllegalStateException("Missing request for pull operation"); + } + return pullOperationFactory.create(request); + case PUSH_APPS: + default: + throw new IllegalStateException( + String.format("Invalid operation type %s", detail.getRun().getType())); + } + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationController.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationController.java new file mode 100644 index 000000000000..17973486d2a4 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationController.java @@ -0,0 +1,101 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; +import com.google.common.util.concurrent.Service.State; +import com.google.common.util.concurrent.SettableFuture; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import java.util.Collections; +import org.apache.twill.common.Threads; +import org.apache.twill.internal.ServiceListenerAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In-Memory implementation of {@link OperationController}. + */ +public class InMemoryOperationController implements + OperationController { + + private final OperationRunId runId; + private final OperationStatePublisher statePublisher; + private final OperationDriver driver; + private final SettableFuture completionFuture = SettableFuture.create(); + private static final Logger LOG = LoggerFactory.getLogger(InMemoryOperationController.class); + + InMemoryOperationController(OperationRunId runId, OperationStatePublisher statePublisher, + OperationDriver driver) { + this.runId = runId; + this.driver = driver; + this.statePublisher = statePublisher; + startListen(driver); + } + + + @Override + public ListenableFuture stop() { + LOG.trace("Stopping operation {}", runId); + driver.stop(); + return completionFuture; + } + + @Override + public ListenableFuture complete() { + return completionFuture; + } + + private void startListen(Service service) { + service.addListener(new ServiceListenerAdapter() { + @Override + public void running() { + statePublisher.publishRunning(runId); + } + + @Override + public void terminated(Service.State from) { + if (from.equals(State.STOPPING)) { + statePublisher.publishKilled(runId); + } else { + statePublisher.publishSuccess(runId); + } + markComplete(); + } + + @Override + public void failed(Service.State from, Throwable failure) { + if (failure instanceof OperationException) { + statePublisher.publishFailed(runId, ((OperationException) failure).toOperationError()); + } else { + statePublisher.publishFailed(runId, getOperationErrorFromThrowable(failure)); + } + markComplete(); + } + }, Threads.SAME_THREAD_EXECUTOR); + } + + private void markComplete() { + completionFuture.set(this); + } + + private OperationError getOperationErrorFromThrowable(Throwable t) { + LOG.debug("Operation {} of namespace {} failed", runId.getRun(), runId.getParent(), t); + return new OperationError(t.getMessage(), Collections.emptyList()); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java new file mode 100644 index 000000000000..8b6c4697849f --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRunner.java @@ -0,0 +1,50 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.inject.Inject; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; + +/** + * Implementation of {@link OperationRunner} to run an operation in the same service. + */ +public class InMemoryOperationRunner extends AbstractOperationRunner { + + private final OperationStatePublisher statePublisher; + + /** + * Default constructor. + * + * @param statePublisher Publishes the current operation state. + */ + @Inject + public InMemoryOperationRunner(OperationStatePublisher statePublisher, + PullAppsOperationFactory pullOperationFactory) { + super(pullOperationFactory); + this.statePublisher = statePublisher; + } + + @Override + public OperationController run(OperationRunDetail detail) throws IllegalStateException { + LongRunningOperationContext context = new LongRunningOperationContext(detail.getRunId(), statePublisher); + OperationDriver driver = new OperationDriver(createOperation(detail), context); + OperationController controller = new InMemoryOperationController(context.getRunId(), + statePublisher, driver); + driver.start(); + return controller; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/LongRunningOperationContext.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/LongRunningOperationContext.java index 112a571ed694..4de07fdc38c7 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/LongRunningOperationContext.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/LongRunningOperationContext.java @@ -18,27 +18,36 @@ import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationResource; -import io.cdap.cdap.proto.operation.OperationType; import java.util.Set; /** * Provides the context for the current operation run. */ -public interface LongRunningOperationContext { +public class LongRunningOperationContext { + + private final OperationRunId runId; + private final OperationStatePublisher statePublisher; /** - * Get the {@link OperationRunId} for the current run. + * Default constructor. * - * @return the current runid + * @param runId id of the current operation + * @param type type of the current operation + * @param statePublisher to publish the operation metadata */ - OperationRunId getRunId(); + public LongRunningOperationContext(OperationRunId runId, OperationStatePublisher statePublisher) { + this.runId = runId; + this.statePublisher = statePublisher; + } /** - * Get the {@link OperationType} to be used by the runner for loading the right operation class. + * Get the {@link OperationRunId} for the current run. * - * @return the type of the current operation + * @return the current runid */ - OperationType getType(); + public OperationRunId getRunId() { + return runId; + } /** * Used by the {@link LongRunningOperation} to update the resources operated on in the @@ -46,8 +55,8 @@ public interface LongRunningOperationContext { * resources to be unique * * @param resources A set of resources to be updated. - * */ - // TODO Add exceptions based on implementations. - void updateOperationResources(Set resources); + public void updateOperationResources(Set resources) { + statePublisher.publishResources(runId, resources); + } } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java new file mode 100644 index 000000000000..816a5e1a97bb --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/MessagingOperationStatePublisher.java @@ -0,0 +1,64 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.inject.Inject; +import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationResource; +import java.util.Set; + +/** + * Provides capabilities to send operation lifecycle specific messages. + */ +public class MessagingOperationStatePublisher implements OperationStatePublisher { + + private final MessagingService messagingService; + + @Inject + MessagingOperationStatePublisher(MessagingService messagingService) { + this.messagingService = messagingService; + } + + @Override + public void publishResources(OperationRunId runId, Set resources) { + // TODO(samik) implement message publish logic + } + + @Override + public void publishRunning(OperationRunId runId) { + // TODO(samik) implement message publish logic + } + + @Override + public void publishFailed(OperationRunId runId, OperationError error) { + // TODO(samik) implement message publish logic + + } + + @Override + public void publishSuccess(OperationRunId runId) { + // TODO(samik) implement message publish logic + + } + + @Override + public void publishKilled(OperationRunId runId) { + // TODO(samik) implement message publish logic + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractLongRunningOperationContext.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationController.java similarity index 54% rename from cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractLongRunningOperationContext.java rename to cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationController.java index de054860b75a..9522468d460d 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/AbstractLongRunningOperationContext.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationController.java @@ -16,22 +16,21 @@ package io.cdap.cdap.internal.operation; -import io.cdap.cdap.proto.id.OperationRunId; -import io.cdap.cdap.proto.operation.OperationType; +import com.google.common.util.concurrent.ListenableFuture; /** - * Abstract implementation of {@link LongRunningOperationContext} providing shared functionalities. + * Provides lifecycle hooks for managing the state of an operation. */ -public abstract class AbstractLongRunningOperationContext implements LongRunningOperationContext { +public interface OperationController { + + /** + * Attempt to stop the operation. + **/ + ListenableFuture stop(); - private final OperationRunId runId; - private final OperationType type; /** - * Default constructor. + * Returns a future which can be used to block till the operation is completed. */ - protected AbstractLongRunningOperationContext(OperationRunId runid, OperationType operationType) { - this.runId = runid; - this.type = operationType; - } + ListenableFuture complete(); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationDriver.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationDriver.java new file mode 100644 index 000000000000..8e6e4504a19b --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationDriver.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import io.cdap.cdap.proto.operation.OperationResource; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.twill.common.Threads; + +/** + * A Service for executing {@link LongRunningOperation}. + */ +class OperationDriver extends AbstractExecutionThreadService { + + private final LongRunningOperation operation; + private final LongRunningOperationContext context; + private ExecutorService executor; + + OperationDriver(LongRunningOperation operation, LongRunningOperationContext context) { + this.operation = operation; + this.context = context; + } + + @Override + protected void run() throws Exception { + Set resources = operation.run(context).get(); + context.updateOperationResources(resources); + } + + @Override + protected void triggerShutdown() { + if (executor != null) { + executor.shutdownNow(); + } + } + + @Override + protected Executor executor() { + executor = Executors.newSingleThreadExecutor( + Threads.createDaemonThreadFactory("operation-runner" + context.getRunId() + "-%d") + ); + return executor; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunner.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunner.java new file mode 100644 index 000000000000..848d973325a1 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunner.java @@ -0,0 +1,31 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +/** + * Interface representing runner for LRO. + * A runner initiates the run and returns a {@link OperationController} for lifecycle management. + */ +public interface OperationRunner { + + /** + * Run an operation in asynchronous mode. + * + * @param runDetail {@link OperationRunDetail} for the operation to be run + */ + OperationController run(OperationRunDetail runDetail) throws IllegalStateException; +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java new file mode 100644 index 000000000000..ba967d02e400 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationStatePublisher.java @@ -0,0 +1,56 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + + +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationResource; +import java.util.Set; + +/** + * Publishes operation state messages. + */ +public interface OperationStatePublisher { + + /** + * Publishes message with the current resources. The operation status should be RUNNING + * + * @param resources Current resources for the operation. + */ + void publishResources(OperationRunId runId, Set resources); + + /** + * Publishes the current operation status as RUNNING. + */ + void publishRunning(OperationRunId runId); + + /** + * Publishes the current operation status as FAILED. + */ + void publishFailed(OperationRunId runId, OperationError error); + + /** + * Publishes the current operation status as SUCCEEDED. + */ + void publishSuccess(OperationRunId runId); + + /** + * Publishes the current operation status as KILLED. + */ + void publishKilled(OperationRunId runId); +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java new file mode 100644 index 000000000000..dcc694ad58d8 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/guice/OperationModule.java @@ -0,0 +1,48 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation.guice; + +import com.google.inject.AbstractModule; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import io.cdap.cdap.internal.app.sourcecontrol.LocalApplicationManager; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperation; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsOperationFactory; +import io.cdap.cdap.internal.operation.InMemoryOperationRunner; +import io.cdap.cdap.internal.operation.LongRunningOperation; +import io.cdap.cdap.internal.operation.MessagingOperationStatePublisher; +import io.cdap.cdap.internal.operation.OperationRunner; +import io.cdap.cdap.internal.operation.OperationStatePublisher; +import io.cdap.cdap.sourcecontrol.ApplicationManager; + + +/** + * Guice module for operation classes. + */ +public class OperationModule extends AbstractModule { + + @Override + protected void configure() { + install(new FactoryModuleBuilder() + .implement(LongRunningOperation.class, PullAppsOperation.class) + .build(PullAppsOperationFactory.class)); + // TODO(samik) change based on worker enabled on not + bind(ApplicationManager.class).to(LocalApplicationManager.class); + bind(OperationRunner.class).to(InMemoryOperationRunner.class); + bind(OperationStatePublisher.class).to(MessagingOperationStatePublisher.class); + } +} + diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/sourcecontrol/PullAppsOperationTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/sourcecontrol/PullAppsOperationTest.java index 168e7e66daf5..7f6c8727cbfb 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/sourcecontrol/PullAppsOperationTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/sourcecontrol/PullAppsOperationTest.java @@ -127,7 +127,6 @@ public void testRunSuccess() throws Exception { @Test(expected = OperationException.class) public void testRunFailedAtFirstApp() throws Exception { ApplicationManager mockManager = Mockito.mock(ApplicationManager.class); - LongRunningOperationContext mockContext = Mockito.mock(LongRunningOperationContext.class); PullAppsOperation operation = new PullAppsOperation(this.req, opRunner, mockManager); Mockito.doThrow(new SourceControlException("")).when(mockManager) diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java new file mode 100644 index 000000000000..4f4719fa3ba0 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java @@ -0,0 +1,109 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + + +import com.google.common.collect.ImmutableSet; +import com.google.inject.AbstractModule; +import com.google.inject.Injector; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; +import io.cdap.cdap.sourcecontrol.ApplicationManager; +import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner; +import java.time.Instant; +import java.util.Collections; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +public class InMemoryOperationRunnerTest { + + private InMemoryOperationRunner runner; + + private static Injector injector; + private static final OperationStatePublisher mockPublisher = Mockito.mock( + OperationStatePublisher.class); + private static final InMemorySourceControlOperationRunner mockScmRunner = Mockito.mock( + InMemorySourceControlOperationRunner.class); + private static final ApplicationManager mockManager = Mockito.mock(ApplicationManager.class); + private static final OperationRunId runId = new OperationRunId("namespace", "run"); + private static final OperationRun run = OperationRun.builder() + .setRunId(runId.getRun()) + .setStatus(OperationRunStatus.PENDING) + .setType(OperationType.PULL_APPS) + .setMetadata( + new OperationMeta(Collections.emptySet(), Instant.now(), null)) + .build(); + private static final OperationRunDetail detail = OperationRunDetail.builder() + .setSourceId(AppFabricTestHelper.createSourceId(0)) + .setRunId(runId) + .setRun(run) + .setPullAppsRequest(new PullAppsRequest(ImmutableSet.of("1", "2", "3", "4"), null)) + .build(); + + @BeforeClass + public static void setupClass() { + CConfiguration cconf = CConfiguration.create(); + injector = AppFabricTestHelper.getInjector(cconf, new AbstractModule() { + @Override + protected void configure() { + bind(OperationStatePublisher.class).toInstance(mockPublisher); + bind(InMemorySourceControlOperationRunner.class).toInstance(mockScmRunner); + bind(ApplicationManager.class).toInstance(mockManager); + } + }); + } + + @Before + public void setupTest() { + runner = injector.getInstance(InMemoryOperationRunner.class); + Mockito.reset(mockPublisher); + } + + @Test + public void testRun() throws Exception { + + OperationController controller = runner.run(detail); + controller.complete().get(); + + Mockito.verify(mockPublisher, Mockito.times(1)).publishRunning(runId); + Mockito.verify(mockPublisher, Mockito.times(1)).publishSuccess(runId); + } + + @Test + public void testRunWithError() throws Exception { + + Mockito.doThrow(new RuntimeException("test")).when(mockScmRunner).pull( + Mockito.any(), Mockito.any() + ); + + OperationController controller = runner.run(detail); + controller.complete().get(); + + Mockito.verify(mockPublisher, Mockito.times(1)).publishRunning(runId); + Mockito.verify(mockPublisher, Mockito.times(1)).publishFailed(Mockito.eq(runId), Mockito.any()); + Mockito.verify(mockPublisher, Mockito.never()).publishSuccess(runId); + } +} + diff --git a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java index de95b0386e0f..8b74f96b2722 100644 --- a/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java +++ b/cdap-unit-test/src/main/java/io/cdap/cdap/test/TestBase.java @@ -94,10 +94,10 @@ import io.cdap.cdap.internal.provision.MockProvisionerModule; import io.cdap.cdap.logging.guice.LocalLogAppenderModule; import io.cdap.cdap.logging.guice.LogReaderRuntimeModules; -import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.messaging.context.BasicMessagingAdmin; import io.cdap.cdap.messaging.context.MultiThreadMessagingContext; import io.cdap.cdap.messaging.guice.MessagingServerRuntimeModule; +import io.cdap.cdap.messaging.spi.MessagingService; import io.cdap.cdap.metadata.FieldLineageAdmin; import io.cdap.cdap.metadata.LineageAdmin; import io.cdap.cdap.metadata.MetadataAdmin;