Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into joins_over_unnest
Browse files Browse the repository at this point in the history
  • Loading branch information
somu-imply committed Oct 5, 2023
2 parents 78f0090 + 36d7b3c commit 589ed89
Show file tree
Hide file tree
Showing 28 changed files with 492 additions and 226 deletions.
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ The following table lists the context parameters for the MSQ task engine:
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport`(the default) to write select results to the task report. <b> This is not scalable since task reports size explodes for large results </b> <br/>Use `durableStorage` to write results to durable storage location. <b>For large results sets, its recommended to use `durableStorage` </b>. To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segment to be loaded before exiting, else the ingest query exits without waiting. The task and live reports contain the information about the status of loading segments if this flag is set. This will ensure that any future queries made after the ingestion exits will include results from the ingestion. The drawback is that the controller task will stall till the segments are loaded. | `false` |

## Joins

Expand Down
6 changes: 3 additions & 3 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Druid SQL supports SELECT queries with the following structure:
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM { <table> | (<subquery>) | <o1> [ INNER | LEFT ] JOIN <o2> ON condition }
[, UNNEST(source_expression) as table_alias_name(column_alias_name) ]
[ CROSS JOIN UNNEST(source_expression) as table_alias_name(column_alias_name) ]
[ WHERE expr ]
[ GROUP BY [ exprs | GROUPING SETS ( (exprs), ... ) | ROLLUP (exprs) | CUBE (exprs) ] ]
[ HAVING expr ]
Expand Down Expand Up @@ -97,7 +97,7 @@ The UNNEST clause unnests array values. It's the SQL equivalent to the [unnest d
The following is the general syntax for UNNEST, specifically a query that returns the column that gets unnested:

```sql
SELECT column_alias_name FROM datasource, UNNEST(source_expression1) AS table_alias_name1(column_alias_name1), UNNEST(source_expression2) AS table_alias_name2(column_alias_name2), ...
SELECT column_alias_name FROM datasource CROSS JOIN UNNEST(source_expression1) AS table_alias_name1(column_alias_name1) CROSS JOIN UNNEST(source_expression2) AS table_alias_name2(column_alias_name2) ...
```

* The `datasource` for UNNEST can be any Druid datasource, such as the following:
Expand All @@ -112,7 +112,7 @@ Keep the following things in mind when writing your query:

- You must include the context parameter `"enableUnnest": true`.
- You can unnest multiple source expressions in a single query.
- Notice the comma between the datasource and the UNNEST function. This is needed in most cases of the UNNEST function. Specifically, it is not needed when you're unnesting an inline array since the array itself is the datasource.
- Notice the CROSS JOIN between the datasource and the UNNEST function. This is needed in most cases of the UNNEST function. Specifically, it is not needed when you're unnesting an inline array since the array itself is the datasource.
- If you view the native explanation of a SQL UNNEST, you'll notice that Druid uses `j0.unnest` as a virtual column to perform the unnest. An underscore is added for each unnest, so you may notice virtual columns named `_j0.unnest` or `__j0.unnest`.
- UNNEST preserves the ordering of the source array that is being unnested.

Expand Down
18 changes: 9 additions & 9 deletions docs/tutorials/tutorial-unnest-arrays.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ In the results, notice that the column named `dim3` has nested values like `["a"
The following is the general syntax for UNNEST:

```sql
SELECT column_alias_name FROM datasource, UNNEST(source_expression) AS table_alias_name(column_alias_name)
SELECT column_alias_name FROM datasource CROSS JOIN UNNEST(source_expression) AS table_alias_name(column_alias_name)
```

In addition, you must supply the following context parameter:
Expand All @@ -179,7 +179,7 @@ For more information about the syntax, see [UNNEST](../querying/sql.md#unnest).
The following query returns a column called `d3` from the table `nested_data`. `d3` contains the unnested values from the source column `dim3`:

```sql
SELECT d3 FROM "nested_data", UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
SELECT d3 FROM "nested_data" CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
```

Notice the MV_TO_ARRAY helper function, which converts the multi-value records in `dim3` to arrays. It is required since `dim3` is a multi-value string dimension.
Expand All @@ -191,15 +191,15 @@ If the column you are unnesting is not a string dimension, then you do not need
You can unnest into a virtual column (multiple columns treated as one). The following query returns the two source columns and a third virtual column containing the unnested data:

```sql
SELECT dim4,dim5,d45 FROM nested_data, UNNEST(ARRAY[dim4,dim5]) AS example_table(d45)
SELECT dim4,dim5,d45 FROM nested_data CROSS JOIN UNNEST(ARRAY[dim4,dim5]) AS example_table(d45)
```

The virtual column `d45` is the product of the two source columns. Notice how the total number of rows has grown. The table `nested_data` had only seven rows originally.

Another way to unnest a virtual column is to concatenate them with ARRAY_CONCAT:

```sql
SELECT dim4,dim5,d45 FROM nested_data, UNNEST(ARRAY_CONCAT(dim4,dim5)) AS example_table(d45)
SELECT dim4,dim5,d45 FROM nested_data CROSS JOIN UNNEST(ARRAY_CONCAT(dim4,dim5)) AS example_table(d45)
```

Decide which method to use based on what your goals are.
Expand All @@ -221,7 +221,7 @@ The example query returns the following from the `nested_data` datasource:
- an unnested virtual column composed of `dim4` and `dim5` aliased to `d45`

```sql
SELECT dim3,dim4,dim5,d3,d45 FROM "nested_data", UNNEST(MV_TO_ARRAY("dim3")) AS foo1(d3), UNNEST(ARRAY[dim4,dim5]) AS foo2(d45)
SELECT dim3,dim4,dim5,d3,d45 FROM "nested_data" CROSS JOIN UNNEST(MV_TO_ARRAY("dim3")) AS foo1(d3) CROSS JOIN UNNEST(ARRAY[dim4,dim5]) AS foo2(d45)
```


Expand All @@ -230,7 +230,7 @@ SELECT dim3,dim4,dim5,d3,d45 FROM "nested_data", UNNEST(MV_TO_ARRAY("dim3")) AS
The following query uses only three columns from the `nested_data` table as the datasource. From that subset, it unnests the column `dim3` into `d3` and returns `d3`.

```sql
SELECT d3 FROM (SELECT dim1, dim2, dim3 FROM "nested_data"), UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
SELECT d3 FROM (SELECT dim1, dim2, dim3 FROM "nested_data") CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
```

### Unnest with a filter
Expand All @@ -242,7 +242,7 @@ You can specify which rows to unnest by including a filter in your query. The fo
* Returns the records for the unnested `d3` that have a `dim2` record that matches the filter

```sql
SELECT d3 FROM (SELECT * FROM nested_data WHERE dim2 IN ('abc')), UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
SELECT d3 FROM (SELECT * FROM nested_data WHERE dim2 IN ('abc')) CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3)
```

You can also filter the results of an UNNEST clause. The following example unnests the inline array `[1,2,3]` but only returns the rows that match the filter:
Expand All @@ -257,7 +257,7 @@ This means that you can run a query like the following where Druid only return r
- The value of `m1` is less than 2.

```sql
SELECT * FROM nested_data, UNNEST(MV_TO_ARRAY("dim3")) AS foo(d3) WHERE d3 IN ('b', 'd') and m1 < 2
SELECT * FROM nested_data CROSS JOIN UNNEST(MV_TO_ARRAY("dim3")) AS foo(d3) WHERE d3 IN ('b', 'd') and m1 < 2
```

The query only returns a single row since only one row meets the conditions. You can see the results change if you modify the filter.
Expand All @@ -267,7 +267,7 @@ The query only returns a single row since only one row meets the conditions. You
The following query unnests `dim3` and then performs a GROUP BY on the output `d3`.

```sql
SELECT d3 FROM nested_data, UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3) GROUP BY d3
SELECT d3 FROM nested_data CROSS JOIN UNNEST(MV_TO_ARRAY(dim3)) AS example_table(d3) GROUP BY d3
```

You can further transform your results by including clauses like `ORDER BY d3 DESC` or LIMIT.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,14 +463,18 @@ public TaskStatus runTask(final Closer closer)
}
}

boolean shouldWaitForSegmentLoad = MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context());

try {
releaseTaskLocks();
cleanUpDurableStorageIfNeeded();

if (queryKernel != null && queryKernel.isSuccess()) {
if (segmentLoadWaiter != null) {
// If successful and there are segments created, segmentLoadWaiter should wait for them to become available.
if (shouldWaitForSegmentLoad && segmentLoadWaiter != null) {
// If successful, there are segments created and segment load is enabled, segmentLoadWaiter should wait
// for them to become available.
log.info("Controller will now wait for segments to be loaded. The query has already finished executing,"
+ " and results will be included once the segments are loaded, even if this query is cancelled now.");
segmentLoadWaiter.waitForSegmentsToLoad();
}
}
Expand Down Expand Up @@ -1363,31 +1367,35 @@ private void publishAllSegments(final Set<DataSegment> segments) throws IOExcept
}
} else {
Set<String> versionsToAwait = segmentsWithTombstones.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
if (MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context())) {
segmentLoadWaiter = new SegmentLoadStatusFetcher(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
task.getId(),
task.getDataSource(),
versionsToAwait,
segmentsWithTombstones.size(),
true
);
}
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
if (MultiStageQueryContext.shouldWaitForSegmentLoad(task.getQuerySpec().getQuery().context())) {
segmentLoadWaiter = new SegmentLoadStatusFetcher(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
task.getId(),
task.getDataSource(),
versionsToAwait,
segmentsWithTombstones.size(),
segments.size(),
true
);
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, segmentsWithTombstones)
);
}
} else if (!segments.isEmpty()) {
Set<String> versionsToAwait = segments.stream().map(DataSegment::getVersion).collect(Collectors.toSet());
segmentLoadWaiter = new SegmentLoadStatusFetcher(
context.injector().getInstance(BrokerClient.class),
context.jsonMapper(),
task.getId(),
task.getDataSource(),
versionsToAwait,
segments.size(),
true
);
// Append mode.
performSegmentPublish(
context.taskActionClient(),
Expand Down
Loading

0 comments on commit 589ed89

Please sign in to comment.