Phalcon with Beanstalkd


Thien, Jan '16 (edited Jan '16)

Queueing is a great way to take slow tasks out of the user's request flow and run them as background jobs. With a queue, users no longer wait for slow tasks to finish, and we get another opportunity to separate our application and business logic further.

For example, processing a video, resizing images or sending an email should not run inline, in real time. Doing so increases page load time and hurts the user experience. These make great candidates for work that we can hand off to a queue.

Take a simple case for this article: "I want to send a notification email when a user creates a new post or someone comments on a post." To solve this, Phalcon has a Queue component, Phalcon\Queue\Beanstalk, that integrates with Beanstalkd.

Note that we only discuss task queues here, not message queues.

A message queue provides the basic functionality of passing, holding and delivering messages across multi-platform or service-oriented systems.

A task queue manages work to be done and is considered a type of message queue.

Beanstalkd

We use CentOS in both development and production. The following steps were done on a CentOS 7.x server. The setup procedure may differ slightly depending on your environment; you can refer to the article here.

Beanstalk is a simple, fast work queue. It is open source, licensed under the terms of the MIT license. Its interface is generic, but it was originally designed to reduce the latency of page views in high-volume web applications by running time-consuming tasks asynchronously. Its architecture is borrowed from memcached, and it is designed specifically to be a message queue.

To install it on CentOS 7, run the following in your terminal (the beanstalkd package is provided by the EPEL repository):

sudo yum install epel-release -y
sudo yum install beanstalkd -y
sudo chkconfig beanstalkd on
sudo service beanstalkd start
sudo service beanstalkd status

Alternatively, if you want the latest Beanstalkd version, you can build it from source:

git clone https://github.com/kr/beanstalkd.git
cd beanstalkd
make
sudo cp beanstalkd /usr/bin/beanstalkd
sudo mkdir -p /var/lib/beanstalkd

Because you installed it from source, you need to create the startup script yourself: a systemd unit file at /etc/systemd/system/beanstalkd.service with the following content:

[Unit]
Description=Beanstalkd is a simple, fast work queue

[Service]
User=root
ExecStart=/usr/bin/beanstalkd -b /var/lib/beanstalkd

[Install]
WantedBy=multi-user.target

Finally, start the service and verify that it is running:

sudo systemctl daemon-reload
sudo systemctl enable beanstalkd && sudo systemctl start beanstalkd
ps ax | grep beanstalkd

Note that Beanstalkd pushes jobs to waiting workers. A worker sends a reserve command and blocks on its connection, so when a producer enqueues a job, a connected worker that is ready receives it immediately. A reserved job stays with that worker until the worker sends a response (delete, release, bury, etc.) or the job's time to run expires.

For a better understanding of Beanstalk's fundamentals, we recommend reading the official protocol documentation at https://github.com/kr/beanstalkd/blob/master/doc/protocol.md
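To make the reserve/delete contract concrete, here is a small in-memory model of a single tube. It is only an illustration under simplifying assumptions: the class InMemoryTube and its methods are hypothetical, not part of Beanstalkd or Phalcon, and it ignores priorities, delays and TTR. It shows that a reserved job is hidden from other workers until it is deleted or released.

```php
<?php
/**
 * Hypothetical in-memory model of one Beanstalkd tube, for illustration only.
 * Ignores priorities, delays and TTR; ready jobs are served lowest id first,
 * which is how Beanstalkd orders jobs of equal priority.
 */
class InMemoryTube
{
    /** @var array job id => body, waiting for a worker */
    private $ready = [];
    /** @var array job id => body, currently held by a worker */
    private $reserved = [];
    /** @var int last job id handed out */
    private $lastId = 0;

    /** Enqueue a job and return its id, like the "put" command. */
    public function put($body)
    {
        $id = ++$this->lastId;
        $this->ready[$id] = $body;
        return $id;
    }

    /** Hand the next ready job to a worker, or return null if none is ready. */
    public function reserve()
    {
        if (empty($this->ready)) {
            return null;
        }
        $id = min(array_keys($this->ready));
        $this->reserved[$id] = $this->ready[$id];
        unset($this->ready[$id]);
        return ['id' => $id, 'body' => $this->reserved[$id]];
    }

    /** The worker finished the job: remove it for good. */
    public function delete($id)
    {
        unset($this->reserved[$id]);
    }

    /** The worker gave up: make the job ready again for any worker. */
    public function release($id)
    {
        if (isset($this->reserved[$id])) {
            $this->ready[$id] = $this->reserved[$id];
            unset($this->reserved[$id]);
        }
    }

    public function countReady()
    {
        return count($this->ready);
    }

    public function countReserved()
    {
        return count($this->reserved);
    }
}
```

Two workers calling reserve() in turn get different jobs, and a third call finds nothing until one of them releases its job.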

Configuring

Now we can set up Beanstalkd with Phalcon. Inside your common/config/service.php file, copy and paste the following code:

use Phalcon\Queue\Beanstalk;

/**
 * Queue used to deliver e-mails in the background
 */
$di->set(
    'queue',
    function () use ($config) {
        // Queue switched off (e.g. in development): DummyServer accepts put() without queueing
        if (isset($config->beanstalk->disabled) && $config->beanstalk->disabled) {
            return new DummyServer();
        }
        if (!isset($config->beanstalk->host)) {
            throw new \Exception('Beanstalk is not configured');
        }
        return new Beanstalk(['host' => $config->beanstalk->host]);
    },
    true // shared service: one connection per request
);

For the full service.php file, see here. The configuration above only sets the host, so the adapter connects to Beanstalkd's default port, 11300.
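DummyServer is not defined in the snippet above. Judging from the service definition, it is a stand-in that accepts put() calls and discards them, so the application keeps working when beanstalk->disabled is set. Below is a minimal sketch of that idea. The connection step is passed in as a callable so the logic runs without the Phalcon extension. The function name createQueue and this DummyServer body are assumptions for illustration, not Phanbook's exact code.

```php
<?php
/**
 * Hypothetical stand-in queue: accepts jobs and silently discards them.
 */
class DummyServer
{
    public function put($data, array $options = [])
    {
        return true; // pretend the job was queued
    }
}

/**
 * Mirrors the 'queue' service closure: dummy when disabled, error when the
 * host is missing, otherwise whatever $connect builds (in the real service,
 * new \Phalcon\Queue\Beanstalk([...])).
 *
 * @param stdClass $config  config object with an optional ->beanstalk section
 * @param callable $connect receives ['host' => ..., 'port' => ...]
 */
function createQueue(stdClass $config, callable $connect)
{
    $beanstalk = isset($config->beanstalk) ? $config->beanstalk : new stdClass();
    if (!empty($beanstalk->disabled)) {
        return new DummyServer();
    }
    if (!isset($beanstalk->host)) {
        throw new RuntimeException('Beanstalk is not configured');
    }
    // 11300 is Beanstalkd's default port
    $port = isset($beanstalk->port) ? (int) $beanstalk->port : 11300;
    return $connect(['host' => $beanstalk->host, 'port' => $port]);
}
```

Keeping the decision in a plain function makes it easy to check each branch in a unit test.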

Create a script to process it

Once the Phalcon dependency is installed, we can begin to write some code. In this example, we create a Server class that wraps the queue service. Phalcon does not call methods on this class by itself. The application calls put() to queue a job, and in the version below put() only simulates that by returning true. The doSomething() placeholder is where the processing code goes, so this is half of a worker.

<?php
/**
 * Phanbook : Delightfully simple forum software
 *
 * Licensed under The GNU License
 * For full copyright and license information, please see the LICENSE.txt
 * Redistributions of files must retain the above copyright notice.
 *
 * @link    http://phanbook.com Phanbook Project
 * @since   1.0.0
 * @license http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt
 */
namespace Phanbook\Queue;
/**
 * Server
 *
 * Facade to Phalcon\Queue\Beanstalk
 */
class Server
{
    /**
     * @var \Phalcon\Queue\Beanstalk
     */
    protected $queue;

    /**
     * Server constructor
     *
     * @param \Phalcon\Queue\Beanstalk $queue
     */
    public function __construct($queue)
    {
        $this->queue = $queue;
    }

    /**
     * Simulates putting a job in the queue
     *
     * @param array $job
     * @return bool always true; nothing is actually queued
     */
    public function put($job)
    {
        return true;
    }

    public function doSomething()
    {
        //@todo
    }
}
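The docblock calls Server a facade to Phalcon's Beanstalk adapter, while the put() above only simulates queueing. If you want a facade that actually forwards jobs, one possible shape is sketched below. The class name QueueFacade and the default option values are assumptions. The wrapped object only needs a put($data, array $options) method, which Phalcon\Queue\Beanstalk provides.

```php
<?php
/**
 * Hypothetical facade that forwards jobs to a real queue object
 * (e.g. Phalcon\Queue\Beanstalk) and applies default job options.
 */
class QueueFacade
{
    /** @var object anything with put($data, array $options) */
    private $queue;
    /** @var array default Beanstalkd job options (example values only) */
    private $defaults;

    public function __construct($queue, array $defaults = ['priority' => 100, 'delay' => 0, 'ttr' => 3600])
    {
        $this->queue = $queue;
        $this->defaults = $defaults;
    }

    /**
     * Queue a job; per-call options override the defaults.
     *
     * @return mixed whatever the wrapped queue returns (a job id or false for Phalcon)
     */
    public function put($job, array $options = [])
    {
        return $this->queue->put($job, array_merge($this->defaults, $options));
    }
}
```

Centralizing the defaults here means callers such as the model hooks only pass the options they actually want to change.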

If you want an example of an image-processing queue job, refer to it here.

Push a job to a Queue

When a user creates a post or a comment, we add a job to the queue so that our worker can process it. Because the queue service is defined in the service.php file above, we can simply fetch it from the DI container. In our example, this code lives in the Posts.php model:

[...]
/**
 * afterCreate hook of the Phalcon model: runs after a new post is saved
 *
 * @return void
 */
public function afterCreate()
{
    if ($this->id > 0) {
        /**
         * Register the activity
         */
        $activity = new Activities();
        $activity->setUsersId($this->usersId);
        $activity->setPostsId($this->id);
        $activity->setType(Activities::NEW_POSTS);
        $activity->save();
        /**
         * Register the user in the post's notifications
         */
        $notification = new PostsNotifications();
        $notification->setUsersId($this->usersId);
        $notification->setPostsId($this->id);
        $notification->save();
        $toNotify = [];
        /**
         * Notify users that always want notifications
         */
        foreach (Users::find(['notifications = "Y"', 'columns' => 'id'])->toArray() as $user) {
            if ($this->usersId != $user['id']) {
                $notificationId = $this->setNotification(
                    $user['id'],
                    $this->id,
                    null,
                    Notifications::TYPE_POSTS
                );
                $toNotify[$user['id']] = $notificationId;
            }
        }
        /**
         * Queue notifications to be sent (nothing to queue if nobody opted in)
         */
        if (!empty($toNotify)) {
            $this->getDI()->getQueue()->put($toNotify);
        }
    }
}
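Because put() receives a PHP array here, the data has to be turned into bytes before it reaches Beanstalkd. Assuming the default behaviour of Phalcon's 2.x/3.x Beanstalk adapter, this is done with PHP's serialize(), and the job body is unserialized again when a worker reserves the job. The sketch below uses hypothetical user and notification ids to show the round trip the $toNotify array goes through.

```php
<?php
// Hypothetical payload: user id => notification id, as built in afterCreate()
$toNotify = [7 => 1201, 12 => 1202];

// Producer side: roughly what the adapter sends as the job body
$body = serialize($toNotify);

// Worker side: what $job->getBody() hands back
$message = unserialize($body);

foreach ($message as $userId => $notificationId) {
    printf("notify user %d about notification %d\n", $userId, $notificationId);
}
```

Keep payloads small (ids rather than whole model objects). Beanstalkd rejects jobs larger than its maximum job size, 65,535 bytes by default (the -z flag).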

With $this->getDI()->getQueue()->put() you store data that describes what to process; this is what your worker uses to process the job. Here we store a message that maps each user id to a notification id. The job is placed in the queue immediately and does not expire. Additional options such as time to run, priority and delay can be passed as the second parameter:

[...]
$toNotify[$user['id']] = $notificationId;
$optionTime = ['priority' => 250, 'delay' => 10, 'ttr' => 3600];
$this->getDI()->getQueue()->put(
    $toNotify,
    $optionTime
);

[...]

For the code above:

  • priority: an integer < 2**32. Jobs with smaller priority values are scheduled before jobs with larger values. The most urgent priority is 0; the least urgent priority is 4,294,967,295.
  • delay: an integer number of seconds to wait before putting the job in the ready queue. The job is in the "delayed" state during this time.
  • ttr (time to run): an integer number of seconds a worker is allowed to run this job, counted from the moment the worker reserves it. If the worker does not delete, release or bury the job in time, Beanstalkd puts it back in the ready queue.
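On the wire, these three options become the arguments of Beanstalkd's put command, put <pri> <delay> <ttr> <bytes>\r\n<data>\r\n, as described in the protocol document linked earlier. The function below is a hypothetical helper, not part of Phalcon. It validates the options against the ranges listed above and builds that command, which is handy for seeing exactly what a client sends. The fallback values for missing options are example values only.

```php
<?php
/**
 * Hypothetical helper: build a raw Beanstalkd "put" command from job options.
 *
 * @param string $data    already-serialized job body
 * @param array  $options 'priority', 'delay', 'ttr' (same keys as the example above)
 * @return string
 */
function buildPutCommand($data, array $options)
{
    // Example fallbacks, not necessarily the defaults Phalcon uses
    $priority = isset($options['priority']) ? $options['priority'] : 1024;
    $delay    = isset($options['delay']) ? $options['delay'] : 0;
    $ttr      = isset($options['ttr']) ? $options['ttr'] : 60;

    // priority must fit in an unsigned 32-bit integer: 0 (most urgent) .. 4294967295
    if (!is_int($priority) || $priority < 0 || $priority > 4294967295) {
        throw new InvalidArgumentException('priority must be an integer in 0..4294967295');
    }
    if (!is_int($delay) || $delay < 0) {
        throw new InvalidArgumentException('delay must be a non-negative integer');
    }
    // Beanstalkd silently raises a ttr of 0 to 1; reject it here to make that explicit
    if (!is_int($ttr) || $ttr < 1) {
        throw new InvalidArgumentException('ttr must be at least 1 second');
    }

    // <bytes> is the body length in bytes, not characters
    return sprintf("put %d %d %d %d\r\n%s\r\n", $priority, $delay, $ttr, strlen($data), $data);
}
```

For the options used above, buildPutCommand('hello', $optionTime) yields "put 250 10 3600 5\r\nhello\r\n".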

A good practice is to return the "job id" from put() to the caller, so it can be used to track the status of the job.

Retrieving Messages

Once a job is in the queue, it can be consumed by a background worker that has enough time to complete the task. In our example:

/**
 * Check the queue from Beanstalk and send the notifications scheduled there
 *
 * @see at https://docs.phalconphp.com/en/latest/api/Phalcon_Queue_Beanstalk.html
 */
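public function consumeQueue()
{
    // Run forever; the CLI task below starts this loop
    while (true) {
        // Drain every job that is currently ready
        while ($this->queue->peekReady() !== false) {
            // reserve() hands the job to this worker only
            $job = $this->queue->reserve();
            // Body as stored by put(): [userId => notificationId]
            $message = $job->getBody();
            foreach ($message as $userId => $id) {
                $notification = Notifications::findFirstById($id);
                if ($notification) {
                    $this->send($notification);
                }
            }
            // Close the mail transport between jobs
            if (is_object($this->transport)) {
                $this->transport->stop();
                $this->transport = null;
                $this->mailer = null;
            }
            // Done: remove the job so it is not processed twice
            $job->delete();
        }
        // Nothing ready: wait before checking again
        sleep(5);
    }
}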

In the above example, we call reserve() because several background workers may run at the same time, and a reserved job is handed to one worker only. When a job is complete, we need to remove it from the queue with delete() to avoid double processing. For the full code, see at
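The loop above polls with peekReady() and sleep(5). An alternative is to let reserve() do the waiting: Phalcon's Beanstalk::reserve() accepts an optional timeout in seconds and returns false when no job arrives in time. The sketch below shows one job-handling step under that assumption. It also buries a job whose handler throws, so a failing email is not retried forever. It works with any object whose reserve($timeout) returns false or a job with getBody(), delete() and bury(), the methods Phalcon\Queue\Beanstalk\Job exposes. processOne is a hypothetical name.

```php
<?php
/**
 * Hypothetical single worker step.
 *
 * @param object   $queue   has reserve($timeout): job object or false
 * @param callable $handler processes the job body; throws on failure
 * @param int      $timeout seconds to wait for a job
 * @return string 'idle', 'done' or 'buried'
 */
function processOne($queue, callable $handler, $timeout = 5)
{
    $job = $queue->reserve($timeout);
    if ($job === false) {
        return 'idle'; // nothing arrived within $timeout seconds
    }
    try {
        $handler($job->getBody());
    } catch (Exception $e) {
        // Keep the failed job for inspection instead of retrying it forever
        $job->bury();
        return 'buried';
    }
    // Done: remove it so no other worker processes it again
    $job->delete();
    return 'done';
}
```

A CLI task would call processOne() in a loop. Buried jobs stay in Beanstalkd until they are kicked back to the ready queue or deleted.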

Process the jobs

At this point, we have code that processes jobs and sends mail (most of a worker), and we have added a job to the queue. The last step is code that pulls jobs from the queue. We implemented it as a Phalcon CLI task:

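<?php
namespace Phanbook\Cli\Tasks;

use Phalcon\Cli\Task;
use Phanbook\Mail\SendSpool;

class SendSpoolConsumerTask extends Task
{
    /**
     * Long-running worker: consumeQueue() loops forever and does not return
     */
    public function mainAction()
    {
        echo "\n Send email for post reply\n";
        $spool = new SendSpool();
        try {
            $spool->consumeQueue();
        } catch (\Exception $e) {
            // Leading backslash: a bare "Exception" would resolve to
            // Phanbook\Cli\Tasks\Exception and never match
            echo $e->getMessage(), PHP_EOL;
            echo $e->getTraceAsString(), PHP_EOL;
        }
    }
}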

Then, to start the worker, run this in your terminal:

php cli SendSpoolConsumer &

Note that nothing is sent until this worker is running: put() only stores the job in Beanstalkd, and the email goes out when SendSpoolConsumer reserves the job, outside the request in which the user created the post.

Also, to inspect Beanstalkd in detail (tubes, job counts, individual jobs), you can use the third-party PHP admin console at https://github.com/ptrofimov/beanstalk_console.

The Beanstalk workflow can be summarized as follows:

A job in Beanstalk is created by a client with the put command. During its life, it can be in one of four states: ready, reserved, delayed or buried. After the put command, a job typically starts out ready. It waits in the ready queue until a worker comes along and runs the reserve command. If this job is next in the queue, it is reserved for that worker. The worker executes the job and, when it is finished, sends a delete command to delete the job.

Here is a picture of the typical job lifecycle:

 put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*
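The picture shows only the happy path. The protocol document also describes delayed and buried jobs, and the transitions between the four states can be written down as a small lookup table. This is a simplified sketch: the function names are hypothetical, and the "delay expires" and "ttr expires" events stand for the server's own timers.

```php
<?php
/**
 * Simplified Beanstalkd job lifecycle: state => [event => next state].
 * 'deleted' means the job has been removed from the server.
 */
function jobTransitions()
{
    return [
        'ready'    => ['reserve' => 'reserved', 'delete' => 'deleted'],
        'delayed'  => ['delay expires' => 'ready', 'kick' => 'ready', 'delete' => 'deleted'],
        'reserved' => [
            'delete'      => 'deleted',
            'release'     => 'ready',   // release with a delay goes to 'delayed' instead
            'bury'        => 'buried',
            'ttr expires' => 'ready',   // worker took too long; the job is handed out again
        ],
        'buried'   => ['kick' => 'ready', 'delete' => 'deleted'],
    ];
}

/** Follow a sequence of events from an initial state; throws on an invalid event. */
function runLifecycle($state, array $events)
{
    $table = jobTransitions();
    foreach ($events as $event) {
        if (!isset($table[$state][$event])) {
            throw new LogicException("'$event' is not allowed in state '$state'");
        }
        $state = $table[$state][$event];
    }
    return $state;
}
```

For example, runLifecycle('ready', ['reserve', 'bury', 'kick', 'reserve']) ends in 'reserved', which is the path a failed email takes after an operator kicks it.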

In conclusion

That is it! Our client implements a basic set of the features provided by Beanstalkd, but enough to let you build applications that use queues.

For more information on Phalcon Queue check out the online documentation. Did you enjoy this article? Let us know your thoughts!

The link you included is wrong; the correct link to the Beanstalkd Admin Console is: https://github.com/ptrofimov/beanstalk_console


Thien

Hi Pentium10

Thank you for your feedback

Great tutorial. I just find https://github.com/phalcon/incubator/tree/master/Library/Phalcon/Queue/Beanstalk a little bit better for long-running daemons or cron, but either way your implementation can be used with the mentioned library.

Thanks

The link of the service.php is not working


Thien

Hello Danny

Well, I refactored the Phanbook code, so that link has been replaced by https://github.com/phanbook/phanbook/blob/master/core/config/services.php#L373


Ri Xu

I created a new single-file, cross-platform Beanstalk queue server console: https://github.com/Luxurioust/aurora


Buryni

What is DummyServer()?