什么是CDC?
CDC是(Change Data Capture 变更数据获取)的简称。
核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INSERT、更新UPDATE、删除DELETE等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
基于查询的CDC 和 基于日志的CDC
CDC主要分为基于查询和基于Binlog两种方式
经过以上对比,我们可以发现基于日志CDC 有以下这几种优势:
能够捕获所有数据的变化,捕获完整的变更记录。在异地容灾,数据备份等场景中得到广泛应用,如果是基于查询的 CDC 有可能导致两次查询的中间一部分数据丢失
每次 DML 操作均有记录无需像查询 CDC 这样发起全表扫描进行过滤,拥有更高的效率和性能,具有低延迟,不增加数据库负载的优势
无需入侵业务,业务解耦,无需更改业务模型
捕获删除事件和捕获旧记录的状态,在查询 CDC 中,周期的查询无法感知中间数据是否删除
在实时性、吞吐量方面占优,如果数据源是 MySQL、PostgreSQL、MongoDB 等常见的数据库实现,建议使用 Debezium 来实现变更数据的捕获(下图来自 Debezium 官方文档)。如果使用的只有 MySQL,则可以用 Canal。
Flink CDC
Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,开源地址:https://github.com/ververica/flink-cdc-connectors
我们先从之前的数据架构来看CDC的内容
以上是之前的mysql binlog日志处理流程,例如canal监听binlog把日志写入到kafka中。而Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。
拆分来说整体上可以分为以下几个阶段。
1.mysql开启binlog
2.canal同步binlog数据写入到kafka
3.flink读取kakfa中的binlog数据进行相关的业务处理。
整体的处理链路较长,需要用到的组件也比较多。Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析,从内部实现上讲,Flink CDC Connectors 内置了一套 Debezium 和 Kafka 组件,但这个细节对用户屏蔽,简单来说链路会变成这样
也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。
Flink CDC原理简述
在最新 CDC 调研报告中,Debezium 和 Canal 是目前最流行使用的 CDC 工具,这些 CDC 工具的核心原理是抽取数据库日志获取变更。
在经过一系列调研后,目前 Debezium (支持全量、增量同步,同时支持 MySQL、PostgreSQL、Oracle 等数据库),使用较为广泛。
Flink SQL CDC 内置了 Debezium 引擎,利用其抽取日志获取变更的能力,将
changelog 转换为 Flink SQL 认识的 RowData 数据。(以下右侧是 Debezium
的数据格式,左侧是 Flink 的 RowData 数据格式)。
RowData 代表了一行的数据,在 RowData 上面会有一个元数据的信息 RowKind,RowKind 里面包括了插入(+I)、更新前(-U)、更新后(+U)、删除(-D),这样和数据库里面的 binlog 概念十分类似。
通过 Debezium 采集的数据,包含了旧数据(before)和新数据行(after)以及原数据信息(source),op 的 u 表示是update 更新操作标识符(op 字段的值 c,u,d,r 分别对应 create,update,delete,reade),ts_ms 表示同步的时间戳。
基于 Flink SQL CDC 的数据同步方案实践
案例 1 : Flink SQL CDC + JDBC Connector
这个案例通过订阅我们订单表(事实表)数据,通过 Debezium 将 MySQL Binlog 发送至 Kafka,通过维表 Join 和 ETL 操作把结果输出至下游的 PG 数据库。
案例 2 : CDC Streaming ETL
电商公司的订单表和物流表,需要对订单数据进行统计分析,对于不同的信息需要进行关联后续形成订单的大宽表后,交给下游的业务方使用 ES 做数据分析,这个案例演示了如何只依赖 Flink 不依赖其他组件,借助 Flink 强大的计算能力实时把 Binlog 的数据流关联一次并同步至 ES 。
例如如下的这段 Flink SQL 代码就能完成实时同步 MySQL 中 orders 表的全量+增量数据的目的。
CREATE TABLE orders (order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders'
);
SELECT * FROM orders
案例 3 : Streaming Changes to Kafka
下面案例就是对 GMV 进行天级别的全站统计。包含插入/更新/删除,只有付款的订单才能计算进入 GMV ,观察 GMV 值的变化。