更新数据

此页面将展示以下 SQL 语句,配合各种编程语言 TiDB 中的数据进行更新:

  • UPDATE: 用于修改指定表中的数据。
  • INSERT ON DUPLICATE KEY UPDATE: 用于插入数据,在有主键或唯一键冲突时,更新此数据。注意,不建议在有多个唯一键(包含主键)的情况下使用此语句。这是因为此语句在检测到任何唯一键(包括主键) 冲突时,将更新数据。在不止匹配到一行冲突时,将只会更新一行数据。

在开始之前

在阅读本页面之前,你需要准备以下事项:

使用 UPDATE

需更新表中的现有行,需要使用带有 WHERE 子句的 UPDATE 语句,即需要过滤列进行更新。

更新数据 - 图1

注意

如果您需要更新大量的行,比如数万甚至更多行,那么建议不要一次性进行完整的更新,而是每次迭代更新一部分,直到所有行全部更新。您可以编写脚本或程序,使用循环完成此操作。 您可参考批量更新获得指引。

SQL 语法

在 SQL 中,UPDATE 语句一般为以下形式:

  1. UPDATE {table} SET {update_column} = {update_value} WHERE {filter_column} = {filter_value}
参数描述
{table}表名
{update_column}需更新的列名
{update_value}需更新的此列的值
{filter_column}匹配条件过滤器的列名
{filter_value}匹配条件过滤器的列值

此处仅展示 UPDATE 的简单用法,详细文档可参考 TiDB 的 UPDATE 语法页

UPDATE 最佳实践

以下是更新行时需要遵循的一些最佳实践:

  • 始终在更新语句中指定 WHERE 子句。如果 UPDATE 没有 WHERE 子句,TiDB 将更新这个表内的所有行
  • 需要更新大量行(数万或更多)的时候,使用批量更新,这是因为 TiDB 单个事务大小限制为 txn-total-size-limit(默认为 100MB),且一次性过多的数据更新,将导致持有锁时间过长(悲观事务),或产生大量冲突(乐观事务)。

UPDATE 例子

假设某位作者改名为 Helen Haruki,需要更改 authors 表。假设他的唯一标识 id 为 1,即过滤器应为:id = 1

  • SQL
  • Java

在 SQL 中更改作者姓名的示例为:

  1. UPDATE `authors` SET `name` = "Helen Haruki" WHERE `id` = 1;

在 Java 中更改作者姓名的示例为:

  1. // ds is an entity of com.mysql.cj.jdbc.MysqlDataSource
  2. try (Connection connection = ds.getConnection()) {
  3. PreparedStatement pstmt = connection.prepareStatement("UPDATE `authors` SET `name` = ? WHERE `id` = ?");
  4. pstmt.setString(1, "Helen Haruki");
  5. pstmt.setInt(2, 1);
  6. pstmt.executeUpdate();
  7. } catch (SQLException e) {
  8. e.printStackTrace();
  9. }

使用 INSERT ON DUPLICATE KEY UPDATE

如果你需要将新数据插入表中,但如果有任何唯一键(主键也是一种唯一键)发生冲突,则会更新第一条冲突数据,可使用 INSERT ... ON DUPLICATE KEY UPDATE ... 语句进行插入或更新。

SQL 语法

在 SQL 中,INSERT ... ON DUPLICATE KEY UPDATE ... 语句一般为以下形式:

  1. INSERT INTO {table} ({columns}) VALUES ({values})
  2. ON DUPLICATE KEY UPDATE {update_column} = {update_value};
参数描述
{table}表名
{columns}需插入的列名
{values}需插入的此列的值
{update_column}需更新的列名
{update_value}需更新的此列的值

INSERT ON DUPLICATE KEY UPDATE 最佳实践

  • 在仅有一个唯一键的表上使用 INSERT ON DUPLICATE KEY UPDATE。此语句在检测到任何 唯一键 (包括主键) 冲突时,将更新数据。在不止匹配到一行冲突时,将只会更新一行数据。因此,除非能保证仅有一行冲突,否则不建议在有多个唯一键的表中使用 INSERT ON DUPLICATE KEY UPDATE 语句。
  • 在创建或更新的场景中使用此语句。

INSERT ON DUPLICATE KEY UPDATE 例子

例如,需要更新 ratings 表来写入用户对书籍的评价,如果用户还未评价此书籍,将新建一条评价,如果用户已经评价过,那么将会更新他之前的评价。

此处主键为 book_iduser_id 的联合主键。user_id 为 1 的用户,给 book_id 为 1000 的书籍,打出的 5 分的评价。

  • SQL
  • Java

在 SQL 中更新书籍评价的示例为:

  1. INSERT INTO `ratings`
  2. (`book_id`, `user_id`, `score`, `rated_at`)
  3. VALUES
  4. (1000, 1, 5, NOW())
  5. ON DUPLICATE KEY UPDATE `score` = 5, `rated_at` = NOW();

在 Java 中更新书籍评价的示例为:

  1. // ds is an entity of com.mysql.cj.jdbc.MysqlDataSource
  2. try (Connection connection = ds.getConnection()) {
  3. PreparedStatement p = connection.prepareStatement("INSERT INTO `ratings` (`book_id`, `user_id`, `score`, `rated_at`)
  4. VALUES (?, ?, ?, NOW()) ON DUPLICATE KEY UPDATE `score` = ?, `rated_at` = NOW()");
  5. p.setInt(1, 1000);
  6. p.setInt(2, 1);
  7. p.setInt(3, 5);
  8. p.setInt(4, 5);
  9. p.executeUpdate();
  10. } catch (SQLException e) {
  11. e.printStackTrace();
  12. }

批量更新

需要更新表中多行的数据,可选择使用 UPDATE,并使用 WHERE 子句过滤需要更新的数据。

但如果你需要更新大量行(数万或更多)的时候,建议使用一个迭代,每次都只更新一部分数据,直到更新全部完成。这是因为 TiDB 单个事务大小限制为 txn-total-size-limit(默认为 100MB),且一次性过多的数据更新,将导致持有锁时间过长(悲观事务),或产生大量冲突(乐观事务)。你可以在程序或脚本中使用循环来完成操作。

本页提供了编写脚本来处理循环更新的示例,该示例演示了应如何进行 SELECTUPDATE 的组合,完成循环更新。

编写批量更新循环

首先,你应在你的应用或脚本的循环中,编写一个 SELECT 查询。这个查询的返回值可以作为需要更新的行的主键。需要注意的是,定义这个 SELECT 查询时,需要注意使用 WHERE 子句过滤需要更新的行。

例子

假设在过去的一年里,用户在 bookshop 网站进行了大量的书籍打分,但是原本设计为 5 分制的评分导致书籍评分的区分度不够,大量书籍评分集中在 3 分附近,因此,决定将 5 分制改为 10 分制。用来增大书籍评分的区分度。

这时需要对 ratings 表内之前 5 分制的数据进行乘 2 操作,同时需向 ratings 表内添加一个新列,以指示行是否已经被更新了。使用此列,可以在 SELECT 中过滤掉已经更新的行,这将防止脚本崩溃时对行进行多次更新,导致不合理的数据出现。

例如,你可以创建一个名为 ten_point,数据类型为 BOOL 的列作为是否为 10 分制的标识:

  1. ALTER TABLE `bookshop`.`ratings` ADD COLUMN `ten_point` BOOL NOT NULL DEFAULT FALSE;

更新数据 - 图2

注意

此批量更新程序将使用 DDL 语句将进行数据表的模式更改。TiDB 的所有 DDL 变更操作全部都是在线进行的,可查看此处,了解此处使用的 ADD COLUMN 语句。

  • Golang
  • Java (JDBC)

在 Golang 中,批量更新程序类似于以下内容:

  1. package main
  2. import (
  3. "database/sql"
  4. "fmt"
  5. _ "github.com/go-sql-driver/mysql"
  6. "strings"
  7. "time"
  8. )
  9. func main() {
  10. db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:4000)/bookshop")
  11. if err != nil {
  12. panic(err)
  13. }
  14. defer db.Close()
  15. bookID, userID := updateBatch(db, true, 0, 0)
  16. fmt.Println("first time batch update success")
  17. for {
  18. time.Sleep(time.Second)
  19. bookID, userID = updateBatch(db, false, bookID, userID)
  20. fmt.Printf("batch update success, [bookID] %d, [userID] %d\n", bookID, userID)
  21. }
  22. }
  23. // updateBatch select at most 1000 lines data to update score
  24. func updateBatch(db *sql.DB, firstTime bool, lastBookID, lastUserID int64) (bookID, userID int64) {
  25. // select at most 1000 primary keys in five-point scale data
  26. var err error
  27. var rows *sql.Rows
  28. if firstTime {
  29. rows, err = db.Query("SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` " +
  30. "WHERE `ten_point` != true ORDER BY `book_id`, `user_id` LIMIT 1000")
  31. } else {
  32. rows, err = db.Query("SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` "+
  33. "WHERE `ten_point` != true AND `book_id` > ? AND `user_id` > ? "+
  34. "ORDER BY `book_id`, `user_id` LIMIT 1000", lastBookID, lastUserID)
  35. }
  36. if err != nil || rows == nil {
  37. panic(fmt.Errorf("error occurred or rows nil: %+v", err))
  38. }
  39. // joint all id with a list
  40. var idList []interface{}
  41. for rows.Next() {
  42. var tempBookID, tempUserID int64
  43. if err := rows.Scan(&tempBookID, &tempUserID); err != nil {
  44. panic(err)
  45. }
  46. idList = append(idList, tempBookID, tempUserID)
  47. bookID, userID = tempBookID, tempUserID
  48. }
  49. bulkUpdateSql := fmt.Sprintf("UPDATE `bookshop`.`ratings` SET `ten_point` = true, "+
  50. "`score` = `score` * 2 WHERE (`book_id`, `user_id`) IN (%s)", placeHolder(len(idList)))
  51. db.Exec(bulkUpdateSql, idList...)
  52. return bookID, userID
  53. }
  54. // placeHolder format SQL place holder
  55. func placeHolder(n int) string {
  56. holderList := make([]string, n/2, n/2)
  57. for i := range holderList {
  58. holderList[i] = "(?,?)"
  59. }
  60. return strings.Join(holderList, ",")
  61. }

每次迭代中,SELECT 按主键顺序进行查询,最多选择 1000 行未更新到 10 分制(ten_pointfalse)数据的主键值。每次 SELECT 都会选择比上一次 SELECT 结果的最大主键还要大的数据,防止重复。然后,使用批量更新的方式,对其 score 列乘 2,并且将 ten_point 设为 true,更新 ten_point 的意义是在于防止更新程序崩溃重启后,反复更新同一行数据,导致数据损坏。每次循环中的 time.Sleep(time.Second) 将使得更新程序暂停 1 秒,防止批量更新程序占用过多的硬件资源。

在 Java (JDBC) 中,批量更新程序类似于以下内容:

Java 代码部分:

  1. package com.pingcap.bulkUpdate;
  2. import com.mysql.cj.jdbc.MysqlDataSource;
  3. import java.sql.*;
  4. import java.util.LinkedList;
  5. import java.util.List;
  6. import java.util.concurrent.TimeUnit;
  7. public class BatchUpdateExample {
  8. static class UpdateID {
  9. private Long bookID;
  10. private Long userID;
  11. public UpdateID(Long bookID, Long userID) {
  12. this.bookID = bookID;
  13. this.userID = userID;
  14. }
  15. public Long getBookID() {
  16. return bookID;
  17. }
  18. public void setBookID(Long bookID) {
  19. this.bookID = bookID;
  20. }
  21. public Long getUserID() {
  22. return userID;
  23. }
  24. public void setUserID(Long userID) {
  25. this.userID = userID;
  26. }
  27. @Override
  28. public String toString() {
  29. return "[bookID] " + bookID + ", [userID] " + userID ;
  30. }
  31. }
  32. public static void main(String[] args) throws InterruptedException {
  33. // Configure the example database connection.
  34. // Create a mysql data source instance.
  35. MysqlDataSource mysqlDataSource = new MysqlDataSource();
  36. // Set server name, port, database name, username and password.
  37. mysqlDataSource.setServerName("localhost");
  38. mysqlDataSource.setPortNumber(4000);
  39. mysqlDataSource.setDatabaseName("bookshop");
  40. mysqlDataSource.setUser("root");
  41. mysqlDataSource.setPassword("");
  42. UpdateID lastID = batchUpdate(mysqlDataSource, null);
  43. System.out.println("first time batch update success");
  44. while (true) {
  45. TimeUnit.SECONDS.sleep(1);
  46. lastID = batchUpdate(mysqlDataSource, lastID);
  47. System.out.println("batch update success, [lastID] " + lastID);
  48. }
  49. }
  50. public static UpdateID batchUpdate (MysqlDataSource ds, UpdateID lastID) {
  51. try (Connection connection = ds.getConnection()) {
  52. UpdateID updateID = null;
  53. PreparedStatement selectPs;
  54. if (lastID == null) {
  55. selectPs = connection.prepareStatement(
  56. "SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` " +
  57. "WHERE `ten_point` != true ORDER BY `book_id`, `user_id` LIMIT 1000");
  58. } else {
  59. selectPs = connection.prepareStatement(
  60. "SELECT `book_id`, `user_id` FROM `bookshop`.`ratings` "+
  61. "WHERE `ten_point` != true AND `book_id` > ? AND `user_id` > ? "+
  62. "ORDER BY `book_id`, `user_id` LIMIT 1000");
  63. selectPs.setLong(1, lastID.getBookID());
  64. selectPs.setLong(2, lastID.getUserID());
  65. }
  66. List<Long> idList = new LinkedList<>();
  67. ResultSet res = selectPs.executeQuery();
  68. while (res.next()) {
  69. updateID = new UpdateID(
  70. res.getLong("book_id"),
  71. res.getLong("user_id")
  72. );
  73. idList.add(updateID.getBookID());
  74. idList.add(updateID.getUserID());
  75. }
  76. if (idList.isEmpty()) {
  77. System.out.println("no data should update");
  78. return null;
  79. }
  80. String updateSQL = "UPDATE `bookshop`.`ratings` SET `ten_point` = true, "+
  81. "`score` = `score` * 2 WHERE (`book_id`, `user_id`) IN (" +
  82. placeHolder(idList.size() / 2) + ")";
  83. PreparedStatement updatePs = connection.prepareStatement(updateSQL);
  84. for (int i = 0; i < idList.size(); i++) {
  85. updatePs.setLong(i + 1, idList.get(i));
  86. }
  87. int count = updatePs.executeUpdate();
  88. System.out.println("update " + count + " data");
  89. return updateID;
  90. } catch (SQLException e) {
  91. e.printStackTrace();
  92. }
  93. return null;
  94. }
  95. public static String placeHolder(int n) {
  96. StringBuilder sb = new StringBuilder();
  97. for (int i = 0; i < n ; i++) {
  98. sb.append(i == 0 ? "(?,?)" : ",(?,?)");
  99. }
  100. return sb.toString();
  101. }
  102. }

hibernate.cfg.xml 配置部分:

  1. <?xml version='1.0' encoding='utf-8'?>
  2. <!DOCTYPE hibernate-configuration PUBLIC
  3. "-//Hibernate/Hibernate Configuration DTD 3.0//EN"
  4. "http://www.hibernate.org/dtd/hibernate-configuration-3.0.dtd">
  5. <hibernate-configuration>
  6. <session-factory>
  7. <!-- Database connection settings -->
  8. <property name="hibernate.connection.driver_class">com.mysql.cj.jdbc.Driver</property>
  9. <property name="hibernate.dialect">org.hibernate.dialect.TiDBDialect</property>
  10. <property name="hibernate.connection.url">jdbc:mysql://localhost:4000/movie</property>
  11. <property name="hibernate.connection.username">root</property>
  12. <property name="hibernate.connection.password"></property>
  13. <property name="hibernate.connection.autocommit">false</property>
  14. <property name="hibernate.jdbc.batch_size">20</property>
  15. <!-- Optional: Show SQL output for debugging -->
  16. <property name="hibernate.show_sql">true</property>
  17. <property name="hibernate.format_sql">true</property>
  18. </session-factory>
  19. </hibernate-configuration>

每次迭代中,SELECT 按主键顺序进行查询,最多选择 1000 行未更新到 10 分制(ten_pointfalse)数据的主键值。每次 SELECT 都会选择比上一次 SELECT 结果的最大主键还要大的数据,防止重复。然后,使用批量更新的方式,对其 score 列乘 2,并且将 ten_point 设为 true,更新 ten_point 的意义是在于防止更新程序崩溃重启后,反复更新同一行数据,导致数据损坏。每次循环中的 TimeUnit.SECONDS.sleep(1); 将使得更新程序暂停 1 秒,防止批量更新程序占用过多的硬件资源。