当前位置 博文首页 > 努力的老刘:Canal:同步mysql增量数据工具,一篇详解核心知识点
老刘是一名即将找工作的研二学生,写博客一方面是总结大数据开发的知识点,一方面是希望能够帮助伙伴让自学从此不求人。由于老刘是自学大数据开发,博客中肯定会存在一些不足,还希望大家能够批评指正,让我们一起进步!
大数据领域数据源有业务库的数据,也有移动端埋点数据、服务器端产生的日志数据。我们在对数据进行采集时根据下游对数据的要求不同,我们可以使用不同的采集工具来进行。今天老刘给大家讲的是同步mysql增量数据的工具Canal,本篇文章的大纲如下:
老刘争取用这一篇文章让大家直接上手 Canal 这个工具,不再花别的时间来学习。
由于 Canal 是用来同步 mysql 中增量数据的,所以老刘先讲 mysql 的主备复制原理,之后再讲 Canal 的核心知识点。
根据这张图,老刘把 mysql 的主备复制原理分解为如下流程:
那么 mysql 主备复制实现原理就讲完了,大家看完这个流程,能不能猜到 Canal 的工作原理?
Canal 的工作原理就是它模拟 MySQL slave 的交互协议,把自己伪装为 MySQL slave,向 MySQL master 发动 dump 协议。MySQL master 收到 dump 请求后,就会开始推送 binlog 给 Canal。最后 Canal 就会解析 binlog 对象。
Canal,美[k??n?l],是这样读的,意思是水道/管道/渠道,主要用途就是用来同步 MySQL 中的增量数据(可以理解为实时数据),是阿里巴巴旗下的一款纯 Java 开发的开源项目。
server 代表一个 canal 运行实例,对应于一个 JVM。 instance 对应于一个数据队列,1 个 canal server 对应 1..n 个 instance instance 下的子模块:
到现在 Canal 的基本概念就讲完了,那接下来就要讲 Canal 如何同步 mysql 的增量数据。
我们用 Canal 同步 mysql 增量数据的前提是 mysql 的 binlog 是开启的,阿里云的 mysql 数据库是默认开启 binlog 的,但是如果我们是自己安装的 mysql 需要手动开启 binlog 日志功能。
先找到 mysql 的配置文件:
etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog-format=ROW
这里有一个知识点是关于 binlog 的格式,老刘给大家讲讲。
binlog 的格式有三种:STATEMENT、ROW、MIXED
ROW 模式(一般就用它)
日志会记录每一行数据被修改的形式,不会记录执行 SQL 语句的上下文相关信息,只记录要修改的数据,哪条数据被修改了,修改成了什么样子,只有 value,不会有 SQL 多表关联的情况。
优点:它仅仅只需要记录哪条数据被修改了,修改成什么样子了,所以它的日志内容会非常清楚地记录下每一行数据修改的细节,非常容易理解。
缺点:ROW 模式下,特别是数据添加的情况下,所有执行的语句都会记录到日志中,都将以每行记录的修改来记录,这样会产生大量的日志内容。
STATEMENT 模式
每条会修改数据的 SQL 语句都会被记录下来。
缺点:由于它是记录的执行语句,所以,为了让这些语句在 slave 端也能正确执行,那他还必须记录每条语句在执行过程中的一些相关信息,也就是上下文信息,以保证所有语句在 slave 端被执行的时候能够得到和在 master 端执行时候相同的结果。
但目前例如 step()函数在有些版本中就不能被正确复制,在存储过程中使用了 last-insert-id()函数,可能会使 slave 和 master 上得到不一致的 id,就是会出现数据不一致的情况,ROW 模式下就没有。
MIXED 模式
以上两种模式都使用。
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要修改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要修改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
其中,canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1。
sh bin/startup.sh
关闭使用 bin/stop.sh
观察日志
一般使用 cat 查看 canal/canal.log、example/example.log
启动客户端
在 IDEA 中业务代码,mysql 中如果有增量数据就拉取过来,在 IDEA 控制台打印出来
在 pom.xml 文件中添加:
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.12</version>
</dependency>
添加客户端代码:
public class Demo {
public static void main(String[] args) {
//创建连接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
"example", "", "");
connector.connect();
//订阅
connector.subscribe();
connector.rollback();
int batchSize = 1000;
int emptyCount = 0;
int totalEmptyCount = 100;
while (totalEmptyCount > emptyCount) {
Message msg = connector.getWithoutAck(batchSize);
long id = msg.getId();
List<CanalEntry.Entry> entries = msg.getEntries();
if(id == -1 || entries.size() == 0){
emptyCount++;
System.out.println("emptyCount : " + emptyCount);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
emptyCount = 0;
printEntry(entries);
}
connector.ack(id);
}
}
// batch -> entries -> rowchange - rowdata -> cols
private static void printEntry(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries){
if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
continue;
}
CanalEntry.RowChange rowChange = null;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
CanalEntry.EventType eventType = rowChange.getEventType();
System.out.println(entry.getHeader().getLogfileName()+" __ " +
entry.getHeader().getSchemaName() + " __ " + eventType);
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for(CanalEntry.RowData rowData : rowDatasList){
for(CanalEntry.Column column: rowData.getAfterColumnsList()){
System.out.println(column.getName() + " - " +
column.getValue() + " - " +
column.getUpdated());
}
}
}
}
}
在大数据领域很多框架都会有 HA 机制,Canal 的 HA 分为两部分,Canal server 和 Canal client 分别有对应的 HA 实现:
整个 HA 机制的控制主要是依赖了 ZooKeeper 的几个特性,ZooKeeper 这里就不讲了。
Canal Server: