Skip to content

Commit

Permalink
Kill tasks honor the buffer period of unused segments (apache#15710)
Browse files Browse the repository at this point in the history
* Kill tasks should honor the buffer period of unused segments.

- The coordinator duty KillUnusedSegments determines an umbrella interval
 for each datasource to determine the kill interval. There can be multiple unused
segments in an umbrella interval with different used_status_last_updated timestamps.
For example, consider an unused segment that is 30 days old and one that is 1 hour old. Currently
the kill task after the 30-day mark would kill both the unused segments and not retain the 1-hour
old one.

- However, when a kill task is instantiated with this umbrella interval, it’d kill
all the unused segments regardless of the last updated timestamp. We need kill
tasks and RetrieveUnusedSegmentsAction to honor the bufferPeriod to avoid killing
unused segments in the kill interval prematurely.

* Clarify default behavior in docs.

* test comments

* fix canDutyRun()

* small updates.

* checkstyle

* forbidden api fix

* doc fix, unused import, codeql scan error, and cleanup logs.

* Address review comments

* Rename maxUsedFlagLastUpdatedTime to maxUsedStatusLastUpdatedTime

This is consistent with the column name `used_status_last_updated`.

* Apply suggestions from code review

Co-authored-by: Kashif Faraz <[email protected]>

* Make period Duration type

* Remove older variants of runKilLTask() in OverlordClient interface

* Test can now run without waiting for canDutyRun().

* Remove previous variants of retrieveUnusedSegments from internal metadata storage coordinator interface.

Removes the following interface methods in favor of a new method added:
- retrieveUnusedSegmentsForInterval(String, Interval)
- retrieveUnusedSegmentsForInterval(String, Interval, Integer)

* Chain stream operations

* cleanup

* Pass in the lastUpdatedTime to markUnused test function and remove sleep.

---------

Co-authored-by: Kashif Faraz <[email protected]>
  • Loading branch information
abhishekrb19 and kfaraz authored Jan 19, 2024
1 parent 96b4abc commit 38c1def
Show file tree
Hide file tree
Showing 28 changed files with 980 additions and 292 deletions.
10 changes: 6 additions & 4 deletions docs/data-management/delete.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ The available grammar is:
"id": <task_id>,
"dataSource": <task_datasource>,
"interval" : <all_unused_segments_in_this_interval_will_die!>,
"context": <task context>,
"batchSize": <optional_batch size>,
"limit": <the maximum number of segments to delete>
"context": <task_context>,
"batchSize": <optional_batch_size>,
"limit": <optional_maximum_number_of_segments_to_delete>,
"maxUsedStatusLastUpdatedTime": <optional_maximum_timestamp_when_segments_were_marked_as_unused>
}
```

Expand All @@ -106,7 +107,8 @@ Some of the parameters used in the task payload are further explained below:
| Parameter | Default | Explanation |
|-------------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `batchSize` |100 | Maximum number of segments that are deleted in one kill batch. Some operations on the Overlord may get stuck while a `kill` task is in progress due to concurrency constraints (such as in `TaskLockbox`). Thus, a `kill` task splits the list of unused segments to be deleted into smaller batches to yield the Overlord resources intermittently to other task operations.|
| `limit` | null - no limit | Maximum number of segments for the kill task to delete.|
| `limit` | null (no limit) | Maximum number of segments for the kill task to delete.|
| `maxUsedStatusLastUpdatedTime` | null (no cutoff) | Maximum timestamp used as a cutoff to include unused segments. The kill task only considers segments which lie in the specified `interval` and were marked as unused no later than this time. The default behavior is to kill all unused segments in the `interval` regardless of when they where marked as unused.|


**WARNING:** The `kill` task permanently removes all information about the affected segments from the metadata store and
Expand Down
11 changes: 7 additions & 4 deletions docs/operations/clean-metadata-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ Only applies to the specified datasources in the dynamic configuration parameter
If `killDataSourceWhitelist` is not set or empty, then kill tasks can be submitted for all datasources.
- `druid.coordinator.kill.period`: Defines the frequency in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) for the cleanup job to check for and delete eligible segments. Defaults to `P1D`. Must be greater than `druid.coordinator.period.indexingPeriod`.
- `druid.coordinator.kill.durationToRetain`: Defines the retention period in [ISO 8601 format](https://en.wikipedia.org/wiki/ISO_8601#Durations) after creation that segments become eligible for deletion.
- `druid.coordinator.kill.ignoreDurationToRetain`: A way to override `druid.coordinator.kill.durationToRetain`. When enabled, the coordinator considers all unused segments as eligible to be killed.
- `druid.coordinator.kill.bufferPeriod`: Defines the amount of time that a segment must be unused before it can be permanently removed from metadata and deep storage. This serves as a buffer period to prevent data loss if data ends up being needed after being marked unused.
- `druid.coordinator.kill.maxSegments`: Defines the maximum number of segments to delete per kill task.

### Audit records
Expand Down Expand Up @@ -189,22 +191,23 @@ druid.coordinator.kill.datasource.on=false
<a name="example"></a>
## Example configuration for automated metadata cleanup

Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old. The exception is for audit logs, which you need to retain for 30 days:
Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days:

```properties
...
# Schedule the metadata management store task for every hour:
druid.coordinator.period.metadataStoreManagementPeriod=P1H
druid.coordinator.period.metadataStoreManagementPeriod=PT1H

# Set a kill task to poll every day to delete Segment records and segments
# in deep storage > 4 days old. When druid.coordinator.kill.on is set to true,
# Set a kill task to poll every day to delete segment records and segments
# in deep storage > 4 days old after a 7-day buffer period. When druid.coordinator.kill.on is set to true,
# you can set killDataSourceWhitelist in the dynamic configuration to limit
# the datasources that can be killed.
# Required also for automated cleanup of rules and compaction configuration.

druid.coordinator.kill.on=true
druid.coordinator.kill.period=P1D
druid.coordinator.kill.durationToRetain=P4D
druid.coordinator.kill.bufferPeriod=P7D
druid.coordinator.kill.maxSegments=1000

# Poll every day to delete audit records > 30 days old
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand All @@ -42,16 +43,21 @@ public class RetrieveUnusedSegmentsAction implements TaskAction<List<DataSegment
@JsonIgnore
private final Integer limit;

@JsonIgnore
private final DateTime maxUsedStatusLastUpdatedTime;

@JsonCreator
public RetrieveUnusedSegmentsAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("limit") @Nullable Integer limit
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
this.dataSource = dataSource;
this.interval = interval;
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}

@JsonProperty
Expand All @@ -73,6 +79,13 @@ public Integer getLimit()
return limit;
}

@Nullable
@JsonProperty
public DateTime getMaxUsedStatusLastUpdatedTime()
{
return maxUsedStatusLastUpdatedTime;
}

@Override
public TypeReference<List<DataSegment>> getReturnTypeReference()
{
Expand All @@ -83,7 +96,7 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox)
{
return toolbox.getIndexerMetadataStorageCoordinator()
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit);
.retrieveUnusedSegmentsForInterval(dataSource, interval, limit, maxUsedStatusLastUpdatedTime);
}

@Override
Expand All @@ -99,6 +112,7 @@ public String toString()
"dataSource='" + dataSource + '\'' +
", interval=" + interval +
", limit=" + limit +
", maxUsedStatusLastUpdatedTime=" + maxUsedStatusLastUpdatedTime +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@
/**
* The client representation of this task is {@link ClientKillUnusedSegmentsTaskQuery}.
* JSON serialization fields of this class must correspond to those of {@link
* ClientKillUnusedSegmentsTaskQuery}, except for "id" and "context" fields.
* ClientKillUnusedSegmentsTaskQuery}, except for {@link #id} and {@link #context} fields.
* <p>
* The field {@link #isMarkAsUnused()} is now deprecated.
* </p>
*/
public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
{
Expand Down Expand Up @@ -95,6 +96,12 @@ public class KillUnusedSegmentsTask extends AbstractFixedIntervalTask
*/
@Nullable private final Integer limit;

/**
* The maximum used status last updated time. Any segments with
* {@code used_status_last_updated} no later than this time will be included in the kill task.
*/
@Nullable private final DateTime maxUsedStatusLastUpdatedTime;

@JsonCreator
public KillUnusedSegmentsTask(
@JsonProperty("id") String id,
Expand All @@ -103,7 +110,8 @@ public KillUnusedSegmentsTask(
@JsonProperty("context") Map<String, Object> context,
@JsonProperty("markAsUnused") @Deprecated Boolean markAsUnused,
@JsonProperty("batchSize") Integer batchSize,
@JsonProperty("limit") @Nullable Integer limit
@JsonProperty("limit") @Nullable Integer limit,
@JsonProperty("maxUsedStatusLastUpdatedTime") @Nullable DateTime maxUsedStatusLastUpdatedTime
)
{
super(
Expand All @@ -115,15 +123,16 @@ public KillUnusedSegmentsTask(
this.markAsUnused = markAsUnused != null && markAsUnused;
this.batchSize = (batchSize != null) ? batchSize : DEFAULT_SEGMENT_NUKE_BATCH_SIZE;
if (this.batchSize <= 0) {
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", limit);
throw InvalidInput.exception("batchSize[%d] must be a positive integer.", batchSize);
}
if (limit != null && limit <= 0) {
throw InvalidInput.exception("Limit[%d] must be a positive integer.", limit);
throw InvalidInput.exception("limit[%d] must be a positive integer.", limit);
}
if (limit != null && Boolean.TRUE.equals(markAsUnused)) {
throw InvalidInput.exception("Limit cannot be provided when markAsUnused is enabled.");
throw InvalidInput.exception("limit[%d] cannot be provided when markAsUnused is enabled.", limit);
}
this.limit = limit;
this.maxUsedStatusLastUpdatedTime = maxUsedStatusLastUpdatedTime;
}

/**
Expand Down Expand Up @@ -155,6 +164,13 @@ public Integer getLimit()
return limit;
}

@Nullable
@JsonProperty
public DateTime getMaxUsedStatusLastUpdatedTime()
{
return maxUsedStatusLastUpdatedTime;
}

@Override
public String getType()
{
Expand All @@ -180,7 +196,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
numSegmentsMarkedAsUnused = toolbox.getTaskActionClient().submit(
new MarkSegmentsAsUnusedAction(getDataSource(), getInterval())
);
LOG.info("Marked [%d] segments as unused.", numSegmentsMarkedAsUnused);
LOG.info("Marked [%d] segments of datasource[%s] in interval[%s] as unused.",
numSegmentsMarkedAsUnused, getDataSource(), getInterval());
} else {
numSegmentsMarkedAsUnused = 0;
}
Expand All @@ -190,9 +207,13 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
@Nullable Integer numTotalBatches = getNumTotalBatches();
List<DataSegment> unusedSegments;
LOG.info(
"Starting kill with batchSize[%d], up to limit[%d] segments will be deleted%s",
"Starting kill for datasource[%s] in interval[%s] with batchSize[%d], up to limit[%d] segments "
+ "before maxUsedStatusLastUpdatedTime[%s] will be deleted%s",
getDataSource(),
getInterval(),
batchSize,
limit,
maxUsedStatusLastUpdatedTime,
numTotalBatches != null ? StringUtils.format(" in [%d] batches.", numTotalBatches) : "."
);

Expand All @@ -217,7 +238,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception

unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
.submit(
new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize, maxUsedStatusLastUpdatedTime
));

// Fetch locks each time as a revokal could have occurred in between batches
final NavigableMap<DateTime, List<TaskLock>> taskLockMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// List unused segments
final List<DataSegment> unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null));
.submit(new RetrieveUnusedSegmentsAction(myLock.getDataSource(), myLock.getInterval(), null, null));

// Verify none of these segments have versions > lock version
for (final DataSegment unusedSegment : unusedSegments) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package org.apache.druid.indexing.common.actions;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -104,7 +106,23 @@ public void testRetrieveUsedSegmentsAction()
@Test
public void testRetrieveUnusedSegmentsAction()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null);
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, null);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithMinUsedLastUpdatedTime()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.MIN);
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(ImmutableSet.of(), resultSegments);
}

@Test
public void testRetrieveUnusedSegmentsActionWithNowUsedLastUpdatedTime()
{
final RetrieveUnusedSegmentsAction action = new RetrieveUnusedSegmentsAction(task.getDataSource(), INTERVAL, null, DateTimes.nowUtc());
final Set<DataSegment> resultSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox()));
Assert.assertEquals(expectedUnusedSegments, resultSegments);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.client.indexing.ClientKillUnusedSegmentsTaskQuery;
import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -53,7 +54,8 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
Intervals.of("2020-01-01/P1D"),
false,
99,
5
5,
DateTimes.nowUtc()
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
final KillUnusedSegmentsTask fromJson = (KillUnusedSegmentsTask) objectMapper.readValue(json, Task.class);
Expand All @@ -63,7 +65,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTask() thro
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(taskQuery.getBatchSize(), Integer.valueOf(fromJson.getBatchSize()));
Assert.assertEquals(taskQuery.getLimit(), fromJson.getLimit());

Assert.assertEquals(taskQuery.getMaxUsedStatusLastUpdatedTime(), fromJson.getMaxUsedStatusLastUpdatedTime());
}

@Test
Expand All @@ -75,6 +77,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Intervals.of("2020-01-01/P1D"),
true,
null,
null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(taskQuery);
Expand All @@ -85,6 +88,7 @@ public void testClientKillUnusedSegmentsTaskQueryToKillUnusedSegmentsTaskDefault
Assert.assertEquals(taskQuery.getMarkAsUnused(), fromJson.isMarkAsUnused());
Assert.assertEquals(100, fromJson.getBatchSize());
Assert.assertNull(taskQuery.getLimit());
Assert.assertNull(taskQuery.getMaxUsedStatusLastUpdatedTime());
}

@Test
Expand All @@ -97,6 +101,7 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
null,
true,
99,
null,
null
);
final byte[] json = objectMapper.writeValueAsBytes(task);
Expand All @@ -110,5 +115,33 @@ public void testKillUnusedSegmentsTaskToClientKillUnusedSegmentsTaskQuery() thro
Assert.assertEquals(task.isMarkAsUnused(), taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertNull(task.getLimit());
Assert.assertNull(task.getMaxUsedStatusLastUpdatedTime());
}

@Test
public void testKillUnusedSegmentsTaskWithNonNullValuesToClientKillUnusedSegmentsTaskQuery() throws IOException
{
final KillUnusedSegmentsTask task = new KillUnusedSegmentsTask(
null,
"datasource",
Intervals.of("2020-01-01/P1D"),
null,
null,
99,
100,
DateTimes.nowUtc()
);
final byte[] json = objectMapper.writeValueAsBytes(task);
final ClientKillUnusedSegmentsTaskQuery taskQuery = (ClientKillUnusedSegmentsTaskQuery) objectMapper.readValue(
json,
ClientTaskQuery.class
);
Assert.assertEquals(task.getId(), taskQuery.getId());
Assert.assertEquals(task.getDataSource(), taskQuery.getDataSource());
Assert.assertEquals(task.getInterval(), taskQuery.getInterval());
Assert.assertNull(taskQuery.getMarkAsUnused());
Assert.assertEquals(Integer.valueOf(task.getBatchSize()), taskQuery.getBatchSize());
Assert.assertEquals(task.getLimit(), taskQuery.getLimit());
Assert.assertEquals(task.getMaxUsedStatusLastUpdatedTime(), taskQuery.getMaxUsedStatusLastUpdatedTime());
}
}
Loading

0 comments on commit 38c1def

Please sign in to comment.