当前位置 博文首页 > 努力的老刘:Canal:同步mysql增量数据工具,一篇详解核心知识点

    努力的老刘:Canal:同步mysql增量数据工具,一篇详解核心知识点

    作者:努力的老刘 时间:2021-01-22 12:36

    老刘是一名即将找工作的研二学生,写博客一方面是总结大数据开发的知识点,一方面是希望能够帮助伙伴让自学从此不求人。由于老刘是自学大数据开发,博客中肯定会存在一些不足,还希望大家能够批评指正,让我们一起进步!

    背景

    大数据领域数据源有业务库的数据,也有移动端埋点数据、服务器端产生的日志数据。我们在对数据进行采集时根据下游对数据的要求不同,我们可以使用不同的采集工具来进行。今天老刘给大家讲的是同步mysql增量数据的工具Canal,本篇文章的大纲如下:

    1. Canal 的概念
    2. mysql 中主备复制实现原理
    3. Canal 如何从 MySQL 中同步数据
    4. Canal 的 HA 机制设计
    5. 各种数据同步解决方法的简单总结

    老刘争取用这一篇文章让大家直接上手 Canal 这个工具,不再花别的时间来学习。

    mysql 主备复制实现原理

    由于 Canal 是用来同步 mysql 中增量数据的,所以老刘先讲 mysql 的主备复制原理,之后再讲 Canal 的核心知识点。

    根据这张图,老刘把 mysql 的主备复制原理分解为如下流程:

    1. 主服务器首先必须启动二进制日志 binlog,用来记录任何修改了数据库数据的事件。
    2. 主服务器将数据的改变记录到二进制 binlog 日志。
    3. 从服务器会将主服务器的二进制日志复制到其本地的中继日志(Relaylog)中。这一步细化的说就是首先从服务器会启动一个工作线程 I/O 线程,I/O 线程会跟主库建立一个普通的客户单连接,然后在主服务器上启动一个特殊的二进制转储(binlog dump)线程,这个 binlog dump 线程会读取主服务器上二进制日志中的事件,然后向 I/O 线程发送二进制事件,并保存到从服务器上的中继日志中。
    4. 从服务器启动 SQL 线程,从中继日志中读取二进制日志,并且在从服务器本地会再执行一次数据修改操作,从而实现从服务器数据的更新。

    那么 mysql 主备复制实现原理就讲完了,大家看完这个流程,能不能猜到 Canal 的工作原理?

    Canal 核心知识点

    Canal 的工作原理

    Canal 的工作原理就是它模拟 MySQL slave 的交互协议,把自己伪装为 MySQL slave,向 MySQL master 发动 dump 协议。MySQL master 收到 dump 请求后,就会开始推送 binlog 给 Canal。最后 Canal 就会解析 binlog 对象。

    Canal 概念

    Canal,美[k??n?l],是这样读的,意思是水道/管道/渠道,主要用途就是用来同步 MySQL 中的增量数据(可以理解为实时数据),是阿里巴巴旗下的一款纯 Java 开发的开源项目。

    Canal 架构

    server 代表一个 canal 运行实例,对应于一个 JVM。 instance 对应于一个数据队列,1 个 canal server 对应 1..n 个 instance instance 下的子模块:

    1. EventParser:数据源接入,模拟 salve 协议和 master 进行交互,协议解析
    2. EventSink:Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
    3. EventStore:数据存储
    4. MetaManager: 增量订阅&消费信息管理器

    到现在 Canal 的基本概念就讲完了,那接下来就要讲 Canal 如何同步 mysql 的增量数据。

    Canal 同步 MySQL 增量数据

    开启 mysql binlog

    我们用 Canal 同步 mysql 增量数据的前提是 mysql 的 binlog 是开启的,阿里云的 mysql 数据库是默认开启 binlog 的,但是如果我们是自己安装的 mysql 需要手动开启 binlog 日志功能。

    先找到 mysql 的配置文件:

    etc/my.cnf

    server-id=1
    log-bin=mysql-bin
    binlog-format=ROW

    这里有一个知识点是关于 binlog 的格式,老刘给大家讲讲。

    binlog 的格式有三种:STATEMENT、ROW、MIXED

    1. ROW 模式(一般就用它)

      日志会记录每一行数据被修改的形式,不会记录执行 SQL 语句的上下文相关信息,只记录要修改的数据,哪条数据被修改了,修改成了什么样子,只有 value,不会有 SQL 多表关联的情况。

      优点:它仅仅只需要记录哪条数据被修改了,修改成什么样子了,所以它的日志内容会非常清楚地记录下每一行数据修改的细节,非常容易理解。

      缺点:ROW 模式下,特别是数据添加的情况下,所有执行的语句都会记录到日志中,都将以每行记录的修改来记录,这样会产生大量的日志内容。

    2. STATEMENT 模式

      每条会修改数据的 SQL 语句都会被记录下来。

      缺点:由于它是记录的执行语句,所以,为了让这些语句在 slave 端也能正确执行,那他还必须记录每条语句在执行过程中的一些相关信息,也就是上下文信息,以保证所有语句在 slave 端被执行的时候能够得到和在 master 端执行时候相同的结果。

      但目前例如 step()函数在有些版本中就不能被正确复制,在存储过程中使用了 last-insert-id()函数,可能会使 slave 和 master 上得到不一致的 id,就是会出现数据不一致的情况,ROW 模式下就没有。

    3. MIXED 模式

      以上两种模式都使用。

    Canal 实时同步

    1. 首先我们要配置环境,在 conf/example/instance.properties 下:
     ## mysql serverId
     canal.instance.mysql.slaveId = 1234
     #position info,需要修改成自己的数据库信息
     canal.instance.master.address = 127.0.0.1:3306
     canal.instance.master.journal.name =
     canal.instance.master.position =
     canal.instance.master.timestamp =
     #canal.instance.standby.address =
     #canal.instance.standby.journal.name =
     #canal.instance.standby.position =
     #canal.instance.standby.timestamp =
     #username/password,需要修改成自己的数据库信息
     canal.instance.dbUsername = canal
     canal.instance.dbPassword = canal
     canal.instance.defaultDatabaseName =
     canal.instance.connectionCharset = UTF-8
     #table regex
     canal.instance.filter.regex = .\*\\\\..\*

    其中,canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1。

    1. 配置完后,就要启动了
     sh bin/startup.sh
     关闭使用 bin/stop.sh
    1. 观察日志

      一般使用 cat 查看 canal/canal.log、example/example.log

    2. 启动客户端

      在 IDEA 中业务代码,mysql 中如果有增量数据就拉取过来,在 IDEA 控制台打印出来

      在 pom.xml 文件中添加:

     <dependency>
       <groupId>com.alibaba.otter</groupId>
       <artifactId>canal.client</artifactId>
       <version>1.0.12</version>
     </dependency>

    添加客户端代码:

    public class Demo {
     public static void main(String[] args) {
         //创建连接
         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
                 "example""""");
         connector.connect();
         //订阅
         connector.subscribe();
         connector.rollback();
         int batchSize = 1000;
         int emptyCount = 0;
         int totalEmptyCount = 100;
         while (totalEmptyCount > emptyCount) {
             Message msg = connector.getWithoutAck(batchSize);
             long id = msg.getId();
             List<CanalEntry.Entry> entries = msg.getEntries();
             if(id == -1 || entries.size() == 0){
                 emptyCount++;
                 System.out.println("emptyCount : " + emptyCount);
                 try {
                     Thread.sleep(3000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }else{
                 emptyCount = 0;
                 printEntry(entries);
             }
             connector.ack(id);
         }
     }
     // batch -> entries -> rowchange - rowdata -> cols
     private static void printEntry(List<CanalEntry.Entry> entries) {
         for (CanalEntry.Entry entry : entries){
             if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                     entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
                 continue;
             }
             CanalEntry.RowChange rowChange = null;
             try {
                 rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
             } catch (InvalidProtocolBufferException e) {
                 e.printStackTrace();
             }
             CanalEntry.EventType eventType = rowChange.getEventType();
             System.out.println(entry.getHeader().getLogfileName()+" __ " +
                     entry.getHeader().getSchemaName() + " __ " + eventType);
             List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
             for(CanalEntry.RowData rowData : rowDatasList){
                 for(CanalEntry.Column column: rowData.getAfterColumnsList()){
                     System.out.println(column.getName() + " - " +
                             column.getValue() + " - " +
                             column.getUpdated());
                 }
             }
         }
     }
    }
    1. 在mysql中写数据,客户端就会把增量数据打印到控制台。

    Canal 的 HA 机制设计

    在大数据领域很多框架都会有 HA 机制,Canal 的 HA 分为两部分,Canal server 和 Canal client 分别有对应的 HA 实现:

    1. canal server:为了减少对 mysql dump 的请求,不同 server 上的 instance 要求同一时间只能有一个处于 running,其他的处于 standby 状态。
    2. canal client:为了保证有序性,一份 instance 同一时间只能由一个 canal client 进行 get/ack/rollback 操作,否则客户端接收无法保证有序。

    整个 HA 机制的控制主要是依赖了 ZooKeeper 的几个特性,ZooKeeper 这里就不讲了。

    Canal Server:

    1. canal server 要启动某个 canal instance 时都先向 ZooKeeper 进行一次尝试启动判断(创建 EPHEMERAL 节点,谁创建成功就允许谁启动)。
    2. 创建 ZooKeeper 节点成功后,对应的 canal server 就启动对应的 canal instance,没有创建成功的 canal instance 就会处于 standby 状态。
    3. 一旦 ZooKeeper 发现 canal server 创建的节点消失后,立即通知其他的 canal server 再次进行步骤 1 的操作,重新选出一个 canal server 启动 instance。
    4. canal client 每次进行 connect 时,会首先向 ZooKeeper 询问当前是谁启动了 canal instance,然后和其建立连接,一旦连接不可用,会重新尝试 connect。
    下一篇:没有了