Skip to content

Commit

Permalink
feat(flink): add flink expression column
Browse files Browse the repository at this point in the history
  • Loading branch information
LuckyFBB committed Oct 16, 2024
1 parent b02894b commit c2f525d
Show file tree
Hide file tree
Showing 10 changed files with 2,910 additions and 2,707 deletions.
20 changes: 12 additions & 8 deletions src/grammar/flink/FlinkSqlParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ columnName
| {this.shouldMatchEmpty()}?
;

columnNamePath
: uid
;

columnNameList
: LR_BRACKET columnName (COMMA columnName)* RR_BRACKET
;
Expand Down Expand Up @@ -451,9 +455,9 @@ queryStatement
: valuesCaluse
| withClause queryStatement
| LR_BRACKET queryStatement RR_BRACKET
| left=queryStatement operator=(KW_INTERSECT | KW_UNION | KW_EXCEPT) KW_ALL? right=queryStatement orderByCaluse? limitClause?
| selectClause orderByCaluse? limitClause?
| selectStatement orderByCaluse? limitClause?
| left=queryStatement operator=(KW_INTERSECT | KW_UNION | KW_EXCEPT) KW_ALL? right=queryStatement orderByClause? limitClause?
| selectClause orderByClause? limitClause?
| selectStatement orderByClause? limitClause?
;

valuesCaluse
Expand Down Expand Up @@ -626,15 +630,15 @@ namedWindow
;

windowSpec
: name=errorCapturingIdentifier? LR_BRACKET partitionByClause? orderByCaluse? windowFrame? RR_BRACKET
: name=errorCapturingIdentifier? LR_BRACKET partitionByClause? orderByClause? windowFrame? RR_BRACKET
;

matchRecognizeClause
: KW_MATCH_RECOGNIZE LR_BRACKET partitionByClause? orderByCaluse? measuresClause? outputMode? afterMatchStrategy? patternDefination?
: KW_MATCH_RECOGNIZE LR_BRACKET partitionByClause? orderByClause? measuresClause? outputMode? afterMatchStrategy? patternDefination?
patternVariablesDefination RR_BRACKET (KW_AS? identifier)?
;

orderByCaluse
orderByClause
: KW_ORDER KW_BY orderItemDefition (COMMA orderItemDefition)*
;

Expand Down Expand Up @@ -763,7 +767,7 @@ primaryExpression
// | identifier '->' expression #lambda
// | '(' identifier (',' identifier)+ ')' '->' expression #lambda
| value=primaryExpression LS_BRACKET index=valueExpression RS_BRACKET # subscript
| identifier # columnReference
| columnNamePath # columnReference
| dereferenceDefinition # dereference
| LR_BRACKET expression RR_BRACKET # parenthesizedExpression
| KW_CURRENT_TIMESTAMP # dateFunctionExpression
Expand Down Expand Up @@ -1216,4 +1220,4 @@ nonReservedKeywords
| KW_WEEK
| KW_YEARS
| KW_ZONE
;
;
5 changes: 3 additions & 2 deletions src/lib/flink/FlinkSqlParser.interp

Large diffs are not rendered by default.

5,432 changes: 2,744 additions & 2,688 deletions src/lib/flink/FlinkSqlParser.ts

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions src/lib/flink/FlinkSqlParserListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ColumnOptionDefinitionContext } from "./FlinkSqlParser.js";
import { PhysicalColumnDefinitionContext } from "./FlinkSqlParser.js";
import { ColumnNameCreateContext } from "./FlinkSqlParser.js";
import { ColumnNameContext } from "./FlinkSqlParser.js";
import { ColumnNamePathContext } from "./FlinkSqlParser.js";
import { ColumnNameListContext } from "./FlinkSqlParser.js";
import { ColumnTypeContext } from "./FlinkSqlParser.js";
import { LengthOneDimensionContext } from "./FlinkSqlParser.js";
Expand Down Expand Up @@ -123,7 +124,7 @@ import { WindowClauseContext } from "./FlinkSqlParser.js";
import { NamedWindowContext } from "./FlinkSqlParser.js";
import { WindowSpecContext } from "./FlinkSqlParser.js";
import { MatchRecognizeClauseContext } from "./FlinkSqlParser.js";
import { OrderByCaluseContext } from "./FlinkSqlParser.js";
import { OrderByClauseContext } from "./FlinkSqlParser.js";
import { OrderItemDefitionContext } from "./FlinkSqlParser.js";
import { LimitClauseContext } from "./FlinkSqlParser.js";
import { PartitionByClauseContext } from "./FlinkSqlParser.js";
Expand Down Expand Up @@ -497,6 +498,16 @@ export class FlinkSqlParserListener implements ParseTreeListener {
* @param ctx the parse tree
*/
exitColumnName?: (ctx: ColumnNameContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.columnNamePath`.
* @param ctx the parse tree
*/
enterColumnNamePath?: (ctx: ColumnNamePathContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.columnNamePath`.
* @param ctx the parse tree
*/
exitColumnNamePath?: (ctx: ColumnNamePathContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.columnNameList`.
* @param ctx the parse tree
Expand Down Expand Up @@ -1382,15 +1393,15 @@ export class FlinkSqlParserListener implements ParseTreeListener {
*/
exitMatchRecognizeClause?: (ctx: MatchRecognizeClauseContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.orderByCaluse`.
* Enter a parse tree produced by `FlinkSqlParser.orderByClause`.
* @param ctx the parse tree
*/
enterOrderByCaluse?: (ctx: OrderByCaluseContext) => void;
enterOrderByClause?: (ctx: OrderByClauseContext) => void;
/**
* Exit a parse tree produced by `FlinkSqlParser.orderByCaluse`.
* Exit a parse tree produced by `FlinkSqlParser.orderByClause`.
* @param ctx the parse tree
*/
exitOrderByCaluse?: (ctx: OrderByCaluseContext) => void;
exitOrderByClause?: (ctx: OrderByClauseContext) => void;
/**
* Enter a parse tree produced by `FlinkSqlParser.orderItemDefition`.
* @param ctx the parse tree
Expand Down
13 changes: 10 additions & 3 deletions src/lib/flink/FlinkSqlParserVisitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ColumnOptionDefinitionContext } from "./FlinkSqlParser.js";
import { PhysicalColumnDefinitionContext } from "./FlinkSqlParser.js";
import { ColumnNameCreateContext } from "./FlinkSqlParser.js";
import { ColumnNameContext } from "./FlinkSqlParser.js";
import { ColumnNamePathContext } from "./FlinkSqlParser.js";
import { ColumnNameListContext } from "./FlinkSqlParser.js";
import { ColumnTypeContext } from "./FlinkSqlParser.js";
import { LengthOneDimensionContext } from "./FlinkSqlParser.js";
Expand Down Expand Up @@ -123,7 +124,7 @@ import { WindowClauseContext } from "./FlinkSqlParser.js";
import { NamedWindowContext } from "./FlinkSqlParser.js";
import { WindowSpecContext } from "./FlinkSqlParser.js";
import { MatchRecognizeClauseContext } from "./FlinkSqlParser.js";
import { OrderByCaluseContext } from "./FlinkSqlParser.js";
import { OrderByClauseContext } from "./FlinkSqlParser.js";
import { OrderItemDefitionContext } from "./FlinkSqlParser.js";
import { LimitClauseContext } from "./FlinkSqlParser.js";
import { PartitionByClauseContext } from "./FlinkSqlParser.js";
Expand Down Expand Up @@ -392,6 +393,12 @@ export class FlinkSqlParserVisitor<Result> extends AbstractParseTreeVisitor<Resu
* @return the visitor result
*/
visitColumnName?: (ctx: ColumnNameContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.columnNamePath`.
* @param ctx the parse tree
* @return the visitor result
*/
visitColumnNamePath?: (ctx: ColumnNamePathContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.columnNameList`.
* @param ctx the parse tree
Expand Down Expand Up @@ -923,11 +930,11 @@ export class FlinkSqlParserVisitor<Result> extends AbstractParseTreeVisitor<Resu
*/
visitMatchRecognizeClause?: (ctx: MatchRecognizeClauseContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.orderByCaluse`.
* Visit a parse tree produced by `FlinkSqlParser.orderByClause`.
* @param ctx the parse tree
* @return the visitor result
*/
visitOrderByCaluse?: (ctx: OrderByCaluseContext) => Result;
visitOrderByClause?: (ctx: OrderByClauseContext) => Result;
/**
* Visit a parse tree produced by `FlinkSqlParser.orderItemDefition`.
* @param ctx the parse tree
Expand Down
2 changes: 2 additions & 0 deletions src/parser/flink/flinkErrorListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class FlinkErrorListener extends ParseErrorListener {
[FlinkSqlParser.RULE_functionName, 'function'],
[FlinkSqlParser.RULE_functionNameCreate, 'function'],
[FlinkSqlParser.RULE_columnName, 'column'],
[FlinkSqlParser.RULE_columnNamePath, 'column'],
[FlinkSqlParser.RULE_columnNameCreate, 'column'],
]);

Expand Down Expand Up @@ -51,6 +52,7 @@ export class FlinkErrorListener extends ParseErrorListener {
case FlinkSqlParser.RULE_viewPath:
case FlinkSqlParser.RULE_functionName:
case FlinkSqlParser.RULE_columnName:
case FlinkSqlParser.RULE_columnNamePath:
case FlinkSqlParser.RULE_catalogPath: {
result.push(`{existing}${name}`);
break;
Expand Down
14 changes: 14 additions & 0 deletions src/parser/flink/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export class FlinkSQL extends BasicSQL<FlinkSqlLexer, ProgramContext, FlinkSqlPa
FlinkSqlParser.RULE_functionName, // functionName
FlinkSqlParser.RULE_functionNameCreate, // functionName that will be created
FlinkSqlParser.RULE_columnName,
FlinkSqlParser.RULE_columnNamePath,
FlinkSqlParser.RULE_columnNameCreate,
]);

Expand Down Expand Up @@ -110,6 +111,19 @@ export class FlinkSQL extends BasicSQL<FlinkSqlLexer, ProgramContext, FlinkSqlPa
syntaxContextType = EntityContextType.COLUMN_CREATE;
break;
}
case FlinkSqlParser.RULE_columnNamePath: {
if (
candidateRule.ruleList.includes(FlinkSqlParser.RULE_selectClause) ||
candidateRule.ruleList.includes(FlinkSqlParser.RULE_whereClause) ||
candidateRule.ruleList.includes(FlinkSqlParser.RULE_groupByClause) ||
candidateRule.ruleList.includes(FlinkSqlParser.RULE_limitClause) ||
candidateRule.ruleList.includes(FlinkSqlParser.RULE_whenClause) ||
candidateRule.ruleList.includes(FlinkSqlParser.RULE_havingClause)
) {
syntaxContextType = EntityContextType.COLUMN;
}
break;
}
default:
break;
}
Expand Down
17 changes: 17 additions & 0 deletions test/parser/flink/errorListener.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const sql3 = `DROP VIEW IF EXIsST aaa aaa`;
const sql4 = `SELECT * froma aaa`;
const sql5 = `CREATE VIEW `;
const sql6 = `DROP CATALOG `;
const sql7 = `SELECT SUM(amount) FROM Orders GROUP BY length(users) HAVING SUM( `;

describe('FlinkSQL validate invalid sql and test msg', () => {
const flink = new FlinkSQL();
Expand Down Expand Up @@ -64,6 +65,14 @@ describe('FlinkSQL validate invalid sql and test msg', () => {
);
});

test('validate unComplete sql7', () => {
const errors = flink.validate(sql7);
expect(errors.length).toBe(1);
expect(errors[0].message).toBe(
`Statement is incomplete, expecting an existing function or an existing column or a keyword`
);
});

test('validate random text cn', () => {
flink.locale = 'zh_CN';
const errors = flink.validate(randomText);
Expand Down Expand Up @@ -97,4 +106,12 @@ describe('FlinkSQL validate invalid sql and test msg', () => {
expect(errors.length).toBe(1);
expect(errors[0].message).toBe(`'aaa' 在此位置无效,期望一个存在的column或者一个关键字`);
});

test('validate unComplete sql7 cn', () => {
const errors = flink.validate(sql7);
expect(errors.length).toBe(1);
expect(errors[0].message).toBe(
`语句不完整,期望一个存在的function或者一个存在的column或者一个关键字`
);
});
});
8 changes: 7 additions & 1 deletion test/parser/flink/suggestion/fixtures/syntaxSuggestion.sql
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ INSERT INTO tb (col, tb.c );

CREATE TABLE yourTable (ts TIMESTAMP(3), WATERMARK FOR );

CREATE TABLE newTable ( );
CREATE TABLE newTable ( );

SELECT SUM(amount) FROM Orders GROUP BY length(users) HAVING SUM(amount) > 50;

SELECT * FROM Orders ORDER BY orderTime LIMIT length(order_id);

SELECT age CASE WHEN age < 18 THEN 1 ELSE 0 END AS is_minor FROM dt_catalog.dt_db.subscriptions;
85 changes: 85 additions & 0 deletions test/parser/flink/suggestion/syntaxSuggestion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,4 +374,89 @@ describe('Flink SQL Syntax Suggestion', () => {
expect(suggestion).not.toBeUndefined();
expect(suggestion?.wordRanges.map((token) => token.text)).toEqual([]);
});

test('Select expression column', () => {
const pos: CaretPosition = {
lineNumber: 43,
column: 18,
};
const syntaxes = flink.getSuggestionAtCaretPosition(
commentOtherLine(syntaxSql, pos.lineNumber),
pos
)?.syntax;
const suggestion = syntaxes?.find(
(syn) => syn.syntaxContextType === EntityContextType.COLUMN
);

expect(suggestion).not.toBeUndefined();
expect(suggestion?.wordRanges.map((token) => token.text)).toEqual(['amount']);
});

test('Group by expression column', () => {
const pos: CaretPosition = {
lineNumber: 43,
column: 53,
};
const syntaxes = flink.getSuggestionAtCaretPosition(
commentOtherLine(syntaxSql, pos.lineNumber),
pos
)?.syntax;
const suggestion = syntaxes?.find(
(syn) => syn.syntaxContextType === EntityContextType.COLUMN
);

expect(suggestion).not.toBeUndefined();
expect(suggestion?.wordRanges.map((token) => token.text)).toEqual(['users']);
});

test('Having expression column', () => {
const pos: CaretPosition = {
lineNumber: 43,
column: 72,
};
const syntaxes = flink.getSuggestionAtCaretPosition(
commentOtherLine(syntaxSql, pos.lineNumber),
pos
)?.syntax;
const suggestion = syntaxes?.find(
(syn) => syn.syntaxContextType === EntityContextType.COLUMN
);

expect(suggestion).not.toBeUndefined();
expect(suggestion?.wordRanges.map((token) => token.text)).toEqual(['amount']);
});

test('Limit by expression column', () => {
const pos: CaretPosition = {
lineNumber: 45,
column: 62,
};
const syntaxes = flink.getSuggestionAtCaretPosition(
commentOtherLine(syntaxSql, pos.lineNumber),
pos
)?.syntax;
const suggestion = syntaxes?.find(
(syn) => syn.syntaxContextType === EntityContextType.COLUMN
);

expect(suggestion).not.toBeUndefined();
expect(suggestion?.wordRanges.map((token) => token.text)).toEqual(['order_id']);
});

test('When by expression column', () => {
const pos: CaretPosition = {
lineNumber: 47,
column: 25,
};
const syntaxes = flink.getSuggestionAtCaretPosition(
commentOtherLine(syntaxSql, pos.lineNumber),
pos
)?.syntax;
const suggestion = syntaxes?.find(
(syn) => syn.syntaxContextType === EntityContextType.COLUMN
);

expect(suggestion).not.toBeUndefined();
expect(suggestion?.wordRanges.map((token) => token.text)).toEqual(['age']);
});
});

0 comments on commit c2f525d

Please sign in to comment.