Skip to content

Commit

Permalink
Remove extra task action
Browse files Browse the repository at this point in the history
  • Loading branch information
kfaraz committed Oct 7, 2023
1 parent 1182c74 commit 9ddfd5e
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 208 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -41,18 +47,40 @@
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
@Nullable
private final DataSourceMetadata startMetadata;
@Nullable
private final DataSourceMetadata endMetadata;

public static SegmentTransactionalAppendAction create(Set<DataSegment> segments)
public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
return new SegmentTransactionalAppendAction(segments);
return new SegmentTransactionalAppendAction(segments, null, null);
}

public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
}

@JsonCreator
private SegmentTransactionalAppendAction(
@JsonProperty("segments") Set<DataSegment> segments
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
)
{
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
}
}

@JsonProperty
Expand All @@ -61,6 +89,20 @@ public Set<DataSegment> getSegments()
return segments;
}

@JsonProperty
@Nullable
public DataSourceMetadata getStartMetadata()
{
return startMetadata;
}

@JsonProperty
@Nullable
public DataSourceMetadata getEndMetadata()
{
return endMetadata;
}

@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
Expand All @@ -72,24 +114,45 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
if (lock.getType() != TaskLockType.APPEND) {
throw InvalidInput.exception(
"Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
"CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType()
);
}
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);

final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);

final CriticalAction.Action<SegmentPublishResult> publishAction;
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
);
}

final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
)
)
.onValidLocks(publishAction)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@JsonSubTypes.Type(name = "commitRealtimeSegments", value = CommitRealtimeSegmentsAndMetadataAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ protected TaskAction<SegmentPublishResult> buildPublishAction(
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
return SegmentTransactionalAppendAction.create(segmentsToPublish);
return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.druid.data.input.Committer;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.CommitRealtimeSegmentsAndMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
Expand Down Expand Up @@ -418,11 +417,11 @@ public SegmentPublishResult publishAnnotatedSegments(
);
final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions);
action = taskLockType == TaskLockType.APPEND
? CommitRealtimeSegmentsAndMetadataAction.create(segmentsToPush, startMetadata, endMetadata)
? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata);
} else {
action = taskLockType == TaskLockType.APPEND
? SegmentTransactionalAppendAction.create(segmentsToPush)
? SegmentTransactionalAppendAction.forSegments(segmentsToPush)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public SegmentPublishResult commitReplaceSegments(DataSegment... segments)
public SegmentPublishResult commitAppendSegments(DataSegment... segments)
{
return runAction(
SegmentTransactionalAppendAction.create(Sets.newHashSet(segments))
SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
);
}

Expand Down
Loading

0 comments on commit 9ddfd5e

Please sign in to comment.