Auto-Compaction using Multi-Stage Query Engine (#16291)
Description:
Compaction operations issued by the Coordinator currently run using the native query engine.
As the majority of the advancements we are making in batch ingestion are in MSQ, it is imperative
that we support compaction on MSQ to make compaction more robust and possibly faster.
For instance, we have seen OOM errors in native compaction that MSQ could have handled through its
auto-calculation of tuning parameters.

This commit enables compaction on MSQ to remove the dependency on the native engine.

Main changes:
* `DataSourceCompactionConfig` now has an additional field `engine` that can be one of
`[native, msq]`, with `native` being the default (a hedged config sketch follows this list).
* If the engine is MSQ, the `CompactSegments` duty assigns all available compaction task slots to the
launched `CompactionTask` to ensure full capacity is available to MSQ (see the slot-assignment sketch
below). This avoids the stalling that could occur if only a fraction of the slots were allotted and they
eventually fell short of the number of tasks required by the MSQ engine to run the compaction.
* `ClientCompactionTaskQuery` has a new field `compactionRunner`, which carries just one `engine` field.
* `CompactionTask` now has a `CompactionRunner` interface instance, with implementations
`NativeCompactionRunner` and `MSQCompactionRunner` (the latter in the `druid-multi-stage-query` extension).
The ObjectMapper deserializes the `ClientCompactionRunnerInfo` in `ClientCompactionTaskQuery` to the
`CompactionRunner` implementation mapped to the specified type (`native` or `msq`); a sketch of this
polymorphic mapping appears after this list.
* `CompactionTask` uses the `CompactionRunner` instance it receives to create the indexing tasks.
* The `CompactionTask`-to-`MSQControllerTask` conversion logic checks whether metrics are present in
the segment schema. If present, the task is created with a native group-by query; if not, the task is
issued with a scan query. The `storeCompactionState` flag is set in the context (a small decision sketch
follows this list).
* Each created `MSQControllerTask` is launched in-place and its `TaskStatus` is tracked to determine the
final status of the `CompactionTask`. The id of each of these tasks is the same as that of the `CompactionTask`,
since otherwise the workers would be unable to determine the controller task's location for communication
(as they haven't been launched via the Overlord).
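
A minimal sketch of how the new `engine` selector could look as a config value, assuming a simple enum with Jackson (de)serialization; the enum and method names are illustrative, and only the `native`/`msq` values and the `native` default come from this commit:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Locale;

// Hypothetical sketch: the engine selector carried by the compaction config.
// Only the value names (native, msq) and the native default are from the
// commit description; everything else is illustrative.
public enum CompactionEngine
{
  NATIVE,
  MSQ;

  @JsonCreator
  public static CompactionEngine fromString(String name)
  {
    // Absent field => default to the native engine.
    return name == null ? NATIVE : valueOf(name.toUpperCase(Locale.ENGLISH));
  }

  @JsonValue
  @Override
  public String toString()
  {
    return name().toLowerCase(Locale.ENGLISH);
  }
}
```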
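The slot-assignment rule is simple enough to state as code. A hedged sketch with hypothetical names (`slotsToAssign` and the string engine value); only the all-slots-for-MSQ behaviour is from the commit description:

```java
// Hypothetical sketch of the CompactSegments slot-assignment rule described
// above: an MSQ compaction task gets every available slot, while native
// compaction keeps whatever per-task allotment was computed before.
final class CompactionSlotSketch
{
  static int slotsToAssign(String engine, int availableCompactionSlots, int nativeSlotsForTask)
  {
    return "msq".equals(engine) ? availableCompactionSlots : nativeSlotsForTask;
  }

  public static void main(String[] args)
  {
    System.out.println(slotsToAssign("msq", 10, 2));    // 10: full capacity for MSQ
    System.out.println(slotsToAssign("native", 10, 2)); // 2: unchanged native behaviour
  }
}
```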
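The runner selection itself is ordinary Jackson polymorphic typing. Below is a self-contained sketch of that mechanism; the interface shape and `TYPE` constants are assumptions for illustration, while the `registerSubtypes(new NamedType(MSQCompactionRunner.class, MSQCompactionRunner.TYPE))` call mirrors the registration added in the MSQ Guice module in the diff further down:

```java
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

// Hypothetical sketch of the polymorphic runner hierarchy; the real interface
// and implementations live in the Druid codebase and are much richer than this.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
interface CompactionRunner
{
}

class NativeCompactionRunner implements CompactionRunner
{
  static final String TYPE = "native";
}

class MSQCompactionRunner implements CompactionRunner
{
  static final String TYPE = "msq";
}

class RunnerRegistrationSketch
{
  static ObjectMapper registerRunners(ObjectMapper mapper)
  {
    // The MSQ extension registers its runner under "msq" (as in the diff below);
    // the native runner is assumed here to be registered by core in the same way,
    // so a serialized {"type": "msq", ...} deserializes to MSQCompactionRunner.
    mapper.registerSubtypes(new NamedType(NativeCompactionRunner.class, NativeCompactionRunner.TYPE));
    mapper.registerSubtypes(new NamedType(MSQCompactionRunner.class, MSQCompactionRunner.TYPE));
    return mapper;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = registerRunners(new ObjectMapper());
    CompactionRunner runner = mapper.readValue("{\"type\": \"msq\"}", CompactionRunner.class);
    System.out.println(runner.getClass().getSimpleName()); // MSQCompactionRunner
  }
}
```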
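To make the group-by-versus-scan choice concrete, here is a small self-contained sketch of the decision rule; the class, method, and return values are illustrative stand-ins for the real query construction, and only the metrics-present condition and the `storeCompactionState` context key come from the description above:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the CompactionTask -> MSQControllerTask query choice:
// metrics in the segment schema => native group-by query (rollup has to be
// re-applied); no metrics => plain scan query. storeCompactionState is set in
// the query context in both cases.
final class CompactionQuerySketch
{
  static String chooseQueryType(List<String> metricsInSegmentSchema, Map<String, Object> queryContext)
  {
    queryContext.put("storeCompactionState", true);
    return metricsInSegmentSchema != null && !metricsInSegmentSchema.isEmpty()
           ? "groupBy"
           : "scan";
  }

  public static void main(String[] args)
  {
    Map<String, Object> context = new HashMap<>();
    System.out.println(chooseQueryType(List.of("sum_bytes"), context)); // groupBy
    System.out.println(chooseQueryType(List.of(), context));            // scan
  }
}
```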
gargvishesh authored Jul 12, 2024
1 parent eb981d8 commit 197c54f
Showing 37 changed files with 2,891 additions and 466 deletions.
@@ -104,6 +104,7 @@ public void setup()
null,
null,
null,
null,
null
)
);
@@ -193,6 +193,7 @@
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.DruidNode;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
@@ -1557,7 +1558,7 @@ private void handleQueryResults(
if (!destination.isReplaceTimeChunks()) {
// Store compaction state only for replace queries.
log.warn(
"storeCompactionState flag set for a non-REPLACE query [%s]. Ignoring the flag for now.",
"Ignoring storeCompactionState flag since it is set for a non-REPLACE query[%s].",
queryDef.getQueryId()
);
} else {
@@ -1657,9 +1658,11 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo

GranularitySpec granularitySpec = new UniformGranularitySpec(
segmentGranularity,
- dataSchema.getGranularitySpec().getQueryGranularity(),
+ QueryContext.of(querySpec.getQuery().getContext())
+             .getGranularity(DruidSqlInsert.SQL_INSERT_QUERY_GRANULARITY, jsonMapper),
dataSchema.getGranularitySpec().isRollup(),
- dataSchema.getGranularitySpec().inputIntervals()
+ // Not using dataSchema.getGranularitySpec().inputIntervals() as that always has ETERNITY
+ ((DataSourceMSQDestination) querySpec.getDestination()).getReplaceTimeChunks()
);

DimensionsSpec dimensionsSpec = dataSchema.getDimensionsSpec();
@@ -1671,9 +1674,9 @@ private static Function<Set<DataSegment>, Set<DataSegment>> addCompactionStateTo
List<Object> metricsSpec = dataSchema.getAggregators() == null
? null
: jsonMapper.convertValue(
- dataSchema.getAggregators(), new TypeReference<List<Object>>()
- {
- });
+ dataSchema.getAggregators(),
+ new TypeReference<List<Object>>() {}
+ );


IndexSpec indexSpec = tuningConfig.getIndexSpec();
@@ -20,6 +20,7 @@
package org.apache.druid.msq.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@@ -29,6 +30,7 @@
import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
import org.apache.druid.msq.counters.WarningCounters;
import org.apache.druid.msq.indexing.MSQCompactionRunner;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.indexing.error.BroadcastTablesTooLargeFault;
@@ -192,6 +194,8 @@ public List<? extends Module> getJacksonModules()
NilInputSource.class
);

module.registerSubtypes(new NamedType(MSQCompactionRunner.class, MSQCompactionRunner.TYPE));

FAULT_CLASSES.forEach(module::registerSubtypes);
module.addSerializer(new CounterSnapshotsSerializer());
return Collections.singletonList(module);