Skip to content

Commit

Permalink
Add undocumented taskLockType to MSQ. (apache#15168)
Browse files Browse the repository at this point in the history
Patch adds an undocumented parameter taskLockType to MSQ so that we can start enabling this feature for users who are interested in testing the new lock types.
  • Loading branch information
cryptoe authored and LakshSingla committed Oct 17, 2023
1 parent da24e1a commit 356d602
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.BrokerClient;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.key.ClusterBy;
Expand All @@ -69,7 +70,10 @@
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
Expand Down Expand Up @@ -962,7 +966,11 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecs(
);
} else {
final RowKeyReader keyReader = clusterBy.keyReader(signature);
return generateSegmentIdsWithShardSpecsForAppend(destination, partitionBoundaries, keyReader);
return generateSegmentIdsWithShardSpecsForAppend(
destination,
partitionBoundaries,
keyReader,
MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(task.getQuerySpec().getQuery().getContext()), false));
}
}

Expand All @@ -972,7 +980,8 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecs(
private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
final DataSourceMSQDestination destination,
final ClusterByPartitions partitionBoundaries,
final RowKeyReader keyReader
final RowKeyReader keyReader,
final TaskLockType taskLockType
) throws IOException
{
final Granularity segmentGranularity = destination.getSegmentGranularity();
Expand All @@ -998,7 +1007,7 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
false,
NumberedPartialShardSpec.instance(),
LockGranularity.TIME_CHUNK,
TaskLockType.SHARED
taskLockType
)
);
}
Expand Down Expand Up @@ -1399,6 +1408,10 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
int numTombstones = 0;
final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType(
QueryContext.of(task.getQuerySpec().getQuery().getContext()),
destination.isReplaceTimeChunks()
);

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
Expand Down Expand Up @@ -1441,7 +1454,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
}
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
createOverwriteAction(taskLockType, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Expand All @@ -1458,7 +1471,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
// Append mode.
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.appendAction(segments, null, null)
createAppendAction(segments, taskLockType)
);
}

Expand All @@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size());
}

private static TaskAction<SegmentPublishResult> createAppendAction(
Set<DataSegment> segments,
TaskLockType taskLockType
)
{
if (taskLockType.equals(TaskLockType.APPEND)) {
return SegmentTransactionalAppendAction.forSegments(segments);
} else if (taskLockType.equals(TaskLockType.SHARED)) {
return SegmentTransactionalInsertAction.appendAction(segments, null, null);
} else {
throw DruidException.defensive("Invalid lock type [%s] received for append action", taskLockType);
}
}

private TaskAction<SegmentPublishResult> createOverwriteAction(
TaskLockType taskLockType,
Set<DataSegment> segmentsWithTombstones
)
{
if (taskLockType.equals(TaskLockType.REPLACE)) {
return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
} else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones);
} else {
throw DruidException.defensive("Invalid lock type [%s] received for overwrite action", taskLockType);
}
}

/**
* When doing an ingestion with {@link DataSourceMSQDestination#isReplaceTimeChunks()}, finds intervals
* containing data that should be dropped.
Expand Down Expand Up @@ -2282,7 +2323,7 @@ private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(
*/
static void performSegmentPublish(
final TaskActionClient client,
final SegmentTransactionalInsertAction action
final TaskAction<SegmentPublishResult> action
) throws IOException
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
Expand Down Expand Up @@ -204,12 +206,19 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
// If we're in replace mode, acquire locks for all intervals before declaring the task ready.
if (isIngestion(querySpec) && ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()) {
final TaskLockType taskLockType =
MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(querySpec.getQuery().getContext()), true);
final List<Interval> intervals =
((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks();
log.debug("Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready", getId(), TaskLockType.EXCLUSIVE, intervals);
log.debug(
"Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready",
getId(),
taskLockType,
intervals
);
for (final Interval interval : intervals) {
final TaskLock taskLock =
taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
taskActionClient.submit(new TimeChunkLockTryAcquireAction(taskLockType, interval));

if (taskLock == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,15 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
fieldMapping.stream().map(f -> f.right).collect(Collectors.toList())
);

destination = new DataSourceMSQDestination(
final DataSourceMSQDestination dataSourceMSQDestination = new DataSourceMSQDestination(
targetDataSource,
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks
);
MultiStageQueryContext.validateAndGetTaskLockType(sqlQueryContext,
dataSourceMSQDestination.isReplaceTimeChunks());
destination = dataSourceMSQDestination;
} else {
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.SegmentSource;
Expand Down Expand Up @@ -78,6 +82,11 @@
* ingested via MSQ. If set to 'none', arrays are not allowed to be ingested in MSQ. If set to 'array', array types
* can be ingested as expected. If set to 'mvd', numeric arrays can not be ingested, and string arrays will be
* ingested as MVDs (this is kept for legacy purpose).
*
* <li><b>taskLockType</b>: Temporary flag to allow MSQ to use experimental lock types. Valid values are present in
* {@link TaskLockType}. If the flag is not set, msq uses {@link TaskLockType#EXCLUSIVE} for replace queries and
* {@link TaskLockType#SHARED} for insert queries.
*
* </ol>
**/
public class MultiStageQueryContext
Expand Down Expand Up @@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object indexSpecObject, final O
throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec", indexSpecObject);
}
}

/**
* This method is used to validate and get the taskLockType from the queryContext.
* If the queryContext does not contain the taskLockType, then {@link TaskLockType#EXCLUSIVE} is used for replace queries and
* {@link TaskLockType#SHARED} is used for insert queries.
* If the queryContext contains the taskLockType, then it is validated and returned.
*/
public static TaskLockType validateAndGetTaskLockType(QueryContext queryContext, boolean isReplaceQuery)
{
final TaskLockType taskLockType = QueryContexts.getAsEnum(
Tasks.TASK_LOCK_TYPE,
queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
TaskLockType.class
);
if (taskLockType == null) {
if (isReplaceQuery) {
return TaskLockType.EXCLUSIVE;
} else {
return TaskLockType.SHARED;
}
}
final String appendErrorMessage = StringUtils.format(
" Please use [%s] key in the context parameter and use one of the TaskLock types as mentioned earlier or "
+ "remove this key for automatic lock type selection", Tasks.TASK_LOCK_TYPE);

if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) || taskLockType.equals(TaskLockType.REPLACE))) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
"TaskLock must be of type [%s] or [%s] for a REPLACE query. Found invalid type [%s] set."
+ appendErrorMessage,
TaskLockType.EXCLUSIVE,
TaskLockType.REPLACE,
taskLockType
);
}
if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) || taskLockType.equals(TaskLockType.APPEND))) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
"TaskLock must be of type [%s] or [%s] for an INSERT query. Found invalid type [%s] set."
+ appendErrorMessage,
TaskLockType.SHARED,
TaskLockType.APPEND,
taskLockType
);
}
return taskLockType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
Expand All @@ -45,6 +47,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -175,10 +178,10 @@ public void testInsertTimeNullFault()
.build();

final String sql = "INSERT INTO foo1\n"
+ "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
+ "FROM foo\n"
+ "PARTITIONED BY DAY\n"
+ "CLUSTERED BY dim1";
+ "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
+ "FROM foo\n"
+ "PARTITIONED BY DAY\n"
+ "CLUSTERED BY dim1";

testIngestQuery()
.setSql(sql)
Expand Down Expand Up @@ -349,8 +352,9 @@ public void testUnionAllWithDifferentColumnNames()
DruidException.Persona.ADMIN,
DruidException.Category.INVALID_INPUT,
"general"
).expectMessageContains("SQL requires union between two tables and column names queried for each table are different "
+ "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."))
).expectMessageContains(
"SQL requires union between two tables and column names queried for each table are different "
+ "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."))
.verifyPlanningErrors();
}

Expand All @@ -374,4 +378,47 @@ public void testTopLevelUnionAllWithJoins()
"SQL requires union between inputs that are not simple table scans and involve a filter or aliasing"))
.verifyPlanningErrors();
}

@Test
public void testInsertWithReplaceAndExcludeLocks()
{
for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) {
testLockTypes(
taskLockType,
"INSERT INTO foo1 select * from foo partitioned by day",
"TaskLock must be of type [SHARED] or [APPEND] for an INSERT query"
);
}
}

@Test
public void testReplaceWithAppendAndSharedLocks()
{
for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.APPEND, TaskLockType.SHARED}) {
testLockTypes(
taskLockType,
"REPLACE INTO foo1 overwrite ALL select * from foo partitioned by day",
"TaskLock must be of type [EXCLUSIVE] or [REPLACE] for a REPLACE query"
);
}
}

private void testLockTypes(TaskLockType contextTaskLockType, String sql, String errorMessage)
{
Map<String, Object> context = new HashMap<>(DEFAULT_MSQ_CONTEXT);
context.put(Tasks.TASK_LOCK_TYPE, contextTaskLockType.name());
testIngestQuery()
.setSql(
sql
)
.setQueryContext(context)
.setExpectedValidationErrorMatcher(
new DruidExceptionMatcher(
DruidException.Persona.USER,
DruidException.Category.INVALID_INPUT,
"general"
).expectMessageContains(
errorMessage))
.verifyPlanningErrors();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
Expand Down Expand Up @@ -59,6 +61,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
Expand All @@ -67,6 +70,16 @@
@RunWith(Parameterized.class)
public class MSQInsertTest extends MSQTestBase
{

private static final String WITH_APPEND_LOCK = "WITH_APPEND_LOCK";
private static final Map<String, Object> QUERY_CONTEXT_WITH_APPEND_LOCK =
ImmutableMap.<String, Object>builder()
.putAll(DEFAULT_MSQ_CONTEXT)
.put(
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name().toLowerCase(Locale.ENGLISH)
)
.build();
private final HashFunction fn = Hashing.murmur3_128();

@Parameterized.Parameters(name = "{index}:with context {0}")
Expand All @@ -76,7 +89,8 @@ public static Collection<Object[]> data()
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
{PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
{WITH_APPEND_LOCK, QUERY_CONTEXT_WITH_APPEND_LOCK}
};
return Arrays.asList(data);
}
Expand Down
Loading

0 comments on commit 356d602

Please sign in to comment.