Using CanalAdapter

I previously put together a first look at CanalAdmin for my wife. This time we switch to the Adapter and try out Canal's real-time master-replica synchronization, and along the way see whether a custom Sink could be implemented to land data into a data lake in real time.

Prerequisites

Portal: see the earlier post on using Canal.
To stay consistent with the earlier setup, version 1.1.4 is used here again.

In my setup, the Source data is shipped to Kafka first, and the Adapter then applies the changes from Kafka to the Sink.
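
So the path is Source (MySQL binlog) → Canal server → Kafka → Adapter → Sink. Conceptually the Adapter is just a Kafka consumer that replays each change record against the target store, which is also the starting point for a custom Sink. Below is a minimal Python sketch of that idea, assuming JSON (flatMessage) records on the canal_temp topic and the kafka-python package; it is only an illustration, not the Adapter's actual implementation.

import json
from kafka import KafkaConsumer  # pip install kafka-python

# Conceptual sketch: read Canal change records from the topic the Canal
# server writes to, and hand each one to a sink writer.
consumer = KafkaConsumer(
    "canal_temp",                    # topic name = canal instance name in this setup
    bootstrap_servers="mac:9092",    # same broker the Adapter points at
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

def write_to_sink(msg: dict) -> None:
    # A real custom Sink (e.g. a data-lake writer) would translate the
    # INSERT/UPDATE/DELETE record into its own storage format here.
    print(msg["type"], msg["database"], msg["table"], msg["data"])

for record in consumer:
    write_to_sink(record.value)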

Adapter Configuration

mkdir ./canal_adapter
tar -zxvf canal.adapter-1.1.4.tar.gz -C canal_adapter
cd ./canal_adapter
cp ./conf/application.yml ./conf/application.yml.bak

Configure the Adapter runtime settings
vi ./conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  # The data Sink side that the Canal instance is configured to write to
  mode: kafka # tcp kafka rocketMQ
  canalServerHost: mac:11111
  zookeeperHosts: mac:2181
  mqServers: mac:9092 #or rocketmq
  # flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  # Connection info for the original (source) data source
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://mac:3306/temp?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  # How the Adapter consumes the data
  - instance: canal_temp # canal instance Name or mq topic name
    groups:
    - groupId: g1
      # Target adapter configuration; a single input can fan out to multiple outputs
      outerAdapters:
      - name: logger
      - name: rdb
        # Target database connection settings
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://mac:3306/copy?useUnicode=true
          jdbc.username: root
          jdbc.password: 123456
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 # only used for rest mode
#          cluster.name: elasticsearch

rdb mapping configuration
vi conf/rdb/mytest_user.yml
dataSourceKey: defaultDS
# Destination (the canal instance name / MQ topic name)
destination: canal_temp
groupId: g1
outerAdapterKey: mysql1
concurrent: true
# Source-to-target (master/replica) table mapping
dbMapping:
  database: temp
  table: test
  targetTable: copy.test
  targetPk:
    id: id
  # mapAll: true
  targetColumns:
    id:
    value:
  etlCondition: ""
  commitBatch: 1 # batch commit size
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest

Log Output

First, let's take a look at the Adapter's logs:
2021-12-08 14:20:04.333 [pool-10-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"2","value":"3"}],"database":"temp","destination":"canal_temp","es":1638944403000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"test","ts":1638944404125,"type":"INSERT"}
2021-12-08 14:20:04.399 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"3"},"database":"temp","destination":"canal_temp","old":null,"table":"test","type":"INSERT"}
2021-12-08 14:21:45.318 [pool-10-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"2","value":"4"}],"database":"temp","destination":"canal_temp","es":1638944505000,"groupId":"g1","isDdl":false,"old":[{"value":"3"}],"pkNames":null,"sql":"","table":"test","ts":1638944505276,"type":"UPDATE"}
2021-12-08 14:21:45.483 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"4"},"database":"temp","destination":"canal_temp","old":{"value":"3"},"table":"test","type":"UPDATE"}
2021-12-08 14:24:06.777 [pool-10-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"2","value":"4"}],"database":"temp","destination":"canal_temp","es":1638944646000,"groupId":"g1","isDdl":false,"old":null,"pkNames":null,"sql":"","table":"test","ts":1638944646718,"type":"DELETE"}
2021-12-08 14:24:06.841 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"4"},"database":"temp","destination":"canal_temp","old":null,"table":"test","type":"DELETE"}

As you can see, the DML operations are captured by the Adapter, which then applies them to the data on the Sink side.
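
As a quick sanity check (my own addition, assuming the pymysql package and the connection details from the rdb config above), you can query the target table directly:

import pymysql  # pip install pymysql

# Connection details taken from the jdbc.url in the rdb adapter config above.
conn = pymysql.connect(host="mac", user="root", password="123456", database="copy")
try:
    with conn.cursor() as cur:
        cur.execute("SELECT id, value FROM test WHERE id = %s", ("2",))
        # Mirrors the latest state of temp.test; returns None once the row
        # has been deleted upstream.
        print(cur.fetchone())
finally:
    conn.close()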

Next, let's look at the data in Kafka:
{"data":null,"database":"","es":1638782059000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `copy` CHARACTER SET 'utf8mb4' COLLATE 'utf8mb4_general_ci'","sqlType":null,"table":"","ts":1638782059582,"type":"QUERY"}
{"data":[{"id":"2","value":"3"}],"database":"temp","es":1638944403000,"id":3,"isDdl":false,"mysqlType":{"id":"varchar(12)","value":"varchar(12)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"value":12},"table":"test","ts":1638944404125,"type":"INSERT"}
{"data":[{"id":"2","value":"4"}],"database":"temp","es":1638944505000,"id":4,"isDdl":false,"mysqlType":{"id":"varchar(12)","value":"varchar(12)"},"old":[{"value":"3"}],"pkNames":null,"sql":"","sqlType":{"id":12,"value":12},"table":"test","ts":1638944505276,"type":"UPDATE"}
{"data":[{"id":"2","value":"4"}],"database":"temp","es":1638944646000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(12)","value":"varchar(12)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"value":12},"table":"test","ts":1638944646718,"type":"DELETE"}

We can ignore the CREATE DATABASE record; that kind of message needs to be filtered out. The important messages are the DML operation records.
On UPDATE, the old field contains the pre-change values. It appears to have been optimized: only the changed columns are included, and unchanged columns are left out.
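
To make those two points concrete, here is a small sketch (my own illustration, not code from the Adapter) that skips non-DML records such as the CREATE DATABASE query above and, for an UPDATE, rewrites only the columns listed in old. It assumes id is the primary key, matching the targetPk mapping, and builds SQL by plain string formatting purely for readability.

import json

def to_sql(msg: dict):
    """Turn one Canal JSON record into SQL statements (illustrative sketch only)."""
    if msg.get("type") not in ("INSERT", "UPDATE", "DELETE") or msg.get("isDdl"):
        return []  # skip QUERY / DDL records such as the CREATE DATABASE above
    sqls = []
    table = f'{msg["database"]}.{msg["table"]}'
    for i, row in enumerate(msg["data"]):
        if msg["type"] == "INSERT":
            cols = ", ".join(row)
            vals = ", ".join(f"'{v}'" for v in row.values())
            sqls.append(f"INSERT INTO {table} ({cols}) VALUES ({vals})")
        elif msg["type"] == "UPDATE":
            # old[i] holds only the columns that actually changed
            changed = msg["old"][i]
            sets = ", ".join(f"{c} = '{row[c]}'" for c in changed)
            sqls.append(f"UPDATE {table} SET {sets} WHERE id = '{row['id']}'")
        else:  # DELETE
            sqls.append(f"DELETE FROM {table} WHERE id = '{row['id']}'")
    return sqls

# Example with the UPDATE record shown above:
print(to_sql(json.loads(
    '{"data":[{"id":"2","value":"4"}],"database":"temp","table":"test",'
    '"old":[{"value":"3"}],"isDdl":false,"type":"UPDATE"}'
)))
# -> ["UPDATE temp.test SET value = '4' WHERE id = '2'"]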