当前位置 : 主页 > 编程语言 > java >

DOCKER 部署 CANAL

来源:互联网 收集:自由互联 发布时间:2022-10-15
DOCKER 部署 CANAL 1、MYSQL开启binlog 前提MYSQL已经安装完成,canal采用读取Mysql的binlog日志来实现数据同步,需要修改mysql配置文件my.cnf,并将binlog的格式模式设置为ROW,其中server-id必须与后边
DOCKER 部署 CANAL
1、MYSQL开启binlog

前提MYSQL已经安装完成,canal采用读取Mysql的binlog日志来实现数据同步,需要修改mysql配置文件my.cnf,并将binlog的格式模式设置为ROW,其中server-id必须与后边canal配置文件中的slaveId(canal.instance.mysql.slaveId)不相同。

# binlog
log-bin=mysql-bin
binlog_format=ROW
server-id=1

配置修改完后,重新启动mysql,并查看binlog是否启动成功

# Restart MySQL (systemctl syntax is: systemctl <command> <unit>)
systemctl restart mysqld
# Check whether the binlog is enabled; run this in a MySQL client.
# A result of (log_bin ON) means the binlog is active.
SHOW VARIABLES LIKE 'log_bin';
2、mysql授权canal用户
# Create the canal user; the password is set here, once
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# Grant the canal user SELECT, REPLICATION SLAVE and REPLICATION CLIENT on all schemas.
# No IDENTIFIED BY clause here: GRANT ... IDENTIFIED BY is deprecated in MySQL 5.7 and removed in 8.0.
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# Reload the privilege tables
FLUSH PRIVILEGES;

如果出现【1819 - Your password does not satisfy the current policy requirements, Time: 0.043000s】异常,表明密码级别过高,可以适当降低密码级别

# Show the current password validation settings
SHOW VARIABLES LIKE 'validate_password%';
# NOTE: the variable names below are those of the MySQL 5.7 validate_password plugin;
# with the MySQL 8.0 component they are validate_password.policy / validate_password.length.
# Lower the validation policy to LOW
SET GLOBAL validate_password_policy=LOW;
# Lower the minimum password length to 5
SET GLOBAL validate_password_length=5;
# Create the user if the earlier attempt was rejected, then grant again.
# GRANT ... IDENTIFIED BY is avoided because it is deprecated in 5.7 and removed in 8.0.
CREATE USER IF NOT EXISTS 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# Reload the privilege tables
FLUSH PRIVILEGES;
3、创建并启动canal容器

创建存放canal配置文件目录【目录可自行指定】

mkdir -p /opt/canal/conf

创建canal配置文件

vim /opt/canal/conf/instance.properties

canal配置内容如下

#################################################
## mysql serverId , v1.0.26+ will autoGen
## (when set explicitly it must differ from the server-id in MySQL's my.cnf)
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# master.address: host:port of the MySQL instance to listen to (replace with your own)
canal.instance.master.address=120.48.130.125:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# MySQL account (and its password) that canal uses to subscribe to the binlog
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# character set used for the subscription
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# schemas/tables to listen to; the value below matches every table of every schema
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

Canal【instance.properties】配置文件解析

canal.instance.master.address 需要监听的mysql主机ip:端口
canal.instance.dbUsername canal订阅binlog使用的用户
canal.instance.dbPassword canal订阅binlog使用的用户密码
canal.instance.connectionCharset = UTF-8 canal订阅的字符集
canal.instance.filter.regex 需要监听的mysql库和表
mysql库和表表达式:【备注:配置全库或库下所有表有可能出现 (errorNumber=1146, fieldCount=-1, message=Table 'home.BASE TABLE' doesn't exist, sqlState=42S02, sqlStateMarker=#)异常,尽量配置到指定库下的指定表】
1 .*\\..* : 全库
2 canal\\..* : 指定库下的所有表
3 canal\\.canal,test\\.test : 指定库下的指定表

启动docker容器

# Start canal-server v1.1.6 detached, as a container named "canal":
#   -v  mounts the host's /opt/canal/conf/instance.properties over the
#       instance.properties of the "example" instance inside the container
#   -p  publishes port 11111, the port the client examples connect to
docker run -d \
-v /opt/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-p 11111:11111 \
--name canal \
canal/canal-server:v1.1.6

查看是否启动成功

docker logs -f canal
4、客户端监听canal

创建需要监听的表

-- Table watched by the client examples: association between users and roles.
CREATE TABLE `sys_user_role` (
  `user_id` bigint(20) NOT NULL COMMENT '用户ID',
  `role_id` bigint(20) NOT NULL COMMENT '角色ID',
  PRIMARY KEY (`user_id`,`role_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='用户和角色关联表';
4.1 代码实现监听所有表变动

引入依赖

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>

代码实现【摘自他人】

package com.pango.system.service.impl;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

/**
 * Minimal Canal client: subscribes to every schema and table and prints each row change
 * to standard output.
 *
 * <p>The client polls the server in batches, prints the entries of each non-empty batch,
 * acknowledges the batch, and exits after 120 consecutive empty polls or when the thread
 * is interrupted.
 *
 * @date 2022/10/9
 */
public class SimpleCanalClientExample {

    /**
     * Connects to the Canal server, polls for changes and prints them.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Single-node connection; "example" is the destination name, with empty username/password.
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("120.48.130.125", 11111), "example", "", "");
        int batchSize = 1000;
        int totalEmptyCount = 120;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (emptyCount < totalEmptyCount) {
                // Fetch up to batchSize entries without acknowledging them yet.
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and stop polling instead of swallowing
                        // the interruption; the finally block still disconnects.
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted, exit");
                        return;
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                // Confirm the batch. On a processing failure, connector.rollback(batchId)
                // would be called instead to roll the batch back.
                connector.ack(batchId);
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Prints the binlog position, schema/table name, event type and column values of every
     * entry, skipping transaction begin/end markers.
     *
     * @param entries entries of one batch
     * @throws RuntimeException if an entry's store value cannot be parsed as a {@code RowChange}
     */
    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // A delete only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // An insert only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /**
     * Prints one line per column: name, value and the column's "updated" flag.
     *
     * @param columns columns of one row image
     */
    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}

验证

/* Insert a row */
INSERT INTO sys_user_role VALUES (3,3);
/* Result:
================> binlog[mysql-bin.000001:167986] , name[pango,sys_user_role] , eventType : INSERT
user_id : 3 update=true
role_id : 3 update=true
*/
/* Update the row */
UPDATE sys_user_role SET role_id = 4 WHERE user_id = 3;
/* Result:
================> binlog[mysql-bin.000001:168265] , name[pango,sys_user_role] , eventType : UPDATE
-------> before
user_id : 3 update=false
role_id : 3 update=false
-------> after
user_id : 3 update=false
role_id : 4 update=true
*/
/* Delete the row */
DELETE FROM sys_user_role WHERE user_id = 3;
/* Result:
================> binlog[mysql-bin.000001:168562] , name[pango,sys_user_role] , eventType : DELETE
user_id : 3 update=false
role_id : 4 update=false
*/
4.2 代码实现监听单表变动

引入依赖

<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>

编写配置【可以查看canal.properties,该文件有canal的集群名字等配置描述】

canal:
  # Canal destination name; must match the name set when canal was installed
  # (the "example" instance directory in this setup)
  destination: example
  # Canal server address as host:port
  server: 120.48.130.125:11111

代码实现

  • 表数据接收类,该类在类定义上一定要标注@Table(name = "sys_user_role")注解,且属性上一定要标注 @Column(name = "user_id")注解,否则无法接收到表数据
package com.pango.system.domain;

import com.baomidou.mybatisplus.annotation.TableName;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;

import javax.persistence.Column;
import javax.persistence.Table;
import java.io.Serializable;

/**
 * Row model for table {@code sys_user_role}, the association between users and roles.
 */
@Table(name = "sys_user_role")
public class SysUserRole implements Serializable {

    /** User ID, bound to column {@code user_id}. */
    @Column(name = "user_id")
    private Long userId;

    /** Role ID, bound to column {@code role_id}. */
    @Column(name = "role_id")
    private Long roleId;

    /** @return the user ID */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the user ID to set */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    /** @return the role ID */
    public Long getRoleId() {
        return this.roleId;
    }

    /** @param roleId the role ID to set */
    public void setRoleId(Long roleId) {
        this.roleId = roleId;
    }

    /** Renders both fields in multi-line style. */
    @Override
    public String toString() {
        ToStringBuilder text = new ToStringBuilder(this, ToStringStyle.MULTI_LINE_STYLE);
        text.append("userId", getUserId());
        text.append("roleId", getRoleId());
        return text.toString();
    }
}
  • 监听处理类
package com.pango.system.handler;

import com.pango.system.domain.SysUserRole;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

/**
 * Canal entry handler for table {@code sys_user_role}; prints every change it receives
 * to standard output.
 *
 * @date 2022/10/9
 */
@CanalTable("sys_user_role")
@Component
public class UserRoleHandler implements EntryHandler<SysUserRole> {

    /**
     * Called when a row is inserted into {@code sys_user_role},
     * e.g. {@code INSERT INTO sys_user_role VALUES (3,3)}.
     *
     * @param sysUserRole the inserted row
     */
    @Override
    public void insert(SysUserRole sysUserRole) {
        System.out.println(sysUserRole);
    }

    /**
     * Called when a row of {@code sys_user_role} is updated,
     * e.g. {@code UPDATE sys_user_role SET role_id = 4 WHERE user_id = 3}.
     *
     * @param before the row image before the update
     * @param after the row image after the update
     */
    @Override
    public void update(SysUserRole before, SysUserRole after) {
        System.out.println("before:".concat(String.valueOf(before)));
        System.out.println("after:".concat(String.valueOf(after)));
    }

    /**
     * Called when a row is deleted from {@code sys_user_role},
     * e.g. {@code DELETE FROM sys_user_role WHERE user_id = 3}.
     *
     * @param sysUserRole the deleted row
     */
    @Override
    public void delete(SysUserRole sysUserRole) {
        System.out.println(sysUserRole);
    }
}
网友评论