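Business scenario

When working with databases, we often wonder: how can we quickly obtain a record of data changes? For example, a search engine has to give external customers fast, accurate product search. When product data changes, how does the search engine quickly discover the changed rows? There are two common approaches: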

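Full update

This is the simplest and most direct approach: regardless of what actually changed, the search engine pulls every row of the product table each time, rebuilds the search index, and serves customer queries from it. It is certainly the easiest to implement, but it also has obvious drawbacks:

  • Wasted resources: if 20% of the products change, the remaining 80% do not need to be updated. In other words, a full update spends 80% of the system resources (IO/CPU/memory) on useless work.
  • Long run time: because the whole table is read every time, a full update greatly increases the chance of lock contention during data retrieval and index building. Combined with the wasted work, the total run time grows considerably.
  • Poor data freshness: because each run takes so long, updates arrive late. As the product catalog keeps growing, freshness gets steadily worse until customers start to complain.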

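Full + incremental update

To address the many "sins" of the full update, we can switch to a full + incremental approach. The idea is to run a full update periodically, for example daily or weekly, and between two full updates to apply incremental updates, for example every hour or every minute, that cover the new data changes. The key to incremental updates is how to obtain the change records, so let's see what the relational database MSSQL Server offers.

Capturing data changes in MSSQL

MSSQL Server provides a function named COLUMNS_UPDATED that can solve this problem. Microsoft's official description: it returns a varbinary bit pattern that indicates the inserted or updated columns of a table or view. The function can only be called inside the body of a trigger defined for INSERT or UPDATE, so the change capture below is built on triggers. The documentation is rather abstract: using this function to capture change records in practice means stepping into quite a few pitfalls and working around several limits, and sharing those lessons is the point of this article.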

COLUMNS_UPDATED

First, let's look at what the function means. Suppose a table has 8 columns: COLUMNS_UPDATED then uses one byte, i.e. eight bits, to indicate which columns changed, as follows:

    Col_id     8     7     6     5     4     3     2     1
    Bit        7     6     5     4     3     2     1     0
    Value    128    64    32    16     8     4     2     1

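Col_id: the column's ordinal ID in the table (column_id)

Bit: the bit position, starting from 0

Value: 2 raised to the power of Bit

When some columns are updated, COLUMNS_UPDATED returns a varbinary bit pattern. (What is a varbinary bit pattern? For a table with at most 8 columns, think of it as the sum of the Value entries of all updated columns, written in binary.) For example, when the second and fourth columns are updated, the bit pattern is 2 + 8 = 10 (0x0A). Let's look at a concrete example.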

    use tempdb
    GO
    IF EXISTS(SELECT TOP 1 1
              FROM sys.tables
              WHERE name = 'employeeData')
        DROP TABLE employeeData;
    GO
    CREATE TABLE dbo.employeeData (
        col1 int identity(1,1) not null,
        col2 int NOT NULL,
        col3 int NOT NULL,
        col4 int NOT NULL,
        col5 int NOT NULL,
        col6 int NOT NULL,
        col7 int NOT NULL constraint uni unique,
        col8 int NOT NULL,
    );
    GO
    CREATE TRIGGER dbo.Trg_UID_employeeData
    ON dbo.employeeData
    AFTER UPDATE,INSERT,DELETE
    AS
    BEGIN
        declare
            @table_id int = 0
        ;
        select top 1
            @table_id = parent_id
        from sys.triggers with(nolock)
        where object_id = @@procid;
        select updated_columns =
            stuff
            (replace(
                replace(
                    (
                        select column_name = quotename(name)
                        from sys.columns with(Nolock)
                        where object_id = @table_id
                            and CONVERT(VARBINARY,COLUMNS_UPDATED()) & POWER(2, column_id - 1) = POWER(2, column_id - 1)
                        order by column_id asc
                        for xml path('')
                    )
                ,'<column_name>',',')
            ,'</column_name>','')
            ,1,1,'')
            ,columns_updated_value = cast(COLUMNS_UPDATED() as int)
    END
    GO
    --test DML actions
    --INSERT
    INSERT INTO dbo.employeeData
    VALUES ( 2, 3, 4, 5, 6, 7, 8);
    GO
    --UPDATE
    UPDATE A
    SET col2 = col2 + 10
        ,col4 = col4 + 11
    FROM dbo.employeeData AS A
    --DELETE
    delete from dbo.employeeData

The results are as follows:

01.png
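The bit test used in the trigger can be tried without any table or trigger. The sketch below hard-codes 0x0A as a stand-in for what COLUMNS_UPDATED() would return when columns 2 and 4 of a table with at most 8 columns are updated (an assumption for illustration, not read from the screenshot), and applies the same "& POWER(2, column_id - 1)" test to column ids 1 to 8. The table variable @decoded is a hypothetical name.

```sql
-- Hypothetical one-byte pattern: columns 2 and 4 updated, 2 + 8 = 10 = 0x0A.
DECLARE @cu varbinary(1) = 0x0A;
DECLARE @decoded TABLE (column_id int PRIMARY KEY, bit_value int NOT NULL);

-- Column ids 1..8 map to bits 0..7 of the single byte.
INSERT INTO @decoded (column_id, bit_value)
SELECT v.column_id, POWER(2, v.column_id - 1)
FROM (VALUES (1),(2),(3),(4),(5),(6),(7),(8)) AS v(column_id)
WHERE CAST(@cu AS int) & POWER(2, v.column_id - 1) = POWER(2, v.column_id - 1);

SELECT column_id, bit_value FROM @decoded ORDER BY column_id;
-- expected rows: (2, 2) and (4, 8)
```

This whole-value reading only lines up with column ids while the pattern is a single byte.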

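Pitfall 1: INT overflow

Note the expression POWER(2, column_id - 1) in the code above. POWER returns the data type of its first argument, which here is INT. SQL Server stores an INT in 4 bytes (32 bits), so its largest value is 2^31 - 1. Once a table has 32 columns, the 32nd column needs POWER(2, 31), which overflows INT and raises an error. After we extended the table above to 32 columns, INSERT and UPDATE statements made the trigger fail with the following errors: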

    Msg 232, Level 16, State 3, Procedure Trg_UID_employeeData, Line 15
    Arithmetic overflow error for type int, value = 2147483648.000000.
    The statement has been terminated.
    Msg 232, Level 16, State 3, Procedure Trg_UID_employeeData, Line 15
    Arithmetic overflow error for type int, value = 2147483648.000000.
    The statement has been terminated.
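The overflow boundary can be reproduced in isolation. A minimal sketch, assuming default session settings (under which arithmetic overflow is raised as an error); the variable names are illustrative only. The exponent is held in a variable so the values are computed at run time.

```sql
DECLARE @exp int = 31;                                            -- column_id - 1 for column 32
DECLARE @pow_2_30 int = POWER(2, @exp - 1);                       -- column 31: still fits in int
DECLARE @pow_2_31_bigint bigint = POWER(CAST(2 AS bigint), @exp); -- column 32 as bigint: fits
DECLARE @pow_2_31_int int;
DECLARE @err int = 0;

BEGIN TRY
    SET @pow_2_31_int = POWER(2, @exp);  -- column 32 as int: 2147483648 does not fit
END TRY
BEGIN CATCH
    SET @err = ERROR_NUMBER();           -- the trigger errors above report Msg 232 for this overflow
END CATCH;

SELECT @pow_2_30 AS pow_2_30,
       @pow_2_31_bigint AS pow_2_31_bigint,
       @err AS overflow_error_number;
```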

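Pitfall 2: BIGINT overflow

Before we had a proper solution, we lived with a workaround for a long time: casting the base of POWER to BIGINT to get past the 32-column limit. But this pitfall has the same root cause as the first one. BIGINT is 8 bytes, so the workaround only moves the failure point from 32 columns to 64, where POWER(2, 63) overflows in turn. The workaround looked like this: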

    ...
    and CONVERT(VARBINARY,COLUMNS_UPDATED()) & POWER(cast(2 as bigint), column_id - 1) = POWER(cast(2 as bigint), column_id - 1)
    ...
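Overflow is not the only issue with treating the whole pattern as one number. Microsoft's COLUMNS_UPDATED documentation describes the bytes as ordered left to right, with the leftmost byte covering columns 1 to 8 and the rightmost bit of each byte standing for the lowest column in that byte. The sketch below assumes a hypothetical 16-column table in which only column 9 was updated, i.e. the pattern 0x0001, and compares a whole-value test with a per-byte test; the variable names are illustrative.

```sql
-- Hypothetical 2-byte pattern for a 16-column table where only column 9 changed:
-- byte 1 (columns 1-8) = 0x00, byte 2 (columns 9-16) = 0x01.
DECLARE @cu varbinary(2) = 0x0001;
DECLARE @col int = 9;

-- Whole-value test: 0x0001 read as an integer is 1, so bit 8 (value 256) is not set.
DECLARE @whole_value_col9 bit =
    CASE WHEN CAST(@cu AS int) & POWER(2, @col - 1) <> 0 THEN 1 ELSE 0 END;

-- The same whole-value test wrongly claims that column 1 changed.
DECLARE @whole_value_col1 bit =
    CASE WHEN CAST(@cu AS int) & POWER(2, 0) <> 0 THEN 1 ELSE 0 END;

-- Per-byte test: take byte (@col - 1) / 8 + 1, then bit (@col - 1) % 8 inside it.
DECLARE @per_byte_col9 bit =
    CASE WHEN CAST(SUBSTRING(@cu, (@col - 1) / 8 + 1, 1) AS int) & POWER(2, (@col - 1) % 8) <> 0
         THEN 1 ELSE 0 END;

SELECT @whole_value_col9 AS whole_value_col9,   -- 0: column 9 missed
       @whole_value_col1 AS whole_value_col1,   -- 1: column 1 misreported
       @per_byte_col9    AS per_byte_col9;      -- 1: column 9 found
```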

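There is also a subtler problem behind both pitfalls. According to the official documentation, the bytes returned by COLUMNS_UPDATED are ordered from left to right: the leftmost byte holds columns 1 to 8, the next byte columns 9 to 16, and so on, and the rightmost bit of each byte stands for the lowest column in that byte. Reading the whole varbinary as a single integer therefore attributes the bits of any column beyond the 8th to the wrong column, which is why the documentation's own example uses SUBSTRING to test columns beyond the first eight. How do we solve both pitfalls properly? Let's leave that as a cliffhanger for now.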

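Breaking the problem down

Let's go back to the original requirement. DML operations come in only three kinds: INSERT, UPDATE and DELETE, and our trigger must be able to tell which one fired it. Inside a trigger this can be read from the inserted and deleted pseudo-tables: an INSERT fills only inserted, a DELETE fills only deleted, and an UPDATE fills both.

INSERT: the trigger needs to identify the row's unique identifier (RID), usually the primary key; the base table can then be queried by RID.

UPDATE: the trigger needs to know which columns were updated; the values of those columns can then be fetched by RID.

DELETE: the trigger only needs the row's unique identifier, so that the corresponding row can be deleted by RID.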
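A minimal sketch of the classification rule, on hypothetical objects dbo.opDemo, dbo.opDemoLog and dbo.trg_opDemo that are not part of the original script. Besides telling INSERT, UPDATE and DELETE apart from the inserted and deleted pseudo-tables, it shows that an AFTER trigger fires once per statement rather than once per row, and fires even when a statement affects no rows.

```sql
USE tempdb;
GO
-- Hypothetical demo objects, dropped first so the script can be rerun.
IF OBJECT_ID('dbo.opDemo', 'U') IS NOT NULL DROP TABLE dbo.opDemo;   -- also drops its trigger
IF OBJECT_ID('dbo.opDemoLog', 'U') IS NOT NULL DROP TABLE dbo.opDemoLog;
CREATE TABLE dbo.opDemo (id int NOT NULL PRIMARY KEY, v int NULL);
CREATE TABLE dbo.opDemoLog (
    seq int IDENTITY(1,1) PRIMARY KEY,
    operation char(1) NOT NULL,
    rows_affected int NOT NULL
);
GO
CREATE TRIGGER dbo.trg_opDemo ON dbo.opDemo
AFTER INSERT, UPDATE, DELETE
AS
BEGIN
    SET NOCOUNT ON;
    -- The trigger also fires for statements that touch no rows; log nothing then.
    IF NOT EXISTS (SELECT 1 FROM inserted) AND NOT EXISTS (SELECT 1 FROM deleted)
        RETURN;

    -- inserted only: INSERT; deleted only: DELETE; both: UPDATE.
    DECLARE @op char(1) =
        CASE
            WHEN EXISTS (SELECT 1 FROM inserted) AND EXISTS (SELECT 1 FROM deleted) THEN 'U'
            WHEN EXISTS (SELECT 1 FROM inserted) THEN 'I'
            ELSE 'D'
        END;

    INSERT INTO dbo.opDemoLog (operation, rows_affected)
    SELECT @op,
           CASE WHEN @op = 'D' THEN (SELECT COUNT(*) FROM deleted)
                ELSE (SELECT COUNT(*) FROM inserted) END;
END
GO
INSERT INTO dbo.opDemo (id, v) VALUES (1, 10), (2, 20);  -- one statement, two rows
UPDATE dbo.opDemo SET v = v + 1 WHERE id = 1;
DELETE FROM dbo.opDemo;
UPDATE dbo.opDemo SET v = 0;                             -- no rows left: fires, logs nothing
SELECT seq, operation, rows_affected FROM dbo.opDemoLog ORDER BY seq;
-- expected: (1, I, 2), (2, U, 1), (3, D, 2)
```

The two-row INSERT produces a single trigger call, so any per-statement logic has to handle every row in inserted or deleted, not just one.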

Putting all of this analysis together, we can use the following trigger to capture data changes.

  1. use tempdb
  2. GO
  3. --create table to save changed data.
  4. if object_id('dbo.triggeredDataLog', 'U') is not null
  5. drop table dbo.triggeredDataLog
  6. GO
  7. create table dbo.triggeredDataLog(
  8. rowid bigint identity(1,1) not null primary key,
  9. database_name sysname not null,
  10. schame_name sysname not null,
  11. table_object_name sysname not null,
  12. operation char(1) not null,
  13. RID nvarchar(1000) not null,
  14. updated_columns nvarchar(max) null,
  15. indate datetime not null default (getdate()),
  16. intime timestamp not null,
  17. )
  18. IF EXISTS(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
  19. WHERE TABLE_NAME = 'employeeData')
  20. DROP TABLE employeeData;
  21. GO
  22. --create table for testing.
  23. CREATE TABLE dbo.employeeData (
  24. id int identity(1,1) not null,
  25. c1 int null,
  26. c2 int null,
  27. c3 int null,
  28. c4 int null,
  29. c5 int null,
  30. c6 int null,
  31. c7 int null,
  32. c8 int null,
  33. c9 int null,
  34. c10 int null,
  35. c11 int null,
  36. c12 int null,
  37. c13 int null,
  38. c14 int null,
  39. c15 int null,
  40. c16 int null,
  41. c17 int null,
  42. c18 int null,
  43. c19 int null,
  44. c20 int null,
  45. c21 int null,
  46. c22 int null,
  47. c23 int null,
  48. c24 int null,
  49. c25 int null,
  50. c26 int null,
  51. c27 int null,
  52. c28 int null,
  53. c29 int null,
  54. c30 int null,
  55. c31 int null,
  56. c32 int null,
  57. c33 int null,
  58. c34 int null,
  59. c35 int null,
  60. c36 int null,
  61. c37 int null,
  62. c38 int null,
  63. c39 int null,
  64. c40 int null,
  65. c41 int null,
  66. c42 int null,
  67. c43 int null,
  68. c44 int null,
  69. c45 int null,
  70. c46 int null,
  71. c47 int null,
  72. c48 int null,
  73. c49 int null,
  74. c50 int null,
  75. c51 int null,
  76. c52 int null,
  77. c53 int null,
  78. c54 int null,
  79. c55 int null,
  80. c56 int null,
  81. c57 int null,
  82. c58 int null,
  83. c59 int null,
  84. c60 int null,
  85. c61 int null,
  86. c62 int null,
  87. c63 int null,
  88. c64 int null,
  89. c65 int null,
  90. c66 int null,
  91. c67 int null,
  92. c68 int null,
  93. c69 int null
  94. );
  95. GO
  96. CREATE TRIGGER dbo.Trg_UID_employeeData
  97. ON dbo.employeeData
  98. AFTER UPDATE,INSERT,DELETE
  99. AS
  100. BEGIN
  101. SET NOCOUNT ON;
  102. --=======================================
  103. -- get DML Action (INSERT,UPDATE,DELETE)
  104. DECLARE
  105. @OperationType CHAR(1)
  106. ,@table_id int = 0
  107. ;
  108. select top 1
  109. @OperationType = 'D'
  110. ,@table_id = parent_id
  111. from sys.triggers with(nolock)
  112. where object_id = @@procid
  113. ;
  114. --get operation type:
  115. --record in inserted & deleted, that means UPDATE DML operation
  116. --record in inserted but not in deleted, that means INSERT DML operation
  117. --by default, we set operation type as DELETE DML operation
  118. IF EXISTS (SELECT TOP 1 1 FROM inserted)
  119. BEGIN
  120. IF EXISTS (SELECT TOP 1 1 FROM deleted)
  121. BEGIN
  122. SET @OperationType = 'U'; --UPDATE
  123. END
  124. ELSE
  125. SET @OperationType = 'I'; --INSERT
  126. END
  127. -- end of getting DML Action
  128. --=======================================
  129. -- we need to know PK column(s) or identity column or unique column
  130. -- table exists PK
  131. declare
  132. @tb_unique_cols table(
  133. column_name sysname not null
  134. ,data_type sysname not null)
  135. IF EXISTS( --primary key
  136. select * from sys.indexes WITH(NOLOCK)
  137. where object_id = @table_id
  138. and is_primary_key = 1
  139. )
  140. BEGIN
  141. INSERT INTO @tb_unique_cols
  142. SELECT
  143. column_name = col.name
  144. ,data_type = ty.name
  145. FROM sys.indexes AS i with(NOLOCK)
  146. INNER JOIN sys.index_columns AS ic with(NOLOCK)
  147. ON i.OBJECT_ID = ic.OBJECT_ID
  148. AND i.index_id = ic.index_id
  149. INNER JOIN sys.columns AS col with(NOLOCK)
  150. ON i.object_id = col.object_id
  151. INNER JOIN sys.types as ty with(NOLOCK)
  152. ON col.user_type_id = ty.user_type_id
  153. WHERE i.is_primary_key = 1
  154. and i.object_id = @table_id
  155. and ic.column_id = col.column_id
  156. END
  157. ELSE IF EXISTS( --table doesn't have primary key but table exists identity
  158. select * from sys.columns
  159. where object_id = @table_id
  160. and is_identity = 1
  161. )
  162. BEGIN
  163. INSERT INTO @tb_unique_cols
  164. select column_name = col.name,data_type = ty.name
  165. from sys.columns as col with(NOLOCK)
  166. INNER JOIN sys.types as ty with(NOLOCK)
  167. ON col.user_type_id = ty.user_type_id
  168. where col.object_id = @table_id
  169. and col.is_identity = 1
  170. END
  171. ELSE IF EXISTS( --table doesn't have primary key/identity but table has unique index or constraint
  172. select * from sys.indexes with(NOLOCK)
  173. where object_id = @table_id
  174. and is_unique = 1
  175. )
  176. BEGIN
  177. INSERT INTO @tb_unique_cols
  178. SELECT TOP 1 column_name = col.name
  179. ,data_type = ty.name
  180. FROM sys.indexes AS i with(NOLOCK)
  181. INNER JOIN sys.index_columns AS ic with(NOLOCK)
  182. ON i.OBJECT_ID = ic.OBJECT_ID
  183. AND i.index_id = ic.index_id
  184. INNER JOIN sys.columns AS col with(NOLOCK)
  185. ON i.object_id = col.object_id
  186. and col.column_id = ic.column_id
  187. INNER JOIN sys.types as ty with(NOLOCK)
  188. ON col.user_type_id = ty.user_type_id
  189. WHERE i.is_unique = 1
  190. and i.object_id = @table_id
  191. END
  192. --=======================================
  193. --get PK set: [pk1] = 1 and [pk2] = 'ABSDEF' and [pk3] = 'Jul 29 2016 5:04PM'
  194. declare
  195. @unique_cols_list nvarchar(max)
  196. ,@sql nvarchar(max)
  197. ,@RID nvarchar(max)
  198. ,@database_name sysname
  199. ,@schema_name sysname
  200. ,@table_object_name sysname
  201. ;
  202. select @unique_cols_list = '''' +
  203. stuff
  204. (replace(
  205. replace(
  206. (
  207. select column_name = N'+ ' + quotename( N' and ' + quotename(column_name)+ N' = ', '''') + N'+ ' +
  208. case
  209. when data_type in ('char','nchar','varchar','nvarchar','date','datetime','datetime2','smalldatetime') then N'quotename('
  210. else ''
  211. end +'cast('+quotename(column_name) +' as varchar)' +
  212. case
  213. when data_type in ('char','nchar','varchar','nvarchar','date','datetime','datetime2','smalldatetime') then N','''''''')'
  214. else ''
  215. end
  216. from @tb_unique_cols
  217. for xml path('')
  218. )
  219. ,'<column_name>','')
  220. ,'</column_name>','')
  221. ,1,8,'')
  222. ,@database_name = db_name()
  223. ,@schema_name = schema_name(schema_id)
  224. ,@table_object_name = object_name(object_id)
  225. from sys.tables
  226. where object_id = @table_id
  227. --end get PK set
  228. --end of table PK/identity/unique generation
  229. --=======================================
  230. -- recording the DML into log
  231. IF @OperationType = 'I' --INSERT
  232. BEGIN
  233. IF EXISTS(select TOP 1 1 from inserted)
  234. BEGIN
  235. select * into #inserted from inserted
  236. --a trigger fires once per statement: build one log row per inserted row
  237. set @sql = N'INSERT INTO dbo.triggeredDataLog(database_name,schame_name,table_object_name,operation,RID)'
  238. + N' SELECT @db, @sch, @tb, @op, ' + @unique_cols_list + N' FROM #inserted';
  239. exec sys.sp_executesql @sql
  240. ,N'@db sysname, @sch sysname, @tb sysname, @op char(1)'
  241. ,@db = @database_name
  242. ,@sch = @schema_name
  243. ,@tb = @table_object_name
  244. ,@op = @OperationType
  245. ;
  246. END
  247. END
  248. ELSE IF @OperationType = 'U' --UPDATE
  249. BEGIN
  250. --we need to know PK column(s) & updated columns
  251. IF EXISTS(select TOP 1 1 from deleted)
  252. BEGIN
  253. /*start
  254. get updated columns
  255. */
  256. DECLARE
  257. @Columns_Updated NVARCHAR(max)
  258. ,@maxByteCU INT
  259. ,@curByteCU INT
  260. ,@cByte INT
  261. ,@curBit INT
  262. ,@maxBit INT
  263. ;
  264. SELECT
  265. @maxByteCU = DATALENGTH(COLUMNS_UPDATED())
  266. ,@Columns_Updated = N''
  267. ,@curByteCU = 1
  268. WHILE @curByteCU <= @maxByteCU
  269. BEGIN
  270. SELECT @cByte = SUBSTRING(COLUMNS_UPDATED(), @curByteCU, 1)
  271. ,@curBit = 1
  272. ,@maxBit = 8
  273. ;
  274. WHILE @curBit <= @maxBit
  275. BEGIN
  276. IF CONVERT(BIT, @cByte & POWER(2,@curBit - 1)) <> 0
  277. --SET @Columns_Updated = @Columns_Updated + '[' + CONVERT(VARCHAR, 8 * (@curByteCU - 1) + @curBit) + ']'
  278. select @Columns_Updated = @Columns_Updated + QUOTENAME(name) + ','
  279. from sys.columns with(Nolock)
  280. where object_id = @table_id
  281. and column_id = 8 * (@curByteCU - 1) + @curBit
  282. SET @curBit = @curBit + 1
  283. END
  284. SET @curByteCU = @curByteCU + 1
  285. END
  286. /*end
  287. get updated columns
  288. */
  289. select * into #deleted from deleted
  290. set @sql = N'INSERT INTO dbo.triggeredDataLog(database_name,schame_name,table_object_name,operation,RID,updated_columns)'
  291. + N' SELECT @db, @sch, @tb, @op, ' + @unique_cols_list + N', @cols FROM #deleted'; --one log row per updated row
  292. set @Columns_Updated = left(@Columns_Updated, len(@Columns_Updated) - 1); --drop the trailing comma
  293. exec sys.sp_executesql @sql
  294. ,N'@db sysname, @sch sysname, @tb sysname, @op char(1), @cols nvarchar(max)'
  295. ,@db = @database_name
  296. ,@sch = @schema_name
  297. ,@tb = @table_object_name
  298. ,@op = @OperationType, @cols = @Columns_Updated;
  299. END
  300. END
  301. ELSE --DELETE
  302. BEGIN
  303. --we need to know PK column(s)
  304. IF EXISTS(select TOP 1 1 from deleted)
  305. BEGIN
  306. select * into #deleted1 from deleted
  307. set @sql = N'INSERT INTO dbo.triggeredDataLog(database_name,schame_name,table_object_name,operation,RID)'
  308. + N' SELECT @db, @sch, @tb, @op, ' + @unique_cols_list + N' FROM #deleted1'; --one log row per deleted row
  309. exec sys.sp_executesql @sql
  310. ,N'@db sysname, @sch sysname, @tb sysname, @op char(1)'
  311. ,@db = @database_name
  312. ,@sch = @schema_name
  313. ,@tb = @table_object_name
  314. ,@op = @OperationType
  315. ;
  316. END
  317. END
  318. END
  319. GO
  320. --=======================================
  321. -- table just has identity column
  322. -- Testing INSERT
  323. INSERT INTO dbo.employeeData(c1,c2,c3,c4)
  324. VALUES(1,2,3,4)
  325. --Testing UPDATE
  326. UPDATE TOP(1) A
  327. SET c64 = 64
  328. ,c65 = 65
  329. FROM dbo.employeeData AS A
  330. --Testing DELETE
  331. DELETE TOP (1) A
  332. FROM dbo.employeeData AS A
  333. --=======================================
  334. -- table has unique constraint
  335. ALTER TABLE dbo.employeeData
  336. DROP COLUMN ID;
  337. ALTER TABLE dbo.employeeData ADD
  338. c70 int NOT NULL constraint uni_c70 unique
  339. GO
  340. -- Testing INSERT
  341. INSERT INTO dbo.employeeData(c1,c2,c3,c4,c70)
  342. VALUES(1,2,3,4,70)
  343. --Testing UPDATE
  344. UPDATE TOP(1) A
  345. SET c64 = 64
  346. ,c65 = 65
  347. FROM dbo.employeeData AS A
  348. --Testing DELETE
  349. DELETE TOP (1) A
  350. FROM dbo.employeeData AS A
  351. --=======================================
  352. -- table has primary key
  353. ALTER TABLE dbo.employeeData ADD
  354. pk1 int NOT NULL,
  355. pk2 varchar(100) not null,
  356. pk3 datetime not null default(getdate());
  357. ALTER TABLE dbo.employeeData ADD
  358. CONSTRAINT pk primary key(pk1,pk2,pk3)
  359. GO
  360. -- Testing INSERT
  361. INSERT INTO dbo.employeeData(pk1,pk2,pk3,c70)
  362. VALUES(1,2,GETDATE(),70)
  363. --Testing UPDATE
  364. UPDATE TOP(1) A
  365. SET c64 = 64
  366. ,c65 = 65
  367. FROM dbo.employeeData AS A
  368. --Testing DELETE
  369. DELETE TOP (1) A
  370. FROM dbo.employeeData AS A
  371. GO
  372. select * from dbo.triggeredDataLog with(NOLOCK) order by intime asc

Analyzing the results

The last query returns the following (screenshot):

02.png

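Rowid 1-3: the table has no primary key but has an IDENTITY column, so the RID is the value of the IDENTITY column; these rows show the captured RID and updated_columns.

Rowid 4-6: the table has no primary key but has a UNIQUE constraint, so the RID is the value of the UNIQUE column, together with the captured updated_columns.

Rowid 7-9: the table has a primary key, here the more complex case of a composite key, so the RID is the composite key value, together with the captured updated_columns.

In this example the table has more than 64 columns (73 in the end). We get past pitfalls 1 and 2 by walking COLUMNS_UPDATED() one byte at a time in a loop, lines 268 to 285 of the trigger listing: each byte is taken with SUBSTRING, only POWER(2, 0) through POWER(2, 7) is ever computed so neither INT nor BIGINT can overflow, and the column id is recomputed as 8 * (byte - 1) + bit, which also keeps the byte order correct.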
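For comparison, the same per-byte rule can be written as one set-based query instead of nested WHILE loops. This is an illustrative alternative, not the author's code. The 9-byte value below is a hypothetical stand-in for the pattern the first test UPDATE would produce, assuming the 70 columns as originally created (id is column 1, so c64 and c65 are column ids 65 and 66); sys.all_columns is used only as a convenient source of rows for numbering candidate column ids.

```sql
-- Hypothetical 9-byte pattern: columns 65 and 66 set (bits 0 and 1 of byte 9 = 0x03).
DECLARE @cu varbinary(9) = 0x000000000000000003;
DECLARE @updated TABLE (column_id int PRIMARY KEY);

WITH candidate AS (
    -- column ids 1 .. 8 * (number of bytes in the pattern)
    SELECT TOP (DATALENGTH(@cu) * 8)
           CAST(ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS int) AS column_id
    FROM sys.all_columns
)
INSERT INTO @updated (column_id)
SELECT column_id
FROM candidate
-- Column c sits in byte (c - 1) / 8 + 1, at bit (c - 1) % 8 of that byte, so the
-- POWER exponent never exceeds 7 and cannot overflow however wide the table is.
WHERE CAST(SUBSTRING(@cu, (column_id - 1) / 8 + 1, 1) AS int) & POWER(2, (column_id - 1) % 8) <> 0;

SELECT column_id FROM @updated ORDER BY column_id;   -- expected: 65, 66
```

Inside the trigger, COLUMNS_UPDATED() would take the place of @cu, and the candidate list would come from sys.columns filtered on object_id = @table_id, which also yields the column names directly.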

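Summary

At this point, our search engine only needs to read the RIDs of changed rows and the corresponding changed columns (updated_columns) from dbo.triggeredDataLog, instead of pulling the full base table. This saves database resources, makes search index builds more timely, and improves the customer experience.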
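On the consuming side, an incremental pull can be sketched as follows, assuming the script above has been run in tempdb. The watermark variable @last_rowid and the batch size of 1000 are hypothetical; a real indexer would persist the watermark between runs rather than keep it in a variable.

```sql
USE tempdb;
DECLARE @last_rowid bigint = 0;   -- hypothetical watermark: highest rowid already indexed
DECLARE @batch TABLE (
    rowid bigint PRIMARY KEY,
    table_object_name sysname NOT NULL,
    operation char(1) NOT NULL,
    RID nvarchar(1000) NOT NULL,
    updated_columns nvarchar(max) NULL
);

-- Read the next chunk of change records in log order.
INSERT INTO @batch (rowid, table_object_name, operation, RID, updated_columns)
SELECT TOP (1000) rowid, table_object_name, operation, RID, updated_columns
FROM dbo.triggeredDataLog
WHERE rowid > @last_rowid
ORDER BY rowid;

-- (apply @batch to the search index here: I = add, U = refresh updated_columns, D = remove)

-- Move the watermark forward only after the batch has been applied.
SELECT @last_rowid = ISNULL(MAX(rowid), @last_rowid) FROM @batch;

SELECT rowid, table_object_name, operation, RID, updated_columns FROM @batch ORDER BY rowid;
```

One caveat of an identity-based watermark: with concurrent writers, a log row with a lower rowid can commit after one with a higher rowid, so a reader that only asks for rowid > @last_rowid can skip it.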

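Note:

A special warning: never run TRUNCATE TABLE on the base table dbo.employeeData, because TRUNCATE does not fire the trigger, so the removed rows would never be recorded in dbo.triggeredDataLog.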

    --forbidden action
    TRUNCATE TABLE dbo.employeeData;
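Because TRUNCATE TABLE does not fire the DML trigger, the rule above has to be enforced some other way. One common workaround, sketched here on hypothetical tables dbo.truncGuardDemo and dbo.truncGuardDemo_ref: SQL Server refuses to truncate a table that is referenced by a FOREIGN KEY, even when the referencing table is empty. Applied to dbo.employeeData, such a guard would reference its primary key or its UNIQUE column c70.

```sql
USE tempdb;
GO
-- Hypothetical demo tables; the referencing table is dropped first because of the FOREIGN KEY.
IF OBJECT_ID('dbo.truncGuardDemo_ref', 'U') IS NOT NULL DROP TABLE dbo.truncGuardDemo_ref;
IF OBJECT_ID('dbo.truncGuardDemo', 'U') IS NOT NULL DROP TABLE dbo.truncGuardDemo;
CREATE TABLE dbo.truncGuardDemo (id int NOT NULL PRIMARY KEY, v int NULL);
-- Stays empty; its only job is to hold a FOREIGN KEY pointing at dbo.truncGuardDemo.
CREATE TABLE dbo.truncGuardDemo_ref (
    id int NULL CONSTRAINT fk_truncGuardDemo_ref REFERENCES dbo.truncGuardDemo (id)
);
INSERT INTO dbo.truncGuardDemo (id, v) VALUES (1, 10);
GO
DECLARE @err int = 0;
BEGIN TRY
    TRUNCATE TABLE dbo.truncGuardDemo;   -- rejected: the table is referenced by a FOREIGN KEY
END TRY
BEGIN CATCH
    SET @err = ERROR_NUMBER();           -- 4712: cannot truncate a table referenced by a FOREIGN KEY
END CATCH;

DELETE FROM dbo.truncGuardDemo;          -- DELETE is still allowed and would fire DML triggers
SELECT @err AS truncate_error_number,
       (SELECT COUNT(*) FROM dbo.truncGuardDemo) AS rows_left;
```

The same FOREIGN KEY also blocks DROP TABLE on the referenced table until the guard table is dropped, which may or may not be desirable.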