Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSQ controller: Support in-memory shuffles; towards JVM reuse. #16168

Merged
merged 21 commits into from
May 1, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge branch 'master' into msq-controller-refactor
  • Loading branch information
gianm committed Apr 15, 2024
commit 279ab58407f3dc20957a00ae71c091c84d4963d2
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@

package org.apache.druid.msq.exec;

import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;
Original file line number Diff line number Diff line change
@@ -61,7 +61,6 @@
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.indexer.report.TaskContextReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
@@ -173,8 +172,6 @@
import org.apache.druid.msq.util.MSQFutureUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
import org.apache.druid.msq.util.SqlStatementResourceHelper;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -393,7 +390,7 @@ private void runInternal(final QueryListener queryListener, final Closer closer)

queryKernel = Preconditions.checkNotNull(queryRunResult.lhs);
workerTaskRunnerFuture = Preconditions.checkNotNull(queryRunResult.rhs);
publishSegmentsIfNeeded(queryDef, queryKernel);
handleQueryResults(queryDef, queryKernel);
}
catch (Throwable e) {
exceptionEncountered = e;
@@ -1488,12 +1485,16 @@ private CounterSnapshotsTree getFinalCountersSnapshot(@Nullable final Controller
}
}

private void publishSegmentsIfNeeded(
private void handleQueryResults(
final QueryDefinition queryDef,
final ControllerQueryKernel queryKernel
) throws IOException
{
if (queryKernel.isSuccess() && MSQControllerTask.isIngestion(querySpec)) {
if (!queryKernel.isSuccess()) {
return;
}
if (MSQControllerTask.isIngestion(querySpec)) {
// Publish segments if needed.
final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());

@SuppressWarnings("unchecked")
@@ -1531,9 +1532,28 @@ private void publishSegmentsIfNeeded(
}
log.info("Query [%s] publishing %d segments.", queryDef.getQueryId(), segments.size());
publishAllSegments(segments);
} else if (MSQControllerTask.isExport(task.getQuerySpec())) {
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider());

final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
//noinspection unchecked


Object resultObjectForStage = queryKernel.getResultObjectForStage(finalStageId);
if (!(resultObjectForStage instanceof List)) {
// This might occur if all workers are running on an older version. We are not able to write a manifest file in this case.
log.warn("Was unable to create manifest file due to ");
return;
}
@SuppressWarnings("unchecked")
List<String> exportedFiles = (List<String>) queryKernel.getResultObjectForStage(finalStageId);
log.info("Query [%s] exported %d files.", queryDef.getQueryId(), exportedFiles.size());
exportMetadataManager.writeMetadata(exportedFiles);
} else if (MSQControllerTask.isExport(querySpec)) {
// Write manifest file.
ExportMSQDestination destination = (ExportMSQDestination) task.getQuerySpec().getDestination();
ExportMSQDestination destination = (ExportMSQDestination) querySpec.getDestination();
ExportMetadataManager exportMetadataManager = new ExportMetadataManager(destination.getExportStorageProvider());

final StageId finalStageId = queryKernel.getStageId(queryDef.getFinalStageDefinition().getStageNumber());
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@

package org.apache.druid.msq.rpc;

import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshots;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.exec.Controller;
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
import com.google.common.collect.ImmutableSet;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.guice.MSQIndexingModule;
Original file line number Diff line number Diff line change
@@ -19,8 +19,8 @@

package org.apache.druid.msq.indexing.client;

import org.apache.druid.indexing.common.KillTaskReport;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexer.report.KillTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.msq.exec.Controller;
import org.apache.druid.server.security.AuthorizerMapper;
import org.junit.Assert;
Original file line number Diff line number Diff line change
@@ -51,7 +51,6 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Injector;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.indexing.common.TaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReportFileWriter;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.java.util.common.FileUtils;
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
Original file line number Diff line number Diff line change
@@ -21,6 +21,9 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter;
import org.apache.druid.indexer.report.TaskReport;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Rule;
You are viewing a condensed version of this merge commit. You can view the full changes here.