segments
@@ -96,7 +112,8 @@ public static boolean isLockCoversSegments(
final TimeChunkLock timeChunkLock = (TimeChunkLock) lock;
return timeChunkLock.getInterval().contains(segment.getInterval())
&& timeChunkLock.getDataSource().equals(segment.getDataSource())
- && timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0;
+ && (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
+ || TaskLockType.APPEND.equals(timeChunkLock.getType()));
} else {
final SegmentLock segmentLock = (SegmentLock) lock;
return segmentLock.getInterval().contains(segment.getInterval())
@@ -110,6 +127,63 @@ public static boolean isLockCoversSegments(
);
}
+  /**
+   * Determines the type of time chunk lock to use for appending segments.
+   * <p>
+   * Resolution order:
+   * <ol>
+   * <li>If the context has a non-null value for {@link Tasks#TASK_LOCK_TYPE}, its string
+   * form is parsed with {@link TaskLockType#valueOf}.</li>
+   * <li>Otherwise the value of {@link Tasks#USE_SHARED_LOCK} selects between
+   * {@link TaskLockType#SHARED} (true) and {@link TaskLockType#EXCLUSIVE} (false or absent).</li>
+   * </ol>
+   * This method should be de-duplicated with {@link AbstractBatchIndexTask#determineLockType}
+   * by passing the ParallelIndexSupervisorTask instance into the
+   * SinglePhaseParallelIndexTaskRunner.
+   *
+   * @param taskContext context map of the task, must not be null
+   * @return the lock type to use for appending segments
+   * @throws IllegalArgumentException if the configured lock type is not the name of a
+   *                                  {@link TaskLockType} constant
+   */
+  public static TaskLockType determineLockTypeForAppend(
+      Map taskContext
+  )
+  {
+    final Object lockType = taskContext.get(Tasks.TASK_LOCK_TYPE);
+    if (lockType != null) {
+      return TaskLockType.valueOf(lockType.toString());
+    }
+
+    // Context values are untyped objects: the flag may be a Boolean, a String such as "true",
+    // or the key may be mapped to an explicit null. A direct (boolean) cast would throw
+    // ClassCastException / NullPointerException in the latter two cases, so the string form
+    // is parsed instead. A Boolean value yields exactly the same result as before.
+    final Object useSharedLock = taskContext.getOrDefault(Tasks.USE_SHARED_LOCK, false);
+    return Boolean.parseBoolean(String.valueOf(useSharedLock))
+           ? TaskLockType.SHARED
+           : TaskLockType.EXCLUSIVE;
+  }
+
+ /**
+ * Finds locks of type {@link TaskLockType#REPLACE} for each of the given segments
+ * that have an interval completely covering the interval of the respective segments.
+ *
+ * @param datasource datasource whose REPLACE locks are fetched from the lockbox
+ * @param taskLockbox lockbox queried for all REPLACE locks of the datasource
+ * @param segments segments for which a covering REPLACE lock is looked up
+ * @return Map from segment to REPLACE lock that completely covers it. The map
+ * does not contain an entry for segments that have no covering REPLACE lock.
+ */
+ public static Map findReplaceLocksCoveringSegments(
+ final String datasource,
+ final TaskLockbox taskLockbox,
+ final Set segments
+ )
+ {
+ // Identify unique segment intervals
+ // (grouping by interval means each distinct interval is matched against the locks only once)
+ final Map> intervalToSegments = new HashMap<>();
+ segments.forEach(
+ segment -> intervalToSegments.computeIfAbsent(
+ segment.getInterval(), interval -> new ArrayList<>()
+ ).add(segment)
+ );
+
+ // All REPLACE locks of the datasource are fetched once, up front
+ final Set replaceLocks = taskLockbox.getAllReplaceLocksForDatasource(datasource);
+ final Map segmentToReplaceLock = new HashMap<>();
+
+ intervalToSegments.forEach((interval, segmentsInInterval) -> {
+ // For each interval, find the lock that covers it, if any
+ for (ReplaceTaskLock lock : replaceLocks) {
+ if (lock.getInterval().contains(interval)) {
+ // The first covering lock encountered is used for every segment of this interval
+ segmentsInInterval.forEach(s -> segmentToReplaceLock.put(s, lock));
+ // Exits only this lambda invocation, i.e. moves on to the next interval
+ return;
+ }
+ }
+ });
+
+ return segmentToReplaceLock;
+ }
+
public static List findLocksForSegments(
final Task task,
final TaskLockbox taskLockbox,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 30b44ff91590..ea61f37c7e90 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -36,14 +36,18 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
-import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.indexing.input.InputRowSchemas;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@@ -289,23 +293,21 @@ public TaskLockHelper getTaskLockHelper()
*
* @return whether the lock was acquired
*/
- public boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals, IndexIOConfig ioConfig)
+ public boolean determineLockGranularityAndTryLock(TaskActionClient client, List intervals)
throws IOException
{
final boolean forceTimeChunkLock = getContextValue(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
- IngestionMode ingestionMode = getIngestionMode();
- final boolean useSharedLock = ingestionMode == IngestionMode.APPEND
- && getContextValue(Tasks.USE_SHARED_LOCK, false);
+ final IngestionMode ingestionMode = getIngestionMode();
// Respect task context value most.
if (forceTimeChunkLock || ingestionMode == IngestionMode.REPLACE) {
log.info(
- "forceTimeChunkLock[%s] is set to true or mode[%s] is replace. Use timeChunk lock",
+ "Using time chunk lock since forceTimeChunkLock is [%s] and mode is [%s].",
forceTimeChunkLock, ingestionMode
);
- taskLockHelper = new TaskLockHelper(false, useSharedLock);
+ taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK);
if (!intervals.isEmpty()) {
return tryTimeChunkLock(client, intervals);
} else {
@@ -314,7 +316,7 @@ public boolean determineLockGranularityAndTryLock(TaskActionClient client, List<
} else {
if (!intervals.isEmpty()) {
final LockGranularityDetermineResult result = determineSegmentGranularity(client, intervals);
- taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT, useSharedLock);
+ taskLockHelper = createLockHelper(result.lockGranularity);
return tryLockWithDetermineResult(client, result);
} else {
// This branch is the only one that will not initialize taskLockHelper.
@@ -342,11 +344,10 @@ boolean determineLockGranularityAndTryLockWithSegments(
Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
);
- final boolean useSharedLock = getContextValue(Tasks.USE_SHARED_LOCK, false);
if (forceTimeChunkLock) {
log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
- taskLockHelper = new TaskLockHelper(false, useSharedLock);
+ taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK);
segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
return tryTimeChunkLock(
client,
@@ -354,7 +355,7 @@ boolean determineLockGranularityAndTryLockWithSegments(
);
} else {
final LockGranularityDetermineResult result = determineSegmentGranularity(segments);
- taskLockHelper = new TaskLockHelper(result.lockGranularity == LockGranularity.SEGMENT, useSharedLock);
+ taskLockHelper = createLockHelper(result.lockGranularity);
segmentCheckFunction.accept(result.lockGranularity, segments);
return tryLockWithDetermineResult(client, result);
}
@@ -398,6 +399,27 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular
}
}
+  /**
+   * Creates the TaskAction used to publish segments. The action is chosen from the
+   * lock type named by the context property {@link Tasks#TASK_LOCK_TYPE}, falling back
+   * to {@link Tasks#DEFAULT_TASK_LOCK_TYPE} when the property is not set:
+   * <ul>
+   * <li>REPLACE: a {@link SegmentTransactionalReplaceAction} for the segments to publish</li>
+   * <li>APPEND: a {@link SegmentTransactionalAppendAction} for the segments to publish</li>
+   * <li>any other type: a {@link SegmentTransactionalInsertAction} that overwrites
+   * {@code segmentsToBeOverwritten} with {@code segmentsToPublish}</li>
+   * </ul>
+   * NOTE(review): the lock type is read directly from the context here, whereas
+   * determineLockType() may fall back to a different type for the locks that are actually
+   * acquired - confirm that the two cannot disagree.
+   *
+   * @param segmentsToBeOverwritten segments replaced by the publish, used only by the insert action
+   * @param segmentsToPublish       segments to be published
+   * @return the publish action matching the configured lock type
+   */
+  protected TaskAction buildPublishAction(
+      Set segmentsToBeOverwritten,
+      Set segmentsToPublish
+  )
+  {
+    final String lockTypeName = getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name());
+    final TaskLockType lockType = TaskLockType.valueOf(lockTypeName);
+
+    if (lockType == TaskLockType.REPLACE) {
+      return SegmentTransactionalReplaceAction.create(segmentsToPublish);
+    }
+    if (lockType == TaskLockType.APPEND) {
+      return SegmentTransactionalAppendAction.create(segmentsToPublish);
+    }
+    return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
+  }
protected boolean tryTimeChunkLock(TaskActionClient client, List intervals) throws IOException
{
@@ -430,7 +452,8 @@ protected boolean tryTimeChunkLock(TaskActionClient client, List inter
}
prev = cur;
- final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
+ final TaskLockType taskLockType = determineLockType(LockGranularity.TIME_CHUNK);
+ final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(taskLockType, cur));
if (lock == null) {
return false;
}
@@ -443,6 +466,42 @@ protected boolean tryTimeChunkLock(TaskActionClient client, List inter
return true;
}
+  /**
+   * Creates a {@link TaskLockHelper} for the given lock granularity. The helper is told
+   * whether the granularity is {@link LockGranularity#SEGMENT} and which lock type
+   * determineLockType() resolves for that granularity.
+   */
+  private TaskLockHelper createLockHelper(LockGranularity lockGranularity)
+  {
+    final boolean isSegmentGranularity = lockGranularity == LockGranularity.SEGMENT;
+    final TaskLockType resolvedLockType = determineLockType(lockGranularity);
+    return new TaskLockHelper(isSegmentGranularity, resolvedLockType);
+  }
+
+  /**
+   * Resolves the lock type to use with the given lock granularity.
+   * <p>
+   * Segment granularity always uses {@link TaskLockType#EXCLUSIVE}. For any other
+   * granularity, the type named by the context property {@link Tasks#TASK_LOCK_TYPE} is
+   * used; when that property is absent, {@link Tasks#USE_SHARED_LOCK} selects SHARED (true)
+   * or EXCLUSIVE (false or absent). SHARED and APPEND are honored only in APPEND ingestion
+   * mode; in every other mode they are replaced by {@link Tasks#DEFAULT_TASK_LOCK_TYPE}.
+   */
+  private TaskLockType determineLockType(LockGranularity lockGranularity)
+  {
+    if (lockGranularity == LockGranularity.SEGMENT) {
+      return TaskLockType.EXCLUSIVE;
+    }
+
+    final String lockTypeFromContext = getContextValue(Tasks.TASK_LOCK_TYPE);
+    final TaskLockType requestedType;
+    if (lockTypeFromContext != null) {
+      requestedType = TaskLockType.valueOf(lockTypeFromContext);
+    } else if (getContextValue(Tasks.USE_SHARED_LOCK, false)) {
+      requestedType = TaskLockType.SHARED;
+    } else {
+      requestedType = TaskLockType.EXCLUSIVE;
+    }
+
+    final IngestionMode ingestionMode = getIngestionMode();
+    final boolean requiresAppendMode =
+        requestedType == TaskLockType.SHARED || requestedType == TaskLockType.APPEND;
+
+    // Lock types SHARED and APPEND are allowed only in APPEND ingestion mode
+    if (requiresAppendMode && ingestionMode != IngestionMode.APPEND) {
+      return Tasks.DEFAULT_TASK_LOCK_TYPE;
+    }
+    return requestedType;
+  }
+
private LockGranularityDetermineResult determineSegmentGranularity(List segments)
{
if (segments.isEmpty()) {
@@ -671,7 +730,8 @@ public static String findVersion(Map versions, Interval interv
public static NonnullPair findIntervalAndVersion(
TaskToolbox toolbox,
IngestionSpec, ?> ingestionSpec,
- DateTime timestamp
+ DateTime timestamp,
+ TaskLockType taskLockType
) throws IOException
{
// This method is called whenever subtasks need to allocate a new segment via the supervisor task.
@@ -727,7 +787,7 @@ public static NonnullPair findIntervalAndVersion(
// We don't have a lock for this interval, so we should lock it now.
final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit(
- new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)
+ new TimeChunkLockTryAcquireAction(taskLockType, interval)
),
"Cannot acquire a lock for interval[%s]",
interval
@@ -783,8 +843,7 @@ Pair lookupVersion(DateTime timestamp)
protected SegmentIdWithShardSpec allocateNewSegmentForTombstone(
IngestionSpec ingestionSchema,
- DateTime timestamp,
- TaskToolbox toolbox
+ DateTime timestamp
)
{
// Since tombstones are derived from inputIntervals, inputIntervals cannot be empty for replace, and locks are
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index cb2cdb7baf07..dfa1f85fde72 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -55,6 +55,7 @@
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
+import org.apache.druid.indexing.common.actions.TaskLocks;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.index.RealtimeAppenderatorIngestionSpec;
@@ -293,7 +294,7 @@ public TaskStatus runTask(final TaskToolbox toolbox)
DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
appenderator = newAppenderator(dataSchema, tuningConfig, metrics, toolbox);
- TaskLockType lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
+ final TaskLockType lockType = TaskLocks.determineLockTypeForAppend(getContext());
StreamAppenderatorDriver driver = newDriver(dataSchema, appenderator, toolbox, metrics, lockType);
try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index 4642b4391d96..a2ca4f869ea7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -53,7 +53,6 @@
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.common.task.batch.parallel.PartialHashSegmentGenerateTask;
@@ -253,8 +252,7 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
}
return determineLockGranularityAndTryLock(
taskActionClient,
- ingestionSchema.dataSchema.getGranularitySpec().inputIntervals(),
- ingestionSchema.getIOConfig()
+ ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
);
}
@@ -524,7 +522,8 @@ public TaskStatus runTask(final TaskToolbox toolbox)
final DataSchema dataSchema;
if (determineIntervals) {
final boolean gotLocks = determineLockGranularityAndTryLock(
- toolbox.getTaskActionClient(), allocateIntervals, ingestionSchema.getIOConfig()
+ toolbox.getTaskActionClient(),
+ allocateIntervals
);
if (!gotLocks) {
throw new ISE("Failed to get locks for intervals[%s]", allocateIntervals);
@@ -912,9 +911,9 @@ private TaskStatus generateAndPublishSegments(
}
- final TransactionalSegmentPublisher publisher = (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
- toolbox.getTaskActionClient()
- .submit(SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish));
+ final TransactionalSegmentPublisher publisher =
+ (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) ->
+ toolbox.getTaskActionClient().submit(buildPublishAction(segmentsToBeOverwritten, segmentsToPublish));
String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null);
if (effectiveId == null) {
@@ -980,8 +979,7 @@ private TaskStatus generateAndPublishSegments(
for (Interval interval : tombstoneIntervals) {
SegmentIdWithShardSpec segmentIdWithShardSpec = allocateNewSegmentForTombstone(
ingestionSchema,
- interval.getStart(),
- toolbox
+ interval.getStart()
);
tombstonesAndVersions.put(interval, segmentIdWithShardSpec);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
index cd7a52f77284..20f7584c8eb2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTaskUtils.java
@@ -33,6 +33,7 @@
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
+import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
import org.joda.time.DateTime;
@@ -141,4 +142,14 @@ public static void setTaskStatusDimensions(
metricBuilder.setDimension(DruidMetrics.TASK_ID, taskStatus.getId());
metricBuilder.setDimension(DruidMetrics.TASK_STATUS, taskStatus.getStatusCode().toString());
}
+
+  /**
+   * Sets the segment-related dimensions on the given metric builder: the partitioning type
+   * (taken from the shard spec of the segment, or null when the segment has no shard spec)
+   * and the string form of the segment interval.
+   *
+   * @param metricBuilder builder on which the dimensions are set
+   * @param segment       segment whose shard spec type and interval are reported
+   */
+  public static void setSegmentDimensions(
+      ServiceMetricEvent.Builder metricBuilder,
+      DataSegment segment
+  )
+  {
+    final String partitionType;
+    if (segment.getShardSpec() == null) {
+      partitionType = null;
+    } else {
+      partitionType = segment.getShardSpec().getType();
+    }
+
+    metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
+    metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
+  }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
index be1d02f429bc..cc760894603e 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTask.java
@@ -34,9 +34,11 @@
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUnusedSegmentsAction;
+import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentNukeAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskLocks;
+import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
@@ -198,6 +200,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
if (nextBatchSize <= 0) {
break;
}
+
unusedSegments = toolbox
.getTaskActionClient()
.submit(new RetrieveUnusedSegmentsAction(getDataSource(), getInterval(), nextBatchSize));
@@ -218,7 +221,22 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception
// abandoned.
toolbox.getTaskActionClient().submit(new SegmentNukeAction(new HashSet<>(unusedSegments)));
- toolbox.getDataSegmentKiller().kill(unusedSegments);
+
+ // Fetch the load specs of all segments overlapping with the given interval
+ final Set