在上一期的月报中,我们分析了 MySQL 5.6 并行复制是如何实现的,介绍了主要数据结构、Coordinator 线程的分发、Worker 线程的执行和checkpoint过程,读者朋友可以回顾下,本篇将对恢复逻辑进行介绍。
在并行复制之前,SQL线程的恢复很简单,从 relay-log.info 中取得上次执行到的位点,然后从这个位点开始执行即可。有了并行复制之后,情况就变得稍微复杂了些,worker 线程各自执行自己队列的事务,在stop slave
或者 mysqld crash的时候,队列中的事务很可能没有执行完,比如crash时GAQ的状态如下图1所示,中间存在空隙(gap),先分发给 worker a 的事务还未完成,而后分发给 worker b 的事务已经完成,对应就是 relay log 中间有一部分event没执行。我们知道,SQL执行或者分发是顺序读relay log的,如果恢复时从 2 开始执行,3 和 4就会重复执行,如果从4开始执行,2就会被跳过,都不行。并行复制恢复的逻辑就是把 2 找出来执行,把空隙给填上,然后SQL线程就可以 5 开始愉快地跑下去了。
图1. GAQ中的空隙
信息持久化
恢复离不开信息的持久化,每个worker线程对应一个worker.info,定期将执行位点信息刷入worker.info。类似于relay-log.info,worker.info 可以存在表中,也可以存在文件中,取决于配置relay_log_info_repository
,刷写频率由 sync_relay_log_info
控制。
下面是relay-log.info中存的信息:
Number_of_lines: 后面有多少行(文件)或字段(表)
Relay_log_name: 执行到的relay log 的文件名
Relay_log_pos: 执行到的relay log 的位置
Master_log_name: 执行到的对应在主库 binlog 的文件名
Master_log_pos: 执行到的对应在主库 binlog 的位置
Sql_delay: SQL线程必须落后master的时间,通过 CHANGE MASTER TO MASTER_DELAY=X 指定
Number_of_workers: worker线程个数
Id: 内部用的
下面是worker.info中存的信息:
Id: worker 的 id
Relay_log_name: 执行到的relay log 的文件名
Relay_log_pos: 执行到的relay log 的位置
Master_log_name: 执行到的对应在主库 binlog 的文件名
Master_log_pos: 执行到的对应在主库 binlog 的位置
Checkpoint_relay_log_name: 上次 checkpoint 后,分发到的第一个 group 所在的 relay log 文件名
Checkpoint_relay_log_pos: 同上,对应 relay log 中的位置
Checkpoint_master_log_name: 同上,对应在主库 binlog 的文件名
Checkpoint_master_log_pos: 同上,对应在主库 binlog 中的位置
Checkpoint_seqno: 当前执行到事务序列,从上次checkpoint后开始算
Checkpoint_group_size: checkpoint_group_bitmap 的长度,多少个BYTE
Checkpoint_group_bitmap: 从上次 checkpoitn 执行事务的标记
每个字段对应Slave_worer类的一个成员(Checkpoint_group_size除外),这其中比较重要的就是 Checkpoint_group_bitmap,记录哪些事务是执行过的,下面会介绍对bitmap的操作。
bitmap 记录执行事务
本节介绍对 Slave_worker::group_executed
这个bitmap的操作,在此之前需要介绍另一个变量 Relay_log_info::checkpoint_seqno
,对 Coordinator 线程来说,表示从上次checkpoint调整后,下一个分发的事务编号,同时对应GAQ中事务(Slave_job_group)的个数,我们在上期介绍过,GAQ中存的是Coordinator 线程分发的、尚未被checkpoint出队的事务(可能已经被worker执行完了);对woker线程来说,这个对应当前worker执行到的事务编号。
Coordinator 线程每分发一个事务,checkpoint_seqno 加 1;每次checkpoint后,会将 checkpoint_seqno 减去cnt(cnt为checkpoint时GAQ中出队的事务的个数)。worker 线程每执行完一个事务,会将 group_executed 的 checkpoint_seqno 位置1;如果遇到checkpoint,会将bitmap向左移位。
如下图所示,GAQ中第0、2、5个事务分发给了worker a,第0个已经执行完成,所以 worker a 的 bitmap 中,第0位置1;worker b 和 worker c 的 bitmap 同理,标识已经执行的事务。
图2. worker的bitmap
假设这个时候 Coordinator 线程做了一次 checkpoint,将队列头部2个已经完成的事务出队,然后将rli->checkpoint_seqno
减2,同时将2累加到每个 worker->bitmap_shifted
中,当Coordinator 线程将新的事务分给worker的时候,会将 worker->bitmap_shifted
取出,存人当前Slave_job_group.shifted
中,当worker执行到这个group,就开始对 group_executed 进行偏移,偏移量就是Slave_job_group.shitfed
(再一次说明了GAQ中的Slave_job_group,充当了Coordinator 线程和worker线程通信的角色)。bitmap的变化就如下图所示,checkpoint后,原来的0和1出队,然后新的4、5、6加入进来,新分发给worker b 和 worker c 的 4 和 6 已经执行完成,所以bitamp和上图相比,已经向左路偏移了2位,而新分发worker a的5并示执行,所以worker a 的bitmap还未偏移。
图3. checkpoint后worker的bitmap
group_executed bitmap的长度和GAQ大小一样,由配置slave_checkpoint_group
决定。
恢复逻辑
恢复的主要逻辑是mts_recovery_groups()
这个函数。
在启动slave的时候,如果relay-log.info中存的Number_of_workers不为0,就说明之前是并行复制,然后调用 mts_recovery_groups()
,进入恢复逻辑。如前所述,mts_recovery_groups()
的目的就是根据 slave_worker_info 和 slave_info 中信息,把空隙事务找出来。
首先会创建 Number_of_workers 个 worker,依次把每个worker.info的信息读出来,然后把worker执行位点信息和relay-log.info中记录的位点信息(低水位)相比,如果比后者小,说明崩溃前已经被checkpoint出队,不可能造成空隙,直接跳过;如果比后者大,就把worker存入 above_lwm_jobs
数组。 above_lwm_jobs
收集完成后,初始化bitmap rli->recovery_groups
,用来汇总每个worker的bitmap。对 above_lwm_jobs
中的每个worker,设置一个计数器recovery_group_cnt
,从低水位位点开始扫relay log,每扫完一个事务,recovery_group_cnt
加1,直到扫到worker.info中记录的位点为止,之后把worker的bitmap汇总到rli->recovery_groups
中,其间会统计一个最大的 recovery_group_cnt
,记入rli->mts_recovery_group_cnt
,这个对应高水位。 bitmap 汇总逻辑如下:
sql/rpl_slave.cc:8965
for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
j= 0; i <= w->checkpoint_seqno; i++, j++)
{
if (bitmap_is_set(&w->group_executed, i))
{
DBUG_PRINT("mts", ("Setting bit %u.", j));
bitmap_fast_test_and_set(groups, j);
}
}
之后SQL线程就可以从低水位往高水位扫relay log,对于每个事务,如果 rli->recovery_groups
对应bit为1,说明崩溃前已经执行过,就跳过;反之,就对事务中的每个event调用 do_apply_event()
执行。扫描到高水位后整个恢复逻辑结束,后面SQL线程就进入正常的执行逻辑,执行(串行)或者分发(并行)event。