Skip to content

Commit

Permalink
Add scan operation runs by status method
Browse files Browse the repository at this point in the history
  • Loading branch information
samdgupi committed Nov 10, 2023
1 parent 6863ef4 commit c98e762
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.InvalidFieldException;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
Expand Down Expand Up @@ -70,6 +72,20 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
return currentLimit == 0;
}


/**
* Scan all pending operations. Needed for try running all pending operation during startup.
*
* @param txBatchSize batch size of transaction
* @param consumer {@link Consumer} to process each scanned run
*/
public void scanPendingOperations(int txBatchSize, Consumer<OperationRunDetail> consumer)
throws IOException, InvalidFieldException {
TransactionRunners.run(transactionRunner, context -> {
getOperationRunStore(context).scanOperationByStatus(OperationRunStatus.PENDING, consumer);
}, IOException.class, InvalidFieldException.class);
}

private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ public boolean equals(Object o) {
}

OperationRunDetail that = (OperationRunDetail) o;
return Objects.equal(this.getRun(), that.getRun())
return Objects.equal(this.getRunId(), that.getRunId())
&& Objects.equal(this.getRun(), that.getRun())
&& Arrays.equals(this.getSourceId(), that.getSourceId())
&& Objects.equal(this.getPullAppsRequest(), that.getPullAppsRequest())
&& Objects.equal(this.getPrincipal(), that.getPrincipal());
}

@Override
public int hashCode() {
return Objects.hashCode(super.hashCode(), Arrays.hashCode(getSourceId()), getPrincipal(),
getPullAppsRequest());
return Objects.hashCode(runId, run, Arrays.hashCode(sourceId), principal, pullAppsRequest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.SortOrder;
Expand All @@ -34,12 +35,15 @@
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import io.cdap.cdap.store.StoreDefinition.OperationRunsStore;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -83,19 +87,19 @@ public void createOperationRun(OperationRunId runId, OperationRunDetail detail)
}

/**
* Update the metadata of an operation run.
* Update the resources of an operation run.
*
* @param runId {@link OperationRunId} for the run
* @param metadata new metdata of the run
* @param resources updated resources for the run
* @param sourceId the message id which is responsible for the update
* @throws OperationRunNotFoundException run with id does not exist in namespace
*/
public void updateOperationMeta(OperationRunId runId, OperationMeta metadata,
public void updateOperationResources(OperationRunId runId, Set<OperationResource> resources,
@Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
OperationRunDetail currentDetail = getRunDetail(runId);
OperationRun currentRun = currentDetail.getRun();
OperationRun updatedRun = OperationRun.builder(currentRun)
.setMetadata(metadata).build();
OperationRun updatedRun = OperationRun.builder(currentRun).setMetadata(
OperationMeta.builder(currentRun.getMetadata()).setResources(resources).build()).build();
OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
.setRun(updatedRun).setSourceId(sourceId).build();

Expand Down Expand Up @@ -145,12 +149,14 @@ public void updateOperationStatus(OperationRunId runId, OperationRunStatus statu
* @param sourceId the message id which is responsible for the update
* @throws OperationRunNotFoundException run with id does not exist in namespace
*/
public void failOperationRun(OperationRunId runId, OperationError error,
public void failOperationRun(OperationRunId runId, OperationError error, Instant endTime,
@Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
OperationRunDetail currentDetail = getRunDetail(runId);
OperationRun currentRun = currentDetail.getRun();
OperationMeta updatedMeta = OperationMeta.builder(currentDetail.getRun().getMetadata())
.setEndTime(endTime).build();
OperationRun updatedRun = OperationRun.builder(currentRun)
.setStatus(OperationRunStatus.FAILED).setError(error).build();
.setStatus(OperationRunStatus.FAILED).setError(error).setMetadata(updatedMeta).build();
OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
.setRun(updatedRun).setSourceId(sourceId).build();

Expand Down Expand Up @@ -235,17 +241,30 @@ public String scanOperations(ScanOperationRunsRequest request,
while (iterator.hasNext()) {
StructuredRow row = iterator.next();
lastKey = row.getString(StoreDefinition.OperationRunsStore.ID_FIELD);
OperationRunDetail detail = GSON.fromJson(
row.getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD),
OperationRunDetail.class
);
OperationRunDetail detail = rowToRunDetail(row);
consumer.accept(detail);
}
}

return lastKey;
}

/**
* Returns runs with a given status for all namespaces.
*/
public void scanOperationByStatus(OperationRunStatus status,
Consumer<OperationRunDetail> consumer) throws IOException {
try (CloseableIterator<StructuredRow> iterator = getOperationRunsTable(context).scan(
Fields.stringField(OperationRunsStore.STATUS_FIELD, status.name())
)) {
while (iterator.hasNext()) {
StructuredRow row = iterator.next();
OperationRunDetail detail = rowToRunDetail(row);
consumer.accept(detail);
}
}
}

private List<Field<?>> getRangeFields(OperationRunId runId)
throws IOException, OperationRunNotFoundException {
List<Field<?>> fields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner;
import java.time.Instant;
import java.util.Collections;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -53,7 +52,7 @@ public class InMemoryOperationRunnerTest {
.setStatus(OperationRunStatus.PENDING)
.setType(OperationType.PULL_APPS)
.setMetadata(
new OperationMeta(Collections.emptySet(), Instant.now(), null))
OperationMeta.builder().setCreateTime(Instant.now()).build())
.build();
private static final OperationRunDetail detail = OperationRunDetail.builder()
.setSourceId(AppFabricTestHelper.createSourceId(0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,38 @@

package io.cdap.cdap.internal.operation;

import io.cdap.cdap.common.app.RunIds;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.common.id.Id.Namespace;
import io.cdap.cdap.internal.AppFabricTestHelper;
import io.cdap.cdap.internal.app.sourcecontrol.PullAppsRequest;
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.proto.operation.OperationType;
import io.cdap.cdap.spi.data.InvalidFieldException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class OperationRunStoreTest {
public abstract class OperationRunStoreTest extends OperationTestBase {

protected static TransactionRunner transactionRunner;
private final AtomicInteger sourceId = new AtomicInteger();
private final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis());
private final String testNamespace = "test";

private final PullAppsRequest input = new PullAppsRequest(Collections.emptySet(), null);


@Before
public void before() throws Exception {
Expand All @@ -61,7 +60,7 @@ public void before() throws Exception {
@Test
public void testGetOperation() throws Exception {
OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS,
OperationRunStatus.RUNNING);
OperationRunStatus.RUNNING, transactionRunner);
String testId = expectedDetail.getRun().getId();
OperationRunId runId = new OperationRunId(testNamespace, testId);

Expand All @@ -79,9 +78,9 @@ public void testGetOperation() throws Exception {
}

@Test
public void testUpdateMetadata() throws Exception {
public void testUpdateResources() throws Exception {
OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS,
OperationRunStatus.RUNNING);
OperationRunStatus.RUNNING, transactionRunner);
String testId = expectedDetail.getRun().getId();
OperationRunId runId = new OperationRunId(testNamespace, testId);

Expand All @@ -90,23 +89,23 @@ public void testUpdateMetadata() throws Exception {
OperationRunDetail gotDetail = store.getOperation(runId);
Assert.assertEquals(expectedDetail, gotDetail);

OperationMeta updatedMeta = new OperationMeta(Collections.emptySet(),
Instant.ofEpochMilli(runIdTime.incrementAndGet()),
Instant.ofEpochMilli(runIdTime.incrementAndGet()));
OperationMeta updatedMeta = OperationMeta.builder(expectedDetail.getRun().getMetadata())
.setResources(ImmutableSet.of(new OperationResource("test"))).build();
OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun())
.setMetadata(updatedMeta).build();
OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail)
.setRun(updatedRun)
.setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()))
.build();
store.updateOperationMeta(runId, updatedMeta, updatedDetail.getSourceId());
store.updateOperationResources(runId, updatedMeta.getResources(),
updatedDetail.getSourceId());
gotDetail = store.getOperation(runId);
Assert.assertEquals(updatedDetail, gotDetail);

try {
store.updateOperationMeta(
store.updateOperationResources(
new OperationRunId(Namespace.DEFAULT.getId(), testId),
updatedMeta,
updatedMeta.getResources(),
updatedDetail.getSourceId());
Assert.fail("Found unexpected run in default namespace");
} catch (OperationRunNotFoundException e) {
Expand All @@ -118,7 +117,7 @@ public void testUpdateMetadata() throws Exception {
@Test
public void testUpdateStatus() throws Exception {
OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS,
OperationRunStatus.RUNNING);
OperationRunStatus.RUNNING, transactionRunner);
String testId = expectedDetail.getRun().getId();
OperationRunId runId = new OperationRunId(testNamespace, testId);

Expand Down Expand Up @@ -155,7 +154,7 @@ public void testUpdateStatus() throws Exception {
@Test
public void testFailOperation() throws Exception {
OperationRunDetail expectedDetail = insertRun(testNamespace, OperationType.PUSH_APPS,
OperationRunStatus.RUNNING);
OperationRunStatus.RUNNING, transactionRunner);
String testId = expectedDetail.getRun().getId();
OperationRunId runId = new OperationRunId(testNamespace, testId);

Expand All @@ -168,19 +167,24 @@ public void testFailOperation() throws Exception {
OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun())
.setStatus(OperationRunStatus.FAILED)
.setError(error)
.setMetadata(OperationMeta.builder(expectedDetail.getRun().getMetadata())
.setEndTime(Instant.ofEpochMilli(runIdTime.incrementAndGet()))
.build())
.build();
OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail)
.setRun(updatedRun)
.setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()))
.build();
store.failOperationRun(runId, error, updatedDetail.getSourceId());
gotDetail = (OperationRunDetail) store.getOperation(runId);
store.failOperationRun(runId, error, updatedRun.getMetadata().getEndTime(),
updatedDetail.getSourceId());
gotDetail = store.getOperation(runId);
Assert.assertEquals(updatedDetail, gotDetail);

try {
store.failOperationRun(
new OperationRunId(Namespace.DEFAULT.getId(), testId),
error,
Instant.now(), // no need to verify this
updatedDetail.getSourceId()
);
Assert.fail("Found unexpected run in default namespace");
Expand All @@ -192,7 +196,7 @@ public void testFailOperation() throws Exception {

@Test
public void testScanOperation() throws Exception {
List<OperationRunDetail> insertedRuns = insertTestRuns();
List<OperationRunDetail> insertedRuns = insertTestRuns(transactionRunner);
// get a filtered list of testNamespace runs
List<OperationRunDetail> testNamespaceRuns = insertedRuns.stream()
.filter(detail -> detail.getRunId().getNamespace().equals(testNamespace))
Expand Down Expand Up @@ -245,46 +249,16 @@ public void testScanOperation() throws Exception {
}, Exception.class);
}

private OperationRunDetail insertRun(String namespace, OperationType type,
OperationRunStatus status)
throws IOException, OperationRunAlreadyExistsException {
long startTime = runIdTime.incrementAndGet();
String id = RunIds.generate(startTime).getId();
OperationRun run = OperationRun.builder()
.setRunId(id)
.setStatus(status)
.setType(type)
.setMetadata(
new OperationMeta(Collections.emptySet(), Instant.ofEpochMilli(startTime), null))
.build();
OperationRunId runId = new OperationRunId(namespace, id);
OperationRunDetail detail = OperationRunDetail.builder()
.setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet()))
.setRunId(runId)
.setRun(run)
.setPullAppsRequest(input)
.build();
@Test
public void testScanOperationByStatus() throws Exception {
TransactionRunners.run(transactionRunner, context -> {
OperationRunStore operationRunStore = new OperationRunStore(context);
operationRunStore.createOperationRun(runId, detail);
}, IOException.class, OperationRunAlreadyExistsException.class);
return detail;
}

private List<OperationRunDetail> insertTestRuns() throws Exception {
List<OperationRunDetail> details = new ArrayList<>();
// insert 10 runs with increasing start time in two namespaces
// 5 would be in running state 5 in Failed
// 5 would be of type PUSH 5 would be of type PULL
for (int i = 0; i < 5; i++) {
details.add(insertRun(testNamespace, OperationType.PUSH_APPS, OperationRunStatus.RUNNING));
details.add(insertRun(Namespace.DEFAULT.getId(), OperationType.PUSH_APPS, OperationRunStatus.RUNNING));
details.add(insertRun(testNamespace, OperationType.PULL_APPS, OperationRunStatus.FAILED));
details.add(insertRun(Namespace.DEFAULT.getId(), OperationType.PULL_APPS, OperationRunStatus.RUNNING));
}
// The runs are added in increasing start time, hence reversing the List
Collections.reverse(details);
return details;
Set<OperationRunDetail> expectedRuns = insertTestRuns(transactionRunner).stream().filter(
d -> d.getRun().getStatus().equals(OperationRunStatus.PENDING)
).collect(Collectors.toSet());
Set<OperationRunDetail> gotRuns = new HashSet<>();
OperationRunStore store = new OperationRunStore(context);
store.scanOperationByStatus(OperationRunStatus.PENDING, gotRuns::add);
Assert.assertTrue(expectedRuns.containsAll(gotRuns));
}, InvalidFieldException.class, IOException.class);
}

}
Loading

0 comments on commit c98e762

Please sign in to comment.