From ab7d9bc6ecf0dbb39c0eaf5b807829c019c6aca7 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 11 Dec 2023 16:32:18 -0500 Subject: [PATCH] Add api for Retrieving unused segments (#15415) ### Description This PR adds an API for retrieving unused segments for a particular datasource. The API supports pagination through the `limit` and `lastSegmentId` parameters. The matching unused segments can be returned in an optional `sortOrder`, `ASC` or `DESC`, with respect to the segments' `id`, `start time`, and `end time`; if `sortOrder` is not specified, no particular order is guaranteed. `GET /druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}` Returns a list of unused segments for a datasource in the cluster, optionally restricted to a specified interval. The optional limit and lastSegmentId parameters cap the number of results and enable pagination. The results may be sorted in either ASC or DESC order by passing the sortOrder parameter. `dataSourceName`: The name of the datasource. `interval`: the interval to search for unused segments in. `limit`: the maximum number of unused segments to return information about. This property helps support pagination. `lastSegmentId`: the last segment ID from which to search for results. All segments returned are > this segment lexicographically if sortOrder is null or ASC, or < this segment lexicographically if sortOrder is DESC. `sortOrder`: the order in which to return the matching segments, by id, start time, and end time. A null value indicates that order does not matter. This PR has: - [x] been self-reviewed. - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.) - [x] added documentation for new or modified features or behaviors. - [ ] a release note entry in the PR description. - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links. - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md) - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader. - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met. - [ ] added integration tests. - [x] been tested in a test Druid cluster.
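The pagination contract above can be illustrated with a small self-contained sketch. This is plain Java over hypothetical segment IDs, not a call to a real Coordinator: with a null or ASC `sortOrder` the next page contains IDs lexicographically greater than `lastSegmentId`, with DESC it contains lexicographically smaller IDs, and `limit` caps the page size.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: models the lastSegmentId/sortOrder/limit contract of the new API
// against an in-memory list of made-up segment IDs.
public class UnusedSegmentsPagingSketch
{
  static List<String> page(List<String> ids, String lastSegmentId, String sortOrder, Integer limit)
  {
    final boolean desc = "DESC".equalsIgnoreCase(sortOrder);
    return ids.stream()
              // keep IDs strictly after the last one already seen, in the requested direction
              .filter(id -> lastSegmentId == null
                            || (desc ? id.compareTo(lastSegmentId) < 0 : id.compareTo(lastSegmentId) > 0))
              // the real endpoint applies no ORDER BY when sortOrder is null; sorted here for readability
              .sorted(desc ? Comparator.<String>reverseOrder() : Comparator.naturalOrder())
              .limit(limit == null ? Long.MAX_VALUE : limit)
              .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    final List<String> ids = Arrays.asList(
        "wiki_2023-12-01T00:00:00.000Z_2023-12-02T00:00:00.000Z_v1",
        "wiki_2023-12-02T00:00:00.000Z_2023-12-03T00:00:00.000Z_v1",
        "wiki_2023-12-03T00:00:00.000Z_2023-12-04T00:00:00.000Z_v1"
    );
    // First page of 2, then use the last returned ID as lastSegmentId for the next page.
    final List<String> first = page(ids, null, "ASC", 2);
    final List<String> next = page(ids, first.get(first.size() - 1), "ASC", 2);
    System.out.println(first);
    System.out.println(next);
  }
}
```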
--- docs/api-reference/legacy-metadata-api.md | 10 ++ .../IndexerSQLMetadataStorageCoordinator.java | 2 +- .../metadata/SegmentsMetadataManager.java | 24 +++ .../org/apache/druid/metadata/SortOrder.java | 66 ++++++++ .../metadata/SqlSegmentsMetadataManager.java | 48 +++++- .../metadata/SqlSegmentsMetadataQuery.java | 73 +++++++-- .../druid/server/http/MetadataResource.java | 48 ++++++ ...exerSQLMetadataStorageCoordinatorTest.java | 147 ++++++++++++++++-- .../apache/druid/metadata/SortOrderTest.java | 60 +++++++ .../SqlSegmentsMetadataManagerTest.java | 1 - .../simulate/TestSegmentsMetadataManager.java | 13 ++ .../server/http/MetadataResourceTest.java | 142 ++++++++++++++++- website/.spelling | 2 + 13 files changed, 601 insertions(+), 35 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/metadata/SortOrder.java create mode 100644 server/src/test/java/org/apache/druid/metadata/SortOrderTest.java diff --git a/docs/api-reference/legacy-metadata-api.md b/docs/api-reference/legacy-metadata-api.md index 453159c1a582..134ede08d876 100644 --- a/docs/api-reference/legacy-metadata-api.md +++ b/docs/api-reference/legacy-metadata-api.md @@ -248,6 +248,16 @@ Returns full segment metadata for a specific segment in the cluster. Return the tiers that a datasource exists in. +`GET /druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}` + +Returns a list of unused segments for a datasource in the cluster, optionally restricted to a specified interval. +The optional limit and lastSegmentId parameters cap the number of results and enable pagination. +The results may be sorted in either ASC or DESC order by their id, start time, and end time by specifying the +sortOrder parameter. The default behavior in the absence of all optional parameters is to return all +unused segments for the given datasource in no guaranteed order. + +Example usage: `GET /druid/coordinator/v1/datasources/inline_data/unusedSegments?interval=2023-12-01_2023-12-10&limit=10&lastSegmentId=inline_data_2023-12-03T00%3A00%3A00.000Z_2023-12-04T00%3A00%3A00.000Z_2023-12-09T14%3A16%3A53.738Z&sortOrder=ASC` + ## Intervals Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` as in `2016-06-27_2016-06-28`.
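As an illustration of the documented endpoint, here is a minimal client sketch in Java. It assumes a Coordinator reachable at `http://localhost:8081` and the `inline_data` datasource from the example above; the JSON response (an array of segment objects) is printed rather than parsed, and a caller paging through results would pass the last returned segment ID back as `lastSegmentId`.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative client call; host, port, and datasource name are assumptions.
public class UnusedSegmentsRequestSketch
{
  public static void main(String[] args) throws Exception
  {
    final String base = "http://localhost:8081"; // hypothetical Coordinator address
    final String url = base
        + "/druid/coordinator/v1/datasources/inline_data/unusedSegments"
        + "?interval=2023-12-01_2023-12-10"   // interval URL parameters use '_' instead of '/'
        + "&limit=10"
        + "&sortOrder=ASC";                   // add &lastSegmentId=<last id seen> to fetch the next page

    final HttpClient client = HttpClient.newHttpClient();
    final HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    final HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}
```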
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 62f55f96c475..c62e59c0b25e 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -233,7 +233,7 @@ public List retrieveUnusedSegmentsForInterval( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) - .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) { + .retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index a774afcd47b3..eb8a36bc3a78 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -125,6 +125,30 @@ Optional> iterateAllUsedNonOvershadowedSegmentsForDatasour boolean requiresLatest ); + /** + * Returns an iterable to go over unused segments for a given datasource over an optional interval. + * Segments are iterated in the order dictated by the given sortOrder (by id, then start time, then end time); + * if no sortOrder is specified, no particular order is guaranteed. Note: the iteration may not be as trivially cheap as, + * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it + * iterates the returned iterable only once rather than several times. + * + * @param datasource the name of the datasource. + * @param interval an optional interval to search over. If none is specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY} is used. + * @param limit an optional maximum number of results to return. If none is specified, the results are not limited. + * @param lastSegmentId an optional last segment id from which to search for results. All segments returned are > + * this segment lexicographically if sortOrder is null or {@link SortOrder#ASC}, or < this segment + * lexicographically if sortOrder is {@link SortOrder#DESC}. If none is specified, no such filter is used. + * @param sortOrder an optional order with which to return the matching segments by id, start time, and end time. + * If none is specified, the order of the results is not guaranteed. + */ + Iterable iterateAllUnusedSegmentsForDatasource( + String datasource, + @Nullable Interval interval, + @Nullable Integer limit, + @Nullable String lastSegmentId, + @Nullable SortOrder sortOrder + ); + /** * Retrieves all data source names for which there are segment in the database, regardless of whether those segments * are used or not. If there are no segments in the database, returns an empty set. diff --git a/server/src/main/java/org/apache/druid/metadata/SortOrder.java b/server/src/main/java/org/apache/druid/metadata/SortOrder.java new file mode 100644 index 000000000000..afabd0cde59a --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/SortOrder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonValue; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.java.util.common.StringUtils; + +import java.util.Arrays; +import java.util.stream.Collectors; + +/** + * Specifies the sort order when doing metadata store queries. + */ +public enum SortOrder +{ + ASC("ASC"), + + DESC("DESC"); + + private String value; + + SortOrder(String value) + { + this.value = value; + } + + @Override + @JsonValue + public String toString() + { + return String.valueOf(value); + } + + @JsonCreator + public static SortOrder fromValue(String value) + { + for (SortOrder b : SortOrder.values()) { + if (String.valueOf(b.value).equalsIgnoreCase(String.valueOf(value))) { + return b; + } + } + throw InvalidInput.exception(StringUtils.format( + "Unexpected value[%s] for SortOrder. Possible values are: %s", + value, Arrays.stream(SortOrder.values()).map(SortOrder::toString).collect(Collectors.toList()) + )); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 14c6ef6c1fcf..12d43ec5b76a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; @@ -686,7 +687,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals, null)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); @@ -955,6 +956,51 @@ public Optional> iterateAllUsedNonOvershadowedSegmentsForD .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE)); } + /** + * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* an optionally + * specified interval. If the interval specified is null, this method will retrieve all unused segments. + * + * This call does not return any information about realtime segments. + * + * @param datasource The name of the datasource + * @param interval an optional interval to search over. + * @param limit an optional maximum number of results to return. If none is specified, the results are + * not limited. 
+ * @param lastSegmentId an optional last segment id from which to search for results. All segments returned are > + * this segment lexicographically if sortOrder is null or {@link SortOrder#ASC}, or < this + * segment lexicographically if sortOrder is {@link SortOrder#DESC}. If none is specified, no + * such filter is used. + * @param sortOrder an optional order with which to return the matching segments by id, start time, and end time. If + * none is specified, the order of the results is not guaranteed. + * + * Returns an iterable. + */ + @Override + public Iterable iterateAllUnusedSegmentsForDatasource( + final String datasource, + @Nullable final Interval interval, + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder + ) + { + return connector.inReadOnlyTransaction( + (handle, status) -> { + final SqlSegmentsMetadataQuery queryTool = + SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper); + + final List intervals = + interval == null + ? Intervals.ONLY_ETERNITY + : Collections.singletonList(interval); + try (final CloseableIterator iterator = + queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) { + return ImmutableList.copyOf(iterator); + } + } + ); + } + @Override public Set retrieveAllDataSourceNames() { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 61fc919a8be5..63bfdbf53dec 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -117,7 +117,7 @@ public CloseableIterator retrieveUsedSegments( final Collection intervals ) { - return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null); + return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null); } /** @@ -127,15 +127,26 @@ public CloseableIterator retrieveUsedSegments( * * This call does not return any information about realtime segments. * + * @param dataSource The name of the datasource + * @param intervals The intervals to search over + * @param limit The limit of segments to return + * @param lastSegmentId the last segment id from which to search for results. All segments returned are > + * this segment lexicographically if sortOrder is null or ASC, or < this segment + * lexicographically if sortOrder is DESC. + * @param sortOrder Specifies the order with which to return the matching segments by id, start time, and end time. + * A null value indicates that order does not matter. + * Returns a closeable iterator. You should close it when you are done. */ public CloseableIterator retrieveUnusedSegments( final String dataSource, final Collection intervals, - @Nullable final Integer limit + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder ) { - return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit); + return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder); } /** @@ -223,7 +234,15 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) // Retrieve, then drop, since we can't write a WHERE clause directly.
final List segments = ImmutableList.copyOf( Iterators.transform( - retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null), + retrieveSegments( + dataSource, + Collections.singletonList(interval), + IntervalMode.CONTAINS, + true, + null, + null, + null + ), DataSegment::getId ) ); @@ -358,12 +377,14 @@ private CloseableIterator retrieveSegments( final Collection intervals, final IntervalMode matchMode, final boolean used, - @Nullable final Integer limit + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder ) { - if (intervals.isEmpty()) { + if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) { return CloseableIterators.withEmptyBaggage( - retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit) + retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder) ); } else { final List> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH); @@ -371,7 +392,15 @@ private CloseableIterator retrieveSegments( Integer limitPerBatch = limit; for (final List intervalList : intervalsLists) { - final UnmodifiableIterator iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch); + final UnmodifiableIterator iterator = retrieveSegmentsInIntervalsBatch( + dataSource, + intervalList, + matchMode, + used, + limitPerBatch, + lastSegmentId, + sortOrder + ); if (limitPerBatch != null) { // If limit is provided, we need to shrink the limit for subsequent batches or circuit break if // we have reached what was requested for. @@ -394,7 +423,9 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( final Collection intervals, final IntervalMode matchMode, final boolean used, - @Nullable final Integer limit + @Nullable final Integer limit, + @Nullable final String lastSegmentId, + @Nullable final SortOrder sortOrder ) { // Check if the intervals all support comparing as strings. If so, bake them into the SQL. @@ -407,11 +438,33 @@ private UnmodifiableIterator retrieveSegmentsInIntervalsBatch( appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector); } + if (lastSegmentId != null) { + sb.append( + StringUtils.format( + " AND id %s :id", + (sortOrder == null || sortOrder == SortOrder.ASC) + ? 
">" + : "<" + ) + ); + } + + if (sortOrder != null) { + sb.append(StringUtils.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s %2$s", + connector.getQuoteString(), + sortOrder.toString())); + } final Query> sql = handle - .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable())) + .createQuery(StringUtils.format( + sb.toString(), + dbTables.getSegmentsTable() + )) .setFetchSize(connector.getStreamingFetchSize()) .bind("used", used) .bind("dataSource", dataSource); + if (lastSegmentId != null) { + sql.bind("id", lastSegmentId); + } if (null != limit) { sql.setMaxRows(limit); } diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 3fc13469723e..fb976a04bf3e 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -26,9 +26,12 @@ import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.metadata.SortOrder; import org.apache.druid.segment.metadata.AvailableSegmentMetadata; import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache; import org.apache.druid.segment.metadata.DataSourceInformation; @@ -334,6 +337,51 @@ public Response getUsedSegmentsInDataSourceForIntervals( return builder.entity(Collections2.transform(segments, DataSegment::getId)).build(); } + @GET + @Path("/datasources/{dataSourceName}/unusedSegments") + @Produces(MediaType.APPLICATION_JSON) + public Response getUnusedSegmentsInDataSource( + @Context final HttpServletRequest req, + @PathParam("dataSourceName") final String dataSource, + @QueryParam("interval") @Nullable String interval, + @QueryParam("limit") @Nullable Integer limit, + @QueryParam("lastSegmentId") @Nullable final String lastSegmentId, + @QueryParam("sortOrder") @Nullable final String sortOrder + ) + { + if (dataSource == null || dataSource.isEmpty()) { + throw InvalidInput.exception("dataSourceName must be non-empty"); + } + if (limit != null && limit < 0) { + throw InvalidInput.exception("Invalid limit[%s] specified. Limit must be > 0", limit); + } + + if (lastSegmentId != null && SegmentId.tryParse(dataSource, lastSegmentId) == null) { + throw InvalidInput.exception("Invalid lastSegmentId[%s] specified.", lastSegmentId); + } + + SortOrder theSortOrder = sortOrder == null ? null : SortOrder.fromValue(sortOrder); + + final Interval theInterval = interval != null ? 
Intervals.of(interval.replace('_', '/')) : null; + Iterable unusedSegments = segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource( + dataSource, + theInterval, + limit, + lastSegmentId, + theSortOrder + ); + + final Function> raGenerator = segment -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); + + final Iterable authorizedSegments = + AuthorizationUtils.filterAuthorizedResources(req, unusedSegments, raGenerator, authorizerMapper); + + final List retVal = new ArrayList<>(); + authorizedSegments.iterator().forEachRemaining(retVal::add); + return Response.status(Response.Status.OK).entity(retVal).build(); + } + @GET @Path("/datasources/{dataSourceName}/segments/{segmentId}") @Produces(MediaType.APPLICATION_JSON) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 8e2e7eb747fb..9e977dec3e8c 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -1206,23 +1206,86 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndNoLimit() throws final List segments = createAndGetUsedYearSegments(1900, 2133); markAllSegmentsUnused(new HashSet<>(segments)); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + null, + null, + null + ); + Assert.assertEquals(segments.size(), actualUnusedSegments.size()); + Assert.assertTrue(segments.containsAll(actualUnusedSegments)); + } + + @Test + public void testRetrieveUnusedSegmentsUsingNoIntervalsNoLimitAndNoLastSegmentId() throws IOException + { + final List segments = createAndGetUsedYearSegments(1900, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + null, null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(segments.containsAll(actualUnusedSegments)); } + @Test + public void testRetrieveUnusedSegmentsUsingNoIntervalsAndNoLimitAndNoLastSegmentId() throws IOException + { + final List segments = createAndGetUsedYearSegments(2033, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + String lastSegmentId = segments.get(9).getId().toString(); + final List expectedSegmentsAscOrder = segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0) + .collect(Collectors.toList()); + ImmutableList actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + lastSegmentId, + null + ); + Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size()); + Assert.assertTrue(expectedSegmentsAscOrder.containsAll(actualUnusedSegments)); + + actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + lastSegmentId, + SortOrder.ASC + ); + Assert.assertEquals(expectedSegmentsAscOrder.size(), actualUnusedSegments.size()); + Assert.assertEquals(expectedSegmentsAscOrder, actualUnusedSegments); + + final List expectedSegmentsDescOrder = segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) < 0) + .collect(Collectors.toList()); + 
Collections.reverse(expectedSegmentsDescOrder); + + actualUnusedSegments = retrieveUnusedSegments( + ImmutableList.of(), + null, + lastSegmentId, + SortOrder.DESC + ); + Assert.assertEquals(expectedSegmentsDescOrder.size(), actualUnusedSegments.size()); + Assert.assertEquals(expectedSegmentsDescOrder, actualUnusedSegments); + } + @Test public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitAtRange() throws IOException { final List segments = createAndGetUsedYearSegments(1900, 2133); markAllSegmentsUnused(new HashSet<>(segments)); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), - segments.size() + segments.size(), + null, + null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(segments.containsAll(actualUnusedSegments)); @@ -1235,23 +1298,69 @@ public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitInRange() th markAllSegmentsUnused(new HashSet<>(segments)); final int requestedLimit = segments.size() - 1; - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( - segments.stream().limit(requestedLimit).map(DataSegment::getInterval).collect(Collectors.toList()), - requestedLimit + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + requestedLimit, + null, + null ); Assert.assertEquals(requestedLimit, actualUnusedSegments.size()); Assert.assertTrue(actualUnusedSegments.containsAll(segments.stream().limit(requestedLimit).collect(Collectors.toList()))); } + @Test + public void testRetrieveUnusedSegmentsUsingMultipleIntervalsInSingleBatchLimitAndOffsetInRange() throws IOException + { + final List segments = createAndGetUsedYearSegments(2034, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + final int requestedLimit = segments.size(); + final String lastSegmentId = segments.get(4).getId().toString(); + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + requestedLimit, + lastSegmentId, + null + ); + Assert.assertEquals(segments.size() - 5, actualUnusedSegments.size()); + Assert.assertEquals(actualUnusedSegments, segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0) + .limit(requestedLimit) + .collect(Collectors.toList())); + } + + @Test + public void testRetrieveUnusedSegmentsUsingMultipleIntervalsLimitAndOffsetInRange() throws IOException + { + final List segments = createAndGetUsedYearSegments(1900, 2133); + markAllSegmentsUnused(new HashSet<>(segments)); + + final int requestedLimit = segments.size() - 1; + final String lastSegmentId = segments.get(4).getId().toString(); + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( + segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + requestedLimit, + lastSegmentId, + null + ); + Assert.assertEquals(requestedLimit - 4, actualUnusedSegments.size()); + Assert.assertEquals(actualUnusedSegments, segments.stream() + .filter(s -> s.getId().toString().compareTo(lastSegmentId) > 0) + .limit(requestedLimit) + .collect(Collectors.toList())); + } + @Test public void testRetrieveUnusedSegmentsUsingMultipleIntervalsAndLimitOutOfRange() throws IOException { final List segments = 
createAndGetUsedYearSegments(1900, 2133); markAllSegmentsUnused(new HashSet<>(segments)); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), - segments.size() + 1 + segments.size() + 1, + null, + null ); Assert.assertEquals(segments.size(), actualUnusedSegments.size()); Assert.assertTrue(actualUnusedSegments.containsAll(segments)); @@ -1267,8 +1376,10 @@ public void testRetrieveUnusedSegmentsUsingIntervalOutOfRange() throws IOExcepti Assert.assertTrue(segments.stream() .anyMatch(segment -> !segment.getInterval().overlaps(outOfRangeInterval))); - final ImmutableList actualUnusedSegments = retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + final ImmutableList actualUnusedSegments = retrieveUnusedSegments( ImmutableList.of(outOfRangeInterval), + null, + null, null ); Assert.assertEquals(0, actualUnusedSegments.size()); @@ -3048,21 +3159,23 @@ private List createAndGetUsedYearSegments(final int startYear, fina return segments; } - private ImmutableList retrieveUnusedSegmentsUsingMultipleIntervalsAndLimit( + private ImmutableList retrieveUnusedSegments( final List intervals, - final Integer limit + final Integer limit, + final String lastSegmentId, + final SortOrder sortOrder ) { return derbyConnector.inReadOnlyTransaction( (handle, status) -> { try (final CloseableIterator iterator = SqlSegmentsMetadataQuery.forHandle( - handle, - derbyConnector, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - mapper - ) - .retrieveUnusedSegments(DS.WIKI, intervals, limit)) { + handle, + derbyConnector, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + mapper + ) + .retrieveUnusedSegments(DS.WIKI, intervals, limit, lastSegmentId, sortOrder)) { return ImmutableList.copyOf(iterator); } } diff --git a/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java b/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java new file mode 100644 index 000000000000..a191763d444b --- /dev/null +++ b/server/src/test/java/org/apache/druid/metadata/SortOrderTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.metadata; + +import org.apache.druid.error.DruidExceptionMatcher; +import org.junit.Assert; +import org.junit.Test; + +public class SortOrderTest +{ + + @Test + public void testAsc() + { + Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("asc")); + Assert.assertEquals("ASC", SortOrder.fromValue("asc").toString()); + Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("ASC")); + Assert.assertEquals("ASC", SortOrder.fromValue("ASC").toString()); + Assert.assertEquals(SortOrder.ASC, SortOrder.fromValue("AsC")); + Assert.assertEquals("ASC", SortOrder.fromValue("AsC").toString()); + } + + @Test + public void testDesc() + { + Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("desc")); + Assert.assertEquals("DESC", SortOrder.fromValue("desc").toString()); + Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("DESC")); + Assert.assertEquals("DESC", SortOrder.fromValue("DESC").toString()); + Assert.assertEquals(SortOrder.DESC, SortOrder.fromValue("DesC")); + Assert.assertEquals("DESC", SortOrder.fromValue("DesC").toString()); + } + + @Test + public void testInvalid() + { + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Unexpected value[bad] for SortOrder. Possible values are: [ASC, DESC]" + ).assertThrowsAndMatches( + () -> SortOrder.fromValue("bad") + ); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 7a23234761ee..c1a5dbbdac02 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -56,7 +56,6 @@ import java.util.Set; import java.util.stream.Collectors; - public class SqlSegmentsMetadataManagerTest { private static DataSegment createSegment( diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index 76493dfdfce9..cfb8fec941e0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -24,6 +24,7 @@ import org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.metadata.SortOrder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.Partitions; import org.apache.druid.timeline.SegmentId; @@ -192,6 +193,18 @@ public Optional> iterateAllUsedNonOvershadowedSegmentsForD )); } + @Override + public Iterable iterateAllUnusedSegmentsForDatasource( + String datasource, + @Nullable Interval interval, + @Nullable Integer limit, + @Nullable String lastSegmentId, + @Nullable SortOrder sortOrder + ) + { + return null; + } + @Override public Set retrieveAllDataSourceNames() { diff --git a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java index cb6e9d2a37ed..b1c8f0cb6de5 100644 --- a/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/MetadataResourceTest.java @@ -26,9 +26,13 @@ import com.google.common.collect.Lists; import 
org.apache.druid.client.DataSourcesSnapshot; import org.apache.druid.client.ImmutableDruidDataSource; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.metadata.SegmentsMetadataManager; +import org.apache.druid.metadata.SortOrder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.metadata.AvailableSegmentMetadata; @@ -42,28 +46,36 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentStatusInCluster; +import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; + +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; public class MetadataResourceTest { private static final String DATASOURCE1 = "datasource1"; - + private static final String SEGMENT_START_INTERVAL = "2012-10-24"; + private static final int NUM_PARTITIONS = 2; private final DataSegment[] segments = CreateDataSegments.ofDatasource(DATASOURCE1) - .forIntervals(3, Granularities.DAY) - .withNumPartitions(2) - .eachOfSizeInMb(500) - .toArray(new DataSegment[0]); + .startingAt(SEGMENT_START_INTERVAL) + .forIntervals(3, Granularities.DAY) + .withNumPartitions(NUM_PARTITIONS) + .eachOfSizeInMb(500) + .toArray(new DataSegment[0]); private HttpServletRequest request; private SegmentsMetadataManager segmentsMetadataManager; private IndexerMetadataStorageCoordinator storageCoordinator; @@ -236,6 +248,126 @@ public void testGetAllSegmentsIncludingRealtime() Assert.assertEquals(new SegmentStatusInCluster(realTimeSegments[1], false, null, 40L, true), resultList.get(5)); } + @Test + public void testGetUnusedSegmentsInDataSource() + { + Mockito.doAnswer(mockIterateAllUnusedSegmentsForDatasource()) + .when(segmentsMetadataManager) + .iterateAllUnusedSegmentsForDatasource( + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any()); + + // test with null datasource name - fails with expected bad datasource name error + DruidExceptionMatcher.invalidInput().expectMessageIs( + "dataSourceName must be non-empty" + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, null, null, null, null, null) + ); + + // test with empty datasource name - fails with expected bad datasource name error + DruidExceptionMatcher.invalidInput().expectMessageIs( + "dataSourceName must be non-empty" + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, "", null, null, null, null) + ); + + // test invalid datasource - returns empty segments + Response response = metadataResource.getUnusedSegmentsInDataSource( + request, + "invalid_datasource", + null, + null, + null, + null + ); + List resultList = extractResponseList(response); + Assert.assertTrue(resultList.isEmpty()); + + // test valid datasource with bad limit - fails with expected invalid limit 
message + DruidExceptionMatcher.invalidInput().expectMessageIs( + StringUtils.format("Invalid limit[%s] specified. Limit must be > 0", -1) + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, -1, null, null) + ); + + // test valid datasource with invalid lastSegmentId - fails with expected invalid lastSegmentId message + DruidExceptionMatcher.invalidInput().expectMessageIs( + StringUtils.format("Invalid lastSegmentId[%s] specified.", "invalid") + ).assertThrowsAndMatches( + () -> metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, "invalid", null) + ); + + // test valid datasource - returns all unused segments for that datasource + response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, null, null, null, null); + + resultList = extractResponseList(response); + Assert.assertEquals(Arrays.asList(segments), resultList); + + // test valid datasource with interval filter - returns all unused segments for that datasource within interval + int numDays = 2; + String interval = SEGMENT_START_INTERVAL + "_P" + numDays + "D"; + response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, interval, null, null, null); + + resultList = extractResponseList(response); + Assert.assertEquals(NUM_PARTITIONS * numDays, resultList.size()); + Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2], segments[3]), resultList); + + // test valid datasource with interval filter and limit - returns unused segments for that + // datasource within the interval, up to the limit + int limit = 3; + response = metadataResource.getUnusedSegmentsInDataSource(request, DATASOURCE1, interval, limit, null, null); + + resultList = extractResponseList(response); + Assert.assertEquals(limit, resultList.size()); + Assert.assertEquals(Arrays.asList(segments[0], segments[1], segments[2]), resultList); + + // test valid datasource with interval filter, limit, and lastSegmentId - returns unused segments for that datasource + // within the interval, up to the limit, starting after lastSegmentId + response = metadataResource.getUnusedSegmentsInDataSource( + request, + DATASOURCE1, + interval, + limit, + segments[2].getId().toString(), + null + ); + + resultList = extractResponseList(response); + Assert.assertEquals(Collections.singletonList(segments[3]), resultList); + } + + Answer> mockIterateAllUnusedSegmentsForDatasource() + { + return invocationOnMock -> { + String dataSourceName = invocationOnMock.getArgument(0); + Interval interval = invocationOnMock.getArgument(1); + Integer limit = invocationOnMock.getArgument(2); + String lastSegmentId = invocationOnMock.getArgument(3); + SortOrder sortOrder = invocationOnMock.getArgument(4); + if (!DATASOURCE1.equals(dataSourceName)) { + return ImmutableList.of(); + } + + return Arrays.stream(segments) + .filter(d -> d.getDataSource().equals(dataSourceName) + && (interval == null + || (d.getInterval().getStartMillis() >= interval.getStartMillis() + && d.getInterval().getEndMillis() <= interval.getEndMillis())) + && (lastSegmentId == null + || (sortOrder == null && d.getId().toString().compareTo(lastSegmentId) > 0) + || (sortOrder == SortOrder.ASC && d.getId().toString().compareTo(lastSegmentId) > 0) + || (sortOrder == SortOrder.DESC && d.getId().toString().compareTo(lastSegmentId) < 0))) + .sorted((o1, o2) -> Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval())) + .limit(limit != null + ?
limit + : segments.length) + .collect(Collectors.toList()); + }; + } + @Test public void testGetDataSourceInformation() { diff --git a/website/.spelling b/website/.spelling index 5b21a4c5bd2c..cad92260fbdc 100644 --- a/website/.spelling +++ b/website/.spelling @@ -373,6 +373,7 @@ kubernetes kubexit k8s laning +lastSegmentId lifecycle localhost log4j @@ -501,6 +502,7 @@ Smoosh smoosh smooshed snapshotting +sortOrder splittable ssl sslmode
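To round out the SortOrder addition above, here is a minimal sketch of how the new enum round-trips through JSON, mirroring SortOrderTest. It assumes Jackson's ObjectMapper is on the classpath; parsing is case-insensitive via the @JsonCreator factory, and serialization always emits the canonical upper-case value via @JsonValue.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.metadata.SortOrder;

// Round-trips the new enum through JSON; behavior follows the annotations in SortOrder.java above.
public class SortOrderJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // Case-insensitive parsing: "asc" -> SortOrder.ASC
    final SortOrder parsed = mapper.readValue("\"asc\"", SortOrder.class);
    System.out.println(parsed);                                    // ASC

    // Serialization uses the canonical upper-case value
    System.out.println(mapper.writeValueAsString(SortOrder.DESC)); // "DESC"

    // An unrecognized value surfaces as an invalid-input error
    try {
      SortOrder.fromValue("bad");
    }
    catch (Exception e) {
      System.out.println(e.getMessage());
    }
  }
}
```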