Upgrade pending segments when a concurrent replace happens #15097

Closed · wants to merge 8 commits
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.common.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import java.util.Set;
import java.util.stream.Collectors;

/**
* Task action to commit realtime segments and metadata when using APPEND task locks.
* <p>
* This action performs the following operations in a single transaction:
* <ul>
* <li>Commit the append segments</li>
* <li>Upgrade the append segments to all visible REPLACE versions</li>
* <li>Commit the start and end {@link DataSourceMetadata}</li>
* </ul>
* This action differs from {@link SegmentTransactionalInsertAction} as it is used
* only with APPEND locks and also upgrades segments as needed.
*/
public class CommitRealtimeSegmentsAndMetadataAction implements TaskAction<SegmentPublishResult>
Contributor:
Could we not re-use SegmentTransactionalAppendAction with the metadata being null for batch ingestion and the required values for streaming ingestion (similar to the original insert action)?

Is there anything besides the metadata commit that this action does that the transactional append action doesn't?

Contributor Author:
Yeah, I was thinking the same. Let me see what we can do.
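
For illustration, the reuse suggested above might take roughly this shape (a hypothetical sketch only; the factory-method names and the nullable-metadata constructor are assumptions, not code from this PR):

// Hypothetical: a single append action whose metadata fields are nullable.
// Batch ingestion would pass null metadata; streaming ingestion would pass
// its start/end offsets, mirroring the original insert action.
public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
  return new SegmentTransactionalAppendAction(segments, null, null);
}

public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
    Set<DataSegment> segments,
    DataSourceMetadata startMetadata,
    DataSourceMetadata endMetadata
)
{
  return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
}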

{
/**
* Set of segments to be inserted into metadata storage
*/
private final Set<DataSegment> segments;

private final DataSourceMetadata startMetadata;
private final DataSourceMetadata endMetadata;

public static CommitRealtimeSegmentsAndMetadataAction create(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
)
{
return new CommitRealtimeSegmentsAndMetadataAction(segments, startMetadata, endMetadata);
}

@JsonCreator
private CommitRealtimeSegmentsAndMetadataAction(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("startMetadata") DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") DataSourceMetadata endMetadata
)
{
Preconditions.checkArgument(
segments != null && !segments.isEmpty(),
"Segments to commit should not be empty"
);
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;
}

@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}

@JsonProperty
public DataSourceMetadata getStartMetadata()
{
return startMetadata;
}

@JsonProperty
public DataSourceMetadata getEndMetadata()
{
return endMetadata;
}

@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
return new TypeReference<SegmentPublishResult>()
{
};
}

/**
* Performs some sanity checks and publishes the given segments.
*/
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
final SegmentPublishResult publishResult;

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);

try {
publishResult = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
// TODO: this might need to call a new method which does the following in the same transaction
// - commit append segments
// - upgrade append segments to replace versions
// - commit metadata
() -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata(
segments,
startMetadata,
endMetadata
)
)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
+ " Please check the overlord log for details."
)
)
.build()
);
}
catch (Exception e) {
throw new RuntimeException(e);
}

IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
return publishResult;
}

@Override
public boolean isAudited()
{
return true;
}

@Override
public String toString()
{
return "CommitRealtimeSegmentsAndMetadataAction{" +
", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
", startMetadata=" + startMetadata +
", endMetadata=" + endMetadata +
'}';
}
}
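
For context, a task would submit this action through its task action client. A minimal usage sketch follows (the variables segmentsToPublish, startMetadata, and endMetadata are assumed to come from the surrounding task's publishing state):

// Hypothetical call site inside a streaming ingestion task.
final SegmentPublishResult result = toolbox.getTaskActionClient().submit(
    CommitRealtimeSegmentsAndMetadataAction.create(segmentsToPublish, startMetadata, endMetadata)
);
if (!result.isSuccess()) {
  throw new ISE("Failed to publish segments[%s]", result.getErrorMsg());
}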
@@ -26,7 +26,6 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
@@ -107,20 +106,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : retVal.getSegments()) {
IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

@@ -33,13 +33,8 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -222,47 +217,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

// getSegments() should return an empty set if announceHistoricalSegments() failed
for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
metricBuilder.setDimension(
DruidMetrics.PARTITIONING_TYPE,
segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
// Emit the segment related metadata using the configured emitters.
// There is a possibility that some segments' metadata event might get missed if the
// server crashes after committing a segment but before emitting the event.
this.emitSegmentMetadata(segment, toolbox);
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox)
{
SegmentMetadataEvent event = new SegmentMetadataEvent(
segment.getDataSource(),
DateTime.now(DateTimeZone.UTC),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getVersion(),
segment.getLastCompactionState() != null
);

toolbox.getEmitter().emit(event);
}

private void checkWithSegmentLock()
{
final Map<Interval, List<DataSegment>> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
@@ -27,9 +27,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

@@ -111,23 +109,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));

for (DataSegment segment : retVal.getSegments()) {
final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType();
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

@@ -38,6 +38,7 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
@JsonSubTypes.Type(name = "commitRealtimeSegments", value = CommitRealtimeSegmentsAndMetadataAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
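Since the new action is registered under the name commitRealtimeSegments, that string becomes the "type" discriminator in the serialized payload. A minimal round-trip sketch, assuming TaskAction's usual Jackson setup with a "type" property (the mapper wiring and variables here are illustrative):

// Sketch: serializing the action yields a JSON object whose "type" field is
// the registered subtype name, roughly:
// {"type":"commitRealtimeSegments","segments":[...],"startMetadata":...}
// (inside a method that declares throws IOException)
final ObjectMapper mapper = new DefaultObjectMapper();
final String json = mapper.writeValueAsString(
    CommitRealtimeSegmentsAndMetadataAction.create(segments, startMetadata, endMetadata)
);
final TaskAction<?> roundTripped = mapper.readValue(json, TaskAction.class);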
@@ -696,7 +696,7 @@ private void publishSegments(
);
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
driver::registerHandoff,
MoreExecutors.directExecutor()
));
}
@@ -20,8 +20,10 @@
package org.apache.druid.indexing.common.task;

import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.TaskActionToolbox;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@@ -35,7 +37,6 @@
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.CircularBuffer;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
@@ -45,29 +46,6 @@

public class IndexTaskUtils
{
@Nullable
public static List<String> getMessagesFromSavedParseExceptions(
CircularBuffer<ParseException> savedParseExceptions,
boolean includeTimeOfException
)
{
if (savedParseExceptions == null) {
return null;
}

List<String> events = new ArrayList<>();
for (int i = 0; i < savedParseExceptions.size(); i++) {
if (includeTimeOfException) {
DateTime timeOfException = DateTimes.utc(savedParseExceptions.getLatest(i).getTimeOfExceptionMillis());
events.add(timeOfException + ", " + savedParseExceptions.getLatest(i).getMessage());
} else {
events.add(savedParseExceptions.getLatest(i).getMessage());
}
}

return events;
}

@Nullable
public static List<ParseExceptionReport> getReportListFromSavedParseExceptions(
CircularBuffer<ParseExceptionReport> savedParseExceptionReports
@@ -152,4 +130,25 @@ public static void setSegmentDimensions(
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
}

public static void emitSegmentPublishMetrics(
SegmentPublishResult publishResult,
Task task,
TaskActionToolbox toolbox
)
{
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (publishResult.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : publishResult.getSegments()) {
IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
toolbox.getEmitter().emit(SegmentMetadataEvent.create(segment, DateTimes.nowUtc()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}
}
}