基于 Avro 的 TiCDC 行数据 Checksum 校验

本文介绍如何使用 Golang 消费 TiCDC 发送到 Kafka、且由 Avro 协议编码的数据,以及如何基于单行数据 Checksum 功能进行数据校验。

本示例代码位于 avro-checksum-verification 目录下。

本文使用 kafka-go 实现一个简单的 Kafka Consumer 程序。该程序不断地从指定的 Topic 中读取数据、计算并校验 Checksum 值。

  1. package main
  import (
	"context"
	"encoding/binary"
	"encoding/json"
	"hash/crc32"
	"io"
	"math"
	"net/http"
	"strconv"
	"strings"
	"time"

	"github.com/linkedin/goavro/v2"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tiflow/pkg/errors"
	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"
  )
  // magicByte is the first byte of every message in the Confluent Avro wire
  // format. It is followed by a 4-byte big-endian schema ID and then the
  // Avro-encoded payload.
  // https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
  const magicByte uint8 = 0
  25. func main() {
  26. var (
  27. kafkaAddr = "127.0.0.1:9092"
  28. schemaRegistryURL = "http://127.0.0.1:8081"
  29. topic = "avro-checksum-test"
  30. consumerGroupID = "avro-checksum-test"
  31. )
  32. consumer := kafka.NewReader(kafka.ReaderConfig{
  33. Brokers: []string{kafkaAddr},
  34. GroupID: consumerGroupID,
  35. Topic: topic,
  36. MaxBytes: 10e6, // 10MB
  37. })
  38. defer consumer.Close()
  39. ctx := context.Background()
  40. log.Info("start consuming ...", zap.String("kafka", kafkaAddr), zap.String("topic", topic), zap.String("groupID", consumerGroupID))
  41. for {
  42. // 1. 获取 kafka 消息
  43. message, err := consumer.FetchMessage(ctx)
  44. if err != nil {
  45. log.Error("read kafka message failed", zap.Error(err))
  46. }
  47. value := message.Value
  48. if len(value) == 0 {
  49. log.Info("delete event does not have value, skip checksum verification", zap.String("topic", topic))
  50. }
  51. // 2. 对 value 进行解码,得到对应的 value map 和 schema map
  52. valueMap, valueSchema, err := getValueMapAndSchema(value, schemaRegistryURL)
  53. if err != nil {
  54. log.Panic("decode kafka value failed", zap.String("topic", topic), zap.ByteString("value", value), zap.Error(err))
  55. }
  56. // 3. 使用上一步得到的 value map 和 schema map,计算并且校验 checksum
  57. err = CalculateAndVerifyChecksum(valueMap, valueSchema)
  58. if err != nil {
  59. log.Panic("calculate checksum failed", zap.String("topic", topic), zap.ByteString("value", value), zap.Error(err))
  60. }
  61. // 4. 数据消费成功,提交 offset
  62. if err := consumer.CommitMessages(ctx, message); err != nil {
  63. log.Error("commit kafka message failed", zap.Error(err))
  64. break
  65. }
  66. }
  67. }

从上面的代码可以看出,`getValueMapAndSchema()` 和 `CalculateAndVerifyChecksum()` 是计算 Checksum 的关键步骤,下面分别介绍这两个函数的实现。

解码数据以及获取相应的 Schema

`getValueMapAndSchema()` 函数的主要作用是解码数据以及获取相应的 Schema,二者均以 `map[string]interface{}` 类型返回。

  1. // data is received kafka message's key or value, url is the schema registry url.
  2. // return the decoded value and corresponding schema as map.
  3. func getValueMapAndSchema(data []byte, url string) (map[string]interface{}, map[string]interface{}, error) {
  4. schemaID, binary, err := extractSchemaIDAndBinaryData(data)
  5. if err != nil {
  6. return nil, nil, err
  7. }
  8. codec, err := GetSchema(url, schemaID)
  9. if err != nil {
  10. return nil, nil, err
  11. }
  12. native, _, err := codec.NativeFromBinary(binary)
  13. if err != nil {
  14. return nil, nil, err
  15. }
  16. result, ok := native.(map[string]interface{})
  17. if !ok {
  18. return nil, nil, errors.New("raw avro message is not a map")
  19. }
  20. schema := make(map[string]interface{})
  21. if err := json.Unmarshal([]byte(codec.Schema()), &schema); err != nil {
  22. return nil, nil, errors.Trace(err)
  23. }
  24. return result, schema, nil
  25. }
  26. // extractSchemaIDAndBinaryData
  27. func extractSchemaIDAndBinaryData(data []byte) (int, []byte, error) {
  28. if len(data) < 5 {
  29. return 0, nil, errors.ErrAvroInvalidMessage.FastGenByArgs()
  30. }
  31. if data[0] != magicByte {
  32. return 0, nil, errors.ErrAvroInvalidMessage.FastGenByArgs()
  33. }
  34. return int(binary.BigEndian.Uint32(data[1:5])), data[5:], nil
  35. }
  36. // GetSchema query the schema registry to fetch the schema by the schema id.
  37. // return the goavro.Codec which can be used to encode and decode the data.
  38. func GetSchema(url string, schemaID int) (*goavro.Codec, error) {
  39. requestURI := url + "/schemas/ids/" + strconv.Itoa(schemaID)
  40. req, err := http.NewRequest("GET", requestURI, nil)
  41. if err != nil {
  42. log.Error("Cannot create the request to look up the schema", zap.Error(err))
  43. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  44. }
  45. req.Header.Add(
  46. "Accept",
  47. "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, "+
  48. "application/json",
  49. )
  50. httpClient := &http.Client{}
  51. resp, err := httpClient.Do(req)
  52. if err != nil {
  53. return nil, err
  54. }
  55. defer resp.Body.Close()
  56. body, err := io.ReadAll(resp.Body)
  57. if err != nil {
  58. log.Error("Cannot parse the lookup schema response", zap.Error(err))
  59. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  60. }
  61. if resp.StatusCode == 404 {
  62. log.Warn("Specified schema not found in Registry", zap.String("requestURI", requestURI), zap.Int("schemaID", schemaID))
  63. return nil, errors.ErrAvroSchemaAPIError.GenWithStackByArgs("Schema not found in Registry")
  64. }
  65. if resp.StatusCode != 200 {
  66. log.Error("Failed to query schema from the Registry, HTTP error",
  67. zap.Int("status", resp.StatusCode), zap.String("uri", requestURI), zap.ByteString("responseBody", body))
  68. return nil, errors.ErrAvroSchemaAPIError.GenWithStack("Failed to query schema from the Registry, HTTP error")
  69. }
  70. var jsonResp lookupResponse
  71. err = json.Unmarshal(body, &jsonResp)
  72. if err != nil {
  73. log.Error("Failed to parse result from Registry", zap.Error(err))
  74. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  75. }
  76. codec, err := goavro.NewCodec(jsonResp.Schema)
  77. if err != nil {
  78. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  79. }
  80. return codec, nil
  81. }
  // lookupResponse is the JSON body returned by the schema registry for
  // GET /schemas/ids/{id}, as decoded by GetSchema. Only Schema is used by
  // this program.
  type lookupResponse struct {
  	// Name is the "name" property of the response.
  	Name string `json:"name"`
  	// SchemaID is the "id" property of the response.
  	SchemaID int `json:"id"`
  	// Schema is the Avro schema as a JSON-encoded string; GetSchema passes it
  	// to goavro.NewCodec.
  	Schema string `json:"schema"`
  }

计算并校验 Checksum

上一步获取的 `valueMap` 和 `valueSchema` 包含了所有用于 Checksum 计算和校验的元素。

在消费端计算和校验 Checksum 的过程包含以下几个步骤:

  1. 获取期望的 Checksum 值。
  2. 遍历每一列,根据列的数据值和对应的 MySQL Type,生成字节切片,不断更新 Checksum 值。
  3. 将上一步计算得到的 Checksum 和从收到的消息里提取出来的 Checksum 做比较。如果不一致,则说明 Checksum 校验失败,数据可能发生损坏。

示例代码如下:

  1. func CalculateAndVerifyChecksum(valueMap, valueSchema map[string]interface{}) error {
  2. // fields 存放有数据变更事件的每一个列的类型信息,按照每一列的 ID 排序,该顺序和 Checksum 计算顺序相同
  3. fields, ok := valueSchema["fields"].([]interface{})
  4. if !ok {
  5. return errors.New("schema fields should be a map")
  6. }
  7. // 1. 从 valueMap 里面查找期望的 checksum 值,它被编码成 string 类型
  8. // 如果找不到,说明 TiCDC 发送该条数据时,还没有开启 checksum 功能,直接返回即可
  9. o, ok := valueMap["_tidb_row_level_checksum"]
  10. if !ok {
  11. return nil
  12. }
  13. expected := o.(string)
  14. if expected == "" {
  15. return nil
  16. }
  17. // expectedChecksum 即是从 TiCDC 传递而来的期望的 checksum 值
  18. expectedChecksum, err := strconv.ParseUint(expected, 10, 64)
  19. if err != nil {
  20. return errors.Trace(err)
  21. }
  22. // 2. 遍历每一个 field,计算 checksum 值
  23. var actualChecksum uint32
  24. // buf 用来存储每次更新 checksum 时使用的字节切片
  25. buf := make([]byte, 0)
  26. for _, item := range fields {
  27. field, ok := item.(map[string]interface{})
  28. if !ok {
  29. return errors.New("schema field should be a map")
  30. }
  31. // `tidbOp` 及之后的列不参与到 checksum 计算中,因为它们是一些用于辅助数据消费的列,并非真实的 TiDB 列数据
  32. colName := field["name"].(string)
  33. if colName == "_tidb_op" {
  34. break
  35. }
  36. // holder 存放有列类型信息
  37. var holder map[string]interface{}
  38. switch ty := field["type"].(type) {
  39. case []interface{}:
  40. for _, item := range ty {
  41. if m, ok := item.(map[string]interface{}); ok {
  42. holder = m["connect.parameters"].(map[string]interface{})
  43. break
  44. }
  45. }
  46. case map[string]interface{}:
  47. holder = ty["connect.parameters"].(map[string]interface{})
  48. default:
  49. log.Panic("type info is anything else", zap.Any("typeInfo", field["type"]))
  50. }
  51. tidbType := holder["tidb_type"].(string)
  52. mysqlType := mysqlTypeFromTiDBType(tidbType)
  53. // 根据每一列的名字,从解码之后的 value map 里拿到该列的值
  54. value, ok := valueMap[colName]
  55. if !ok {
  56. return errors.New("value not found")
  57. }
  58. value, err := getColumnValue(value, holder, mysqlType)
  59. if err != nil {
  60. return errors.Trace(err)
  61. }
  62. if len(buf) > 0 {
  63. buf = buf[:0]
  64. }
  65. // 根据每一列的 value 和 mysqlType,生成用于更新 checksum 的字节切片,然后更新 checksum
  66. buf, err = buildChecksumBytes(buf, value, mysqlType)
  67. if err != nil {
  68. return errors.Trace(err)
  69. }
  70. actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, buf)
  71. }
  72. if uint64(actualChecksum) != expectedChecksum {
  73. log.Error("checksum mismatch",
  74. zap.Uint64("expected", expectedChecksum),
  75. zap.Uint64("actual", uint64(actualChecksum)))
  76. return errors.New("checksum mismatch")
  77. }
  78. log.Info("checksum verified", zap.Uint64("checksum", uint64(actualChecksum)))
  79. return nil
  80. }
  81. func mysqlTypeFromTiDBType(tidbType string) byte {
  82. var result byte
  83. switch tidbType {
  84. case "INT", "INT UNSIGNED":
  85. result = mysql.TypeLong
  86. case "BIGINT", "BIGINT UNSIGNED":
  87. result = mysql.TypeLonglong
  88. case "FLOAT":
  89. result = mysql.TypeFloat
  90. case "DOUBLE":
  91. result = mysql.TypeDouble
  92. case "BIT":
  93. result = mysql.TypeBit
  94. case "DECIMAL":
  95. result = mysql.TypeNewDecimal
  96. case "TEXT":
  97. result = mysql.TypeVarchar
  98. case "BLOB":
  99. result = mysql.TypeLongBlob
  100. case "ENUM":
  101. result = mysql.TypeEnum
  102. case "SET":
  103. result = mysql.TypeSet
  104. case "JSON":
  105. result = mysql.TypeJSON
  106. case "DATE":
  107. result = mysql.TypeDate
  108. case "DATETIME":
  109. result = mysql.TypeDatetime
  110. case "TIMESTAMP":
  111. result = mysql.TypeTimestamp
  112. case "TIME":
  113. result = mysql.TypeDuration
  114. case "YEAR":
  115. result = mysql.TypeYear
  116. default:
  117. log.Panic("this should not happen, unknown TiDB type", zap.String("type", tidbType))
  118. }
  119. return result
  120. }
  121. // value 是一个 interface 类型的值,需要根据 holder 提供的类型信息,做一次转换处理
  122. func getColumnValue(value interface{}, holder map[string]interface{}, mysqlType byte) (interface{}, error) {
  123. switch t := value.(type) {
  124. // nullable 的列,其值被编码成一个 map,只有一个键值对,键是类型,值是真实的值,此处只关心真实的值
  125. case map[string]interface{}:
  126. for _, v := range t {
  127. value = v
  128. }
  129. }
  130. switch mysqlType {
  131. case mysql.TypeEnum:
  132. // Enum 被编码成了 string,此处转换为对应于 Enum 定义的 int 值
  133. allowed := strings.Split(holder["allowed"].(string), ",")
  134. switch t := value.(type) {
  135. case string:
  136. enum, err := types.ParseEnum(allowed, t, "")
  137. if err != nil {
  138. return nil, errors.Trace(err)
  139. }
  140. value = enum.Value
  141. case nil:
  142. value = nil
  143. }
  144. case mysql.TypeSet:
  145. // Set 被编码成了 string,根据 set 定义的顺序,转换为对应的 int 值
  146. elems := strings.Split(holder["allowed"].(string), ",")
  147. switch t := value.(type) {
  148. case string:
  149. s, err := types.ParseSet(elems, t, "")
  150. if err != nil {
  151. return nil, errors.Trace(err)
  152. }
  153. value = s.Value
  154. case nil:
  155. value = nil
  156. }
  157. }
  158. return value, nil
  159. }
  160. // buildChecksumBytes 生成用于更新 checksum 的字节切片, 参考 https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/util/rowcodec/common.go#L308
  161. func buildChecksumBytes(buf []byte, value interface{}, mysqlType byte) ([]byte, error) {
  162. if value == nil {
  163. return buf, nil
  164. }
  165. switch mysqlType {
  166. // TypeTiny, TypeShort, TypeInt32 被编码成 int32
  167. // TypeLong 被编码成 int32 if signed, else int64
  168. // TypeLongLong,如果是 signed,被编码成 int64,否则被编码成 uint64,
  169. // 开启 checksum 功能,bigintUnsignedHandlingMode 必须设置为 string,被编码成 string.
  170. case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
  171. switch a := value.(type) {
  172. case int32:
  173. buf = binary.LittleEndian.AppendUint64(buf, uint64(a))
  174. case uint32:
  175. buf = binary.LittleEndian.AppendUint64(buf, uint64(a))
  176. case int64:
  177. buf = binary.LittleEndian.AppendUint64(buf, uint64(a))
  178. case uint64:
  179. buf = binary.LittleEndian.AppendUint64(buf, a)
  180. case string:
  181. v, err := strconv.ParseUint(a, 10, 64)
  182. if err != nil {
  183. return nil, errors.Trace(err)
  184. }
  185. buf = binary.LittleEndian.AppendUint64(buf, v)
  186. default:
  187. log.Panic("unknown golang type for the integral value",
  188. zap.Any("value", value), zap.Any("mysqlType", mysqlType))
  189. }
  190. // Float 类型编码为 float32,Double 编码为 float64
  191. case mysql.TypeFloat, mysql.TypeDouble:
  192. var v float64
  193. switch a := value.(type) {
  194. case float32:
  195. v = float64(a)
  196. case float64:
  197. v = a
  198. }
  199. if math.IsInf(v, 0) || math.IsNaN(v) {
  200. v = 0
  201. }
  202. buf = binary.LittleEndian.AppendUint64(buf, math.Float64bits(v))
  203. // getColumnValue 将 Enum 和 Set 转换为了 uint64 类型
  204. case mysql.TypeEnum, mysql.TypeSet:
  205. buf = binary.LittleEndian.AppendUint64(buf, value.(uint64))
  206. case mysql.TypeBit:
  207. // bit 类型编码为 []bytes,需要进一步转换为 uint64
  208. v, err := binaryLiteralToInt(value.([]byte))
  209. if err != nil {
  210. return nil, errors.Trace(err)
  211. }
  212. buf = binary.LittleEndian.AppendUint64(buf, v)
  213. // 非二进制类型时,编码成 string, 反之则为 []byte
  214. case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
  215. switch a := value.(type) {
  216. case string:
  217. buf = appendLengthValue(buf, []byte(a))
  218. case []byte:
  219. buf = appendLengthValue(buf, a)
  220. default:
  221. log.Panic("unknown golang type for the string value",
  222. zap.Any("value", value), zap.Any("mysqlType", mysqlType))
  223. }
  224. case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeDuration, mysql.TypeNewDate:
  225. v := value.(string)
  226. buf = appendLengthValue(buf, []byte(v))
  227. // 开启 checksum 功能时,decimalHandlingMode 必须设置为 string
  228. case mysql.TypeNewDecimal:
  229. buf = appendLengthValue(buf, []byte(value.(string)))
  230. case mysql.TypeJSON:
  231. buf = appendLengthValue(buf, []byte(value.(string)))
  232. // Null 和 Geometry 不参与到 checksum 计算
  233. case mysql.TypeNull, mysql.TypeGeometry:
  234. // do nothing
  235. default:
  236. return buf, errors.New("invalid type for the checksum calculation")
  237. }
  238. return buf, nil
  239. }
  // appendLengthValue appends val to buf, preceded by its length as a 4-byte
  // little-endian uint32, and returns the extended slice.
  func appendLengthValue(buf []byte, val []byte) []byte {
  	var lengthPrefix [4]byte
  	binary.LittleEndian.PutUint32(lengthPrefix[:], uint32(len(val)))
  	buf = append(buf, lengthPrefix[:]...)
  	return append(buf, val...)
  }
  245. // 将 []byte 转换为 uint64,参考 https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/types/binary_literal.go#L105
  246. func binaryLiteralToInt(bytes []byte) (uint64, error) {
  247. bytes = trimLeadingZeroBytes(bytes)
  248. length := len(bytes)
  249. if length > 8 {
  250. log.Error("invalid bit value found", zap.ByteString("value", bytes))
  251. return math.MaxUint64, errors.New("invalid bit value")
  252. }
  253. if length == 0 {
  254. return 0, nil
  255. }
  256. val := uint64(bytes[0])
  257. for i := 1; i < length; i++ {
  258. val = (val << 8) | uint64(bytes[i])
  259. }
  260. return val, nil
  261. }
  // trimLeadingZeroBytes removes leading 0x00 bytes but always keeps the last
  // byte, so an all-zero input becomes a single zero byte. An empty or nil
  // input is returned as is. The result is a sub-slice of the input.
  func trimLeadingZeroBytes(bytes []byte) []byte {
  	for len(bytes) > 1 && bytes[0] == 0 {
  		bytes = bytes[1:]
  	}
  	return bytes
  }