当前位置 : 主页 > 编程语言 > java >

Canal——原理架构及应用场景

来源:互联网 收集:自由互联 发布时间:2023-02-04
Canal简介 Canal是阿里开源的一款基于Mysql数据库binlog的增量订阅和消费组件,通过它可以订阅数据库的binlog日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等

Canal简介

Canal是阿里开源的一款基于Mysql数据库binlog的增量订阅和消费组件,通过它可以订阅数据库的binlog日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。

 

github地址:https://github.com/alibaba/canal

完整wiki地址:https://github.com/alibaba/canal/wiki

Canal工作原理

原理相对比较简单:

  • 1、canal模拟mysql slave与mysql master的交互协议,伪装自己是一个mysql slave,向mysql master发送dump协议。
  • 2、mysql master收到mysql slave(canal)发送的dump请求,开始推送binlog增量日志给slave(也就是canal)。
  • 3、mysql slave(canal伪装的)收到binlog增量日志后,就可以对这部分日志进行解析,获取主库的结构及数据变更。

Mysql主从同步原理

canal工作原理其实也是基于mysql主从同步原理的,所以理解mysql主从同步原理是第一步

同步原理:

  • 1、Master主库,启动Binlog机制,将变更数据写入Binlog文件。( binary log,其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看)
  • 2、Slave(I/O thread),从Master主库拉取Binlog数据,将它拷贝到Slave的中继日志(relay log)中。
  • 3、Slave(SQL thread),回放relay log,更新从库数据以此来达到数据一致。

启用Binlog注意以下几点:

  • 1、Master主库一般会有多台Slave订阅,且Master主库要支持业务系统实时变更操作,服务器资源会有瓶颈。
  • 2、需要同步的数据表一定要有主键。

Mysql的binlog

  • 1、它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。
  • 2、binlog 有三种:STATEMENT、ROW、MIXED。
    • STATEMENT:记录的是执行的sql语句。
    • ROW:记录的是真实的行数据记录。
    • MIXED:记录的是1+2,优先按照1的模式记录。

什么是中继日志?

  • 从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取relay-log日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致。

Canal架构

说明:

  • 1、server代表一个canal运行实例,对应于一个jvm。
  • 2、instance对应于一个数据队列。

instance模块:

  • 1、eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • 2、eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • 3、eventStore (数据存储)
  • 4、metaManager (增量订阅&消费信息管理器)

Canal-HA机制

canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。

 

canal的ha分为两部分,canal server和canal client分别有对应的ha实现:

  • canal server:为了减少对mysql dump的请求,不同server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。
  • canal client:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

server ha的架构图如下:

大致步骤:

  • 1、canal server要启动某个canal instance时都先向zookeeper_进行一次尝试启动判断_(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
  • 2、创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。
  • 3、一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。
  • 4、canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制

Canal应用场景

1、同步缓存redis/全文搜索ES

canal一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法,如下图所示:

2、下发任务

另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/kafka进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过MQ发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。

3、数据异构

在大型网站架构中,DB都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。

 

所谓的数据异构,那就是将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。canal就是实现数据异构的手段之一。

canal的HA集群模式部署

1、canal下载地址

  • 下载地址:https://github.com/alibaba/canal/releases

  • 下载安装包: https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

  • 解压到安装目录

tar -zxf canal.deployer-1.1.5.tar.gz

解压出来的目录结构是这样的:

2、mysql开启binlog

MySQL,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

3、mysql授权账号权限

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限(repication权限),如果已有账户可直接 grant:

CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;

备注

可以通过在 mysql 终端中执行以下命令判断配置是否生效:

show variables like 'log_bin'; show variables like 'binlog_format';

4、修改配置

canal服务的部署其实特别简单,解压之后只需修改canal.properties、instance.properties这两个配置两个文件即可。

修改canal.properties中配置

//指定注册的zk集群地址 canal.zkServers = 192.168.135.27:2181,192.168.135.28:2181,192.168.135.29:2181 //HA模式必须使用该xml,需要将相关数据写入zookeeper,保证数据集群共享 canal.instance.global.spring.xml = classpath:spring/default-instance.xml // 这个demo就是conf目录里的实例,如果要建别的实例'test'就建个test目录, // 把example里面的instance.properties文件拷贝到test的实例目录下就好了, // 然后在这里的配置就是canal.destinations = demo,test canal.destinations = demo

修改配置文件instance.properties

## mysql serverId , v1.0.26+ will autoGen # canal伪装的mysql slave的编号,不能与mysql数据库和其他的slave重复 canal.instance.mysql.slaveId=1003 # 按需修改成自己的数据库信息 # position info canal.instance.master.address=10.200.*.109:3306 # username/password 数据库的用户名和密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.defaultDatabaseName = test

5、详细步骤

canal 的HA集群模式部署详细步骤,可以访问:https://blog.csdn.net/XDSXHDYY/article/details/97825508

然后cd到bin目录 启动和停止canal-server 启动

sh startup.sh

停止

sh stop.sh

验证启动状态,查看log文件

vim canal/log/canal/canal.log 2021-07-11 16:17:38.224 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2021-07-11 16:17:38.273 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.19.0.1:11111] 2021-07-11 16:17:38.569 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

上述日志信息显示启动canal成功

SpringBoot整合Canal

两种方式,官方提供的demo和springboot starter

1、官方提供的

<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency>

具体参考:https://blog.csdn.net/leilei1366615/article/details/108819651

2、springboot starter方式

  • 引入Maven依赖
<dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>
  • 添加配置
canal: server: 127.0.0.1:11111 destination: demo
  • 实现EntryHandler接口
@Component public class ADHandler implements EntryHandler<SeckillGoods> { @Override public void insert(SeckillGoods seckillGoods) { // CanalModel可以得到当前这次的库名和表名 CanalModel canal = CanalContext.getModel(); //业务操作 //新增缓存操作等 } @Override public void update(SeckillGoods before, SeckillGoods after) { //业务操作 //更新缓存操作等 } @Override public void delete(SeckillGoods seckillGoods) { //业务操作 //删除缓存操作等 } }

参考: https://www.cnblogs.com/caoweixiong/p/11824423.html

https://www.cnblogs.com/huangxincheng/p/7456397.html

https://segmentfault.com/a/1190000023297973

https://blog.csdn.net/XDSXHDYY/article/details/97825508

https://blog.csdn.net/qq_46893497/article/details/111026996

上一篇:Canal高可用架构部署
下一篇:没有了
网友评论