Binlog Consumer Client User Guide

Binlog Consumer Client is used to consume TiDB secondary binlog data from Kafka and output the data in a specific format. Currently, Drainer supports multiple downstream targets, including MySQL, TiDB, files, and Kafka. However, users sometimes need to output data in other formats, for example, to Elasticsearch or Hive, so this feature is introduced.

Configure Drainer

Modify the configuration file of Drainer and set it to output the data to Kafka:

```toml
[syncer]
db-type = "kafka"

[syncer.to]
# the Kafka address
kafka-addrs = "127.0.0.1:9092"
# the Kafka version
kafka-version = "2.4.0"
```

Customized development

Data format

First, you need to understand the format of the data that Drainer outputs to Kafka:

```protobuf
// `Column` stores the column data in the corresponding variable based on the data type.
message Column {
    // Indicates whether the data is NULL
    optional bool is_null = 1 [ default = false ];
    // Stores `int` data
    optional int64 int64_value = 2;
    // Stores `uint`, `enum`, and `set` data
    optional uint64 uint64_value = 3;
    // Stores `float` and `double` data
    optional double double_value = 4;
    // Stores `bit`, `blob`, `binary` and `json` data
    optional bytes bytes_value = 5;
    // Stores `date`, `time`, `decimal`, `text` and `char` data
    optional string string_value = 6;
}

// `ColumnInfo` stores the column information, including the column name, type, and whether it is the primary key.
message ColumnInfo {
    optional string name = 1 [ (gogoproto.nullable) = false ];
    // the lower-case column field type in MySQL
    // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
    // for the `numeric` type: int bigint smallint tinyint float double decimal bit
    // for the `string` type: text longtext mediumtext char tinytext varchar
    // blob longblob mediumblob binary tinyblob varbinary
    // enum set
    // for the `json` type: json
    optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
    optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
}

// `Row` stores the actual data of a row.
message Row { repeated Column columns = 1; }

// `MutationType` indicates the DML type.
enum MutationType {
    Insert = 0;
    Update = 1;
    Delete = 2;
}

// `Table` contains mutations in a table.
message Table {
    optional string schema_name = 1;
    optional string table_name = 2;
    repeated ColumnInfo column_info = 3;
    repeated TableMutation mutations = 4;
}

// `TableMutation` stores mutations of a row.
message TableMutation {
    required MutationType type = 1;
    // data after modification
    required Row row = 2;
    // data before modification. It only takes effect for the `Update` mutation type.
    optional Row change_row = 3;
}

// `DMLData` stores all the mutations caused by DML in a transaction.
message DMLData {
    // `tables` contains all the table changes in the transaction.
    repeated Table tables = 1;
}

// `DDLData` stores the DDL information.
message DDLData {
    // the database currently in use
    optional string schema_name = 1;
    // the related table
    optional string table_name = 2;
    // `ddl_query` is the original DDL statement query.
    optional bytes ddl_query = 3;
}

// `BinlogType` indicates the binlog type, including DML and DDL.
enum BinlogType {
    DML = 0; // has `dml_data`
    DDL = 1; // has `ddl_query`
}

// `Binlog` stores all the changes in a transaction. Kafka stores the serialized result of this structure.
message Binlog {
    optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
    optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
    optional DMLData dml_data = 3;
    optional DDLData ddl_data = 4;
}
```

For the definition of the data format, see secondary_binlog.proto.

Driver

The TiDB-Tools project provides Driver, which is used to read the binlog data in Kafka. It has the following features:

  • Read the Kafka data.
  • Locate the binlog stored in Kafka based on commit ts.

You need to configure the following information when using Driver:

  • KafkaAddr: the address of the Kafka cluster
  • CommitTS: from which commit ts to start reading the binlog
  • Offset: from which Kafka offset to start reading data. If CommitTS is set, you do not need to configure this parameter.
  • ClusterID: the cluster ID of the TiDB cluster
  • Topic: the topic name in Kafka. If Topic is empty, the default Drainer topic name <ClusterID>_obinlog is used.
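Because Topic falls back to `<ClusterID>_obinlog` when it is empty, a consumer can resolve the topic name up front. A minimal sketch (the helper name is my own, not part of Driver):

```go
package main

import "fmt"

// resolveTopic returns the Kafka topic to read: the configured Topic
// if it is set, otherwise Drainer's default "<ClusterID>_obinlog".
func resolveTopic(topic, clusterID string) string {
	if topic != "" {
		return topic
	}
	return clusterID + "_obinlog"
}

func main() {
	fmt.Println(resolveTopic("", "my-cluster")) // prints "my-cluster_obinlog"
	fmt.Println(resolveTopic("custom-topic", "my-cluster")) // prints "custom-topic"
}
```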

To use Driver, import the Driver package into your project, and refer to the example code provided with Driver to learn how to use Driver and parse the binlog data.

Currently, two examples are provided:

  • Using Driver to replicate data to MySQL. This example shows how to convert a binlog to SQL.
  • Using Driver to print data


Note

  • The example code only shows how to use Driver. If you want to use Driver in the production environment, you need to optimize the code.
  • Currently, only the Go version of Driver and its example code are available. If you want to use another language, generate the code file for that language from the binlog proto file, and then develop an application that reads the binlog data from Kafka, parses it, and outputs it to the downstream. You are also welcome to optimize the example code and submit example code in other languages to TiDB-Tools.
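Generating bindings for another language is a single `protoc` invocation. Assuming `protoc` and the relevant language plugin are installed, it would look something like this (note that the proto file uses gogoproto options, so the gogoproto include path may also need to be on `protoc`'s import path):

```shell
# Generate Python bindings from the binlog proto file;
# swap --python_out for --java_out, --cpp_out, etc. as needed.
protoc --python_out=. secondary_binlog.proto
```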