diff --git a/docs/api-reference/legacy-metadata-api.md b/docs/api-reference/legacy-metadata-api.md index 453159c1a582..134ede08d876 100644 --- a/docs/api-reference/legacy-metadata-api.md +++ b/docs/api-reference/legacy-metadata-api.md @@ -248,6 +248,16 @@ Returns full segment metadata for a specific segment in the cluster. Return the tiers that a datasource exists in. +`GET /druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}` + +Returns a list of unused segments for a datasource in the cluster contained within an optionally specified interval. +The optional `limit` and `lastSegmentId` parameters can be given as well, to cap the number of results and to paginate them. +The results may be sorted in either ASC or DESC order by their id, start time, and end time, depending on +the `sortOrder` parameter. The default behavior in the absence of all optional parameters is to return all +unused segments for the given datasource in no guaranteed order. + +Example usage: `GET /druid/coordinator/v1/datasources/inline_data/unusedSegments?interval=2023-12-01_2023-12-10&limit=10&lastSegmentId=inline_data_2023-12-03T00%3A00%3A00.000Z_2023-12-04T00%3A00%3A00.000Z_2023-12-09T14%3A16%3A53.738Z&sortOrder=ASC` + ## Intervals Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` as in `2016-06-27_2016-06-28`. 
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 62f55f96c475..c62e59c0b25e 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -233,7 +233,7 @@ public List retrieveUnusedSegmentsForInterval( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) { + .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index a774afcd47b3..eb8a36bc3a78 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -125,6 +125,30 @@ Optional> iterateAllUsedNonOvershadowedSegmentsForDatasour boolean requiresLatest ); + /** + * Returns an iterable to go over un-used segments for a given datasource over an optional interval. + * The order in which segments are iterated is from earliest start-time, with ties being broken with earliest end-time + * first. Note: the iteration may not be as trivially cheap as, + * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it + * iterates the returned iterable only once rather than several times. + * + * @param datasource the name of the datasource. + * @param interval an optional interval to search over. 
If none is specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY} is used. + * @param limit an optional maximum number of results to return. If none is specified, the results are not limited. + * @param lastSegmentId an optional last segment id from which to search for results. All segments returned are > + * this segment lexicographically if sortOrder is null or {@link SortOrder#ASC}, or < this segment + * lexicographically if sortOrder is {@link SortOrder#DESC}. If none is specified, no such filter is used. + * @param sortOrder an optional order with which to return the matching segments by id, start time, end time. + * If none is specified, the order of the results is not guaranteed. + */ + Iterable iterateAllUnusedSegmentsForDatasource( + String datasource, + @Nullable Interval interval, + @Nullable Integer limit, + @Nullable String lastSegmentId, + @Nullable SortOrder sortOrder + ); + /** * Retrieves all data source names for which there are segment in the database, regardless of whether those segments * are used or not. If there are no segments in the database, returns an empty set. diff --git a/server/src/main/java/org/apache/druid/metadata/SortOrder.java b/server/src/main/java/org/apache/druid/metadata/SortOrder.java new file mode 100644 index 000000000000..afabd0cde59a --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/SortOrder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Specifies the sort order for metadata store queries. Parsed case-insensitively by {@link #fromValue}. + */ +public enum SortOrder +{ + ASC("ASC"), + + DESC("DESC"); + + private final String value; + + SortOrder(String value) + { + this.value = value; + } + + @Override + @JsonValue + public String toString() + { + return value; + } + + @JsonCreator + public static SortOrder fromValue(String value) + { + for (SortOrder candidate : SortOrder.values()) { + if (candidate.value.equalsIgnoreCase(value)) { + return candidate; + } + } + throw InvalidInput.exception(StringUtils.format( + "Unexpected value[%s] for SortOrder. 
Possible values are: %s", + value, Arrays.stream(SortOrder.values()).map(SortOrder::toString).collect(Collectors.toList()) + )); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 14c6ef6c1fcf..12d43ec5b76a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; @@ -686,7 +687,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals, null)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); @@ -955,6 +956,51 @@ public Optional> iterateAllUsedNonOvershadowedSegmentsForD .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE)); } + /** + * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* an optionally + * specified interval. If the interval specified is null, this method will retrieve all unused segments. + * + * This call does not return any information about realtime segments. + * + * @param datasource The name of the datasource + * @param interval an optional interval to search over. + * @param limit an optional maximum number of results to return. 
If none is specified, the results are + * not limited. + * @param lastSegmentId an optional last segment id from which to search for results. All segments returned are > + * this segment lexicographically if sortOrder is null or {@link SortOrder#ASC}, or < this + * segment lexicographically if sortOrder is {@link SortOrder#DESC}. If none is specified, no + * such filter is used. + * @param sortOrder an optional order with which to return the matching segments by id, start time, end time. If + * none is specified, the order of the results is not guaranteed. + * + * @return an iterable over the matching unused segments. + */ + @Override + public Iterable iterateAllUnusedSegmentsForDatasource( + final String datasource, + @Nullable final Interval interval, + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder + ) + { + return connector.inReadOnlyTransaction( + (handle, status) -> { + final SqlSegmentsMetadataQuery queryTool = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper); + + final List intervals = + interval == null + ? 
Intervals.ONLY_ETERNITY + : Collections.singletonList(interval); + try (final CloseableIterator iterator = + queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) { + return ImmutableList.copyOf(iterator); + } + } + ); + } + @Override public Set retrieveAllDataSourceNames() { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 61fc919a8be5..63bfdbf53dec 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -117,7 +117,7 @@ public CloseableIterator retrieveUsedSegments( final Collection intervals ) { - return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null); + return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null); } /** @@ -127,15 +127,26 @@ public CloseableIterator retrieveUsedSegments( * * This call does not return any information about realtime segments. * + * @param dataSource The name of the datasource + * @param intervals The intervals to search over + * @param limit The limit of segments to return + * @param lastSegmentId the last segment id from which to search for results. All segments returned are > + * this segment lexicographically if sortOrder is null or ASC, or < this segment + * lexicographically if sortOrder is DESC. + * @param sortOrder Specifies the order with which to return the matching segments by id, start time, end time. + * A null value indicates that order does not matter. + * Returns a closeable iterator. You should close it when you are done. 
*/ public CloseableIterator retrieveUnusedSegments( final String dataSource, final Collection intervals, - @Nullable final Integer limit + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder ) { - return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit); + return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder); } /** @@ -223,7 +234,15 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) // Retrieve, then drop, since we can't write a WHERE clause directly. final List segments = ImmutableList.copyOf( Iterators.transform( - retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null), + retrieveSegments( + dataSource, + Collections.singletonList(interval), + IntervalMode.CONTAINS, + true, + null, + null, + null + ), DataSegment::getId ) ); @@ -358,12 +377,14 @@ private CloseableIterator retrieveSegments( final Collection intervals, final IntervalMode matchMode, final boolean used, - @Nullable final Integer limit + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder ) { - if (intervals.isEmpty()) { + if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) { return CloseableIterators.withEmptyBaggage( - retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit) + retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder) ); } else { final List> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH); @@ -371,7 +392,15 @@ private CloseableIterator retrieveSegments( Integer limitPerBatch = limit; for (final List intervalList : intervalsLists) { - final UnmodifiableIterator iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch); + final 
UnmodifiableIterator iterator = retrieveSegmentsInIntervalsBatch( + dataSource, + intervalList, + matchMode, + used, + limitPerBatch, + lastSegmentId, + sortOrder + ); if (limitPerBatch != null) { // If limit is provided, we need to shrink the limit for subsequent batches or circuit break if // we have reached what was requested for. @@ -394,7 +423,9 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( final Collection intervals, final IntervalMode matchMode, final boolean used, - @Nullable final Integer limit + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder ) { // Check if the intervals all support comparing as strings. If so, bake them into the SQL. @@ -407,11 +438,33 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector); } + if (lastSegmentId != null) { + sb.append( + StringUtils.format( + " AND id %s :id", + (sortOrder == null || sortOrder == SortOrder.ASC) + ? 
">" + : "<" + ) + ); + } + + if (sortOrder != null) { + sb.append(StringUtils.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s %2$s", + connector.getQuoteString(), + sortOrder.toString())); + } final Query> sql = handle - .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable())) + .createQuery(StringUtils.format( + sb.toString(), + dbTables.getSegmentsTable() + )) .setFetchSize(connector.getStreamingFetchSize()) .bind("used", used) .bind("dataSource", dataSource); + if (lastSegmentId != null) { + sql.bind("id", lastSegmentId); + } if (null != limit) { sql.setMaxRows(limit); } diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 3fc13469723e..fb976a04bf3e 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -26,9 +26,12 @@ import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.metadata.SortOrder; import org.apache.druid.segment.metadata.AvailableSegmentMetadata; import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache; import org.apache.druid.segment.metadata.DataSourceInformation; @@ -334,6 +337,51 @@ public Response getUsedSegmentsInDataSourceForIntervals( return builder.entity(Collections2.transform(segments, DataSegment::getId)).build(); } + @GET + @Path("/datasources/{dataSourceName}/unusedSegments") + @Produces(MediaType.APPLICATION_JSON) + public Response getUnusedSegmentsInDataSource( + 
@Context final HttpServletRequest req, + @PathParam("dataSourceName") final String dataSource, + @QueryParam("interval") @Nullable String interval, + @QueryParam("limit") @Nullable Integer limit, + @QueryParam("lastSegmentId") @Nullable final String lastSegmentId, + @QueryParam("sortOrder") @Nullable final String sortOrder + ) + { + if (dataSource == null || dataSource.isEmpty()) { + throw InvalidInput.exception("dataSourceName must be non-empty"); + } + if (limit != null && limit <= 0) { /* zero is rejected too, as the message states; NOTE(review): a 0 limit would presumably mean "no limit" at the JDBC layer - confirm */ + throw InvalidInput.exception("Invalid limit[%s] specified. Limit must be > 0", limit); + } + + if (lastSegmentId != null && SegmentId.tryParse(dataSource, lastSegmentId) == null) { /* the id must parse as a segment id of this datasource */ + throw InvalidInput.exception("Invalid lastSegmentId[%s] specified.", lastSegmentId); + } + + SortOrder theSortOrder = sortOrder == null ? null : SortOrder.fromValue(sortOrder); + + final Interval theInterval = interval != null ? Intervals.of(interval.replace('_', '/')) : null; /* URL form uses '_' where ISO 8601 uses '/' */ + Iterable unusedSegments = segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource( + dataSource, + theInterval, + limit, + lastSegmentId, + theSortOrder + ); + + final Function> raGenerator = segment -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); + + final Iterable authorizedSegments = + AuthorizationUtils.filterAuthorizedResources(req, unusedSegments, raGenerator, authorizerMapper); + + final List retVal = new ArrayList<>(); + authorizedSegments.iterator().forEachRemaining(retVal::add); + return Response.status(Response.Status.OK).entity(retVal).build(); + } + @GET @Path("/datasources/{dataSourceName}/segments/{segmentId}") @Produces(MediaType.APPLICATION_JSON) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 8e2e7eb747fb..9e977dec3e8c 100644 --- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -1206,23 +1206,86 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws final List segments = createAndGetUsedYearSegments(1900, 2133); markAllSegmentsUnused(new HashSet<>(segments)); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + null, + null, + null + ); + Assert.assertEquals(segments.size(), actualUnusedSegments.size()); + Assert.assertTrue(segments.containsAll(actualUnusedSegments)); + } + + @Test + public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws IOException + { + final List segments = createAndGetUsedYearSegments(1900, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + null, null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(segments.containsAll(actualUnusedSegments)); } + @Test + public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws IOException + { + final List segments = createAndGetUsedYearSegments(2033, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + String lastSegmentId = segments.get(9).getId().toString(); + final List expectedSegmentsAscOrder = segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0) + .collect(Collectors.toList()); + ImmutableList actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + lastSegmentId, + null + ); + Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size()); + 
Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments)); + + actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + lastSegmentId, + SortOrder.ASC + ); + Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size()); + Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments); + + final List expectedSegmentsDescOrder = segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) < 0) + .collect(Collectors.toList()); + Collections.reverse(expectedSegmentsDescOrder); + + actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + lastSegmentId, + SortOrder.DESC + ); + Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegments.size()); + Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments); + } + @Test public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); markAllSegmentsUnused(new HashSet<>(segments)); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), - segments.size() + segments.size(), + null, + null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(segments.containsAll(actualUnusedSegments)); @@ -1235,23 +1298,69 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() th markAllSegmentsUnused(new HashSet<>(segments)); final int requestedLimit = segments.size() - 1; - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( - segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()), - requestedLimit + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + 
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + requestedLimit, + null, + null ); Assert.assertEquals(requestedLimit, actualUnusedSegments.size()); Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList()))); } + @Test + public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndOffsetInRange() throws IOException + { + final List segments = createAndGetUsedYearSegments(2034, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + final int requestedLimit = segments.size(); + final String lastSegmentId = segments.get(4).getId().toString(); + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + requestedLimit, + lastSegmentId, + null + ); + Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size()); + Assert.assertEquals(actualUnusedSegments, segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0) + .limit(requestedLimit) + .collect(Collectors.toList())); + } + + @Test + public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffsetInRange() throws IOException + { + final List segments = createAndGetUsedYearSegments(1900, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + final int requestedLimit = segments.size() - 1; + final String lastSegmentId = segments.get(4).getId().toString(); + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + requestedLimit, + lastSegmentId, + null + ); + Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size()); + Assert.assertEquals(actualUnusedSegments, segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0) + .limit(requestedLimit) + .collect(Collectors.toList())); + } + @Test public void 
testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); markAllSegmentsUnused(new HashSet<>(segments)); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), - segments.size() + 1 + segments.size() + 1, + null, + null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(actualUnusedSegments.containsAll(segments)); @@ -1267,8 +1376,10 @@ public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOExcepti Assert.assertTrue(segments.stream() .anyMatch(segment -> !segment.getInterval().overlaps(outOfRangeInterval))); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( ImmutableList.of(outOfRangeInterval), + null, + null, null ); Assert.assertEquals(0, actualUnusedSegments.size()); @@ -3048,21 +3159,23 @@ private List createAndGetUsedYearSegments(final int startYear, fina return segments; } - private ImmutableList retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + private ImmutableList retrieveUnusedSegments( final List intervals, - final Integer limit + final Integer limit, + final String lastSegmentId, + final SortOrder sortOrder ) { return derbyConnector.inReadOnlyTransaction( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle( - handle, - derbyConnector, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - mapper - ) - .retrieveUnusedSegments(DS.WIKI, intervals, limit)) { + handle, + derbyConnector, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + mapper + ) + .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder)) { return 
ImmutableList.copyOf(iterator); } } diff --git a/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java b/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java new file mode 100644 index 000000000000..a191763d444b --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata; + +import org.apache.druid.error.DruidExceptionMatcher; +import org.junit.Assert; +import org.junit.Test; + +public class SortOrderTest +{ + + @Test + public void testAsc() + { + Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("asc")); + Assert.assertEquals("ASC", SortOrder.fromValue("asc").toString()); + Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("ASC")); + Assert.assertEquals("ASC", SortOrder.fromValue("ASC").toString()); + Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("AsC")); + Assert.assertEquals("ASC", SortOrder.fromValue("AsC").toString()); + } + + @Test + public void testDesc() + { + Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("desc")); + Assert.assertEquals("DESC", SortOrder.fromValue("desc").toString()); + Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("DESC")); + Assert.assertEquals("DESC", SortOrder.fromValue("DESC").toString()); + Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("DesC")); + Assert.assertEquals("DESC", SortOrder.fromValue("DesC").toString()); + } + + @Test + public void testInvalid() + { + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Unexpected value[bad] for SortOrder. 
Possible values are: [ASC, DESC]" + ).assertThrowsAndMatches( + () -> SortOrder.fromValue("bad") + ); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 7a23234761ee..c1a5dbbdac02 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -56,7 +56,6 @@ import java.util.Set; import java.util.stream.Collectors; - public class SqlSegmentsMetadataManagerTest { private static DataSegment createSegment( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index 76493dfdfce9..cfb8fec941e0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -24,6 +24,7 @@ import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.metadata.SortOrder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentId; @@ -192,6 +193,18 @@ public Optional> iterateAllUsedNonOvershadowedSegmentsForD )); } + @Override + public Iterable iterateAllUnusedSegmentsForDatasource( + String datasource, + @Nullable Interval interval, + @Nullable Integer limit, + @Nullable String lastSegmentId, + @Nullable SortOrder sortOrder + ) + { + return null; + } + @Override public Set retrieveAllDataSourceNames() { diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java 
b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index cb6e9d2a37ed..b1c8f0cb6de5 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -26,9 +26,13 @@ import com.google.common.collect.Lists; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.metadata.SortOrder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.metadata.AvailableSegmentMetadata; @@ -42,28 +46,36 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentStatusInCluster; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; + +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class MetadataResourceTest { private static final String DATASOURCE1 = "datasource1"; - + private static final String SEGMENT_START_INTERVAL = "2012-10-24"; + private static final int NUM_PARTITIONS = 2; private final DataSegment[] segments = CreateDataSegments.ofDatasource(DATASOURCE1) - .forIntervals(3, Granularities.DAY) - 
.withNumPartitions(2) - .eachOfSizeInMb(500) - .toArray(new DataSegment[0]); + .startingAt(SEGMENT_START_INTERVAL) + .forIntervals(3, Granularities.DAY) + .withNumPartitions(NUM_PARTITIONS) + .eachOfSizeInMb(500) + .toArray(new DataSegment[0]); private HttpServletRequest request; private SegmentsMetadataManager segmentsMetadataManager; private IndexerMetadataStorageCoordinator storageCoordinator; @@ -236,6 +248,126 @@ public void testGetAllSegmentsIncludingRealtime() Assert.assertEquals(new SegmentStatusInCluster(realTimeSegments[1], false, null, 40L, true), resultList.get(5)); } + @Test + public void testGetUnusedSegmentsInDataSource() + { + Mockito.doAnswer(mockIterateAllUnusedSegmentsForDatasource()) + .when(segmentsMetadataManager) + .iterateAllUnusedSegmentsForDatasource( + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any()); + + // test with null datasource name - fails with expected bad datasource name error + DruidExceptionMatcher.invalidInput().expectMessageIs( + "dataSourceName must be non-empty" + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, null, null, null, null, null) + ); + + // test with empty datasource name - fails with expected bad datasource name error + DruidExceptionMatcher.invalidInput().expectMessageIs( + "dataSourceName must be non-empty" + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, "", null, null, null, null) + ); + + // test invalid datasource - returns empty segments + Response response = metadataResource.getUnusedSegmentsInDataSource( + request, + "invalid_datasource", + null, + null, + null, + null + ); + List resultList = extractResponseList(response); + Assert.assertTrue(resultList.isEmpty()); + + // test valid datasource with bad limit - fails with expected invalid limit message + DruidExceptionMatcher.invalidInput().expectMessageIs( + StringUtils.format("Invalid 
limit[%s] specified. Limit must be > 0", -1) + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, -1, null, null) + ); + + // test valid datasource with invalid lastSegmentId - fails with expected invalid lastSegmentId message + DruidExceptionMatcher.invalidInput().expectMessageIs( + StringUtils.format("Invalid lastSegmentId[%s] specified.", "invalid") + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, "invalid", null) + ); + + // test valid datasource - returns all unused segments for that datasource + response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, null, null); + + resultList = extractResponseList(response); + Assert.assertEquals(Arrays.asList(segments), resultList); + + // test valid datasource with interval filter - returns all unused segments for that datasource within interval + int numDays = 2; + String interval = SEGMENT_START_INTERVAL + "_P" + numDays + "D"; + response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, interval, null, null, null); + + resultList = extractResponseList(response); + Assert.assertEquals(NUM_PARTITIONS * numDays, resultList.size()); + Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2], segments[3]), resultList); + + // test valid datasource with interval filter and limit - returns unused segments for that datasource within + // interval, up to limit + int limit = 3; + response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, interval, limit, null, null); + + resultList = extractResponseList(response); + Assert.assertEquals(limit, resultList.size()); + Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2]), resultList); + + // test valid datasource with interval filter, limit and last segment id - returns unused segments for that + //
datasource within interval, up to limit, starting after last segment id + response = metadataResource.getUnusedSegmentsInDataSource( + request, + DATASOURCE1, + interval, + limit, + segments[2].getId().toString(), + null + ); + + resultList = extractResponseList(response); + Assert.assertEquals(Collections.singletonList(segments[3]), resultList); + } + + Answer> mockIterateAllUnusedSegmentsForDatasource() + { + return invocationOnMock -> { + String dataSourceName = invocationOnMock.getArgument(0); + Interval interval = invocationOnMock.getArgument(1); + Integer limit = invocationOnMock.getArgument(2); + String lastSegmentId = invocationOnMock.getArgument(3); + SortOrder sortOrder = invocationOnMock.getArgument(4); + if (!DATASOURCE1.equals(dataSourceName)) { + return ImmutableList.of(); + } + + return Arrays.stream(segments) + .filter(d -> d.getDataSource().equals(dataSourceName) + && (interval == null + || (d.getInterval().getStartMillis() >= interval.getStartMillis() + && d.getInterval().getEndMillis() <= interval.getEndMillis())) + && (lastSegmentId == null + || (sortOrder == null && d.getId().toString().compareTo(lastSegmentId) > 0) + || (sortOrder == SortOrder.ASC && d.getId().toString().compareTo(lastSegmentId) > 0) + || (sortOrder == SortOrder.DESC && d.getId().toString().compareTo(lastSegmentId) < 0))) + .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval())) + .limit(limit != null + ? limit + : segments.length) + .collect(Collectors.toList()); + }; + } + + @Test public void testGetDataSourceInformation() { diff --git a/website/.spelling b/website/.spelling index 5b21a4c5bd2c..cad92260fbdc 100644 --- a/website/.spelling +++ b/website/.spelling @@ -373,6 +373,7 @@ kubernetes kubexit k8s laning +lastSegmentId lifecycle localhost log4j @@ -501,6 +502,7 @@ Smoosh smoosh smooshed snapshotting +sortOrder splittable ssl sslmode