Add undocumented taskLockType to MSQ.
cryptoe committed Oct 16, 2023
1 parent dc8d219 commit 3ed9cf9
Showing 8 changed files with 215 additions and 18 deletions.
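The flag travels in the MSQ query context under the key defined by Tasks.TASK_LOCK_TYPE ("taskLockType") and is resolved by the new MultiStageQueryContext.validateAndGetTaskLockType method shown below. In the diff excerpts that follow, + marks added lines, - marks removed lines, and unmarked lines are unchanged context. A minimal sketch of opting a query into the experimental APPEND lock from client code (hypothetical example; the key and value mirror the test setup in MSQInsertTest further down):

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;

public class AppendLockContextExample
{
  // Hypothetical helper: a query context asking MSQ to take APPEND locks for an insert.
  public static Map<String, Object> appendLockContext()
  {
    return ImmutableMap.of(Tasks.TASK_LOCK_TYPE, TaskLockType.APPEND.name());
  }
}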
@@ -47,6 +47,7 @@
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.discovery.BrokerClient;
+import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.key.ClusterBy;
@@ -69,7 +70,10 @@
import org.apache.druid.indexing.common.actions.LockReleaseAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalAppendAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.common.actions.SegmentTransactionalReplaceAction;
+import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.task.batch.parallel.TombstoneHelper;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
@@ -962,7 +966,11 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecs(
);
} else {
final RowKeyReader keyReader = clusterBy.keyReader(signature);
-return generateSegmentIdsWithShardSpecsForAppend(destination, partitionBoundaries, keyReader);
+return generateSegmentIdsWithShardSpecsForAppend(
+    destination,
+    partitionBoundaries,
+    keyReader,
+    MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(task.getContext()), false));
}
}

@@ -972,7 +980,8 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecs(
private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
final DataSourceMSQDestination destination,
final ClusterByPartitions partitionBoundaries,
-    final RowKeyReader keyReader
+    final RowKeyReader keyReader,
+    final TaskLockType taskLockType
) throws IOException
{
final Granularity segmentGranularity = destination.getSegmentGranularity();
@@ -998,7 +1007,7 @@ private List<SegmentIdWithShardSpec> generateSegmentIdsWithShardSpecsForAppend(
false,
NumberedPartialShardSpec.instance(),
LockGranularity.TIME_CHUNK,
-        TaskLockType.SHARED
+        taskLockType
)
);
}
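With this change, append-mode segment allocation no longer hard-codes a SHARED time-chunk lock: it requests whichever lock type was resolved from the query context, so the lock taken at allocation time matches the one later used to publish the segments.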
@@ -1399,6 +1408,10 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
(DataSourceMSQDestination) task.getQuerySpec().getDestination();
final Set<DataSegment> segmentsWithTombstones = new HashSet<>(segments);
int numTombstones = 0;
+final TaskLockType taskLockType = MultiStageQueryContext.validateAndGetTaskLockType(
+    QueryContext.of(task.getContext()),
+    destination.isReplaceTimeChunks()
+);

if (destination.isReplaceTimeChunks()) {
final List<Interval> intervalsToDrop = findIntervalsToDrop(Preconditions.checkNotNull(segments, "segments"));
@@ -1441,7 +1454,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
}
performSegmentPublish(
context.taskActionClient(),
-    SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
+    createOverwriteAction(taskLockType, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
@@ -1458,7 +1471,7 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
// Append mode.
performSegmentPublish(
context.taskActionClient(),
-    SegmentTransactionalInsertAction.appendAction(segments, null, null)
+    createAppendAction(segments, taskLockType)
);
}

@@ -1467,6 +1480,34 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
task.emitMetric(context.emitter(), "ingest/segments/count", segmentsWithTombstones.size());
}

+private static TaskAction<SegmentPublishResult> createAppendAction(
+    Set<DataSegment> segments,
+    TaskLockType taskLockType
+)
+{
+  if (taskLockType.equals(TaskLockType.APPEND)) {
+    return SegmentTransactionalAppendAction.forSegments(segments);
+  } else if (taskLockType.equals(TaskLockType.SHARED)) {
+    return SegmentTransactionalInsertAction.appendAction(segments, null, null);
+  } else {
+    throw DruidException.defensive("Invalid lock type %s received for append action", taskLockType);
+  }
+}
+
+private TaskAction<SegmentPublishResult> createOverwriteAction(
+    TaskLockType taskLockType,
+    Set<DataSegment> segmentsWithTombstones
+)
+{
+  if (taskLockType.equals(TaskLockType.REPLACE)) {
+    return SegmentTransactionalReplaceAction.create(segmentsWithTombstones);
+  } else if (taskLockType.equals(TaskLockType.EXCLUSIVE)) {
+    return SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones);
+  } else {
+    throw DruidException.defensive("Invalid lock type %s received for overwrite action", taskLockType);
+  }
+}
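Taken together, the two helpers above map each permitted lock type onto a transactional publish action. A condensed, behavior-equivalent sketch of that mapping (illustrative only; the commit itself keeps the append and overwrite dispatch separate):

// Illustrative single dispatch covering both publish paths; not part of the commit.
private static TaskAction<SegmentPublishResult> createPublishAction(
    final TaskLockType taskLockType,
    final Set<DataSegment> segments
)
{
  switch (taskLockType) {
    case APPEND:
      return SegmentTransactionalAppendAction.forSegments(segments);              // experimental append lock
    case SHARED:
      return SegmentTransactionalInsertAction.appendAction(segments, null, null); // legacy append path
    case REPLACE:
      return SegmentTransactionalReplaceAction.create(segments);                  // experimental replace lock
    case EXCLUSIVE:
      return SegmentTransactionalInsertAction.overwriteAction(null, segments);    // legacy overwrite path
    default:
      throw DruidException.defensive("Invalid lock type %s received", taskLockType);
  }
}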

/**
* When doing an ingestion with {@link DataSourceMSQDestination#isReplaceTimeChunks()}, finds intervals
* containing data that should be dropped.
@@ -2282,7 +2323,7 @@ private static Map<Integer, Interval> copyOfStageRuntimesEndingAtCurrentTime(
*/
static void performSegmentPublish(
final TaskActionClient client,
-      final SegmentTransactionalInsertAction action
+      final TaskAction<SegmentPublishResult> action
) throws IOException
{
try {
--- next changed file ---
@@ -51,6 +51,8 @@
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQDestination;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
@@ -204,12 +206,19 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
{
// If we're in replace mode, acquire locks for all intervals before declaring the task ready.
if (isIngestion(querySpec) && ((DataSourceMSQDestination) querySpec.getDestination()).isReplaceTimeChunks()) {
+    final TaskLockType taskLockType =
+        MultiStageQueryContext.validateAndGetTaskLockType(QueryContext.of(sqlQueryContext), true);
final List<Interval> intervals =
((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks();
-    log.debug("Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready", getId(), TaskLockType.EXCLUSIVE, intervals);
+    log.debug(
+        "Task[%s] trying to acquire[%s] locks for intervals[%s] to become ready",
+        getId(),
+        taskLockType,
+        intervals
+    );
for (final Interval interval : intervals) {
final TaskLock taskLock =
-        taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval));
+        taskActionClient.submit(new TimeChunkLockTryAcquireAction(taskLockType, interval));

if (taskLock == null) {
return false;
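If TimeChunkLockTryAcquireAction returns null, another task holds a conflicting lock, so isReady() reports false and the Overlord keeps the task queued for a later retry. The practical effect of the new flag here is that a replace query may request a REPLACE lock instead of the default EXCLUSIVE one; REPLACE locks are designed to coexist with concurrent APPEND locks on the same intervals, while an EXCLUSIVE lock blocks all other writers.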
--- next changed file ---
@@ -226,12 +226,15 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
fieldMapping.stream().map(f -> f.right).collect(Collectors.toList())
);

-      destination = new DataSourceMSQDestination(
+      final DataSourceMSQDestination dataSourceMSQDestination = new DataSourceMSQDestination(
targetDataSource,
segmentGranularityObject,
segmentSortOrder,
replaceTimeChunks
);
+      MultiStageQueryContext.validateAndGetTaskLockType(
+          sqlQueryContext,
+          dataSourceMSQDestination.isReplaceTimeChunks()
+      );
+      destination = dataSourceMSQDestination;
} else {
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) {
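Because validateAndGetTaskLockType also runs while the query is planned, an invalid taskLockType now surfaces as a planning error (the behavior the new tests below assert via verifyPlanningErrors()) rather than failing after the controller task has launched.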
--- next changed file ---
@@ -25,6 +25,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.opencsv.RFC4180Parser;
import com.opencsv.RFC4180ParserBuilder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.SegmentSource;
@@ -78,6 +82,11 @@
* ingested via MSQ. If set to 'none', arrays are not allowed to be ingested in MSQ. If set to 'array', array types
* can be ingested as expected. If set to 'mvd', numeric arrays cannot be ingested, and string arrays will be
* ingested as MVDs (this is kept for legacy purposes).
+ *
+ * <li><b>taskLockType</b>: Temporary flag to allow MSQ to use experimental lock types. Valid values are present in
+ * {@link TaskLockType}. If the flag is not set, MSQ uses {@link TaskLockType#EXCLUSIVE} for replace queries and
+ * {@link TaskLockType#SHARED} for insert queries.
+ *
* </ol>
**/
public class MultiStageQueryContext
@@ -350,4 +359,53 @@ static IndexSpec decodeIndexSpec(@Nullable final Object indexSpecObject, final O
throw QueryContexts.badValueException(CTX_INDEX_SPEC, "an indexSpec", indexSpecObject);
}
}

+/**
+ * Validates and returns the taskLockType from the query context. If the context does not specify a taskLockType,
+ * {@link TaskLockType#EXCLUSIVE} is used for replace queries and {@link TaskLockType#SHARED} is used for insert
+ * queries. If the context does specify a taskLockType, it is validated against the query type and returned.
+ */
+public static TaskLockType validateAndGetTaskLockType(QueryContext queryContext, boolean isReplaceQuery)
+{
+  final TaskLockType taskLockType = QueryContexts.getAsEnum(
+      Tasks.TASK_LOCK_TYPE,
+      queryContext.getString(Tasks.TASK_LOCK_TYPE, null),
+      TaskLockType.class
+  );
+  if (taskLockType == null) {
+    if (isReplaceQuery) {
+      return TaskLockType.EXCLUSIVE;
+    } else {
+      return TaskLockType.SHARED;
+    }
+  }
+  final String appendErrorMessage = StringUtils.format(
+      " Use the context parameter %s to set a valid lock type, or remove the key for automatic lock type selection.",
+      Tasks.TASK_LOCK_TYPE
+  );
+
+  if (isReplaceQuery && !(taskLockType.equals(TaskLockType.EXCLUSIVE) || taskLockType.equals(TaskLockType.REPLACE))) {
+    throw DruidException.forPersona(DruidException.Persona.USER)
+                        .ofCategory(DruidException.Category.INVALID_INPUT)
+                        .build(
+                            "TaskLock must be of type %s or %s for a replace query. Found type %s set."
+                            + appendErrorMessage,
+                            TaskLockType.EXCLUSIVE,
+                            TaskLockType.REPLACE,
+                            taskLockType
+                        );
+  }
+  if (!isReplaceQuery && !(taskLockType.equals(TaskLockType.SHARED) || taskLockType.equals(TaskLockType.APPEND))) {
+    throw DruidException.forPersona(DruidException.Persona.USER)
+                        .ofCategory(DruidException.Category.INVALID_INPUT)
+                        .build(
+                            "TaskLock must be of type %s or %s for an insert query. Found type %s set."
+                            + appendErrorMessage,
+                            TaskLockType.SHARED,
+                            TaskLockType.APPEND,
+                            taskLockType
+                        );
+  }
+  return taskLockType;
+}
}
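To make the resolution rules concrete, a few hypothetical invocations and their outcomes (assuming ImmutableMap and the other imports already used in this commit):

// No taskLockType in the context: the default depends on the query type.
QueryContext none = QueryContext.of(ImmutableMap.of());
MultiStageQueryContext.validateAndGetTaskLockType(none, true);   // TaskLockType.EXCLUSIVE (replace default)
MultiStageQueryContext.validateAndGetTaskLockType(none, false);  // TaskLockType.SHARED (insert default)

// An explicit taskLockType is validated against the query type.
QueryContext replaceLock = QueryContext.of(ImmutableMap.of(Tasks.TASK_LOCK_TYPE, "REPLACE"));
MultiStageQueryContext.validateAndGetTaskLockType(replaceLock, true);   // TaskLockType.REPLACE
MultiStageQueryContext.validateAndGetTaskLockType(replaceLock, false);  // throws DruidException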
--- next changed file ---
@@ -22,7 +22,9 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.indexing.error.InsertCannotAllocateSegmentFault;
@@ -45,6 +47,7 @@
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -175,10 +178,10 @@ public void testInsertTimeNullFault()
.build();

final String sql = "INSERT INTO foo1\n"
+ "SELECT TIME_PARSE(dim1) AS __time, dim1 as cnt\n"
+ "FROM foo\n"
+ "PARTITIONED BY DAY\n"
+ "CLUSTERED BY dim1";

testIngestQuery()
.setSql(sql)
@@ -349,8 +352,9 @@ public void testUnionAllWithDifferentColumnNames()
DruidException.Persona.ADMIN,
DruidException.Category.INVALID_INPUT,
"general"
).expectMessageContains(
    "SQL requires union between two tables and column names queried for each table are different "
    + "Left: [dim2, dim1, m1], Right: [dim1, dim2, m1]."))
.verifyPlanningErrors();
}

@@ -374,4 +378,47 @@ public void testTopLevelUnionAllWithJoins()
"SQL requires union between inputs that are not simple table scans and involve a filter or aliasing"))
.verifyPlanningErrors();
}

+@Test
+public void testInsertWithReplaceAndExclusiveLocks()
+{
+  for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.EXCLUSIVE, TaskLockType.REPLACE}) {
+    testLockTypes(
+        taskLockType,
+        "insert into foo1 select * from foo partitioned by day",
+        "TaskLock must be of type SHARED or APPEND for an insert query"
+    );
+  }
+}
+
+@Test
+public void testReplaceWithAppendAndSharedLocks()
+{
+  for (TaskLockType taskLockType : new TaskLockType[]{TaskLockType.APPEND, TaskLockType.SHARED}) {
+    testLockTypes(
+        taskLockType,
+        "replace into foo1 overwrite ALL select * from foo partitioned by day",
+        "TaskLock must be of type EXCLUSIVE or REPLACE for a replace query"
+    );
+  }
+}
+
+private void testLockTypes(TaskLockType contextTaskLockType, String sql, String errorMessage)
+{
+  Map<String, Object> context = new HashMap<>(DEFAULT_MSQ_CONTEXT);
+  context.put(Tasks.TASK_LOCK_TYPE, contextTaskLockType.name());
+  testIngestQuery()
+      .setSql(sql)
+      .setQueryContext(context)
+      .setExpectedValidationErrorMatcher(
+          new DruidExceptionMatcher(
+              DruidException.Persona.USER,
+              DruidException.Category.INVALID_INPUT,
+              "general"
+          ).expectMessageContains(errorMessage)
+      )
+      .verifyPlanningErrors();
+}
}
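In summary, the lock-type combinations these tests pin down, matching the validation logic in MultiStageQueryContext:

Query type | Allowed taskLockType values | Default when unset
INSERT     | SHARED, APPEND              | SHARED
REPLACE    | EXCLUSIVE, REPLACE          | EXCLUSIVE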
--- next changed file ---
@@ -28,6 +28,8 @@
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.hll.HyperLogLogCollector;
+import org.apache.druid.indexing.common.TaskLockType;
+import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -59,6 +61,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -67,6 +70,16 @@
@RunWith(Parameterized.class)
public class MSQInsertTest extends MSQTestBase
{

+private static final String WITH_APPEND_LOCK = "WITH_APPEND_LOCK";
+private static final Map<String, Object> QUERY_CONTEXT_WITH_APPEND_LOCK =
+    ImmutableMap.<String, Object>builder()
+                .putAll(DEFAULT_MSQ_CONTEXT)
+                .put(
+                    Tasks.TASK_LOCK_TYPE,
+                    TaskLockType.APPEND.name().toLowerCase(Locale.ENGLISH)
+                )
+                .build();
private final HashFunction fn = Hashing.murmur3_128();

@Parameterized.Parameters(name = "{index}:with context {0}")
Expand All @@ -76,7 +89,8 @@ public static Collection<Object[]> data()
{DEFAULT, DEFAULT_MSQ_CONTEXT},
{DURABLE_STORAGE, DURABLE_STORAGE_MSQ_CONTEXT},
{FAULT_TOLERANCE, FAULT_TOLERANCE_MSQ_CONTEXT},
-    {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT}
+    {PARALLEL_MERGE, PARALLEL_MERGE_MSQ_CONTEXT},
+    {WITH_APPEND_LOCK, QUERY_CONTEXT_WITH_APPEND_LOCK}
};
return Arrays.asList(data);
}
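Note that the new parameterization stores the lock type in lowercase ("append"); this relies on QueryContexts.getAsEnum resolving enum names case-insensitively, so user-supplied context values do not need to match the enum constant's casing.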
(Diffs for the remaining changed files are not shown.)