diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index d6732364985a..ad37c5380c56 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -119,6 +119,7 @@ import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
 import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
 import org.apache.druid.msq.indexing.error.TooManyBucketsFault;
+import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
 import org.apache.druid.msq.indexing.error.TooManyWarningsFault;
 import org.apache.druid.msq.indexing.error.UnknownFault;
 import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
@@ -962,6 +963,14 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
     final Granularity segmentGranularity = destination.getSegmentGranularity();
 
+    // Compute & validate partitions by bucket (time chunk) if there is a maximum number of segments to be enforced per time chunk
+    if (querySpec.getTuningConfig().getMaxNumSegments() != null) {
+      final Map<DateTime, List<Pair<Integer, ClusterByPartition>>> partitionsByBucket =
+          getPartitionsByBucket(partitionBoundaries, segmentGranularity, keyReader);
+
+      validateNumSegmentsPerBucketOrThrow(partitionsByBucket, segmentGranularity);
+    }
+
     String previousSegmentId = null;
 
     segmentReport = new MSQSegmentReport(
@@ -1029,6 +1038,43 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
     return retVal;
   }
 
+  /**
+   * Return partition ranges by bucket (time chunk).
+   */
+  private Map<DateTime, List<Pair<Integer, ClusterByPartition>>> getPartitionsByBucket(
+      final ClusterByPartitions partitionBoundaries,
+      final Granularity segmentGranularity,
+      final RowKeyReader keyReader
+  )
+  {
+    final Map<DateTime, List<Pair<Integer, ClusterByPartition>>> partitionsByBucket = new HashMap<>();
+    for (int i = 0; i < partitionBoundaries.ranges().size(); i++) {
+      final ClusterByPartition partitionBoundary = partitionBoundaries.ranges().get(i);
+      final DateTime bucketDateTime = getBucketDateTime(partitionBoundary, segmentGranularity, keyReader);
+      partitionsByBucket.computeIfAbsent(bucketDateTime, ignored -> new ArrayList<>())
+                        .add(Pair.of(i, partitionBoundary));
+    }
+    return partitionsByBucket;
+  }
+
+  private void validateNumSegmentsPerBucketOrThrow(
+      final Map<DateTime, List<Pair<Integer, ClusterByPartition>>> partitionsByBucket,
+      final Granularity segmentGranularity
+  )
+  {
+    final Integer maxNumSegments = querySpec.getTuningConfig().getMaxNumSegments();
+    if (maxNumSegments == null) {
+      // Return early because a null value indicates no maximum, i.e., a time chunk can have any number of segments.
+      return;
+    }
+
+    for (final Map.Entry<DateTime, List<Pair<Integer, ClusterByPartition>>> bucketEntry : partitionsByBucket.entrySet()) {
+      final int numSegmentsInTimeChunk = bucketEntry.getValue().size();
+      if (numSegmentsInTimeChunk > maxNumSegments) {
+        throw new MSQException(new TooManySegmentsInTimeChunkFault(bucketEntry.getKey(), numSegmentsInTimeChunk, maxNumSegments, segmentGranularity));
+      }
+    }
+  }
+
   /**
    * Used by {@link #generateSegmentIdsWithShardSpecs}.
    *
@@ -1072,13 +1118,11 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForReplace(
     }
 
     // Group partition ranges by bucket (time chunk), so we can generate shardSpecs for each bucket independently.
-    final Map<DateTime, List<Pair<Integer, ClusterByPartition>>> partitionsByBucket = new HashMap<>();
-    for (int i = 0; i < partitionBoundaries.ranges().size(); i++) {
-      ClusterByPartition partitionBoundary = partitionBoundaries.ranges().get(i);
-      final DateTime bucketDateTime = getBucketDateTime(partitionBoundary, segmentGranularity, keyReader);
-      partitionsByBucket.computeIfAbsent(bucketDateTime, ignored -> new ArrayList<>())
-                        .add(Pair.of(i, partitionBoundary));
-    }
+    final Map<DateTime, List<Pair<Integer, ClusterByPartition>>> partitionsByBucket =
+        getPartitionsByBucket(partitionBoundaries, segmentGranularity, keyReader);
+
+    // Validate the buckets.
+    validateNumSegmentsPerBucketOrThrow(partitionsByBucket, segmentGranularity);
 
     // Process buckets (time chunks) one at a time.
     for (final Map.Entry<DateTime, List<Pair<Integer, ClusterByPartition>>> bucketEntry : partitionsByBucket.entrySet()) {
@@ -1090,6 +1134,7 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForReplace(
     }
 
     final List<Pair<Integer, ClusterByPartition>> ranges = bucketEntry.getValue();
+
     String version = null;
 
     final List<TaskLock> locks = context.taskActionClient().submit(new LockListAction());
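Reviewer note: the logic above is easiest to see in isolation. Below is a minimal, self-contained sketch of the new per-time-chunk check, using only JDK types (`java.time.Instant` stands in for Joda `DateTime`, and a partition index stands in for `ClusterByPartition`), so it compiles without Druid on the classpath. It is an illustration of the technique, not the production code.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxSegmentsPerTimeChunkSketch
{
  // Group partition indexes by their bucket start time, as getPartitionsByBucket() does.
  static Map<Instant, List<Integer>> groupByBucket(List<Instant> bucketPerPartition)
  {
    final Map<Instant, List<Integer>> partitionsByBucket = new HashMap<>();
    for (int i = 0; i < bucketPerPartition.size(); i++) {
      partitionsByBucket.computeIfAbsent(bucketPerPartition.get(i), ignored -> new ArrayList<>()).add(i);
    }
    return partitionsByBucket;
  }

  // Reject any bucket holding more partitions than the limit, as validateNumSegmentsPerBucketOrThrow() does.
  static void validate(Map<Instant, List<Integer>> partitionsByBucket, Integer maxNumSegments)
  {
    if (maxNumSegments == null) {
      return; // null means no maximum: a time chunk may produce any number of segments
    }
    for (Map.Entry<Instant, List<Integer>> entry : partitionsByBucket.entrySet()) {
      if (entry.getValue().size() > maxNumSegments) {
        throw new IllegalStateException(
            "Time chunk " + entry.getKey() + " needs " + entry.getValue().size()
            + " segments but only " + maxNumSegments + " allowed"
        );
      }
    }
  }

  public static void main(String[] args)
  {
    Instant day1 = Instant.parse("2023-01-01T00:00:00Z");
    Instant day2 = Instant.parse("2023-02-01T00:00:00Z");
    // Three partitions: two land in day1's chunk, one in day2's.
    Map<Instant, List<Integer>> byBucket = groupByBucket(List.of(day1, day1, day2));
    validate(byBucket, 2); // passes: no chunk exceeds 2 segments
    validate(byBucket, 1); // throws: day1's chunk needs 2 segments
  }
}
```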
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index f4d24cfc5c4c..c64c893e5cd8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -59,6 +59,7 @@ import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
 import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
 import org.apache.druid.msq.indexing.error.TooManyRowsWithSameKeyFault;
+import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
 import org.apache.druid.msq.indexing.error.TooManyWarningsFault;
 import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
 import org.apache.druid.msq.indexing.error.UnknownFault;
@@ -126,6 +127,7 @@ public class MSQIndexingModule implements DruidModule
       TooManyInputFilesFault.class,
       TooManyPartitionsFault.class,
       TooManyRowsWithSameKeyFault.class,
+      TooManySegmentsInTimeChunkFault.class,
       TooManyWarningsFault.class,
       TooManyWorkersFault.class,
       TooManyAttemptsForJob.class,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
index f36cbec2b399..b6958b2df5ed 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQTuningConfig.java
@@ -56,6 +56,9 @@ public class MSQTuningConfig
   @Nullable
   private final Integer rowsPerSegment;
 
+  @Nullable
+  private final Integer maxNumSegments;
+
   @Nullable
   private final IndexSpec indexSpec;
 
@@ -63,18 +66,20 @@ public MSQTuningConfig(
       @JsonProperty("maxNumWorkers") @Nullable final Integer maxNumWorkers,
       @JsonProperty("maxRowsInMemory") @Nullable final Integer maxRowsInMemory,
       @JsonProperty("rowsPerSegment") @Nullable final Integer rowsPerSegment,
+      @JsonProperty("maxNumSegments") @Nullable final Integer maxNumSegments,
       @JsonProperty("indexSpec") @Nullable final IndexSpec indexSpec
   )
   {
     this.maxNumWorkers = maxNumWorkers;
     this.maxRowsInMemory = maxRowsInMemory;
     this.rowsPerSegment = rowsPerSegment;
+    this.maxNumSegments = maxNumSegments;
     this.indexSpec = indexSpec;
   }
 
   public static MSQTuningConfig defaultConfig()
   {
-    return new MSQTuningConfig(null, null, null, null);
+    return new MSQTuningConfig(null, null, null, null, null);
   }
 
   @JsonProperty("maxNumWorkers")
@@ -98,6 +103,13 @@ Integer getRowsPerSegmentForSerialization()
     return rowsPerSegment;
   }
 
+  @JsonProperty("maxNumSegments")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  Integer getMaxNumSegmentsForSerialization()
+  {
+    return maxNumSegments;
+  }
+
   @JsonProperty("indexSpec")
   @JsonInclude(JsonInclude.Include.NON_NULL)
   IndexSpec getIndexSpecForSerialization()
@@ -120,6 +132,12 @@ public int getRowsPerSegment()
     return rowsPerSegment != null ? rowsPerSegment : PartitionsSpec.DEFAULT_MAX_ROWS_PER_SEGMENT;
   }
 
+  @Nullable
+  public Integer getMaxNumSegments()
+  {
+    return maxNumSegments;
+  }
+
   public IndexSpec getIndexSpec()
   {
     return indexSpec != null ? indexSpec : IndexSpec.DEFAULT;
@@ -138,13 +156,14 @@ public boolean equals(Object o)
     return Objects.equals(maxNumWorkers, that.maxNumWorkers)
            && Objects.equals(maxRowsInMemory, that.maxRowsInMemory)
            && Objects.equals(rowsPerSegment, that.rowsPerSegment)
+           && Objects.equals(maxNumSegments, that.maxNumSegments)
            && Objects.equals(indexSpec, that.indexSpec);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(maxNumWorkers, maxRowsInMemory, rowsPerSegment, indexSpec);
+    return Objects.hash(maxNumWorkers, maxRowsInMemory, rowsPerSegment, maxNumSegments, indexSpec);
   }
 
   @Override
@@ -154,6 +173,7 @@ public String toString()
            "maxNumWorkers=" + maxNumWorkers +
            ", maxRowsInMemory=" + maxRowsInMemory +
            ", rowsPerSegment=" + rowsPerSegment +
+           ", maxNumSegments=" + maxNumSegments +
            ", indexSpec=" + indexSpec +
            '}';
   }
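Reviewer note: a small usage sketch of the widened constructor follows; the argument values are illustrative, not taken from the patch. Because the new serialization getter is annotated `@JsonInclude(NON_NULL)`, a config without `maxNumSegments` should serialize exactly as before, which keeps the JSON wire format backward compatible.

```java
import org.apache.druid.msq.indexing.MSQTuningConfig;

public class TuningConfigDemo
{
  public static void main(String[] args)
  {
    // maxNumSegments is the fourth constructor argument; null means "no limit".
    final MSQTuningConfig limited = new MSQTuningConfig(2, 100_000, 3_000_000, 10, null);
    final MSQTuningConfig unlimited = new MSQTuningConfig(2, 100_000, 3_000_000, null, null);

    System.out.println(limited.getMaxNumSegments());   // 10
    System.out.println(unlimited.getMaxNumSegments()); // null
    System.out.println(limited.equals(unlimited));     // false: equals() now compares maxNumSegments
  }
}
```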
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManySegmentsInTimeChunkFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManySegmentsInTimeChunkFault.java
new file mode 100644
index 000000000000..ac0c2a641da0
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManySegmentsInTimeChunkFault.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing.error;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.GranularityType;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.joda.time.DateTime;
+
+import java.util.Objects;
+
+@JsonTypeName(TooManySegmentsInTimeChunkFault.CODE)
+public class TooManySegmentsInTimeChunkFault extends BaseMSQFault
+{
+  public static final String CODE = "TooManySegmentsInTimeChunk";
+
+  private final DateTime timeChunk;
+  private final int numSegments;
+  private final int maxNumSegments;
+  private final Granularity segmentGranularity;
+
+  @JsonCreator
+  public TooManySegmentsInTimeChunkFault(
+      @JsonProperty("timeChunk") final DateTime timeChunk,
+      @JsonProperty("numSegments") final int numSegments,
+      @JsonProperty("maxNumSegments") final int maxNumSegments,
+      @JsonProperty("segmentGranularity") final Granularity segmentGranularity
+  )
+  {
+    super(
+        CODE,
+        "Too many segments requested to be generated in time chunk[%s] with granularity[%s]"
+        + " (requested = [%,d], maximum = [%,d]). Please try breaking up your query or change the maximum using"
+        + " the query context parameter[%s].",
+        timeChunk,
+        convertToGranularityString(segmentGranularity),
+        numSegments,
+        maxNumSegments,
+        MultiStageQueryContext.CTX_MAX_NUM_SEGMENTS
+    );
+    this.timeChunk = timeChunk;
+    this.numSegments = numSegments;
+    this.maxNumSegments = maxNumSegments;
+    this.segmentGranularity = segmentGranularity;
+  }
+
+  /**
+   * Convert the given granularity to a more user-friendly granularity string, when possible.
+   */
+  private static String convertToGranularityString(final Granularity granularity)
+  {
+    // If it's a "standard" granularity, we get a nicer string from the GranularityType enum. For any other
+    // granularity, we just fall back to the toString(). See GranularityType#isStandard().
+    for (GranularityType value : GranularityType.values()) {
+      if (value.getDefaultGranularity().equals(granularity)) {
+        return value.name();
+      }
+    }
+    return granularity.toString();
+  }
+
+  @JsonProperty
+  public DateTime getTimeChunk()
+  {
+    return timeChunk;
+  }
+
+  @JsonProperty
+  public int getNumSegments()
+  {
+    return numSegments;
+  }
+
+  @JsonProperty
+  public int getMaxNumSegments()
+  {
+    return maxNumSegments;
+  }
+
+  @JsonProperty
+  public Granularity getSegmentGranularity()
+  {
+    return segmentGranularity;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    TooManySegmentsInTimeChunkFault that = (TooManySegmentsInTimeChunkFault) o;
+    return numSegments == that.numSegments
+           && maxNumSegments == that.maxNumSegments
+           && Objects.equals(timeChunk, that.timeChunk)
+           && Objects.equals(segmentGranularity, that.segmentGranularity);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), timeChunk, numSegments, maxNumSegments, segmentGranularity);
+  }
+}
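Reviewer note: the `convertToGranularityString` helper is what makes the fault message print `DAY` instead of the verbose period-granularity `toString()` form. A minimal sketch of the same lookup, reusing only the calls that appear in the patch (`GranularityType.values()`, `getDefaultGranularity()`), follows; the class and method names here are hypothetical.

```java
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType;

public class GranularityStringDemo
{
  // Same lookup as convertToGranularityString(): standard granularities map to enum names.
  static String friendly(Granularity granularity)
  {
    for (GranularityType value : GranularityType.values()) {
      if (value.getDefaultGranularity().equals(granularity)) {
        return value.name();
      }
    }
    return granularity.toString();
  }

  public static void main(String[] args)
  {
    // Prints "DAY" rather than the raw toString() of the underlying period granularity.
    System.out.println(friendly(Granularities.DAY));
  }
}
```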
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 58031558a0a7..c6396c0b3060 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -162,6 +162,7 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
     final int maxNumWorkers = maxNumTasks - 1;
     final int rowsPerSegment = MultiStageQueryContext.getRowsPerSegment(sqlQueryContext);
     final int maxRowsInMemory = MultiStageQueryContext.getRowsInMemory(sqlQueryContext);
+    final Integer maxNumSegments = MultiStageQueryContext.getMaxNumSegments(sqlQueryContext);
     final IndexSpec indexSpec = MultiStageQueryContext.getIndexSpec(sqlQueryContext, jsonMapper);
     final boolean finalizeAggregations = MultiStageQueryContext.isFinalizeAggregations(sqlQueryContext);
 
@@ -279,7 +280,7 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
            .columnMappings(new ColumnMappings(columnMappings))
            .destination(destination)
            .assignmentStrategy(MultiStageQueryContext.getAssignmentStrategy(sqlQueryContext))
-           .tuningConfig(new MSQTuningConfig(maxNumWorkers, maxRowsInMemory, rowsPerSegment, indexSpec))
+           .tuningConfig(new MSQTuningConfig(maxNumWorkers, maxRowsInMemory, rowsPerSegment, maxNumSegments, indexSpec))
            .build();
 
     MSQTaskQueryMakerUtils.validateRealtimeReindex(querySpec);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index d001e0177a69..f8bdb36bfaea 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -149,6 +149,8 @@ public class MultiStageQueryContext
 
   public static final String CTX_IS_REINDEX = "isReindex";
 
+  public static final String CTX_MAX_NUM_SEGMENTS = "maxNumSegments";
+
   /**
    * Controls sort order within segments. Normally, this is the same as the overall order of the query (from the
    * CLUSTERED BY clause) but it can be overridden.
@@ -324,6 +326,12 @@ public static int getRowsInMemory(final QueryContext queryContext)
     return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY);
   }
 
+  public static Integer getMaxNumSegments(final QueryContext queryContext)
+  {
+    // The default is null if the context parameter is not set.
+    return queryContext.getInt(CTX_MAX_NUM_SEGMENTS);
+  }
+
   public static List<String> getSortOrder(final QueryContext queryContext)
   {
     return decodeList(CTX_SORT_ORDER, queryContext.getString(CTX_SORT_ORDER));
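Reviewer note: this is the user-facing entry point — `maxNumSegments` is read from the SQL query context and threaded into the tuning config above. A sketch of the expected getter behavior follows; it assumes the `QueryContext.of(Map)` factory available in recent Druid versions, so treat the exact construction as illustrative.

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

public class MaxNumSegmentsContextDemo
{
  public static void main(String[] args)
  {
    // With the parameter set, the getter returns the boxed int...
    final QueryContext withLimit =
        QueryContext.of(ImmutableMap.of(MultiStageQueryContext.CTX_MAX_NUM_SEGMENTS, 10));
    System.out.println(MultiStageQueryContext.getMaxNumSegments(withLimit)); // 10

    // ...and with it absent, null, which downstream code treats as "no limit".
    final QueryContext empty = QueryContext.of(ImmutableMap.of());
    System.out.println(MultiStageQueryContext.getMaxNumSegments(empty)); // null
  }
}
```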
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
index 425609628b3a..d144e7659578 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java
@@ -29,9 +29,11 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
 import org.apache.druid.msq.indexing.error.InsertCannotBeEmptyFault;
 import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
@@ -41,6 +43,7 @@ import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
 import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
 import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
+import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
 import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.test.MSQTestTaskActionClient;
 import org.apache.druid.segment.column.ColumnType;
@@ -284,7 +287,7 @@ public void testInsertTimeNullFault()
   }
 
   @Test
-  public void testInsertWithTooManySegments() throws IOException
+  public void testInsertWithTooManyPartitions() throws IOException
   {
     Map<String, Object> context = ImmutableMap.<String, Object>builder()
                                               .putAll(DEFAULT_MSQ_CONTEXT)
@@ -316,6 +319,51 @@ public void testInsertWithTooManySegments() throws IOException
 
   }
 
+  @Test
+  public void testReplaceWithTooManySegmentsInTimeChunk() throws IOException
+  {
+    // Each segment will contain at most 10 rows. So with ALL granularity, an ingest query will
+    // attempt to generate a total of 5 segments for 50 input rows but will fail since only 1 segment is allowed.
+    final int maxNumSegments = 1;
+    final int rowsPerSegment = 10;
+    final int numRowsInInputFile = 50;
+
+    final Map<String, Object> context = ImmutableMap.<String, Object>builder()
+                                                    .putAll(DEFAULT_MSQ_CONTEXT)
+                                                    .put("maxNumSegments", maxNumSegments)
+                                                    .put("rowsPerSegment", rowsPerSegment)
+                                                    .build();
+
+    final File file = createNdJsonFile(newTempFile("ndjson30k"), numRowsInInputFile, 1);
+    final String filePathAsJson = queryFramework().queryJsonMapper().writeValueAsString(file.getAbsolutePath());
+
+    testIngestQuery().setSql(
+                         "REPLACE INTO foo1 "
+                         + " OVERWRITE ALL "
+                         + " SELECT FLOOR(TIME_PARSE(\"timestamp\") to day) AS __time"
+                         + " FROM TABLE(\n"
+                         + "  EXTERN(\n"
+                         + "    '{ \"files\": [" + filePathAsJson + "],\"type\":\"local\"}',\n"
+                         + "    '{\"type\": \"json\"}',\n"
+                         + "    '[{\"name\": \"timestamp\",\"type\":\"string\"}]'\n"
+                         + "  )\n"
+                         + ") PARTITIONED BY ALL")
+                     .setExpectedDataSource("foo1")
+                     .setExpectedRowSignature(RowSignature.builder().add("__time", ColumnType.LONG).build())
+                     .setQueryContext(context)
+                     .setExpectedMSQFault(
+                         new TooManySegmentsInTimeChunkFault(
+                             DateTimes.of("1970-01-01"),
+                             numRowsInInputFile / rowsPerSegment,
+                             maxNumSegments,
+                             Granularities.ALL
+                         )
+                     )
+                     .verifyResults();
+
+  }
+
   /**
    * Helper method that populates a file with {@code numRows} rows and {@code numColumns} columns where the
    * first column is a string 'timestamp' while the rest are string columns with junk value
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
index ecdc30294dbd..03ed429848af 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java
@@ -30,11 +30,13 @@ import org.apache.druid.hll.HyperLogLogCollector;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
 import org.apache.druid.msq.indexing.error.RowTooLargeFault;
+import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;
 import org.apache.druid.msq.indexing.report.MSQSegmentReport;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
 import org.apache.druid.msq.test.CounterSnapshotMatcher;
@@ -50,6 +52,7 @@ import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
@@ -1366,6 +1369,66 @@ public void testInsertWithTooLargeRowShouldThrowException(String contextName, Map<String, Object> context)
         .verifyExecutionError();
   }
 
+  @Test
+  public void testInsertWithTooManySegmentsInTimeChunk()
+  {
+    final Map<String, Object> context = ImmutableMap.<String, Object>builder()
+                                                    .putAll(DEFAULT_MSQ_CONTEXT)
+                                                    .put("maxNumSegments", 1)
+                                                    .put("rowsPerSegment", 1)
+                                                    .build();
+
+    testIngestQuery().setSql("INSERT INTO foo"
+                             + " SELECT TIME_PARSE(ts) AS __time, c1 "
+                             + " FROM (VALUES('2023-01-01', 'day1_1'), ('2023-01-01', 'day1_2'), ('2023-02-01', 'day2')) AS t(ts, c1)"
+                             + " PARTITIONED BY DAY")
.setExpectedDataSource("foo") + .setExpectedRowSignature(RowSignature.builder().add("__time", ColumnType.LONG).build()) + .setQueryContext(context) + .setExpectedMSQFault( + new TooManySegmentsInTimeChunkFault( + DateTimes.of("2023-01-01"), + 2, + 1, + Granularities.DAY + ) + ) + .verifyResults(); + + } + + @Test + public void testInsertWithMaxNumSegments() + { + final Map context = ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put("maxNumSegments", 2) + .put("rowsPerSegment", 1) + .build(); + + final RowSignature expectedRowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("c1", ColumnType.STRING) + .build(); + // Ingest query should at most generate 2 segments per time chunk + // i.e. 2 segments for the first time chunk and 1 segment for the last time chunk. + testIngestQuery().setSql("INSERT INTO foo" + + " SELECT TIME_PARSE(ts) AS __time, c1 " + + " FROM (VALUES('2023-01-01', 'day1_1'), ('2023-01-01', 'day1_2'), ('2023-02-01', 'day2')) AS t(ts, c1)" + + " PARTITIONED BY DAY") + .setQueryContext(context) + .setExpectedDataSource("foo") + .setExpectedRowSignature(expectedRowSignature) + .setExpectedResultRows( + ImmutableList.of( + new Object[]{1672531200000L, "day1_1"}, + new Object[]{1672531200000L, "day1_2"}, + new Object[]{1675209600000L, "day2"} + ) + ) + .verifyResults(); + } + @MethodSource("data") @ParameterizedTest(name = "{index}:with context {0}") public void testInsertLimitWithPeriodGranularityThrowsException(String contextName, Map context) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 174e09243aeb..7d7f4e310c62 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -41,7 +41,9 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.GranularityType; +import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault; import org.apache.druid.msq.indexing.report.MSQSegmentReport; import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; @@ -753,6 +755,68 @@ public void testReplaceWhereClauseLargerThanData(String contextName, Map context = ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put("maxNumSegments", 1) + .put("rowsPerSegment", 1) + .build(); + + testIngestQuery().setSql("REPLACE INTO foo" + + " OVERWRITE ALL " + + " SELECT TIME_PARSE(ts) AS __time, c1 " + + " FROM (VALUES('2023-01-01 01:00:00', 'day1_1'), ('2023-01-01 01:00:00', 'day1_2'), ('2023-02-01 06:00:00', 'day2')) AS t(ts, c1)" + + " PARTITIONED BY HOUR") + .setExpectedDataSource("foo") + .setExpectedRowSignature(RowSignature.builder().add("__time", ColumnType.LONG).build()) + .setQueryContext(context) + .setExpectedMSQFault( + new TooManySegmentsInTimeChunkFault( + DateTimes.of("2023-01-01T01:00:00.000Z"), + 2, + 1, + Granularities.HOUR + ) + ) + .verifyResults(); + + } + + @Test + public void testReplaceWithMaxNumSegments() + { + final Map context = ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put("maxNumSegments", 1) + .build(); + + final RowSignature 
+    final RowSignature expectedRowSignature = RowSignature.builder()
+                                                          .add("__time", ColumnType.LONG)
+                                                          .add("c1", ColumnType.STRING)
+                                                          .build();
+
+    // Ingest query should generate at most 1 segment for all the rows.
+    testIngestQuery().setSql("REPLACE INTO foo"
+                             + " OVERWRITE ALL"
+                             + " SELECT TIME_PARSE(ts) AS __time, c1 "
+                             + " FROM (VALUES('2023-01-01', 'day1_1'), ('2023-01-01', 'day1_2'), ('2023-02-01', 'day2')) AS t(ts, c1)"
+                             + " LIMIT 10"
+                             + " PARTITIONED BY ALL")
+                     .setQueryContext(context)
+                     .setExpectedDataSource("foo")
+                     .setExpectedRowSignature(expectedRowSignature)
+                     .setExpectedResultRows(
+                         ImmutableList.of(
+                             new Object[]{1672531200000L, "day1_1"},
+                             new Object[]{1672531200000L, "day1_2"},
+                             new Object[]{1675209600000L, "day2"}
+                         )
+                     )
+                     .verifyResults();
+  }
+
   @MethodSource("data")
   @ParameterizedTest(name = "{index}:with context {0}")
   public void testReplaceLimitWithPeriodGranularityThrowsException(String contextName, Map<String, Object> context)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java
index 44d22b3d50fa..45d93001386c 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQTuningConfigTest.java
@@ -53,6 +53,7 @@ public void testSerdeNonDefault() throws Exception
             2,
             3,
             4,
+            10,
             IndexSpec.builder()
                      .withStringDictionaryEncoding(
                          new StringEncodingStrategy.FrontCoded(null, FrontCodedIndexed.V1)
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
index 6ee9a5b52760..c33faa40c14e 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQFaultSerdeTest.java
@@ -22,7 +22,9 @@ import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnType;
@@ -84,6 +86,7 @@ public void testFaultSerde() throws IOException
     assertFaultSerde(new TooManyInputFilesFault(15, 10, 5));
     assertFaultSerde(new TooManyPartitionsFault(10));
     assertFaultSerde(new TooManyRowsWithSameKeyFault(Arrays.asList("foo", 123), 1, 2));
+    assertFaultSerde(new TooManySegmentsInTimeChunkFault(DateTimes.nowUtc(), 10, 1, Granularities.ALL));
     assertFaultSerde(new TooManyWarningsFault(10, "the error"));
     assertFaultSerde(new TooManyWorkersFault(10, 5));
     assertFaultSerde(new TooManyAttemptsForWorker(2, "taskId", 1, "rootError"));
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
index 33c1374d2a73..f7c9b3296cab 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java
@@ -834,6 +834,10 @@ private void assertTuningConfig(
         expectedTuningConfig.getRowsPerSegment(),
         tuningConfig.getRowsPerSegment()
     );
+    Assert.assertEquals(
+        expectedTuningConfig.getMaxNumSegments(),
+        tuningConfig.getMaxNumSegments()
+    );
   }
 
   @Nullable
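Reviewer note: to tie the tests back to the fault, here is a sketch that constructs the new fault with the same numbers as the MSQFaultsTest case (50 input rows at 10 rows per segment need 5 segments in the single ALL-granularity chunk, against a limit of 1). It assumes the `MSQFault` interface exposes `getErrorCode()`, as the other fault tests suggest; the getters used are the ones defined in this patch.

```java
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.TooManySegmentsInTimeChunkFault;

public class FaultMessageDemo
{
  public static void main(String[] args)
  {
    // Mirrors testReplaceWithTooManySegmentsInTimeChunk: 5 segments requested, 1 allowed.
    final TooManySegmentsInTimeChunkFault fault =
        new TooManySegmentsInTimeChunkFault(DateTimes.of("1970-01-01"), 5, 1, Granularities.ALL);

    System.out.println(fault.getErrorCode());      // "TooManySegmentsInTimeChunk"
    System.out.println(fault.getNumSegments());    // 5
    System.out.println(fault.getMaxNumSegments()); // 1
  }
}
```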