Skip to content

Commit

Permalink
Merge branch 'master' into extern-filename
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm committed Oct 31, 2023
2 parents 5cbb28a + 3173093 commit a84e41d
Show file tree
Hide file tree
Showing 58 changed files with 2,900 additions and 359 deletions.
5 changes: 5 additions & 0 deletions docs/api-reference/sql-ingestion-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.status` | RUNNING, SUCCESS, or FAILED. |
| `multiStageQuery.payload.status.startTime` | Start time of the query in ISO format. Only present if the query has started running. |
| `multiStageQuery.payload.status.durationMs` | Milliseconds elapsed after the query has started running. -1 denotes that the query hasn't started running yet. |
| `multiStageQuery.payload.status.workers` | Workers for the controller task.|
| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker tasks including retries. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | Id of the worker task.| |
| `multiStageQuery.payload.status.workers.<workerNumber>[].status` | RUNNING, SUCCESS, or FAILED.|
| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed after the worker task started running. It is -1 for worker tasks with status RUNNING.|
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading container. Only present after the segments have been published. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2262,11 +2262,13 @@ private static MSQStatusReport makeStatusReport(
{
int pendingTasks = -1;
int runningTasks = 1;
Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStatsMap = new HashMap<>();

if (taskLauncher != null) {
WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
pendingTasks = workerTaskCount.getPendingWorkerCount();
runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
workerStatsMap = taskLauncher.getWorkerStats();
}

SegmentLoadStatusFetcher.SegmentLoadWaiterStatus status = segmentLoadWaiter == null ? null : segmentLoadWaiter.status();
Expand All @@ -2277,6 +2279,7 @@ private static MSQStatusReport makeStatusReport(
errorReports,
queryStartTime,
queryDuration,
workerStatsMap,
pendingTasks,
runningTasks,
status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.msq.indexing;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -47,15 +48,18 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -108,10 +112,11 @@ private enum State
@GuardedBy("taskIds")
private final IntSet fullyStartedTasks = new IntOpenHashSet();

// Mutable state accessible only to the main loop. LinkedHashMap since order of key set matters. Tasks are added
// here once they are submitted for running, but before they are fully started up.
// Mutable state accessed by mainLoop, ControllerImpl, and jetty (/liveReports) threads.
// Tasks are added here once they are submitted for running, but before they are fully started up.
// taskId -> taskTracker
private final Map<String, TaskTracker> taskTrackers = new LinkedHashMap<>();
private final ConcurrentMap<String, TaskTracker> taskTrackers = new ConcurrentSkipListMap<>(Comparator.comparingInt(
MSQTasks::workerFromTaskId));

// Set of tasks which are issued a cancel request by the controller.
private final Set<String> canceledWorkerTasks = ConcurrentHashMap.newKeySet();
Expand Down Expand Up @@ -348,6 +353,70 @@ public boolean isTaskLatest(String taskId)
}
}

public static class WorkerStats
{
String workerId;
TaskState state;
long duration;

/**
* For JSON deserialization only
*/
public WorkerStats()
{
}

public WorkerStats(String workerId, TaskState state, long duration)
{
this.workerId = workerId;
this.state = state;
this.duration = duration;
}

@JsonProperty
public String getWorkerId()
{
return workerId;
}

@JsonProperty
public TaskState getState()
{
return state;
}

@JsonProperty("durationMs")
public long getDuration()
{
return duration;
}
}

public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();

for (Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {

TaskTracker taskTracker = taskEntry.getValue();

workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
.add(new WorkerStats(taskEntry.getKey(),
taskTracker.status.getStatusCode(),
// getDuration() returns -1 for running tasks.
// It's not calculated on-the-fly here since
// taskTracker.startTimeMillis marks task
// submission time rather than the actual start.
taskTracker.status.getDuration()
));
}

for (List<WorkerStats> workerStatsList : workerStats.values()) {
workerStatsList.sort(Comparator.comparing(WorkerStats::getWorkerId));
}
return workerStats;
}

private void mainLoop()
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.msq.exec.SegmentLoadStatusFetcher;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MSQStatusReport
Expand All @@ -47,6 +50,8 @@ public class MSQStatusReport

private final long durationMs;

private final Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats;

private final int pendingTasks;

private final int runningTasks;
Expand All @@ -61,6 +66,7 @@ public MSQStatusReport(
@JsonProperty("warnings") Collection<MSQErrorReport> warningReports,
@JsonProperty("startTime") @Nullable DateTime startTime,
@JsonProperty("durationMs") long durationMs,
@JsonProperty("workers") Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> workerStats,
@JsonProperty("pendingTasks") int pendingTasks,
@JsonProperty("runningTasks") int runningTasks,
@JsonProperty("segmentLoadWaiterStatus") @Nullable SegmentLoadStatusFetcher.SegmentLoadWaiterStatus segmentLoadWaiterStatus
Expand All @@ -71,6 +77,7 @@ public MSQStatusReport(
this.warningReports = warningReports != null ? warningReports : Collections.emptyList();
this.startTime = startTime;
this.durationMs = durationMs;
this.workerStats = workerStats;
this.pendingTasks = pendingTasks;
this.runningTasks = runningTasks;
this.segmentLoadWaiterStatus = segmentLoadWaiterStatus;
Expand Down Expand Up @@ -123,6 +130,12 @@ public long getDurationMs()
return durationMs;
}

@JsonProperty("workers")
public Map<Integer, List<MSQWorkerTaskLauncher.WorkerStats>> getWorkerStats()
{
return workerStats;
}

@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -107,7 +108,7 @@ public void testSerdeResultsReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Expand Down Expand Up @@ -172,7 +173,7 @@ public void testSerdeErrorReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.FAILED, errorReport, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Expand Down Expand Up @@ -220,7 +221,7 @@ public void testWriteTaskReport() throws Exception
final MSQTaskReport report = new MSQTaskReport(
TASK_ID,
new MSQTaskReportPayload(
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, 1, 2, status),
new MSQStatusReport(TaskState.SUCCESS, null, new ArrayDeque<>(), null, 0, new HashMap<>(), 1, 2, status),
MSQStagesReport.create(
QUERY_DEFINITION,
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down Expand Up @@ -310,6 +311,7 @@ public class SqlStatementResourceTest extends MSQTestBase
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void testDistinctPartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down Expand Up @@ -105,6 +106,7 @@ public void testOnePartitionOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down Expand Up @@ -144,6 +146,7 @@ public void testCommonPartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down Expand Up @@ -181,6 +184,7 @@ public void testNullChannelCounters()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down Expand Up @@ -220,6 +224,7 @@ public void testConsecutivePartitionsOnEachWorker()
new ArrayDeque<>(),
null,
0,
new HashMap<>(),
1,
2,
null
Expand Down
5 changes: 5 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,11 @@ public ScanQueryBuilder dataSource(String ds)
dataSource = new TableDataSource(ds);
return this;
}
public ScanQueryBuilder dataSource(Query<?> q)
{
dataSource = new QueryDataSource(q);
return this;
}

public ScanQueryBuilder dataSource(DataSource ds)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class NaivePartitioningOperatorFactory implements OperatorFactory
{
Expand Down Expand Up @@ -65,4 +66,23 @@ public String toString()
"partitionColumns=" + partitionColumns +
'}';
}

@Override
public final int hashCode()
{
return Objects.hash(partitionColumns);
}

@Override
public final boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || obj.getClass() != getClass()) {
return false;
}
NaivePartitioningOperatorFactory other = (NaivePartitioningOperatorFactory) obj;
return Objects.equals(partitionColumns, other.partitionColumns);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

/**
* A naive sort operator is an operation that sorts a stream of data in-place. Generally speaking this means
Expand All @@ -33,11 +34,11 @@
public class NaiveSortOperator implements Operator
{
private final Operator child;
private final ArrayList<ColumnWithDirection> sortColumns;
private final List<ColumnWithDirection> sortColumns;

public NaiveSortOperator(
Operator child,
ArrayList<ColumnWithDirection> sortColumns
List<ColumnWithDirection> sortColumns
)
{
this.child = child;
Expand All @@ -57,7 +58,7 @@ public Closeable goOrContinue(Closeable continuation, Receiver receiver)
public Signal push(RowsAndColumns rac)
{
if (sorter == null) {
sorter = NaiveSortMaker.fromRAC(rac).make(sortColumns);
sorter = NaiveSortMaker.fromRAC(rac).make(new ArrayList<>(sortColumns));
} else {
sorter.moreData(rac);
}
Expand All @@ -67,7 +68,9 @@ public Signal push(RowsAndColumns rac)
@Override
public void completed()
{
receiver.push(sorter.complete());
if (sorter != null) {
receiver.push(sorter.complete());
}
receiver.completed();
}
}
Expand Down
Loading

0 comments on commit a84e41d

Please sign in to comment.