队列

简介

Laravel 的队列服务为不同的队列后端系统提供了一套统一的 API 。队列允许你将一个耗时的任务进行延迟处理,例如像 e-mail 发送。这能让应用程序对页面的请求有更快的响应。

配置

队列的配置文件被保存在 config/queue.php 中。在这个文件内你可以找到包含在 Laravel 中的每一种队列驱动连接设置。它们包含了数据库、BeanstalkdIronMQAmazon SQSRedis 以及提供本机使用的 synchronous 驱动。

另外框架也提供了 null 这个队列驱动用来丢弃队列任务。

驱动的必要设置

数据库

要使用 database 这个队列驱动的话,则需要创建一个数据表来记住任务,你可以用 queue:table 这个 Artisan 命令来创建这个数据表的迁移。当迁移建好后,就可以用 migrate 这个命令来创建数据表。

  1. php artisan queue:table
  2. php artisan migrate

其它队列系统的依赖扩展包

在使用列表里的队列服务前,必须安装以下依赖扩展包:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • Redis: predis/predis ~1.0

编写任务类

生成任务类

在你的应用程序中,队列的任务类都默认放在 app/Jobs 目录下,你可以用以下的 Artisan 命令来生成一个新的队列任务:

  1. php artisan make:job SendReminderEmail

这个命令将会在 app/Jobs 下生成一个新的类,而这个类会实现 Illuminate\Contracts\Queue\ShouldQueue 接口,让 Laravel 知道这个任务应该是被放到队列里的,而不是要直接运行。

任务类结构

任务类的结构很简单,一般来说只会包含一个让队列用来调用此任务的 handle 方法。我们用以下这个类来做示范:

  1. <?php
  2. namespace App\Jobs;
  3. use App\User;
  4. use App\Jobs\Job;
  5. use Illuminate\Contracts\Mail\Mailer;
  6. use Illuminate\Queue\SerializesModels;
  7. use Illuminate\Queue\InteractsWithQueue;
  8. use Illuminate\Contracts\Queue\ShouldQueue;
  9. class SendReminderEmail extends Job implements ShouldQueue
  10. {
  11. use InteractsWithQueue, SerializesModels;
  12. protected $user;
  13. /**
  14. * 创建一个新的任务实例。
  15. *
  16. * @param User $user
  17. * @return void
  18. */
  19. public function __construct(User $user)
  20. {
  21. $this->user = $user;
  22. }
  23. /**
  24. * 运行任务。
  25. *
  26. * @param Mailer $mailer
  27. * @return void
  28. */
  29. public function handle(Mailer $mailer)
  30. {
  31. $mailer->send('emails.reminder', ['user' => $this->user], function ($m) {
  32. //
  33. });
  34. $this->user->reminders()->create(...);
  35. }
  36. }

注意,在这个例子中,我们在任务类的构造器中直接传递了一个 Eloquent 模型。因为我们在任务类里引用了 SerializesModels 这个 trait,使得 Eloquent 模型在处理任务时可以被优雅地序列化和反序列化。如果你的队列任务类在构造器中接收了一个 Eloquent 模型,那么只有可识别出该模型的属性会被序列化到队列里。当任务被实际运行时,队列系统便会自动从数据库中重新取回完整的模型。这整个过程对你的应用程序来说是完全透明的,这样可以避免在序列化完整的 Eloquent 模式实例时所带来的一些问题。

在队列处理任务时,会调用 handle 方法,而这里我们也可以通过 handle 方法的参数类型提示,让 Laravel 的 服务容器 自动注入依赖对象。

当发生错误的时候

如果在处理任务时抛出了一个异常,它将会自动被释放回队列里并再次尝试运行。当该任务一直出错时,它会不断被发布重试,直到超过应用程序所允许的最大重试值。最大重试值可以在运行 queue:listenqueue:work 命令时,用 --tries 选项来设置。关于运行队列侦听器的更多信息在稍后会有 详细说明

手动释放任务

生成出来的任务类引用了 InteractsWithQueue 这个 trait,它提供了 release 方法让我们可以手动释放任务。在 release 方法中可指定一个参数,表示此任务被重新运行之前所等待的秒数。

  1. public function handle(Mailer $mailer)
  2. {
  3. if (condition) {
  4. $this->release(10);
  5. }
  6. }

检查重试次数

如前面所提到的,当任务被处理时若发生一些异常,则任务将会被自动释放回队列中。这时候可以用 attempts 方法来检查重试次数:

  1. public function handle(Mailer $mailer)
  2. {
  3. if ($this->attempts() > 3) {
  4. //
  5. }
  6. }

将任务推送到队列上

app/Http/Controllers/Controller.php 中 Laravel 定义了一个默认控制器,它引用了 DispatchesJob 这个 trait。而这个 trait 提供了数个方法来方便你推送任务到队列上,例如 dispatch 方法:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\User;
  4. use Illuminate\Http\Request;
  5. use App\Jobs\SendReminderEmail;
  6. use App\Http\Controllers\Controller;
  7. class UserController extends Controller
  8. {
  9. /**
  10. * 发送提醒的 e-mail 给指定用户。
  11. *
  12. * @param Request $request
  13. * @param int $id
  14. * @return Response
  15. */
  16. public function sendReminderEmail(Request $request, $id)
  17. {
  18. $user = User::findOrFail($id);
  19. $this->dispatch(new SendReminderEmail($user));
  20. }
  21. }

DispatchesJobs Trait

当然有时候你不一定是从应用程序的路由或控制器来派发任务的,因此你可以在应用程序中的任何类里引用 DispatchesJobs 这个 trait ,以便使用它的各种派发方法。以下就是使用该 trait 的类:

  1. <?php
  2. namespace App;
  3. use Illuminate\Foundation\Bus\DispatchesJobs;
  4. class ExampleClass
  5. {
  6. use DispatchesJobs;
  7. }

dispatch 辅助函数

或者你可以使用 dispatch 辅助方法:

  1. Route::get('/job', function () {
  2. dispatch(new App\Jobs\PerformTask);
  3. return 'Done!';
  4. });

指定任务所属的队列

你可以指定任务应该要送到哪一个队列上。

要推送任务到不同的队列上,你需要将任务先「分类」,甚至可能要指定每个队列能有多少作业器可以运行任务。这并不会推送任务到你在配置文件中所定义的不同队列「连接」上,而是会推送到某个有单个连接的队列。要指定任务运行的队列,可以用任务实例的 onQueue 方法。onQueueIlluminate\Bus\Queueable trait 所提供的方法,而它已经包含在 App\Jobs\Job 基类中:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\User;
  4. use Illuminate\Http\Request;
  5. use App\Jobs\SendReminderEmail;
  6. use App\Http\Controllers\Controller;
  7. class UserController extends Controller
  8. {
  9. /**
  10. * 发送提醒的 e-mail 给指定用户。
  11. *
  12. * @param Request $request
  13. * @param int $id
  14. * @return Response
  15. */
  16. public function sendReminderEmail(Request $request, $id)
  17. {
  18. $user = User::findOrFail($id);
  19. $job = (new SendReminderEmail($user))->onQueue('emails');
  20. $this->dispatch($job);
  21. }
  22. }

注意: DispatchesJobs trait 会把任务推送给默认的队列连接。

为任务指派队列连接

如果你的应用需要同时使用多个队列连接,你可以使用任务实例的 onConnection 方法来指派队列连接,onConnectionIlluminate\Bus\Queueable trait 提供,App\Jobs\Job 基类里已经加了了这个 Trait。

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\User;
  4. use Illuminate\Http\Request;
  5. use App\Jobs\SendReminderEmail;
  6. use App\Http\Controllers\Controller;
  7. class UserController extends Controller
  8. {
  9. /**
  10. * 发送提醒的 e-mail 给指定用户。
  11. *
  12. * @param Request $request
  13. * @param int $id
  14. * @return Response
  15. */
  16. public function sendReminderEmail(Request $request, $id)
  17. {
  18. $user = User::findOrFail($id);
  19. $job = (new SendReminderEmail($user))->onConnection('alternate');
  20. $this->dispatch($job);
  21. }
  22. }

当然你可以使用链式调用的方式来设置 onConnection

  1. public function sendReminderEmail(Request $request, $id)
  2. {
  3. $user = User::findOrFail($id);
  4. $job = (new SendReminderEmail($user))
  5. ->onConnection('alternate')
  6. ->onQueue('emails');
  7. $this->dispatch($job);
  8. }

延迟性任务

有时你可能会希望队列任务能晚一点再运行,例如在用户注册 5 分钟后才通过队列任务发送提醒信件。这可以通过任务类引用的 Illuminate\Bus\Queueable 这个 trait 所提供的 delay 方法来实现:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\User;
  4. use Illuminate\Http\Request;
  5. use App\Jobs\SendReminderEmail;
  6. use App\Http\Controllers\Controller;
  7. class UserController extends Controller
  8. {
  9. /**
  10. * 发送提醒的 e-mail 给指定用户。
  11. *
  12. * @param Request $request
  13. * @param int $id
  14. * @return Response
  15. */
  16. public function sendReminderEmail(Request $request, $id)
  17. {
  18. $user = User::findOrFail($id);
  19. $job = (new SendReminderEmail($user))->delay(60 * 5);
  20. $this->dispatch($job);
  21. }
  22. }

在这个例子里,我们指定该任务延迟 5 分钟后再运行。

注意:Amazon 的 SQS 服务最大的延迟时间是 15 分钟。

任务事件

任务完成事件

Queue::after 方法可以让你注册一个队列任务运行完成后会被执行的回调。在此回调进行额外纪录、队列后续任务、或为仪表板增加统计等操作都是很好的时机。举个例子,我们可以在 Laravel 所包含的 AppServiceProvider 中附加一个回调到此事件上:

  1. <?php
  2. namespace App\Providers;
  3. use Queue;
  4. use Illuminate\Support\ServiceProvider;
  5. use Illuminate\Queue\Events\JobProcessed;
  6. class AppServiceProvider extends ServiceProvider
  7. {
  8. /**
  9. * 启动所有应用程序服务。
  10. *
  11. * @return void
  12. */
  13. public function boot()
  14. {
  15. Queue::after(function (JobProcessed $event) {
  16. // $event->connectionName
  17. // $event->job
  18. // $event->data
  19. });
  20. }
  21. /**
  22. * 注册服务提供者。
  23. *
  24. * @return void
  25. */
  26. public function register()
  27. {
  28. //
  29. }
  30. }

运行队列侦听器

启动队列侦听器

Laravel 引入了一个 Artisan 命令,用来运行被推送到队列里的任务。你可以通过 queue:listen 命令来运行侦听器:

  1. php artisan queue:listen

你也可以指定侦听器应该利用哪一个队列进行连接:

  1. php artisan queue:listen connection-name

要注意的是,在这个工作命令启动后,它将会继续运行直到被手动停止为止。你可以利用像 Supervisor 这样的进程监控软件,来确保队列侦听器不会停止运行。

队列优先顺序

你可以给 listen 命令一个以逗号分隔的队列连接列表,来设置队列的优先顺序:

  1. php artisan queue:listen --queue=high,low

在这个例子里,在 high 这个队列中的任务永远会先被处理,然后才是 low 队列里的任务。

指定任务的超时参数

你还可以设置每个任务被允许运行的时间长度,单位是秒数:

  1. php artisan queue:listen --timeout=60

指定队列的休眠期

此外,你也可以指定队列要等待多少秒后才能取新的任务来运行:

  1. php artisan queue:listen --sleep=5

注意这里是指队列在没有任务的状态下才会休眠。如果已经有多个任务卡在这个队列上,那么它会持续运行而不会休眠。

处理队列里面的第一个任务

你可以使用 queue:work 命令来处理队列里面的第一个任务:

  1. php artisan queue:work

Supervisor 设置

Supervisor 是一个 Linux 操作系统上的进程监控软件,它会在 queue:listenqueue:work 命令发生失败后自动重启它们。要在 Ubuntu 安装 Supervisor,可以用以下命令:

  1. sudo apt-get install supervisor

Supervisor 的配置文件一般是放在 /etc/supervisor/conf.d 目录下,在这个目录中你可以创建任意数量的配置文件来要求 Supervisor 监控你的进程。例如我们创建一个 laravel-worker.conf 来启动与监控一个 queue:work 进程:

  1. [program:laravel-worker]
  2. process_name=%(program_name)s_%(process_num)02d
  3. command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --daemon
  4. autostart=true
  5. autorestart=true
  6. user=forge
  7. numprocs=8
  8. redirect_stderr=true
  9. stdout_logfile=/home/forge/app.com/worker.log

这个例子里的 numprocs 命令会要求 Supervisor 运行并监控 8 个 queue:work 进程,并且在它们运行失败后重新启动。当然,你必须更改 command 命令的 queue:work sqs,以显示你所选择的队列驱动。

当这个配置文件被创建后,你需要更新 Supervisor 的设置,并用以下命令来启动该进程:

  1. sudo supervisorctl reread
  2. sudo supervisorctl update
  3. sudo supervisorctl start laravel-worker:*

更多有关 Supervisor 的设置与使用,请参考 Supervisor 官方文档。或是使用 Laravel Forge 所提供的 Web 接口,来自动设置与管理你的 Supervisor 设置。

将任务侦听器设为后台服务

queue:work Artisan 命令里包含了 --daemon 选项,强制队列服务器持续处理任务,而不需要重新启动整个框架。比起 queue:listen 命令,这将明显的减少 CPU 的用量。

--daemon 旗标在后台服务模式下启动队列服务器:

  1. php artisan queue:work connection --daemon
  2. php artisan queue:work connection --daemon --sleep=3
  3. php artisan queue:work connection --daemon --sleep=3 --tries=3

如你所见,queue:work 命令提供了多个和 queue:listen 命令相同的选项。你可以用 php artisan help queue:work 命令来查看所有可用的选项。

在后台服务的队列侦听器中开发时所要考量的事项

在后台运行的队列侦听器在处理完每个任务前不会重新启动框架。因此你应该在任务运行完成前,谨慎地释放所有内存占用较高的资源。例如你利用 GD 函数库来处理图片,就要在结束前用 imagedestroy 来释放内存。

在后台任务侦听器进行布署

由于后台队列工作器是长驻进程,因此除非队列重新启动,否则任何代码上的任何修改都不会被应用。所以要布署一个有用到后台队列服务器的应用程序,最简单的方式就是在布署脚本文件中对作业器进行重启。你可以在布署命令中加入以下命令,来优雅地重启所有作业器:

  1. php artisan queue:restart

这个命令会优雅地告知所有队列服务器,在它们处理完当前任务后重新启动,这样就不会有任务遗失的问题。

注意:这个命令依靠缓存系统来安排任务的重新启动。默认状况下,APCu 无法在 CLI 的任务上运作。如果你正在使用 APCu 的话,请把 apc.enable_cli=1 加到你的 APCu 设置里。

处理失败的任务

计划永远赶不上变化,有时候你的队列任务就是会失败。不过别担心,我们有准备好它发生时的应对方法。Laravel 有个很方便的方式可以指定任务的最大重试次数。当任务运行超过该重试次数时,它就会被写入至 failed_jobs 这个数据表中。而失败任务的名称可以在 config/queue.php 这个配置文件中设置。

要创建 failed_jobs 数据表的迁移类,你可以用 queue:failed-table 命令:

  1. php artisan queue:failed-table

当你运行 队列侦听器 时,可以用 queue:listen 命令的 --tries 参数来指定任务的最大重试次数:

  1. php artisan queue:listen connection-name --tries=3

任务失败事件

如果你想注册一个当队列任务失败时会被调用的事件,则可以用 Queue::failing 方法。这样你就有机会通过这个事件来用 e-mail 或 HipChat 通知你的团队。例如我们可以在 Laravel 内置的 AppServiceProvider 中对这个事件附加一个回调函数:

  1. <?php
  2. namespace App\Providers;
  3. use Queue;
  4. use Illuminate\Queue\Events\JobFailed;
  5. use Illuminate\Support\ServiceProvider;
  6. class AppServiceProvider extends ServiceProvider
  7. {
  8. /**
  9. * 启动任何应用程序的服务。
  10. *
  11. * @return void
  12. */
  13. public function boot()
  14. {
  15. Queue::failing(function (JobFailed $event) {
  16. // $event->connectionName
  17. // $event->job
  18. // $event->data
  19. });
  20. }
  21. /**
  22. * 注册服务容器。
  23. *
  24. * @return void
  25. */
  26. public function register()
  27. {
  28. //
  29. }
  30. }

在任务类里处理失败的方法

如果想有拥有更精细的控制,则可以在在任务类里直接定义一个 failed 方法,这个方法允许你指定错误发生时该执行哪些动作。

  1. <?php
  2. namespace App\Jobs;
  3. use App\Jobs\Job;
  4. use Illuminate\Contracts\Mail\Mailer;
  5. use Illuminate\Queue\SerializesModels;
  6. use Illuminate\Queue\InteractsWithQueue;
  7. use Illuminate\Contracts\Queue\ShouldQueue;
  8. class SendReminderEmail extends Job implements ShouldQueue
  9. {
  10. use InteractsWithQueue, SerializesModels;
  11. /**
  12. * 运行任务。
  13. *
  14. * @param Mailer $mailer
  15. * @return void
  16. */
  17. public function handle(Mailer $mailer)
  18. {
  19. //
  20. }
  21. /**
  22. * 处理一个失败的任务
  23. *
  24. * @return void
  25. */
  26. public function failed()
  27. {
  28. // Called when the job is failing...
  29. }
  30. }

重新尝试运行失败任务

要查看你在 failed_jobs 数据表中的所有失败任务,则可以用 queue:failed 这个 Artisan 命令:

  1. php artisan queue:failed

queue:failed 命令会列出所有任务的 ID、连接、队列以及失败时间,任务 ID 可以被用在重试失败的任务上。例如要重试一个 ID 为 5 的失败任务,其命令如下:

  1. php artisan queue:retry 5

要重试所有失败的任务,可以使用 queue:retry 并使用 all 作为 ID:

  1. php artisan queue:retry all

如果你想删除掉一个失败任务,可以用 queue:forget 命令:

  1. php artisan queue:forget 5

queue:flush 命令可以让你删除所有失败的任务:

  1. php artisan queue:flush

{note} 欢迎任何形式的转载,但请务必注明出处,尊重他人劳动共创开源社区。

转载请注明:本文档由 Laravel China 社区 [laravel-china.org] 组织翻译。

文档永久地址: http://d.laravel-china.org