Skip to content

Commit

Permalink
Merge pull request #15424 from cdapio/get-pending-runs
Browse files Browse the repository at this point in the history
Add scan operation runs by status method
  • Loading branch information
samdgupi authored Nov 14, 2023
2 parents 6863ef4 + 87d79a6 commit d068939
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.cdap.cdap.internal.operation;

import com.google.inject.Inject;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.InvalidFieldException;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
Expand Down Expand Up @@ -70,6 +72,19 @@ public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize,
return currentLimit == 0;
}


/**
* Scan all pending operations. Needed for try running all pending operation during startup.
*
* @param consumer {@link Consumer} to process each scanned run
*/
public void scanPendingOperations(Consumer<OperationRunDetail> consumer)
throws IOException, InvalidFieldException {
TransactionRunners.run(transactionRunner, context -> {
getOperationRunStore(context).scanOperationByStatus(OperationRunStatus.PENDING, consumer);
}, IOException.class, InvalidFieldException.class);
}

private OperationRunStore getOperationRunStore(StructuredTableContext context) {
return new OperationRunStore(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,16 @@ public boolean equals(Object o) {
}

OperationRunDetail that = (OperationRunDetail) o;
return Objects.equal(this.getRun(), that.getRun())
return Objects.equal(this.getRunId(), that.getRunId())
&& Objects.equal(this.getRun(), that.getRun())
&& Arrays.equals(this.getSourceId(), that.getSourceId())
&& Objects.equal(this.getPullAppsRequest(), that.getPullAppsRequest())
&& Objects.equal(this.getPrincipal(), that.getPrincipal());
}

@Override
public int hashCode() {
return Objects.hashCode(super.hashCode(), Arrays.hashCode(getSourceId()), getPrincipal(),
getPullAppsRequest());
return Objects.hashCode(runId, run, Arrays.hashCode(sourceId), principal, pullAppsRequest);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.cdap.cdap.proto.id.OperationRunId;
import io.cdap.cdap.proto.operation.OperationError;
import io.cdap.cdap.proto.operation.OperationMeta;
import io.cdap.cdap.proto.operation.OperationResource;
import io.cdap.cdap.proto.operation.OperationRun;
import io.cdap.cdap.proto.operation.OperationRunStatus;
import io.cdap.cdap.spi.data.SortOrder;
Expand All @@ -34,12 +35,15 @@
import io.cdap.cdap.spi.data.table.field.Fields;
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition;
import io.cdap.cdap.store.StoreDefinition.OperationRunsStore;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -71,7 +75,7 @@ public OperationRunStore(StructuredTableContext context) {
* @param detail the run details of the operation
* @throws OperationRunAlreadyExistsException when a run with same id exist in namespace
*/
public void createOperationRun(OperationRunId runId, OperationRunDetail detail)
public OperationRunDetail createOperationRun(OperationRunId runId, OperationRunDetail detail)
throws OperationRunAlreadyExistsException, IOException {
Optional<StructuredRow> row = getOperationRunInternal(runId);
if (row.isPresent()) {
Expand All @@ -80,22 +84,23 @@ public void createOperationRun(OperationRunId runId, OperationRunDetail detail)
throw new OperationRunAlreadyExistsException(runId.getRun(), status);
}
writeOperationRun(runId, detail);
return detail;
}

/**
* Update the metadata of an operation run.
* Update the resources of an operation run.
*
* @param runId {@link OperationRunId} for the run
* @param metadata new metdata of the run
* @param resources updated resources for the run
* @param sourceId the message id which is responsible for the update
* @throws OperationRunNotFoundException run with id does not exist in namespace
*/
public void updateOperationMeta(OperationRunId runId, OperationMeta metadata,
public void updateOperationResources(OperationRunId runId, Set<OperationResource> resources,
@Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
OperationRunDetail currentDetail = getRunDetail(runId);
OperationRun currentRun = currentDetail.getRun();
OperationRun updatedRun = OperationRun.builder(currentRun)
.setMetadata(metadata).build();
OperationRun updatedRun = OperationRun.builder(currentRun).setMetadata(
OperationMeta.builder(currentRun.getMetadata()).setResources(resources).build()).build();
OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
.setRun(updatedRun).setSourceId(sourceId).build();

Expand Down Expand Up @@ -145,12 +150,14 @@ public void updateOperationStatus(OperationRunId runId, OperationRunStatus statu
* @param sourceId the message id which is responsible for the update
* @throws OperationRunNotFoundException run with id does not exist in namespace
*/
public void failOperationRun(OperationRunId runId, OperationError error,
public void failOperationRun(OperationRunId runId, OperationError error, Instant endTime,
@Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
OperationRunDetail currentDetail = getRunDetail(runId);
OperationRun currentRun = currentDetail.getRun();
OperationMeta updatedMeta = OperationMeta.builder(currentDetail.getRun().getMetadata())
.setEndTime(endTime).build();
OperationRun updatedRun = OperationRun.builder(currentRun)
.setStatus(OperationRunStatus.FAILED).setError(error).build();
.setStatus(OperationRunStatus.FAILED).setError(error).setMetadata(updatedMeta).build();
OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
.setRun(updatedRun).setSourceId(sourceId).build();

Expand All @@ -174,14 +181,8 @@ public void failOperationRun(OperationRunId runId, OperationError error,
*/
public OperationRunDetail getOperation(OperationRunId runId)
throws OperationRunNotFoundException, IOException {
Optional<StructuredRow> row = getOperationRunInternal(runId);
if (!row.isPresent()) {
throw new OperationRunNotFoundException(runId.getNamespace(), runId.getRun());
}
return GSON.fromJson(
row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD),
OperationRunDetail.class
);
return getOperationRunInternal(runId).map(this::rowToRunDetail)
.orElseThrow(() -> new OperationRunNotFoundException(runId.getNamespace(), runId.getRun()));
}


Expand Down Expand Up @@ -235,17 +236,30 @@ public String scanOperations(ScanOperationRunsRequest request,
while (iterator.hasNext()) {
StructuredRow row = iterator.next();
lastKey = row.getString(StoreDefinition.OperationRunsStore.ID_FIELD);
OperationRunDetail detail = GSON.fromJson(
row.getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD),
OperationRunDetail.class
);
OperationRunDetail detail = rowToRunDetail(row);
consumer.accept(detail);
}
}

return lastKey;
}

/**
* Returns runs with a given status for all namespaces.
*/
public void scanOperationByStatus(OperationRunStatus status,
Consumer<OperationRunDetail> consumer) throws IOException {
try (CloseableIterator<StructuredRow> iterator = getOperationRunsTable(context).scan(
Fields.stringField(OperationRunsStore.STATUS_FIELD, status.name())
)) {
while (iterator.hasNext()) {
StructuredRow row = iterator.next();
OperationRunDetail detail = rowToRunDetail(row);
consumer.accept(detail);
}
}
}

private List<Field<?>> getRangeFields(OperationRunId runId)
throws IOException, OperationRunNotFoundException {
List<Field<?>> fields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import io.cdap.cdap.sourcecontrol.ApplicationManager;
import io.cdap.cdap.sourcecontrol.operationrunner.InMemorySourceControlOperationRunner;
import java.time.Instant;
import java.util.Collections;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -53,7 +52,7 @@ public class InMemoryOperationRunnerTest {
.setStatus(OperationRunStatus.PENDING)
.setType(OperationType.PULL_APPS)
.setMetadata(
new OperationMeta(Collections.emptySet(), Instant.now(), null))
OperationMeta.builder().setCreateTime(Instant.now()).build())
.build();
private static final OperationRunDetail detail = OperationRunDetail.builder()
.setSourceId(AppFabricTestHelper.createSourceId(0))
Expand Down
Loading

0 comments on commit d068939

Please sign in to comment.