From 60d338b741c1fd7468d6935d183a930ee4384bea Mon Sep 17 00:00:00 2001 From: samik Date: Wed, 13 Sep 2023 23:29:18 +0530 Subject: [PATCH] Add operation run store methods --- .../OperationRunAlreadyExistsException.java | 29 ++ .../operations/OperationRunFilter.java | 45 ++ .../OperationRunNotFoundException.java | 28 ++ .../operations/OperationRunsStore.java | 388 ++++++++++++++++++ .../operations/ScanOperationRunsRequest.java | 181 ++++++++ .../operations/OperationRunsStoreTest.java | 269 ++++++++++++ .../operations/SqlOperationRunsStoreTest.java | 75 ++++ .../data/common/MetricStructuredTable.java | 8 + .../data/sql/PostgreSqlStructuredTable.java | 159 ++++--- .../proto/operationrun/OperationMeta.java | 13 + .../cdap/proto/operationrun/OperationRun.java | 6 +- .../cdap/cdap/spi/data/StructuredTable.java | 23 +- 12 files changed, 1155 insertions(+), 69 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunsStore.java create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java create mode 100644 cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunsStoreTest.java create mode 100644 cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java new file mode 100644 index 000000000000..1defb6ebeced --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunAlreadyExistsException.java @@ -0,0 +1,29 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.common.BadRequestException; +import io.cdap.cdap.proto.operationrun.OperationRunStatus; + +/** + * Thrown when an operation run already exists. 
+ */ +public class OperationRunAlreadyExistsException extends BadRequestException { + + public OperationRunAlreadyExistsException(String operationId, OperationRunStatus status) { + super(String.format("Operation %s already exists with status %s", operationId, status)); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java new file mode 100644 index 000000000000..70f84c5fc90f --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunFilter.java @@ -0,0 +1,45 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.proto.operationrun.OperationRunStatus; +import javax.annotation.Nullable; + +/** + * This class defined various filters that can be applied during operation runs scanning. + */ +public class OperationRunFilter { + @Nullable + private final String operationType; + @Nullable + private final OperationRunStatus status; + // TODO(samik) status and type filters as list + public OperationRunFilter(@Nullable String operationType, @Nullable OperationRunStatus status) { + this.operationType = operationType; + this.status = status; + } + + @Nullable + public String getOperationType() { + return operationType; + } + + @Nullable + public OperationRunStatus getStatus() { + return status; + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java new file mode 100644 index 000000000000..53b0706d6b7a --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunNotFoundException.java @@ -0,0 +1,28 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.common.NotFoundException; + +/** + * Exception thrown when an operation run with the specified id not found in the specified namespace + */ +public class OperationRunNotFoundException extends NotFoundException { + public OperationRunNotFoundException(String namespace, String runId) { + super(String.format("Operation run %s does not exist in namespace %s", runId, namespace)); + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunsStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunsStore.java new file mode 100644 index 000000000000..0b510f11f3a0 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/OperationRunsStore.java @@ -0,0 +1,388 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import io.cdap.cdap.api.dataset.lib.CloseableIterator; +import io.cdap.cdap.common.NotFoundException; +import io.cdap.cdap.internal.app.store.OperationRunDetail; +import io.cdap.cdap.proto.operationrun.OperationError; +import io.cdap.cdap.proto.operationrun.OperationMeta; +import io.cdap.cdap.proto.operationrun.OperationRun; +import io.cdap.cdap.proto.operationrun.OperationRunStatus; +import io.cdap.cdap.spi.data.SortOrder; +import io.cdap.cdap.spi.data.StructuredRow; +import io.cdap.cdap.spi.data.StructuredTable; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.table.field.Field; +import io.cdap.cdap.spi.data.table.field.Fields; +import io.cdap.cdap.spi.data.table.field.Range; +import io.cdap.cdap.store.StoreDefinition; +import java.io.IOException; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Function; +import javax.annotation.Nullable; +import javax.inject.Inject; + +/** + * Store for operation runs. + */ +public class OperationRunsStore { + + private static final Gson GSON = new GsonBuilder().create(); + private final StructuredTableContext context; + + private static final String SMALLEST_POSSIBLE_STRING = ""; + + @Inject + public OperationRunsStore(StructuredTableContext context) { + this.context = context; + } + + /** + * Create a new operation. If a operation with same id exist throw exception. 
+   *
+   * @param namespace namespace of the operation
+   * @param id id of the operation
+   * @param detail the run details of the operation
+   * @throws OperationRunAlreadyExistsException when a run with the same id already exists in the namespace
+   */
+  public void createOperationRun(String namespace, String id, OperationRunDetail detail)
+      throws OperationRunAlreadyExistsException, IOException {
+    Optional<StructuredRow> row = getOperationRunInternal(namespace, id);
+    if (row.isPresent()) {
+      OperationRunStatus status = OperationRunStatus.valueOf(
+          row.get().getString(StoreDefinition.OperationRunsStore.STATUS_FIELD));
+      throw new OperationRunAlreadyExistsException(id, status);
+    }
+    writeOperationRun(namespace, id, detail);
+  }
+
+  /**
+   * Updates the metadata of an operation run.
+   *
+   * @param namespace namespace of the operation
+   * @param id id of the run
+   * @param metadata new metadata of the run
+   * @param sourceId the message id which is responsible for the update
+   * @throws OperationRunNotFoundException if no run with the given id exists in the namespace
+   */
+  public void updateOperationMeta(String namespace, String id, OperationMeta metadata,
+      @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
+    OperationRunDetail currentDetail = getCurrentRunDetail(namespace, id);
+    OperationRun currentRun = currentDetail.getRun();
+    OperationRun updatedRun = OperationRun.builder(currentRun)
+        .setMetadata(metadata).build();
+    OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
+        .setRun(updatedRun).setSourceId(sourceId).build();
+
+    Collection<Field<?>> fields = getCommonUpdateFields(namespace, id);
+    fields.add(
+        Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD,
+            GSON.toJson(updatedDetail)));
+
+    StructuredTable operationRunsTable = getOperationRunsTable(context);
+    operationRunsTable.update(fields);
+  }
+
+  /**
+   * Updates the status of an operation run.
+   *
+   * @param namespace namespace of the operation
+   * @param id id of the run
+   * @param status new status of the run
+   * @param sourceId the message id which is responsible for the update
+   * @throws OperationRunNotFoundException if no run with the given id exists in the namespace
+   */
+  public void updateOperationStatus(String namespace, String id, OperationRunStatus status,
+      @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
+    OperationRunDetail currentDetail = getCurrentRunDetail(namespace, id);
+    OperationRun currentRun = currentDetail.getRun();
+    OperationRun updatedRun = OperationRun.builder(currentRun)
+        .setStatus(status).build();
+    OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
+        .setRun(updatedRun).setSourceId(sourceId).build();
+
+    Collection<Field<?>> fields = getCommonUpdateFields(namespace, id);
+    fields.add(
+        Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD,
+            GSON.toJson(updatedDetail)));
+    // Store the plain enum name so it can be parsed back with OperationRunStatus.valueOf and
+    // matched by the status index filter, consistent with writeOperationRun.
+    fields.add(
+        Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD,
+            status.toString()));
+
+    StructuredTable operationRunsTable = getOperationRunsTable(context);
+    operationRunsTable.update(fields);
+  }
+
+  /**
+   * Marks an operation run as failed.
+   *
+   * @param namespace namespace of the operation
+   * @param id id of the run
+   * @param error error related to the failure
+   * @param sourceId the message id which is responsible for the update
+   * @throws OperationRunNotFoundException if no run with the given id exists in the namespace
+   */
+  public void failOperationRun(String namespace, String id, OperationError error,
+      @Nullable byte[] sourceId) throws OperationRunNotFoundException, IOException {
+    OperationRunDetail currentDetail = getCurrentRunDetail(namespace, id);
+    OperationRun currentRun = currentDetail.getRun();
+    OperationRun updatedRun = OperationRun.builder(currentRun)
+        .setStatus(OperationRunStatus.FAILED).setError(error).build();
+    OperationRunDetail updatedDetail = OperationRunDetail.builder(currentDetail)
+        .setRun(updatedRun).setSourceId(sourceId).build();
+
+    Collection<Field<?>> fields = getCommonUpdateFields(namespace, id);
+    fields.add(
+        Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD,
+            GSON.toJson(updatedDetail)));
+    fields.add(
+        Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD,
+            OperationRunStatus.FAILED.toString()));
+
+    StructuredTable operationRunsTable = getOperationRunsTable(context);
+    operationRunsTable.update(fields);
+  }
+
+  /**
+   * Gets an operation run by id.
+   *
+   * @param namespace namespace of the operation
+   * @param id id of the run
+   * @throws OperationRunNotFoundException if no run with the given id exists in the namespace
+   */
+  public OperationRunDetail getOperation(String namespace, String id)
+      throws NotFoundException, IOException {
+    Optional<StructuredRow> row = getOperationRunInternal(namespace, id);
+    if (!row.isPresent()) {
+      throw new OperationRunNotFoundException(namespace, id);
+    }
+    return GSON.fromJson(
+        row.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD),
+        OperationRunDetail.class
+    );
+  }
+
+  /**
+   * Scans operations in a namespace.
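+   *
+   * <p>For example, to page through the runs of a namespace in transaction batches of 50 rows
+   * (the {@code runs} list and the namespace name are illustrative):
+   * <pre>{@code
+   * List<OperationRunDetail> runs = new ArrayList<>();
+   * ScanOperationRunsRequest request = ScanOperationRunsRequest.builder()
+   *     .setNamespace("default")
+   *     .setLimit(100)
+   *     .build();
+   * boolean limitReached = scanRuns(request, 50, (runId, detail) -> runs.add(detail));
+   * }</pre>
+   * The method returns {@code true} only if the scan stopped because the requested limit was
+   * exhausted.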
+ * + * @param request scan request including filters and limit + * @param txBatchSize batch size of transaction + */ + public boolean scanRuns(ScanOperationRunsRequest request, int txBatchSize, + BiConsumer> consumer) + throws OperationRunNotFoundException, IOException { + + AtomicReference requestRef = new AtomicReference<>(request); + AtomicReference lastKey = new AtomicReference<>(); + AtomicInteger currentLimit = new AtomicInteger(request.getLimit()); + + while (currentLimit.get() > 0) { + AtomicInteger count = new AtomicInteger(); + + scanRunsInternal(requestRef.get(), entry -> { + lastKey.set(entry.getKey()); + currentLimit.decrementAndGet(); + consumer.accept(entry.getKey(), entry.getValue()); + return count.incrementAndGet() < txBatchSize && currentLimit.get() > 0; + }); + + if (lastKey.get() == null) { + break; + } + + ScanOperationRunsRequest nextBatchRequest = ScanOperationRunsRequest + .builder(requestRef.get()) + .setScanFrom(lastKey.get()) + .setLimit(currentLimit.get()) + .build(); + requestRef.set(nextBatchRequest); + lastKey.set(null); + } + return currentLimit.get() == 0; + } + + + private void scanRunsInternal(ScanOperationRunsRequest request, + Function>, Boolean> func) + throws IOException, OperationRunNotFoundException { + Range.Bound startBound = Range.Bound.INCLUSIVE; + Range.Bound endBound = Range.Bound.INCLUSIVE; + Collection> startFields = new ArrayList<>(); + + startFields.add(Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, + request.getNamespace())); + + Collection> endFields = startFields; + + if (request.getScanFrom() != null) { + startBound = Range.Bound.EXCLUSIVE; + startFields = getRangeFields(request.getNamespace(), request.getScanFrom()); + } + if (request.getScanTo() != null) { + endBound = Range.Bound.EXCLUSIVE; + endFields = getRangeFields(request.getNamespace(), request.getScanTo()); + } + + Range range = Range.create(endFields, endBound, startFields, startBound); + Collection> filterIndexes = getFilterIndexes(request.getFilter()); + int limit = request.getLimit(); + + StructuredTable table = getOperationRunsTable(context); + try (CloseableIterator iterator = table.scan(range, limit, + filterIndexes, StoreDefinition.OperationRunsStore.START_TIME_FIELD, SortOrder.DESC)) { + boolean keepScanning = true; + while (iterator.hasNext() && keepScanning && limit > 0) { + StructuredRow row = iterator.next(); + Map.Entry> scanEntry = + new SimpleImmutableEntry>( + row.getString(StoreDefinition.OperationRunsStore.ID_FIELD), + GSON.fromJson(row.getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), + OperationRunDetail.class) + ); + keepScanning = func.apply(scanEntry); + limit--; + } + } + } + + private Collection> getFilterIndexes(OperationRunFilter filter) { + Collection> filterIndexes = new ArrayList<>(); + if (filter == null) { + return filterIndexes; + } + if (filter.getOperationType() != null) { + filterIndexes.add(Fields.stringField(StoreDefinition.OperationRunsStore.TYPE_FIELD, + filter.getOperationType())); + } + if (filter.getStatus() != null) { + filterIndexes.add(Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + filter.getStatus().toString())); + } + return filterIndexes; + } + + private List> getRangeFields(String namespace, String runId) + throws IOException, OperationRunNotFoundException { + List> fields = new ArrayList<>(); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, namespace)); + fields.add(Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, 
runId)); + Long startTime = getStartTime(namespace, runId); + fields.add( + Fields.longField(StoreDefinition.OperationRunsStore.START_TIME_FIELD, startTime)); + return fields; + } + + private OperationRunDetail getCurrentRunDetail(String namespace, String id) + throws IOException, OperationRunNotFoundException { + Optional currentRow = getOperationRunInternal(namespace, id); + if (!currentRow.isPresent()) { + throw new OperationRunNotFoundException(namespace, id); + } + return GSON.fromJson( + currentRow.get().getString(StoreDefinition.OperationRunsStore.DETAILS_FIELD), + OperationRunDetail.class + ); + } + + private long getStartTime(String namespace, String id) + throws IOException, OperationRunNotFoundException { + Optional currentRow = getOperationRunInternal(namespace, id); + if (!currentRow.isPresent()) { + throw new OperationRunNotFoundException(namespace, id); + } + return currentRow.get().getLong(StoreDefinition.OperationRunsStore.START_TIME_FIELD); + } + + private Collection> getCommonUpdateFields(String namespace, String id) { + Collection> fields = new ArrayList<>(); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, namespace)); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, id)); + fields.add( + Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, + System.currentTimeMillis())); + return fields; + } + + private Optional getOperationRunInternal(String namespace, String operationId) + throws IOException { + StructuredTable operationRunsTable = getOperationRunsTable(context); + Collection> key = ImmutableList.of( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, namespace), + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, operationId) + ); + return operationRunsTable.read(key); + } + + private void writeOperationRun(String namespace, String id, OperationRunDetail detail) + throws IOException { + Collection> fields = new ArrayList<>(); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.ID_FIELD, id)); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.NAMESPACE_FIELD, namespace)); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.STATUS_FIELD, + detail.getRun().getStatus().toString())); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.TYPE_FIELD, + detail.getRun().getType())); + fields.add( + Fields.longField(StoreDefinition.OperationRunsStore.START_TIME_FIELD, + detail.getRun().getMetadata().getCreateTime().toEpochMilli())); + fields.add( + Fields.longField(StoreDefinition.OperationRunsStore.UPDATE_TIME_FIELD, + System.currentTimeMillis())); + fields.add( + Fields.stringField(StoreDefinition.OperationRunsStore.DETAILS_FIELD, + GSON.toJson(detail))); + StructuredTable operationRunsTable = getOperationRunsTable(context); + operationRunsTable.upsert(fields); + } + + private StructuredTable getOperationRunsTable(StructuredTableContext context) { + return context.getTable(StoreDefinition.OperationRunsStore.OPERATION_RUNS); + } + + @VisibleForTesting + // USE ONLY IN TESTS: WILL DELETE ALL OPERATION RUNS + public void deleteAllTables() throws IOException { + deleteTable(getOperationRunsTable(context), StoreDefinition.AppMetadataStore.NAMESPACE_FIELD); + } + + private void deleteTable(StructuredTable table, String firstKey) throws IOException { + table.deleteAll( + Range.from(ImmutableList.of(Fields.stringField(firstKey, SMALLEST_POSSIBLE_STRING)), + Range.Bound.INCLUSIVE)); + } + +} 
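Every public method of the store expects to run inside a transaction that supplies its StructuredTableContext. A minimal usage sketch, following the pattern of the tests further down in this patch; the "default" namespace, the "LIST" operation type, the empty request payload and the sourceId bytes are illustrative placeholders:

package io.cdap.cdap.internal.operations;

import io.cdap.cdap.common.app.RunIds;
import io.cdap.cdap.internal.app.store.OperationRunDetail;
import io.cdap.cdap.proto.operationrun.OperationError;
import io.cdap.cdap.proto.operationrun.OperationMeta;
import io.cdap.cdap.proto.operationrun.OperationRun;
import io.cdap.cdap.proto.operationrun.OperationRunStatus;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;

class OperationRunsStoreUsageSketch {

  void createAndFail(TransactionRunner transactionRunner)
      throws IOException, OperationRunAlreadyExistsException, OperationRunNotFoundException {
    long now = System.currentTimeMillis();
    String runId = RunIds.generate(now).getId();
    byte[] sourceId = new byte[]{1}; // illustrative provenance marker

    OperationRun run = OperationRun.builder()
        .setRunId(runId)
        .setType("LIST") // illustrative operation type
        .setStatus(OperationRunStatus.RUNNING)
        .setMetadata(new OperationMeta(Collections.emptySet(), Instant.ofEpochMilli(now), null))
        .build();
    OperationRunDetail detail = OperationRunDetail.builder()
        .setRun(run)
        .setRequest("") // illustrative request payload
        .setSourceId(sourceId)
        .build();

    // Each store call runs in its own transaction.
    TransactionRunners.run(transactionRunner, context -> {
      new OperationRunsStore(context).createOperationRun("default", runId, detail);
    }, IOException.class, OperationRunAlreadyExistsException.class);

    OperationError error = new OperationError("operation failed", Collections.emptyList());
    TransactionRunners.run(transactionRunner, context -> {
      new OperationRunsStore(context).failOperationRun("default", runId, error, sourceId);
    }, IOException.class, OperationRunNotFoundException.class);
  }
}

The sourceId is carried through so that later updates can record which message triggered them, as the update methods above do.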
diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java new file mode 100644 index 000000000000..df51bdeb2d59 --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/operations/ScanOperationRunsRequest.java @@ -0,0 +1,181 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.operations; + +import javax.annotation.Nullable; + +/** + * Defines parameters of operation run scan. + */ +public class ScanOperationRunsRequest { + + private final String namespace; + @Nullable + private final String scanToRunId; + private final String scanFromRunId; + private final int limit; + @Nullable + private final OperationRunFilter filter; + + /** + * Constructor for ScanOperationRunsRequest. + * + * @param namespace namespace to return runs for + * @param scanFromRunId run id to start scan from (exclusive) + * @param scanToRunId run id to stop scan at (exclusive) + * @param filter additional filters to apply + * @param limit maximum number of records to return + */ + private ScanOperationRunsRequest(String namespace, @Nullable String scanToRunId, + String scanFromRunId, int limit, @Nullable OperationRunFilter filter) { + this.namespace = namespace; + this.scanToRunId = scanToRunId; + this.scanFromRunId = scanFromRunId; + this.limit = limit; + this.filter = filter; + } + + /** + * namespace to return applications for or null for all namespaces. + */ + @Nullable + public String getNamespace() { + return namespace; + } + + /** + * run id to start scan from (exclusive). + */ + @Nullable + public String getScanFrom() { + return scanFromRunId; + } + + /** + * run id to stop scan at (exclusive). + */ + @Nullable + public String getScanTo() { + return scanToRunId; + } + + /** + * additional filters to apply. All filters must be satisfied (and operation). + */ + public OperationRunFilter getFilter() { + return filter; + } + + /** + * maximum number of records to read. + */ + public int getLimit() { + return limit; + } + + /** + * Builder to create a new {@link ScanOperationRunsRequest}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builder to create a new {@link ScanOperationRunsRequest} prefilled with passed in request + * values. + */ + public static Builder builder(ScanOperationRunsRequest request) { + return new Builder(request); + } + + /** + * Builder for {@link ScanOperationRunsRequest}. 
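+   *
+   * <p>For example, pagination can be driven by seeding a builder from the previous request and
+   * restarting after the last run id that was returned ({@code previousRequest} and
+   * {@code lastSeenRunId} are illustrative):
+   * <pre>{@code
+   * ScanOperationRunsRequest nextPage = ScanOperationRunsRequest.builder(previousRequest)
+   *     .setScanFrom(lastSeenRunId)
+   *     .build();
+   * }</pre>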
+ */ + public static class Builder { + + private String namespace; + @Nullable + private String scanToRunId; + @Nullable + private String scanFromRunId; + @Nullable + private OperationRunFilter filter; + private int limit = Integer.MAX_VALUE; + + private Builder() { + } + + private Builder(ScanOperationRunsRequest request) { + this.namespace = request.namespace; + this.scanFromRunId = request.scanFromRunId; + this.scanToRunId = request.scanToRunId; + this.filter = request.filter; + this.limit = request.limit; + } + + /** + * namespaceId namespace to scan in. + */ + public Builder setNamespace(String namespace) { + this.namespace = namespace; + return this; + } + + /** + * restart the scan after specific run id. Useful for pagination. + */ + public Builder setScanFrom(String scanFromRunId) { + this.scanFromRunId = scanFromRunId; + return this; + } + + /** + * stop the scan before specific run id. + */ + public Builder setScanTo(String scanToRunId) { + this.scanToRunId = scanToRunId; + return this; + } + + /** + * filters to apply. + */ + public Builder setFilter( + OperationRunFilter filter) { + this.filter = filter; + return this; + } + + /** + * limit maximum number of records to scan. + */ + public Builder setLimit(int limit) { + this.limit = limit; + return this; + } + + /** + * return new {@link ScanOperationRunsRequest}. + */ + public ScanOperationRunsRequest build() { + if (namespace == null) { + throw new IllegalArgumentException("namespace must be specified."); + } + return new ScanOperationRunsRequest(namespace, scanToRunId, scanFromRunId, limit, filter); + } + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunsStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunsStoreTest.java new file mode 100644 index 000000000000..f3742515e159 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/OperationRunsStoreTest.java @@ -0,0 +1,269 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.cdap.internal.operations; + +import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.id.Id.Namespace; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.internal.app.store.AppMetadataStoreTest; +import io.cdap.cdap.internal.app.store.OperationRunDetail; +import io.cdap.cdap.proto.operationrun.OperationError; +import io.cdap.cdap.proto.operationrun.OperationMeta; +import io.cdap.cdap.proto.operationrun.OperationRun; +import io.cdap.cdap.proto.operationrun.OperationRunStatus; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class OperationRunsStoreTest { + + private static final Logger LOG = LoggerFactory.getLogger(AppMetadataStoreTest.class); + + protected static TransactionRunner transactionRunner; + private final AtomicInteger sourceId = new AtomicInteger(); + private final AtomicLong runIdTime = new AtomicLong(System.currentTimeMillis()); + private final String testNamespace = "test"; + + + @Before + public void before() throws Exception { + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore operationRunsStore = new OperationRunsStore(context); + operationRunsStore.deleteAllTables(); + }); + } + + @Test + public void testGetOperation() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore store = new OperationRunsStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(expectedDetail, gotDetail); + try { + + store.getOperation(Namespace.DEFAULT.getId(), testId); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testUpdateMetadata() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore store = new OperationRunsStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(expectedDetail, gotDetail); + + OperationMeta updatedMeta = new OperationMeta(Collections.emptySet(), + Instant.ofEpochMilli(runIdTime.incrementAndGet()), + Instant.ofEpochMilli(runIdTime.incrementAndGet())); + OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) + .setMetadata(updatedMeta).build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) + .setRun(updatedRun) + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .build(); + store.updateOperationMeta(testNamespace, testId, updatedMeta, + updatedDetail.getSourceId()); + gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(updatedDetail, 
gotDetail); + + try { + store.updateOperationMeta(Namespace.DEFAULT.getId(), testId, updatedMeta, + updatedDetail.getSourceId()); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testUpdateStatus() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore store = new OperationRunsStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(expectedDetail, gotDetail); + + OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) + .setStatus(OperationRunStatus.STOPPING) + .build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) + .setRun(updatedRun) + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .build(); + store.updateOperationStatus(testNamespace, testId, updatedRun.getStatus(), + updatedDetail.getSourceId()); + gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(updatedDetail, gotDetail); + + try { + store.updateOperationStatus(Namespace.DEFAULT.getId(), testId, + updatedRun.getStatus(), updatedDetail.getSourceId()); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testFailOperation() throws Exception { + OperationRunDetail expectedDetail = insertRun(testNamespace, "LIST", + OperationRunStatus.RUNNING); + String testId = expectedDetail.getRun().getId(); + + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore store = new OperationRunsStore(context); + OperationRunDetail gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(expectedDetail, gotDetail); + + OperationError error = new OperationError("operation failed", Collections.emptyList()); + OperationRun updatedRun = OperationRun.builder(expectedDetail.getRun()) + .setStatus(OperationRunStatus.FAILED) + .setError(error) + .build(); + OperationRunDetail updatedDetail = OperationRunDetail.builder(expectedDetail) + .setRun(updatedRun) + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .build(); + store.failOperationRun(testNamespace, testId, error, + updatedDetail.getSourceId()); + gotDetail = (OperationRunDetail) store.getOperation( + testNamespace, testId); + Assert.assertEquals(updatedDetail, gotDetail); + + try { + store.failOperationRun(Namespace.DEFAULT.getId(), testId, error, + updatedDetail.getSourceId()); + Assert.fail("Found unexpected run in default namespace"); + } catch (OperationRunNotFoundException e) { + // expected + } + }, Exception.class); + } + + @Test + public void testScanOperation() throws Exception { + insertTestRuns(); + + // TODO(samik) verify the actual list + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore store = new OperationRunsStore(context); + + // verify the scan without filters picks all runs for testNamespace + List gotRuns = new ArrayList<>(); + ScanOperationRunsRequest request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).setLimit(20).build(); + store.scanRuns(request, 5, (id, detail) -> gotRuns.add(detail)); + 
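+      // insertTestRuns (below) writes 10 runs into testNamespace: 5 PUSH/RUNNING and 5 PULL/FAILED.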
Assert.assertEquals(10, gotRuns.size()); + + // verify the scan with type filter + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).setLimit(20) + .setFilter(new OperationRunFilter("PUSH", null)).build(); + store.scanRuns(request, 5, (id, detail) -> gotRuns.add(detail)); + Assert.assertEquals(5, gotRuns.size()); + + // verify the scan with status filter + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).setLimit(20) + .setFilter(new OperationRunFilter("PUSH", OperationRunStatus.FAILED)).build(); + store.scanRuns(request, 5, (id, detail) -> gotRuns.add(detail)); + Assert.assertEquals(0, gotRuns.size()); + + gotRuns.clear(); + request = ScanOperationRunsRequest.builder() + .setNamespace(testNamespace).setLimit(20) + .setFilter(new OperationRunFilter("PULL", OperationRunStatus.FAILED)).build(); + store.scanRuns(request, 5, (id, detail) -> gotRuns.add(detail)); + Assert.assertEquals(5, gotRuns.size()); + }, Exception.class); + } + + private OperationRunDetail insertRun(String namespace, String type, + OperationRunStatus status) + throws IOException, OperationRunAlreadyExistsException { + long startTime = runIdTime.incrementAndGet(); + String id = RunIds.generate(startTime).getId(); + OperationRun run = OperationRun.builder() + .setRunId(id) + .setStatus(status) + .setType(type) + .setMetadata( + new OperationMeta(Collections.emptySet(), Instant.ofEpochMilli(startTime), null)) + .build(); + OperationRunDetail detail = OperationRunDetail.builder() + .setSourceId(AppFabricTestHelper.createSourceId(sourceId.incrementAndGet())) + .setRun(run) + .setRequest("") + .build(); + TransactionRunners.run(transactionRunner, context -> { + OperationRunsStore operationRunsStore = new OperationRunsStore(context); + operationRunsStore.createOperationRun(namespace, id, detail); + }, IOException.class, OperationRunAlreadyExistsException.class); + return detail; + } + + private void insertTestRuns() throws Exception { + // insert 10 runs with increasing start time in two namespaces + // 5 would be in running state 5 in Failed + // 5 would be of type PUSH 5 would be of type PULL + for (int i = 0; i < 5; i++) { + insertRun(testNamespace, "PUSH", + OperationRunStatus.RUNNING); + insertRun(Namespace.DEFAULT.getId(), "PUSH", OperationRunStatus.RUNNING); + insertRun(testNamespace, "PULL", + OperationRunStatus.FAILED); + insertRun(Namespace.DEFAULT.getId(), "PULL", OperationRunStatus.RUNNING); + } + } + +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java new file mode 100644 index 000000000000..a08b6b1382b0 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/operations/SqlOperationRunsStoreTest.java @@ -0,0 +1,75 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.cdap.internal.operations; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.guice.ConfigModule; +import io.cdap.cdap.common.guice.LocalLocationModule; +import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.data.runtime.StorageModule; +import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.spi.data.StructuredTableAdmin; +import io.cdap.cdap.spi.data.TableAlreadyExistsException; +import io.cdap.cdap.spi.data.sql.PostgresInstantiator; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.store.StoreDefinition; +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +public class SqlOperationRunsStoreTest extends OperationRunsStoreTest { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + private static EmbeddedPostgres pg; + + @BeforeClass + public static void beforeClass() throws IOException, TableAlreadyExistsException { + CConfiguration cConf = CConfiguration.create(); + pg = PostgresInstantiator.createAndStart(cConf, TEMP_FOLDER.newFolder()); + Injector injector = Guice.createInjector( + new ConfigModule(cConf), + new LocalLocationModule(), + new SystemDatasetRuntimeModule().getInMemoryModules(), + new StorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class) + .in(Scopes.SINGLETON); + } + } + ); + + transactionRunner = injector.getInstance(TransactionRunner.class); + StoreDefinition.OperationRunsStore.create(injector.getInstance(StructuredTableAdmin.class)); + } + + @AfterClass + public static void afterClass() throws IOException { + pg.close(); + } +} diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java index 86b83ef53478..46c1950bf793 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java @@ -191,6 +191,14 @@ public CloseableIterator scan(Range keyRange, int limit, return scan(() -> structuredTable.scan(keyRange, limit, filterIndexes), "index.range.scan."); } + @Override + public CloseableIterator scan(Range keyRange, int limit, + Collection> filterIndexes, String orderByField, SortOrder sortOrder) + throws InvalidFieldException, IOException { + return scan(() -> structuredTable.scan(keyRange, limit, filterIndexes, orderByField, sortOrder), + "sort.filter.index.range.scan."); + } + @Override public CloseableIterator scan(Range keyRange, int limit, String orderByField, SortOrder sortOrder) diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java index 114fabf86f0a..9616dbd49389 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java +++ 
b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java @@ -65,6 +65,9 @@ public class PostgreSqlStructuredTable implements StructuredTable { private final FieldValidator fieldValidator; private final int fetchSize; + /** + * Default constructor for PostgreSqlStructuredTable. + */ public PostgreSqlStructuredTable(Connection connection, StructuredTableSchema tableSchema, int fetchSize) { this.connection = connection; @@ -268,64 +271,6 @@ public CloseableIterator multiScan(Collection keyRanges, } } - /** - * Generates a SELECT query for scanning over all the provided ranges. For each of the range, it - * generates a where clause using the {@link #appendRange(StringBuilder, Range)} method. The where - * clause of each range are OR together. E.g. - *

- * SELECT * FROM table WHERE (key1 = ? AND key2 = ?) OR (key1 = ? AND key2 = ?) OR ((key3 >= ?) - * AND (key3 <= ?)) OR ((key4 >= ?) AND (key4 <= ?)) LIMIT limit - * - * @param singletonRanges the list of singleton ranges to scan - * @param ranges the list of ranges to scan - * @param limit number of result - * @return a select query - */ - private PreparedStatement prepareMultiScanQuery(Collection singletonRanges, - Collection ranges, int limit) throws SQLException { - // TODO: CDAP-19734, refactor cases like - // (namespace >= 'default' and namespace <= 'default' and app >='a' and app <= 'b') - // to (namespace = 'default' and app >='a' and app <= 'b') - StringBuilder query = new StringBuilder("SELECT * FROM ") - .append(tableSchema.getTableId().getName()).append(" WHERE "); - - // Generates "(key1 = ? AND key2 = ?) OR (key1 = ? AND key2 = ?)..." clause - String separator = ""; - Collection> singletonFields = new ArrayList<>(); - for (Range singleton : singletonRanges) { - query - .append(separator) - .append(singleton.getBegin().stream().map(field -> field.getName() + " = ?") - .collect(Collectors.joining(" AND ", "(", ")"))); - separator = " OR "; - singletonFields.addAll(singleton.getBegin()); - } - - // Generates the ((key3 >= ?) AND (key3 <= ?)) OR ((key4 >= ?) AND (key4 <= ?)) - if (!ranges.isEmpty()) { - separator = singletonRanges.isEmpty() ? "(" : " OR ("; - for (Range range : ranges) { - query.append(separator).append("("); - appendRange(query, range); - query.append(")"); - separator = " OR "; - } - query.append(")"); - } - query.append(getOrderByClause(tableSchema.getPrimaryKeys())); - query.append(" LIMIT ").append(limit).append(";"); - - PreparedStatement statement = connection.prepareStatement(query.toString()); - statement.setFetchSize(fetchSize); - - // Set the parameters - int index = setFields(statement, singletonFields, 1); - for (Range range : ranges) { - index = setStatementFieldByRange(range, statement, index); - } - return statement; - } - @Override public CloseableIterator scan(Field index) throws InvalidFieldException, IOException { @@ -362,6 +307,25 @@ public CloseableIterator scan(Range keyRange, int limit, public CloseableIterator scan(Range keyRange, int limit, Collection> filterIndexes, SortOrder sortOrder) throws InvalidFieldException, IOException { + return scan(keyRange, limit, filterIndexes, tableSchema.getPrimaryKeys(), sortOrder, false); + } + + @Override + public CloseableIterator scan(Range keyRange, int limit, + Collection> filterIndexes, String orderByField, SortOrder sortOrder) + throws InvalidFieldException, IOException { + if (!tableSchema.isIndexColumn(orderByField) && !tableSchema.isPrimaryKeyColumn(orderByField)) { + throw new InvalidFieldException(tableSchema.getTableId(), orderByField, + "is not an indexed column or primary key"); + } + return scan(keyRange, limit, filterIndexes, Collections.singleton(orderByField), sortOrder, + true); + } + + private CloseableIterator scan(Range keyRange, int limit, + Collection> filterIndexes, Collection fieldsToSort, SortOrder sortOrder, + boolean isAndFilter) + throws InvalidFieldException, IOException { fieldValidator.validateScanRange(keyRange); filterIndexes.forEach(fieldValidator::validateField); if (!tableSchema.isIndexColumns( @@ -373,7 +337,8 @@ public CloseableIterator scan(Range keyRange, int limit, LOG.trace("Table {}: Scan range {} with filterIndexes {} limit {} sortOrder {}", tableSchema.getTableId(), keyRange, filterIndexes, limit, sortOrder); - String scanQuery = 
getScanIndexesQuery(keyRange, limit, filterIndexes, sortOrder); + String scanQuery = getScanIndexesQuery(keyRange, limit, filterIndexes, fieldsToSort, sortOrder, + isAndFilter); // Since in getScanIndexesQuery we directly set the NULL checks, we need to skip the null fields filterIndexes = filterIndexes.stream().filter(f -> f.getValue() != null) .collect(Collectors.toList()); @@ -603,6 +568,64 @@ public void close() throws IOException { } } + /** + * Generates a SELECT query for scanning over all the provided ranges. For each of the range, it + * generates a where clause using the {@link #appendRange(StringBuilder, Range)} method. The where + * clause of each range are OR together. E.g. + * + *

SELECT * FROM table WHERE (key1 = ? AND key2 = ?) OR (key1 = ? AND key2 = ?) OR ((key3 >= ?) + * AND (key3 <= ?)) OR ((key4 >= ?) AND (key4 <= ?)) LIMIT limit + * + * @param singletonRanges the list of singleton ranges to scan + * @param ranges the list of ranges to scan + * @param limit number of result + * @return a select query + */ + private PreparedStatement prepareMultiScanQuery(Collection singletonRanges, + Collection ranges, int limit) throws SQLException { + // TODO: CDAP-19734, refactor cases like + // (namespace >= 'default' and namespace <= 'default' and app >='a' and app <= 'b') + // to (namespace = 'default' and app >='a' and app <= 'b') + StringBuilder query = new StringBuilder("SELECT * FROM ") + .append(tableSchema.getTableId().getName()).append(" WHERE "); + + // Generates "(key1 = ? AND key2 = ?) OR (key1 = ? AND key2 = ?)..." clause + String separator = ""; + Collection> singletonFields = new ArrayList<>(); + for (Range singleton : singletonRanges) { + query + .append(separator) + .append(singleton.getBegin().stream().map(field -> field.getName() + " = ?") + .collect(Collectors.joining(" AND ", "(", ")"))); + separator = " OR "; + singletonFields.addAll(singleton.getBegin()); + } + + // Generates the ((key3 >= ?) AND (key3 <= ?)) OR ((key4 >= ?) AND (key4 <= ?)) + if (!ranges.isEmpty()) { + separator = singletonRanges.isEmpty() ? "(" : " OR ("; + for (Range range : ranges) { + query.append(separator).append("("); + appendRange(query, range); + query.append(")"); + separator = " OR "; + } + query.append(")"); + } + query.append(getOrderByClause(tableSchema.getPrimaryKeys())); + query.append(" LIMIT ").append(limit).append(";"); + + PreparedStatement statement = connection.prepareStatement(query.toString()); + statement.setFetchSize(fetchSize); + + // Set the parameters + int index = setFields(statement, singletonFields, 1); + for (Range range : ranges) { + index = setStatementFieldByRange(range, statement, index); + } + return statement; + } + private void upsertInternal(Collection> fields) throws IOException { String sqlQuery = getWriteSqlQuery(fields, null); try (PreparedStatement statement = connection.prepareStatement(sqlQuery)) { @@ -788,7 +811,7 @@ private void setStatementFieldByRange(Range keyRange, } /** - * Sets the {@link PreparedStatement} arguments by the key {@link Collection}. + * Sets the {@link PreparedStatement} arguments by the key {@link Collection}<{@link Range}>}. 
*/ private void setStatementFieldByRange(Collection keyRanges, PreparedStatement statement) throws SQLException, InvalidFieldException { @@ -928,21 +951,25 @@ private String getScanQuery(Range range, int limit, Collection fieldsToS * @param limit limit number of row * @param filterIndexes index fields * @param sortOrder sort order by primary keys + * @param isAndFilter if the filters have AND condition, false would signify OR condition * @return the scan query */ private String getScanIndexesQuery(Range range, int limit, Collection> filterIndexes, - SortOrder sortOrder) { + Collection fieldsToSort, SortOrder sortOrder, boolean isAndFilter) { StringBuilder queryString = new StringBuilder("SELECT * FROM ") .append(tableSchema.getTableId().getName()) .append(" WHERE "); if (!range.getBegin().isEmpty() || !range.getEnd().isEmpty()) { appendRange(queryString, range); + } + + if (!filterIndexes.isEmpty()) { queryString.append(" AND "); + queryString.append(getIndexesFilterClause(filterIndexes, isAndFilter)); } - queryString.append(getIndexesFilterClause(filterIndexes)); - queryString.append(getOrderByClause(tableSchema.getPrimaryKeys(), sortOrder)); + queryString.append(getOrderByClause(fieldsToSort, sortOrder)); queryString.append(" LIMIT ").append(limit).append(";"); return queryString.toString(); } @@ -1021,8 +1048,10 @@ private String getEqualsClause(Collection> keys) { return joiner.toString(); } - private String getIndexesFilterClause(Collection> indexes) { - StringJoiner joiner = new StringJoiner(" OR ", "(", ")"); + private String getIndexesFilterClause(Collection> indexes, + boolean isAndFilter) { + String delimiter = isAndFilter ? " AND " : " OR "; + StringJoiner joiner = new StringJoiner(delimiter, "(", ")"); for (Field key : indexes) { joiner.add(key.getName() + (key.getValue() == null ? " is NULL" : " = ?")); } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java index 9d1cb31793e3..557398fce61b 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationMeta.java @@ -67,4 +67,17 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(resources, createTime, endTime); } + + public Set getResources() { + return resources; + } + + public Instant getCreateTime() { + return createTime; + } + + @Nullable + public Instant getEndTime() { + return endTime; + } } diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java index f8e821660414..499cbf0aeeab 100644 --- a/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/operationrun/OperationRun.java @@ -53,7 +53,7 @@ protected OperationRun(String id, String type, OperationRunStatus status, Operat @Nullable OperationError error) { this.id = id; this.type = type; - this.done = getStatus().isEndState(); + this.done = status.isEndState(); this.status = status; this.metadata = metadata; this.error = error; @@ -86,7 +86,7 @@ public int hashCode() { /** * Creates a OperationRun Builder. 
*/ - public static Builder runBuilder() { + public static Builder builder() { return new Builder(); } @@ -95,7 +95,7 @@ public static Builder runBuilder() { * * @param operationRun existing record to copy fields from */ - public static Builder runBuilder(OperationRun operationRun) { + public static Builder builder(OperationRun operationRun) { return new Builder(operationRun); } diff --git a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java index c3e242f16ba2..72f4d7569ed8 100644 --- a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java +++ b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java @@ -194,6 +194,27 @@ default CloseableIterator scan(Range keyRange, int limit, throw new UnsupportedOperationException("No supported implementation."); } + /** + * Read a set of rows from the table matching the index and the key range. The rows returned will + * be sorted on given indexed field. + * + * @param keyRange key range for the scan + * @param limit maximum number of rows to return + * @param filterIndexes the index to filter upon + * @param orderByField the field to sort upon + * @param sortOrder defined primary key sort order. Note that the comparator used is specific + * to the underlying store and is not necessarily lexicographic. + * @return a {@link CloseableIterator} of rows + * @throws InvalidFieldException if the field is not part of the table schema, or is not an + * indexed column, or the type does not match the schema + * @throws IOException if there is an error scanning the table + */ + default CloseableIterator scan(Range keyRange, int limit, + Collection> filterIndexes, String orderByField, SortOrder sortOrder) + throws InvalidFieldException, IOException { + throw new UnsupportedOperationException("No supported implementation."); + } + /** * Read a set of rows from the table matching the key range, return by sortOrder of specified * index field. @@ -280,7 +301,7 @@ void increment(Collection> keys, String column, long amount) void deleteAll(Range keyRange) throws InvalidFieldException, IOException; /** - * Updates the specific fields in a range of rows from the table + * Updates the specific fields in a range of rows from the table. * * @param keyRange key range of the rows to update: cab only be a primary key prefix * @param fields the fields to write to