计算机科学中的任何问题都可以用另外的间接层解决,但是这通常会引发另一个问题。 — David Wheeler

2.15.1 新型计划任务回顾

[1.31]-新型计划任务:以接口形式实现的计划任务 一章中,我们讨论了PhalApi中对计划任务的设计和底层实现。

但对于很多应用,很多项目,或者很多同学来说,仍然比较广泛,不能直接使用。这一章则专门为此而进行演进,并提供最终可用的计划任务调度,同时我们也会阐明如何进行扩展定制。

也就是说,这一章将提供Task扩展类库的统一调度方式,以便在启动crontab任务后,可以通过数据库简单配置,即可执行各种任务。

2.15.2 最终调度的方式:crontab

出于对业务的考虑,我们首先需要明确此crontab调度方式所支持的功能,它应该包括但不限于:

  • 1、通过简单的数据库配置,即可启动一个新的任务
  • 2、具备循环调度的能力,并能初步防止并发调度
  • 3、可以对异常的任务进行修复
  • 4、优先执行太远未执行的任务
  • 5、支持本地和远程两种调度方式、三种MQ类型,以及扩展的能力

    2.15.3 核心时序图与分层

在原来的时序图基础上,我们可以进行演进的设计,追加了统一的调度后如下所示:a pic

通过上面详细的时序图,我们可以发现里面的设计是出于这样的分层考虑:

序号 关键操作 说明 如何使用
1 启动脚本 crontab.php 操作crontab执行的脚本 客户端可以进行必要的初始化工作
2 进程级 Task_Progress::run() 根据进程配置的数据库表,进行循环调度 不需要改动,直接使用
3 触发器 Task_Trigger::fire() 进行计划任务调度的上下文环境,用于指定runner和mq类型 客户端也可进行定制扩展,进行必要的操作
4 MQ消费与调度 Task_MQ::pop()和Task_Runner::go() 不断消费MQ队列,并依次进行调度 不需要改动,直接使用,也可扩展
5 计划任务服务 PhalApi_Api::doSth() 执行计划任务服务 由客户端按接口形式实现

虽然上面的层级,初看起来有点多,但我们再次验证了计算机那个伟大的定论:计算机的任何问题都可以通过一个中间层来解决。

由此看出,上面的层级其实相当于:

  1. 客户端初始化 --> 直接使用 --> 自由组合与操作 --> 直接使用 --> 任务服务实现

2.16.4 进程配置的数据库表设计

  1. CREATE TABLE `phalapi_task_progress` (
  2. `id` bigint(20) NOT NULL AUTO_INCREMENT,
  3. `title` varchar(200) DEFAULT '' COMMENT '任务标题',
  4. `trigger_class` varchar(50) DEFAULT '' COMMENT '触发器类名',
  5. `fire_params` varchar(255) DEFAULT '' COMMENT '需要传递的参数,格式自定',
  6. `interval_time` int(11) DEFAULT '0' COMMENT '执行间隔,单位:秒',
  7. `enable` tinyint(1) DEFAULT '1' COMMENT '是否启动,1启动,0禁止',
  8. `result` varchar(255) DEFAULT '' COMMENT '运行的结果,以json格式保存',
  9. `state` tinyint(1) DEFAULT '0' COMMENT '进程状态,0空闲,1运行中,-1异常退出',
  10. `last_fire_time` int(11) DEFAULT '0' COMMENT '上一次运行时间',
  11. PRIMARY KEY (`id`)
  12. ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

对此表的关键字段说明如下:

字段 说明 示例
trigger_class 触发器的类名 须实现Task_Progress_Trigger::fire($params)接口
fire_params 触发器的参数 加传给Task_Progress_Trigger::fire()函数的参数,格式为:service&MQ类名&runner类名
interval_time 执行间隔 单位为秒
enable 是否启动 此字段禁止时,将不再执行
state 进程状态 当此状态一直为异常或者运行且超过1天时,系统会进行修复,即重置为空闲状态

其中,对于fire_params参数,MQ类名和runner类名可选,以下是一些示例:

  1. //示例1:完整的配置
  2. //fire_params=Task_Demo.DoSth&Task_MQ_DB&Task_Runner_Local
  3. $mq = new Task_MQ_DB();
  4. $runner = new Task_Runner_Local($mq);
  5. $runner->go('Task_Demo.DoSth');
  6. //示例2:使用默认的Runner
  7. //fire_params=Task_Demo.DoSth&Task_MQ_DB
  8. $mq = new Task_MQ_DB();
  9. $runner = new Task_Runner_Local($mq); //默认使用本地Runner
  10. $runner->go('Task_Demo.DoSth');
  11. //示例3:使用默认的MQ和默认的Runner
  12. //fire_params=Task_Demo.DoSth
  13. $mq = new Task_MQ_Redis(); //默认使用redis的MQ
  14. $runner = new Task_Runner_Local($mq); //默认使用本地Runner
  15. $runner->go('Task_Demo.DoSth');
  16. //示例4:使用自定义的MQ和Runner
  17. //fire_params=Task_Demo.DoSth&My_MQ&My_Runner
  18. class My_MQ implements Task_MQ {
  19. // ...
  20. }
  21. class My_Runner extends Task_Runner {
  22. // ...
  23. }
  24. $mq = new My_MQ();
  25. $runner = new My_Runner($mq);
  26. $runner->go('Task_Demo.DoSth');

2.15.5 运行效果

最终的效果就是,我们通过这样两行简单的代码,即可实现一系列复杂的任务调度:

  1. $progress = new Task_Progress();
  2. $progress->run();

让我们来看下这样设计的运行效果吧!看下这两行代码背后所产生的魔力。

首先,我们先添加两条计划任务:

  1. INSERT INTO `phalapi_task_progress` VALUES ('1', 'test demo', 'Task_Progress_Trigger_Common', 'Task_Demo.DoSth&Task_MQ_File&Task_Runner_Local', '300', '1', '', '0', '0');
  2. INSERT INTO `phalapi_task_progress` VALUES ('2', 'test ok', 'Task_Progress_Trigger_Common', 'Default.Index&Task_MQ_DB&Task_Runner_Local', '100', '1', '', '0', '0');

然后,伪造一些MQ:

  1. INSERT INTO `phalapi_task_mq_0` VALUES ('8', 'Default.Index', '', '0', '');

最后,生成单元测试:

  1. <?php
  2. class PhpUnderControl_TaskProgress_Test extends PHPUnit_Framework_TestCase
  3. {
  4. public $taskProgress;
  5. protected function setUp()
  6. {
  7. parent::setUp();
  8. $this->taskProgress = new Task_Progress();
  9. }
  10. /**
  11. * @group testRun
  12. */
  13. public function testRun()
  14. {
  15. $rs = $this->taskProgress->run();
  16. }
  17. }

并执行之:

  1. $ phpunit ./Task_Progress_Test.php
  2. [1 - 0.06666s]SELECT id, title FROM phalapi_task_progress WHERE (state != ?) AND (last_fire_time < ?) AND (enable = ?) ORDER BY last_fire_time ASC; -- 0, 1431965153, 1<br>
  3. [2 - 0.07002s]SELECT id, title, trigger_class, fire_params FROM phalapi_task_progress WHERE (state = 0) AND (interval_time + last_fire_time < ?) AND (enable = ?); -- 1432051553, 1<br>
  4. [3 - 0.06549s]SELECT enable, state FROM phalapi_task_progress WHERE (id = '1');<br>
  5. [4 - 0.07432s]UPDATE phalapi_task_progress SET state = 1 WHERE (id = '1');<br>
  6. [5 - 0.06469s]UPDATE phalapi_task_progress SET result = '{\"total\":0,\"fail\":0}', state = 0, last_fire_time = 1432051553 WHERE (id = '1');<br>
  7. [6 - 0.06746s]SELECT enable, state FROM phalapi_task_progress WHERE (id = '2');<br>
  8. [7 - 0.07043s]UPDATE phalapi_task_progress SET state = 1 WHERE (id = '2');<br>
  9. [8 - 0.06673s]SELECT id, params FROM phalapi_task_mq_0 WHERE (service = 'Default.Index') ORDER BY id ASC LIMIT 0,10;<br>
  10. [9 - 0.48185s]DELETE FROM phalapi_task_mq_0 WHERE (id IN ('8'));<br>
  11. [10 - 0.06514s]SELECT id, params FROM phalapi_task_mq_0 WHERE (service = 'Default.Index') ORDER BY id ASC LIMIT 0,10;<br>
  12. [11 - 0.50694s]UPDATE phalapi_task_progress SET result = '{\"total\":1,\"fail\":0}', state = 0, last_fire_time = 1432051553 WHERE (id = '2');<br>
  13. Time: 1.98 seconds, Memory: 6.50Mb
  14. OK (1 test, 0 assertions)

查看对比一下数据库,目前发现运行良好!

提交代码,保存文档,收工睡觉!

2.15.6 演进的乐趣

得益于前期良好的设计以及底层支持,我们发现,在提供这样一种统一的调度方式是非常方便的。

不仅如此,如果你明白了其中的设计,需要进行定制和扩展也是非常方便的。也就是说,我们不仅提供了一种具体实际可用的方式,也提供了广阔自由的扩展空间。具体与抽象,两者仍可得。

然而,这一切不仅依赖于良好的设计,还依赖于测试驱动开发下的浮现式设计。

原文: https://www.phalapi.net/wikis/2-15.html