diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java
deleted file mode 100644
index 7a73db0d4a4a..000000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CommitRealtimeSegmentsAndMetadataAction.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.actions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Preconditions;
-import org.apache.druid.error.InvalidInput;
-import org.apache.druid.indexing.common.TaskLock;
-import org.apache.druid.indexing.common.TaskLockType;
-import org.apache.druid.indexing.common.task.IndexTaskUtils;
-import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.CriticalAction;
-import org.apache.druid.indexing.overlord.DataSourceMetadata;
-import org.apache.druid.indexing.overlord.SegmentPublishResult;
-import org.apache.druid.metadata.ReplaceTaskLock;
-import org.apache.druid.segment.SegmentUtils;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Task action to commit realtime segments and metadata when using APPEND task locks.
- *
- * This action performs the following operations in a single transaction:
- *
- * - Commit the append segments
- * - Upgrade the append segments to all visible REPLACE versions
- * - Commit start and end {@link DataSourceMetadata}.
- *
- * This action differs from {@link SegmentTransactionalInsertAction} as it is used
- * only with APPEND locks and also upgrades segments as needed.
- */
-public class CommitRealtimeSegmentsAndMetadataAction implements TaskAction<SegmentPublishResult>
-{
- /**
- * Set of segments to be inserted into metadata storage
- */
- private final Set<DataSegment> segments;
-
- private final DataSourceMetadata startMetadata;
- private final DataSourceMetadata endMetadata;
-
- public static CommitRealtimeSegmentsAndMetadataAction create(
- Set<DataSegment> segments,
- DataSourceMetadata startMetadata,
- DataSourceMetadata endMetadata
- )
- {
- return new CommitRealtimeSegmentsAndMetadataAction(segments, startMetadata, endMetadata);
- }
-
- @JsonCreator
- private CommitRealtimeSegmentsAndMetadataAction(
- @JsonProperty("segments") Set segments,
- @JsonProperty("startMetadata") DataSourceMetadata startMetadata,
- @JsonProperty("endMetadata") DataSourceMetadata endMetadata
- )
- {
- Preconditions.checkArgument(
- segments != null && !segments.isEmpty(),
- "Segments to commit should not be empty"
- );
- this.segments = segments;
- this.startMetadata = startMetadata;
- this.endMetadata = endMetadata;
- }
-
- @JsonProperty
- public Set<DataSegment> getSegments()
- {
- return segments;
- }
-
- @JsonProperty
- public DataSourceMetadata getStartMetadata()
- {
- return startMetadata;
- }
-
- @JsonProperty
- public DataSourceMetadata getEndMetadata()
- {
- return endMetadata;
- }
-
- @Override
- public TypeReference<SegmentPublishResult> getReturnTypeReference()
- {
- return new TypeReference<SegmentPublishResult>()
- {
- };
- }
-
- @Override
- public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
- {
- // Verify that all the locks are of expected type
- final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
- for (TaskLock lock : locks) {
- if (lock.getType() != TaskLockType.APPEND) {
- throw InvalidInput.exception(
- "Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
- "CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType()
- );
- }
- }
-
- TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
- final String datasource = task.getDataSource();
- final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
- = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
-
- final SegmentPublishResult publishResult;
- try {
- publishResult = toolbox.getTaskLockbox().doInCriticalSection(
- task,
- segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
- CriticalAction.builder()
- .onValidLocks(
- () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
- segments,
- segmentToReplaceLock,
- startMetadata,
- endMetadata
- )
- )
- .onInvalidLocks(
- () -> SegmentPublishResult.fail(
- "Invalid task locks. Maybe they are revoked by a higher priority task."
- + " Please check the overlord log for details."
- )
- )
- .build()
- );
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
- return publishResult;
- }
-
- @Override
- public boolean isAudited()
- {
- return true;
- }
-
- @Override
- public String toString()
- {
- return "CommitRealtimeSegmentsAndMetadataAction{" +
- ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
- ", startMetadata=" + startMetadata +
- ", endMetadata=" + endMetadata +
- '}';
- }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
index 994454a9a4b2..125a008cdc90 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java
@@ -22,14 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -41,18 +47,40 @@
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
+ @Nullable
+ private final DataSourceMetadata startMetadata;
+ @Nullable
+ private final DataSourceMetadata endMetadata;
- public static SegmentTransactionalAppendAction create(Set<DataSegment> segments)
+ public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
- return new SegmentTransactionalAppendAction(segments);
+ return new SegmentTransactionalAppendAction(segments, null, null);
+ }
+
+ public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
+ Set<DataSegment> segments,
+ DataSourceMetadata startMetadata,
+ DataSourceMetadata endMetadata
+ )
+ {
+ return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
}
@JsonCreator
private SegmentTransactionalAppendAction(
- @JsonProperty("segments") Set segments
+ @JsonProperty("segments") Set segments,
+ @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
+ @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
)
{
this.segments = segments;
+ this.startMetadata = startMetadata;
+ this.endMetadata = endMetadata;
+
+ if ((startMetadata == null && endMetadata != null)
+ || (startMetadata != null && endMetadata == null)) {
+ throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
+ }
}
@JsonProperty
@@ -61,6 +89,20 @@ public Set<DataSegment> getSegments()
return segments;
}
+ @JsonProperty
+ @Nullable
+ public DataSourceMetadata getStartMetadata()
+ {
+ return startMetadata;
+ }
+
+ @JsonProperty
+ @Nullable
+ public DataSourceMetadata getEndMetadata()
+ {
+ return endMetadata;
+ }
+
@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
@@ -72,24 +114,45 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
+ // Verify that all the locks are of expected type
+ final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
+ for (TaskLock lock : locks) {
+ if (lock.getType() != TaskLockType.APPEND) {
+ throw InvalidInput.exception(
+ "Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
+ "CommitRealtimeSegmentsAndMetadata", task.getId(), lock.getType()
+ );
+ }
+ }
+
TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);
final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);
+ final CriticalAction.Action<SegmentPublishResult> publishAction;
+ if (startMetadata == null) {
+ publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
+ segments,
+ segmentToReplaceLock
+ );
+ } else {
+ publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
+ segments,
+ segmentToReplaceLock,
+ startMetadata,
+ endMetadata
+ );
+ }
+
final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
- .onValidLocks(
- () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
- segments,
- segmentToReplaceLock
- )
- )
+ .onValidLocks(publishAction)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
index 0948c036ea73..171d53b9cdd6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java
@@ -38,7 +38,6 @@
@JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class),
@JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class),
- @JsonSubTypes.Type(name = "commitRealtimeSegments", value = CommitRealtimeSegmentsAndMetadataAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
@JsonSubTypes.Type(name = "segmentListUsed", value = RetrieveUsedSegmentsAction.class),
// Type name doesn't correspond to the name of the class for backward compatibility.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index ea61f37c7e90..276d82eedc84 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -415,7 +415,7 @@ protected TaskAction<SegmentPublishResult> buildPublishAction(
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
- return SegmentTransactionalAppendAction.create(segmentsToPublish);
+ return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index aa009c674900..47ccbaa00b35 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -27,7 +27,6 @@
import org.apache.druid.data.input.Committer;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.CommitRealtimeSegmentsAndMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
@@ -418,11 +417,11 @@ public SegmentPublishResult publishAnnotatedSegments(
);
final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions);
action = taskLockType == TaskLockType.APPEND
- ? CommitRealtimeSegmentsAndMetadataAction.create(segmentsToPush, startMetadata, endMetadata)
+ ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata);
} else {
action = taskLockType == TaskLockType.APPEND
- ? SegmentTransactionalAppendAction.create(segmentsToPush)
+ ? SegmentTransactionalAppendAction.forSegments(segmentsToPush)
: SegmentTransactionalInsertAction.appendAction(segmentsToPush, null, null);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
index 6a1a92641d3f..69a8b6cc1030 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java
@@ -81,7 +81,7 @@ public SegmentPublishResult commitReplaceSegments(DataSegment... segments)
public SegmentPublishResult commitAppendSegments(DataSegment... segments)
{
return runAction(
- SegmentTransactionalAppendAction.create(Sets.newHashSet(segments))
+ SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments))
);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
index af7410bd1d79..fbe63ffe2689 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java
@@ -29,9 +29,8 @@
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.junit.Rule;
+import org.junit.Assert;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
@@ -43,9 +42,6 @@
@RunWith(MockitoJUnitRunner.class)
public class SequenceMetadataTest
{
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
@Mock
private SeekableStreamIndexTaskRunner mockSeekableStreamIndexTaskRunner;
@@ -59,7 +55,7 @@ public class SequenceMetadataTest
private TaskToolbox mockTaskToolbox;
@Test
- public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty() throws Exception
+ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNullAndNotEmpty()
{
DataSegment dataSegment = DataSegment.builder()
.dataSource("foo")
@@ -79,14 +75,18 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull
ImmutableSet.of(),
null
);
- TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true);
+ TransactionalSegmentPublisher transactionalSegmentPublisher
+ = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, true);
- expectedException.expect(ISE.class);
- expectedException.expectMessage(
- "Stream ingestion task unexpectedly attempted to overwrite segments: " + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment)
+ ISE exception = Assert.assertThrows(
+ ISE.class,
+ () -> transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null)
+ );
+ Assert.assertEquals(
+ "Stream ingestion task unexpectedly attempted to overwrite segments: "
+ + SegmentUtils.commaSeparatedIdentifiers(notNullNotEmptySegment),
+ exception.getMessage()
);
-
- transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null);
}
@Test
diff --git a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
index 83a4fcba7dc5..a926b004c0af 100644
--- a/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
+++ b/processing/src/test/java/org/apache/druid/java/util/emitter/service/SegmentMetadataEventTest.java
@@ -21,6 +21,11 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.core.EventMap;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
@@ -51,4 +56,30 @@ public void testBasicEvent()
event.toMap()
);
}
+
+ @Test
+ public void testCreate()
+ {
+ final DataSegment segment = DataSegment.builder()
+ .dataSource("wiki")
+ .interval(Intervals.of("2023/2024"))
+ .shardSpec(new NumberedShardSpec(1, 1))
+ .version("v1")
+ .size(100)
+ .build();
+ final DateTime eventTime = DateTimes.nowUtc();
+ SegmentMetadataEvent event = SegmentMetadataEvent.create(segment, eventTime);
+ Assert.assertEquals(
+ EventMap.builder()
+ .put(SegmentMetadataEvent.FEED, "segment_metadata")
+ .put(SegmentMetadataEvent.DATASOURCE, segment.getDataSource())
+ .put(SegmentMetadataEvent.CREATED_TIME, eventTime)
+ .put(SegmentMetadataEvent.START_TIME, segment.getInterval().getStart())
+ .put(SegmentMetadataEvent.END_TIME, segment.getInterval().getEnd())
+ .put(SegmentMetadataEvent.VERSION, segment.getVersion())
+ .put(SegmentMetadataEvent.IS_COMPACTED, false)
+ .build(),
+ event.toMap()
+ );
+ }
}
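
Note (not part of the patch): a minimal usage sketch of the consolidated action, assuming hypothetical local variables segments, startMetadata, and endMetadata. Streaming tasks (see the SequenceMetadata change above) follow this pattern; batch tasks always use the metadata-free form.

    // Illustrative sketch only. Per the new constructor check, startMetadata and
    // endMetadata must be both null or both non-null, so testing one suffices.
    final TaskAction<SegmentPublishResult> action =
        startMetadata == null
        ? SegmentTransactionalAppendAction.forSegments(segments)
        : SegmentTransactionalAppendAction.forSegmentsAndMetadata(segments, startMetadata, endMetadata);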