Skip to content

Commit

Permalink
Extend the PARTITION BY clause to accept string literals for the time…
Browse files Browse the repository at this point in the history
… partitioning (#15836)


This PR contains a portion of the changes from the inactive draft PR for integrating the catalog with the Calcite planner #13686 from @paul-rogers, extending the PARTITION BY clause to accept string literals for the time partitioning
  • Loading branch information
zachjsh authored Feb 9, 2024
1 parent 6e9eee4 commit f9ee2c3
Show file tree
Hide file tree
Showing 21 changed files with 424 additions and 116 deletions.
27 changes: 27 additions & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,33 @@ The following ISO 8601 periods are supported for `TIME_FLOOR` and the string con
- P3M
- P1Y

The string constant can also include any of the keywords mentioned above:

- `HOUR` - Same as `'PT1H'`
- `DAY` - Same as `'P1D'`
- `MONTH` - Same as `'P1M'`
- `YEAR` - Same as `'P1Y'`
- `ALL TIME`
- `ALL` - Alias for `ALL TIME`

The `WEEK` granularity is deprecated and not supported in MSQ.

Examples:

```SQL
-- Keyword
PARTITIONED BY HOUR

-- String literal
PARTITIONED BY 'HOUR'

-- ISO 8601 period
PARTITIONED BY 'PT1H'

-- TIME_FLOOR function
PARTITIONED BY TIME_FLOOR(__time, 'PT1H')
```

For more information about partitioning, see [Partitioning](concepts.md#partitioning-by-time). <br /><br />
*Avoid partitioning by week, `P1W`, because weeks don't align neatly with months and years, making it difficult to partition by coarser granularities later.

Expand Down
1 change: 1 addition & 0 deletions sql/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ data: {
"org.apache.calcite.sql.SqlNodeList"
"org.apache.calcite.sql.SqlBasicCall"
"org.apache.druid.java.util.common.granularity.Granularity"
"org.apache.druid.java.util.common.granularity.GranularityType"
"org.apache.druid.java.util.common.granularity.Granularities"
"org.apache.druid.sql.calcite.parser.DruidSqlInsert"
"org.apache.druid.sql.calcite.parser.DruidSqlParserUtils"
Expand Down
17 changes: 8 additions & 9 deletions sql/src/main/codegen/includes/common.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
* under the License.
*/

// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity() :
SqlGranularityLiteral PartitionGranularity() :
{
SqlNode e;
Granularity granularity;
Expand Down Expand Up @@ -52,14 +51,14 @@ org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity
|
<ALL>
{
granularity = Granularities.ALL;
unparseString = "ALL";
granularity = Granularities.ALL;
unparseString = "ALL";
}
[
<TIME>
{
unparseString += " TIME";
}
<TIME>
{
unparseString += " TIME";
}
]
|
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
Expand All @@ -69,7 +68,7 @@ org.apache.druid.java.util.common.Pair<Granularity, String> PartitionGranularity
}
)
{
return new org.apache.druid.java.util.common.Pair(granularity, unparseString);
return new SqlGranularityLiteral(granularity, unparseString, getPos());
}
}

Expand Down
6 changes: 3 additions & 3 deletions sql/src/main/codegen/includes/insert.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ SqlNode DruidSqlInsertEof() :
final SqlNodeList columnList;
final Span s;
final Pair<SqlNodeList, SqlNodeList> p;
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlGranularityLiteral partitionedBy = null;
SqlNodeList clusteredBy = null;
String exportFileFormat = null;
}
Expand Down Expand Up @@ -93,7 +93,7 @@ SqlNode DruidSqlInsertEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
Expand All @@ -112,6 +112,6 @@ SqlNode DruidSqlInsertEof() :
return insertNode;
}
SqlInsert sqlInsert = (SqlInsert) insertNode;
return new DruidSqlInsert(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, exportFileFormat);
return DruidSqlInsert.create(sqlInsert, partitionedBy, clusteredBy, exportFileFormat);
}
}
7 changes: 3 additions & 4 deletions sql/src/main/codegen/includes/replace.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ SqlNode DruidSqlReplaceEof() :
final Span s;
SqlNode tableRef = null;
SqlInsert sqlInsert;
// Using fully qualified name for Pair class, since Calcite also has a same class name being used in the Parser.jj
org.apache.druid.java.util.common.Pair<Granularity, String> partitionedBy = new org.apache.druid.java.util.common.Pair(null, null);
SqlGranularityLiteral partitionedBy = null;
SqlNodeList clusteredBy = null;
final Pair<SqlNodeList, SqlNodeList> p;
SqlNode replaceTimeQuery = null;
Expand Down Expand Up @@ -78,7 +77,7 @@ SqlNode DruidSqlReplaceEof() :
clusteredBy = ClusteredBy()
]
{
if (clusteredBy != null && partitionedBy.lhs == null) {
if (clusteredBy != null && partitionedBy == null) {
throw org.apache.druid.sql.calcite.parser.DruidSqlParserUtils.problemParsing(
"CLUSTERED BY found before PARTITIONED BY, CLUSTERED BY must come after the PARTITIONED BY clause"
);
Expand All @@ -91,7 +90,7 @@ SqlNode DruidSqlReplaceEof() :
<EOF>
{
sqlInsert = new SqlInsert(s.end(source), SqlNodeList.EMPTY, destination, source, columnList);
return new DruidSqlReplace(sqlInsert, partitionedBy.lhs, partitionedBy.rhs, clusteredBy, replaceTimeQuery, exportFileFormat);
return DruidSqlReplace.create(sqlInsert, partitionedBy, clusteredBy, replaceTimeQuery, exportFileFormat);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nullable;
import java.util.List;

/**
* Common base class to the two Druid "ingest" statements: INSERT and REPLACE.
Expand All @@ -37,10 +38,7 @@ public abstract class DruidSqlIngest extends SqlInsert
public static final String SQL_EXPORT_FILE_FORMAT = "__exportFileFormat";

@Nullable
protected final Granularity partitionedBy;

// Used in the unparse function to generate the original query since we convert the string to an enum
protected final String partitionedByStringForUnparse;
protected final SqlGranularityLiteral partitionedBy;

@Nullable
protected final SqlNodeList clusteredBy;
Expand All @@ -53,22 +51,20 @@ public DruidSqlIngest(
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
super(pos, keywords, targetTable, source, columnList);

this.partitionedByStringForUnparse = partitionedByStringForUnparse;
this.partitionedBy = partitionedBy;
this.clusteredBy = clusteredBy;
this.exportFileFormat = exportFileFormat;
}

@Nullable
public Granularity getPartitionedBy()
public SqlGranularityLiteral getPartitionedBy()
{
return partitionedBy;
}
Expand All @@ -84,4 +80,14 @@ public String getExportFileFormat()
{
return exportFileFormat;
}

@Override
public List<SqlNode> getOperandList()
{
return ImmutableNullableList.<SqlNode>builder()
.addAll(super.getOperandList())
.add(partitionedBy)
.add(clusteredBy)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.calcite.sql.parser.SqlParserPos;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -41,28 +41,49 @@ public class DruidSqlInsert extends DruidSqlIngest
// This allows reusing super.unparse
public static final SqlOperator OPERATOR = SqlInsert.OPERATOR;

public static DruidSqlInsert create(
@Nonnull SqlInsert insertNode,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
return new DruidSqlInsert(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
partitionedBy,
clusteredBy,
exportFileFormat
);
}

/**
* While partitionedBy and partitionedByStringForUnparse can be null as arguments to the constructor, this is
* While partitionedBy can be null as arguments to the constructor, this is
* disallowed (semantically) and the constructor performs checks to ensure that. This helps in producing friendly
* errors when the PARTITIONED BY custom clause is not present, and keeps its error separate from JavaCC/Calcite's
* custom errors which can be cryptic when someone accidentally forgets to explicitly specify the PARTITIONED BY clause
*/
public DruidSqlInsert(
@Nonnull SqlInsert insertNode,
@Nullable Granularity partitionedBy,
@Nullable String partitionedByStringForUnparse,
SqlParserPos pos,
SqlNodeList keywords,
SqlNode targetTable,
SqlNode source,
SqlNodeList columnList,
@Nullable SqlGranularityLiteral partitionedBy,
@Nullable SqlNodeList clusteredBy,
@Nullable String exportFileFormat
)
{
super(
insertNode.getParserPosition(),
(SqlNodeList) insertNode.getOperandList().get(0), // No better getter to extract this
insertNode.getTargetTable(),
insertNode.getSource(),
insertNode.getTargetColumnList(),
pos,
keywords,
targetTable,
source,
columnList,
partitionedBy,
partitionedByStringForUnparse,
clusteredBy,
exportFileFormat
);
Expand Down Expand Up @@ -95,9 +116,9 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec)
getSource().unparse(writer, 0, 0);
writer.newlineAndIndent();

if (partitionedByStringForUnparse != null) {
if (getPartitionedBy() != null) {
writer.keyword("PARTITIONED BY");
writer.keyword(partitionedByStringForUnparse);
getPartitionedBy().unparse(writer, leftPrec, rightPrec);
}

if (getClusteredBy() != null) {
Expand Down
Loading

0 comments on commit f9ee2c3

Please sign in to comment.