diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationLifecycleManager.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationLifecycleManager.java new file mode 100644 index 000000000000..707c1b0808b9 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationLifecycleManager.java @@ -0,0 +1,77 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import com.google.inject.Inject; +import io.cdap.cdap.internal.app.store.OperationRunDetail; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.io.IOException; +import java.util.function.Consumer; + +/** + * Service that manages lifecycle of Operation. + */ +public class OperationLifecycleManager { + + private final TransactionRunner transactionRunner; + + @Inject + OperationLifecycleManager(TransactionRunner transactionRunner) { + this.transactionRunner = transactionRunner; + } + + /** + * Scan operations in a namespace. + * + * @param request scan request including filters and limit + * @param txBatchSize batch size of transaction + * @param consumer {@link Consumer} to process each scanned run + * @return true if we have scanned till the request limit else return false. This will be used by + * the caller to identify if there is any further runs left to scan. + */ + public boolean scanOperations(ScanOperationRunsRequest request, int txBatchSize, + Consumer> consumer) throws OperationRunNotFoundException, IOException { + String lastKey = request.getScanAfter(); + int currentLimit = request.getLimit(); + + while (currentLimit > 0) { + ScanOperationRunsRequest batchRequest = ScanOperationRunsRequest + .builder(request) + .setScanAfter(lastKey) + .setLimit(Math.min(txBatchSize, currentLimit)) + .build(); + + request = batchRequest; + + lastKey = TransactionRunners.run(transactionRunner, context -> { + return getOperationRunStore(context).scanOperations(batchRequest, consumer); + }, IOException.class, OperationRunNotFoundException.class); + + if (lastKey == null) { + break; + } + currentLimit -= txBatchSize; + } + return currentLimit == 0; + } + + private OperationRunStore getOperationRunStore(StructuredTableContext context) { + return new OperationRunStore(context); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java new file mode 100644 index 000000000000..5dc378ab0ab4 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.common.AlreadyExistsException; +import io.cdap.cdap.proto.operationrun.OperationRunStatus; + +/** + * Thrown when an operation run already exists. + */ +public class OperationRunAlreadyExistsException extends AlreadyExistsException { + + public OperationRunAlreadyExistsException(String operationId, OperationRunStatus status) { + super(String.format("Operation %s already exists with status %s", operationId, status)); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java new file mode 100644 index 000000000000..a83a615157dd --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.proto.operationrun.OperationRunStatus; +import javax.annotation.Nullable; + +/** + * This class defined various filters that can be applied during operation runs scanning. + */ +public class OperationRunFilter { + + @Nullable + private final String operationType; + @Nullable + private final OperationRunStatus status; + // TODO(samik) status and type filters as list + + public OperationRunFilter(@Nullable String operationType, @Nullable OperationRunStatus status) { + this.operationType = operationType; + this.status = status; + } + + @Nullable + public String getOperationType() { + return operationType; + } + + @Nullable + public OperationRunStatus getStatus() { + return status; + } + + public static OperationRunFilter emptyFilter() { + return new OperationRunFilter(null, null); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java new file mode 100644 index 000000000000..4ba621aebf31 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java @@ -0,0 +1,30 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.common.NotFoundException; + +/** + * Exception thrown when an operation run with the specified id not found in the specified + * namespace. + */ +public class OperationRunNotFoundException extends NotFoundException { + + public OperationRunNotFoundException(String namespace, String runId) { + super(String.format("Operation run %s does not exist in namespace %s", runId, namespace)); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunStore.java new file mode 100644 index 000000000000..b85fd06b9b10 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunStore.java @@ -0,0 +1,342 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.dataset.lib.CloseableIterator; +import io.cdap.cdap.internal.app.store.OperationRunDetail; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operationrun.OperationError; +import io.cdap.cdap.proto.operationrun.OperationMeta; +import io.cdap.cdap.proto.operationrun.OperationRun; +import io.cdap.cdap.proto.operationrun.OperationRunStatus; +import io.cdap.cdap.spi.data.SortOrder; +import io.cdap.cdap.spi.data.StructuredRow; +import io.cdap.cdap.spi.data.StructuredTable; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.table.field.Field; +import io.cdap.cdap.spi.data.table.field.Fields; +import io.cdap.cdap.spi.data.table.field.Range; +import io.cdap.cdap.store.StoreDefinition; +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; +import javax.annotation.Nullable; + +/** + * Store for operation runs. + */ +public class OperationRunStore { + + private static final Gson GSON = new GsonBuilder().create(); + private static final String SMALLEST_POSSIBLE_STRING = ""; + + private final StructuredTableContext context; + private final Clock clock; + + public OperationRunStore(StructuredTableContext context) { + this(context, Clock.systemUTC()); + } + + @VisibleForTesting + OperationRunStore(StructuredTableContext context, Clock clock) { + this.context = context; + this.clock = clock; + } + + /** + * Create a new operation. If an operation with same id exist throw exception. + * + * @param runId {@link OperationRunId} for the run + * @param detail the run details of the operation + * @throws OperationRunAlreadyExistsException when a run with same id exist in namespace + */ + public void createOperationRun(OperationRunId runId, OperationRunDetail detail) + throws OperationRunAlreadyExistsException, IOException { + Optional row = getOperationRunInternal(runId); + if (row.isPresent()) { + OperationRunStatus status = OperationRunStatus.valueOf( + row.get().getString(StoreDefinition.OperationRunsStore.STATUS_FIELD)); + throw new OperationRunAlreadyExistsException(runId.getRun(), status); + } + writeOperationRun(runId, detail); + } + + /** + * Update the metadata of an operation run. + * + * @param runId {@link OperationRunId} for the run + * @param metadata new metdata of the run + * @param sourceId the message id which is responsible for the update + * @throws OperationRunNotFoundException run with id does not exist in namespace + */ + public void updateOperationMeta(OperationRunId runId, OperationMeta metadata, + @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException { + OperationRunDetail currentDetail = getRunDetail(runId); + OperationRun currentRun = currentDetail.getRun(); + OperationRun updatedRun = OperationRun.builder(currentRun) + .setMetadata(metadata).build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail) + .setRun(updatedRun).setSourceId(sourceId).build(); + + Collection> fields = getCommonUpdateFields(runId); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD, + GSON.toJson(updatedDetail))); + + StructuredTable operationRunsTable = getOperationRunsTable(context); + operationRunsTable.update(fields); + } + + /** + * Update the status of an operation run. + * + * @param runId {@link OperationRunId} for the run + * @param status new metadata of the run + * @param sourceId the message id which is responsible for the update + * @throws OperationRunNotFoundException run with id does not exist in namespace + */ + public void updateOperationStatus(OperationRunId runId, OperationRunStatus status, + @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException { + OperationRunDetail currentDetail = getRunDetail(runId); + OperationRun currentRun = currentDetail.getRun(); + OperationRun updatedRun = OperationRun.builder(currentRun) + .setStatus(status).build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail) + .setRun(updatedRun).setSourceId(sourceId).build(); + + Collection> fields = getCommonUpdateFields(runId); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD, + GSON.toJson(updatedDetail))); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + GSON.toJson(status))); + + StructuredTable operationRunsTable = getOperationRunsTable(context); + operationRunsTable.update(fields); + } + + /** + * Mark an operation run as failed. + * + * @param runId {@link OperationRunId} for the run + * @param error error related to the failure + * @param sourceId the message id which is responsible for the update + * @throws OperationRunNotFoundException run with id does not exist in namespace + */ + public void failOperationRun(OperationRunId runId, OperationError error, + @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException { + OperationRunDetail currentDetail = getRunDetail(runId); + OperationRun currentRun = currentDetail.getRun(); + OperationRun updatedRun = OperationRun.builder(currentRun) + .setStatus(OperationRunStatus.FAILED).setError(error).build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail) + .setRun(updatedRun).setSourceId(sourceId).build(); + + Collection> fields = getCommonUpdateFields(runId); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD, + GSON.toJson(updatedDetail))); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + GSON.toJson(OperationRunStatus.FAILED))); + + StructuredTable operationRunsTable = getOperationRunsTable(context); + operationRunsTable.update(fields); + } + + /** + * Get an operation run by id. + * + * @param runId {@link OperationRunId} for the run + * @throws OperationRunNotFoundException run with id does not exist in namespace + */ + public OperationRunDetail getOperation(OperationRunId runId) + throws OperationRunNotFoundException, IOException { + Optional row = getOperationRunInternal(runId); + if (!row.isPresent()) { + throw new OperationRunNotFoundException(runId.getNamespace(), runId.getRun()); + } + return GSON.fromJson( + row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), + OperationRunDetail.class + ); + } + + + /** + * Scans operations. Allows to optionally set filters and implement pagination. For pagination set + * {@link ScanOperationRunsRequest#getScanAfter()} to the last application id of the previous + * page. + * + * @param request parameters defining filters + * @param consumer {@link Consumer} to process each scanned run + * @throws IOException if failed to scan the storage + * @throws OperationRunNotFoundException if the + * {@link ScanOperationRunsRequest#getScanAfter()} operation run does not exist + * @see ScanOperationRunsRequest#builder(ScanOperationRunsRequest) to create a next page / batch + * request + */ + public String scanOperations(ScanOperationRunsRequest request, + Consumer> consumer) throws IOException, OperationRunNotFoundException { + Range.Bound startBound = Range.Bound.INCLUSIVE; + final Range.Bound endBound = Range.Bound.INCLUSIVE; + Collection> startFields = new ArrayList<>(); + + startFields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, + request.getNamespace()) + ); + if (request.getFilter().getOperationType() != null) { + startFields.add(Fields.stringField(StoreDefinition.OperationRunsStore.TYPE_FIELD, + request.getFilter().getOperationType()) + ); + } + if (request.getFilter().getStatus() != null) { + startFields.add(Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + request.getFilter().getStatus().toString())); + } + + Collection> endFields = startFields; + + if (request.getScanAfter() != null) { + startBound = Range.Bound.EXCLUSIVE; + startFields = getRangeFields( + new OperationRunId(request.getNamespace(), request.getScanAfter())); + } + + Range range = Range.create(endFields, endBound, startFields, startBound); + StructuredTable table = getOperationRunsTable(context); + String lastKey = null; + + try (CloseableIterator iterator = table.scan(range, request.getLimit(), + StoreDefinition.OperationRunsStore.START_TIME_FIELD, SortOrder.DESC)) { + while (iterator.hasNext()) { + StructuredRow row = iterator.next(); + lastKey = row.getString(StoreDefinition.OperationRunsStore.ID_FIELD); + OperationRunDetail detail = GSON.fromJson( + row.getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), + OperationRunDetail.class + ); + consumer.accept(detail); + } + } + + return lastKey; + } + + private List> getRangeFields(OperationRunId runId) + throws IOException, OperationRunNotFoundException { + List> fields = new ArrayList<>(); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, + runId.getNamespace())); + fields.add(Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun())); + + Long startTime = getOperationRunInternal(runId) + .map(r -> r.getLong(StoreDefinition.OperationRunsStore.START_TIME_FIELD)) + .orElseThrow(() -> new OperationRunNotFoundException(runId.getNamespace(), runId.getRun())); + + fields.add( + Fields.longField(StoreDefinition.OperationRunsStore.START_TIME_FIELD, startTime)); + return fields; + } + + private OperationRunDetail getRunDetail(OperationRunId runId) + throws IOException, OperationRunNotFoundException { + return getOperationRunInternal(runId) + .map(this::rowToRunDetail) + .orElseThrow(() -> new OperationRunNotFoundException(runId.getNamespace(), runId.getRun())); + } + + private OperationRunDetail rowToRunDetail(StructuredRow row) { + OperationRunDetail detail = GSON.fromJson( + row.getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), + OperationRunDetail.class + ); + // RunId is not serialized hence we need to populate it from row + String namespace = row.getString(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD); + String id = row.getString(StoreDefinition.OperationRunsStore.ID_FIELD); + return OperationRunDetail.builder(detail).setRunId(new OperationRunId(namespace, id)).build(); + } + + private Collection> getCommonUpdateFields(OperationRunId runId) { + Collection> fields = new ArrayList<>(); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, + runId.getNamespace())); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun())); + fields.add( + Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, clock.millis())); + return fields; + } + + private Optional getOperationRunInternal(OperationRunId runId) + throws IOException { + StructuredTable operationRunsTable = getOperationRunsTable(context); + Collection> key = ImmutableList.of( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, + runId.getNamespace()), + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()) + ); + return operationRunsTable.read(key); + } + + private void writeOperationRun(OperationRunId runId, OperationRunDetail detail) + throws IOException { + Collection> fields = ImmutableList.of( + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, runId.getRun()), + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, + runId.getNamespace()), + Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + detail.getRun().getStatus().toString()), + Fields.stringField(StoreDefinition.OperationRunsStore.TYPE_FIELD, + detail.getRun().getType()), + Fields.longField(StoreDefinition.OperationRunsStore.START_TIME_FIELD, + detail.getRun().getMetadata().getCreateTime().toEpochMilli()), + Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, + System.currentTimeMillis()), + Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD, GSON.toJson(detail)) + ); + StructuredTable operationRunsTable = getOperationRunsTable(context); + operationRunsTable.upsert(fields); + } + + private StructuredTable getOperationRunsTable(StructuredTableContext context) { + return context.getTable(StoreDefinition.OperationRunsStore.OPERATION_RUNS); + } + + @VisibleForTesting + // USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS + void clearData() throws IOException { + StructuredTable table = getOperationRunsTable(context); + table.deleteAll( + Range.from(ImmutableList.of( + Fields.stringField(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD, + SMALLEST_POSSIBLE_STRING)), + Range.Bound.INCLUSIVE)); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java new file mode 100644 index 000000000000..6220a8d979e5 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java @@ -0,0 +1,163 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import javax.annotation.Nullable; + +/** + * Defines parameters of operation run scan. + */ +public class ScanOperationRunsRequest { + + private final String namespace; + @Nullable + private final String scanAfterRunId; + private final int limit; + private final OperationRunFilter filter; + + /** + * Constructor for ScanOperationRunsRequest. + * + * @param namespace namespace to return runs for + * @param scanAfterRunId run id to start scan from (exclusive) + * @param filter additional filters to apply + * @param limit maximum number of records to return + */ + private ScanOperationRunsRequest(String namespace, @Nullable String scanAfterRunId, int limit, + OperationRunFilter filter) { + this.namespace = namespace; + this.scanAfterRunId = scanAfterRunId; + this.limit = limit; + this.filter = filter; + } + + /** + * namespace to return applications for. + */ + + public String getNamespace() { + return namespace; + } + + /** + * run id to start scan from (exclusive). + */ + @Nullable + public String getScanAfter() { + return scanAfterRunId; + } + + /** + * additional filters to apply. All filters must be satisfied (and operation). + */ + public OperationRunFilter getFilter() { + return filter; + } + + /** + * maximum number of records to read. + */ + public int getLimit() { + return limit; + } + + /** + * Builder to create a new {@link ScanOperationRunsRequest}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder to create a new {@link ScanOperationRunsRequest} prefilled with passed in request + * values. + */ + public static Builder builder(ScanOperationRunsRequest request) { + return new Builder(request); + } + + /** + * Builder for {@link ScanOperationRunsRequest}. + */ + public static class Builder { + + private String namespace; + @Nullable + private String scanToRunId; + @Nullable + private String scanAfterRunId; + @Nullable + private OperationRunFilter filter; + private int limit = Integer.MAX_VALUE; + + private Builder() { + } + + private Builder(ScanOperationRunsRequest request) { + this.namespace = request.namespace; + this.scanAfterRunId = request.scanAfterRunId; + this.filter = request.filter; + this.limit = request.limit; + } + + /** + * namespaceId namespace to scan in. + */ + public Builder setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + + /** + * restart the scan after specific run id. Useful for pagination. + */ + public Builder setScanAfter(String scanFromRunId) { + this.scanAfterRunId = scanFromRunId; + return this; + } + + /** + * filters to apply. + */ + public Builder setFilter( + OperationRunFilter filter) { + this.filter = filter; + return this; + } + + /** + * limit maximum number of records to scan. + */ + public Builder setLimit(int limit) { + this.limit = limit; + return this; + } + + /** + * return new {@link ScanOperationRunsRequest}. + */ + public ScanOperationRunsRequest build() { + if (namespace == null) { + throw new IllegalArgumentException("namespace must be specified."); + } + if (filter == null) { + filter = OperationRunFilter.emptyFilter(); + } + return new ScanOperationRunsRequest(namespace, scanAfterRunId, limit, filter); + } + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunStoreTest.java new file mode 100644 index 000000000000..dd5e26aa3d50 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunStoreTest.java @@ -0,0 +1,297 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.internal.app.store.OperationRunDetail; +import io.cdap.cdap.proto.id.OperationRunId; +import io.cdap.cdap.proto.operationrun.OperationError; +import io.cdap.cdap.proto.operationrun.OperationMeta; +import io.cdap.cdap.proto.operationrun.OperationRun; +import io.cdap.cdap.proto.operationrun.OperationRunStatus; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class OperationRunStoreTest { + + protected static TransactionRunner transactionRunner; + private final AtomicInteger sourceId = new AtomicInteger(); + private final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis()); + private final String testNamespace = "test"; + + + @Before + public void before() throws Exception { + TransactionRunners.run(transactionRunner, context -> { + OperationRunStore operationRunsStore = new OperationRunStore(context); + operationRunsStore.clearData(); + }); + } + + @Test + public void testGetOperation() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + OperationRunId runId = new OperationRunId(testNamespace, testId); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunStore store = new OperationRunStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(expectedDetail, gotDetail); + try { + store.getOperation(new OperationRunId(Namespace.DEFAULT.getId(), testId)); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testUpdateMetadata() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + OperationRunId runId = new OperationRunId(testNamespace, testId); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunStore store = new OperationRunStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(expectedDetail, gotDetail); + + OperationMeta updatedMeta = new OperationMeta(Collections.emptySet(), + Instant.ofEpochMilli(runIdTime.incrementAndGet()), + Instant.ofEpochMilli(runIdTime.incrementAndGet())); + OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) + .setMetadata(updatedMeta).build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) + .setRun(updatedRun) + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .build(); + store.updateOperationMeta(runId, updatedMeta, updatedDetail.getSourceId()); + gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(updatedDetail, gotDetail); + + try { + store.updateOperationMeta( + new OperationRunId(Namespace.DEFAULT.getId(), testId), + updatedMeta, + updatedDetail.getSourceId()); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testUpdateStatus() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + OperationRunId runId = new OperationRunId(testNamespace, testId); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunStore store = new OperationRunStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(expectedDetail, gotDetail); + + OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) + .setStatus(OperationRunStatus.STOPPING) + .build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) + .setRun(updatedRun) + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .build(); + store.updateOperationStatus(runId, updatedRun.getStatus(), + updatedDetail.getSourceId()); + gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(updatedDetail, gotDetail); + + try { + store.updateOperationStatus( + new OperationRunId(Namespace.DEFAULT.getId(), testId), + updatedRun.getStatus(), + updatedDetail.getSourceId()) + ; + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testFailOperation() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + OperationRunId runId = new OperationRunId(testNamespace, testId); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunStore store = new OperationRunStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(expectedDetail, gotDetail); + + OperationError error = new OperationError("operation failed", Collections.emptyList()); + OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) + .setStatus(OperationRunStatus.FAILED) + .setError(error) + .build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) + .setRun(updatedRun) + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .build(); + store.failOperationRun(runId, error, updatedDetail.getSourceId()); + gotDetail = (OperationRunDetail) store.getOperation(runId); + Assert.assertEquals(updatedDetail, gotDetail); + + try { + store.failOperationRun( + new OperationRunId(Namespace.DEFAULT.getId(), testId), + error, + updatedDetail.getSourceId() + ); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testScanOperation() throws Exception { + List> insertedRuns = insertTestRuns(); + // get a filtered list of testNamespace runs + List> testNamespaceRuns = insertedRuns.stream() + .filter(detail -> detail.getRunId().getNamespace().equals(testNamespace)) + .collect(Collectors.toList()); + + TransactionRunners.run(transactionRunner, context -> { + List gotRuns = new ArrayList<>(); + List> expectedRuns; + ScanOperationRunsRequest request; + + OperationRunStore store = new OperationRunStore(context); + + // verify the scan without filters picks all runs for testNamespace + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).build(); + store.scanOperations(request, gotRuns::add); + expectedRuns = testNamespaceRuns; + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify limit + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).setLimit(2).build(); + store.scanOperations(request, gotRuns::add); + expectedRuns = testNamespaceRuns.stream().limit(2).collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with type filter + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setFilter(new OperationRunFilter("PUSH", null)).build(); + store.scanOperations(request, gotRuns::add); + expectedRuns = testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals("PUSH")) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + // verify the scan with status filter + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace) + .setFilter(new OperationRunFilter("PUSH", OperationRunStatus.FAILED)).build(); + expectedRuns = testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals("PUSH")) + .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).setLimit(20) + .setFilter(new OperationRunFilter("PULL", OperationRunStatus.FAILED)).build(); + store.scanOperations(request, gotRuns::add); + expectedRuns = testNamespaceRuns.stream() + .filter(detail -> detail.getRun().getType().equals("PULL")) + .filter(detail -> detail.getRun().getStatus().equals(OperationRunStatus.FAILED)) + .collect(Collectors.toList()); + Assert.assertArrayEquals(expectedRuns.toArray(), gotRuns.toArray()); + }, Exception.class); + } + + private OperationRunDetail insertRun(String namespace, String type, + OperationRunStatus status) + throws IOException, OperationRunAlreadyExistsException { + long startTime = runIdTime.incrementAndGet(); + String id = RunIds.generate(startTime).getId(); + OperationRun run = OperationRun.builder() + .setRunId(id) + .setStatus(status) + .setType(type) + .setMetadata( + new OperationMeta(Collections.emptySet(), Instant.ofEpochMilli(startTime), null)) + .build(); + OperationRunId runId = new OperationRunId(namespace, id); + OperationRunDetail detail = OperationRunDetail.builder() + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .setRunId(runId) + .setRun(run) + .setRequest("") + .build(); + TransactionRunners.run(transactionRunner, context -> { + OperationRunStore operationRunStore = new OperationRunStore(context); + operationRunStore.createOperationRun(runId, detail); + }, IOException.class, OperationRunAlreadyExistsException.class); + return detail; + } + + private List> insertTestRuns() throws Exception { + List> details = new ArrayList<>(); + // insert 10 runs with increasing start time in two namespaces + // 5 would be in running state 5 in Failed + // 5 would be of type PUSH 5 would be of type PULL + for (int i = 0; i < 5; i++) { + details.add(insertRun(testNamespace, "PUSH", OperationRunStatus.RUNNING)); + details.add(insertRun(Namespace.DEFAULT.getId(), "PUSH", OperationRunStatus.RUNNING)); + details.add(insertRun(testNamespace, "PULL", OperationRunStatus.FAILED)); + details.add(insertRun(Namespace.DEFAULT.getId(), "PULL", OperationRunStatus.RUNNING)); + } + // The runs are added in increasing start time, hence reversing the List + Collections.reverse(details); + return details; + } + +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java new file mode 100644 index 000000000000..379ac906cca4 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.guice.ConfigModule; +import io.cdap.cdap.common.guice.LocalLocationModule; +import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.data.runtime.StorageModule; +import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.spi.data.StructuredTableAdmin; +import io.cdap.cdap.spi.data.TableAlreadyExistsException; +import io.cdap.cdap.spi.data.sql.PostgresInstantiator; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.store.StoreDefinition; +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +public class SqlOperationRunsStoreTest extends OperationRunStoreTest { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + private static EmbeddedPostgres pg; + + @BeforeClass + public static void beforeClass() throws IOException, TableAlreadyExistsException { + CConfiguration cConf = CConfiguration.create(); + pg = PostgresInstantiator.createAndStart(cConf, TEMP_FOLDER.newFolder()); + Injector injector = Guice.createInjector( + new ConfigModule(cConf), + new LocalLocationModule(), + new SystemDatasetRuntimeModule().getInMemoryModules(), + new StorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class) + .in(Scopes.SINGLETON); + } + } + ); + + transactionRunner = injector.getInstance(TransactionRunner.class); + StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); + } + + @AfterClass + public static void afterClass() throws IOException { + pg.close(); + } +} diff --git a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java index 77fa448bfe71..5511dc91f4af 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java +++ b/cdap-common/src/main/java/io/cdap/cdap/common/conf/Constants.java @@ -1085,6 +1085,9 @@ public static final class Tag { public static final String SCHEDULE = "sch"; public static final String METADATA_CONSUMER = "met"; + + // For operations + public static final String OPERATION_RUN = "operation"; } /** diff --git a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/OperationRunDetail.java b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/OperationRunDetail.java index 7aefc37ef062..e137757229fd 100644 --- a/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/OperationRunDetail.java +++ b/cdap-common/src/main/java/io/cdap/cdap/internal/app/store/OperationRunDetail.java @@ -18,6 +18,7 @@ import com.google.common.base.Objects; import com.google.gson.annotations.SerializedName; +import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.operationrun.OperationRun; import java.util.Arrays; import javax.annotation.Nullable; @@ -32,6 +33,9 @@ */ public class OperationRunDetail { + // carries the OperationRunId, but we don't need to serialize it as it is already in the key of the store + private final transient OperationRunId runId; + @SerializedName("run") private final OperationRun run; @@ -47,8 +51,10 @@ public class OperationRunDetail { @SerializedName("request") private final T request; - protected OperationRunDetail(OperationRun run, byte[] sourceId, @Nullable String principal, + protected OperationRunDetail(OperationRunId runId, OperationRun run, byte[] sourceId, + @Nullable String principal, T request) { + this.runId = runId; this.run = run; this.sourceId = sourceId; this.principal = principal; @@ -73,6 +79,10 @@ public OperationRun getRun() { return run; } + public OperationRunId getRunId() { + return runId; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -116,6 +126,7 @@ public static Builder builder(OperationRunDetail detail) { */ public static class Builder { + protected OperationRunId runId; protected OperationRun run; protected byte[] sourceId; protected String principal; @@ -129,6 +140,7 @@ protected Builder(OperationRunDetail detail) { principal = detail.getPrincipal(); request = detail.getRequest(); run = detail.getRun(); + runId = detail.getRunId(); } public Builder setSourceId(byte[] sourceId) { @@ -151,10 +163,18 @@ public Builder setRun(OperationRun run) { return this; } + public Builder setRunId(OperationRunId runId) { + this.runId = runId; + return this; + } + /** * Validates input and returns a OperationRunDetail. */ public OperationRunDetail build() { + if (runId == null) { + throw new IllegalArgumentException("run id must be specified."); + } if (request == null) { throw new IllegalArgumentException("Operation run request must be specified."); } @@ -165,7 +185,7 @@ public OperationRunDetail build() { throw new IllegalArgumentException("Operation run must be specified."); } - return new OperationRunDetail<>(run, sourceId, principal, request); + return new OperationRunDetail<>(runId, run, sourceId, principal, request); } } } diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java index 114fabf86f0a..19d7ff3aeb75 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java @@ -65,6 +65,9 @@ public class PostgreSqlStructuredTable implements StructuredTable { private final FieldValidator fieldValidator; private final int fetchSize; + /** + * Default constructor for PostgreSqlStructuredTable. + */ public PostgreSqlStructuredTable(Connection connection, StructuredTableSchema tableSchema, int fetchSize) { this.connection = connection; @@ -272,9 +275,9 @@ public CloseableIterator multiScan(Collection keyRanges, * Generates a SELECT query for scanning over all the provided ranges. For each of the range, it * generates a where clause using the {@link #appendRange(StringBuilder, Range)} method. The where * clause of each range are OR together. E.g. - *

- * SELECT * FROM table WHERE (key1 = ? AND key2 = ?) OR (key1 = ? AND key2 = ?) OR ((key3 >= ?) - * AND (key3 <= ?)) OR ((key4 >= ?) AND (key4 <= ?)) LIMIT limit + * + *

SELECT * FROM table WHERE (key1 = ? AND key2 = ?) OR (key1 = ? AND key2 = ?) OR ((key3 >= + * ?) AND (key3 <= ?)) OR ((key4 >= ?) AND (key4 <= ?)) LIMIT limit * * @param singletonRanges the list of singleton ranges to scan * @param ranges the list of ranges to scan @@ -362,6 +365,12 @@ public CloseableIterator scan(Range keyRange, int limit, public CloseableIterator scan(Range keyRange, int limit, Collection> filterIndexes, SortOrder sortOrder) throws InvalidFieldException, IOException { + return scan(keyRange, limit, filterIndexes, tableSchema.getPrimaryKeys(), sortOrder); + } + + private CloseableIterator scan(Range keyRange, int limit, + Collection> filterIndexes, Collection fieldsToSort, SortOrder sortOrder) + throws InvalidFieldException, IOException { fieldValidator.validateScanRange(keyRange); filterIndexes.forEach(fieldValidator::validateField); if (!tableSchema.isIndexColumns( @@ -373,7 +382,7 @@ public CloseableIterator scan(Range keyRange, int limit, LOG.trace("Table {}: Scan range {} with filterIndexes {} limit {} sortOrder {}", tableSchema.getTableId(), keyRange, filterIndexes, limit, sortOrder); - String scanQuery = getScanIndexesQuery(keyRange, limit, filterIndexes, sortOrder); + String scanQuery = getScanIndexesQuery(keyRange, limit, filterIndexes, fieldsToSort, sortOrder); // Since in getScanIndexesQuery we directly set the NULL checks, we need to skip the null fields filterIndexes = filterIndexes.stream().filter(f -> f.getValue() != null) .collect(Collectors.toList()); @@ -788,7 +797,8 @@ private void setStatementFieldByRange(Range keyRange, } /** - * Sets the {@link PreparedStatement} arguments by the key {@link Collection}. + * Sets the {@link PreparedStatement} arguments by the key + * {@link Collection}<{@link Range}>}. */ private void setStatementFieldByRange(Collection keyRanges, PreparedStatement statement) throws SQLException, InvalidFieldException { @@ -931,18 +941,24 @@ private String getScanQuery(Range range, int limit, Collection fieldsToS * @return the scan query */ private String getScanIndexesQuery(Range range, int limit, Collection> filterIndexes, - SortOrder sortOrder) { + Collection fieldsToSort, SortOrder sortOrder) { StringBuilder queryString = new StringBuilder("SELECT * FROM ") .append(tableSchema.getTableId().getName()) .append(" WHERE "); if (!range.getBegin().isEmpty() || !range.getEnd().isEmpty()) { appendRange(queryString, range); - queryString.append(" AND "); } - queryString.append(getIndexesFilterClause(filterIndexes)); - queryString.append(getOrderByClause(tableSchema.getPrimaryKeys(), sortOrder)); + if (!filterIndexes.isEmpty()) { + // only add AND if there was range clause applied + if (!range.getBegin().isEmpty() || !range.getEnd().isEmpty()) { + queryString.append(" AND "); + } + queryString.append(getIndexesFilterClause(filterIndexes)); + } + + queryString.append(getOrderByClause(fieldsToSort, sortOrder)); queryString.append(" LIMIT ").append(limit).append(";"); return queryString.toString(); } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/element/EntityType.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/element/EntityType.java index 3d1df5fa63e8..61137a651f79 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/element/EntityType.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/element/EntityType.java @@ -28,6 +28,7 @@ import io.cdap.cdap.proto.id.InstanceId; import io.cdap.cdap.proto.id.KerberosPrincipalId; import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.proto.id.OperationRunId; import io.cdap.cdap.proto.id.PluginId; import io.cdap.cdap.proto.id.ProfileId; import io.cdap.cdap.proto.id.ProgramId; @@ -78,7 +79,8 @@ public enum EntityType { SYSTEM_APP_ENTITY(SystemAppEntityId.class), CREDENTIAL_PROFILE(CredentialProfileId.class), - CREDENTIAL_IDENTITY(CredentialIdentityId.class); + CREDENTIAL_IDENTITY(CredentialIdentityId.class), + OPERATION_RUN(OperationRunId.class); private final Class idClass; @Nullable diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java new file mode 100644 index 000000000000..17392c70008f --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/id/OperationRunId.java @@ -0,0 +1,94 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.id; + +import io.cdap.cdap.api.metadata.MetadataEntity; +import io.cdap.cdap.proto.element.EntityType; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.Objects; + +/** + * Uniquely identifies an operation run. + */ +public class OperationRunId extends NamespacedEntityId implements ParentedId { + + private final String run; + + /** + * Default constructor. + */ + public OperationRunId(String namespace, String run) { + super(namespace, EntityType.OPERATION_RUN); + if (run == null) { + throw new NullPointerException("run id cannot be null."); + } + this.run = run; + } + + @Override + public String getEntityName() { + return getRun(); + } + + public String getRun() { + return this.run; + } + + @Override + public MetadataEntity toMetadataEntity() { + return MetadataEntity.builder().append(MetadataEntity.NAMESPACE, namespace) + .appendAsType(MetadataEntity.APPLICATION, run) + .build(); + } + + @Override + public NamespaceId getParent() { + return new NamespaceId(namespace); + } + + @Override + public boolean equals(Object o) { + if (!super.equals(o)) { + return false; + } + OperationRunId that = (OperationRunId) o; + return Objects.equals(namespace, that.namespace) + && Objects.equals(run, that.run); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), namespace, run); + } + + @SuppressWarnings("unused") + public static OperationRunId fromIdParts(Iterable idString) { + Iterator iterator = idString.iterator(); + return new OperationRunId(next(iterator, "namespace"), next(iterator, "run")); + } + + @Override + public Iterable toIdParts() { + return Collections.unmodifiableList(Arrays.asList(namespace, run)); + } + + public static OperationRunId fromString(String string) { + return EntityId.fromString(string, OperationRunId.class); + } +} diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java index 9d1cb31793e3..557398fce61b 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java @@ -67,4 +67,17 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(resources, createTime, endTime); } + + public Set getResources() { + return resources; + } + + public Instant getCreateTime() { + return createTime; + } + + @Nullable + public Instant getEndTime() { + return endTime; + } } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java index f8e821660414..499cbf0aeeab 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java @@ -53,7 +53,7 @@ protected OperationRun(String id, String type, OperationRunStatus status, Operat @Nullable OperationError error) { this.id = id; this.type = type; - this.done = getStatus().isEndState(); + this.done = status.isEndState(); this.status = status; this.metadata = metadata; this.error = error; @@ -86,7 +86,7 @@ public int hashCode() { /** * Creates a OperationRun Builder. */ - public static Builder runBuilder() { + public static Builder builder() { return new Builder(); } @@ -95,7 +95,7 @@ public static Builder runBuilder() { * * @param operationRun existing record to copy fields from */ - public static Builder runBuilder(OperationRun operationRun) { + public static Builder builder(OperationRun operationRun) { return new Builder(operationRun); } diff --git a/cdap-security/src/main/java/io/cdap/cdap/security/authorization/DefaultAccessEnforcer.java b/cdap-security/src/main/java/io/cdap/cdap/security/authorization/DefaultAccessEnforcer.java index 3b69d0d85078..ac42db5d7c64 100644 --- a/cdap-security/src/main/java/io/cdap/cdap/security/authorization/DefaultAccessEnforcer.java +++ b/cdap-security/src/main/java/io/cdap/cdap/security/authorization/DefaultAccessEnforcer.java @@ -329,6 +329,9 @@ private static void addTagsForEntityId(Map tags, EntityId entity case PROFILE: tags.put(Constants.Metrics.Tag.PROFILE, entityId.getEntityName()); break; + case OPERATION_RUN: + tags.put(Constants.Metrics.Tag.OPERATION_RUN, entityId.getEntityName()); + break; default: // No tags to set } diff --git a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java index c3e242f16ba2..82a1d3b90e80 100644 --- a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java +++ b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java @@ -280,7 +280,7 @@ void increment(Collection> keys, String column, long amount) void deleteAll(Range keyRange) throws InvalidFieldException, IOException; /** - * Updates the specific fields in a range of rows from the table + * Updates the specific fields in a range of rows from the table. * * @param keyRange key range of the rows to update: cab only be a primary key prefix * @param fields the fields to write to