Add api for Retrieving unused segments (apache#15415)
### Description

This PR adds an API for retrieving unused segments for a particular datasource. The API supports pagination through the `limit` and `lastSegmentId` parameters. If the optional `sortOrder` parameter (`ASC` or `DESC`) is specified, the unused segments are returned sorted by the matching segments' `id`, `start time`, and `end time`; if `sortOrder` is not specified, they are returned in no guaranteed order.

`GET /druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}`

Returns a list of unused segments for a datasource in the cluster that are contained within an optionally specified interval.
The optional `limit` and `lastSegmentId` parameters limit the number of results and enable pagination.
The results are sorted in `ASC` or `DESC` order when the `sortOrder` parameter is specified.

- `dataSourceName`: The name of the datasource.
- `interval`: The specific interval to search for unused segments.
- `limit`: The maximum number of unused segments to return information about. This property helps to support pagination.
- `lastSegmentId`: The last segment id from which to search for results. All segments returned are > this segment lexicographically if `sortOrder` is null or `ASC`, or < this segment lexicographically if `sortOrder` is `DESC`.
- `sortOrder`: Specifies the order in which to return the matching segments by id, start time, and end time. A null value indicates that order does not matter.
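
The interplay of `limit`, `lastSegmentId`, and `sortOrder` can be illustrated with a small in-memory sketch. It is not the Druid implementation: the class `UnusedSegmentPagingSketch`, its methods, and the shortened segment ids are hypothetical, and a plain list of id strings stands in for the metadata store. It only shows how a caller might page through results by passing the last id of one page as `lastSegmentId` of the next request.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the endpoint; not part of Druid.
final class UnusedSegmentPagingSketch
{
  /**
   * Simulates one request: keeps ids strictly after (ASC) or strictly before (DESC)
   * lastSegmentId, sorts them lexicographically, then applies the limit.
   */
  static List<String> fetchPage(List<String> allIds, String lastSegmentId, int limit, boolean ascending)
  {
    Comparator<String> order =
        ascending ? Comparator.<String>naturalOrder() : Comparator.<String>reverseOrder();
    return allIds.stream()
                 .filter(id -> lastSegmentId == null
                               || (ascending ? id.compareTo(lastSegmentId) > 0
                                             : id.compareTo(lastSegmentId) < 0))
                 .sorted(order)
                 .limit(limit)
                 .collect(Collectors.toList());
  }

  /** Walks all pages by feeding the last id of each page into the next request. */
  static List<List<String>> fetchAllPages(List<String> allIds, int limit, boolean ascending)
  {
    List<List<String>> pages = new ArrayList<>();
    String lastSegmentId = null;
    while (true) {
      List<String> page = fetchPage(allIds, lastSegmentId, limit, ascending);
      if (page.isEmpty()) {
        break;
      }
      pages.add(page);
      lastSegmentId = page.get(page.size() - 1);
    }
    return pages;
  }

  public static void main(String[] args)
  {
    // Shortened, made-up ids; real segment ids also carry end time and version.
    List<String> ids = List.of("ds_2023-12-03", "ds_2023-12-01", "ds_2023-12-02");
    System.out.println(fetchAllPages(ids, 2, true));
  }
}
```

Because each page is filtered with a strict comparison against the previous page's last id, no id is returned twice, which is why a stable `sortOrder` is useful whenever `lastSegmentId` is used for paging.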

This PR has:

- [x] been self-reviewed.
   - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
- [x] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
- [ ] added integration tests.
- [x] been tested in a test Druid cluster.
zachjsh authored Dec 11, 2023
1 parent 4152f1d commit ab7d9bc
Showing 13 changed files with 601 additions and 35 deletions.
10 changes: 10 additions & 0 deletions docs/api-reference/legacy-metadata-api.md
@@ -248,6 +248,16 @@ Returns full segment metadata for a specific segment in the cluster.

Return the tiers that a datasource exists in.

`GET /druid/coordinator/v1/datasources/{dataSourceName}/unusedSegments?interval={interval}&limit={limit}&lastSegmentId={lastSegmentId}&sortOrder={sortOrder}`

Returns a list of unused segments for a datasource in the cluster contained within an optionally specified interval.
The optional `limit` and `lastSegmentId` parameters limit the results and enable pagination.
When the `sortOrder` parameter is specified, the results are sorted in `ASC` or `DESC` order by id, start time, and end time.
If no optional parameters are specified, the default behavior is to return all
unused segments for the given datasource in no guaranteed order.

Example usage: `GET /druid/coordinator/v1/datasources/inline_data/unusedSegments?interval=2023-12-01_2023-12-10&limit=10&lastSegmentId=inline_data_2023-12-03T00%3A00%3A00.000Z_2023-12-04T00%3A00%3A00.000Z_2023-12-09T14%3A16%3A53.738Z&sortOrder=ASC`

## Intervals

Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` as in `2016-06-27_2016-06-28`.
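
As a rough illustration of the interval convention and of the percent-encoding seen in the example usage, the following sketch assembles the request path and query string on the client side. The class `UnusedSegmentsUrlSketch` and its `buildPath` method are hypothetical helpers, not part of Druid, and the sketch assumes the datasource name contains only URL-safe characters.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side helper; not part of Druid.
final class UnusedSegmentsUrlSketch
{
  /** Builds the path and query string; every argument except dataSource may be null. */
  static String buildPath(
      String dataSource,
      String isoInterval,
      Integer limit,
      String lastSegmentId,
      String sortOrder
  )
  {
    List<String> params = new ArrayList<>();
    if (isoInterval != null) {
      // The API expects "start_end" rather than the ISO 8601 "start/end" form.
      params.add("interval=" + encode(isoInterval.replace('/', '_')));
    }
    if (limit != null) {
      params.add("limit=" + limit);
    }
    if (lastSegmentId != null) {
      // Segment ids contain ':' characters, which are percent-encoded as %3A.
      params.add("lastSegmentId=" + encode(lastSegmentId));
    }
    if (sortOrder != null) {
      params.add("sortOrder=" + encode(sortOrder));
    }
    // Assumption: dataSource needs no escaping.
    String path = "/druid/coordinator/v1/datasources/" + dataSource + "/unusedSegments";
    return params.isEmpty() ? path : path + "?" + String.join("&", params);
  }

  private static String encode(String value)
  {
    return URLEncoder.encode(value, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    System.out.println(buildPath("inline_data", "2023-12-01/2023-12-10", 10, null, "ASC"));
  }
}
```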
@@ -233,7 +233,7 @@ public List<DataSegment> retrieveUnusedSegmentsForInterval(
(handle, status) -> {
try (final CloseableIterator<DataSegment> iterator =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper)
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit)) {
.retrieveUnusedSegments(dataSource, Collections.singletonList(interval), limit, null, null)) {
return ImmutableList.copyOf(iterator);
}
}
@@ -125,6 +125,30 @@ Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasour
boolean requiresLatest
);

/**
* Returns an iterable to go over un-used segments for a given datasource over an optional interval.
* The order in which segments are iterated is from earliest start-time, with ties being broken with earliest end-time
* first. Note: the iteration may not be as trivially cheap as,
* for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
* iterates the returned iterable only once rather than several times.
*
* @param datasource the name of the datasource.
* @param interval an optional interval to search over. If none is specified, {@link org.apache.druid.java.util.common.Intervals#ETERNITY} is used.
* @param limit an optional maximum number of results to return. If none is specified, the results are not limited.
* @param lastSegmentId an optional last segment id from which to search for results. All segments returned are >
* this segment lexicographically if sortOrder is null or {@link SortOrder#ASC}, or < this segment
* lexicographically if sortOrder is {@link SortOrder#DESC}. If none is specified, no such filter is used.
* @param sortOrder an optional order with which to return the matching segments by id, start time, end time.
* If none is specified, the order of the results is not guaranteed.
*/
Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
String datasource,
@Nullable Interval interval,
@Nullable Integer limit,
@Nullable String lastSegmentId,
@Nullable SortOrder sortOrder
);

/**
* Retrieves all data source names for which there are segment in the database, regardless of whether those segments
* are used or not. If there are no segments in the database, returns an empty set.
66 changes: 66 additions & 0 deletions server/src/main/java/org/apache/druid/metadata/SortOrder.java
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.metadata;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.StringUtils;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Specifies the sort order when doing metadata store queries.
*/
public enum SortOrder
{
ASC("ASC"),

DESC("DESC");

private String value;

SortOrder(String value)
{
this.value = value;
}

@Override
@JsonValue
public String toString()
{
return String.valueOf(value);
}

@JsonCreator
public static SortOrder fromValue(String value)
{
for (SortOrder b : SortOrder.values()) {
if (String.valueOf(b.value).equalsIgnoreCase(String.valueOf(value))) {
return b;
}
}
throw InvalidInput.exception(StringUtils.format(
"Unexpected value[%s] for SortOrder. Possible values are: %s",
value, Arrays.stream(SortOrder.values()).map(SortOrder::toString).collect(Collectors.toList())
));
}
}
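
The case-insensitive lookup performed by `fromValue` can be tried out in isolation with the sketch below. It is a simplified stand-in, not the class from this commit: the name `SortOrderSketch` is hypothetical, the Jackson annotations are omitted, and `IllegalArgumentException` is used in place of Druid's `InvalidInput` so that the example has no dependencies.

```java
import java.util.Arrays;

// Simplified, dependency-free stand-in for the SortOrder enum above.
enum SortOrderSketch
{
  ASC,
  DESC;

  /** Resolves a user-supplied string to a constant, ignoring case. */
  static SortOrderSketch fromValue(String value)
  {
    for (SortOrderSketch candidate : values()) {
      // equalsIgnoreCase(null) is false, so a null value falls through to the exception.
      if (candidate.name().equalsIgnoreCase(value)) {
        return candidate;
      }
    }
    // Assumption: IllegalArgumentException replaces InvalidInput.exception(...) here.
    throw new IllegalArgumentException(
        String.format(
            "Unexpected value[%s] for SortOrder. Possible values are: %s",
            value,
            Arrays.toString(values())
        )
    );
  }

  public static void main(String[] args)
  {
    System.out.println(fromValue("desc"));
  }
}
```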
@@ -25,6 +25,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
@@ -686,7 +687,7 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable
}

try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null)) {
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
@@ -955,6 +956,51 @@ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForD
.transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
}

/**
* Retrieves segments for a given datasource that are marked unused and that are *fully contained by* an optionally
* specified interval. If the interval specified is null, this method will retrieve all unused segments.
*
* This call does not return any information about realtime segments.
*
* @param datasource The name of the datasource
* @param interval an optional interval to search over.
* @param limit an optional maximum number of results to return. If none is specified, the results are
* not limited.
* @param lastSegmentId an optional last segment id from which to search for results. All segments returned are >
* this segment lexicographically if sortOrder is null or {@link SortOrder#ASC}, or < this
* segment lexicographically if sortOrder is {@link SortOrder#DESC}. If none is specified, no
* such filter is used.
* @param sortOrder an optional order with which to return the matching segments by id, start time, end time. If
* none is specified, the order of the results is not guaranteed.
* Returns an iterable.
*/
@Override
public Iterable<DataSegment> iterateAllUnusedSegmentsForDatasource(
final String datasource,
@Nullable final Interval interval,
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
return connector.inReadOnlyTransaction(
(handle, status) -> {
final SqlSegmentsMetadataQuery queryTool =
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper);

final List<Interval> intervals =
interval == null
? Intervals.ONLY_ETERNITY
: Collections.singletonList(interval);
try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(datasource, intervals, limit, lastSegmentId, sortOrder)) {
return ImmutableList.copyOf(iterator);
}
}
);
}

@Override
public Set<String> retrieveAllDataSourceNames()
{
@@ -117,7 +117,7 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
final Collection<Interval> intervals
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null);
return retrieveSegments(dataSource, intervals, IntervalMode.OVERLAPS, true, null, null, null);
}

/**
@@ -127,15 +127,26 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
*
* This call does not return any information about realtime segments.
*
* @param dataSource The name of the datasource
* @param intervals The intervals to search over
* @param limit The limit of segments to return
* @param lastSegmentId the last segment id from which to search for results. All segments returned are >
* this segment lexicographically if sortOrder is null or ASC, or < this segment
* lexicographically if sortOrder is DESC.
* @param sortOrder Specifies the order with which to return the matching segments by id, start time, end time.
* A null value indicates that order does not matter.
* Returns a closeable iterator. You should close it when you are done.
*/
public CloseableIterator<DataSegment> retrieveUnusedSegments(
final String dataSource,
final Collection<Interval> intervals,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit);
return retrieveSegments(dataSource, intervals, IntervalMode.CONTAINS, false, limit, lastSegmentId, sortOrder);
}

/**
@@ -223,7 +234,15 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
// Retrieve, then drop, since we can't write a WHERE clause directly.
final List<SegmentId> segments = ImmutableList.copyOf(
Iterators.transform(
retrieveSegments(dataSource, Collections.singletonList(interval), IntervalMode.CONTAINS, true, null),
retrieveSegments(
dataSource,
Collections.singletonList(interval),
IntervalMode.CONTAINS,
true,
null,
null,
null
),
DataSegment::getId
)
);
@@ -358,20 +377,30 @@ private CloseableIterator<DataSegment> retrieveSegments(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
if (intervals.isEmpty()) {
if (intervals.isEmpty() || intervals.size() <= MAX_INTERVALS_PER_BATCH) {
return CloseableIterators.withEmptyBaggage(
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit)
retrieveSegmentsInIntervalsBatch(dataSource, intervals, matchMode, used, limit, lastSegmentId, sortOrder)
);
} else {
final List<List<Interval>> intervalsLists = Lists.partition(new ArrayList<>(intervals), MAX_INTERVALS_PER_BATCH);
final List<Iterator<DataSegment>> resultingIterators = new ArrayList<>();
Integer limitPerBatch = limit;

for (final List<Interval> intervalList : intervalsLists) {
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(dataSource, intervalList, matchMode, used, limitPerBatch);
final UnmodifiableIterator<DataSegment> iterator = retrieveSegmentsInIntervalsBatch(
dataSource,
intervalList,
matchMode,
used,
limitPerBatch,
lastSegmentId,
sortOrder
);
if (limitPerBatch != null) {
// If limit is provided, we need to shrink the limit for subsequent batches or circuit break if
// we have reached what was requested for.
@@ -394,7 +423,9 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final boolean used,
@Nullable final Integer limit
@Nullable final Integer limit,
@Nullable final String lastSegmentId,
@Nullable final SortOrder sortOrder
)
{
// Check if the intervals all support comparing as strings. If so, bake them into the SQL.
@@ -407,11 +438,33 @@ private UnmodifiableIterator<DataSegment> retrieveSegmentsInIntervalsBatch(
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
}

if (lastSegmentId != null) {
sb.append(
StringUtils.format(
" AND id %s :id",
(sortOrder == null || sortOrder == SortOrder.ASC)
? ">"
: "<"
)
);
}

if (sortOrder != null) {
sb.append(StringUtils.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s %2$s",
connector.getQuoteString(),
sortOrder.toString()));
}
final Query<Map<String, Object>> sql = handle
.createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable()))
.createQuery(StringUtils.format(
sb.toString(),
dbTables.getSegmentsTable()
))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", used)
.bind("dataSource", dataSource);
if (lastSegmentId != null) {
sql.bind("id", lastSegmentId);
}
if (null != limit) {
sql.setMaxRows(limit);
}
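
To make the effect of the new `lastSegmentId` and `sortOrder` handling easier to see, the sketch below reproduces only the SQL fragment that this hunk appends. It is an illustration, not the Druid code: the class `UnusedSegmentsSqlSketch` and its `pagingSuffix` method are hypothetical, `String.format` stands in for `StringUtils.format`, and the quote string is passed in directly rather than obtained from `connector.getQuoteString()`.

```java
// Hypothetical illustration of the SQL fragment appended in the hunk above.
final class UnusedSegmentsSqlSketch
{
  /**
   * Returns the paging filter and ORDER BY clause for the given inputs.
   * sortOrder is "ASC", "DESC", or null; quote is the database's identifier quote.
   */
  static String pagingSuffix(boolean hasLastSegmentId, String sortOrder, String quote)
  {
    StringBuilder sb = new StringBuilder();
    if (hasLastSegmentId) {
      // A null sort order is treated like ASC, so only ids greater than :id are kept.
      boolean ascending = sortOrder == null || "ASC".equals(sortOrder);
      sb.append(" AND id ").append(ascending ? ">" : "<").append(" :id");
    }
    if (sortOrder != null) {
      // "end" is quoted, as in the original format string; %1$s is the quote, %2$s the direction.
      sb.append(String.format(" ORDER BY id %2$s, start %2$s, %1$send%1$s %2$s", quote, sortOrder));
    }
    return sb.toString();
  }

  public static void main(String[] args)
  {
    System.out.println(pagingSuffix(true, "DESC", "\""));
  }
}
```

The `:id` placeholder is left as a named bind parameter, mirroring the `sql.bind("id", lastSegmentId)` call above, so the segment id itself is never concatenated into the SQL text.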
@@ -26,9 +26,12 @@
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.metadata.SegmentsMetadataManager;
import org.apache.druid.metadata.SortOrder;
import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
import org.apache.druid.segment.metadata.CoordinatorSegmentMetadataCache;
import org.apache.druid.segment.metadata.DataSourceInformation;
@@ -334,6 +337,51 @@ public Response getUsedSegmentsInDataSourceForIntervals(
return builder.entity(Collections2.transform(segments, DataSegment::getId)).build();
}

@GET
@Path("/datasources/{dataSourceName}/unusedSegments")
@Produces(MediaType.APPLICATION_JSON)
public Response getUnusedSegmentsInDataSource(
@Context final HttpServletRequest req,
@PathParam("dataSourceName") final String dataSource,
@QueryParam("interval") @Nullable String interval,
@QueryParam("limit") @Nullable Integer limit,
@QueryParam("lastSegmentId") @Nullable final String lastSegmentId,
@QueryParam("sortOrder") @Nullable final String sortOrder
)
{
if (dataSource == null || dataSource.isEmpty()) {
throw InvalidInput.exception("dataSourceName must be non-empty");
}
if (limit != null && limit < 0) {
throw InvalidInput.exception("Invalid limit[%s] specified. Limit must be >= 0", limit);
}

if (lastSegmentId != null && SegmentId.tryParse(dataSource, lastSegmentId) == null) {
throw InvalidInput.exception("Invalid lastSegmentId[%s] specified.", lastSegmentId);
}

SortOrder theSortOrder = sortOrder == null ? null : SortOrder.fromValue(sortOrder);

final Interval theInterval = interval != null ? Intervals.of(interval.replace('_', '/')) : null;
Iterable<DataSegment> unusedSegments = segmentsMetadataManager.iterateAllUnusedSegmentsForDatasource(
dataSource,
theInterval,
limit,
lastSegmentId,
theSortOrder
);

final Function<DataSegment, Iterable<ResourceAction>> raGenerator = segment -> Collections.singletonList(
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource()));

final Iterable<DataSegment> authorizedSegments =
AuthorizationUtils.filterAuthorizedResources(req, unusedSegments, raGenerator, authorizerMapper);

final List<DataSegment> retVal = new ArrayList<>();
authorizedSegments.iterator().forEachRemaining(retVal::add);
return Response.status(Response.Status.OK).entity(retVal).build();
}

@GET
@Path("/datasources/{dataSourceName}/segments/{segmentId}")
@Produces(MediaType.APPLICATION_JSON)