From 953ce794396dfc9216e8d6f98c09a14cb9b3caff Mon Sep 17 00:00:00 2001 From: Karan Kumar Date: Tue, 17 Oct 2023 21:44:04 +0530 Subject: [PATCH] Add undocumented taskLockType to MSQ. (#15168) Patch adds an undocumented parameter taskLockType to MSQ so that we can start enabling this feature for users who are interested in testing the new lock types. --- .../apache/druid/msq/exec/ControllerImpl.java | 53 +++++++++++++++-- .../druid/msq/indexing/MSQControllerTask.java | 13 +++- .../druid/msq/sql/MSQTaskQueryMaker.java | 5 +- .../msq/util/MultiStageQueryContext.java | 58 ++++++++++++++++++ .../apache/druid/msq/exec/MSQFaultsTest.java | 59 +++++++++++++++++-- .../apache/druid/msq/exec/MSQInsertTest.java | 16 ++++- .../apache/druid/msq/exec/MSQReplaceTest.java | 18 +++++- .../apache/druid/msq/exec/MSQSelectTest.java | 5 +- .../msq/test/MSQTestTaskActionClient.java | 11 +++- .../common/task/AbstractBatchIndexTask.java | 12 +++- 10 files changed, 226 insertions(+), 24 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index c108c7d679e9..f2260b055a96 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -47,6 +47,7 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.discovery.BrokerClient; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.FrameChannelSequence; import org.apache.druid.frame.key.ClusterBy; @@ -69,7 +70,10 @@ import org.apache.druid.indexing.common.actions.LockReleaseAction; import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; +import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper; import org.apache.druid.indexing.overlord.SegmentPublishResult; @@ -962,7 +966,11 @@ private List generateSegmentIdsWithShardSpecs( ); } else { final RowKeyReader keyReader = clusterBy.keyReader(signature); - return generateSegmentIdsWithShardSpecsForAppend(destination, partitionBoundaries, keyReader); + return generateSegmentIdsWithShardSpecsForAppend( + destination, + partitionBoundaries, + keyReader, + MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(task.getQuerySpec().getQuery().getContext()), false)); } } @@ -972,7 +980,8 @@ private List generateSegmentIdsWithShardSpecs( private List generateSegmentIdsWithShardSpecsForAppend( final DataSourceMSQDestination destination, final ClusterByPartitions partitionBoundaries, - final RowKeyReader keyReader + final RowKeyReader keyReader, + final TaskLockType taskLockType ) throws IOException { final Granularity segmentGranularity = destination.getSegmentGranularity(); @@ -998,7 +1007,7 @@ private List generateSegmentIdsWithShardSpecsForAppend( false, NumberedPartialShardSpec.instance(), LockGranularity.TIME_CHUNK, - TaskLockType.SHARED + taskLockType ) ); } @@ -1399,6 +1408,10 @@ private void publishAllSegments(final Set segments) throws IOExcept (DataSourceMSQDestination) task.getQuerySpec().getDestination(); final Set segmentsWithTombstones = new HashSet<>(segments); int numTombstones = 0; + final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType( + QueryContext.of(task.getQuerySpec().getQuery().getContext()), + destination.isReplaceTimeChunks() + ); if (destination.isReplaceTimeChunks()) { final List intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments")); @@ -1441,7 +1454,7 @@ private void publishAllSegments(final Set segments) throws IOExcept } performSegmentPublish( context.taskActionClient(), - SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones) + createOverwriteAction(taskLockType, segmentsWithTombstones) ); } } else if (!segments.isEmpty()) { @@ -1458,7 +1471,7 @@ private void publishAllSegments(final Set segments) throws IOExcept // Append mode. performSegmentPublish( context.taskActionClient(), - SegmentTransactionalInsertAction.appendAction(segments, null, null) + createAppendAction(segments, taskLockType) ); } @@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set segments) throws IOExcept task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size()); } + private static TaskAction createAppendAction( + Set segments, + TaskLockType taskLockType + ) + { + if (taskLockType.equals(TaskLockType.APPEND)) { + return SegmentTransactionalAppendAction.forSegments(segments); + } else if (taskLockType.equals(TaskLockType.SHARED)) { + return SegmentTransactionalInsertAction.appendAction(segments, null, null); + } else { + throw DruidException.defensive("Invalid lock type [%s] received for append action", taskLockType); + } + } + + private TaskAction createOverwriteAction( + TaskLockType taskLockType, + Set segmentsWithTombstones + ) + { + if (taskLockType.equals(TaskLockType.REPLACE)) { + return SegmentTransactionalReplaceAction.create(segmentsWithTombstones); + } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) { + return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones); + } else { + throw DruidException.defensive("Invalid lock type [%s] received for overwrite action", taskLockType); + } + } + /** * When doing an ingestion with {@link DataSourceMSQDestination#isReplaceTimeChunks()}, finds intervals * containing data that should be dropped. @@ -2282,7 +2323,7 @@ private static Map copyOfStageRuntimesEndingAtCurrentTime( */ static void performSegmentPublish( final TaskActionClient client, - final SegmentTransactionalInsertAction action + final TaskAction action ) throws IOException { try { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 43967e7d748a..3cdf706ba163 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -51,6 +51,8 @@ import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination; import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination; import org.apache.druid.msq.indexing.destination.MSQDestination; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; import org.apache.druid.rpc.ServiceClientFactory; import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.OverlordClient; @@ -204,12 +206,19 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception { // If we're in replace mode, acquire locks for all intervals before declaring the task ready. if (isIngestion(querySpec) && ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()) { + final TaskLockType taskLockType = + MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()), true); final List intervals = ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks(); - log.debug("Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready", getId(), TaskLockType.EXCLUSIVE, intervals); + log.debug( + "Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready", + getId(), + taskLockType, + intervals + ); for (final Interval interval : intervals) { final TaskLock taskLock = - taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)); + taskActionClient.submit(new TimeChunkLockTryAcquireAction(taskLockType, interval)); if (taskLock == null) { return false; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java index de48387db200..d38fa1a8dc64 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java @@ -226,12 +226,15 @@ public QueryResponse runQuery(final DruidQuery druidQuery) fieldMapping.stream().map(f -> f.right).collect(Collectors.toList()) ); - destination = new DataSourceMSQDestination( + final DataSourceMSQDestination dataSourceMSQDestination = new DataSourceMSQDestination( targetDataSource, segmentGranularityObject, segmentSortOrder, replaceTimeChunks ); + MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext, + dataSourceMSQDestination.isReplaceTimeChunks()); + destination = dataSourceMSQDestination; } else { final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext); if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 77b11a287687..f6caa6da0590 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -25,6 +25,10 @@ import com.google.common.annotations.VisibleForTesting; import com.opencsv.RFC4180Parser; import com.opencsv.RFC4180ParserBuilder; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.exec.ClusterStatisticsMergeMode; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.SegmentSource; @@ -78,6 +82,11 @@ * ingested via MSQ. If set to 'none', arrays are not allowed to be ingested in MSQ. If set to 'array', array types * can be ingested as expected. If set to 'mvd', numeric arrays can not be ingested, and string arrays will be * ingested as MVDs (this is kept for legacy purpose). + * + *
  • taskLockType: Temporary flag to allow MSQ to use experimental lock types. Valid values are present in + * {@link TaskLockType}. If the flag is not set, msq uses {@link TaskLockType#EXCLUSIVE} for replace queries and + * {@link TaskLockType#SHARED} for insert queries. + * * **/ public class MultiStageQueryContext @@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object indexSpecObject, final O throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec", indexSpecObject); } } + + /** + * This method is used to validate and get the taskLockType from the queryContext. + * If the queryContext does not contain the taskLockType, then {@link TaskLockType#EXCLUSIVE} is used for replace queries and + * {@link TaskLockType#SHARED} is used for insert queries. + * If the queryContext contains the taskLockType, then it is validated and returned. + */ + public static TaskLockType validateAndGetTaskLockType(QueryContext queryContext, boolean isReplaceQuery) + { + final TaskLockType taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + queryContext.getString(Tasks.TASK_LOCK_TYPE, null), + TaskLockType.class + ); + if (taskLockType == null) { + if (isReplaceQuery) { + return TaskLockType.EXCLUSIVE; + } else { + return TaskLockType.SHARED; + } + } + final String appendErrorMessage = StringUtils.format( + " Please use [%s] key in the context parameter and use one of the TaskLock types as mentioned earlier or " + + "remove this key for automatic lock type selection", Tasks.TASK_LOCK_TYPE); + + if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) || taskLockType.equals(TaskLockType.REPLACE))) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "TaskLock must be of type [%s] or [%s] for a REPLACE query. Found invalid type [%s] set." + + appendErrorMessage, + TaskLockType.EXCLUSIVE, + TaskLockType.REPLACE, + taskLockType + ); + } + if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) || taskLockType.equals(TaskLockType.APPEND))) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + "TaskLock must be of type [%s] or [%s] for an INSERT query. Found invalid type [%s] set." + + appendErrorMessage, + TaskLockType.SHARED, + TaskLockType.APPEND, + taskLockType + ); + } + return taskLockType; + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 4b77dd78b339..612dee3bbd83 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -22,7 +22,9 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault; @@ -45,6 +47,7 @@ import java.io.File; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -175,10 +178,10 @@ public void testInsertTimeNullFault() .build(); final String sql = "INSERT INTO foo1\n" - + "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n" - + "FROM foo\n" - + "PARTITIONED BY DAY\n" - + "CLUSTERED BY dim1"; + + "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n" + + "FROM foo\n" + + "PARTITIONED BY DAY\n" + + "CLUSTERED BY dim1"; testIngestQuery() .setSql(sql) @@ -349,8 +352,9 @@ public void testUnionAllWithDifferentColumnNames() DruidException.Persona.ADMIN, DruidException.Category.INVALID_INPUT, "general" - ).expectMessageContains("SQL requires union between two tables and column names queried for each table are different " - + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1].")) + ).expectMessageContains( + "SQL requires union between two tables and column names queried for each table are different " + + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1].")) .verifyPlanningErrors(); } @@ -374,4 +378,47 @@ public void testTopLevelUnionAllWithJoins() "SQL requires union between inputs that are not simple table scans and involve a filter or aliasing")) .verifyPlanningErrors(); } + + @Test + public void testInsertWithReplaceAndExcludeLocks() + { + for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) { + testLockTypes( + taskLockType, + "INSERT INTO foo1 select * from foo partitioned by day", + "TaskLock must be of type [SHARED] or [APPEND] for an INSERT query" + ); + } + } + + @Test + public void testReplaceWithAppendAndSharedLocks() + { + for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.APPEND, TaskLockType.SHARED}) { + testLockTypes( + taskLockType, + "REPLACE INTO foo1 overwrite ALL select * from foo partitioned by day", + "TaskLock must be of type [EXCLUSIVE] or [REPLACE] for a REPLACE query" + ); + } + } + + private void testLockTypes(TaskLockType contextTaskLockType, String sql, String errorMessage) + { + Map context = new HashMap<>(DEFAULT_MSQ_CONTEXT); + context.put(Tasks.TASK_LOCK_TYPE, contextTaskLockType.name()); + testIngestQuery() + .setSql( + sql + ) + .setQueryContext(context) + .setExpectedValidationErrorMatcher( + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "general" + ).expectMessageContains( + errorMessage)) + .verifyPlanningErrors(); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index b43dd72e88c8..2314c10d7e6e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -28,6 +28,8 @@ import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -59,6 +61,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -67,6 +70,16 @@ @RunWith(Parameterized.class) public class MSQInsertTest extends MSQTestBase { + + private static final String WITH_APPEND_LOCK = "WITH_APPEND_LOCK"; + private static final Map QUERY_CONTEXT_WITH_APPEND_LOCK = + ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put( + Tasks.TASK_LOCK_TYPE, + TaskLockType.APPEND.name().toLowerCase(Locale.ENGLISH) + ) + .build(); private final HashFunction fn = Hashing.murmur3_128(); @Parameterized.Parameters(name = "{index}:with context {0}") @@ -76,7 +89,8 @@ public static Collection data() {DEFAULT, DEFAULT_MSQ_CONTEXT}, {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT}, {FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT}, - {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT} + {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}, + {WITH_APPEND_LOCK, QUERY_CONTEXT_WITH_APPEND_LOCK} }; return Arrays.asList(data); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java index 0a43fdaea721..144e74b084e9 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQReplaceTest.java @@ -20,10 +20,14 @@ package org.apache.druid.msq.exec; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.msq.test.CounterSnapshotMatcher; import org.apache.druid.msq.test.MSQTestBase; import org.apache.druid.msq.test.MSQTestFileUtils; @@ -54,6 +58,17 @@ @RunWith(Parameterized.class) public class MSQReplaceTest extends MSQTestBase { + + private static final String WITH_REPLACE_LOCK = "WITH_REPLACE_LOCK"; + private static final Map QUERY_CONTEXT_WITH_REPLACE_LOCK = + ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put( + Tasks.TASK_LOCK_TYPE, + StringUtils.toLowerCase(TaskLockType.REPLACE.name()) + ) + .build(); + @Parameterized.Parameters(name = "{index}:with context {0}") public static Collection data() { @@ -61,7 +76,8 @@ public static Collection data() {DEFAULT, DEFAULT_MSQ_CONTEXT}, {DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT}, {FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT}, - {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT} + {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}, + {WITH_REPLACE_LOCK, QUERY_CONTEXT_WITH_REPLACE_LOCK} }; return Arrays.asList(data); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 219af6a31883..b63ee479e202 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -97,7 +97,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; @RunWith(Parameterized.class) @@ -114,7 +113,7 @@ public class MSQSelectTest extends MSQTestBase .put(MultiStageQueryContext.CTX_ROWS_PER_PAGE, 2) .put( MultiStageQueryContext.CTX_SELECT_DESTINATION, - MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH) + StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName()) ) .build(); @@ -124,7 +123,7 @@ public class MSQSelectTest extends MSQTestBase .putAll(DEFAULT_MSQ_CONTEXT) .put( MultiStageQueryContext.CTX_SELECT_DESTINATION, - MSQSelectDestination.DURABLESTORAGE.getName().toLowerCase(Locale.ENGLISH) + StringUtils.toLowerCase(MSQSelectDestination.DURABLESTORAGE.getName()) ) .build(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java index 897e57c93bc8..31b3272b74ff 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestTaskActionClient.java @@ -28,7 +28,9 @@ import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.overlord.SegmentPublishResult; @@ -121,10 +123,17 @@ public RetType submit(TaskAction taskAction) ).collect(Collectors.toSet()); } } else if (taskAction instanceof SegmentTransactionalInsertAction) { - // Always OK. final Set segments = ((SegmentTransactionalInsertAction) taskAction).getSegments(); publishedSegments.addAll(segments); return (RetType) SegmentPublishResult.ok(segments); + } else if (taskAction instanceof SegmentTransactionalReplaceAction) { + final Set segments = ((SegmentTransactionalReplaceAction) taskAction).getSegments(); + publishedSegments.addAll(segments); + return (RetType) SegmentPublishResult.ok(segments); + } else if (taskAction instanceof SegmentTransactionalAppendAction) { + final Set segments = ((SegmentTransactionalAppendAction) taskAction).getSegments(); + publishedSegments.addAll(segments); + return (RetType) SegmentPublishResult.ok(segments); } else { return null; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index fe19b35391e9..cbc469b61333 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -62,6 +62,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.query.DruidMetrics; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -483,13 +484,18 @@ private TaskLockType determineLockType(LockGranularity lockGranularity) return TaskLockType.EXCLUSIVE; } - final String contextLockType = getContextValue(Tasks.TASK_LOCK_TYPE); + final TaskLockType contextTaskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + getContextValue(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + final TaskLockType lockType; - if (contextLockType == null) { + if (contextTaskLockType == null) { lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE; } else { - lockType = TaskLockType.valueOf(contextLockType); + lockType = contextTaskLockType; } final IngestionMode ingestionMode = getIngestionMode();