
Merge changes to upgrade pending segments
AmatyaAvadhanula committed Oct 9, 2023
2 parents 3df8128 + 46a1b1e commit c14786a
Showing 18 changed files with 677 additions and 211 deletions.
@@ -22,15 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,18 +47,40 @@
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
@Nullable
private final DataSourceMetadata startMetadata;
@Nullable
private final DataSourceMetadata endMetadata;

public static SegmentTransactionalAppendAction create(Set<DataSegment> segments)
public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
return new SegmentTransactionalAppendAction(segments);
return new SegmentTransactionalAppendAction(segments, null, null);
}

public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
}

@JsonCreator
private SegmentTransactionalAppendAction(
@JsonProperty("segments") Set<DataSegment> segments
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
)
{
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
}
}

@JsonProperty
@@ -62,6 +89,20 @@ public Set<DataSegment> getSegments()
return segments;
}

@JsonProperty
@Nullable
public DataSourceMetadata getStartMetadata()
{
return startMetadata;
}

@JsonProperty
@Nullable
public DataSourceMetadata getEndMetadata()
{
return endMetadata;
}

@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
@@ -70,30 +111,48 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
};
}

/**
* Performs some sanity checks and publishes the given segments.
*/
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
if (lock.getType() != TaskLockType.APPEND) {
throw InvalidInput.exception(
"Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
"CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType()
);
}
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);

final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);

final CriticalAction.Action<SegmentPublishResult> publishAction;
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
);
}

final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
)
)
.onValidLocks(publishAction)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
@@ -107,20 +166,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : retVal.getSegments()) {
IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

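The new factory methods split the append publish into two flavors: forSegments for plain batch appends, and forSegmentsAndMetadata for streaming appends that must commit stream offsets atomically with the segments. A minimal usage sketch follows, assuming a caller that already holds APPEND locks; the wrapper class and variables are illustrative, and only the two factories plus the constructor's null-pair rule come from this diff.

import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.Set;

class AppendPublishSketch
{
  // Illustrative helper: picks the right factory for a publish request.
  static TaskAction<SegmentPublishResult> appendAction(
      Set<DataSegment> segments,
      @Nullable DataSourceMetadata startMetadata,
      @Nullable DataSourceMetadata endMetadata
  )
  {
    if (startMetadata == null && endMetadata == null) {
      // Plain batch append: no stream offsets to record.
      return SegmentTransactionalAppendAction.forSegments(segments);
    }
    // Streaming append: segments and offsets commit in one metadata
    // transaction; the constructor rejects a half-null metadata pair.
    return SegmentTransactionalAppendAction.forSegmentsAndMetadata(
        segments, startMetadata, endMetadata
    );
  }
}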
@@ -33,13 +33,8 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -222,47 +217,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

// getSegments() should return an empty set if announceHistoricalSegments() failed
for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
metricBuilder.setDimension(
DruidMetrics.PARTITIONING_TYPE,
segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
// Emit the segment related metadata using the configured emitters.
// There is a possibility that some segments' metadata event might get missed if the
// server crashes after committing a segment but before emitting the event.
this.emitSegmentMetadata(segment, toolbox);
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox)
{
SegmentMetadataEvent event = new SegmentMetadataEvent(
segment.getDataSource(),
DateTime.now(DateTimeZone.UTC),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getVersion(),
segment.getLastCompactionState() != null
);

toolbox.getEmitter().emit(event);
}

private void checkWithSegmentLock()
{
final Map<Interval, List<DataSegment>> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
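Both transactional actions now funnel metric emission through a single IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox) call. That helper is not part of this diff; the sketch below is only a plausible reconstruction assembled from the inline code deleted above, and the real method may differ (for example, it presumably also emits the per-segment SegmentMetadataEvent that the removed emitSegmentMetadata built).

// Reconstruction sketch; the actual IndexTaskUtils method is not shown here.
public static void emitSegmentPublishMetrics(
    SegmentPublishResult result,
    Task task,
    TaskActionToolbox toolbox
)
{
  final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
  IndexTaskUtils.setTaskDimensions(metricBuilder, task);

  if (result.isSuccess()) {
    toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
    for (DataSegment segment : result.getSegments()) {
      // Per-segment dimensions (interval, partitioning type) plus size.
      IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
    }
  } else {
    toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
  }
}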
@@ -27,10 +27,10 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import java.util.Set;
@@ -42,6 +42,8 @@
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
private static final Logger log = new Logger(SegmentTransactionalReplaceAction.class);

/**
* Set of segments to be inserted into metadata storage
*/
@@ -88,9 +90,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
final Set<ReplaceTaskLock> replaceLocksForTask
= toolbox.getTaskLockbox().findReplaceLocksForTask(task);

final SegmentPublishResult retVal;
final SegmentPublishResult publishResult;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
publishResult = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
@@ -111,24 +113,30 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));

for (DataSegment segment : retVal.getSegments()) {
final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType();
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);

if (publishResult.isSuccess()) {
// If upgrade of pending segments fails, the segments will still get upgraded
// when the corresponding APPEND task commits the segments.
// Thus, the upgrade of pending segments should not be done in the same
// transaction as the commit of replace segments, and a failure to upgrade
// pending segments should not affect the success of the replace commit.
try {
Set<SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegments(segments);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);

// These upgraded pending segments should be forwarded to the SupervisorManager
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

return retVal;
return publishResult;
}

@Override
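The comments above capture the key design choice: the pending-segment upgrade runs outside the replace transaction and is best-effort, since the owning APPEND task will upgrade its segments again when it commits. Conceptually, an upgrade re-registers an in-flight pending segment under the version the REPLACE task just committed. A purely illustrative sketch with made-up IDs; real identifiers come from the metadata store, and partition numbering is more involved in practice.

import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;

class PendingSegmentUpgradeSketch
{
  static SegmentIdWithShardSpec upgradeExample()
  {
    // Hypothetical pending segment allocated before the REPLACE task ran:
    SegmentIdWithShardSpec pending = new SegmentIdWithShardSpec(
        "wiki",                                 // datasource
        Intervals.of("2023-10-01/2023-10-02"),  // time chunk being replaced
        "2023-09-30T00:00:00.000Z",             // version before the replace
        new NumberedShardSpec(3, 0)
    );
    // Same in-flight data, re-registered under the replace version so that a
    // concurrent APPEND task's rows land in the replaced timeline:
    return new SegmentIdWithShardSpec(
        pending.getDataSource(),
        pending.getInterval(),
        "2023-10-09T00:00:00.000Z",             // version of the REPLACE commit
        new NumberedShardSpec(3, 0)             // partition numbering simplified
    );
  }
}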
@@ -125,6 +125,8 @@ public static boolean isLockCoversSegments(
&& timeChunkLock.getDataSource().equals(segment.getDataSource())
&& (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
|| TaskLockType.APPEND.equals(timeChunkLock.getType()));
// APPEND locks always have the version DateTimes.EPOCH (1970-01-01)
// and cover the segments irrespective of the segment version
} else {
final SegmentLock segmentLock = (SegmentLock) lock;
return segmentLock.getInterval().contains(segment.getInterval())
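The new comment explains why the APPEND clause exists at all: an APPEND lock is always versioned DateTimes.EPOCH (1970-01-01), so the plain version comparison could never succeed against real segment versions, and the explicit type check is what lets an APPEND lock cover segments of any version. A condensed sketch of the time-chunk branch; the interval check is assumed from the surrounding method, while the datasource and version/type clauses appear in this hunk.

import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.timeline.DataSegment;

class LockCoverageSketch
{
  static boolean covers(TimeChunkLock lock, DataSegment segment)
  {
    return lock.getInterval().contains(segment.getInterval())
           && lock.getDataSource().equals(segment.getDataSource())
           && (lock.getVersion().compareTo(segment.getVersion()) >= 0
               // EPOCH-versioned APPEND locks cover any segment version.
               || TaskLockType.APPEND.equals(lock.getType()));
  }
}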
@@ -415,7 +415,7 @@ protected TaskAction<SegmentPublishResult> buildPublishAction(
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
return SegmentTransactionalAppendAction.create(segmentsToPublish);
return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
}
@@ -27,7 +27,6 @@
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -696,7 +695,7 @@ private void publishSegments(
);
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
driver::registerHandoff,
MoreExecutors.directExecutor()
));
}
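The final hunk drops an explicit AsyncFunction cast: Guava's Futures.transformAsync target-types its second argument, so a method reference like driver::registerHandoff is inferred as an AsyncFunction without help. A self-contained sketch of the same pattern, with illustrative names:

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

class TransformAsyncSketch
{
  // A method whose shape matches AsyncFunction<String, String>.
  static ListenableFuture<String> echoAsync(String s)
  {
    return Futures.immediateFuture(s);
  }

  static ListenableFuture<String> chain(ListenableFuture<String> input)
  {
    // No cast needed: the method reference is inferred as an AsyncFunction,
    // mirroring the driver::registerHandoff change above.
    return Futures.transformAsync(
        input,
        TransformAsyncSketch::echoAsync,
        MoreExecutors.directExecutor()
    );
  }
}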