Binlog Consumer Client 用户文档

目前 Drainer 提供了多种输出方式,包括 MySQL、TiDB、file 等。但是用户往往有一些自定义的需求,比如输出到 Elasticsearch、Hive 等,这些需求 Drainer 现在还没有实现,因此 Drainer 增加了输出到 Kafka 的功能,将 binlog 数据解析后按一定的格式再输出到 Kafka 中,用户编写代码从 Kafka 中读出数据再进行处理。

配置 Kafka Drainer

修改 Drainer 的配置文件,设置输出为 Kafka,相关配置如下:

  1. [syncer]
  2. db-type = "kafka"
  3. [syncer.to]
  4. # Kafka 地址
  5. kafka-addrs = "127.0.0.1:9092"
  6. # Kafka 版本号
  7. kafka-version = "2.4.0"

自定义开发

数据格式

首先需要了解 Drainer 写入到 Kafka 中的数据格式:

  1. // Column 保存列的数据,针对数据的类型,保存在对应的变量中
  2. message Column {
  3. // 数据是否为 null
  4. optional bool is_null = 1 [ default = false ];
  5. // 保存 int 类型的数据
  6. optional int64 int64_value = 2;
  7. // 保存 uint、enum, set 类型的数据
  8. optional uint64 uint64_value = 3;
  9. // 保存 float、double 类型的数据
  10. optional double double_value = 4;
  11. // 保存 bit、blob、binary、json 类型的数据
  12. optional bytes bytes_value = 5;
  13. // 保存 date、time、decimal、text、char 类型的数据
  14. optional string string_value = 6;
  15. }
  16. // ColumnInfo 保存列的信息,包括列名、类型、是否为主键
  17. message ColumnInfo {
  18. optional string name = 1 [ (gogoproto.nullable) = false ];
  19. // MySQL 中小写的列字段类型
  20. // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
  21. // numeric 类型:int bigint smallint tinyint float double decimal bit
  22. // string 类型:text longtext mediumtext char tinytext varchar
  23. // blob longblob mediumblob binary tinyblob varbinary
  24. // enum set
  25. // json 类型:json
  26. optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
  27. optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
  28. }
  29. // Row 保存一行的具体数据
  30. message Row { repeated Column columns = 1; }
  31. // MutationType 表示 DML 的类型
  32. enum MutationType {
  33. Insert = 0;
  34. Update = 1;
  35. Delete = 2;
  36. }
  37. // Table 包含一个表的数据变更
  38. message Table {
  39. optional string schema_name = 1;
  40. optional string table_name = 2;
  41. repeated ColumnInfo column_info = 3;
  42. repeated TableMutation mutations = 4;
  43. }
  44. // TableMutation 保存一行数据的变更
  45. message TableMutation {
  46. required MutationType type = 1;
  47. // 修改后的数据
  48. required Row row = 2;
  49. // 修改前的数据,只对 Update MutationType 有效
  50. optional Row change_row = 3;
  51. }
  52. // DMLData 保存一个事务所有的 DML 造成的数据变更
  53. message DMLData {
  54. // `tables` 包含事务中所有表的数据变更
  55. repeated Table tables = 1;
  56. }
  57. // DDLData 保存 DDL 的信息
  58. message DDLData {
  59. // 当前使用的数据库
  60. optional string schema_name = 1;
  61. // 相关表
  62. optional string table_name = 2;
  63. // `ddl_query` 是原始的 DDL 语句 query
  64. optional bytes ddl_query = 3;
  65. }
  66. // BinlogType 为 Binlog 的类型,分为 DML 和 DDL
  67. enum BinlogType {
  68. DML = 0; // Has `dml_data`
  69. DDL = 1; // Has `ddl_query`
  70. }
  71. // Binlog 保存一个事务所有的变更,Kafka 中保存的数据为该结构数据序列化后的结果
  72. message Binlog {
  73. optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
  74. optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
  75. optional DMLData dml_data = 3;
  76. optional DDLData ddl_data = 4;
  77. }

查看数据格式的具体定义,参见 secondary_binlog.proto

Driver

TiDB-Tools 项目提供了用于读取 Kafka 中 binlog 数据的 Driver,具有如下功能:

  • 读取 Kafka 的数据
  • 根据 commit ts 查找 binlog 在 kafka 中的储存位置

使用该 Driver 时,用户需要配置如下信息:

  • KafkaAddr:Kafka 集群的地址
  • CommitTS:从哪个 commit ts 开始读取 binlog
  • Offset:从 Kafka 哪个 offset 开始读取,如果设置了 CommitTS 就不用配置该参数
  • ClusterID:TiDB 集群的 cluster ID
  • Topic: Kafka Topic 名称,如果 Topic 名称为空,将会使用 drainer <ClusterID>_obinlog 中的默认名称

用户以包的形式引用 Driver 的代码即可使用,可以参考 Driver 中提供的示例代码来学习如何使用 Driver 以及 binlog 数据的解析,目前提供了两个例子:

  • 使用该 Driver 将数据同步到 MySQL,该示例包含将 binlog 转化为 SQL 的具体方法
  • 使用该 Driver 将数据打印出来

Driver 项目地址:Binlog Slave Driver

Kafka 自定义开发 - 图1

注意

  • 示例代码仅仅用于示范如何使用 Driver,如果需要用于生产环境需要优化代码。
  • 目前仅提供了 golang 版本的 Driver 以及示例代码。如果需要使用其他语言,用户需要根据 binlog 的 proto 文件生成相应语言的代码文件,并自行开发程序读取 Kafka 中的 binlog 数据、解析数据、输出到下游。也欢迎用户优化 example 代码,以及提交其他语言的示例代码到 TiDB-Tools