Skip to content

Commit

Permalink
Advise CDC doc
Browse files Browse the repository at this point in the history
  • Loading branch information
azexcy committed Nov 14, 2023
1 parent 11c4bc7 commit ab459b3
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC
#### 1. 源码编译安装

1. 准备代码环境,提前下载或者使用 Git clone,从 Github 下载 [ShardingSphere](https://github.com/apache/shardingsphere.git) 源码。
2. 删除 kernel/global-clock/type/tso/core/pom.xml 中 shardingsphere-global-clock-tso-provider-redis 依赖的 `<scope>provided</scope>` 标签和 kernel/global-clock/type/tso/provider/redis/pom.xml 中 jedis 的 `<scope>provided</scope>` 标签
2. 删除 kernel/global-clock/type/tso/core/pom.xml 中 shardingsphere-global-clock-tso-provider-redis 依赖的 `<scope>provided</scope>` 标签和 kernel/global-clock/type/tso/provider/redis/pom.xml 中 jedis
`<scope>provided</scope>` 标签
3. 编译 ShardingSphere-Proxy,具体编译步骤请参考 [ShardingSphere 编译手册](https://github.com/apache/shardingsphere/wiki#build-apache-shardingsphere)

#### 2. 直接引入 GLT 依赖
Expand All @@ -47,13 +48,15 @@ ShardingSphere CDC 分为两个部分,一个是 CDC Server,另一个是 CDC

配置示例:

1.`server.yaml` 中开启 CDC 功能。

```yaml
mode:
type: Cluster
repository:
type: ZooKeeper
props:
namespace: open_cdc
namespace: cdc_demo
server-lists: localhost:2181
retryIntervalMilliseconds: 500
timeToLiveSeconds: 60
Expand All @@ -62,20 +65,16 @@ mode:

authority:
users:
# 这里的用户名和密码在 CDC Client 的认证中也会用到
- user: root@%
password: root
- user: proxy
password: Proxy@123
privilege:
type: ALL_PERMITTED

# 开启 GLT 的时候也需要打开分布式事务
#transaction:
# defaultType: XA
# providerType: Atomikos

# GLT 模块配置,如果不需要 GLT 模块,可以不配置
#
#globalClock:
# enabled: true
# type: TSO
Expand All @@ -84,12 +83,12 @@ authority:
# host: 127.0.0.1
# port: 6379


props:
system-log-level: INFO
check-table-metadata-enabled: false
proxy-default-port: 3307 # Proxy default port.
cdc-server-port: 33071 # CDC Server 端口,必须配置
proxy-frontend-database-protocol-type: openGauss
# 省略其他配置
......
#proxy-frontend-database-protocol-type: openGauss # 和后端数据库的类型一致
```

2. 引入 JDBC 驱动。
Expand All @@ -98,10 +97,10 @@ proxy 已包含 PostgreSQL JDBC 驱动。

如果后端连接以下数据库,请下载相应 JDBC 驱动 jar 包,并将其放入 `${shardingsphere-proxy}/ext-lib` 目录。

| 数据库 | JDBC 驱动 | 参考 |
|-----------|---------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------|
| MySQL | [mysql-connector-java-8.0.11.jar]( https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.49/mysql-connector-java-5.1.49.jar ) | [Connector/J Versions]( https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.11/mysql-connector-java-8.0.11.jar ) |
| openGauss | [opengauss-jdbc-3.1.1-og.jar]( https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/3.1.1-og/opengauss-jdbc-3.1.1-og.jar ) | |
| 数据库 | JDBC 驱动 |
|-----------|---------------------------------------------------------------------------------------------------------------------------------|
| MySQL | [mysql-connector-java-8.0.31.jar](https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.31/) |
| openGauss | [opengauss-jdbc-3.1.1-og.jar](https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/3.1.1-og/opengauss-jdbc-3.1.1-og.jar) |

4. 启动 ShardingSphere-Proxy:

Expand All @@ -117,44 +116,31 @@ sh bin/start.sh

确认启动成功。

## CDC Client 使用手册

用户可以通过 CDC Client 和服务端进行交互,CDC Client 的依赖很轻,只包含 netty 以及 CDC 协议相关的依赖。

有两种方式可以引入 CDC Client

1. 源码编译,CDC Client 在编译 Proxy 的时候会一同编译,在 kernel/data-pipeline/scenario/cdc/client/target 目录下可以找到编译后的 jar 文件
2. 从 maven 仓库获取,[Shardingsphere Data Pipeline CDC Client](https://mvnrepository.com/artifact/io.github.greycode/shardingsphere-data-pipeline-cdc-client)

### CDC Client 介绍

`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` 是 CDC Client 的入口类,用户可以通过该类和 CDC Server 进行交互。主要的和新方法如下。

| 方法名 | 返回值 | 说明 |
|----------------------------------------------------| --- | --- |
| await() | void | 阻塞 CDC 线程,await channel 关闭 |
| close() | void | 关闭 channel |
| connect() | void | 和服务端进行连接 |
| login (CDCLoginParameter parameter) | void | 登陆验证 |
| startStreaming (StartStreamingParameter parameter) | java.lang.String (CDC 任务唯一标识) | 开启 CDC 订阅 |
| restartStreaming (java.lang.String streamingId) | void | 重启订阅 |
| stopStreaming (java.lang.String streamingId) | void | 停止订阅 |

## CDC Client 手册

### CDC Client 使用示例
CDC Client 不需要额外部署,只需要通过 maven 引入 CDC Client 的依赖就可以在项目中使用。用户可以通过 CDC Client 和服务端进行交互。

目前 CDC Client 只提供了 Java API,用户需要自行实现数据的消费逻辑。

1. 引入 CDC Client
如果有需要,用户也可以自行实现一个 CDC Client,进行数据的消费和 ACK。

```xml
<dependency>
<groupId>io.github.greycode</groupId>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
<version>${version}</version>
</dependency>
```

2. 启动 CDC Client
### CDC Client 介绍

`org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient` 是 CDC Client 的入口类,用户可以通过该类和 CDC Server 进行交互。主要的和新方法如下。

参考 [Example](https://github.com/apache/shardingsphere/blob/master/kernel/data-pipeline/scenario/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/Bootstrap.java) 启动 CDC Client。
| 方法名 | 返回值 | 说明 |
|-----------------------------------------------------------------------------------------------------------------------------|--------------------------------------|---------------------------------------------------------------------------------|
| connect(Consumer<List<Record>> dataConsumer, ExceptionHandler exceptionHandler, ServerErrorResultHandler errorResultHandler | void | 和服务端进行连接,连接的时候需要指定 1. 数据的消费处理逻辑 2. 消费时候的异常处理逻辑 3. 服务端错误的异常处理逻辑 |
| login(CDCLoginParameter parameter) | void | CDC 登陆 CDCLoginParameter 参数 - username:用户名 - password:密码 |
| startStreaming(StartStreamingParameter parameter) | java.lang.String (CDC 任务唯一标识,用于后续操作) | 开启 CDC 订阅 StartStreamingParameter 参数 - database:逻辑库名称 - schemaTables:订阅的表名 - full:是否订阅全量数据 |
| restartStreaming(String streamingId) | void | 重启订阅 |
| stopStreaming(String streamingId) | void | 停止订阅 |
| dropStreaming(String streamingId) | void | 删除订阅 |
| await() | void | 阻塞 CDC 线程,等待 channel 关闭 |
| close() | void | 关闭 channel,流程结束。 |
Loading

0 comments on commit ab459b3

Please sign in to comment.