计算机科学中的任何问题都可以用另外的间接层解决,但是这通常会引发另一个问题。 — David Wheeler
2.15.1 新型计划任务回顾
在 [1.31]-新型计划任务:以接口形式实现的计划任务 一章中,我们讨论了PhalApi中对计划任务的设计和底层实现。
但对于很多应用,很多项目,或者很多同学来说,仍然比较广泛,不能直接使用。这一章则专门为此而进行演进,并提供最终可用的计划任务调度,同时我们也会阐明如何进行扩展定制。
也就是说,这一章将提供Task扩展类库的统一调度方式,以便在启动crontab任务后,可以通过数据库简单配置,即可执行各种任务。
2.15.2 最终调度的方式:crontab
出于对业务的考虑,我们首先需要明确此crontab调度方式所支持的功能,它应该包括但不限于:
- 1、通过简单的数据库配置,即可启动一个新的任务
- 2、具备循环调度的能力,并能初步防止并发调度
- 3、可以对异常的任务进行修复
- 4、优先执行太远未执行的任务
- 5、支持本地和远程两种调度方式、三种MQ类型,以及扩展的能力
2.15.3 核心时序图与分层
在原来的时序图基础上,我们可以进行演进的设计,追加了统一的调度后如下所示:
通过上面详细的时序图,我们可以发现里面的设计是出于这样的分层考虑:
序号 | 层 | 关键操作 | 说明 | 如何使用 |
---|---|---|---|---|
1 | 启动脚本 | crontab.php | 操作crontab执行的脚本 | 客户端可以进行必要的初始化工作 |
2 | 进程级 | Task_Progress::run() | 根据进程配置的数据库表,进行循环调度 | 不需要改动,直接使用 |
3 | 触发器 | Task_Trigger::fire() | 进行计划任务调度的上下文环境,用于指定runner和mq类型 | 客户端也可进行定制扩展,进行必要的操作 |
4 | MQ消费与调度 | Task_MQ::pop()和Task_Runner::go() | 不断消费MQ队列,并依次进行调度 | 不需要改动,直接使用,也可扩展 |
5 | 计划任务服务 | PhalApi_Api::doSth() | 执行计划任务服务 | 由客户端按接口形式实现 |
虽然上面的层级,初看起来有点多,但我们再次验证了计算机那个伟大的定论:计算机的任何问题都可以通过一个中间层来解决。
由此看出,上面的层级其实相当于:
客户端初始化 --> 直接使用 --> 自由组合与操作 --> 直接使用 --> 任务服务实现
2.16.4 进程配置的数据库表设计
CREATE TABLE `phalapi_task_progress` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`title` varchar(200) DEFAULT '' COMMENT '任务标题',
`trigger_class` varchar(50) DEFAULT '' COMMENT '触发器类名',
`fire_params` varchar(255) DEFAULT '' COMMENT '需要传递的参数,格式自定',
`interval_time` int(11) DEFAULT '0' COMMENT '执行间隔,单位:秒',
`enable` tinyint(1) DEFAULT '1' COMMENT '是否启动,1启动,0禁止',
`result` varchar(255) DEFAULT '' COMMENT '运行的结果,以json格式保存',
`state` tinyint(1) DEFAULT '0' COMMENT '进程状态,0空闲,1运行中,-1异常退出',
`last_fire_time` int(11) DEFAULT '0' COMMENT '上一次运行时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
对此表的关键字段说明如下:
字段 | 说明 | 示例 |
---|---|---|
trigger_class | 触发器的类名 | 须实现Task_Progress_Trigger::fire($params)接口 |
fire_params | 触发器的参数 | 加传给Task_Progress_Trigger::fire()函数的参数,格式为:service&MQ类名&runner类名 |
interval_time | 执行间隔 | 单位为秒 |
enable | 是否启动 | 此字段禁止时,将不再执行 |
state | 进程状态 | 当此状态一直为异常或者运行且超过1天时,系统会进行修复,即重置为空闲状态 |
其中,对于fire_params参数,MQ类名和runner类名可选,以下是一些示例:
//示例1:完整的配置
//fire_params=Task_Demo.DoSth&Task_MQ_DB&Task_Runner_Local
$mq = new Task_MQ_DB();
$runner = new Task_Runner_Local($mq);
$runner->go('Task_Demo.DoSth');
//示例2:使用默认的Runner
//fire_params=Task_Demo.DoSth&Task_MQ_DB
$mq = new Task_MQ_DB();
$runner = new Task_Runner_Local($mq); //默认使用本地Runner
$runner->go('Task_Demo.DoSth');
//示例3:使用默认的MQ和默认的Runner
//fire_params=Task_Demo.DoSth
$mq = new Task_MQ_Redis(); //默认使用redis的MQ
$runner = new Task_Runner_Local($mq); //默认使用本地Runner
$runner->go('Task_Demo.DoSth');
//示例4:使用自定义的MQ和Runner
//fire_params=Task_Demo.DoSth&My_MQ&My_Runner
class My_MQ implements Task_MQ {
// ...
}
class My_Runner extends Task_Runner {
// ...
}
$mq = new My_MQ();
$runner = new My_Runner($mq);
$runner->go('Task_Demo.DoSth');
2.15.5 运行效果
最终的效果就是,我们通过这样两行简单的代码,即可实现一系列复杂的任务调度:
$progress = new Task_Progress();
$progress->run();
让我们来看下这样设计的运行效果吧!看下这两行代码背后所产生的魔力。
首先,我们先添加两条计划任务:
INSERT INTO `phalapi_task_progress` VALUES ('1', 'test demo', 'Task_Progress_Trigger_Common', 'Task_Demo.DoSth&Task_MQ_File&Task_Runner_Local', '300', '1', '', '0', '0');
INSERT INTO `phalapi_task_progress` VALUES ('2', 'test ok', 'Task_Progress_Trigger_Common', 'Default.Index&Task_MQ_DB&Task_Runner_Local', '100', '1', '', '0', '0');
然后,伪造一些MQ:
INSERT INTO `phalapi_task_mq_0` VALUES ('8', 'Default.Index', '', '0', '');
最后,生成单元测试:
<?php
class PhpUnderControl_TaskProgress_Test extends PHPUnit_Framework_TestCase
{
public $taskProgress;
protected function setUp()
{
parent::setUp();
$this->taskProgress = new Task_Progress();
}
/**
* @group testRun
*/
public function testRun()
{
$rs = $this->taskProgress->run();
}
}
并执行之:
$ phpunit ./Task_Progress_Test.php
[1 - 0.06666s]SELECT id, title FROM phalapi_task_progress WHERE (state != ?) AND (last_fire_time < ?) AND (enable = ?) ORDER BY last_fire_time ASC; -- 0, 1431965153, 1<br>
[2 - 0.07002s]SELECT id, title, trigger_class, fire_params FROM phalapi_task_progress WHERE (state = 0) AND (interval_time + last_fire_time < ?) AND (enable = ?); -- 1432051553, 1<br>
[3 - 0.06549s]SELECT enable, state FROM phalapi_task_progress WHERE (id = '1');<br>
[4 - 0.07432s]UPDATE phalapi_task_progress SET state = 1 WHERE (id = '1');<br>
[5 - 0.06469s]UPDATE phalapi_task_progress SET result = '{\"total\":0,\"fail\":0}', state = 0, last_fire_time = 1432051553 WHERE (id = '1');<br>
[6 - 0.06746s]SELECT enable, state FROM phalapi_task_progress WHERE (id = '2');<br>
[7 - 0.07043s]UPDATE phalapi_task_progress SET state = 1 WHERE (id = '2');<br>
[8 - 0.06673s]SELECT id, params FROM phalapi_task_mq_0 WHERE (service = 'Default.Index') ORDER BY id ASC LIMIT 0,10;<br>
[9 - 0.48185s]DELETE FROM phalapi_task_mq_0 WHERE (id IN ('8'));<br>
[10 - 0.06514s]SELECT id, params FROM phalapi_task_mq_0 WHERE (service = 'Default.Index') ORDER BY id ASC LIMIT 0,10;<br>
[11 - 0.50694s]UPDATE phalapi_task_progress SET result = '{\"total\":1,\"fail\":0}', state = 0, last_fire_time = 1432051553 WHERE (id = '2');<br>
Time: 1.98 seconds, Memory: 6.50Mb
OK (1 test, 0 assertions)
查看对比一下数据库,目前发现运行良好!
提交代码,保存文档,收工睡觉!
2.15.6 演进的乐趣
得益于前期良好的设计以及底层支持,我们发现,在提供这样一种统一的调度方式是非常方便的。
不仅如此,如果你明白了其中的设计,需要进行定制和扩展也是非常方便的。也就是说,我们不仅提供了一种具体实际可用的方式,也提供了广阔自由的扩展空间。具体与抽象,两者仍可得。
然而,这一切不仅依赖于良好的设计,还依赖于测试驱动开发下的浮现式设计。