Test Flink Streaming SQL
本地执行flink streaming SQL快速进行验证,不依赖任何额外组件,帮助业务人员提高SQL开发效率。
tlink.properties内容如下,是运行一个sql需要配置的最小集,默认产生数据的字段为user,product,amount
tlink.streaming.sql.statement=SELECT user, product, SUM(amount) as amounts FROM Orders GROUP BY user, product
tlink.sink.table.fieldNames=user,product,amounts
tlink.sink.table.fieldTypes=LONG,STRING,INT
tlink.sink.table.type=Retract
执行sh build.sh
进行编译,会在build文件夹下面生成tlink目录
进入tlink目录执行下面命令
sh bin/tlink /opt/tlink/conf/tlink.properties
在控制台就可以看到类似如下的输出,会显示发送的数据以及sql运行的结果
sand data:3,foo,2
Result:(true,3,foo,2)
sand data:9,baz,4
Result:(true,9,baz,4)
sand data:9,foo,7
Result:(true,9,foo,7)
sand data:6,baz,6
Result:(true,6,baz,6)
sand data:2,foo,8
Result:(true,2,foo,8)
sand data:3,foo,8
Result:(false,3,foo,2)
基于Event time窗口的SQL可以参见下面的配置
tlink.streaming.sql.statement=SELECT user, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), user
tlink.source.table.fieldNames=user,product,amount,rowtime.rowtime
tlink.source.table.fieldTypes=LONG,STRING,INT,LONG
tlink.source.eventTime.index=3
tlink.streaming.sql.env.timeCharacteristic=EVENT
tlink.sink.table.fieldNames=user,wStart,amounts
tlink.sink.table.fieldTypes=LONG,SQL_TIMESTAMP,INT
tlink.sink.table.type=Append
如果要使用blink planner请参见tlink-topN.properties配置
如果要使用多流join请参考tlink-join.properties配置
目前1.0.0版本支持如下特性
- 支持基于event time和processing time的窗口
- 支持随机产生数据
- 支持指定文件的方式产生数据
- 支持blink planer
- 支持2个流join、UnionAll等sql
- 支持通过ddl的方式创建数据源,目前只支持connector.type为filesystem
参数 | 默认值 | 含义 |
---|---|---|
tlink.source.table.names | 非必填,默认值Orders | 数据源注册的表名,最多支持2个表,表名用逗号分割 |
tlink.source.table.fieldNames | 非必填,默认值user, product, amount | 数据源字段名 |
tlink.source.table.fieldTypes | 非必填,默认值LONG, STRING, INT | 数据源字段类型 |
tlink.source.eventTime.index | 如果使用event time必填 | event time字段在所有字段中的位置 |
tlink.source.watermark.maxOutOfOrderness | 如果使用event time必填,默认值10000毫米 | 最大允许延迟时间 |
tlink.source.producer.mode | 非必填,默认值random | 产生数据的方式,可选值random或者file或者DDL |
tlink.source.producer.sql.statement | 非必填,默认值random | 产生数据的SQL语句 |
tlink.source.producer.file.path | 如果上面参数配置file,必填 | 数据文件绝对路径 |
tlink.source.producer.total | 非必填,默认20 | 随机模式下总共产生的数据量 |
tlink.source.producer.interval.ms | 非必填,默认1000毫秒 | 产生数据的固定时间间隔,如果不配置,采用下面的随机时间间隔 |
tlink.source.producer.interval.random.startInclusive | 非必填,默认值1 | 默认含义为RandomUtils.nextLong(1,5)*1000 |
tlink.source.producer.interval.random.endExclusive | 非必填,默认值5 | 默认含义为RandomUtils.nextLong(1,5)*1000 |
tlink.source.producer.interval.random.factor | 非必填,默认值1000 | 默认含义为RandomUtils.nextLong(1,5)*1000 |
tlink.source.producer.string.values | 非必填,默认值foo, bar, baz | 字符串字段候选数据集,随机选择一个作为string类型字段的值 |
tlink.source.producer.long.random.startInclusive | 非必填,默认值1 | 默认含义为RandomUtils.nextLong(1,10)*1 |
tlink.source.producer.long.random.endExclusive | 非必填,默认值10 | 默认含义为RandomUtils.nextLong(1,10)*1 |
tlink.source.producer.long.random.factor | 非必填,默认值1 | 默认含义为RandomUtils.nextLong(1,10)*1 |
tlink.source.producer.int.random.startInclusive | 非必填,默认值1 | 默认含义为RandomUtils.nextInt(1,10)*1 |
tlink.source.producer.int.random.endExclusive | 非必填,默认值10 | 默认含义为RandomUtils.nextInt(1,10)*1 |
tlink.source.producer.int.random.factor | 非必填,默认值1 | 默认含义为RandomUtils.nextInt(1,10)*1 |
tlink.source.producer.timestamp.random.startInclusive | 非必填,默认值1 | 默认通过RandomUtils.nextLong(1,10)*1000产生一个随机数,如果是偶数当前时间戳减去这个随机数作为event time,如果是奇数当前时间戳加上这个随机数作为event time |
tlink.source.producer.timestamp.random.endExclusive | 非必填,默认值10 | 默认通过RandomUtils.nextLong(1,10)*1000产生一个随机数,如果是偶数当前时间戳减去这个随机数作为event time,如果是奇数当前时间戳加上这个随机数作为event time |
tlink.source.producer.timestamp.random.factor | 非必填,默认值1000毫秒 | 默认通过RandomUtils.nextLong(1,10)*1000产生一个随机数,如果是偶数当前时间戳减去这个随机数作为event time,如果是奇数当前时间戳加上这个随机数作为event time |
tlink.sink.table.name | 非必填,默认值Output | 输出表的名字 |
tlink.sink.table.fieldNames | 必填,无默认值 | 输出字段 |
tlink.sink.table.fieldTypes | 必填,无默认值 | 输出字段类型 |
tlink.sink.table.type | 必填,无默认值 | 输出表的类型,可选值为Append或者Retract |
tlink.streaming.sql.env.parallelism | 非必填,默认值1 | 并行度 |
tlink.streaming.sql.env.timeCharacteristic | 非必填,默认值PROCESSING | 设置使用那种时间机制,可选值为EVENT或者PROCESSING |
tlink.streaming.sql.env.planner | 非必填,默认值old | 设置planner,可选值为old或者blink |
tlink.streaming.sql.statement | 必填,无默认值 | 要执行的sql语句 |