diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java index 59776da40692..11cdb1c54b9c 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/InMemoryOperationRuntime.java @@ -31,11 +31,7 @@ import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.twill.common.Threads; import org.slf4j.Logger; @@ -47,8 +43,7 @@ public class InMemoryOperationRuntime extends AbstractIdleService implements OperationRuntime { private final CConfiguration cConf; - private final ReadWriteLock runtimeLock; - private final Map controllers; + private final ConcurrentHashMap controllers; private final OperationRunner runner; private final OperationStatePublisher statePublisher; private final TransactionRunner transactionRunner; @@ -63,8 +58,7 @@ public class InMemoryOperationRuntime extends AbstractIdleService implements Ope this.runner = runner; this.statePublisher = statePublisher; this.transactionRunner = transactionRunner; - this.runtimeLock = new ReentrantReadWriteLock(); - this.controllers = new HashMap<>(); + this.controllers = new ConcurrentHashMap<>(); } /** @@ -74,19 +68,22 @@ public class InMemoryOperationRuntime extends AbstractIdleService implements Ope * @return {@link OperationController} for the run */ public OperationController run(OperationRunDetail runDetail) { - OperationController controller = getController(runDetail); - if (controller != null) { - LOG.debug("Operation has already been started: {}", runDetail.getRunId()); - return controller; - } - try { - updateController(runDetail.getRunId(), runner.run(runDetail)); - } catch (IllegalStateException e) { - statePublisher.publishFailed(runDetail.getRunId(), - new OperationError(e.getMessage(), Collections.emptyList()) - ); - } - return controllers.get(runDetail.getRunId()); + return controllers.computeIfAbsent( + runDetail.getRunId(), + runId -> { + try { + OperationController controller = runner.run(runDetail); + LOG.debug("Added controller for {}", runId); + controller.complete().addListener(() -> remove(runId), Threads.SAME_THREAD_EXECUTOR); + return controller; + } catch (IllegalStateException e) { + statePublisher.publishFailed(runDetail.getRunId(), + new OperationError(e.getMessage(), Collections.emptyList()) + ); + } + return null; + } + ); } /** @@ -94,14 +91,10 @@ public OperationController run(OperationRunDetail runDetail) { */ @Nullable public OperationController getController(OperationRunDetail detail) { - Lock lock = this.runtimeLock.readLock(); - lock.lock(); - try { - return controllers.get(detail.getRunId()); + return controllers.computeIfAbsent(detail.getRunId(), runId -> { // TODO(samik) fetch from store for remote operations - } finally { - lock.unlock(); - } + return null; + }); } @Override @@ -129,34 +122,8 @@ protected void shutDown() throws Exception { // no-op } - /** - * Updates the controller cache by adding the given {@link OperationController} if it does not - * exist. - */ - private void updateController(OperationRunId runId, OperationController controller) { - // Add the runtime info if it does not exist in the cache. - Lock lock = this.runtimeLock.writeLock(); - lock.lock(); - try { - controllers.put(runId, controller); - } finally { - lock.unlock(); - } - - LOG.debug("Added controller for {}", runId); - controller.complete().addListener(() -> remove(runId), Threads.SAME_THREAD_EXECUTOR); - } - private void remove(OperationRunId runId) { - OperationController controller; - Lock lock = this.runtimeLock.writeLock(); - lock.lock(); - try { - controller = controllers.remove(runId); - } finally { - lock.unlock(); - } - + OperationController controller = controllers.remove(runId); if (controller != null) { LOG.debug("Controller removed for {}", runId); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java new file mode 100644 index 000000000000..8186daf171cb --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSingleTopicSubscriberService.java @@ -0,0 +1,228 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.utils.ImmutablePair; +import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter; +import io.cdap.cdap.internal.app.services.AbstractNotificationSubscriberService; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.messaging.spi.MessagingService; +import io.cdap.cdap.proto.Notification; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationError; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.security.spi.authentication.SecurityRequestContext; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.TableNotFoundException; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Iterator; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service that receives program status notifications from a single topic and persists to the store. + * No transactions should be started in any of the overrided methods since they are already wrapped + * in a transaction. + */ +class OperationNotificationSingleTopicSubscriberService + extends AbstractNotificationSubscriberService { + + private static final Logger LOG = + LoggerFactory.getLogger(OperationNotificationSubscriberService.class); + + private static final Gson GSON = + ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create(); + + private final OperationStatePublisher statePublisher; + + private final OperationLifecycleService lifecycleService; + + OperationNotificationSingleTopicSubscriberService( + MessagingService messagingService, + CConfiguration cConf, + MetricsCollectionService metricsCollectionService, + OperationStatePublisher statePublisher, + TransactionRunner transactionRunner, + String name, + String topicName, + OperationLifecycleService lifecycleService) { + super( + name, + cConf, + topicName, + cConf.getInt(Constants.Operation.STATUS_EVENT_FETCH_SIZE), + cConf.getLong(Constants.Operation.STATUS_EVENT_POLL_DELAY_MILLIS), + messagingService, + metricsCollectionService, + transactionRunner, + cConf.getInt(Constants.Operation.STATUS_EVENT_TX_SIZE)); + this.statePublisher = statePublisher; + this.lifecycleService = lifecycleService; + } + + @Nullable + @Override + protected String loadMessageId(StructuredTableContext context) + throws IOException, TableNotFoundException { + return getAppMetadataStore(context).retrieveSubscriberState(getTopicId().getTopic(), + "operation.state.reader"); + } + + @Override + protected void storeMessageId(StructuredTableContext context, String messageId) + throws IOException, TableNotFoundException { + getAppMetadataStore(context).persistSubscriberState(getTopicId().getTopic(), + "operation.state.reader", messageId); + } + + @Override + protected void processMessages( + StructuredTableContext structuredTableContext, + Iterator> messages) + throws Exception { + while (messages.hasNext()) { + ImmutablePair messagePair = messages.next(); + processNotification( + messagePair.getFirst().getBytes(StandardCharsets.UTF_8), + messagePair.getSecond(), + structuredTableContext); + } + } + + /** + * Process a {@link Notification} received from TMS. + * + * @param messageIdBytes the raw message id in the TMS for the notification + * @param notification the {@link Notification} to process + * @param context context to get the table for operations + * @throws Exception if failed to process the given notification + */ + @VisibleForTesting + void processNotification( + byte[] messageIdBytes, + Notification notification, + StructuredTableContext context) + throws Exception { + OperationRunStore runStore = new OperationRunStore(context); + try { + OperationNotification operationNotification = OperationNotification.fromNotification( + notification); + handleOperationEvent(operationNotification, messageIdBytes, runStore); + } catch (IllegalArgumentException | JsonSyntaxException e) { + LOG.warn("Got invalid operation notification", e); + } + } + + private void handleOperationEvent( + OperationNotification notification, + byte[] messageIdBytes, + OperationRunStore runStore + ) throws Exception { + LOG.debug("Processing operation status notification: {}", notification); + OperationRunId runId = notification.getRunId(); + OperationRunDetail runDetail; + try { + runDetail = runStore.getOperation(runId); + if (!runDetail.getRun().getStatus().canTransitionTo(notification.getStatus())) { + LOG.debug( + "Ignoring unexpected request to transition operation {} from {} state to " + + "{} state.", + runId, runDetail.getRun().getStatus(), notification.getStatus()); + return; + } + + byte[] existingSourceId = runDetail.getSourceId(); + if (existingSourceId != null && Bytes.compareTo(messageIdBytes, existingSourceId) < 0) { + LOG.debug( + "Notification source id '{}' is not larger than the existing source id '{}' in the existing " + + "operation run detail.", + Bytes.toHexString(runDetail.getSourceId()), Bytes.toHexString(existingSourceId)); + return; + } + } catch (OperationRunNotFoundException e) { + LOG.debug(String.format("Ignoring message for non existent operation %s", runId)); + return; + } + + switch (notification.getStatus()) { + case STARTING: + String oldUser = SecurityRequestContext.getUserId(); + try { + if (runDetail.getPrincipal() != null) { + SecurityRequestContext.setUserId(runDetail.getPrincipal()); + } + lifecycleService.startOperation(runDetail); + } catch (Exception e) { + LOG.error("Failed to start operation {}", runDetail, e); + statePublisher.publishFailed(runId, + new OperationError(e.getMessage(), Collections.emptyList())); + } finally { + SecurityRequestContext.setUserId(oldUser); + } + runStore.updateOperationStatus(runId, OperationRunStatus.STARTING, messageIdBytes); + break; + case RUNNING: + case SUCCEEDED: + runStore.updateOperationStatus(runId, notification.getStatus(), messageIdBytes); + if (notification.getResources() != null) { + runStore.updateOperationResources(runId, notification.getResources(), messageIdBytes); + } + break; + case STOPPING: + try { + lifecycleService.stopOperation(runDetail); + } catch (IllegalStateException e) { + LOG.warn("Failed to stop operation {}", runDetail.getRunId(), e); + } + runStore.updateOperationStatus(runId, OperationRunStatus.STOPPING, messageIdBytes); + break; + case KILLED: + runStore.updateOperationStatus(runId, OperationRunStatus.KILLED, messageIdBytes); + break; + case FAILED: + runStore.failOperationRun(runId, notification.getError(), notification.getEndTime(), + messageIdBytes); + break; + default: + // This should not happen + LOG.error( + "Unsupported status {} for operation {}, {}", + notification.getStatus(), + runId, + notification); + } + } + + /** + * Returns an instance of {@link AppMetadataStore}. + */ + private AppMetadataStore getAppMetadataStore(StructuredTableContext context) { + return AppMetadataStore.create(context); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java index fa130e83b972..5b758d1141a3 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationNotificationSubscriberService.java @@ -16,45 +16,22 @@ package io.cdap.cdap.internal.operation; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.Service; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonSyntaxException; import com.google.inject.Inject; -import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.api.metrics.MetricsCollectionService; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.common.utils.ImmutablePair; -import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter; -import io.cdap.cdap.internal.app.services.AbstractNotificationSubscriberService; -import io.cdap.cdap.internal.app.store.AppMetadataStore; import io.cdap.cdap.messaging.spi.MessagingService; -import io.cdap.cdap.proto.Notification; -import io.cdap.cdap.proto.id.OperationRunId; -import io.cdap.cdap.proto.operation.OperationError; -import io.cdap.cdap.proto.operation.OperationRunStatus; -import io.cdap.cdap.security.spi.authentication.SecurityRequestContext; -import io.cdap.cdap.spi.data.StructuredTableContext; -import io.cdap.cdap.spi.data.TableNotFoundException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; -import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.stream.IntStream; -import javax.annotation.Nullable; import org.apache.twill.internal.CompositeService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Service that creates children services, each to handle a single partition of operation status - * events topic + * events topic. */ public class OperationNotificationSubscriberService extends AbstractIdleService { @@ -120,186 +97,3 @@ private OperationNotificationSingleTopicSubscriberService createChildService( } -/** - * Service that receives program status notifications from a single topic and persists to the store. - * No transactions should be started in any of the overrided methods since they are already wrapped - * in a transaction. - */ -class OperationNotificationSingleTopicSubscriberService - extends AbstractNotificationSubscriberService { - - private static final Logger LOG = - LoggerFactory.getLogger(OperationNotificationSubscriberService.class); - - private static final Gson GSON = - ApplicationSpecificationAdapter.addTypeAdapters(new GsonBuilder()).create(); - - private final OperationStatePublisher statePublisher; - - private final OperationLifecycleService lifecycleService; - - OperationNotificationSingleTopicSubscriberService( - MessagingService messagingService, - CConfiguration cConf, - MetricsCollectionService metricsCollectionService, - OperationStatePublisher statePublisher, - TransactionRunner transactionRunner, - String name, - String topicName, - OperationLifecycleService lifecycleService) { - super( - name, - cConf, - topicName, - cConf.getInt(Constants.Operation.STATUS_EVENT_FETCH_SIZE), - cConf.getLong(Constants.Operation.STATUS_EVENT_POLL_DELAY_MILLIS), - messagingService, - metricsCollectionService, - transactionRunner, - cConf.getInt(Constants.Operation.STATUS_EVENT_TX_SIZE)); - this.statePublisher = statePublisher; - this.lifecycleService = lifecycleService; - } - - @Nullable - @Override - protected String loadMessageId(StructuredTableContext context) - throws IOException, TableNotFoundException { - return getAppMetadataStore(context).retrieveSubscriberState(getTopicId().getTopic(), - "operation.state.reader"); - } - - @Override - protected void storeMessageId(StructuredTableContext context, String messageId) - throws IOException, TableNotFoundException { - getAppMetadataStore(context).persistSubscriberState(getTopicId().getTopic(), - "operation.state.reader", messageId); - } - - @Override - protected void processMessages( - StructuredTableContext structuredTableContext, - Iterator> messages) - throws Exception { - while (messages.hasNext()) { - ImmutablePair messagePair = messages.next(); - processNotification( - messagePair.getFirst().getBytes(StandardCharsets.UTF_8), - messagePair.getSecond(), - structuredTableContext); - } - } - - /** - * Process a {@link Notification} received from TMS. - * - * @param messageIdBytes the raw message id in the TMS for the notification - * @param notification the {@link Notification} to process - * @param context context to get the table for operations - * @throws Exception if failed to process the given notification - */ - @VisibleForTesting - void processNotification( - byte[] messageIdBytes, - Notification notification, - StructuredTableContext context) - throws Exception { - OperationRunStore runStore = new OperationRunStore(context); - try { - OperationNotification operationNotification = OperationNotification.fromNotification( - notification); - handleOperationEvent(operationNotification, messageIdBytes, runStore); - } catch (IllegalArgumentException | JsonSyntaxException e) { - LOG.warn("Got invalid operation notification", e); - } - } - - private void handleOperationEvent( - OperationNotification notification, - byte[] messageIdBytes, - OperationRunStore runStore - ) throws Exception { - LOG.debug("Processing operation status notification: {}", notification); - OperationRunId runId = notification.getRunId(); - OperationRunDetail runDetail; - try { - runDetail = runStore.getOperation(runId); - if (!runDetail.getRun().getStatus().canTransitionTo(notification.getStatus())) { - LOG.debug( - "Ignoring unexpected request to transition operation {} from {} state to " - + "{} state.", - runId, runDetail.getRun().getStatus(), notification.getStatus()); - return; - } - - byte[] existingSourceId = runDetail.getSourceId(); - if (existingSourceId != null && Bytes.compareTo(messageIdBytes, existingSourceId) < 0) { - LOG.debug( - "Notification source id '{}' is not larger than the existing source id '{}' in the existing " - + "operation run detail.", - Bytes.toHexString(runDetail.getSourceId()), Bytes.toHexString(existingSourceId)); - return; - } - } catch (OperationRunNotFoundException e) { - LOG.debug(String.format("Ignoring message for non existent operation %s", runId)); - return; - } - - switch (notification.getStatus()) { - case STARTING: - String oldUser = SecurityRequestContext.getUserId(); - try { - if (runDetail.getPrincipal() != null) { - SecurityRequestContext.setUserId(runDetail.getPrincipal()); - } - lifecycleService.startOperation(runDetail); - } catch (Exception e) { - LOG.error("Failed to start operation {}", runDetail, e); - statePublisher.publishFailed(runId, - new OperationError(e.getMessage(), Collections.emptyList())); - } finally { - SecurityRequestContext.setUserId(oldUser); - } - runStore.updateOperationStatus(runId, OperationRunStatus.STARTING, messageIdBytes); - break; - case RUNNING: - case SUCCEEDED: - runStore.updateOperationStatus(runId, notification.getStatus(), messageIdBytes); - if (notification.getResources() != null) { - runStore.updateOperationResources(runId, notification.getResources(), messageIdBytes); - } - break; - case STOPPING: - try { - lifecycleService.stopOperation(runDetail); - } catch (IllegalStateException e) { - LOG.warn("Failed to stop operation {}", runDetail.getRunId(), e); - } - runStore.updateOperationStatus(runId, OperationRunStatus.STOPPING, messageIdBytes); - break; - case KILLED: - runStore.updateOperationStatus(runId, OperationRunStatus.KILLED, messageIdBytes); - break; - case FAILED: - runStore.failOperationRun(runId, notification.getError(), notification.getEndTime(), - messageIdBytes); - break; - default: - // This should not happen - LOG.error( - "Unsupported status {} for operation {}, {}", - notification.getStatus(), - runId, - notification); - } - } - - /** - * Returns an instance of {@link AppMetadataStore}. - */ - private AppMetadataStore getAppMetadataStore(StructuredTableContext context) { - return AppMetadataStore.create(context); - } -} - - diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java index 6120863510f2..6e78293959eb 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/ScanOperationRunsRequest.java @@ -95,7 +95,6 @@ public static Builder builder(ScanOperationRunsRequest request) { */ public static class Builder { - @Nullable private String namespace; @Nullable private String scanAfterRunId; diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java deleted file mode 100644 index 0c22a5a7eac2..000000000000 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationLifecycleManagerTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright © 2023 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package io.cdap.cdap.internal.operation; - -import com.google.common.io.Closeables; -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Scopes; -import io.cdap.cdap.api.metrics.MetricsCollectionService; -import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.guice.ConfigModule; -import io.cdap.cdap.common.guice.LocalLocationModule; -import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; -import io.cdap.cdap.data.runtime.StorageModule; -import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; -import io.cdap.cdap.spi.data.StructuredTableAdmin; -import io.cdap.cdap.spi.data.TableAlreadyExistsException; -import io.cdap.cdap.spi.data.sql.PostgresInstantiator; -import io.cdap.cdap.spi.data.transaction.TransactionRunner; -import io.cdap.cdap.store.StoreDefinition; -import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; -import java.io.IOException; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; - -public class OperationLifecycleManagerTest extends OperationTestBase { - - protected static TransactionRunner transactionRunner; - private static EmbeddedPostgres pg; - @ClassRule - public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); - - @BeforeClass - public static void beforeClass() throws IOException, TableAlreadyExistsException { - CConfiguration cConf = CConfiguration.create(); - pg = PostgresInstantiator.createAndStart(cConf, TEMP_FOLDER.newFolder()); - Injector injector = Guice.createInjector( - new ConfigModule(cConf), - new LocalLocationModule(), - new SystemDatasetRuntimeModule().getInMemoryModules(), - new StorageModule(), - new AbstractModule() { - @Override - protected void configure() { - bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class) - .in(Scopes.SINGLETON); - } - } - ); - - transactionRunner = injector.getInstance(TransactionRunner.class); - StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); - } - - @AfterClass - public static void afterClass() { - Closeables.closeQuietly(pg); - } - - // @Test - // public void testScanPendingOperations() throws Exception { - // OperationLifecycleManager manager = new OperationLifecycleManager(transactionRunner, null); - // Set expectedDetails = insertTestRuns(transactionRunner).stream() - // .filter(d -> d.getRun().getStatus().equals( - // OperationRunStatus.PENDING)).collect(Collectors.toSet()); - // - // Set gotDetails = new HashSet<>(); - // manager.scanPendingOperations(gotDetails::add); - // - // Assert.assertEquals(expectedDetails, gotDetails); - // } -}