From ed3ade1819e776a536003a9ce0cf154b501079ba Mon Sep 17 00:00:00 2001 From: ritwiksahani Date: Tue, 6 Aug 2024 15:58:53 +0530 Subject: [PATCH] Adding logic to delete lineage data. --- .../app/services/AppFabricServer.java | 10 +- .../services/RunDataTimeToLiveService.java | 191 ++++++++++++++++++ .../services/RunRecordTimeToLiveService.java | 119 ----------- .../data2/metadata/lineage/LineageTable.java | 41 ++++ .../lineage/field/FieldLineageTable.java | 32 +++ .../data/common/MetricStructuredTable.java | 19 ++ .../spi/data/nosql/NoSqlStructuredTable.java | 11 + .../data/sql/PostgreSqlStructuredTable.java | 13 ++ .../metadata/lineage/LineageTableTest.java | 2 +- .../lineage/NoSqlLineageTableTest.java | 29 +++ .../metadata/lineage/SqlLineageTableTest.java | 64 ++++++ .../lineage/field/FieldLineageTableTest.java | 2 +- .../field/NoSqlFieldLineageTableTest.java | 27 +++ .../field/SqlFieldLineageTableTest.java | 79 ++++++++ .../data/nosql/NoSqlStructuredTableTest.java | 6 + .../spanner/SpannerStructuredTable.java | 17 +- .../cdap/cdap/spi/data/StructuredTable.java | 20 +- .../cdap/spi/data/StructuredTableTest.java | 64 ++++++ 18 files changed, 618 insertions(+), 128 deletions(-) create mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunDataTimeToLiveService.java delete mode 100644 cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java index 40cda348e4ce..470b8720a1c3 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/AppFabricServer.java @@ -83,7 +83,7 @@ public class AppFabricServer extends AbstractIdleService { private final ProgramNotificationSubscriberService 
programNotificationSubscriberService; private final ProgramStopSubscriberService programStopSubscriberService; private final RunRecordCorrectorService runRecordCorrectorService; - private final RunRecordTimeToLiveService runRecordTimeToLiveService; + private final RunDataTimeToLiveService runDataTimeToLiveService; private final ProgramRunStatusMonitorService programRunStatusMonitorService; private final RunRecordMonitorService runRecordCounterService; private final CoreSchedulerService coreSchedulerService; @@ -131,7 +131,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, TransactionRunner transactionRunner, RunRecordMonitorService runRecordCounterService, CommonNettyHttpServiceFactory commonNettyHttpServiceFactory, - RunRecordTimeToLiveService runRecordTimeToLiveService, + RunDataTimeToLiveService runDataTimeToLiveService, SourceControlOperationRunner sourceControlOperationRunner, RepositoryCleanupService repositoryCleanupService, OperationNotificationSubscriberService operationNotificationSubscriberService) { @@ -158,7 +158,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf, this.systemAppManagementService = systemAppManagementService; this.transactionRunner = transactionRunner; this.runRecordCounterService = runRecordCounterService; - this.runRecordTimeToLiveService = runRecordTimeToLiveService; + this.runDataTimeToLiveService = runDataTimeToLiveService; this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory; this.sourceControlOperationRunner = sourceControlOperationRunner; this.repositoryCleanupService = repositoryCleanupService; @@ -191,7 +191,7 @@ protected void startUp() throws Exception { coreSchedulerService.start(), credentialProviderService.start(), runRecordCounterService.start(), - runRecordTimeToLiveService.start(), + runDataTimeToLiveService.start(), sourceControlOperationRunner.start(), repositoryCleanupService.start(), operationNotificationSubscriberService.start() @@ -250,7 +250,7 @@ protected 
void shutDown() throws Exception { programRunStatusMonitorService.stopAndWait(); provisioningService.stopAndWait(); runRecordCounterService.stopAndWait(); - runRecordTimeToLiveService.stopAndWait(); + runDataTimeToLiveService.stopAndWait(); sourceControlOperationRunner.stopAndWait(); repositoryCleanupService.stopAndWait(); credentialProviderService.stopAndWait(); diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunDataTimeToLiveService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunDataTimeToLiveService.java new file mode 100644 index 000000000000..0ddf96ed1cbb --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunDataTimeToLiveService.java @@ -0,0 +1,191 @@ +/* + * Copyright © 2023 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package io.cdap.cdap.internal.app.services; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.AbstractIdleService; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Inject; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.data2.metadata.lineage.LineageTable; +import io.cdap.cdap.data2.metadata.lineage.field.FieldLineageTable; +import io.cdap.cdap.internal.app.store.AppMetadataStore; +import io.cdap.cdap.spi.data.transaction.TransactionException; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Service which periodically scans the database tables for records which should be deleted per the + * global time to live value. + * + *

Does not run if no TTL is configured or a TTL of 0 is specified. + *

+ * This service invokes the TTL-based cleanup services which implement the interface + * {@link CleanupService}. Those services are called in sequence. + *

+ */ +public final class RunDataTimeToLiveService extends AbstractIdleService { + + private static final Logger LOG = LoggerFactory.getLogger(RunDataTimeToLiveService.class); + + private final TransactionRunner transactionRunner; + private final boolean isEnabled; + private final Duration ttlMaxAge; + private final Duration checkFrequency; + private final Duration initialDelay; + private final Clock clock; + + private ScheduledExecutorService service; + private List cleanupServiceList; + + @Inject + RunDataTimeToLiveService(CConfiguration cConf, TransactionRunner transactionRunner) { + // Negative TTLs do not make sense, treat as 0. + this.ttlMaxAge = + Duration.ofDays(Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_DAYS), 0)); + this.isEnabled = !this.ttlMaxAge.isZero(); + // Delay should be at least 1 hour to ensure it isn't infinitely running. + this.checkFrequency = + Duration.ofHours( + Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_FREQUENCY_HOURS), 1)); + // Negative delays do not make sense, treat as 0. + this.initialDelay = + Duration.ofMinutes( + Math.max( + cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_INITIAL_DELAY_MINUTES), 0)); + + this.transactionRunner = transactionRunner; + this.clock = Clock.systemUTC(); + this.cleanupServiceList = ImmutableList.of( + new RunRecordsCleanupService(), + new LineageCleanupService(), + new FieldLineageCleanupService()); + } + + @Override + protected void startUp() { + if (!isEnabled) { + LOG.info("No TTL configured, skipping starting RunDataTimeToLiveService"); + return; + } + + service = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat("Records TTL janitor").build()); + + service.scheduleAtFixedRate( + () -> doCleanup(), + initialDelay.getSeconds(), + checkFrequency.getSeconds(), + TimeUnit.SECONDS); + } + + @Override + protected void shutDown() { + if (!isEnabled) { + // no-op because no services were started. 
+ return; + } + LOG.info("Stopping RunDataTimeToLiveService"); + + service.shutdownNow(); + } + + private void doCleanup() { + Instant endDate = Instant.now(clock).minus(ttlMaxAge); + // Perform cleanup together for all services with a fixed end time. This is currently a + // sequential call and may be executed parallelly in future if required. + this.cleanupServiceList.forEach(service -> { + long startTime = System.currentTimeMillis(); + service.doCleanup(endDate); + double timeTaken = (System.currentTimeMillis() - startTime) / 1000.0; + LOG.info("{} cleanup completed in {} seconds", service.getClass().getName(), timeTaken); + }); + } + + private interface CleanupService { + + void doCleanup(Instant endDate); + } + + private class RunRecordsCleanupService implements CleanupService { + + @Override + public void doCleanup(Instant endDate) { + LOG.info("Doing scheduled cleanup, deleting all run records before {}", endDate); + + try { + transactionRunner.run( + context -> { + AppMetadataStore appMetadataStore = AppMetadataStore.create(context); + + appMetadataStore.deleteCompletedRunsStartedBefore(endDate); + }); + } catch (TransactionException e) { + LOG.error("Failed to clean up old run records", e); + } + } + } + + private class LineageCleanupService implements CleanupService { + + @Override + public void doCleanup(Instant endDate) { + LOG.info("Doing scheduled cleanup, deleting all run lineage records before {}", endDate); + + try { + transactionRunner.run( + context -> { + LineageTable lineageTable = LineageTable.create(context); + + lineageTable.deleteCompletedLineageRecordsStartedBefore(endDate); + }); + } catch (TransactionException e) { + LOG.error("Failed to clean up old lineage records", e); + } + } + } + + + private class FieldLineageCleanupService implements CleanupService { + + @Override + public void doCleanup(Instant endDate) { + LOG.info("Doing scheduled cleanup, deleting all field lineage records before {}", endDate); + + try { + 
transactionRunner.run( + context -> { + FieldLineageTable fieldLineageTable = FieldLineageTable.create(context); + + fieldLineageTable.deleteFieldRecordsBefore(endDate); + }); + } catch (TransactionException e) { + LOG.error("Failed to clean up old field lineage records", e); + } + } + } +} diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java deleted file mode 100644 index bf13c44a5219..000000000000 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/services/RunRecordTimeToLiveService.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Copyright © 2023 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package io.cdap.cdap.internal.app.services; - -import com.google.common.util.concurrent.AbstractIdleService; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.Inject; -import io.cdap.cdap.common.conf.CConfiguration; -import io.cdap.cdap.common.conf.Constants; -import io.cdap.cdap.internal.app.store.AppMetadataStore; -import io.cdap.cdap.spi.data.transaction.TransactionException; -import io.cdap.cdap.spi.data.transaction.TransactionRunner; -import java.time.Clock; -import java.time.Duration; -import java.time.Instant; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Service which periodically scans the database tables for run records which should be deleted per - * the global time to live value. - * - *

Does not run if no TTL is configured or a TTL of 0 is specified. - */ -public final class RunRecordTimeToLiveService extends AbstractIdleService { - - private static final Logger LOG = LoggerFactory.getLogger(RunRecordTimeToLiveService.class); - - private final TransactionRunner transactionRunner; - private final boolean isEnabled; - private final Duration ttlMaxAge; - private final Duration checkFrequency; - private final Duration initialDelay; - private final Clock clock; - - private ScheduledExecutorService service; - - @Inject - RunRecordTimeToLiveService(CConfiguration cConf, TransactionRunner transactionRunner) { - // Negative TTLs do not make sense, treat as 0. - this.ttlMaxAge = - Duration.ofDays(Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_DAYS), 0)); - this.isEnabled = !this.ttlMaxAge.isZero(); - // Delay should be at least 1 hour to ensure it isn't infinitely running. - this.checkFrequency = - Duration.ofHours( - Math.max(cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_FREQUENCY_HOURS), 1)); - // Negative delays do not make sense, treat as 0. - this.initialDelay = - Duration.ofMinutes( - Math.max( - cConf.getInt(Constants.AppFabric.RUN_DATA_CLEANUP_TTL_INITIAL_DELAY_MINUTES), 0)); - - this.transactionRunner = transactionRunner; - this.clock = Clock.systemUTC(); - } - - @Override - protected void startUp() { - if (!isEnabled) { - LOG.info("No TTL configured, skipping starting RunRecordTimeToLiveService"); - return; - } - - service = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat("Run Record TTL janitor").build()); - - service.scheduleAtFixedRate( - () -> doCleanup(), - initialDelay.getSeconds(), - checkFrequency.getSeconds(), - TimeUnit.SECONDS); - } - - @Override - protected void shutDown() { - if (!isEnabled) { - // no-op because no services were started. 
- return; - } - LOG.info("Stopping RunRecordTimeToLiveService"); - - service.shutdownNow(); - } - - private void doCleanup() { - Instant endDate = Instant.now(clock).minus(ttlMaxAge); - LOG.info("Doing scheduled cleanup, deleting all run records before {}", endDate); - - try { - transactionRunner.run( - context -> { - AppMetadataStore appMetadataStore = AppMetadataStore.create(context); - - appMetadataStore.deleteCompletedRunsStartedBefore(endDate); - }); - } catch (TransactionException e) { - LOG.error("Failed to clean up old records", e); - } - } -} diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java index 86867dd5963c..8fcd630f1c32 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/LineageTable.java @@ -35,6 +35,7 @@ import io.cdap.cdap.spi.data.table.field.Range; import io.cdap.cdap.store.StoreDefinition; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Set; @@ -66,6 +67,46 @@ public static LineageTable create(StructuredTableContext context) { return new LineageTable(context); } + /** + * Deletes all completed run lineage records with a start time before {@code timeUpperBound}, + * throwing {@code IOException} if the delete operation fails. + *

+ * The assumption is that lineage records are added only after a run has completed, so we don't + * need to check completed run statuses. + *

+ * + *

+ * This function will not work with the in memory no-sql DB and will throw + * {@code UnsupportedOperationException}. This is because of the way we need to query and will + * only be used in the managed instances controlled by a flag. + *

+ * + * @param timeUpperBound is the end time before which all records should be deleted. + */ + public void deleteCompletedLineageRecordsStartedBefore(Instant timeUpperBound) + throws IOException { + // While converting from Run we are using Millis hence we need to get epoch millis. + long maxTimeEpoch = timeUpperBound.toEpochMilli(); + if (maxTimeEpoch == Long.MAX_VALUE) { + // We don't want to blanket delete all records in case of incorrect values. + LOG.warn("Passed invalid start time to for lineage deletion, this would delete all entries"); + return; + } + // Data should be deleted from both the lineage tables. + getDatasetTable() + .scanDeleteAll(createStartTimeEndRange(maxTimeEpoch)); + getProgramTable() + .scanDeleteAll(createStartTimeEndRange(maxTimeEpoch)); + } + + private Range createStartTimeEndRange(long endTime) { + ImmutableList> end = ImmutableList.of( + Fields.longField(StoreDefinition.LineageStore.START_TIME_FIELD, invertTime(endTime))); + // Since the times are inverted the end time will be the start of the range. 
+ return Range.from(end, Range.Bound.EXCLUSIVE); + } + + private LineageTable(StructuredTableContext structuredTableContext) { this.structuredTableContext = structuredTableContext; } diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java index 7a3f5cc4dff6..6b4d22eca13e 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTable.java @@ -41,6 +41,7 @@ import io.cdap.cdap.store.StoreDefinition; import java.io.IOException; import java.lang.reflect.Type; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -164,6 +165,37 @@ public void deleteAll() throws IOException { getSummaryFieldsTable().deleteAll(Range.all()); } + /** + * Delete the field lineage records that started before the {@param endTime}. + * + *

+ * This method deletes all field record entries from the + * {@link StoreDefinition.FieldLineageStore} tables. Currently, entries are deleted only from the + * parent table, i.e. {@code fields_lineage}. + *

+ * + * @param endTime is the end time before which all records should be deleted. + */ + public void deleteFieldRecordsBefore(Instant endTime) throws IOException { + // While converting from Run we are using Millis hence we need to get epoch millis. + long maxTimeEpoch = endTime.toEpochMilli(); + if (maxTimeEpoch == Long.MAX_VALUE) { + // We don't want to blanket delete all records in case of incorrect values. + LOG.warn("Passed invalid start time to for lineage deletion, this would delete all entries"); + return; + } + // Start time is only available in the parent Field lineage table and has the maximum amount of + // entries. We are only deleting from the parent table. Child tables are essentially nested and + // would need to be handled separately. + getEndpointChecksumTable().scanDeleteAll(createStartTimeEndRange(maxTimeEpoch)); + } + + private Range createStartTimeEndRange(long endTime) { + ImmutableList> end = ImmutableList.of( + Fields.longField(StoreDefinition.FieldLineageStore.START_TIME_FIELD, invertTime(endTime))); + // Since the times are inverted the end time will be the start of the range. 
+ return Range.from(end, Range.Bound.EXCLUSIVE); + } @Nullable private Set readOperations(long checksum) throws IOException { List> fields = getOperationsKey(checksum); diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java index 0d595aa5743c..29b6b45fcecf 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/common/MetricStructuredTable.java @@ -315,6 +315,25 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException } } + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + try { + if (!emitTimeMetrics) { + structuredTable.scanDeleteAll(keyRange); + } else { + long curTime = System.nanoTime(); + structuredTable.scanDeleteAll(keyRange); + long duration = System.nanoTime() - curTime; + metricsCollector.increment(metricPrefix + "scanDeleteAll.time", duration); + } + metricsCollector.increment(metricPrefix + "scanDeleteAll.count", 1L); + } catch (Exception e) { + metricsCollector.increment(metricPrefix + "scanDeleteAll.error", 1L); + throw e; + } + } + @Override public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException, IOException { diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java index 5b86f3cb9eae..1aa40adbf95a 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTable.java @@ -456,6 +456,17 @@ public void deleteAll(Range keyRange) throws InvalidFieldException, IOException } } + @Override + public void scanDeleteAll(Range keyRange) + throws 
InvalidFieldException, UnsupportedOperationException, IOException { + // For No SQL tables we are not supporting deletes on random columns because of performance + // concerns. + throw new UnsupportedOperationException( + String.format("scanDeleteAll operation not supported for NoSQL Table: %s", + schema.getTableId() + .getName())); + } + @Override public void updateAll(Range keyRange, Collection> fields) throws InvalidFieldException { LOG.trace("Table {}: Update fields {} in range {}", schema.getTableId(), fields, keyRange); diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java index 392d4dca14aa..0a2a55943af9 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/spi/data/sql/PostgreSqlStructuredTable.java @@ -556,6 +556,19 @@ public void delete(Collection> keys) throws InvalidFieldException, IOEx public void deleteAll(Range keyRange) throws InvalidFieldException, IOException { LOG.trace("Table {}: DeleteAll with range {}", tableSchema.getTableId(), keyRange); fieldValidator.validateScanRange(keyRange); + executeDeleteAll(keyRange); + } + + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + LOG.trace("Table {}: ScanDeleteAll with range {}", tableSchema.getTableId(), keyRange); + keyRange.getBegin().forEach(fieldValidator::validateField); + keyRange.getEnd().forEach(fieldValidator::validateField); + executeDeleteAll(keyRange); + } + + private void executeDeleteAll(Range keyRange) throws IOException { String sql = getDeleteAllStatement(keyRange); try (PreparedStatement statement = connection.prepareStatement(sql)) { setStatementFieldByRange(keyRange, statement); diff --git 
a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java index 3e660a606d6a..0006f2d60272 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/LineageTableTest.java @@ -136,7 +136,7 @@ public void testMultipleRelations() { } @SafeVarargs - private static Set toSet(T... elements) { + protected static Set toSet(T... elements) { return ImmutableSet.copyOf(elements); } } diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java index 0c3c80da1386..c58f04bb8524 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/NoSqlLineageTableTest.java @@ -16,16 +16,27 @@ package io.cdap.cdap.data2.metadata.lineage; +import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.data2.dataset2.DatasetFrameworkTestUtil; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.DatasetId; +import io.cdap.cdap.proto.id.NamespacedEntityId; +import io.cdap.cdap.proto.id.ProgramId; +import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.spi.data.StructuredTableAdmin; import io.cdap.cdap.spi.data.TableAlreadyExistsException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; import io.cdap.cdap.store.StoreDefinition; import java.io.IOException; +import java.time.Instant; +import java.util.Set; +import org.apache.twill.api.RunId; import org.junit.BeforeClass; import org.junit.ClassRule; +import 
org.junit.Test; public class NoSqlLineageTableTest extends LineageTableTest { @ClassRule @@ -41,4 +52,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException StoreDefinition.createAllTables(structuredTableAdmin); } + @Test(expected = UnsupportedOperationException.class) + public void testDeleteCompletedLineageRecordsThrowsException() { + final Instant currentTime = Instant.now(); + final RunId runId = RunIds.generate(10000); + final DatasetId datasetInstance = new DatasetId("default", "dataset1"); + final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1"); + final ProgramRunId run = program.run(runId.getId()); + + final long accessTimeMillis = System.currentTimeMillis(); + TransactionRunners.run(transactionRunner, context -> { + LineageTable lineageTable = LineageTable.create(context); + lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis); + Set entitiesForRun = lineageTable.getEntitiesForRun(run); + // Since no-SQL DBs are not supported for time based deletes, this should throw an + // UnsupportedOperationException. 
+ lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime); + }); + } } diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java index aa011651bd6f..f244f10bb9bc 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/SqlLineageTableTest.java @@ -16,30 +16,45 @@ package io.cdap.cdap.data2.metadata.lineage; +import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Scopes; import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.guice.ConfigModule; import io.cdap.cdap.common.guice.LocalLocationModule; import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; import io.cdap.cdap.data.runtime.StorageModule; import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.DatasetId; +import io.cdap.cdap.proto.id.NamespacedEntityId; +import io.cdap.cdap.proto.id.ProgramId; +import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.spi.data.StructuredTableAdmin; import io.cdap.cdap.spi.data.TableAlreadyExistsException; import io.cdap.cdap.spi.data.sql.PostgresInstantiator; import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; import io.cdap.cdap.store.StoreDefinition; import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Set; +import org.apache.twill.api.RunId; import org.junit.AfterClass; +import 
org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; public class SqlLineageTableTest extends LineageTableTest { + @ClassRule public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @@ -67,6 +82,55 @@ protected void configure() { StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class)); } + + @Test + public void testDeleteOutOfRangeCompletedRuns() { + final Instant currentTime = Instant.now(); + final RunId runId = RunIds.generate(10000); + final DatasetId datasetInstance = new DatasetId("default", "dataset1"); + final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1"); + final ProgramRunId run = program.run(runId.getId()); + final long accessTimeMillis = System.currentTimeMillis(); + TransactionRunners.run(transactionRunner, context -> { + LineageTable lineageTable = LineageTable.create(context); + lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis); + + lineageTable.deleteCompletedLineageRecordsStartedBefore(currentTime); + + Set entitiesForRun = lineageTable.getEntitiesForRun(run); + // Asserts that the records are deleted. 
+ Assert.assertTrue(entitiesForRun.isEmpty()); + }); + } + + @Test + public void testDeleteOutOfRangeCompletedRunsDoesNotDeleteLatestRuns() { + final Instant currentTime = Instant.now(); + final RunId runId = RunIds.generate(currentTime.toEpochMilli()); + final DatasetId datasetInstance = new DatasetId("default", "dataset1"); + final ProgramId program = new ProgramId("default", "app1", ProgramType.SERVICE, "service1"); + final ProgramRunId run = program.run(runId.getId()); + + + final long accessTimeMillis = System.currentTimeMillis(); + TransactionRunners.run(transactionRunner, context -> { + LineageTable lineageTable = LineageTable.create(context); + lineageTable.addAccess(run, datasetInstance, AccessType.READ, accessTimeMillis); + + Instant deleteStartTime = currentTime.minus(10, ChronoUnit.HOURS); + lineageTable.deleteCompletedLineageRecordsStartedBefore(deleteStartTime); + + Relation expected = new Relation(datasetInstance, program, AccessType.READ, runId); + Set relations = lineageTable.getRelations(datasetInstance, 0, currentTime.toEpochMilli(), x -> true); + // Asserts that the records are not deleted. 
+ Assert.assertEquals(1, relations.size()); + Assert.assertEquals(expected, relations.iterator().next()); + Assert.assertEquals(toSet(program, datasetInstance), lineageTable.getEntitiesForRun(run)); + Assert.assertEquals(ImmutableList.of(accessTimeMillis), lineageTable.getAccessTimesForRun(run)); + }); + } + + @AfterClass public static void teardown() throws IOException { pg.close(); diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java index 5ed965d8b578..130cd737bd58 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/FieldLineageTableTest.java @@ -224,7 +224,7 @@ public void testMergeSummaries() { }); } - private List generateOperations(boolean addAditionalField) { + protected List generateOperations(boolean addAditionalField) { // read: file -> (offset, body) // parse: (body) -> (first_name, last_name) // concat: (first_name, last_name) -> (name) diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java index 0d759c5ff8dc..47419721b113 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/NoSqlFieldLineageTableTest.java @@ -16,18 +16,27 @@ package io.cdap.cdap.data2.metadata.lineage.field; +import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.conf.Constants; import io.cdap.cdap.data2.dataset2.DatasetFrameworkTestUtil; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.ProgramId; 
+import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.spi.data.StructuredTableAdmin; import io.cdap.cdap.spi.data.TableAlreadyExistsException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; import io.cdap.cdap.store.StoreDefinition; import java.io.IOException; +import java.time.Instant; +import org.apache.twill.api.RunId; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; public class NoSqlFieldLineageTableTest extends FieldLineageTableTest { + @ClassRule public static DatasetFrameworkTestUtil dsFrameworkUtil = new DatasetFrameworkTestUtil(); @@ -41,4 +50,22 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException StoreDefinition.createAllTables(structuredTableAdmin); } + @Test(expected = UnsupportedOperationException.class) + public void testDeleteFieldRecordsBefore() { + final Instant currentTime = Instant.now(); + + RunId runId = RunIds.generate(10000); + ProgramId program = new ProgramId("default", "app1", ProgramType.WORKFLOW, "workflow1"); + final ProgramRunId programRun1 = program.run(runId.getId()); + final FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false)); + + TransactionRunners.run(transactionRunner, context -> { + FieldLineageTable fieldLineageTable = FieldLineageTable.create(context); + fieldLineageTable.addFieldLineageInfo(programRun1, info1); + // Should throw UnsupportedOperationException since non-primary-key based deletes + // are not supported in no-sql. 
+ fieldLineageTable.deleteFieldRecordsBefore(currentTime); + }); + } + } diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java index db98a9580269..190ebe497fbd 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/data2/metadata/lineage/field/SqlFieldLineageTableTest.java @@ -16,27 +16,43 @@ package io.cdap.cdap.data2.metadata.lineage.field; +import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Scopes; +import io.cdap.cdap.api.lineage.field.EndPoint; import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.CConfiguration; import io.cdap.cdap.common.guice.ConfigModule; import io.cdap.cdap.common.guice.LocalLocationModule; import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; import io.cdap.cdap.data.runtime.StorageModule; import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.proto.ProgramType; +import io.cdap.cdap.proto.id.ProgramId; +import io.cdap.cdap.proto.id.ProgramRunId; import io.cdap.cdap.spi.data.StructuredTableAdmin; import io.cdap.cdap.spi.data.TableAlreadyExistsException; import io.cdap.cdap.spi.data.sql.PostgresInstantiator; import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; import io.cdap.cdap.store.StoreDefinition; import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import 
org.apache.twill.api.RunId; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; +import org.junit.Test; import org.junit.rules.TemporaryFolder; public class SqlFieldLineageTableTest extends FieldLineageTableTest { @@ -67,6 +83,69 @@ protected void configure() { StoreDefinition.createAllTables(injector.getInstance(StructuredTableAdmin.class)); } + @Test + public void testDeleteFieldRecordsBefore() { + final Instant currentTime = Instant.now(); + + RunId runId = RunIds.generate(10000); + ProgramId program = new ProgramId("default", "app1", ProgramType.WORKFLOW, "workflow1"); + final ProgramRunId programRun1 = program.run(runId.getId()); + final FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false)); + + TransactionRunners.run(transactionRunner, context -> { + FieldLineageTable fieldLineageTable = FieldLineageTable.create(context); + fieldLineageTable.addFieldLineageInfo(programRun1, info1); + EndPoint source = EndPoint.of("ns1", "endpoint1"); + EndPoint destination = EndPoint.of("myns", "another_file"); + + fieldLineageTable.deleteFieldRecordsBefore(currentTime); + List actual = fieldLineageTable.getEndpoints("ns1", + program.getProgramReference(), runId); + + // Asserts that only the endpoint records are deleted. + Assert.assertTrue(actual.isEmpty()); + // Since the base table is deleted, get fields also won't return anything. 
+ Assert.assertTrue(fieldLineageTable.getFields(destination, 0, 10001).isEmpty()); + Assert.assertTrue(fieldLineageTable.getFields(source, 0, 10001).isEmpty()); + + }); + } + + @Test + public void testDeleteFieldRecordsBeforeDoesNotDelete() { + final Instant currentTime = Instant.now(); + + RunId runId = RunIds.generate( + currentTime.minus(1, ChronoUnit.MINUTES) + .toEpochMilli()); + ProgramId program = new ProgramId("default", "app1", ProgramType.WORKFLOW, "workflow1"); + final ProgramRunId programRun1 = program.run(runId.getId()); + final FieldLineageInfo info1 = new FieldLineageInfo(generateOperations(false)); + + TransactionRunners.run(transactionRunner, context -> { + FieldLineageTable fieldLineageTable = FieldLineageTable.create(context); + fieldLineageTable.addFieldLineageInfo(programRun1, info1); + EndPoint source = EndPoint.of("ns1", "endpoint1"); + EndPoint destination = EndPoint.of("myns", "another_file"); + Instant deleteTime = currentTime.minus(2, ChronoUnit.HOURS); + fieldLineageTable.deleteFieldRecordsBefore(deleteTime); + List actual = fieldLineageTable.getEndpoints("ns1", + program.getProgramReference(), runId); + List expected = ImmutableList.of(EndPoint.of("ns1", "endpoint1")); + // Asserts that the endpoint records are not deleted. + Assert.assertEquals(expected, actual); + + Set expectedDestinationFields = new HashSet<>(Arrays.asList("offset", "name")); + Set expectedSourceFields = new HashSet<>(Arrays.asList("offset", "body")); + // End time of currentTime should return the data for the run which was added at time + // currentTime - 1 minute. 
+ Assert.assertEquals(expectedDestinationFields, + fieldLineageTable.getFields(destination, 0, currentTime.toEpochMilli())); + Assert.assertEquals(expectedSourceFields, fieldLineageTable.getFields(source, 0, currentTime.toEpochMilli())); + + }); + } + @AfterClass public static void teardown() throws IOException { pg.close(); diff --git a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java index 861f7948cdfd..a11bd0d7f9ff 100644 --- a/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java +++ b/cdap-data-fabric/src/test/java/io/cdap/cdap/spi/data/nosql/NoSqlStructuredTableTest.java @@ -29,6 +29,7 @@ import io.cdap.cdap.spi.data.table.StructuredTableSchema; import io.cdap.cdap.spi.data.table.field.Field; import io.cdap.cdap.spi.data.table.field.Fields; +import io.cdap.cdap.spi.data.transaction.TransactionException; import io.cdap.cdap.spi.data.transaction.TransactionRunner; import java.io.IOException; import java.util.ArrayList; @@ -202,6 +203,11 @@ public void testFilterByIndexIteratorMultiMatch() { Assert.assertEquals(actual.get(2), filterIndex.getValue()); } + @Test(expected = TransactionException.class) + @Override + public void testScanDeleteAll() throws Exception{ + super.testScanDeleteAll(); + } private static class MockScanner implements Scanner { private final Iterator iterator; private boolean closed; diff --git a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java index 96c42968c5eb..c49e04204d4c 100644 --- a/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java +++ b/cdap-storage-ext-spanner/src/main/java/io/cdap/cdap/storage/spanner/SpannerStructuredTable.java @@ -37,6 +37,7 @@ import 
io.cdap.cdap.spi.data.table.field.FieldValidator; import io.cdap.cdap.spi.data.table.field.Fields; import io.cdap.cdap.spi.data.table.field.Range; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -377,7 +378,21 @@ public void delete(Collection> keys) throws InvalidFieldException { @Override public void deleteAll(Range range) throws InvalidFieldException { fieldValidator.validateScanRange(range); + Statement statement = buildRangeDeleteStatement(range); + transactionContext.executeUpdate(statement); + } + @Override + public void scanDeleteAll(Range keyRange) + throws InvalidFieldException, UnsupportedOperationException, IOException { + keyRange.getBegin().forEach(fieldValidator::validateField); + keyRange.getEnd().forEach(fieldValidator::validateField); + Statement statement = buildRangeDeleteStatement(keyRange); + LOG.trace("Executing scanDeleteAll statement: {}", statement); + transactionContext.executeUpdate(statement); + } + + private Statement buildRangeDeleteStatement(Range range) { Map parameters = new HashMap<>(); String condition = getRangeWhereClause(range, parameters); @@ -385,7 +400,7 @@ public void deleteAll(Range range) throws InvalidFieldException { "DELETE FROM " + escapeName(schema.getTableId().getName()) + " WHERE " + (condition.isEmpty() ? 
"true" : condition)); parameters.forEach((name, value) -> builder.bind(name).to(value)); - transactionContext.executeUpdate(builder.build()); + return builder.build(); } @Override diff --git a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java index df05e525b7d6..e4272b3c9292 100644 --- a/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java +++ b/cdap-storage-spi/src/main/java/io/cdap/cdap/spi/data/StructuredTable.java @@ -274,11 +274,29 @@ void increment(Collection> keys, String column, long amount) * * @param keyRange key range of the rows to delete * @throws InvalidFieldException if any of the keys are not part of table schema, or their - * types do not match the schema + * types do not match the schema or the first key in range does not match the first field of + * the primary key. * @throws IOException if there is an error reading or deleting from the table */ void deleteAll(Range keyRange) throws InvalidFieldException, IOException; + /** + * Delete a range of rows from the table based on a potentially non indexed column. + * + *

<p> + * This is a potentially slow operation because it may query a non primary key or a non indexed + * column. + * </p>

+ * + * @param keyRange key range of the rows to delete + * @throws InvalidFieldException if any of the keys are not part of table schema, or their + * types do not match the schema + * @throws UnsupportedOperationException if this method is not supported in the database + * implementation. + * @throws IOException if there is an error reading or deleting from the table + */ + void scanDeleteAll(Range keyRange) throws InvalidFieldException, UnsupportedOperationException, IOException; + /** * Updates the specific fields in a range of rows from the table. * diff --git a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java index 81c876486809..0b7446f7f441 100644 --- a/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java +++ b/cdap-storage-spi/src/test/java/io/cdap/cdap/spi/data/StructuredTableTest.java @@ -1037,6 +1037,70 @@ public void testDeleteAll() throws Exception { Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); } + @Test + public void testScanDeleteAll() throws Exception { + int max = 10; + List>> expected = writeSimpleStructuredRows(max, ""); + Assert.assertEquals(max, expected.size()); + // Delete 7-9 (both inclusive) using a column which is part of PK but not the first PK. 
+ expected.subList(7, 10).clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Arrays.asList(Fields.longField(KEY2, 7L)), + Range.Bound.INCLUSIVE, + Arrays.asList(Fields.longField(KEY2, 9L)), + Range.Bound.INCLUSIVE); + table.scanDeleteAll(range); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Delete 6 using a random column + expected.subList(6, 7).clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)), + Range.Bound.INCLUSIVE, + Arrays.asList(Fields.doubleField(DOUBLE_COL, 6.0)), + Range.Bound.INCLUSIVE); + table.scanDeleteAll(range); + }); + + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Delete 2-5 (end exclusive) using the first key only + expected.subList(2, 5).clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Collections.singletonList(Fields.intField(KEY, 2)), Range.Bound.INCLUSIVE, + Collections.singletonList(Fields.intField(KEY, 5)), Range.Bound.EXCLUSIVE); + table.scanDeleteAll(range); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Use a range outside the element list, nothing should get deleted + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + Range range = Range.create(Collections.singletonList(Fields.intField(KEY, max + 1)), Range.Bound.INCLUSIVE, + Collections.singletonList(Fields.intField(KEY, max + 5)), Range.Bound.EXCLUSIVE); + table.scanDeleteAll(range); + }); + // Verify that nothing was deleted + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + + // Delete all the remaining + 
expected.clear(); + getTransactionRunner().run(context -> { + StructuredTable table = context.getTable(SIMPLE_TABLE); + table.scanDeleteAll(Range.all()); + }); + // Verify the deletion + Assert.assertEquals(expected, scanSimpleStructuredRows(Range.all(), max)); + } + + @Test public void testIndexScanIndexStringFieldType() throws Exception { int num = 5;