Skip to content

Commit

Permalink
feat(posts): rewrite flink materialized view post
Browse files Browse the repository at this point in the history
  • Loading branch information
KKKIIO committed Nov 26, 2023
1 parent c7758cf commit a9cf000
Showing 1 changed file with 60 additions and 73 deletions.
133 changes: 60 additions & 73 deletions _posts/2022-06-10-flink-materialized-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,34 @@ categories: engineering
tags: stream-processing database
---

几周前面试遇到一道题:在一个预约业务里,展示顾客复购的情况,是典型的在几张业务表上聚合信息的需求。

用 SQL 可以很方便统计出我们需要的数据,但每次查询都要扫全表性能很差,加缓存又担心维护成本高。
对于比较固定的统计需求,[物化视图(Materialized View)](https://en.wikipedia.org/wiki/Materialized_view)是一个低成本的方案。
它是指预计算结果到一张新表,缩短读路径,提高查询性能。

![Materialized View](https://docs.microsoft.com/en-us/azure/architecture/patterns/_images/materialized-view-pattern-diagram.png)

几周前面试遇到一道题:在一个预约业务里,展示顾客复购的情况,是典型的在几张业务表上聚合信息的需求。用 SQL 可以很方便表达出我们需要的数据,但每次查询都要扫全表性能很差,这时可以使用[物化视图(Materialized View)](https://en.wikipedia.org/wiki/Materialized_view),即预计算结果到一张表或 ElasticSearch 索引,就能缩短读路径,提高查询性能。物化视图有多种实现方式,各有明显优缺点,本文介绍用 Flink SQL 实现物化视图这种方案。
## 数据库的物化视图

## 流处理+数据库
有些传统数据库会提供物化视图功能,例如 [PostgreSQL Materialized Views](https://www.postgresql.org/docs/current/rules-materializedviews.html)。但通常无法实时更新,例如 PostgreSQL 的 Materialized Views 要用户用`REFRESH`更新,不能自动增量更新。

物化视图有两种实现方式
前几年兴起的流数据库可以轻松解决这个问题,例如[RisingWave](https://docs.risingwave.com/docs/current/intro/)介绍里的第一句就是

- 存储服务原生实现:跟用 SQL 创建 View 一样,很方便,但功能一般不全。像 PostgreSQL 提供的 [Materialized Views](https://www.postgresql.org/docs/current/rules-materializedviews.html),要用户用`REFRESH`更新,也就是不能自动增量更新。
- 外部程序同步数据:开发外部程序监听数据存储的变化([Change Data Capture(CDC)](https://en.wikipedia.org/wiki/Change_data_capture)),经过一些计算再把结果写进存储服务,例如写程序读 MySQL Binlog 增量更新结果。好处是灵活不受限制,但坏在需要开发工作。
> RisingWave specializes in providing **incrementally updated, consistent materialized views**
这两种方式都有明显优缺点。恰巧面试后隔天我看一个[新闻](https://www.infoq.cn/article/OIFS2PtAZlMsgBbsW6eO)提到流数据库,突然想到可以用流处理框架 [Flink](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 的 Table API 实现 Materialized View,这方案行得通的话我们就可以同时拥有两个优点:极少工作量的 SQL + 自动增量更新。
流数据库无法代替传统的关系型数据库,要单独部署,目前普及程度很低。
在流数据库兴起之前,我们可以用流处理框架(例如 [Apache Flink](https://flink.apache.org/)) 实现物化视图。

我很快写了用 Flink SQL 解决这道面试题的[Demo](https://github.com/KKKIIO/materialized_view_flink),途中遇到一些部署难和文档不全的问题,所幸 Flink SQL 的简洁强大到达了我的预期。末尾我还写了 Flink DataStream 用算子实现 Materialized View 的方案作为对比,感兴趣可以看一下。
## 流处理实现物化视图

## 用 Flink SQL 实现物化视图
流数据库和流处理框架实现物化视图的原理是一样的,都是监听数据存储的变化([Change Data Capture(CDC)](https://en.wikipedia.org/wiki/Change_data_capture)),经过数据转换、连接和计算,再把结果写进存储服务。

Flink 还提供了 SQL API,稍后我们会看到它的简洁和强大。

本文的代码 Demo 存放在[Github 仓库](https://github.com/KKKIIO/materialized_view_flink)上。

### 面试题
### 题目介绍

首先我们有三张已经存在的业务表,分别是顾客表、预约表、顾客预约习惯表:

Expand Down Expand Up @@ -60,7 +70,9 @@ CREATE TABLE `customer_reorder_tab` (
);
```

### Flink SQL 读取表数据
## 用 Flink SQL 实现物化视图

### 读取表数据

[MySQL CDC Connector](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html) 能让我们把 MySQL 表引入 Flink 作为数据来源(Source),是现成的解决方案,美中不足的是文档不够详细。

Expand All @@ -87,7 +99,7 @@ env.executeSql(

除了连接配置和列类型,表定义跟 MySQL 几乎是一样的。

### Flink SQL 写入表数据
### 写入表数据

[JDBC SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/) 能让我们把 MySQL 表作为 Flink 的输出(Sink)。

Expand All @@ -101,7 +113,7 @@ env.executeSql(
mysqlHost, mysqlPort, mysqlDb, mysqlUser, mysqlPassword));
```

### 用 Flink SQL 表达聚合计算
### 聚合计算

我们用 `JOIN` 连接三个表,用 `customer_id` 做分组(`GROUP BY`)聚合出结果,并更新(`INSERT INTO`)到`customer_reorder_tab`

Expand Down Expand Up @@ -139,37 +151,20 @@ after waiting 3s, mismatch count: 0
...
```

没问题再看看拓扑图:
没问题再看看拓扑图:

![topology](/assets/image/flink-sql-topo.png)

也很直观,容易对应到 SQL 的 Join。

## Flink SQL 的工程问题

Flink SQL 这么简洁,这个解决方案可以在生产环境上用吗?

可以,但**门槛**有些高。

### 繁重的部署

![Flink Architecture](https://nightlies.apache.org/flink/flink-docs-master/fig/deployment_overview.svg)

Flink 一般作为集群部署,写一个计算程序需要部署上图这么多组件(即便出了[Application Mode](https://flink.apache.org/news/2020/07/14/application-mode.html)可以向 Kubernetes 提交单应用集群简化部署,组件还是一个没少),一般人都会犹豫,害怕一个问题变成两个问题。

相比,如果你能用 CLI 连接流数据库,直接输入 SQL 就能得到一个 Materialized View,是不是更让人心动呢?

### 高复杂度

用一个 Go 程序,把 Binlog 位置和计算结果都放到 MySQL,这样简单的方案即使出现问题,你都能很快地定位到并修复它。

而像 Flink 这样一个追求通用、高可用、强一致性的一个庞大的框架(跟所有的 Java 框架一样),你很难对它有掌握感,Java 库泛滥的 Runtime Dispatch 也是我本次看源码解决问题时的一个头疼的点。
## 用 Flink DataStream 理解流处理

## 附:用 Flink DataStream 实现物化视图
文章本该到这里结束,以体现 Flink SQL 的简洁和强大,但 Flink SQL 抽象程度太高,第一次接触流处理的人大概率不明白它是怎么工作的。
我们可以用 Flink [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/learn-flink/datastream_api/) 重新实现一遍物化视图,理解它的工作原理。

Flink 跟 MapReduce 、 Apache Storm 那段大数据时期的项目一样,一开始就提供了编程接口 [DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/learn-flink/datastream_api/)。我们用 DataStream API 重新实现一下物化视图,对比一下其他方案,还可以看到 Flink 里的一些流处理概念
DataStream 是 Flink 开始就提供的编程模型,它提供了很多函数式编程的接口,例如 `map``filter``keyBy``reduce` 等,可以组合成一个实时数据处理的拓扑图

### Flink MySQL Source
### 解析 MySQL 事件

依旧先引入 Source

Expand Down Expand Up @@ -237,11 +232,15 @@ public class ChangeDeserializer implements DebeziumDeserializationSchema<Change>
}
```

### 单独处理`customer_tab`以减少聚合运算
### 自定义拓扑

这次我想处理得更简单些,因为`customer_tab`的数据可以单独同步到`customer_reorder_tab`,而`customer_reorder_tab`其他需要计算的字段并不需要`customer_tab`的参与,`order_tab``customer_preference_tab`都有`customer_id`,可以自己聚合。
Flink SQL 生成的拓扑有些复杂了:

我们把`customer_tab`的变更事件单独拎出来:
1. `customer_tab` 的数据可以直接写到`customer_reorder_tab`,统计字段可以初始化为 0
2. `order_tab``customer_preference_tab`都有`customer_id`,不需要 JOIN `customer_tab`,可以自己算。

用 DataStream 我们可以简化这个拓扑结构。
先把 `customer_tab` 的变更事件单独拎出来:

```java
val customerTag = new OutputTag<Customer>("customer") { };
Expand Down Expand Up @@ -282,8 +281,6 @@ mainStream.getSideOutput(customerTag).addSink(JdbcSink.sink(
.name("MySQL Customer Sink");
```

### 分组聚合`order_tab``customer_preference_tab`

接下来我们将`order_tab``customer_preference_tab`的变更根据`customer_id`分组,每遇到一个变更就更新一下`customer_reorder_tab`

```java
Expand All @@ -308,13 +305,20 @@ mainStream.keyBy(c -> {
.name("MySQL ReorderInfo Sink");
```

### 分组聚合需要状态(State)
### 算子的中间状态

上面我们使用了一个 `ReorderCalc` 算子,它可以理解成对 `Change`_reduce_

回顾函数式编程里 _reduce_ 的定义,它每次会给合并函数一个累计值和一个新元素。
在 Flink 里,累计值是算子的状态(State),会被定期持久化(Checkpoint),用于重启和故障恢复。

`ReorderCalc` 里,我们用 `ReorderState` 类表示算子的状态,它包含三个字段:

`customer_reorder_tab`新的值,可以对同个`customer_id``Change`_reduce_ 得出:
1. `OrderCount`:预约单总数
2. `LastOrderTime`:最后的预约时间
3. `Frequency`:预约习惯

- `order_count`:遇到新订单就递增
- `last_order_time`:遇到订单变更就取更大的值
- `expected_next_order_time`:遇到顾客习惯变更就更新`frequency`,并将`frequency``last_order_time`相加
合并函数只需做一些递增、比较和取最大值的操作:

```java
public class ReorderCalc extends RichMapFunction<Change, ReorderInfo> {
Expand Down Expand Up @@ -369,41 +373,24 @@ public class ReorderCalc extends RichMapFunction<Change, ReorderInfo> {
}
```

拓扑图变得很简单,因为 Flink 会合并 Trivial 的步骤:
### 成果

下面是用 DataStream 实现的拓扑图:

![topology](/assets/image/flink-ds-topo.png)

测试也过了:
拓扑图变得很简单,因为 Flink 会合并 Trivial 的步骤,只有分组操作 `keyBy` 增加了一个节点。

```
...
Day 13
new customer count: 77, new order count: 160
after waiting 3s, mismatch count: 0
Day 14
new customer count: 31, new order count: 132
after waiting 3s, mismatch count: 0
Day 15
new customer count: 30, new order count: 140
after waiting 3s, mismatch count: 0
Day 16
new customer count: 56, new order count: 164
after waiting 3s, mismatch count: 0
Day 17
new customer count: 35, new order count: 166
after waiting 3s, mismatch count: 0
...
```
## 工程问题

### DataStream 比 SQL 更繁琐
Flink SQL 这么简洁,这个解决方案可以在生产环境上用吗?

Flink DataStream 明显难用了很多,它需要你处理具体的(增删改)事件,虽然没有 `JOIN` ,但我们还是要显示地管理每个顾客的状态`reorderState`,而 Flink SQL 让用户不太需要关注这些实现细节。
可以,但**门槛**有些高,主要有两个问题:

Flink DataStream 理论上可以做的事情比 Flink SQL 多,但代码多了,可能出错的地方也会变多,例如我一开始没有考虑 Order 是否是新增事件就递增了`state.orderCount`
1. 运维成本高:Flink 以集群方式部署,如果实现一个物化视图需要部署好几个服务,大概率会引入新的问题

### DataStream 比 Homemade Program 更标准
![Flink Architecture](https://nightlies.apache.org/flink/flink-docs-master/fig/deployment_overview.svg)

基本上自研程序也是在做流处理:监听变化事件、写入计算结果,实现思路是差不多的。不同点在于高可用和强一致的方案:
2. 学习/维护成本高:Flink 作为一个通用、高可用、强一致性的流处理框架,复杂度很高。

- Flink 可以定时 Checkpoint 各个步骤的 State ,增强每个步骤的模块化,坏处是要提供额外的存储(`EmbeddedRocksDBStateBackend`),而且 Flink 只(能)保证内部状态的一致性,外部结果只能保证最终一致性,例如重启 Flink Job 可能会导致 MySQL 数据“回退”几秒。
- 自研程序要自己实现 **事务性的存取** Binlog 位置和计算中间结果等数据,好处是可以实现端到端(end-to-end)的一致性,例如使用 MySQL 事务存取。
看来流数据库的兴起不是没有道理的,把计算和存储闭环在一起,降低流处理系统的使用成本。

0 comments on commit a9cf000

Please sign in to comment.