diff --git a/_posts/2022-06-10-flink-materialized-view.md b/_posts/2022-06-10-flink-materialized-view.md index 24fbd47..8d02da6 100644 --- a/_posts/2022-06-10-flink-materialized-view.md +++ b/_posts/2022-06-10-flink-materialized-view.md @@ -6,24 +6,34 @@ categories: engineering tags: stream-processing database --- +几周前面试遇到一道题:在一个预约业务里,展示顾客复购的情况,是典型的在几张业务表上聚合信息的需求。 + +用 SQL 可以很方便统计出我们需要的数据,但每次查询都要扫全表性能很差,加缓存又担心维护成本高。 +对于比较固定的统计需求,[物化视图(Materialized View)](https://en.wikipedia.org/wiki/Materialized_view)是一个低成本的方案。 +它是指预计算结果到一张新表,缩短读路径,提高查询性能。 + ![Materialized View](https://docs.microsoft.com/en-us/azure/architecture/patterns/_images/materialized-view-pattern-diagram.png) -几周前面试遇到一道题:在一个预约业务里,展示顾客复购的情况,是典型的在几张业务表上聚合信息的需求。用 SQL 可以很方便表达出我们需要的数据,但每次查询都要扫全表性能很差,这时可以使用[物化视图(Materialized View)](https://en.wikipedia.org/wiki/Materialized_view),即预计算结果到一张表或 ElasticSearch 索引,就能缩短读路径,提高查询性能。物化视图有多种实现方式,各有明显优缺点,本文介绍用 Flink SQL 实现物化视图这种方案。 +## 数据库的物化视图 -## 流处理+数据库 +有些传统数据库会提供物化视图功能,例如 [PostgreSQL Materialized Views](https://www.postgresql.org/docs/current/rules-materializedviews.html)。但通常无法实时更新,例如 PostgreSQL 的 Materialized Views 要用户用`REFRESH`更新,不能自动增量更新。 -物化视图有两种实现方式: +前几年兴起的流数据库可以轻松解决这个问题,例如[RisingWave](https://docs.risingwave.com/docs/current/intro/)介绍里的第一句就是: -- 存储服务原生实现:跟用 SQL 创建 View 一样,很方便,但功能一般不全。像 PostgreSQL 提供的 [Materialized Views](https://www.postgresql.org/docs/current/rules-materializedviews.html),要用户用`REFRESH`更新,也就是不能自动增量更新。 -- 外部程序同步数据:开发外部程序监听数据存储的变化([Change Data Capture(CDC)](https://en.wikipedia.org/wiki/Change_data_capture)),经过一些计算再把结果写进存储服务,例如写程序读 MySQL Binlog 增量更新结果。好处是灵活不受限制,但坏在需要开发工作。 +> RisingWave specializes in providing **incrementally updated, consistent materialized views** -这两种方式都有明显优缺点。恰巧面试后隔天我看一个[新闻](https://www.infoq.cn/article/OIFS2PtAZlMsgBbsW6eO)提到流数据库,突然想到可以用流处理框架 [Flink](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 的 Table API 实现 Materialized View,这方案行得通的话我们就可以同时拥有两个优点:极少工作量的 SQL + 自动增量更新。 +流数据库无法代替传统的关系型数据库,要单独部署,目前普及程度很低。 +在流数据库兴起之前,我们可以用流处理框架(例如 [Apache Flink](https://flink.apache.org/)) 实现物化视图。 -我很快写了用 Flink SQL 解决这道面试题的[Demo](https://github.com/KKKIIO/materialized_view_flink),途中遇到一些部署难和文档不全的问题,所幸 Flink SQL 的简洁强大到达了我的预期。末尾我还写了 Flink DataStream 用算子实现 Materialized View 的方案作为对比,感兴趣可以看一下。 +## 流处理实现物化视图 -## 用 Flink SQL 实现物化视图 +流数据库和流处理框架实现物化视图的原理是一样的,都是监听数据存储的变化([Change Data Capture(CDC)](https://en.wikipedia.org/wiki/Change_data_capture)),经过数据转换、连接和计算,再把结果写进存储服务。 + +Flink 还提供了 SQL API,稍后我们会看到它的简洁和强大。 + +本文的代码 Demo 存放在[Github 仓库](https://github.com/KKKIIO/materialized_view_flink)上。 -### 面试题 +### 题目介绍 首先我们有三张已经存在的业务表,分别是顾客表、预约表、顾客预约习惯表: @@ -60,7 +70,9 @@ CREATE TABLE `customer_reorder_tab` ( ); ``` -### Flink SQL 读取表数据 +## 用 Flink SQL 实现物化视图 + +### 读取表数据 [MySQL CDC Connector](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html) 能让我们把 MySQL 表引入 Flink 作为数据来源(Source),是现成的解决方案,美中不足的是文档不够详细。 @@ -87,7 +99,7 @@ env.executeSql( 除了连接配置和列类型,表定义跟 MySQL 几乎是一样的。 -### Flink SQL 写入表数据 +### 写入表数据 [JDBC SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/) 能让我们把 MySQL 表作为 Flink 的输出(Sink)。 @@ -101,7 +113,7 @@ env.executeSql( mysqlHost, mysqlPort, mysqlDb, mysqlUser, mysqlPassword)); ``` -### 用 Flink SQL 表达聚合计算 +### 聚合计算 我们用 `JOIN` 连接三个表,用 `customer_id` 做分组(`GROUP BY`)聚合出结果,并更新(`INSERT INTO`)到`customer_reorder_tab`表 @@ -139,37 +151,20 @@ after waiting 3s, mismatch count: 0 ... ``` -没问题,再看看拓扑图: +没问题。再看看拓扑图: ![topology](/assets/image/flink-sql-topo.png) 也很直观,容易对应到 SQL 的 Join。 -## Flink SQL 的工程问题 - -Flink SQL 这么简洁,这个解决方案可以在生产环境上用吗? - -可以,但**门槛**有些高。 - -### 繁重的部署 - -![Flink Architecture](https://nightlies.apache.org/flink/flink-docs-master/fig/deployment_overview.svg) - -Flink 一般作为集群部署,写一个计算程序需要部署上图这么多组件(即便出了[Application Mode](https://flink.apache.org/news/2020/07/14/application-mode.html)可以向 Kubernetes 提交单应用集群简化部署,组件还是一个没少),一般人都会犹豫,害怕一个问题变成两个问题。 - -相比,如果你能用 CLI 连接流数据库,直接输入 SQL 就能得到一个 Materialized View,是不是更让人心动呢? - -### 高复杂度 - -用一个 Go 程序,把 Binlog 位置和计算结果都放到 MySQL,这样简单的方案即使出现问题,你都能很快地定位到并修复它。 - -而像 Flink 这样一个追求通用、高可用、强一致性的一个庞大的框架(跟所有的 Java 框架一样),你很难对它有掌握感,Java 库泛滥的 Runtime Dispatch 也是我本次看源码解决问题时的一个头疼的点。 +## 用 Flink DataStream 理解流处理 -## 附:用 Flink DataStream 实现物化视图 +文章本该到这里结束,以体现 Flink SQL 的简洁和强大,但 Flink SQL 抽象程度太高,第一次接触流处理的人大概率不明白它是怎么工作的。 +我们可以用 Flink [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/learn-flink/datastream_api/) 重新实现一遍物化视图,理解它的工作原理。 -Flink 跟 MapReduce 、 Apache Storm 那段大数据时期的项目一样,一开始就提供了编程接口 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/learn-flink/datastream_api/)。我们用 DataStream API 重新实现一下物化视图,对比一下其他方案,还可以看到 Flink 里的一些流处理概念。 +DataStream 是 Flink 开始就提供的编程模型,它提供了很多函数式编程的接口,例如 `map`、`filter`、`keyBy`、`reduce` 等,可以组合成一个实时数据处理的拓扑图。 -### Flink MySQL Source +### 解析 MySQL 事件 依旧先引入 Source @@ -237,11 +232,15 @@ public class ChangeDeserializer implements DebeziumDeserializationSchema } ``` -### 单独处理`customer_tab`以减少聚合运算 +### 自定义拓扑 -这次我想处理得更简单些,因为`customer_tab`的数据可以单独同步到`customer_reorder_tab`,而`customer_reorder_tab`其他需要计算的字段并不需要`customer_tab`的参与,`order_tab`和`customer_preference_tab`都有`customer_id`,可以自己聚合。 +Flink SQL 生成的拓扑有些复杂了: -我们把`customer_tab`的变更事件单独拎出来: +1. `customer_tab` 的数据可以直接写到`customer_reorder_tab`,统计字段可以初始化为 0 +2. `order_tab`和`customer_preference_tab`都有`customer_id`,不需要 JOIN `customer_tab`,可以自己算。 + +用 DataStream 我们可以简化这个拓扑结构。 +先把 `customer_tab` 的变更事件单独拎出来: ```java val customerTag = new OutputTag("customer") { }; @@ -282,8 +281,6 @@ mainStream.getSideOutput(customerTag).addSink(JdbcSink.sink( .name("MySQL Customer Sink"); ``` -### 分组聚合`order_tab`和`customer_preference_tab` - 接下来我们将`order_tab`和`customer_preference_tab`的变更根据`customer_id`分组,每遇到一个变更就更新一下`customer_reorder_tab`: ```java @@ -308,13 +305,20 @@ mainStream.keyBy(c -> { .name("MySQL ReorderInfo Sink"); ``` -### 分组聚合需要状态(State) +### 算子的中间状态 + +上面我们使用了一个 `ReorderCalc` 算子,它可以理解成对 `Change` 的 _reduce_ 。 + +回顾函数式编程里 _reduce_ 的定义,它每次会给合并函数一个累计值和一个新元素。 +在 Flink 里,累计值是算子的状态(State),会被定期持久化(Checkpoint),用于重启和故障恢复。 + +在 `ReorderCalc` 里,我们用 `ReorderState` 类表示算子的状态,它包含三个字段: -`customer_reorder_tab`新的值,可以对同个`customer_id`的`Change`做 _reduce_ 得出: +1. `OrderCount`:预约单总数 +2. `LastOrderTime`:最后的预约时间 +3. `Frequency`:预约习惯 -- `order_count`:遇到新订单就递增 -- `last_order_time`:遇到订单变更就取更大的值 -- `expected_next_order_time`:遇到顾客习惯变更就更新`frequency`,并将`frequency`与`last_order_time`相加 +合并函数只需做一些递增、比较和取最大值的操作: ```java public class ReorderCalc extends RichMapFunction { @@ -369,41 +373,24 @@ public class ReorderCalc extends RichMapFunction { } ``` -拓扑图变得很简单,因为 Flink 会合并 Trivial 的步骤: +### 成果 + +下面是用 DataStream 实现的拓扑图: ![topology](/assets/image/flink-ds-topo.png) -测试也过了: +拓扑图变得很简单,因为 Flink 会合并 Trivial 的步骤,只有分组操作 `keyBy` 增加了一个节点。 -``` -... -Day 13 -new customer count: 77, new order count: 160 -after waiting 3s, mismatch count: 0 -Day 14 -new customer count: 31, new order count: 132 -after waiting 3s, mismatch count: 0 -Day 15 -new customer count: 30, new order count: 140 -after waiting 3s, mismatch count: 0 -Day 16 -new customer count: 56, new order count: 164 -after waiting 3s, mismatch count: 0 -Day 17 -new customer count: 35, new order count: 166 -after waiting 3s, mismatch count: 0 -... -``` +## 工程问题 -### DataStream 比 SQL 更繁琐 +Flink SQL 这么简洁,这个解决方案可以在生产环境上用吗? -Flink DataStream 明显难用了很多,它需要你处理具体的(增删改)事件,虽然没有 `JOIN` ,但我们还是要显示地管理每个顾客的状态`reorderState`,而 Flink SQL 让用户不太需要关注这些实现细节。 +可以,但**门槛**有些高,主要有两个问题: -Flink DataStream 理论上可以做的事情比 Flink SQL 多,但代码多了,可能出错的地方也会变多,例如我一开始没有考虑 Order 是否是新增事件就递增了`state.orderCount`。 +1. 运维成本高:Flink 以集群方式部署,如果实现一个物化视图需要部署好几个服务,大概率会引入新的问题。 -### DataStream 比 Homemade Program 更标准 +![Flink Architecture](https://nightlies.apache.org/flink/flink-docs-master/fig/deployment_overview.svg) -基本上自研程序也是在做流处理:监听变化事件、写入计算结果,实现思路是差不多的。不同点在于高可用和强一致的方案: +2. 学习/维护成本高:Flink 作为一个通用、高可用、强一致性的流处理框架,复杂度很高。 -- Flink 可以定时 Checkpoint 各个步骤的 State ,增强每个步骤的模块化,坏处是要提供额外的存储(`EmbeddedRocksDBStateBackend`),而且 Flink 只(能)保证内部状态的一致性,外部结果只能保证最终一致性,例如重启 Flink Job 可能会导致 MySQL 数据“回退”几秒。 -- 自研程序要自己实现 **事务性的存取** Binlog 位置和计算中间结果等数据,好处是可以实现端到端(end-to-end)的一致性,例如使用 MySQL 事务存取。 +看来流数据库的兴起不是没有道理的,把计算和存储闭环在一起,降低流处理系统的使用成本。