CanalAdmin初体验

给女友整理的canalHA部署教程

环境

1
2
3
4
5
6
jdk1.8
mysql-8.0.23
canal-admin-1.1.4
canal-deployer-1.1.4
zookeeper
kafka

MySQL前置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# 开启binlog
vi /etc/my.cnf
[mysqld]
# 开启binlog
log-bin=mysql-bin
# 日志记录方式,建议使用ROW模式
binlog-format=ROW
# 给当前mysql一个server id,之后的CDC工具里配置的不能跟这个重复
server_id=1

# 重启服务
service mysqld restart
service mysqld status

# 查看是否生效
show variables like 'binlog_format';
show variables like 'log_bin';

# 被监听MySQL上需要canal用户,可以做复制操作
CREATE USER canal IDENTIFIED BY 'canal';

# 如果创建用户时报错,密码不满足复杂度,可以使用下面的命令修改密码复杂度条件
# 查看密码策略
SHOW VARIABLES LIKE 'validate_password%';
# 设置密码策略为LOW
# 注意: MySQL 8 中 validate_password 以组件形式安装时,变量名为 validate_password.policy
set global validate_password_policy=LOW;
# 设置密码最短位数(MySQL 8 组件形式对应 validate_password.length)
set global validate_password_length=4;

# 赋权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;

# 刷新权限
FLUSH PRIVILEGES;

# 查看canal用户的权限
show grants for 'canal';

# 创建canal_admin管理用户
CREATE USER canal_admin IDENTIFIED BY 'canal_admin';
GRANT ALL PRIVILEGES ON *.* TO 'canal_admin'@'%' ;
FLUSH PRIVILEGES;

# 查询MySQL用户密码
select user,host,plugin,authentication_string from mysql.user;

# 加密插件可能不为mysql_native_password,需要修改
grant system_user on *.* to 'root';
alter user 'canal'@'%' identified with mysql_native_password by 'canal';
alter user 'canal_admin'@'%' identified with mysql_native_password by 'canal_admin';

canal_admin启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 注意,我的是MySQL8,所以lib下的mysql_jdbc驱动包需要替换成8的
# 修改配置文件
vi conf/application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: mac:3306
database: canal_manager
username: canal_admin
password: canal_admin
# 改,mysql8驱动类
driver-class-name: com.mysql.cj.jdbc.Driver
# 如果启动报错Public Key,加上allowPublicKeyRetrieval=true配置
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456

# 执行canal_manager初始SQL脚本
# 注意,这里需要将canal_manager中初始化的表中的modified_time修改为可以为空
# 代码bug,插入时直接插入了空,还不如不插......迷之操作
mysql -ucanal_admin -pcanal_admin
source /opt/canal_admin/conf/canal_manager.sql

# 启动
./bin/startup.sh

# 查看日志
cat ./logs/admin.log

# WebUI
http://mac:8089/

canal集群配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# 这部分都是在WebUI上操作
集群管理->新建集群->输入集群名称和ZK地址
集群名称: canal_cluster
ZK地址: mac:2181

# 点击操作->主配置->载入模板
# 需要修改的地方,我做了标记
#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# 改: 填自己配置的canal用户和密码,这里密码用的是MySQL中的加密串
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
# 改: canal_admin地址
canal.admin.manager = mac:8089
canal.admin.port = 11110
# 改: 这里是canal_admin中application.yml中配置的admin.user,密码是加密串,mysql8可以用上面的修改加密插件的方式获取
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

# 改: 配置ZK
canal.zkServers = mac:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
# 改: 修改为kafka
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### destinations #############
#################################################
canal.destinations =
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
# 改: 改为default-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### MQ #############
##################################################
# 改: 填写kafka集群,下面的配置我选择注释,到时候需要时放开
canal.mq.servers = mac:9092
#canal.mq.servers = 127.0.0.1:6667
#canal.mq.retries = 0
#canal.mq.batchSize = 16384
#canal.mq.maxRequestSize = 1048576
#canal.mq.lingerMs = 100
#canal.mq.bufferMemory = 33554432
#canal.mq.canalBatchSize = 50
#canal.mq.canalGetTimeout = 100
#canal.mq.flatMessage = true
#canal.mq.compressionType = none
#canal.mq.acks = all
#canal.mq.properties. =
#canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
######### Kafka Kerberos Info #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

# 保存

canal_server启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 修改配置
vi conf/canal_local.properties
# register ip
# 每台canal_server的IP不一样
canal.register.ip = mac

# canal admin config
# 下面的配置每台canal_server都保持一致
canal.admin.manager = mac:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
canal.admin.register.auto = true
# 上面WebUI创建的canal集群名称
canal.admin.register.cluster = canal_cluster

# 启动
/opt/canal_server/bin/startup.sh local

# 可以回WebUI查看,已经添加上去了

# 再在其他节点启动canal_server,记得修改配置的注册IP,这里不赘述了

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 也是WebUI操作
# Instance管理->新建Instance->载入模板
# 配置Instance名称,选择所属集群
# 参数文档https://github.com/alibaba/canal/wiki/AdminGuide

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# 改: 需要获取的MySQL数据
canal.instance.master.address=mac:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
# 改: 有binlog权限的用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# 改: 配置监控规则,正则匹配,监控temp下所有表
canal.instance.filter.regex=temp\\..*
#canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
# 改: 指定输出的kafka的topic
canal.mq.topic=canal_temp
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################

# 保存->启动

# 往temp库中任意表插入数据

# 查看数据
kafka-topics --zookeeper mac:2181 --list
kafka-console-consumer --bootstrap-server mac:9092 --topic canal_temp --from-beginning --property print.key=true
null {"data":[{"id":"1","value":"2"}],"database":"temp","es":1630635189000,"id":3,"isDdl":false,"mysqlType":{"id":"varchar(12)","value":"varchar(12)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"value":12},"table":"test","ts":1630635189626,"type":"INSERT"}