在上一期的月报中,我们分析了 MySQL 5.6 并行复制是如何实现的,介绍了主要数据结构、Coordinator 线程的分发、Worker 线程的执行和checkpoint过程,读者朋友可以回顾下,本篇将对恢复逻辑进行介绍。

在并行复制之前,SQL线程的恢复很简单,从 relay-log.info 中取得上次执行到的位点,然后从这个位点开始执行即可。有了并行复制之后,情况就变得稍微复杂了些,worker 线程各自执行自己队列的事务,在stop slave或者 mysqld crash的时候,队列中的事务很可能没有执行完,比如crash时GAQ的状态如下图1所示,中间存在空隙(gap),先分发给 worker a 的事务还未完成,而后分发给 worker b 的事务已经完成,对应就是 relay log 中间有一部分event没执行。我们知道,SQL执行或者分发是顺序读relay log的,如果恢复时从 2 开始执行,3 和 4就会重复执行,如果从4开始执行,2就会被跳过,都不行。并行复制恢复的逻辑就是把 2 找出来执行,把空隙给填上,然后SQL线程就可以 5 开始愉快地跑下去了。

GAQ中的空隙

图1. GAQ中的空隙

信息持久化

恢复离不开信息的持久化,每个worker线程对应一个worker.info,定期将执行位点信息刷入worker.info。类似于relay-log.info,worker.info 可以存在表中,也可以存在文件中,取决于配置relay_log_info_repository,刷写频率由 sync_relay_log_info 控制。

下面是relay-log.info中存的信息:

  1. Number_of_lines: 后面有多少行(文件)或字段(表)
  2. Relay_log_name: 执行到的relay log 的文件名
  3. Relay_log_pos: 执行到的relay log 的位置
  4. Master_log_name: 执行到的对应在主库 binlog 的文件名
  5. Master_log_pos: 执行到的对应在主库 binlog 的位置
  6. Sql_delay: SQL线程必须落后master的时间,通过 CHANGE MASTER TO MASTER_DELAY=X 指定
  7. Number_of_workers: worker线程个数
  8. Id: 内部用的

下面是worker.info中存的信息:

  1. Id: worker id
  2. Relay_log_name: 执行到的relay log 的文件名
  3. Relay_log_pos: 执行到的relay log 的位置
  4. Master_log_name: 执行到的对应在主库 binlog 的文件名
  5. Master_log_pos: 执行到的对应在主库 binlog 的位置
  6. Checkpoint_relay_log_name: 上次 checkpoint 后,分发到的第一个 group 所在的 relay log 文件名
  7. Checkpoint_relay_log_pos: 同上,对应 relay log 中的位置
  8. Checkpoint_master_log_name: 同上,对应在主库 binlog 的文件名
  9. Checkpoint_master_log_pos: 同上,对应在主库 binlog 中的位置
  10. Checkpoint_seqno: 当前执行到事务序列,从上次checkpoint后开始算
  11. Checkpoint_group_size: checkpoint_group_bitmap 的长度,多少个BYTE
  12. Checkpoint_group_bitmap: 从上次 checkpoitn 执行事务的标记

每个字段对应Slave_worer类的一个成员(Checkpoint_group_size除外),这其中比较重要的就是 Checkpoint_group_bitmap,记录哪些事务是执行过的,下面会介绍对bitmap的操作。

bitmap 记录执行事务

本节介绍对 Slave_worker::group_executed 这个bitmap的操作,在此之前需要介绍另一个变量 Relay_log_info::checkpoint_seqno,对 Coordinator 线程来说,表示从上次checkpoint调整后,下一个分发的事务编号,同时对应GAQ中事务(Slave_job_group)的个数,我们在上期介绍过,GAQ中存的是Coordinator 线程分发的、尚未被checkpoint出队的事务(可能已经被worker执行完了);对woker线程来说,这个对应当前worker执行到的事务编号。

Coordinator 线程每分发一个事务,checkpoint_seqno 加 1;每次checkpoint后,会将 checkpoint_seqno 减去cnt(cnt为checkpoint时GAQ中出队的事务的个数)。worker 线程每执行完一个事务,会将 group_executed 的 checkpoint_seqno 位置1;如果遇到checkpoint,会将bitmap向左移位。

如下图所示,GAQ中第0、2、5个事务分发给了worker a,第0个已经执行完成,所以 worker a 的 bitmap 中,第0位置1;worker b 和 worker c 的 bitmap 同理,标识已经执行的事务。

worker线程的bitmap

图2. worker的bitmap

假设这个时候 Coordinator 线程做了一次 checkpoint,将队列头部2个已经完成的事务出队,然后将rli->checkpoint_seqno减2,同时将2累加到每个 worker->bitmap_shifted 中,当Coordinator 线程将新的事务分给worker的时候,会将 worker->bitmap_shifted 取出,存人当前Slave_job_group.shifted 中,当worker执行到这个group,就开始对 group_executed 进行偏移,偏移量就是Slave_job_group.shitfed (再一次说明了GAQ中的Slave_job_group,充当了Coordinator 线程和worker线程通信的角色)。bitmap的变化就如下图所示,checkpoint后,原来的0和1出队,然后新的4、5、6加入进来,新分发给worker b 和 worker c 的 4 和 6 已经执行完成,所以bitamp和上图相比,已经向左路偏移了2位,而新分发worker a的5并示执行,所以worker a 的bitmap还未偏移。

checkpoint后worker线程的bitmap

图3. checkpoint后worker的bitmap

group_executed bitmap的长度和GAQ大小一样,由配置slave_checkpoint_group决定。

恢复逻辑

恢复的主要逻辑是mts_recovery_groups() 这个函数。

在启动slave的时候,如果relay-log.info中存的Number_of_workers不为0,就说明之前是并行复制,然后调用 mts_recovery_groups(),进入恢复逻辑。如前所述,mts_recovery_groups() 的目的就是根据 slave_worker_info 和 slave_info 中信息,把空隙事务找出来。

首先会创建 Number_of_workers 个 worker,依次把每个worker.info的信息读出来,然后把worker执行位点信息和relay-log.info中记录的位点信息(低水位)相比,如果比后者小,说明崩溃前已经被checkpoint出队,不可能造成空隙,直接跳过;如果比后者大,就把worker存入 above_lwm_jobs 数组。 above_lwm_jobs收集完成后,初始化bitmap rli->recovery_groups,用来汇总每个worker的bitmap。对 above_lwm_jobs 中的每个worker,设置一个计数器recovery_group_cnt,从低水位位点开始扫relay log,每扫完一个事务,recovery_group_cnt加1,直到扫到worker.info中记录的位点为止,之后把worker的bitmap汇总到rli->recovery_groups中,其间会统计一个最大的 recovery_group_cnt,记入rli->mts_recovery_group_cnt,这个对应高水位。 bitmap 汇总逻辑如下:

  1. sql/rpl_slave.cc:8965
  2. for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
  3. j= 0; i <= w->checkpoint_seqno; i++, j++)
  4. {
  5. if (bitmap_is_set(&w->group_executed, i))
  6. {
  7. DBUG_PRINT("mts", ("Setting bit %u.", j));
  8. bitmap_fast_test_and_set(groups, j);
  9. }
  10. }

之后SQL线程就可以从低水位往高水位扫relay log,对于每个事务,如果 rli->recovery_groups 对应bit为1,说明崩溃前已经执行过,就跳过;反之,就对事务中的每个event调用 do_apply_event()执行。扫描到高水位后整个恢复逻辑结束,后面SQL线程就进入正常的执行逻辑,执行(串行)或者分发(并行)event。