也许还有小伙伴没接触过 canal ,我其实也刚接触两天左右,想了解的伙伴请前往官网文档
-
配置构思和书写
-
连接规则制定
-
书写 canal 客户端
-
监听 canal 客户端从 canal 服务端推送的消息,并处理。
-
处理消息机制,这里使用了集成接口和注解两种方式
-
测试类去测试所写的组件
-
创建配置文件
-
CanalConfig.java:读取 spring boot 配置文件信息
-
CanalClientConfiguration.java:加载 canal 配置,并启动客户端
-
-
注解使其成为 spring boot 组件
EnableCanalClient.java:该注解作用是启用 canal
-
canal 客户端书写:SimpleCanalClient.java
-
初始化监听器(注解方式:CanalEventListener.java;实现接口方式:DealCanalEventListener.java),这里通过一个工具类,BeanUtil,通过反射注入 bean (包含通过接口方式实现数据同步和注解方式的数据同步).
-
开启 canal 客户端,若是开启多个客户端,会开启多个进程。
-
初始化一个线程池,使得线程复用,减小频繁创建线程带来的内存开销。
-
通过线程池开启 canal 客户端,每一个客户端都是一个线程。
-
-
canal 客户端处理消息过程:AbstractBasicMessageTransponder.java;DefaultMessageTransponder.java
-
获取消息,判断消息可用性
-
可用的消息处理机制
-
消息消费完成确认
-
处理消息发生异常,等待设定的心跳时间进行重试,当重试机制次数超过指定的次数,停止 canal 客户端,结束线程。
-
-
canal 处理消息操作,主要通过反射和代理模式实现:ListenPoint.java;AbstractDBOption.java
-
创建表操作
通过注解方式:CreateTableListenPoint.java
实现接口方式:CreateTableOption.java
-
删除表操作
通过注解方式:DropTableListenPoint.java
实现接口方式:DropTableOption.java
-
修改表信息
通过注解方式:AlertTableListenPoint.java
实现接口方式:AlertTableOption.java
-
重新命名表
通过注解方式:RenameTableListenPoint.java
实现接口方式:RenameTableOption.java
-
创建索引
通过注解方式:CreateIndexListenPoint.java
实现接口方式:CreateIndexOption.java
-
删除索引
通过注解方式:DropIndexListenPoint.java
实现接口方式:DropIndexOption.java
-
新增数据
通过注解方式:InsertListenPoint.java
实现接口方式:InsertOption.java
-
更新数据
通过注解方式:UpdateListenPoint.java
实现接口方式:UpdateOption.java
-
删除数据
通过注解方式:DeleteListenPoint.java
实现接口方式:DeleteOption.java
-
-
整个流程是怎么运行的呢?下面请允许我通 UML 图的方式呈现出来【放大后更清晰】
#是否是集群模式
canal.client.instances.${wwjd}.clusterEnabled=true
canal.client.instances.${wwjd}.cluster-enabled=true
#zookeeper 地址
canal.client.instances.${wwjd}.zookeeperAddress=127.0.0.1:2181,127.1.1.1:2187
canal.client.instances.${wwjd}.zookeeper-address=127.0.0.1:2181,127.1.1.1:2187
#canal 服务器地址,默认是本地的环回地址
canal.client.instances.${wwjd}.host=127.0.0.1
#canal 服务设置的端口,默认 11111
canal.client.instances.${wwjd}.port=11111
#集群 设置的用户名
canal.client.instances.${wwjd}.userName=root
canal.client.instances.${wwjd}.user-name=root
#集群 设置的密码
canal.client.instances.${wwjd}.password=123456
#批量从 canal 服务器获取数据的最多数目
canal.client.instances.${wwjd}.batchSize=1000
canal.client.instances.${wwjd}.batch-size=1000
#是否有过滤规则
canal.client.instances.${wwjd}.filter=.*\\..*
#当错误发生时,重试次树
canal.client.instances.${wwjd}.retryCount=20
canal.client.instances.${wwjd}.retry-count=20
#信息捕获心跳时间
canal.client.instances.${wwjd}.acquireInterval=1000
canal.client.instances.${wwjd}.acquire-interval=1000
####### 假若你所有的环境都搞定了,包括 mysql 开启 binlog 日志,canal 伪装从数据库连接到 mysql 等,然后配置信息都正确,那就开始正文了
-
通过继承接口的方式处理数据:MyEventListenerimpl.java
-
通过注解的方式处理数据:MyAnnoEventListener.java
======================接口方式(修改表信息操作)==========================
use dao;
/* ApplicationName=IntelliJ IDEA 2018.1.2 */ ALTER TABLE user ADD age int DEFAULT 18 NOT NULL COMMENT '年龄'
======================================================
======================注解方式(修改表信息操作)==========================
use dao;
/* ApplicationName=IntelliJ IDEA 2018.1.2 */ ALTER TABLE user ADD age int DEFAULT 18 NOT NULL COMMENT '年龄'
======================================================
======================接口方式(新增数据操作)==========================
use dao;
INSERT INTO user(id,name,age) VALUES('85','阿导','107');
======================================================
======================注解方式(新增数据操作)==========================
use dao;
INSERT INTO user(id,name,age) VALUES('85','阿导','107');
======================================================