Consuming RabbitMQ messages with PHP

« »

Once you’ve created a RabbitMQ producer, it’s fairly easy to create a consumer. In fact, the only difference is in exactly what commands you’re using. The connection, envelope, channel and queue declarations are the same.

While in RabbitMQ you publish to the exchange, you actually do consume a specific queue. As a result, the commands for consuming are part of the AMQPQueue class.

Consuming with a background process

It’s possible with RabbitMQ to run a background process (I use supervisord for starting and running my background process).

To do this, you can use the blocking command AMQPQueue::consume(). This method will sit and wait until it receives work from RabbitMQ to perform, until it receives a particular response from the callback function, or until it receives a kill signal.

When using the AMQPQueue::consume() method you must define a callback (or callable) as the first argument of the method.

<?php

$queue->consume([$object, 'callbackMethod']);

The callback or callable method you define should accept two arguments: the AMQPEnvelope (which will contain the payload of the data) and an optional second argument of the AMQPQueue that you’re reading from.

<?php

class MyCallable {

    public function callbackMethod(AMQPEnvelope $envelope, AMQPQueue $queue) {
        // Do the work here
    }
}

A non-blocking way to consume messages

Sometimes you may want to invoke a non-blocking mechanism for consuming messages, for example if you’re consuming RabbitMQ values in a web application.

The AMQP extension supports this through the AMQPQueue::get() method. This method essentially gets the next message off the queue and returns the AMQPEnvelope object.


while($envelope = $queue->get()) {
    // do work here...
}

Notice the difference: the AMQPQueue::get() method does not take a callback as an argument.

Success and failure conditions

RabbitMQ is designed to expect message acknowledgement or rejection based on whether or not you were able to consume the message properly.

Any message that was delivered but not acknowledged when the connection is terminated is requeued for redelivery to another worker. It is therefore crucial that your application be designed to support redelivery of messages.

Of course, you can specify that RabbitMQ not wait for an acknowledgement, but in most cases you’ll be using RabbitMQ in a way that wants to acknowledge the message was received and appropriately processed.

function runupdate(AMQPEnvelope $job, AMQPQueue $queue) {
    global $scripts, $logger;
    $body = $job->getBody();
    $logger->info("running job on " . $body);
    $repoID = $body;
    $result = `$scripts/update.php $id`;
    if(!$result) {
        $logger->info("completed job on " . $body);
        $queue->ack($job->getDeliveryTag());
        return;
    }
    $logger->err("Unable to complete job on $body: $result");
    $queue->nack($job->getDeliveryTag(), AMQP_REQUEUE);
        
}

In this callback function, we’re doing two things. First, if the job is successful, we’re running the AMQPQueue::ack() method. The message that you pass as the first argument to the ack() method is called the delivery tag and this is a numeric tag that’s specific to the message on this channel.

In the event that the job was unsuccessful, we log the failure and specifically refuse to acknowledge (or “nack”) the message using the AMQPQueue::nack() method. We’re passing the same numeric delivery tag, but we’re also passing a special flag of AMQP_REQUEUE to tell RabbitMQ “place this back in the queue”.

Conclusion

With these parts you will be able to successfully construct a RabbitMQ producer and consumer for your own projects. RabbitMQ is a powerful, robust, simple and efficient tool for queueing and message passing between layers of an application. I highly recommend it and encourage you to experiment with it.

Brandon Savage is the author of Mastering Object Oriented PHP and Practical Design Patterns in PHP

Posted on 5/30/2013 at 8:36 pm
Categories: PHP

Éric wrote at 5/31/2013 4:04 am:

Hello,

Thank you for your article, but I’m certainly dumb. I still don’t understand what a Message Queuing Protocol.
In which real life usage case can it be useful ?

Please excuse my poor english, I’m french :)

« »

Copyright © 2023 by Brandon Savage. All rights reserved.