When we do data processing with big-data tooling, the first step is getting hold of the data itself. If the data lives in MySQL, one option is Alibaba's open-source component canal, which disguises itself as a MySQL slave so that the master streams binlog changes to it just as it would to a real replica.

Enable MySQL binlog

First, check whether binlog replication is enabled on the MySQL instance:

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF   |
+---------------+-------+
1 row in set (0.00 sec)

If it is not enabled, edit the MySQL configuration file /etc/my.cnf and add the following settings:

# binlog file name prefix
log-bin=mysql-bin
# use row-based logging
binlog_format=ROW
# server_id of this MySQL instance; must not clash with canal's slaveId
server_id=1

Then restart MySQL with systemctl restart mysqld and check the variable again; the binlog is now enabled:

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)

Install canal

With binlog enabled on MySQL, download the canal.deployer package and extract it once the download finishes:

mkdir canal
tar -zxvf canal.deployer-1.1.4.tar.gz -C canal
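
For reference, a download command, assuming the package is the canal.deployer asset attached to the canal-1.1.4 release on GitHub (check the project's releases page if the URL differs):

# assumed release URL for the canal-1.1.4 deployer package
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz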

Next, edit conf/example/instance.properties and change the following settings:

# slaveId; must differ from the server_id configured above
canal.instance.mysql.slaveId = 2
# MySQL address and port
canal.instance.master.address = 172.19.34.19:3306
# database user
canal.instance.dbUsername = root
# database password
canal.instance.dbPassword = 1234
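
The example above connects as root. In practice canal only needs an account with replication privileges, so a dedicated user is usually the better choice; a minimal sketch (the canal user name and password here are placeholders):

-- placeholder user/password; canal needs SELECT plus the replication privileges
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;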

Also edit conf/canal.properties and add the ZooKeeper addresses:

canal.zkServers = 172.19.65.196:2181,172.19.72.108:2181,172.19.72.112:2181

Download and extract canal on a second machine and apply the same configuration as above. The only difference is this machine's slaveId, which must differ both from MySQL's server_id and from the first canal node's slaveId; here I set it to 3.
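
So the second node's conf/example/instance.properties ends up as below, identical to the first node except for the slaveId:

# slaveId of the second canal node; differs from server_id=1 and the first node's slaveId=2
canal.instance.mysql.slaveId = 3
canal.instance.master.address = 172.19.34.19:3306
canal.instance.dbUsername = root
canal.instance.dbPassword = 1234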

Once both machines are configured, start the canal service on each of them with ./bin/startup.sh. You can then run show master status; on MySQL to check the current binlog position, and after connecting to ZooKeeper you can also see the addresses of the two canal servers we just brought up:

[zk] ls /otter/canal/cluster
[172.19.65.136:11111, 172.19.65.228:11111]

Create a client that pulls data from canal

With canal up and running, create a Maven project and add the following dependencies:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <!-- Optional: canal.client already pulls this in transitively, but I kept
         hitting "package does not exist" errors at runtime, so I declare it explicitly -->
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.6.1</version>
</dependency>

The following code pulls MySQL change events from canal:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.util.List;

public class Test {

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newClusterConnector("172.19.65.196:2181,172.19.72.108:2181,172.19.72.112:2181", "example", "", "");
        try {
            // open the connection
            connector.connect();
            // subscribe to all tables in all databases
            connector.subscribe(".*\\..*");
            // roll back to the last un-acked position, so the next fetch starts there
            connector.rollback();
            while (true) {
                // fetch up to 1000 entries without acknowledging them yet
                Message message = connector.getWithoutAck(1000);
                // batch id of this fetch
                long batchId = message.getId();
                // number of entries in the batch
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // no data yet, sleep for 2 seconds
                    Thread.sleep(2000);
                } else {
                    // there is data, process it
                    printEntry(message.getEntries());
                }
                // acknowledge the batch; every message with an id <= batchId is confirmed
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                // skip transaction begin/end entries
                continue;
            }
            // RowChange carries everything about one change event:
            // isDdl tells whether it is a DDL statement, sql holds the DDL text,
            // beforeColumns/afterColumns hold the row data before and after the change
            RowChange rowChange;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of event has an error, data: " + entry, e);
            }
            // operation type: insert / update / delete / DDL ...
            EventType eventType = rowChange.getEventType();
            // print the header information
            System.out.printf("------ binlog[%s:%s], table[%s:%s], eventType: %s ------%n", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
            // print the DDL statement if this is one
            if (rowChange.getIsDdl()) {
                System.out.printf("------ isDdl: true, sql: %s ------%n", rowChange.getSql());
            }
            // print every row contained in the RowChange
            for (RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // delete: only the old values exist
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    // insert: only the new values exist
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // update: print the row before and after the change
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println("[" + column.getName() + ":" + column.getValue() + "],(update:" + column.getUpdated() + ")");
        }
    }

}
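
A note on the subscription filter: subscribe(".*\\..*") above pulls every table of every schema. Canal's filter syntax takes "schema.table" patterns (regular expressions are allowed, and multiple rules can be separated by commas), so the subscription can be narrowed; a minimal sketch, assuming we only care about the bugatti.test table used in the test further down:

// subscribe only to changes of the bugatti.test table
// instead of the catch-all ".*\\..*" filter used above
connector.subscribe("bugatti.test");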

Start the program above; whenever we change data in MySQL, the program receives the corresponding changes and prints them. If one of the two canal nodes goes down we still receive data normally, which is exactly the high availability we wanted.

The output below comes from creating a table named test, inserting, updating and deleting a row, and finally dropping the table.

------ binlog[mysql-bin.000001:18205], table[bugatti:test], eventType: CREATE ------
------ isDdl: true, sql: /* ApplicationName=DataGrip 2021.3.4 */ create table test
(
name varchar(30) null
) ------
------ binlog[mysql-bin.000001:18564], table[bugatti:test], eventType: INSERT ------
[name:张三],(update:true)
------ binlog[mysql-bin.000001:18830], table[bugatti:test], eventType: UPDATE ------
-------> before
[name:张三],(update:false)
-------> after
[name:李四],(update:true)
------ binlog[mysql-bin.000001:19105], table[bugatti:test], eventType: DELETE ------
[name:李四],(update:false)
------ binlog[mysql-bin.000001:19244], table[bugatti:test], eventType: ERASE ------
------ isDdl: true, sql: DROP TABLE `test` /* generated by server */ ------
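
For reference, the statements below, run against the bugatti schema, are roughly what produced the event sequence above:

create table test
(
    name varchar(30) null
);
insert into test values ('张三');
update test set name = '李四' where name = '张三';
delete from test where name = '李四';
drop table test;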

And when we manually kill one canal server to simulate a failure, the program automatically switches over to the other canal server, so the setup is indeed highly available. The corresponding log output:

2021-04-26 11:29:19:093 WARN  --- [                main] c.a.o.c.c.i.ClusterCanalConnector : something goes wrong when getWithoutAck data from server:null
com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: end of stream when reading header
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:325) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:295) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:183) ~[canal.client-1.1.4.jar:na]
at me.hourui.canal.Test.main(Test.java:26) [classes/:na]
Caused by: java.io.IOException: end of stream when reading header
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.read(SimpleCanalConnector.java:413) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:401) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:385) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.receiveMessages(SimpleCanalConnector.java:330) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.getWithoutAck(SimpleCanalConnector.java:323) ~[canal.client-1.1.4.jar:na]
... 3 common frames omitted
2021-04-26 11:29:24:413 ERROR --- [ main] c.a.o.c.c.i.r.ClientRunningMonitor : There is an error when execute initRunning method, with destination [example].
com.alibaba.otter.canal.protocol.exception.CanalClientException: failed to subscribe with reason: something goes wrong with channel:[id: 0x5aa611b9, /172.19.6.8:53556 => /172.19.65.228:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first

at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.subscribe(SimpleCanalConnector.java:249) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector$1.processActiveEnter(SimpleCanalConnector.java:434) ~[canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.processActiveEnter(ClientRunningMonitor.java:221) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:123) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.start(ClientRunningMonitor.java:93) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:108) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.connect(ClusterCanalConnector.java:64) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.restart(ClusterCanalConnector.java:273) [canal.client-1.1.4.jar:na]
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.getWithoutAck(ClusterCanalConnector.java:189) [canal.client-1.1.4.jar:na]
at me.hourui.canal.Test.main(Test.java:26) [classes/:na]
2021-04-26 11:29:24:430 WARN --- [ main] c.a.o.c.c.i.ClusterCanalConnector : failed to connect to:/172.19.65.228:11111 after retry 0 times
2021-04-26 11:29:24:443 WARN --- [ main] c.a.o.c.c.i.r.ClientRunningMonitor : canal is not run any in node
2021-04-26 11:29:29:558 INFO --- [ main] c.a.o.c.c.i.ClusterCanalConnector : restart the connector for next round retry.
------ binlog[mysql-bin.000001:19244], table[bugatti:test], eventType: ERASE ------
------ isDdl: true, sql: DROP TABLE `test` /* generated by server */ ------