Queueing

Activities like processing videos, resizing images or sending emails aren't suitable for execution online or in real time, because they can slow page loading and severely impact the user experience.

The best solution here is to implement background jobs. The web application puts jobs into a queue, and they are processed separately.

While more sophisticated queueing systems such as RabbitMQ are available to PHP applications, Phalcon provides a client for Beanstalk, a job queueing backend inspired by Memcached. It is simple, lightweight, and completely specialized for job queueing.

Some of the data returned from queue methods requires the Yaml extension to be installed. Refer to the PHP Yaml extension documentation for more information. You will need Yaml >= 2.0.0.

Putting Jobs into the Queue

After connecting to Beanstalk you can insert as many jobs as required. You can define the message structure according to the needs of the application:

    <?php

    use Phalcon\Queue\Beanstalk;

    // Connect to the queue
    $queue = new Beanstalk(
        [
            'host' => '192.168.0.21',
            'port' => '11300',
        ]
    );

    // Insert the job in the queue
    $queue->put(
        [
            'processVideo' => 4871,
        ]
    );

Available connection options are:

Option  Description                                Default
host    IP where the beanstalk server is located   127.0.0.1
port    Connection port                            11300

In the above example we stored a message that allows a background job to process a video. The message is stored in the queue immediately and has no set time to live.

Additional options such as time to run, priority and delay can be passed as the second parameter:

    <?php

    // Insert the job in the queue with options
    $queue->put(
        [
            'processVideo' => 4871,
        ],
        [
            'priority' => 250,
            'delay'    => 10,
            'ttr'      => 3600,
        ]
    );

The following options are available:

Option    Description
priority  An integer < 2**32. Jobs with smaller priority values will be scheduled before jobs with larger priorities. The most urgent priority is 0; the least urgent priority is 4,294,967,295.
delay     An integer number of seconds to wait before putting the job in the ready queue. The job will be in the 'delayed' state during this time.
ttr       Time to run: an integer number of seconds to allow a worker to run this job. This time is counted from the moment a worker reserves the job.
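The ranges in the table above can be checked before a job is submitted. The following is a minimal sketch, not part of Phalcon: validatePutOptions() is a hypothetical helper name, and it assumes a 64-bit PHP build so that 4294967295 is represented as an integer.

```php
<?php

// Hypothetical helper (not part of Phalcon): checks put() options
// against the ranges described in the options table.
function validatePutOptions(array $options): array
{
    $maxPriority = 4294967295; // 2**32 - 1, the least urgent priority

    foreach (['priority', 'delay', 'ttr'] as $key) {
        if (!array_key_exists($key, $options)) {
            continue; // option omitted, nothing to check
        }

        $value = $options[$key];

        if (!is_int($value) || $value < 0) {
            throw new InvalidArgumentException("$key must be a non-negative integer");
        }
    }

    if (isset($options['priority']) && $options['priority'] > $maxPriority) {
        throw new InvalidArgumentException('priority must be smaller than 2**32');
    }

    // Returned unchanged so the call can be nested inside put()
    return $options;
}
```

The return value can be passed straight through as the second parameter, for example $queue->put($message, validatePutOptions($options)).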

Putting a job into the queue returns a job id, which you can use to track the status of the job:

    <?php

    $jobId = $queue->put(
        [
            'processVideo' => 4871,
        ]
    );

Retrieving Messages

Once a job is placed into the queue, it can be consumed by a background worker that has enough time to complete the task:

    <?php

    while (($job = $queue->peekReady()) !== false) {
        $message = $job->getBody();

        var_dump($message);

        $job->delete();
    }

Jobs must be removed from the queue to avoid double processing. If several background workers run at the same time, each job must be reserved so that other workers do not process it while one worker is holding it:

    <?php

    while (($job = $queue->reserve()) !== false) {
        $message = $job->getBody();

        var_dump($message);

        $job->delete();
    }
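In a real worker the var_dump() call would be replaced by actual processing. One possible way to route a message to the code that handles it is sketched below. dispatchMessage() and the handler map are hypothetical names, and the sketch assumes getBody() returns the same array that was passed to put(), such as ['processVideo' => 4871].

```php
<?php

// Hypothetical dispatcher (not part of Phalcon): maps each key of a
// message such as ['processVideo' => 4871] to a handler callable.
function dispatchMessage(array $message, array $handlers): array
{
    $results = [];

    foreach ($message as $task => $argument) {
        if (!isset($handlers[$task])) {
            throw new RuntimeException("No handler registered for task '$task'");
        }

        // Call the handler with the value stored under the task name
        $results[$task] = $handlers[$task]($argument);
    }

    return $results;
}
```

Inside the loop, dispatchMessage($job->getBody(), $handlers) would be called before $job->delete(), so that a job is only removed once its handler has returned.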

Our client implements only a basic set of the features provided by Beanstalkd, but enough to build applications that use queues.

Advanced Topics

Multiple Queues

Beanstalkd supports multiple queues (called ‘tubes’) to allow for a single queue server to act as a hub for a variety of workers. Phalcon supports this readily.

You can list the tubes available on the server and choose the tube that the queue object uses:

    <?php

    $tube_array = $queue->listTubes();

    $queue->choose('myOtherTube');

All subsequent work with $queue now manipulates myOtherTube instead of default.

You can view which tube the queue is using as well.

    <?php

    $current_tube = $queue->listTubeUsed();
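Because choose() changes the tube for all subsequent work, code that only needs to send a single job elsewhere may want to switch back afterwards. The sketch below combines listTubeUsed(), choose() and put() for that purpose. putInTube() is a hypothetical helper, and it assumes listTubeUsed() returns the tube name as a string on success.

```php
<?php

// Hypothetical helper (not part of Phalcon): puts one job into $tube,
// then switches the connection back to the tube used before the call.
// $queue is expected to expose listTubeUsed(), choose() and put().
function putInTube($queue, string $tube, array $message, array $options = [])
{
    $previous = $queue->listTubeUsed();

    $queue->choose($tube);

    try {
        return $queue->put($message, $options);
    } finally {
        // Restore the earlier tube only if it could be determined
        if (is_string($previous)) {
            $queue->choose($previous);
        }
    }
}
```

A call such as putInTube($queue, 'myOtherTube', ['processVideo' => 4871]) returns the job id from put() and leaves $queue on its original tube.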

Tube Manipulation

Tubes can be paused and resumed if needed. The example below pauses myOtherTube for 3 minutes.

    <?php

    $queue->pauseTube('myOtherTube', 180);

Setting the delay to 0 will resume normal operation.

    <?php

    $queue->pauseTube('myOtherTube', 0);

Server Status

You can get information about the entire server or specific tubes.

    <?php

    $server_stats = $queue->stats();

    $tube_stats = $queue->statsTube('myOtherTube');

    $server_status = $queue->readStatus();
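The tube statistics can be condensed into a small backlog summary, for example for monitoring. The sketch below is illustrative only. summarizeTubeStats() is a hypothetical helper, the key names (current-jobs-ready and so on) are those of the stats-tube response in the Beanstalkd protocol, and it is an assumption that statsTube() returns them unchanged as an array.

```php
<?php

// Hypothetical helper (not part of Phalcon): reduces a stats array to
// the number of jobs in each state. Key names are taken from the
// beanstalkd protocol; that the client returns them unchanged is an
// assumption.
function summarizeTubeStats($stats): array
{
    $summary = ['ready' => 0, 'reserved' => 0, 'delayed' => 0, 'buried' => 0];

    if (!is_array($stats)) {
        return $summary; // stats unavailable, report an empty backlog
    }

    foreach (array_keys($summary) as $state) {
        $key = "current-jobs-$state";

        if (isset($stats[$key])) {
            $summary[$state] = (int) $stats[$key];
        }
    }

    return $summary;
}
```

For example, summarizeTubeStats($queue->statsTube('myOtherTube')) yields an array with the keys ready, reserved, delayed and buried.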

Job Management

Beanstalkd lets you manage jobs in two ways: by delaying a job, and by removing a job from the ready queue for later processing.

Burying a job is typically used when processing fails because of a problem outside the worker that can be resolved later. Burying moves the job into the buried queue.

    <?php

    $job = $queue->reserve();

    $job->bury();

A list of buried jobs is stored on the server. You can inspect the first buried job in the queue.

    <?php

    $job_data = $queue->peekBuried();

If the buried queue is empty, this returns false; otherwise it returns a Job object.

You can kick the first N buried jobs to put them back in the ready queue. Below is an example of kicking the first three buried jobs.

    <?php

    $queue->kick(3);

Jobs can be released back to the ready queue, optionally with a delay. This is handy for transient errors while processing a job. The example below releases a job with a priority of 100 and a 3 minute delay.

    <?php

    $job = $queue->reserve();

    $job->release(100, 180);

Priority and delay have the same meaning as when putting a job on the queue.
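Deleting, releasing and burying can be combined into one decision per reserved job. The following is a rough sketch of that idea rather than a recommended policy. TransientJobException and handleReservedJob() are hypothetical names, and the priority of 100 and delay of 180 seconds are simply the values from the release example.

```php
<?php

// Hypothetical marker for errors that are worth retrying later.
class TransientJobException extends RuntimeException
{
}

// Hypothetical helper (not part of Phalcon): runs $handler on a
// reserved job and settles the job according to the outcome.
function handleReservedJob($job, callable $handler): string
{
    try {
        $handler($job->getBody());

        // Success: remove the job so it is not processed twice
        $job->delete();

        return 'deleted';
    } catch (TransientJobException $e) {
        // Temporary problem: retry in 3 minutes with priority 100
        $job->release(100, 180);

        return 'released';
    } catch (Exception $e) {
        // Anything else: set the job aside for inspection
        $job->bury();

        return 'buried';
    }
}
```

Buried jobs stay on the server until they are kicked back to the ready queue or deleted, so a failing job is never lost silently.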

Inspecting a job in the queue can be accomplished with jobPeek($job_id). The example below attempts to peek at job id 5.

    <?php

    $job = $queue->jobPeek(5);

Jobs that have been deleted cannot be inspected, and jobPeek() returns false for them. Ready, buried, and delayed jobs return a Job object.

Further Reading

The protocol text contains all of the internal operational details of Beanstalkd and is often considered the de facto documentation for Beanstalkd.