概述

本文主要介绍 canal 产品整体设计,帮助你快速地熟悉代码脉络。如果你对 canal 还一无所知,只是知道他能进行数据迁移同步,那么建议先行阅读 IntroductionAdminGuide 两篇文档了解。

产品设计

DevGuide - 图1

  • server 代表一个 canal 运行实例,对应于一个 jvm

  • instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )

  • instance 下的子模块

    • eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析
    • eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作
    • eventStore: 数据存储
    • metaManager: 增量订阅 & 消费信息管理器

整体类图设计

DevGuide - 图2

  • CanalLifeCycle: 所有 canal 模块的生命周期接口
  • CanalInstance: 组合 parser,sink,store 三个子模块,三个子模块的生命周期统一受 CanalInstance 管理
  • CanalServer: 聚合了多个 CanalInstance

EventParser 设计

DevGuide - 图3

  • 每个EventParser都会关联两个内部组件

    • CanalLogPositionManager : 记录binlog 最后一次解析成功位置信息,主要是描述下一次canal启动的位点
    • CanalHAController: 控制 EventParser 的链接主机管理,判断当前该链接哪个mysql数据库
  • 目前开源版本只支持 MySQL binlog , 默认通过 MySQL binlog dump 远程获取 binlog ,但也可以使用 LocalBinlog - 类 relay log 模式,直接消费本地文件中的 binlog

CanalLogPositionManager 设计

DevGuide - 图4

  • 如果 CanalEventStore 选择的是内存模式,可不保留解析位置,下一次 canal 启动时直接依赖 CanalMetaManager 记录的最后一次消费成功的位点即可. (最后一次ack提交的数据位点)
  • 如果 CanalEventStore 选择的是持久化模式,可通过 zookeeper 记录位点信息,canal instance 发生 failover 切换到另一台机器,可通过读取 zookeeper 获取位点信息
  • 可通过实现自己的 CanalLogPositionManager,比如记录位点信息到本地文件/nas 文件实现简单可用的无 HA 模式

CanalHAController类图设计

DevGuide - 图5

  • 失败检测常见方式可定时发送心跳语句到当前链接的数据库,超过一定次数检测失败时,尝试切换到备机
  • 如果有一套数据库主备信息管理系统,当数据库主备切换或者机器下线,推送配置到各个应用节点,HAController 收到后,控制 EventParser 进行链接切换

EventSink类图设计和扩展

DevGuide - 图6

DevGuide - 图7

  • 常见的 sink 业务有 1:n 和 n:1 形式,目前 GroupEventSink 主要是解决 n:1 的归并类业务

EventStore类图设计和扩展

DevGuide - 图8

  • 抽象 CanalStoreScavenge , 解决数据的清理,比如定时清理,满了之后清理,每次 ack 清理等
  • CanalEventStore 接口,主要包含 put/get/ack/rollback 的相关接口. put/get 操作会组成一个生产者/消费者模式,每个 store 都会有存储大小设计,存储满了,put 操作会阻塞等待 get 获取数据,所以不会无线占用存储,比如内存大小
    • EventStore 目前实现了 memory 模式,支持按照内存大小和内存记录数进行存储大小限制
    • 后续可开发基于本地文件的存储模式
    • 基于文件存储和内存存储,开发 mixed 模式,做成两级队列,内存 buffer 有空位时,将文件的数据读入到内存 buffer 中
    • mixed 模式实现可以让 canal 落地消费/订阅的模型,取 1 份binlog数据,提供多个客户端消费,消费有快有慢,各自保留消费位点

MetaManager类图设计和扩展

DevGuide - 图9

  • metaManager 目前支持了多种模式,最顶层 memory 和 zookeeper 模式,然后是 mixed 模式-先写内存,再写zookeeper
  • 可通过实现自己的 CanalMetaManager,比如记录位点信息到本地文件/nas文件,简单可用的无 HA 模式

应用扩展

上面介绍了相关模块的设计,这里介绍下如何将自己的扩展代码应用到canal中. 介绍之前首先需要了解instance的配置方式,可参见: AdminGuide 的 Spring 配置这一章节

  • canal instance 基于 spring 的管理方式,主要由两部分组成

    • xxx.properties
    • xxx-instance.xml
  • xxx-instance.xml 描述对应 instance 所使用的模块组件定义,默认的 instance 模块组件定义有

    • memory-instance.xml:选择 memory 模式的组件,速度优先,简单
    • default-instance.xml:选择 mixed/preiodmixed 模式的组件,可以提供 HA 的功能
    • group-instance.xml:提供 n:1 的 sink 模式如果要应用自己的组件,就只需要定义一份自己的 instance.xml,比如 custom-intance.xml

示例:

  1. <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false">
  2. <property name="ignoreResourceNotFound" value="true" />
  3. <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 -->
  4. <property name="locationNames">
  5. <list>
  6. <value>classpath:canal.properties</value>
  7. <value>classpath:${canal.instance.destination:}/instance.properties</value>
  8. </list>
  9. </property>
  10. </bean>
  11.  
  12. <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  13. <property name="destination" value="${canal.instance.destination}" />
  14. <property name="eventParser">
  15. <ref local="eventParser" />
  16. </property>
  17. <property name="eventSink">
  18. <ref local="eventSink" />
  19. </property>
  20. <property name="eventStore">
  21. <ref local="eventStore" />
  22. </property>
  23. <property name="metaManager">
  24. <ref local="metaManager" />
  25. </property>
  26. <property name="alarmHandler">
  27. <ref local="alarmHandler" />
  28. </property>
  29. </bean>
  30. ......
  • 一份 instance.xml 中有一份或者多份 instance 定义,优先以 destination 名字查找对应的 instance bean 定义,如果没有,则按默认的名字 “instance” 查找 instance 对象,例如 xxxx-instance.xml 中定义 id 分别为 instance-1, instance-2 的两个 bean. 这两个 bean 将为同名的 instance 提供自定义的 eventParser , evnetSink , evnetStore , metaManager,alarmHandler.如果没有自定义这些 bean, 就使用 id="instance" 的 bean 来配置 canal instance.
  • 一份 instance bean 定义,需要包含 eventParser , evnetSink , evnetStore , metaManager,alarmHandler 的5个模块定义,( alarmHandler 主要是一些报警机制处理,因为简单没展开,可扩展)
  1. <bean id="instance-1" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  2. <property name="destination" value="${canal.instance.destination}" />
  3. <property name="eventParser">
  4. <ref local="eventParser" />
  5. </property>
  6. ......
  7. </bean>
  8. <bean id="instance-2" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring">
  9. <property name="destination" value="${canal.instance.destination}" />
  10. .......
  11. </bean>

完成 custom-instance.xml 定义后,可通过 canal.properties 配置中进行引入

<pre class="java" name="code">canal.instance.{通道名字}.spring.xml = classpath:spring/custom-instance.xml

到这里,就完成了扩展组件的应用,启动canal instance后,就会使用自定义的的组件 , just have fun .