
Commit

Merge remote-tracking branch 'apache/master' into windowing-fix-test-cmp
kgyrtkirk committed Oct 10, 2023
2 parents 0ddd3be + 90a1458 commit 4073f5e
Showing 153 changed files with 7,651 additions and 1,827 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/reusable-unit-tests.yml
@@ -59,9 +59,15 @@ jobs:
      with:
        fetch-depth: 0

-     - name: setup jdk${{ inputs.jdk }}
-       run: echo "JAVA_HOME=$JAVA_HOME_${{ inputs.jdk }}_X64" >> $GITHUB_ENV
+     # skip the "cache: maven" step from setup-java. We explicitly use a
+     # different cache key since we cannot reuse it across commits.
+     - uses: actions/setup-java@v3
+       with:
+         distribution: 'zulu'
+         java-version: ${{ inputs.jdk }}

+     # the build step produces SNAPSHOT artifacts into the local maven repository,
+     # we include github.sha in the cache key to make it specific to that build/jdk
      - name: Restore Maven repository
        id: maven-restore
        uses: actions/cache/restore@v3
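The deleted `setup jdk` step exported `JAVA_HOME` by hand; `actions/setup-java` now does that, while caching stays hand-rolled so the key can be pinned per commit, as the comments explain. As a sketch of the idea only (the cache `path` and key layout below are assumptions, not copied from this repository):

```yaml
# Sketch: a restore step whose cache key is specific to one commit and JDK,
# so downstream jobs of the same build reuse its SNAPSHOT artifacts.
# The path and key format are illustrative assumptions.
- name: Restore Maven repository
  id: maven-restore
  uses: actions/cache/restore@v3
  with:
    path: ~/.m2/repository
    key: maven-${{ runner.os }}-jdk${{ inputs.jdk }}-${{ github.sha }}
```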
2 changes: 2 additions & 0 deletions .github/workflows/standard-its.yml
@@ -153,6 +153,8 @@ jobs:
      - name: Setup java
        run: export JAVA_HOME=$JAVA_HOME_8_X64

+     # the build step produces SNAPSHOT artifacts into the local maven repository,
+     # we include github.sha in the cache key to make it specific to that build/jdk
      - name: Restore Maven repository
        id: maven-restore
        uses: actions/cache/restore@v3
2 changes: 1 addition & 1 deletion .github/workflows/static-checks.yml
@@ -41,7 +41,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-       java: [ '8', '11', '17' ]
+       java: [ '8', '11', '17', '21' ]
    runs-on: ubuntu-latest
    steps:
      - name: checkout branch
15 changes: 10 additions & 5 deletions .github/workflows/unit-and-integration-tests-unified.yml
@@ -56,16 +56,21 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-       jdk: [ '8', '11', '17' ]
+       jdk: [ '8', '11', '17', '21' ]
    runs-on: ubuntu-latest
    steps:
      - name: Checkout branch
        uses: actions/checkout@v3

-     - name: setup jdk${{ matrix.jdk }}
-       run: |
-         echo "JAVA_HOME=$JAVA_HOME_${{ matrix.jdk }}_X64" >> $GITHUB_ENV
+     # skip the "cache: maven" step from setup-java. We explicitly use a
+     # different cache key since we cannot reuse it across commits.
+     - uses: actions/setup-java@v3
+       with:
+         distribution: 'zulu'
+         java-version: ${{ matrix.jdk }}

+     # the build step produces SNAPSHOT artifacts into the local maven repository,
+     # we include github.sha in the cache key to make it specific to that build/jdk
      - name: Cache Maven m2 repository
        id: maven
        uses: actions/cache@v3
@@ -112,7 +117,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
-       jdk: [ 11, 17 ]
+       jdk: [ 11, 17, 21 ]
    name: "unit tests (jdk${{ matrix.jdk }}, sql-compat=true)"
    uses: ./.github/workflows/unit-tests.yml
    needs: unit-tests
2 changes: 2 additions & 0 deletions docs/development/extensions-core/lookups-cached-global.md
@@ -352,6 +352,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|`filter`|The filter to use when selecting lookups; used to create a WHERE clause on lookup population.|No|No Filter|
|`tsColumn`|The column in `table` which contains when the key was updated.|No|Not used|
|`pollPeriod`|How often to poll the database.|No|0 (only once)|
+|`jitterSeconds`|Maximum jitter, in seconds, to add as a delay before each poll. The actual delay is chosen at random between 0 and `jitterSeconds`, which helps distribute database load more evenly.|No|0|
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|

@@ -367,6 +368,7 @@

```json
  "valueColumn":"the_new_dim_value",
  "tsColumn":"timestamp_column",
  "pollPeriod":600000,
+ "jitterSeconds": 120,
  "maxHeapPercentage": 10
}
```
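With the example values above, each poll would start after an added random delay of up to 120 seconds on top of the 10-minute `pollPeriod`, so a fleet of processes polling the same table spreads its queries out rather than hitting the database in lockstep.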
2 changes: 0 additions & 2 deletions docs/multi-stage-query/known-issues.md
@@ -39,8 +39,6 @@ an [UnknownError](./reference.md#error_UnknownError) with a message including "N

## `SELECT` Statement

- - `SELECT` from a Druid datasource does not include unpublished real-time data.
-
  - `GROUPING SETS` and `UNION ALL` are not implemented. Queries using these features return a
    [QueryNotSupported](reference.md#error_QueryNotSupported) error.

1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
@@ -247,6 +247,7 @@ The following table lists the context parameters for the MSQ task engine:
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the select query is written. <br />Use `taskReport` (the default) to write select results to the task report. <b>This is not scalable, since the task report's size grows quickly for large results.</b> <br/>Use `durableStorage` to write results to a durable storage location. <b>For large result sets, it's recommended to use `durableStorage`.</b> To configure durable storage, see the [durable storage](#durable-storage) section. | `taskReport` |
| `waitTillSegmentsLoad` | INSERT, REPLACE<br /><br /> If set, the ingest query waits for the generated segments to be loaded before exiting; otherwise it exits without waiting. When the flag is set, the task and live reports contain information about the loading status of the segments. This ensures that any query made after the ingestion exits includes the ingested results. The drawback is that the controller task stalls until the segments are loaded. | `false` |
+| `includeSegmentSource` | SELECT, INSERT, REPLACE<br /><br /> Controls which sources are queried for results, in addition to the segments present on deep storage. Can be `NONE` or `REALTIME`. If `NONE`, only non-realtime (published and used) segments are downloaded from deep storage. If `REALTIME`, results from realtime tasks are also included. | `NONE` |
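
As a hedged illustration of how such parameters are passed (the datasource name is hypothetical, and the payload shape is that of a SQL task submission):

```json
{
  "query": "SELECT COUNT(*) FROM \"web_requests\"",
  "context": {
    "includeSegmentSource": "REALTIME",
    "selectDestination": "durableStorage"
  }
}
```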

## Joins

2 changes: 2 additions & 0 deletions docs/operations/metrics.md
@@ -62,6 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` interface for reference regarding enabling this metric. Number of segments that will be touched by the query. The broker plans query distribution to realtime tasks and historicals based on a snapshot of segment distribution state. If segments move after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker, and the broker resends the query to the new servers that serve those segments after the move. In this case, those segments can be counted more than once in this metric.||Varies|
|`query/priority`|Assigned lane and priority, only if the laning strategy is enabled. Refer to [Laning strategies](../configuration/index.md#laning-strategies).|`lane`, `dataSource`, `type`|0|
|`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s|
@@ -97,6 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
+|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
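
The added `mergeBuffer/pendingRequests` rows are gated on the `QueryCountStatsMonitor`, like the query counters. As a sketch, a service that should emit them would list the monitor in its `runtime.properties`; the exact monitor list varies by deployment:

```properties
# Illustrative: enable the monitor that emits the query counters and the
# mergeBuffer/pendingRequests metric documented above.
druid.monitoring.monitors=["org.apache.druid.server.metrics.QueryCountStatsMonitor"]
```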

### Real-time

@@ -21,8 +21,6 @@

import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
- import org.apache.calcite.rel.core.Project;
- import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -36,12 +34,12 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnType;
- import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+ import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
@@ -71,12 +69,10 @@ public SqlAggFunction calciteFunction()
@Override
public Aggregation toDruidAggregation(
    PlannerContext plannerContext,
-   RowSignature rowSignature,
    VirtualColumnRegistry virtualColumnRegistry,
-   RexBuilder rexBuilder,
    String name,
    AggregateCall aggregateCall,
-   Project project,
+   InputAccessor inputAccessor,
    List<Aggregation> existingAggregations,
    boolean finalizeAggregations
)
@@ -88,13 +84,8 @@ public Aggregation toDruidAggregation(
// fetch sum column expression
DruidExpression sumColumn = Expressions.toDruidExpression(
    plannerContext,
-   rowSignature,
-   Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(0)
-   )
+   inputAccessor.getInputRowSignature(),
+   inputAccessor.getField(aggregateCall.getArgList().get(0))
);

if (sumColumn == null) {
@@ -114,12 +105,7 @@
Integer size = null;

if (aggregateCall.getArgList().size() >= 2) {
-   RexNode sizeArg = Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(1)
-   );
+   RexNode sizeArg = inputAccessor.getField(aggregateCall.getArgList().get(1));

    size = ((Number) RexLiteral.value(sizeArg)).intValue();
}
@@ -128,25 +114,15 @@
Integer scale = null;

if (aggregateCall.getArgList().size() >= 3) {
-   RexNode scaleArg = Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(2)
-   );
+   RexNode scaleArg = inputAccessor.getField(aggregateCall.getArgList().get(2));

    scale = ((Number) RexLiteral.value(scaleArg)).intValue();
}

Boolean useStrictNumberParsing = null;

if (aggregateCall.getArgList().size() >= 4) {
-   RexNode useStrictNumberParsingArg = Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(3)
-   );
+   RexNode useStrictNumberParsingArg = inputAccessor.getField(aggregateCall.getArgList().get(3));

    useStrictNumberParsing = RexLiteral.booleanValue(useStrictNumberParsingArg);
}
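This file is the first of several in this commit to receive the same refactor: the `rowSignature`, `rexBuilder`, and `project` parameters of `toDruidAggregation` collapse into a single `InputAccessor`, and each four-argument `Expressions.fromFieldAccess(...)` call shrinks to `inputAccessor.getField(...)`. Inferred only from the call sites visible in this diff, a minimal sketch of such a facade might look like the following; the real `org.apache.druid.sql.calcite.rel.InputAccessor` introduced by this commit almost certainly differs in construction and scope:

```java
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.expression.Expressions;

import javax.annotation.Nullable;

/**
 * Sketch of a parameter object bundling the inputs that SqlAggregator
 * implementations previously received separately. Assumed shape, for
 * illustration only; not the actual class from this commit.
 */
public class InputAccessorSketch
{
  private final RexBuilder rexBuilder;
  private final RowSignature inputRowSignature;
  @Nullable
  private final Project project;

  public InputAccessorSketch(
      final RexBuilder rexBuilder,
      final RowSignature inputRowSignature,
      @Nullable final Project project
  )
  {
    this.rexBuilder = rexBuilder;
    this.inputRowSignature = inputRowSignature;
    this.project = project;
  }

  /** Replaces the repeated Expressions.fromFieldAccess(...) boilerplate at each call site. */
  public RexNode getField(final int fieldNumber)
  {
    return Expressions.fromFieldAccess(rexBuilder.getTypeFactory(), inputRowSignature, project, fieldNumber);
  }

  public RowSignature getInputRowSignature()
  {
    return inputRowSignature;
  }
}
```

The payoff shows in the signatures above: introducing a new piece of planner input context no longer requires touching every `SqlAggregator` implementation's parameter list.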
@@ -20,8 +20,6 @@
package org.apache.druid.query.aggregation.tdigestsketch.sql;

import org.apache.calcite.rel.core.AggregateCall;
- import org.apache.calcite.rel.core.Project;
- import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -36,13 +34,12 @@
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchAggregatorFactory;
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils;
import org.apache.druid.segment.column.ColumnType;
- import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
- import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+ import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
@@ -63,25 +60,18 @@ public SqlAggFunction calciteFunction()
@Override
public Aggregation toDruidAggregation(
    final PlannerContext plannerContext,
-   final RowSignature rowSignature,
    final VirtualColumnRegistry virtualColumnRegistry,
-   final RexBuilder rexBuilder,
    final String name,
    final AggregateCall aggregateCall,
-   final Project project,
+   final InputAccessor inputAccessor,
    final List<Aggregation> existingAggregations,
    final boolean finalizeAggregations
)
{
-   final RexNode inputOperand = Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(0)
-   );
+   final RexNode inputOperand = inputAccessor.getField(aggregateCall.getArgList().get(0));
    final DruidExpression input = Aggregations.toDruidExpressionForNumericAggregator(
        plannerContext,
-       rowSignature,
+       inputAccessor.getInputRowSignature(),
        inputOperand
    );
    if (input == null) {
@@ -93,12 +83,7 @@ public Aggregation toDruidAggregation(

Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION;
if (aggregateCall.getArgList().size() > 1) {
-   RexNode compressionOperand = Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(1)
-   );
+   RexNode compressionOperand = inputAccessor.getField(aggregateCall.getArgList().get(1));
    if (!compressionOperand.isA(SqlKind.LITERAL)) {
      // compressionOperand must be a literal in order to plan.
      return null;
@@ -21,8 +21,6 @@

import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.core.AggregateCall;
- import org.apache.calcite.rel.core.Project;
- import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlAggFunction;
@@ -39,13 +37,12 @@
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchToQuantilePostAggregator;
import org.apache.druid.query.aggregation.tdigestsketch.TDigestSketchUtils;
import org.apache.druid.segment.column.ColumnType;
- import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.sql.calcite.aggregation.Aggregation;
import org.apache.druid.sql.calcite.aggregation.Aggregations;
import org.apache.druid.sql.calcite.aggregation.SqlAggregator;
import org.apache.druid.sql.calcite.expression.DruidExpression;
- import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.PlannerContext;
+ import org.apache.druid.sql.calcite.rel.InputAccessor;
import org.apache.druid.sql.calcite.rel.VirtualColumnRegistry;

import javax.annotation.Nullable;
@@ -66,26 +63,19 @@ public SqlAggFunction calciteFunction()
@Override
public Aggregation toDruidAggregation(
    final PlannerContext plannerContext,
-   final RowSignature rowSignature,
    final VirtualColumnRegistry virtualColumnRegistry,
-   final RexBuilder rexBuilder,
    final String name,
    final AggregateCall aggregateCall,
-   final Project project,
+   final InputAccessor inputAccessor,
    final List<Aggregation> existingAggregations,
    final boolean finalizeAggregations
)
{
    // This is expected to be a tdigest sketch
    final DruidExpression input = Aggregations.toDruidExpressionForNumericAggregator(
        plannerContext,
-       rowSignature,
-       Expressions.fromFieldAccess(
-           rexBuilder.getTypeFactory(),
-           rowSignature,
-           project,
-           aggregateCall.getArgList().get(0)
-       )
+       inputAccessor.getInputRowSignature(),
+       inputAccessor.getField(aggregateCall.getArgList().get(0))
    );
    if (input == null) {
      return null;
@@ -95,12 +85,7 @@
final String sketchName = StringUtils.format("%s:agg", name);

// this is expected to be quantile fraction
final RexNode quantileArg = Expressions.fromFieldAccess(
rexBuilder.getTypeFactory(),
rowSignature,
project,
aggregateCall.getArgList().get(1)
);
final RexNode quantileArg = inputAccessor.getField(aggregateCall.getArgList().get(1));

if (!quantileArg.isA(SqlKind.LITERAL)) {
// Quantile must be a literal in order to plan.
@@ -110,12 +95,7 @@
final double quantile = ((Number) RexLiteral.value(quantileArg)).floatValue();
Integer compression = TDigestSketchAggregatorFactory.DEFAULT_COMPRESSION;
if (aggregateCall.getArgList().size() > 2) {
-   final RexNode compressionArg = Expressions.fromFieldAccess(
-       rexBuilder.getTypeFactory(),
-       rowSignature,
-       project,
-       aggregateCall.getArgList().get(2)
-   );
+   final RexNode compressionArg = inputAccessor.getField(aggregateCall.getArgList().get(2));
    compression = ((Number) RexLiteral.value(compressionArg)).intValue();
}

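These two files back the T-Digest SQL functions of the `druid-tdigestsketch` extension. As a usage sketch (the table and column names are hypothetical), the literal checks above mean the quantile fraction and compression arguments must be constants for the query to plan:

```sql
-- Hypothetical table and column; 0.99 and 200 must be literals,
-- per the SqlKind.LITERAL checks in the planner code above.
SELECT TDIGEST_QUANTILE("latencyMs", 0.99, 200) FROM "requests"
```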
(Diff truncated; 153 changed files in total.)