Skip to content

Commit

Permalink
Add support for streaming ingestion with concurrent replace (apache#1…
Browse files Browse the repository at this point in the history
…5039)

Add support for streaming ingestion with concurrent replace

---------

Co-authored-by: Kashif Faraz <[email protected]>
  • Loading branch information
AmatyaAvadhanula and kfaraz authored Oct 13, 2023
1 parent 0a6f78c commit d25caae
Show file tree
Hide file tree
Showing 39 changed files with 1,314 additions and 301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -42,18 +47,40 @@
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
@Nullable
private final DataSourceMetadata startMetadata;
@Nullable
private final DataSourceMetadata endMetadata;

public static SegmentTransactionalAppendAction create(Set<DataSegment> segments)
public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
return new SegmentTransactionalAppendAction(segments);
return new SegmentTransactionalAppendAction(segments, null, null);
}

public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
}

@JsonCreator
private SegmentTransactionalAppendAction(
@JsonProperty("segments") Set<DataSegment> segments
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
)
{
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
}
}

@JsonProperty
Expand All @@ -62,6 +89,20 @@ public Set<DataSegment> getSegments()
return segments;
}

@JsonProperty
@Nullable
public DataSourceMetadata getStartMetadata()
{
return startMetadata;
}

@JsonProperty
@Nullable
public DataSourceMetadata getEndMetadata()
{
return endMetadata;
}

@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
Expand All @@ -70,30 +111,48 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
};
}

/**
* Performs some sanity checks and publishes the given segments.
*/
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
if (lock.getType() != TaskLockType.APPEND) {
throw InvalidInput.exception(
"Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
"SegmentTransactionalAppendAction", task.getId(), lock.getType()
);
}
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);

final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);

final CriticalAction.Action<SegmentPublishResult> publishAction;
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
);
}

final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
)
)
.onValidLocks(publishAction)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
Expand All @@ -107,20 +166,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : retVal.getSegments()) {
IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,8 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -222,47 +217,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

// getSegments() should return an empty set if announceHistoricalSegments() failed
for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
metricBuilder.setDimension(
DruidMetrics.PARTITIONING_TYPE,
segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
// Emit the segment related metadata using the configured emitters.
// There is a possibility that some segments' metadata event might get missed if the
// server crashes after commiting segment but before emitting the event.
this.emitSegmentMetadata(segment, toolbox);
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox)
{
SegmentMetadataEvent event = new SegmentMetadataEvent(
segment.getDataSource(),
DateTime.now(DateTimeZone.UTC),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getVersion(),
segment.getLastCompactionState() != null
);

toolbox.getEmitter().emit(event);
}

private void checkWithSegmentLock()
{
final Map<Interval, List<DataSegment>> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

Expand All @@ -42,6 +45,8 @@
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
private static final Logger log = new Logger(SegmentTransactionalReplaceAction.class);

/**
* Set of segments to be inserted into metadata storage
*/
Expand Down Expand Up @@ -88,9 +93,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
final Set<ReplaceTaskLock> replaceLocksForTask
= toolbox.getTaskLockbox().findReplaceLocksForTask(task);

final SegmentPublishResult retVal;
final SegmentPublishResult publishResult;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
publishResult = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
Expand All @@ -111,24 +116,45 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);

for (DataSegment segment : retVal.getSegments()) {
final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType();
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
// Upgrade any overlapping pending segments
// Do not perform upgrade in the same transaction as replace commit so that
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
tryUpgradeOverlappingPendingSegments(task, toolbox);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

return retVal;
return publishResult;
}

/**
* Tries to upgrade any pending segments that overlap with the committed segments.
*/
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
if (!activeSupervisorId.isPresent()) {
return;
}

Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);

upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId)
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public static boolean isLockCoversSegments(
&& timeChunkLock.getDataSource().equals(segment.getDataSource())
&& (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
|| TaskLockType.APPEND.equals(timeChunkLock.getType()));
// APPEND locks always have the version DateTimes.EPOCH (1970-01-01)
// and cover the segments irrespective of the segment version
} else {
final SegmentLock segmentLock = (SegmentLock) lock;
return segmentLock.getInterval().contains(segment.getInterval())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,21 +401,21 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular

/**
* Builds a TaskAction to publish segments based on the type of locks that this
* task acquires (determined by context property {@link Tasks#TASK_LOCK_TYPE}).
* task acquires.
*
* @see #determineLockType
*/
protected TaskAction<SegmentPublishResult> buildPublishAction(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish
Set<DataSegment> segmentsToPublish,
TaskLockType lockType
)
{
TaskLockType lockType = TaskLockType.valueOf(
getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name())
);
switch (lockType) {
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
return SegmentTransactionalAppendAction.create(segmentsToPublish);
return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -696,7 +695,7 @@ private void publishSegments(
);
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
driver::registerHandoff,
MoreExecutors.directExecutor()
));
}
Expand Down
Loading

0 comments on commit d25caae

Please sign in to comment.