diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java index 1bede510c5a8..8dbe59425cdc 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationLifecycleManager.java @@ -17,6 +17,8 @@ package io.cdap.cdap.internal.operation; import com.google.inject.Inject; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.spi.data.InvalidFieldException; import io.cdap.cdap.spi.data.StructuredTableContext; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; @@ -70,6 +72,19 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, return currentLimit == 0; } + + /** + * Scan all pending operations. Needed for try running all pending operation during startup. + * + * @param consumer {@link Consumer} to process each scanned run + */ + public void scanPendingOperations(Consumer consumer) + throws IOException, InvalidFieldException { + TransactionRunners.run(transactionRunner, context -> { + getOperationRunStore(context).scanOperationByStatus(OperationRunStatus.PENDING, consumer); + }, IOException.class, InvalidFieldException.class); + } + private OperationRunStore getOperationRunStore(StructuredTableContext context) { return new OperationRunStore(context); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java index 4da517858bc1..e2780790bd24 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunDetail.java @@ -100,7 +100,8 @@ public boolean equals(Object o) { } OperationRunDetail that = (OperationRunDetail) o; - return Objects.equal(this.getRun(), that.getRun()) + return Objects.equal(this.getRunId(), that.getRunId()) + && Objects.equal(this.getRun(), that.getRun()) && Arrays.equals(this.getSourceId(), that.getSourceId()) && Objects.equal(this.getPullAppsRequest(), that.getPullAppsRequest()) && Objects.equal(this.getPrincipal(), that.getPrincipal()); @@ -108,8 +109,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hashCode(super.hashCode(), Arrays.hashCode(getSourceId()), getPrincipal(), - getPullAppsRequest()); + return Objects.hashCode(runId, run, Arrays.hashCode(sourceId), principal, pullAppsRequest); } /** diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java index 28a7139e4558..ed437d960d82 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operation/OperationRunStore.java @@ -24,6 +24,7 @@ import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationError; import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationResource; import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.operation.OperationRunStatus; import io.cdap.cdap.spi.data.SortOrder; @@ -34,12 +35,15 @@ import io.cdap.cdap.spi.data.table.field.Fields; import io.cdap.cdap.spi.data.table.field.Range; import io.cdap.cdap.store.StoreDefinition; +import io.cdap.cdap.store.StoreDefinition.OperationRunsStore; import java.io.IOException; import java.time.Clock; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.function.Consumer; import javax.annotation.Nullable; @@ -71,7 +75,7 @@ public OperationRunStore(StructuredTableContext context) { * @param detail the run details of the operation * @throws OperationRunAlreadyExistsException when a run with same id exist in namespace */ - public void createOperationRun(OperationRunId runId, OperationRunDetail detail) + public OperationRunDetail createOperationRun(OperationRunId runId, OperationRunDetail detail) throws OperationRunAlreadyExistsException, IOException { Optional row = getOperationRunInternal(runId); if (row.isPresent()) { @@ -80,22 +84,23 @@ public void createOperationRun(OperationRunId runId, OperationRunDetail detail) throw new OperationRunAlreadyExistsException(runId.getRun(), status); } writeOperationRun(runId, detail); + return detail; } /** - * Update the metadata of an operation run. + * Update the resources of an operation run. * * @param runId {@link OperationRunId} for the run - * @param metadata new metdata of the run + * @param resources updated resources for the run * @param sourceId the message id which is responsible for the update * @throws OperationRunNotFoundException run with id does not exist in namespace */ - public void updateOperationMeta(OperationRunId runId, OperationMeta metadata, + public void updateOperationResources(OperationRunId runId, Set resources, @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException { OperationRunDetail currentDetail = getRunDetail(runId); OperationRun currentRun = currentDetail.getRun(); - OperationRun updatedRun = OperationRun.builder(currentRun) - .setMetadata(metadata).build(); + OperationRun updatedRun = OperationRun.builder(currentRun).setMetadata( + OperationMeta.builder(currentRun.getMetadata()).setResources(resources).build()).build(); OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail) .setRun(updatedRun).setSourceId(sourceId).build(); @@ -145,12 +150,14 @@ public void updateOperationStatus(OperationRunId runId, OperationRunStatus statu * @param sourceId the message id which is responsible for the update * @throws OperationRunNotFoundException run with id does not exist in namespace */ - public void failOperationRun(OperationRunId runId, OperationError error, + public void failOperationRun(OperationRunId runId, OperationError error, Instant endTime, @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException { OperationRunDetail currentDetail = getRunDetail(runId); OperationRun currentRun = currentDetail.getRun(); + OperationMeta updatedMeta = OperationMeta.builder(currentDetail.getRun().getMetadata()) + .setEndTime(endTime).build(); OperationRun updatedRun = OperationRun.builder(currentRun) - .setStatus(OperationRunStatus.FAILED).setError(error).build(); + .setStatus(OperationRunStatus.FAILED).setError(error).setMetadata(updatedMeta).build(); OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail) .setRun(updatedRun).setSourceId(sourceId).build(); @@ -174,14 +181,8 @@ public void failOperationRun(OperationRunId runId, OperationError error, */ public OperationRunDetail getOperation(OperationRunId runId) throws OperationRunNotFoundException, IOException { - Optional row = getOperationRunInternal(runId); - if (!row.isPresent()) { - throw new OperationRunNotFoundException(runId.getNamespace(), runId.getRun()); - } - return GSON.fromJson( - row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), - OperationRunDetail.class - ); + return getOperationRunInternal(runId).map(this::rowToRunDetail) + .orElseThrow(() -> new OperationRunNotFoundException(runId.getNamespace(), runId.getRun())); } @@ -235,10 +236,7 @@ public String scanOperations(ScanOperationRunsRequest request, while (iterator.hasNext()) { StructuredRow row = iterator.next(); lastKey = row.getString(StoreDefinition.OperationRunsStore.ID_FIELD); - OperationRunDetail detail = GSON.fromJson( - row.getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), - OperationRunDetail.class - ); + OperationRunDetail detail = rowToRunDetail(row); consumer.accept(detail); } } @@ -246,6 +244,22 @@ public String scanOperations(ScanOperationRunsRequest request, return lastKey; } + /** + * Returns runs with a given status for all namespaces. + */ + public void scanOperationByStatus(OperationRunStatus status, + Consumer consumer) throws IOException { + try (CloseableIterator iterator = getOperationRunsTable(context).scan( + Fields.stringField(OperationRunsStore.STATUS_FIELD, status.name()) + )) { + while (iterator.hasNext()) { + StructuredRow row = iterator.next(); + OperationRunDetail detail = rowToRunDetail(row); + consumer.accept(detail); + } + } + } + private List> getRangeFields(OperationRunId runId) throws IOException, OperationRunNotFoundException { List> fields = new ArrayList<>(); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java index cb325be0b4bc..19c0ebe6cfb5 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/InMemoryOperationRunnerTest.java @@ -31,7 +31,6 @@ import io.cdap.cdap.sourcecontrol.ApplicationManager; import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner; import java.time.Instant; -import java.util.Collections; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -53,7 +52,7 @@ public class InMemoryOperationRunnerTest { .setStatus(OperationRunStatus.PENDING) .setType(OperationType.PULL_APPS) .setMetadata( - new OperationMeta(Collections.emptySet(), Instant.now(), null)) + OperationMeta.builder().setCreateTime(Instant.now()).build()) .build(); private static final OperationRunDetail detail = OperationRunDetail.builder() .setSourceId(AppFabricTestHelper.createSourceId(0)) diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java index a983b815db54..4bc1af5cb798 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationRunStoreTest.java @@ -16,23 +16,26 @@ package io.cdap.cdap.internal.operation; -import io.cdap.cdap.common.app.RunIds; +import com.google.common.collect.ImmutableSet; import io.cdap.cdap.common.id.Id.Namespace; import io.cdap.cdap.internal.AppFabricTestHelper; -import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operation.OperationError; import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationResource; import io.cdap.cdap.proto.operation.OperationRun; import io.cdap.cdap.proto.operation.OperationRunStatus; import io.cdap.cdap.proto.operation.OperationType; +import io.cdap.cdap.spi.data.InvalidFieldException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import io.cdap.cdap.spi.data.transaction.TransactionRunners; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -40,15 +43,11 @@ import org.junit.Before; import org.junit.Test; -public abstract class OperationRunStoreTest { +public abstract class OperationRunStoreTest extends OperationTestBase { protected static TransactionRunner transactionRunner; private final AtomicInteger sourceId = new AtomicInteger(); private final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis()); - private final String testNamespace = "test"; - - private final PullAppsRequest input = new PullAppsRequest(Collections.emptySet(), null); - @Before public void before() throws Exception { @@ -61,7 +60,7 @@ public void before() throws Exception { @Test public void testGetOperation() throws Exception { OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS, - OperationRunStatus.RUNNING); + OperationRunStatus.RUNNING, transactionRunner); String testId = expectedDetail.getRun().getId(); OperationRunId runId = new OperationRunId(testNamespace, testId); @@ -79,9 +78,9 @@ public void testGetOperation() throws Exception { } @Test - public void testUpdateMetadata() throws Exception { + public void testUpdateResources() throws Exception { OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS, - OperationRunStatus.RUNNING); + OperationRunStatus.RUNNING, transactionRunner); String testId = expectedDetail.getRun().getId(); OperationRunId runId = new OperationRunId(testNamespace, testId); @@ -90,23 +89,23 @@ public void testUpdateMetadata() throws Exception { OperationRunDetail gotDetail = store.getOperation(runId); Assert.assertEquals(expectedDetail, gotDetail); - OperationMeta updatedMeta = new OperationMeta(Collections.emptySet(), - Instant.ofEpochMilli(runIdTime.incrementAndGet()), - Instant.ofEpochMilli(runIdTime.incrementAndGet())); + OperationMeta updatedMeta = OperationMeta.builder(expectedDetail.getRun().getMetadata()) + .setResources(ImmutableSet.of(new OperationResource("test"))).build(); OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) .setMetadata(updatedMeta).build(); OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) .setRun(updatedRun) .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) .build(); - store.updateOperationMeta(runId, updatedMeta, updatedDetail.getSourceId()); + store.updateOperationResources(runId, updatedMeta.getResources(), + updatedDetail.getSourceId()); gotDetail = store.getOperation(runId); Assert.assertEquals(updatedDetail, gotDetail); try { - store.updateOperationMeta( + store.updateOperationResources( new OperationRunId(Namespace.DEFAULT.getId(), testId), - updatedMeta, + updatedMeta.getResources(), updatedDetail.getSourceId()); Assert.fail("Found unexpected run in default namespace"); } catch (OperationRunNotFoundException e) { @@ -118,7 +117,7 @@ public void testUpdateMetadata() throws Exception { @Test public void testUpdateStatus() throws Exception { OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS, - OperationRunStatus.RUNNING); + OperationRunStatus.RUNNING, transactionRunner); String testId = expectedDetail.getRun().getId(); OperationRunId runId = new OperationRunId(testNamespace, testId); @@ -155,7 +154,7 @@ public void testUpdateStatus() throws Exception { @Test public void testFailOperation() throws Exception { OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS, - OperationRunStatus.RUNNING); + OperationRunStatus.RUNNING, transactionRunner); String testId = expectedDetail.getRun().getId(); OperationRunId runId = new OperationRunId(testNamespace, testId); @@ -168,19 +167,24 @@ public void testFailOperation() throws Exception { OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) .setStatus(OperationRunStatus.FAILED) .setError(error) + .setMetadata(OperationMeta.builder(expectedDetail.getRun().getMetadata()) + .setEndTime(Instant.ofEpochMilli(runIdTime.incrementAndGet())) + .build()) .build(); OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) .setRun(updatedRun) .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) .build(); - store.failOperationRun(runId, error, updatedDetail.getSourceId()); - gotDetail = (OperationRunDetail) store.getOperation(runId); + store.failOperationRun(runId, error, updatedRun.getMetadata().getEndTime(), + updatedDetail.getSourceId()); + gotDetail = store.getOperation(runId); Assert.assertEquals(updatedDetail, gotDetail); try { store.failOperationRun( new OperationRunId(Namespace.DEFAULT.getId(), testId), error, + Instant.now(), // no need to verify this updatedDetail.getSourceId() ); Assert.fail("Found unexpected run in default namespace"); @@ -192,7 +196,7 @@ public void testFailOperation() throws Exception { @Test public void testScanOperation() throws Exception { - List insertedRuns = insertTestRuns(); + List insertedRuns = insertTestRuns(transactionRunner); // get a filtered list of testNamespace runs List testNamespaceRuns = insertedRuns.stream() .filter(detail -> detail.getRunId().getNamespace().equals(testNamespace)) @@ -245,46 +249,16 @@ public void testScanOperation() throws Exception { }, Exception.class); } - private OperationRunDetail insertRun(String namespace, OperationType type, - OperationRunStatus status) - throws IOException, OperationRunAlreadyExistsException { - long startTime = runIdTime.incrementAndGet(); - String id = RunIds.generate(startTime).getId(); - OperationRun run = OperationRun.builder() - .setRunId(id) - .setStatus(status) - .setType(type) - .setMetadata( - new OperationMeta(Collections.emptySet(), Instant.ofEpochMilli(startTime), null)) - .build(); - OperationRunId runId = new OperationRunId(namespace, id); - OperationRunDetail detail = OperationRunDetail.builder() - .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) - .setRunId(runId) - .setRun(run) - .setPullAppsRequest(input) - .build(); + @Test + public void testScanOperationByStatus() throws Exception { TransactionRunners.run(transactionRunner, context -> { - OperationRunStore operationRunStore = new OperationRunStore(context); - operationRunStore.createOperationRun(runId, detail); - }, IOException.class, OperationRunAlreadyExistsException.class); - return detail; - } - - private List insertTestRuns() throws Exception { - List details = new ArrayList<>(); - // insert 10 runs with increasing start time in two namespaces - // 5 would be in running state 5 in Failed - // 5 would be of type PUSH 5 would be of type PULL - for (int i = 0; i < 5; i++) { - details.add(insertRun(testNamespace, OperationType.PUSH_APPS, OperationRunStatus.RUNNING)); - details.add(insertRun(Namespace.DEFAULT.getId(), OperationType.PUSH_APPS, OperationRunStatus.RUNNING)); - details.add(insertRun(testNamespace, OperationType.PULL_APPS, OperationRunStatus.FAILED)); - details.add(insertRun(Namespace.DEFAULT.getId(), OperationType.PULL_APPS, OperationRunStatus.RUNNING)); - } - // The runs are added in increasing start time, hence reversing the List - Collections.reverse(details); - return details; + Set expectedRuns = insertTestRuns(transactionRunner).stream().filter( + d -> d.getRun().getStatus().equals(OperationRunStatus.PENDING) + ).collect(Collectors.toSet()); + Set gotRuns = new HashSet<>(); + OperationRunStore store = new OperationRunStore(context); + store.scanOperationByStatus(OperationRunStatus.PENDING, gotRuns::add); + Assert.assertTrue(expectedRuns.containsAll(gotRuns)); + }, InvalidFieldException.class, IOException.class); } - } diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java new file mode 100644 index 000000000000..404d57fa2260 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operation/OperationTestBase.java @@ -0,0 +1,131 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operation; + +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operation.OperationMeta; +import io.cdap.cdap.proto.operation.OperationRun; +import io.cdap.cdap.proto.operation.OperationRunStatus; +import io.cdap.cdap.proto.operation.OperationType; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class OperationTestBase { + + private static final AtomicInteger sourceId = new AtomicInteger(); + private static final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis()); + protected static final String testNamespace = "test"; + private static final PullAppsRequest input = new PullAppsRequest(Collections.emptySet(), null); + + protected static OperationRunDetail insertRun( + String namespace, + OperationType type, + OperationRunStatus status, + TransactionRunner transactionRunner) + throws IOException, OperationRunAlreadyExistsException { + long startTime = runIdTime.incrementAndGet(); + String id = RunIds.generate(startTime).getId(); + OperationRun run = + OperationRun.builder() + .setRunId(id) + .setStatus(status) + .setType(type) + .setMetadata( + OperationMeta.builder() + .setCreateTime(Instant.ofEpochMilli(startTime)) + .build() + ) + .build(); + OperationRunId runId = new OperationRunId(namespace, id); + OperationRunDetail detail = + OperationRunDetail.builder() + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .setRunId(runId) + .setRun(run) + .setPullAppsRequest(input) + .build(); + TransactionRunners.run( + transactionRunner, + context -> { + OperationRunStore operationRunStore = new OperationRunStore(context); + operationRunStore.createOperationRun(runId, detail); + }, + IOException.class, + OperationRunAlreadyExistsException.class); + return detail; + } + + protected static List insertTestRuns(TransactionRunner transactionRunner) + throws Exception { + List details = new ArrayList<>(); + // insert 10 runs with increasing start time in two namespaces + // 5 would be in running state 5 in Failed + // 5 would be of type PUSH 5 would be of type PULL + for (int i = 0; i < 5; i++) { + details.add( + insertRun( + testNamespace, + OperationType.PUSH_APPS, + OperationRunStatus.PENDING, + transactionRunner)); + details.add( + insertRun( + Namespace.DEFAULT.getId(), + OperationType.PUSH_APPS, + OperationRunStatus.RUNNING, + transactionRunner)); + details.add( + insertRun( + Namespace.DEFAULT.getId(), + OperationType.PUSH_APPS, + OperationRunStatus.FAILED, + transactionRunner)); + details.add( + insertRun( + testNamespace, + OperationType.PULL_APPS, + OperationRunStatus.PENDING, + transactionRunner)); + details.add( + insertRun( + Namespace.DEFAULT.getId(), + OperationType.PULL_APPS, + OperationRunStatus.RUNNING, + transactionRunner)); + details.add( + insertRun( + Namespace.DEFAULT.getId(), + OperationType.PUSH_APPS, + OperationRunStatus.FAILED, + transactionRunner)); + } + // The runs are added in increasing start time, hence reversing the List + Collections.reverse(details); + return details; + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationMeta.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationMeta.java index f71ec8ade734..2aff0a046fe9 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationMeta.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationMeta.java @@ -17,17 +17,18 @@ package io.cdap.cdap.proto.operation; import java.time.Instant; +import java.util.Collections; +import java.util.HashSet; import java.util.Objects; import java.util.Set; import javax.annotation.Nullable; /** - * Metadata for an operation includes - * 1. The resources on which operation is executed - * 2. Timestamp of operation create - * 3. Timestamp of operation endtime + * Metadata for an operation includes 1. The resources on which operation is executed 2. Timestamp + * of operation create 3. Timestamp of operation endtime */ public class OperationMeta { + private final Set resources; private final Instant createTime; @@ -41,7 +42,8 @@ public class OperationMeta { * @param createTime timestamp when the operation was created * @param endTime timestamp when the operation reached an end state */ - public OperationMeta(Set resources, Instant createTime, @Nullable Instant endTime) { + private OperationMeta(Set resources, Instant createTime, + @Nullable Instant endTime) { this.resources = resources; this.createTime = createTime; this.endTime = endTime; @@ -69,7 +71,7 @@ public int hashCode() { } public Set getResources() { - return resources; + return Collections.unmodifiableSet(resources); } public Instant getCreateTime() { @@ -80,4 +82,67 @@ public Instant getCreateTime() { public Instant getEndTime() { return endTime; } + + /** + * Creates a Builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Create a Builder from existing run. + * + * @param meta existing meta to copy fields from + */ + public static Builder builder(OperationMeta meta) { + return new Builder() + .setCreateTime(meta.getCreateTime()) + .setEndTime(meta.endTime) + .setResources(meta.resources); + } + + /** + * Builder to create OperationMeta. + */ + @SuppressWarnings("unchecked") + public static class Builder { + + private final Set resources; + private Instant createTime; + private Instant endTime; + + protected Builder() { + this.resources = new HashSet<>(); + } + + /** + * Clear current resources and add given resources. + */ + public Builder setResources(Set resources) { + this.resources.clear(); + this.resources.addAll(resources); + return this; + } + + public Builder setCreateTime(Instant createTime) { + this.createTime = createTime; + return this; + } + + public Builder setEndTime(Instant endTime) { + this.endTime = endTime; + return this; + } + + /** + * Builds the OperationMeta. + */ + public OperationMeta build() { + if (createTime == null) { + throw new IllegalArgumentException("create time must be specified"); + } + return new OperationMeta(resources, createTime, endTime); + } + } } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java index 406364db79a4..38ceabb75eb2 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operation/OperationRunStatus.java @@ -48,6 +48,8 @@ public boolean canTransitionTo(OperationRunStatus status) { return true; } switch (this) { + case PENDING: + return status == STARTING || status == STOPPING; case STARTING: // RUNNING is the happy path // STOPPING happens if the run was manually stopped gracefully(may include a timeout)