TiCDC Row Data Checksum Verification Based on Avro

This document describes how to use Golang to consume the Avro-encoded data that TiCDC sends to Kafka, and how to verify that data using the single-row data checksum feature.

The source code of this example is available in the avro-checksum-verification directory.

The example in this document uses kafka-go to create a simple Kafka consumer program. This program continuously reads data from a specified topic, calculates the checksum, and verifies its value.

  1. package main
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "encoding/json"
  6. "hash/crc32"
  7. "io"
  8. "math"
  9. "net/http"
  10. "strconv"
  11. "strings"
  12. "github.com/linkedin/goavro/v2"
  13. "github.com/pingcap/log"
  14. "github.com/pingcap/tidb/parser/mysql"
  15. "github.com/pingcap/tidb/types"
  16. "github.com/pingcap/tiflow/pkg/errors"
  17. "github.com/segmentio/kafka-go"
  18. "go.uber.org/zap"
  19. )
  // magicByte is the value expected in the first byte of every message.
  // The first byte of the Confluent Avro wire format is always 0.
  // For more details, see https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format.
  const magicByte uint8 = 0
  25. func main() {
  26. var (
  27. kafkaAddr = "127.0.0.1:9092"
  28. schemaRegistryURL = "http://127.0.0.1:8081"
  29. topic = "avro-checksum-test"
  30. consumerGroupID = "avro-checksum-test"
  31. )
  32. consumer := kafka.NewReader(kafka.ReaderConfig{
  33. Brokers: []string{kafkaAddr},
  34. GroupID: consumerGroupID,
  35. Topic: topic,
  36. MaxBytes: 10e6, // 10MB
  37. })
  38. defer consumer.Close()
  39. ctx := context.Background()
  40. log.Info("start consuming ...", zap.String("kafka", kafkaAddr), zap.String("topic", topic), zap.String("groupID", consumerGroupID))
  41. for {
  42. // 1. Fetch the kafka message.
  43. message, err := consumer.FetchMessage(ctx)
  44. if err != nil {
  45. log.Error("read kafka message failed", zap.Error(err))
  46. }
  47. value := message.Value
  48. if len(value) == 0 {
  49. log.Info("delete event does not have value, skip checksum verification", zap.String("topic", topic))
  50. }
  51. // 2. Decode the value to get the corresponding value map and schema map.
  52. valueMap, valueSchema, err := getValueMapAndSchema(value, schemaRegistryURL)
  53. if err != nil {
  54. log.Panic("decode kafka value failed", zap.String("topic", topic), zap.ByteString("value", value), zap.Error(err))
  55. }
  56. // 3. Calculate and verify checksum value using the value map and schema map obtained in the previous step.
  57. err = CalculateAndVerifyChecksum(valueMap, valueSchema)
  58. if err != nil {
  59. log.Panic("calculate checksum failed", zap.String("topic", topic), zap.ByteString("value", value), zap.Error(err))
  60. }
  61. // 4. Commit offset after the data is successfully consumed.
  62. if err := consumer.CommitMessages(ctx, message); err != nil {
  63. log.Error("commit kafka message failed", zap.Error(err))
  64. break
  65. }
  66. }
  67. }

The key steps for calculating the checksum value are getValueMapAndSchema() and CalculateAndVerifyChecksum(). The following sections describe the implementation of these two functions.

Decode data and get the corresponding schema

The getValueMapAndSchema() method decodes data and gets the corresponding schema. This method returns both the data and schema as a map[string]interface{} type.

  1. // data is the key or value of the received kafka message, and url is the schema registry url.
  2. // This function returns the decoded value and corresponding schema as map.
  3. func getValueMapAndSchema(data []byte, url string) (map[string]interface{}, map[string]interface{}, error) {
  4. schemaID, binary, err := extractSchemaIDAndBinaryData(data)
  5. if err != nil {
  6. return nil, nil, err
  7. }
  8. codec, err := GetSchema(url, schemaID)
  9. if err != nil {
  10. return nil, nil, err
  11. }
  12. native, _, err := codec.NativeFromBinary(binary)
  13. if err != nil {
  14. return nil, nil, err
  15. }
  16. result, ok := native.(map[string]interface{})
  17. if !ok {
  18. return nil, nil, errors.New("raw avro message is not a map")
  19. }
  20. schema := make(map[string]interface{})
  21. if err := json.Unmarshal([]byte(codec.Schema()), &schema); err != nil {
  22. return nil, nil, errors.Trace(err)
  23. }
  24. return result, schema, nil
  25. }
  26. // extractSchemaIDAndBinaryData
  27. func extractSchemaIDAndBinaryData(data []byte) (int, []byte, error) {
  28. if len(data) < 5 {
  29. return 0, nil, errors.ErrAvroInvalidMessage.FastGenByArgs()
  30. }
  31. if data[0] != magicByte {
  32. return 0, nil, errors.ErrAvroInvalidMessage.FastGenByArgs()
  33. }
  34. return int(binary.BigEndian.Uint32(data[1:5])), data[5:], nil
  35. }
  36. // GetSchema fetches the schema from the schema registry by the schema ID.
  37. // This function returns a goavro.Codec that can be used to encode and decode the data.
  38. func GetSchema(url string, schemaID int) (*goavro.Codec, error) {
  39. requestURI := url + "/schemas/ids/" + strconv.Itoa(schemaID)
  40. req, err := http.NewRequest("GET", requestURI, nil)
  41. if err != nil {
  42. log.Error("Cannot create the request to look up the schema", zap.Error(err))
  43. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  44. }
  45. req.Header.Add(
  46. "Accept",
  47. "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, "+
  48. "application/json",
  49. )
  50. httpClient := &http.Client{}
  51. resp, err := httpClient.Do(req)
  52. if err != nil {
  53. return nil, err
  54. }
  55. defer resp.Body.Close()
  56. body, err := io.ReadAll(resp.Body)
  57. if err != nil {
  58. log.Error("Cannot parse the lookup schema response", zap.Error(err))
  59. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  60. }
  61. if resp.StatusCode == 404 {
  62. log.Warn("Specified schema not found in Registry", zap.String("requestURI", requestURI), zap.Int("schemaID", schemaID))
  63. return nil, errors.ErrAvroSchemaAPIError.GenWithStackByArgs("Schema not found in Registry")
  64. }
  65. if resp.StatusCode != 200 {
  66. log.Error("Failed to query schema from the Registry, HTTP error",
  67. zap.Int("status", resp.StatusCode), zap.String("uri", requestURI), zap.ByteString("responseBody", body))
  68. return nil, errors.ErrAvroSchemaAPIError.GenWithStack("Failed to query schema from the Registry, HTTP error")
  69. }
  70. var jsonResp lookupResponse
  71. err = json.Unmarshal(body, &jsonResp)
  72. if err != nil {
  73. log.Error("Failed to parse result from Registry", zap.Error(err))
  74. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  75. }
  76. codec, err := goavro.NewCodec(jsonResp.Schema)
  77. if err != nil {
  78. return nil, errors.WrapError(errors.ErrAvroSchemaAPIError, err)
  79. }
  80. return codec, nil
  81. }
  // lookupResponse is the JSON document that GetSchema decodes from the schema
  // registry response.
  type lookupResponse struct {
  	// Name is read from the "name" key.
  	Name string `json:"name"`
  	// SchemaID is read from the "id" key.
  	SchemaID int `json:"id"`
  	// Schema is read from the "schema" key; GetSchema builds the codec from it.
  	Schema string `json:"schema"`
  }

Calculate and verify the checksum value

The valueMap and valueSchema obtained in the previous step contain all the elements used for checksum calculation and verification.

The checksum calculation and verification process on the consumer side includes the following steps:

  1. Get the expected checksum value.
  2. Iterate over each column, generate a byte slice according to the column value and the corresponding MySQL type, and update the checksum value continuously.
  3. Compare the checksum value calculated in the previous step with the checksum value obtained from the received message. If they are not the same, the checksum verification fails and the data might be corrupted.

The sample code is as follows:

  1. func CalculateAndVerifyChecksum(valueMap, valueSchema map[string]interface{}) error {
  2. // The fields variable stores the column type information for each data change event. The column IDs are used to sort the fields, which is the same as the order in which the checksum is calculated.
  3. fields, ok := valueSchema["fields"].([]interface{})
  4. if !ok {
  5. return errors.New("schema fields should be a map")
  6. }
  7. // 1. Get the expected checksum value from valueMap, which is encoded as a string.
  8. // If the expected checksum value is not found, it means that the checksum feature is not enabled when TiCDC sends the data. In this case, this function returns directly.
  9. o, ok := valueMap["_tidb_row_level_checksum"]
  10. if !ok {
  11. return nil
  12. }
  13. expected := o.(string)
  14. if expected == "" {
  15. return nil
  16. }
  17. // expectedChecksum is the expected checksum value passed from TiCDC.
  18. expectedChecksum, err := strconv.ParseUint(expected, 10, 64)
  19. if err != nil {
  20. return errors.Trace(err)
  21. }
  22. // 2. Iterate over each field and calculate the checksum value.
  23. var actualChecksum uint32
  24. // buf stores the byte slice used to update the checksum value each time.
  25. buf := make([]byte, 0)
  26. for _, item := range fields {
  27. field, ok := item.(map[string]interface{})
  28. if !ok {
  29. return errors.New("schema field should be a map")
  30. }
  31. // The tidbOp and subsequent columns are not involved in the checksum calculation, because they are used to assist data consumption and not real TiDB column data.
  32. colName := field["name"].(string)
  33. if colName == "_tidb_op" {
  34. break
  35. }
  36. // The holder variable stores the type information of each column.
  37. var holder map[string]interface{}
  38. switch ty := field["type"].(type) {
  39. case []interface{}:
  40. for _, item := range ty {
  41. if m, ok := item.(map[string]interface{}); ok {
  42. holder = m["connect.parameters"].(map[string]interface{})
  43. break
  44. }
  45. }
  46. case map[string]interface{}:
  47. holder = ty["connect.parameters"].(map[string]interface{})
  48. default:
  49. log.Panic("type info is anything else", zap.Any("typeInfo", field["type"]))
  50. }
  51. tidbType := holder["tidb_type"].(string)
  52. mysqlType := mysqlTypeFromTiDBType(tidbType)
  53. // Get the value of each column from the decoded value map according to the name of each column.
  54. value, ok := valueMap[colName]
  55. if !ok {
  56. return errors.New("value not found")
  57. }
  58. value, err := getColumnValue(value, holder, mysqlType)
  59. if err != nil {
  60. return errors.Trace(err)
  61. }
  62. if len(buf) > 0 {
  63. buf = buf[:0]
  64. }
  65. // Generate a byte slice used to update the checksum according to the value and mysqlType of each column, and then update the checksum value.
  66. buf, err = buildChecksumBytes(buf, value, mysqlType)
  67. if err != nil {
  68. return errors.Trace(err)
  69. }
  70. actualChecksum = crc32.Update(actualChecksum, crc32.IEEETable, buf)
  71. }
  72. if uint64(actualChecksum) != expectedChecksum {
  73. log.Error("checksum mismatch",
  74. zap.Uint64("expected", expectedChecksum),
  75. zap.Uint64("actual", uint64(actualChecksum)))
  76. return errors.New("checksum mismatch")
  77. }
  78. log.Info("checksum verified", zap.Uint64("checksum", uint64(actualChecksum)))
  79. return nil
  80. }
  81. func mysqlTypeFromTiDBType(tidbType string) byte {
  82. var result byte
  83. switch tidbType {
  84. case "INT", "INT UNSIGNED":
  85. result = mysql.TypeLong
  86. case "BIGINT", "BIGINT UNSIGNED":
  87. result = mysql.TypeLonglong
  88. case "FLOAT":
  89. result = mysql.TypeFloat
  90. case "DOUBLE":
  91. result = mysql.TypeDouble
  92. case "BIT":
  93. result = mysql.TypeBit
  94. case "DECIMAL":
  95. result = mysql.TypeNewDecimal
  96. case "TEXT":
  97. result = mysql.TypeVarchar
  98. case "BLOB":
  99. result = mysql.TypeLongBlob
  100. case "ENUM":
  101. result = mysql.TypeEnum
  102. case "SET":
  103. result = mysql.TypeSet
  104. case "JSON":
  105. result = mysql.TypeJSON
  106. case "DATE":
  107. result = mysql.TypeDate
  108. case "DATETIME":
  109. result = mysql.TypeDatetime
  110. case "TIMESTAMP":
  111. result = mysql.TypeTimestamp
  112. case "TIME":
  113. result = mysql.TypeDuration
  114. case "YEAR":
  115. result = mysql.TypeYear
  116. default:
  117. log.Panic("this should not happen, unknown TiDB type", zap.String("type", tidbType))
  118. }
  119. return result
  120. }
  121. // The value is an interface type, which needs to be converted according to the type information provided by the holder.
  122. func getColumnValue(value interface{}, holder map[string]interface{}, mysqlType byte) (interface{}, error) {
  123. switch t := value.(type) {
  124. // The column with nullable is encoded as a map, and there is only one key-value pair. The key is the type, and the value is the real value. Only the real value is concerned here.
  125. case map[string]interface{}:
  126. for _, v := range t {
  127. value = v
  128. }
  129. }
  130. switch mysqlType {
  131. case mysql.TypeEnum:
  132. // Enum is encoded as a string, which is converted to the int value corresponding to the Enum definition here.
  133. allowed := strings.Split(holder["allowed"].(string), ",")
  134. switch t := value.(type) {
  135. case string:
  136. enum, err := types.ParseEnum(allowed, t, "")
  137. if err != nil {
  138. return nil, errors.Trace(err)
  139. }
  140. value = enum.Value
  141. case nil:
  142. value = nil
  143. }
  144. case mysql.TypeSet:
  145. // Set is encoded as a string, which is converted to the int value corresponding to the Set definition here.
  146. elems := strings.Split(holder["allowed"].(string), ",")
  147. switch t := value.(type) {
  148. case string:
  149. s, err := types.ParseSet(elems, t, "")
  150. if err != nil {
  151. return nil, errors.Trace(err)
  152. }
  153. value = s.Value
  154. case nil:
  155. value = nil
  156. }
  157. }
  158. return value, nil
  159. }
  160. // buildChecksumBytes generates a byte slice used to update the checksum, refer to https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/util/rowcodec/common.go#L308
  161. func buildChecksumBytes(buf []byte, value interface{}, mysqlType byte) ([]byte, error) {
  162. if value == nil {
  163. return buf, nil
  164. }
  165. switch mysqlType {
  166. // TypeTiny, TypeShort, and TypeInt32 are encoded as int32.
  167. // TypeLong is encoded as int32 if signed, otherwise, it is encoded as int64.
  168. // TypeLongLong is encoded as int64 if signed, otherwise, it is encoded as uint64.
  169. // When the checksum feature is enabled, bigintUnsignedHandlingMode must be set to string, which is encoded as string.
  170. case mysql.TypeTiny, mysql.TypeShort, mysql.TypeLong, mysql.TypeLonglong, mysql.TypeInt24, mysql.TypeYear:
  171. switch a := value.(type) {
  172. case int32:
  173. buf = binary.LittleEndian.AppendUint64(buf, uint64(a))
  174. case uint32:
  175. buf = binary.LittleEndian.AppendUint64(buf, uint64(a))
  176. case int64:
  177. buf = binary.LittleEndian.AppendUint64(buf, uint64(a))
  178. case uint64:
  179. buf = binary.LittleEndian.AppendUint64(buf, a)
  180. case string:
  181. v, err := strconv.ParseUint(a, 10, 64)
  182. if err != nil {
  183. return nil, errors.Trace(err)
  184. }
  185. buf = binary.LittleEndian.AppendUint64(buf, v)
  186. default:
  187. log.Panic("unknown golang type for the integral value",
  188. zap.Any("value", value), zap.Any("mysqlType", mysqlType))
  189. }
  190. // Encode float type as float64 and encode double type as float64.
  191. case mysql.TypeFloat, mysql.TypeDouble:
  192. var v float64
  193. switch a := value.(type) {
  194. case float32:
  195. v = float64(a)
  196. case float64:
  197. v = a
  198. }
  199. if math.IsInf(v, 0) || math.IsNaN(v) {
  200. v = 0
  201. }
  202. buf = binary.LittleEndian.AppendUint64(buf, math.Float64bits(v))
  203. // getColumnValue encodes Enum and Set to uint64 type.
  204. case mysql.TypeEnum, mysql.TypeSet:
  205. buf = binary.LittleEndian.AppendUint64(buf, value.(uint64))
  206. case mysql.TypeBit:
  207. // Encode bit type as []byte and convert it to uint64.
  208. v, err := binaryLiteralToInt(value.([]byte))
  209. if err != nil {
  210. return nil, errors.Trace(err)
  211. }
  212. buf = binary.LittleEndian.AppendUint64(buf, v)
  213. // Non-binary types are encoded as string, and binary types are encoded as []byte.
  214. case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
  215. switch a := value.(type) {
  216. case string:
  217. buf = appendLengthValue(buf, []byte(a))
  218. case []byte:
  219. buf = appendLengthValue(buf, a)
  220. default:
  221. log.Panic("unknown golang type for the string value",
  222. zap.Any("value", value), zap.Any("mysqlType", mysqlType))
  223. }
  224. case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeDuration, mysql.TypeNewDate:
  225. v := value.(string)
  226. buf = appendLengthValue(buf, []byte(v))
  227. // When the checksum feature is enabled, decimalHandlingMode must be set to string.
  228. case mysql.TypeNewDecimal:
  229. buf = appendLengthValue(buf, []byte(value.(string)))
  230. case mysql.TypeJSON:
  231. buf = appendLengthValue(buf, []byte(value.(string)))
  232. // Null and Geometry are not involved in the checksum calculation.
  233. case mysql.TypeNull, mysql.TypeGeometry:
  234. // do nothing
  235. default:
  236. return buf, errors.New("invalid type for the checksum calculation")
  237. }
  238. return buf, nil
  239. }
  // appendLengthValue appends val to buf, prefixed with its length encoded as a
  // 4-byte little-endian unsigned integer, and returns the extended slice.
  func appendLengthValue(buf []byte, val []byte) []byte {
  	var header [4]byte
  	binary.LittleEndian.PutUint32(header[:], uint32(len(val)))
  	return append(append(buf, header[:]...), val...)
  }
  245. // Convert []byte to uint64, refer to https://github.com/pingcap/tidb/blob/e3417913f58cdd5a136259b902bf177eaf3aa637/types/binary_literal.go#L105
  246. func binaryLiteralToInt(bytes []byte) (uint64, error) {
  247. bytes = trimLeadingZeroBytes(bytes)
  248. length := len(bytes)
  249. if length > 8 {
  250. log.Error("invalid bit value found", zap.ByteString("value", bytes))
  251. return math.MaxUint64, errors.New("invalid bit value")
  252. }
  253. if length == 0 {
  254. return 0, nil
  255. }
  256. val := uint64(bytes[0])
  257. for i := 1; i < length; i++ {
  258. val = (val << 8) | uint64(bytes[i])
  259. }
  260. return val, nil
  261. }
  // trimLeadingZeroBytes returns bytes without its leading zero bytes. The last
  // byte is always kept, so an all-zero input is reduced to a single zero byte.
  // An empty input is returned as is.
  func trimLeadingZeroBytes(bytes []byte) []byte {
  	start := 0
  	// Stop one short of the end so that at least one byte remains.
  	for start+1 < len(bytes) && bytes[start] == 0 {
  		start++
  	}
  	return bytes[start:]
  }
  273. }