Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added UI support for waitTillSegmentsLoad #15110

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions web-console/src/druid-models/execution/execution.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ describe('Execution', () => {
"maxNumTasks": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm_simple\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down Expand Up @@ -643,6 +644,7 @@ describe('Execution', () => {
"sqlQuery": undefined,
"sqlQueryId": undefined,
},
"segmentStatus": undefined,
"sqlQuery": undefined,
"stages": undefined,
"startTime": 2023-07-05T21:33:19.147Z,
Expand Down Expand Up @@ -679,6 +681,7 @@ describe('Execution', () => {
"nativeQuery": undefined,
"queryContext": undefined,
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": undefined,
"stages": undefined,
"startTime": 2023-07-05T21:40:39.986Z,
Expand Down
50 changes: 50 additions & 0 deletions web-console/src/druid-models/execution/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ function formatPendingMessage(
}
}

interface SegmentStatus {
duration: number;
onDemandSegments: number;
pendingSegments: number;
precachedSegments: number;
startTime: Date;
state: 'INIT' | 'WAITING' | 'SUCCESS';
totalSegments: number;
unknownSegments: number;
usedSegments: number;
}

export interface ExecutionValue {
engine: DruidEngine;
id: string;
Expand All @@ -182,6 +194,7 @@ export interface ExecutionValue {
warnings?: ExecutionError[];
capacityInfo?: CapacityInfo;
_payload?: MsqTaskPayloadResponse;
segmentStatus?: SegmentStatus;
}

export class Execution {
Expand Down Expand Up @@ -292,6 +305,11 @@ export class Execution {
const startTime = new Date(deepGet(taskReport, 'multiStageQuery.payload.status.startTime'));
const durationMs = deepGet(taskReport, 'multiStageQuery.payload.status.durationMs');

const segmentLoaderStatus = deepGet(
taskReport,
'multiStageQuery.payload.status.segmentLoadWaiterStatus',
);

let result: QueryResult | undefined;
const resultsPayload: {
signature: { name: string; type: string }[];
Expand All @@ -313,6 +331,7 @@ export class Execution {
engine: 'sql-msq-task',
id,
status: Execution.normalizeTaskStatus(status),
segmentStatus: segmentLoaderStatus,
startTime: isNaN(startTime.getTime()) ? undefined : startTime,
duration: typeof durationMs === 'number' ? durationMs : undefined,
usageInfo: getUsageInfoFromStatusPayload(
Expand Down Expand Up @@ -369,6 +388,7 @@ export class Execution {
public readonly error?: ExecutionError;
public readonly warnings?: ExecutionError[];
public readonly capacityInfo?: CapacityInfo;
public readonly segmentStatus?: SegmentStatus;

public readonly _payload?: { payload: any; task: string };

Expand All @@ -390,6 +410,7 @@ export class Execution {
this.error = value.error;
this.warnings = nonEmptyArray(value.warnings) ? value.warnings : undefined;
this.capacityInfo = value.capacityInfo;
this.segmentStatus = value.segmentStatus;

this._payload = value._payload;
}
Expand All @@ -412,6 +433,7 @@ export class Execution {
error: this.error,
warnings: this.warnings,
capacityInfo: this.capacityInfo,
segmentStatus: this.segmentStatus,

_payload: this._payload,
};
Expand Down Expand Up @@ -526,6 +548,34 @@ export class Execution {
return status !== 'SUCCESS' && status !== 'FAILED';
}

public getSegmentStatusDescription() {
const { segmentStatus } = this;

let label = '';

switch (segmentStatus?.state) {
case 'INIT':
label = 'Waiting for segments loading to start...';
break;

case 'WAITING':
label = 'Waiting for segments loading to complete...';
break;

case 'SUCCESS':
label = 'Segments loaded successfully in ' + segmentStatus.duration + 'ms.';
break;

default:
break;
}

return {
label,
...segmentStatus,
};
}

public isFullyComplete(): boolean {
if (this.isWaitingForQuery()) return false;

Expand Down
16 changes: 16 additions & 0 deletions web-console/src/druid-models/query-context/query-context.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,22 @@ export function changeFinalizeAggregations(
: deepDelete(context, 'finalizeAggregations');
}

// waitTillSegmentsLoad

export function getWaitTillSegmentsLoad(context: QueryContext): boolean | undefined {
const { waitTillSegmentsLoad } = context;
return typeof waitTillSegmentsLoad === 'boolean' ? waitTillSegmentsLoad : undefined;
}

export function changeWaitTillSegmentsLoad(
context: QueryContext,
waitTillSegmentsLoad: boolean | undefined,
): QueryContext {
return typeof waitTillSegmentsLoad === 'boolean'
? deepSet(context, 'waitTillSegmentsLoad', waitTillSegmentsLoad)
: deepDelete(context, 'waitTillSegmentsLoad');
}

// groupByEnableMultiValueUnnesting

export function getGroupByEnableMultiValueUnnesting(context: QueryContext): boolean | undefined {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ describe('WorkbenchQuery', () => {
finalizeAggregations: false,
groupByEnableMultiValueUnnesting: false,
useCache: false,
waitTillSegmentsLoad: true,
},
header: true,
query: 'INSERT INTO wiki2 SELECT * FROM wikipedia',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,7 @@ export class WorkbenchQuery {
apiQuery.context.executionMode ??= 'async';
apiQuery.context.finalizeAggregations ??= !ingestQuery;
apiQuery.context.groupByEnableMultiValueUnnesting ??= !ingestQuery;
apiQuery.context.waitTillSegmentsLoad ??= true;
}

if (Array.isArray(queryParameters) && queryParameters.length) {
Expand Down
13 changes: 12 additions & 1 deletion web-console/src/helpers/execution/sql-task-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ export interface SubmitTaskQueryOptions {
export async function submitTaskQuery(
options: SubmitTaskQueryOptions,
): Promise<Execution | IntermediateQueryState<Execution>> {
const { query, context, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;
const { query, prefixLines, cancelToken, preserveOnTermination, onSubmitted } = options;

// setting waitTillSegmentsLoad to true by default
const context = {
waitTillSegmentsLoad: true,
...(options.context || {}),
};

let sqlQuery: string;
let jsonQuery: Record<string, any>;
Expand Down Expand Up @@ -261,6 +267,11 @@ export async function updateExecutionWithDatasourceLoadedIfNeeded(
return execution;
}

// This means we don't have to perform the SQL query to check if the segments are loaded
if (execution.queryContext?.waitTillSegmentsLoad === true) {
return execution.markDestinationDatasourceLoaded();
}

const endTime = execution.getEndTime();
if (
!endTime || // If endTime is not set (this is not expected to happen) then just bow out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ exports[`ExecutionDetailsPane matches snapshot no init tab 1`] = `
"id": "native",
"label": "Native query",
},
false,
undefined,
undefined,
Object {
Expand Down Expand Up @@ -286,6 +287,7 @@ PARTITIONED BY DAY",
"maxParseExceptions": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down Expand Up @@ -909,6 +911,7 @@ PARTITIONED BY DAY",
"maxParseExceptions": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down Expand Up @@ -1319,6 +1322,7 @@ exports[`ExecutionDetailsPane matches snapshot with init tab 1`] = `
"id": "native",
"label": "Native query",
},
false,
undefined,
undefined,
Object {
Expand Down Expand Up @@ -1576,6 +1580,7 @@ PARTITIONED BY DAY",
"maxParseExceptions": 2,
},
"result": undefined,
"segmentStatus": undefined,
"sqlQuery": "REPLACE INTO \\"kttm-blank-lines\\" OVERWRITE ALL
SELECT
TIME_PARSE(\\"timestamp\\") AS \\"__time\\",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import React, { useState } from 'react';

import { FancyTabPane } from '../../../components';
import type { Execution } from '../../../druid-models';
import { pluralIfNeeded } from '../../../utils';
import { formatDuration, formatDurationWithMs, pluralIfNeeded } from '../../../utils';
import { DestinationPagesPane } from '../destination-pages-pane/destination-pages-pane';
import { ExecutionErrorPane } from '../execution-error-pane/execution-error-pane';
import { ExecutionStagesPane } from '../execution-stages-pane/execution-stages-pane';
Expand All @@ -40,7 +40,8 @@ export type ExecutionDetailsTab =
| 'result'
| 'pages'
| 'error'
| 'warnings';
| 'warnings'
| 'segmentStatus';

interface ExecutionDetailsPaneProps {
execution: Execution;
Expand All @@ -53,6 +54,7 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
) {
const { execution, initTab, goToTask } = props;
const [activeTab, setActiveTab] = useState<ExecutionDetailsTab>(initTab || 'general');
const segmentStatusDescription = execution.getSegmentStatusDescription();

function renderContent() {
switch (activeTab) {
Expand Down Expand Up @@ -120,6 +122,25 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
case 'warnings':
return <ExecutionWarningsPane execution={execution} />;

case 'segmentStatus':
return (
<>
<p>
Duration:{' '}
{segmentStatusDescription.duration
? formatDurationWithMs(segmentStatusDescription.duration)
: '-'}
{execution.duration
? ` (query duration was ${formatDuration(execution.duration)})`
: ''}
</p>
<p>Total segments: {segmentStatusDescription.totalSegments ?? '-'}</p>
<p>Used segments: {segmentStatusDescription.usedSegments ?? '-'}</p>
<p>Precached segments: {segmentStatusDescription.precachedSegments ?? '-'}</p>
<p>On demand segments: {segmentStatusDescription.onDemandSegments ?? '-'}</p>
</>
);

default:
return;
}
Expand All @@ -146,6 +167,11 @@ export const ExecutionDetailsPane = React.memo(function ExecutionDetailsPane(
label: 'Native query',
icon: IconNames.COG,
},
Boolean(execution.segmentStatus) && {
id: 'segmentStatus',
label: 'Segments',
icon: IconNames.HEAT_GRID,
},
execution.result && {
id: 'result',
label: 'Results',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ exports[`ExecutionProgressBarPane matches snapshot 1`] = `
className="overall"
intent="primary"
/>
<Unknown>
</Unknown>
</div>
`;
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ export const ExecutionProgressBarPane = React.memo(function ExecutionProgressBar

const idx = stages ? stages.currentStageIndex() : -1;
const waitingForSegments = stages && !execution.isWaitingForQuery();

const segmentStatusDescription = execution?.getSegmentStatusDescription();

return (
<div className="execution-progress-bar-pane">
<Label>
Expand Down Expand Up @@ -78,6 +81,7 @@ export const ExecutionProgressBarPane = React.memo(function ExecutionProgressBar
intent={stages ? Intent.PRIMARY : undefined}
value={stages && execution.isWaitingForQuery() ? stages.overallProgress() : undefined}
/>
{segmentStatusDescription && <Label>{segmentStatusDescription.label}</Label>}
{stages && idx >= 0 && (
<>
<Label>{`Current stage (${idx + 1} of ${stages.stageCount()})`}</Label>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ exports[`IngestSuccessPane matches snapshot 1`] = `
</p>
<p>
Insert query took 0:00:23.
<span
className="action"
onClick={[Function]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(

const warnings = execution.stages?.getWarningCount() || 0;

const duration = execution.duration;
const { duration } = execution;
const segmentStatusDescription = execution.getSegmentStatusDescription();

return (
<div className="ingest-success-pane">
<p>
Expand All @@ -63,10 +65,12 @@ export const IngestSuccessPane = React.memo(function IngestSuccessPane(
</p>
<p>
{duration ? `Insert query took ${formatDuration(duration)}. ` : `Insert query completed. `}
{segmentStatusDescription ? segmentStatusDescription.label + ' ' : ''}
<span className="action" onClick={() => onDetails(execution.id)}>
Show details
</span>
</p>

{onQueryTab && (
<p>
Open new tab with:{' '}
Expand Down
12 changes: 12 additions & 0 deletions web-console/src/views/workbench-view/run-panel/run-panel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import {
changeUseApproximateCountDistinct,
changeUseApproximateTopN,
changeUseCache,
changeWaitTillSegmentsLoad,
getDurableShuffleStorage,
getFinalizeAggregations,
getGroupByEnableMultiValueUnnesting,
Expand All @@ -53,6 +54,7 @@ import {
getUseApproximateCountDistinct,
getUseApproximateTopN,
getUseCache,
getWaitTillSegmentsLoad,
summarizeIndexSpec,
} from '../../../druid-models';
import { deepGet, deepSet, pluralIfNeeded, tickIcon } from '../../../utils';
Expand Down Expand Up @@ -110,6 +112,7 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {

const maxParseExceptions = getMaxParseExceptions(queryContext);
const finalizeAggregations = getFinalizeAggregations(queryContext);
const waitTillSegmentsLoad = getWaitTillSegmentsLoad(queryContext);
const groupByEnableMultiValueUnnesting = getGroupByEnableMultiValueUnnesting(queryContext);
const sqlJoinAlgorithm = queryContext.sqlJoinAlgorithm ?? 'broadcast';
const selectDestination = queryContext.selectDestination ?? 'taskReport';
Expand Down Expand Up @@ -311,6 +314,15 @@ export const RunPanel = React.memo(function RunPanel(props: RunPanelProps) {
changeQueryContext(changeFinalizeAggregations(queryContext, v))
}
/>
<MenuTristate
icon={IconNames.STOPWATCH}
text="Wait until segments have loaded"
value={waitTillSegmentsLoad}
undefinedEffectiveValue /* ={true} */
onValueChange={v =>
changeQueryContext(changeWaitTillSegmentsLoad(queryContext, v))
}
/>
<MenuTristate
icon={IconNames.FORK}
text="Enable GroupBy multi-value unnesting"
Expand Down