Queues

介绍

提示: 现在,Laravel 为你的 Redis 队列 提供了 Horizon,一个漂亮的仪表盘和配置系统。查看完整的 [Horizon 文档] Horizon documentation 了解更多信息。

Laravel队列为各种不同的队列后端(如Beanstalk、Amazon SQS、Redis甚至关系数据库)提供了的统一API。通过队列,你可以将耗时任务(如发送电子邮件)的处理往后推延。延迟这些耗时的任务可以极大地提升web请求响应速度。

队列配置文件存储在 config/queue.php 中。在这个文件中,你可以找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、BeanstalkdAmazon SQSRedis,和一个同步驱动程序,该同步驱动程序将立即执行作业(供本地使用)。还包括一个用于丢弃排队任务的 null 队列驱动。

链接 Vs. 队列

在开始使用Laravel队列之前,理解连接队列之间的区别非常重要。在 config/queue.php 配置文件中,有一个 connections 配置选项。此选项定义到后端服务(如Amazon SQS、Beanstalk或Redis)的特定连接。然而,任何给定的队列连接都可能有多个队列,这些队列可能被认为是不同的堆栈或成堆的排队任务。

请注意, queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果您没有显式地定义任务应该被发送到哪个队列,那么该任务将被放置在连接配置的 queue 属性中定义的队列上:

  1. // 这个任务将被推送到默认队列...
  2. Job::dispatch();
  3. // 这个任务将被推送到 "emails" 队列...
  4. Job::dispatch()->onQueue('emails');

有些应用程序可能不需要将任务推到多个队列中,而是倾向于使用一个简单的队列。然而,如果希望对任务的处理方式进行优先级排序或分段时,将任务推送到多个队列就显得特别有用,因为Laravel队列工作程序允许您指定哪些队列应该按优先级处理。例如,如果您将任务推送到一个 high 队列,你可能会运行一个赋予它们更高处理优先级的worker:

  1. php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

要使用 database 队列驱动程序,你需要一个数据库表来保存任务。要生成创建此表的迁移,请运行 queue:table Artisan命令。一旦迁移已经创建,你可以使用 migrate 命令迁移你的数据库:

  1. php artisan queue:table
  2. php artisan migrate

Redis

要使用’ redis ‘队列驱动程序,需要在 config/database.php 配置文件中配置一个redis数据库连接

Redis集群

如果你的Redis队列连接使用一个Redis集群,那么你的队列名称就必须包含一个key hash tag。这是为了确保一个给定队列的所有Redis键都被放在同一个哈希槽:

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. 'connection' => 'default',
  4. 'queue' => '{default}',
  5. 'retry_after' => 90,
  6. ],

阻塞

在使用Redis队列时,您可以使用 block_for 配置选项来指定在遍历worker循环和重新轮询Redis数据库之前,驱动程序需要等待多长时间才能使任务变得可用。

根据您的队列负载调整此值要比连续轮询Redis数据库中的新任务更加有效。例如,您可以将值设置为5,以指示驱动程序在等待作业变得可用时应该阻塞5秒:

  1. 'redis' => [
  2. 'driver' => 'redis',
  3. 'connection' => 'default',
  4. 'queue' => 'default',
  5. 'retry_after' => 90,
  6. 'block_for' => 5,
  7. ],

注意: 将 block_for 设置为 0 将导致队列workers一直阻塞,直到某一个任务变得可用。这还能防止在下一个任务被处理之前处理诸如 SIGTERM 之类的信号。

其他驱动的先决条件

列出的队列驱动程序需要以下依赖项:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 or phpredis PHP extension

创建任务

生成任务类

默认情况下,应用程序的所有可排队任务都存储在 app/Jobs 目录下。如果 app/Jobs 目录不存在,则会在运行 make:job Artisan命令时将创建它。你可以使用Artisan CLI生成一个新的队列任务:

  1. php artisan make:job ProcessPodcast

生成的类要实现 Illuminate\Contracts\Queue\ShouldQueue接口,告诉Laravel任务应该推入队列以异步方式运行。

提示: 任务stubs可以使用stub publishing来自定义

类结构

任务类非常简单,通常只包含一个 handle 方法,该方法在队列处理任务时调用。首先,让我们看一个示例任务类。在这个例子中,我们将假设我们管理一个podcast发布服务,并且需要在上传的podcast文件发布之前对其进行处理:

  1. <?php
  2. namespace App\Jobs;
  3. use App\AudioProcessor;
  4. use App\Podcast;
  5. use Illuminate\Bus\Queueable;
  6. use Illuminate\Contracts\Queue\ShouldQueue;
  7. use Illuminate\Foundation\Bus\Dispatchable;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. class ProcessPodcast implements ShouldQueue
  11. {
  12. use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
  13. protected $podcast;
  14. /**
  15. * 创建一个新的job实例
  16. *
  17. * @param Podcast $podcast
  18. * @return void
  19. */
  20. public function __construct(Podcast $podcast)
  21. {
  22. $this->podcast = $podcast;
  23. }
  24. /**
  25. * 执行job
  26. *
  27. * @param AudioProcessor $processor
  28. * @return void
  29. */
  30. public function handle(AudioProcessor $processor)
  31. {
  32. // Process uploaded podcast...
  33. }
  34. }

在本例中,请注意我们能够将一个Eloquent model直接传递到已排队任务的构造函数中。由于任务所使用的 SerializesModels trait,在任务处理时,Eloquent模型及其加载的关系将被优雅地序列化和非序列化。如果你的队列任务在其构造函数中接受一个Eloquent模型,那么只有模型的标识符才会被序列化到队列中。当实际处理任务时,队列系统将自动重新从数据库中获取完整的模型实例及其加载的关系。它对你的应用程序来说是完全透明的,并且可以防止在序列化完整的Eloquent模型实例时可能出现的问题。

当任务由队列处理时,将调用 handle 方法。注意,我们可以对任务的 handle 方法进行类型提示。Laravel服务容器会自动注入这些依赖项。

如果您想完全控制容器如何将依赖注入 handle 方法,你可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收任务和容器。在回调中,你可以随意调用 handle 方法。通常,您应该从服务提供者中调用此方法:

  1. use App\Jobs\ProcessPodcast;
  2. $this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
  3. return $job->handle($app->make(AudioProcessor::class));
  4. });

注意: 二进制数据,例如原始图像内容,应该在传递到队列任务之前通过 base64_encode 函数传递。否则,在将任务放入队列时,可能无法正确地序列化为JSON。

处理关联关系

因为加载的关联关系也会被序列化,所以序列化的任务字符串可能会变得非常大。为了防止关系被序列化,您可以在设置属性值时调用模型上的 withoutRelations 方法。这个方法会返回一个没有加载关系的模型实例:

  1. /**
  2. * 创建一个新的任务实例
  3. *
  4. * @param \App\Podcast $podcast
  5. * @return void
  6. */
  7. public function __construct(Podcast $podcast)
  8. {
  9. $this->podcast = $podcast->withoutRelations();
  10. }

任务中间件

任务中间件允许你围绕排队任务的执行封装自定义逻辑,从而减少了任务本身的样板代码。例如,看下面的 handle 方法,它利用了Laravel的Redis速率限制特性,允许每5秒只处理一个任务:

  1. /**
  2. * 执行任务.
  3. *
  4. * @return void
  5. */
  6. public function handle()
  7. {
  8. Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
  9. info('Lock obtained...');
  10. // Handle job...
  11. }, function () {
  12. // Could not obtain lock...
  13. return $this->release(5);
  14. });
  15. }

虽然这段代码是有效的, 但是handle 方法的结构却变得杂乱,因为它是杂乱的Redis速率限制逻辑。此外,必须将此速率限制逻辑复制到我们希望对其进行速率限制的任何其他任务中。

我们可以定义一个处理速率限制的任务中间件,而不是在 handle 方法中定义速率限制。Laravel没有任务中间件的默认位置,所以你最好将任务中间件放置在应用程序中的任何位置。在本例中,我们将把中间件放在 app/Jobs/Middleware 目录下:

  1. <?php
  2. namespace App\Jobs\Middleware;
  3. use Illuminate\Support\Facades\Redis;
  4. class RateLimited
  5. {
  6. /**
  7. * 处理队列任务
  8. *
  9. * @param mixed $job
  10. * @param callable $next
  11. * @return mixed
  12. */
  13. public function handle($job, $next)
  14. {
  15. Redis::throttle('key')
  16. ->block(0)->allow(1)->every(5)
  17. ->then(function () use ($job, $next) {
  18. // Lock obtained...
  19. $next($job);
  20. }, function () use ($job) {
  21. // Could not obtain lock...
  22. $job->release(5);
  23. });
  24. }
  25. }

正如你所看到的,像路由中间件一样,任务中间件接收正在处理的任务,并且应该调用一个回调来继续处理任务。

在创建任务中间件之后,可以通过从任务的 middleware 方法返回它们来连接到任务。这个方法并不存在于 make:job Artisan命令搭建的任务中,所以你需要将它添加到你自己的任务类的定义中:

  1. use App\Jobs\Middleware\RateLimited;
  2. /**
  3. * 获取任务应该通过的中间件
  4. *
  5. * @return array
  6. */
  7. public function middleware()
  8. {
  9. return [new RateLimited];
  10. }

调度任务

一旦编写了任务类,就可以使用作业本身的 dispatch 方法来分派它。传递给 dispatch 方法的参数将被传递给任务的构造函数:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use Illuminate\Http\Request;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的广播
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // Create podcast...
  17. ProcessPodcast::dispatch($podcast);
  18. }
  19. }

如果你希望有条件地分派任务,可以使用 dispatchIfdispatchUnless 方法:

  1. ProcessPodcast::dispatchIf($accountActive = true, $podcast);
  2. ProcessPodcast::dispatchUnless($accountSuspended = false, $podcast);

延迟调度

如果你想延迟队列任务的执行,你可以在调度任务时使用 delay 方法。例如,让我们指定一个任务在发出10分钟后才能进行处理:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use Illuminate\Http\Request;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的podcast
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建podcast...
  17. ProcessPodcast::dispatch($podcast)
  18. ->delay(now()->addMinutes(10));
  19. }
  20. }

注意: Amazon SQS 队列服务的最大延迟时间为15分钟。

响应发送到浏览器后的调度

另外, dispatchAfterResponse 方法会延迟发送任务,直到将响应发送到用户的浏览器之后。这仍然允许用户开始使用应用程序,即使队列任务仍然在执行。这通常只适用于需要1秒钟的任务,比如发送电子邮件:

  1. use App\Jobs\SendNotification;
  2. SendNotification::dispatchAfterResponse();

你可以 dispatch 一个闭包,并把 afterResponse 方法链到帮助程序上,在响应发送到浏览器后执行闭包:

  1. use App\Mail\WelcomeMessage;
  2. use Illuminate\Support\Facades\Mail;
  3. dispatch(function () {
  4. Mail::to('taylor@laravel.com')->send(new WelcomeMessage);
  5. })->afterResponse();

同步调度

如果您想要立即(同步地)调度任务,您可以使用 dispatchNow 方法。当使用此方法时,任务将不会排队,并将立即运行在当前进程:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use Illuminate\Http\Request;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的podcast
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建 podcast...
  17. ProcessPodcast::dispatchNow($podcast);
  18. }
  19. }

任务链接

任务链接允许您指定一组应在主任务成功执行后按顺序运行的排队任务。如果序列中的一个任务失败,其余的任务将不会运行。要执行一个排队任务链,你可以在你的任何可调度任务上使用 withChain 方法:

  1. ProcessPodcast::withChain([
  2. new OptimizePodcast,
  3. new ReleasePodcast
  4. ])->dispatch();

除了链接作任务实例,你还可以链接闭包:

  1. ProcessPodcast::withChain([
  2. new OptimizePodcast,
  3. new ReleasePodcast,
  4. function () {
  5. Podcast::update(...);
  6. },
  7. ])->dispatch();

注意:使用 $this->delete() 方法删除作业不会阻止已被链接的任务被处理。只有当链中的任务失败时,该链才会停止执行。

链连接 & 队列

如果你想指定应该用于已连接任务的默认连接和队列,可以使用 allOnConnectionallOnQueue 方法。这些方法指定了应该使用的队列连接和队列名称,除非队列任务被显式地分配了一个不同的连接 / 队列:

  1. ProcessPodcast::withChain([
  2. new OptimizePodcast,
  3. new ReleasePodcast
  4. ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义队列和连接

调度到特定队列

通过将任务推到不同的队列,你可以对排队的任务进行分类,甚至可以对分配给不同队列的任务进行优先排序。请记住,这并不是将任务推到你的队列配置文件定义的不同队列连接,而是仅推到单个连接中的特定队列。若要指定队列,请在分派任务时使用 onQueue 方法:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use Illuminate\Http\Request;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 存储一个新的 podcast.
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建 podcast...
  17. ProcessPodcast::dispatch($podcast)->onQueue('processing');
  18. }
  19. }

发送到特定连接

如果你正在使用多个队列连接,可以指定将任务推送到哪个连接。要指定连接,在调度作业时使用 onConnection 方法:

  1. <?php
  2. namespace App\Http\Controllers;
  3. use App\Http\Controllers\Controller;
  4. use App\Jobs\ProcessPodcast;
  5. use Illuminate\Http\Request;
  6. class PodcastController extends Controller
  7. {
  8. /**
  9. * 创建一个新的 podcast.
  10. *
  11. * @param Request $request
  12. * @return Response
  13. */
  14. public function store(Request $request)
  15. {
  16. // 创建 podcast...
  17. ProcessPodcast::dispatch($podcast)->onConnection('sqs');
  18. }
  19. }

你可以使用 onConnectiononQueue 方法来指定任务的连接和队列:

  1. ProcessPodcast::dispatch($podcast)
  2. ->onConnection('sqs')
  3. ->onQueue('processing');

或者,你也可以指定 connection 作为任务类的一个属性:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 应该处理任务的队列连接
  7. *
  8. * @var string
  9. */
  10. public $connection = 'sqs';
  11. }

指定任务最大尝试次数 / 超时值

最大尝试次数

指定任务可尝试的最大次数的其中一个方法是,通过Artisan命令行上的 —tries 开关:

  1. php artisan queue:work --tries=3

但是,可以采用更细粒度的方法:定义任务类本身的最大尝试次数。如果在任务类上指定了最大尝试次数,它将优先于命令行上提供的值:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 任务可尝试的次数
  7. *
  8. * @var int
  9. */
  10. public $tries = 5;
  11. }

基于时间的“尝试”

除了定义任务失败前尝试的次数之外,还可以定义任务应该超时的时间。这允许在给定的时间范围内尝试任意次数的任务。要定义任务超时的时间,请在任务类中添加 retryUntil 方法:

  1. /**
  2. * 确定作业应该超时的时间
  3. *
  4. * @return \DateTime
  5. */
  6. public function retryUntil()
  7. {
  8. return now()->addSeconds(5);
  9. }

提示:你也可以在队列事件监听器上定义一个 retryUntil 方法。

最大异常数

有时,你可能希望指定某个任务可尝试很多次,但如果重试次数超过了给定数量,触发了异常,则该任务应该失败。为了实现这一点,你可以在你的任务类中定义一个 maxExceptions 属性:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 任务可尝试的次数
  7. *
  8. * @var int
  9. */
  10. public $tries = 25;
  11. /**
  12. * 任务失败前允许的最大异常数
  13. *
  14. * @var int
  15. */
  16. public $maxExceptions = 3;
  17. /**
  18. * 执行任务
  19. *
  20. * @return void
  21. */
  22. public function handle()
  23. {
  24. Redis::throttle('key')->allow(10)->every(60)->then(function () {
  25. // 获得锁,处理podcast...
  26. }, function () {
  27. // 无法获得锁...
  28. return $this->release(10);
  29. });
  30. }
  31. }

在本例中,如果应用程序无法获得Redis锁,并且继续重试次数达25次,则任务将被释放10秒钟。但是,如果任务抛出三个未处理的异常,则任务将失败。

超时

注意:timeout 功能针对PHP 7.1+和 pcntl PHP扩展进行了优化。

同样,任务可以运行的最大秒数可以使用Artisan命令行上的—timeout 开关来指定:

  1. php artisan queue:work --timeout=30

但是,你也可以定义允许任务在任务类本身上运行的最大秒数。如果在任务上指定了超时,它将优先于在命令行上指定的任何超时:

  1. <?php
  2. namespace App\Jobs;
  3. class ProcessPodcast implements ShouldQueue
  4. {
  5. /**
  6. * 在超时之前任务可以运行的秒数
  7. *
  8. * @var int
  9. */
  10. public $timeout = 120;
  11. }

速率限制

注意: 该特性要求你的应用程序可以与[Redis服务器]Redis server交互。

如果你的应用程序与Redis交互,你可能会根据时间或并发性限制排队任务。当排队的任务与同样有速率限制的api交互时,此特性可以派上用场。

例如,使用 throttle 方法,您可以将给定类型的作业限制为每60秒只运行10次。如果无法获得锁,通常应将任务释放回队列,以便稍后重试:

  1. Redis::throttle('key')->allow(10)->every(60)->then(function () {
  2. // 任务逻辑...
  3. }, function () {
  4. // 无法获得锁...
  5. return $this->release(10);
  6. });

提示:在上面的示例中, key 可以是唯一标识你希望对其进行速率限制的任务类型的任何字符串。例如,你可能希望基于任务的类名和它所操作的 Eloquent 模型的id来构造密钥。

注意: 将一个已被限流的任务释放回队列仍然会增加该任务的attempts的总数。

或者,你可以指定可以同时处理给定任务的worker的最大数量。当队列作业正在修改一个每次只能修改一个任务的资源时,这是很有用的。例如,使用 funnel方法,你可以限制一个给定类型的任务一次只能由一个worker处理:

  1. Redis::funnel('key')->limit(1)->then(function () {
  2. // 任务逻辑...
  3. }, function () {
  4. // 无法获得锁...
  5. return $this->release(10);
  6. });

提示:在使用速率限制时,很难确定任务成功运行所需的尝试次数。因此,将速率限制与基于时间的尝试结合起来是很有用的。

错误处理

如果在处理任务时抛出异常,则任务将自动释放回队列,以便再次尝试。直到它被尝试的次数达到你的申请允许的最大次数,该任务才将继续被释放。最大尝试次数由 queue:work Artisan命令上使用的 —tries 开关定义。或者,可以在任务类本身上定义尝试的最大次数。有关运行队列worker的更多信息可以在下面找到

排队闭包

你也可以发送闭包,而不是向队列发送任务类。这对于需要在当前请求周期之外执行的快速、简单的任务非常有用:

  1. $podcast = App\Podcast::find(1);
  2. dispatch(function () use ($podcast) {
  3. $podcast->publish();
  4. });

当将闭包分派到队列时,闭包的代码内容是加密签名的,因此不能在传输过程中修改它。

运行队列处理器(worker)

Laravel包含一个队列工作器,它将在新任务被推入队列时处理它们。您可以使用 queue:work Artisan命令运行worker。请注意,一旦 queue:work 命令已经启动,它将一直运行,直到它被手动停止或你关闭你的终端:

  1. php artisan queue:work

提示: 为了让 queue:work 进程永久地在后台运行,您应该使用一个进程监视器,如Supervisor,以确保队列worker不会停止运行。

请记住,队列worker是长生命周期的进程,并将启动的应用程序状态存储在内存中。因此,在启动它们之后,代码库中的更改对其不起作用。因此,在部署过程中,一定要重新启动你的队列worker。此外,请记住,应用程序创建或修改的任何静态状态不会在任务之间自动重置。

或者,你可以运行 queue:listen 命令。当使用 queue:listen 命令时,当你想要重新加载更新的代码或重置应用程序状态时,你不必手动重新启动worker;但是,这个命令的效率不如queue:work:

  1. php artisan queue:listen

指定连接 & 队列

你还可以指定worker应该使用哪个队列连接。传递给 work 命令的连接名应该与 config/queue.php 配置文件中定义的一个连接相对应:

  1. php artisan queue:work redis

你甚至可以通过仅处理特定连接的特定队列来进一步定制你的队列任务worker。例如,如果你所有的电子邮件都在你的 redis 队列连接的 emails队列中处理,你可以发出以下命令来启动一个只处理该队列的worker:

  1. php artisan queue:work redis --queue=emails

处理单个任务

—once 选项可以用来命令worker从队列中只处理一个任务:

  1. php artisan queue:work --once

处理所有排队的任务 & 然后退出

可以使用 —stop-when-empty 选项命令worker处理所有任务,然后优雅地退出。如果你想在队列为空时关闭容器,这个选项在处理Docker容器中的Laravel队列时非常有用:

  1. php artisan queue:work --stop-when-empty

资源方面的考虑

守护进程队列worker程序在处理每个任务之前不会重新启动框架。因此,你应该在每个任务完成后释放所有繁重的资源。例如,如果你正在使用GD库进行图像处理,那么在完成之后,应该使用 imagedestroy 来释放内存。

队列优先级

有时,你可能希望优先考虑如何处理队列。例如,在 config/queue.php 中,你可以将你的 redis 连接的默认 queue 设置为 low。然而,有时你可能希望将一个任务推到一个 high 优先级队列,就像这样:

  1. dispatch((new Job)->onQueue('high'));

要启动一个worker,它在继续执行 high 队列上的任何作业之前,验证所有的 high 队列任务都被处理了,请将一个以逗号分隔的队列名称列表传递给 work 命令:

  1. php artisan queue:work --queue=high,low

队列worker & 部署

因为队列worker是长生命周期的进程,所以在重启之前,任何的代码更改都不会生效。因此,使用队列worker部署应用程序的最简单方法是在部署过程中重新启动worker。你可以通过执行 queue:restart 命令来优雅地重新启动所有的worker:

  1. php artisan queue:restart

该命令将指示所有队列worker在完成当前任务后优雅地“死亡”,这样就不会丢失现有的任务。由于在执行 queue:restart 命令时,队列worker将被杀掉,因此你应该运行一个进程管理器(如Supervisor)来自动重新启动队列worker。

提示: 队列使用缓存 来存储重启信号,因此在使用该特性之前,你应该检查应用程序的缓存驱动程序是否正确配置。

任务到期 & 超时

任务到期

在你的 config/queue.php 配置文件,每个队列连接定义一个 retry_after 选项。此选项指定在重试正在处理的任务之前,队列连接应等待多少秒。例如,如果 retry_after 的值被设置为 90,那么如果任务已经处理了90秒而没有被删除,那么它将被释放回队列。通常,您应该将 retry_after 值设置为你的任务完成处理所需的最大秒数。

注意: 唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS将基于在AWS控制台中管理的默认可见性超时 重试任务。

Worker 超时

queue:work Artisan命令暴露一个 —timeout 选项。—timeout 选项指定在杀死正在处理作业的子队列worker之前,Laravel队列主进程将等待多长时间。有时,由于各种原因,子队列进程可能会被“冻结”。 —timeout 选项用来删除超过指定时间限制的冻结进程:

  1. php artisan queue:work --timeout=60

retry_after 配置选项和 —timeout CLI选项是不同的,但它们共同确保不会丢失任务,并且任务只被成功处理一次。

注意: —timeout 值应该总是比 retry_after 配置值至少短几秒。这将确保处理给定任务的worker总是在重试作业之前被杀死。如果你的 —timeout 选项比你的 retry_after 配置值长,你的任务可能会被处理两次。

worker休眠时间

当任务在队列中可用时,worker将继续处理任务,中间没有任何延迟。然而, sleep 选项决定了如果没有新的worker可用,worker将”sleep”多长时间(以秒为单位)。在休眠时,worker将不处理任何新任务 —— 这些任务将在worker再次醒来后处理。

  1. php artisan queue:work --sleep=3

Supervisor 配置

安装 Supervisor

Supervisor是一个用于Linux操作系统的进程监视器,如果 queue:work 进程失败,它将自动重启该进程。要在Ubuntu上安装Supervisor,你可以使用以下命令:

  1. sudo apt-get install supervisor

提示:如果你觉得自己配置Supervisor很困难,可以考虑使用Laravel Forge,它将自动为你的Laravel项目安装和配置Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录。在此目录中,你可以创建任意数量的配置文件,这些配置文件将目录supervisor如何监视你的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视 queue:work 进程:

  1. [program:laravel-worker]
  2. process_name=%(program_name)s_%(process_num)02d
  3. command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
  4. autostart=true
  5. autorestart=true
  6. user=forge
  7. numprocs=8
  8. redirect_stderr=true
  9. stdout_logfile=/home/forge/app.com/worker.log
  10. stopwaitsecs=3600

在本例中, numprocs 指令将指示监控器运行8个 queue:work 进程并监视所有进程,如果它们失败,将自动重新启动它们。你应该更改 command 指令的 queue:work sqs 部分,以反映所需的队列连接。

注意:应该确保 stopwaitsecs 的值大于运行时间最长的任务所消耗的秒数。否则,Supervisor可能会在任务完成前终止任务。

启动 Supervisor

创建了配置文件后,你可以使用以下命令更新Supervisor配置并启动进程:

  1. sudo supervisorctl reread
  2. sudo supervisorctl update
  3. sudo supervisorctl start laravel-worker:*

有关Supervisor的更多信息,请参考 Supervisor documentation

处理失败任务

有时你排队的任务会失败。别担心,事情并不总是按计划进行!Laravel提供了一种方便的方法来指定一个任务应该尝试的最大次数。当任务超过这个尝试数量后,它将被插入到 failed_jobs 数据库表中。要为 failed_jobs 表创建一个迁移,你可以使用 queue:failed-table 命令:

  1. php artisan queue:failed-table
  2. php artisan migrate

然后,在运行你的队列处理器时,你可以使用 —triesqueue:work 命令上指定应该尝试一个任务的最大次数。如果你没有为 —tries 选项指定一个值,那么任务将只被尝试一次:

  1. php artisan queue:work redis --tries=3

此外,您可以使用 —delay 选项指定Laravel在重试失败的任务之前应该等待多少秒。默认情况下,任务会立即重试:

  1. php artisan queue:work redis --tries=3 --delay=3

如果你想在每个任务的基础上配置失败的任务重试延迟,你可以通过在你的排队job类中定义一个 retryAfter 属性来实现:

  1. /**
  2. * 重试任务前等待的秒数
  3. *
  4. * @var int
  5. */
  6. public $retryAfter = 3;

任务失败后的清理工作

你可以直接在job类上定义一个 failed 方法,它允许你在发生故障时执行特定于任务的清理。这是向用户发送警报或还原任务执行的任何操作的最佳位置。导致作业失败的 Exception 将被传递给 failed 方法:

  1. <?php
  2. namespace App\Jobs;
  3. use App\AudioProcessor;
  4. use App\Podcast;
  5. use Exception;
  6. use Illuminate\Bus\Queueable;
  7. use Illuminate\Contracts\Queue\ShouldQueue;
  8. use Illuminate\Queue\InteractsWithQueue;
  9. use Illuminate\Queue\SerializesModels;
  10. class ProcessPodcast implements ShouldQueue
  11. {
  12. use InteractsWithQueue, Queueable, SerializesModels;
  13. protected $podcast;
  14. /**
  15. * 创建一个新的任务实例.
  16. *
  17. * @param Podcast $podcast
  18. * @return void
  19. */
  20. public function __construct(Podcast $podcast)
  21. {
  22. $this->podcast = $podcast;
  23. }
  24. /**
  25. * 执行任务
  26. *
  27. * @param AudioProcessor $processor
  28. * @return void
  29. */
  30. public function handle(AudioProcessor $processor)
  31. {
  32. // 处理上传的 podcast...
  33. }
  34. /**
  35. * 任务未能处理
  36. *
  37. * @param Exception $exception
  38. * @return void
  39. */
  40. public function failed(Exception $exception)
  41. {
  42. // 给用户发送失败通知, 等等...
  43. }
  44. }

注意:如果任务是使用dispatchNow 方法分派的,则不会调用 failed 方法。

任务失败事件

如果你想要注册一个将在任务失败时调用的事件,您可以使用 Queue::failing 方法。这是一个通过电子邮件或Slack通知你的团队的好机会。例如,我们可以在Laravel里的 AppServiceProvider 里附加一个回调到这个事件:

  1. <?php
  2. namespace App\Providers;
  3. use Illuminate\Support\Facades\Queue;
  4. use Illuminate\Support\ServiceProvider;
  5. use Illuminate\Queue\Events\JobFailed;
  6. class AppServiceProvider extends ServiceProvider
  7. {
  8. /**
  9. * 注册任何应用程序服务
  10. *
  11. * @return void
  12. */
  13. public function register()
  14. {
  15. //
  16. }
  17. /**
  18. * 引导任何应用程序服务
  19. *
  20. * @return void
  21. */
  22. public function boot()
  23. {
  24. Queue::failing(function (JobFailed $event) {
  25. // $event->connectionName
  26. // $event->job
  27. // $event->exception
  28. });
  29. }
  30. }

重试失败的任务

要查看所有插入到 failed_jobs 数据库表中的失败任务,可以使用 queue:failed Artisan命令:

  1. php artisan queue:failed

queue:failed 命令将列出任务ID、连接、队列和故障时间。任务ID可用于重试失败的任务。例如,要重试ID为 5 的失败任务,请执行以下命令:

  1. php artisan queue:retry 5

要重试所有失败的任务,请执行 queue:retry 命令,并将 all作为ID传递:

  1. php artisan queue:retry all

如果你想删除一个失败的任务,你可以使用 queue:forget 命令:

  1. php artisan queue:forget 5

要删除所有失败的任务,您可以使用 queue:flush 命令:

  1. php artisan queue:flush

忽略丢失的Models

当向任务注入一个Eloquent模型时,它会在被放入队列之前自动序列化,并在处理任务时恢复。但是,如果在任务等待worker处理时删除了模型,你的任务可能会失败,出现 ModelNotFoundException

为方便起见,你可以通过设置你的任务的 deleteWhenMissingModels 属性为 true 来自动删除缺少模型的作业:

  1. /**
  2. * 如果任务的模型不再存在,则删除该任务
  3. *
  4. * @var bool
  5. */
  6. public $deleteWhenMissingModels = true;

任务事件

使用 Queue facade上的 beforeafter 方法,可以指定在处理排队任务之前或之后执行的回调。如果要为控制面板执行附加日志记录或增量统计,这些回调会是绝佳时机。通常,你应该从服务提供者调用这些方法。例如,我们可以使用 Laravel的 AppServiceProvider :

  1. <?php
  2. namespace App\Providers;
  3. use Illuminate\Support\Facades\Queue;
  4. use Illuminate\Support\ServiceProvider;
  5. use Illuminate\Queue\Events\JobProcessed;
  6. use Illuminate\Queue\Events\JobProcessing;
  7. class AppServiceProvider extends ServiceProvider
  8. {
  9. /**
  10. * 注册任何应用程序服务
  11. *
  12. * @return void
  13. */
  14. public function register()
  15. {
  16. //
  17. }
  18. /**
  19. * 启动任何应用程序服务
  20. *
  21. * @return void
  22. */
  23. public function boot()
  24. {
  25. Queue::before(function (JobProcessing $event) {
  26. // $event->connectionName
  27. // $event->job
  28. // $event->job->payload()
  29. });
  30. Queue::after(function (JobProcessed $event) {
  31. // $event->connectionName
  32. // $event->job
  33. // $event->job->payload()
  34. });
  35. }
  36. }

使用 Queue facade上的 looping 方法,你可以指定在worker尝试从队列获取任务之前执行的回调。例如,你可以注册一个闭包来回滚以前失败的任务留下的任何事务:

  1. Queue::looping(function () {
  2. while (DB::transactionLevel() > 0) {
  3. DB::rollBack();
  4. }
  5. });

本文章首发在 LearnKu.com 网站上。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接 我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。

Laravel China 社区:https://learnku.com/docs/laravel/7.x/queues/7491