Publishing messages to RabbitMQ with PHP

« »

Now that we understand the basics behind RabbitMQ, it’s time for us to start working with it. The first step in working with RabbitMQ is to begin sending messages to the exchange so that they can be queued.

In RabbitMQ parlance, the “producer” is responsible for “publishing” the messages to the exchange.

Connecting to RabbitMQ and getting a channel

The first step we need to accomplish is connecting to RabbitMQ. This is fairly easy:


$cnn = new AMQPConnection();
$cnn->setLogin($rabbitcreds['user']);
$cnn->setPassword($rabbitcreds['pass']);
$cnn->setHost($rabbitcreds['host']);
$cnn->setVhost($rabbitcreds['vhost']);
$cnn->connect();

In this example, we’re using the majority of the settings in the Connection object to customize our connection. In a development instance of RabbitMQ, leaving most of these fields alone will connect you on guest (with “guest” as a password). But in a production environment, you’ll want to specify users and access permissions.

RabbitMQ employs a concept called “vhost” to segment data and exchanges. The easiest way to think of the vhost is to compare it to a database: you have to select a database and you have to select a vhost. If you don’t specify a vhost, it will revert to the basic vhost.

Also notice that the connection is not automatically established here; you must explicitly call AMQPConnection::connect() to complete the connection.

Now, once you’ve established a connection you’ll need to obtain a channel. This is a really easy but terribly important step.

$ch = new AMQPChannel($cnn);

The channel object takes the connection object as an argument and uses that connection object to open the channel.

Declaring an exchange

In order to publish messages to RabbitMQ, we’ll need an exchange. Exchanges must be declared before they are used; they can be re-declared as often as you like (as long as their type and persistence don’t change). Therefore, you’ll see that we declare the exchange in both the producer and the consumer.

$ex = new AMQPExchange($ch);
$ex->setName($rabbit['topic_exchange']);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declare();

We’re doing a lot of things here, so I’ll explain.

First, we’re declaring the exchange> The AMQPExchange class takes the AMQPChannel object as a constructor argument, so we’ll pass that in.

Next, we’re setting the name of the exchange. You’ll want to name your exchanges for easy differentiation later.

We’re also setting a few arguments on the exchange. We’re using the AMQP_EX_TYPE_DIRECT to tell it we want a direct exchange. We’re also using AMQP_DURABLE to declare that the exchange should persist even after the last consumer unsubscribes.

Finally, once we’ve set up our exchange we have to call the AMQPExchange::declare() method to actually declare our exchange.

Creating a queue

In this example, we also want to create a queue for our messages to go into. We’ll then bind the queue to the exchange.

$q = new AMQPQueue($ch);
$q->setName($rabbit['queue']);
$q->setFlags(AMQP_DURABLE);
$q->declare();

rout

In this example, we’re doing quite a few things as well. Note that the AMQPQueue class takes the AMQPChannel object, not the AMQPExchange object as a constructor argument.

We’re declaring a name for this queue, and we’re also then declaring that the queue will be durable, or persist past all consumers unsubscribing or a restart.

Also note that like the exchange, we must call the AMQPQueue::declare() method once we’ve set our arguments.

A note about persistent queues and exchanges: the methodology for persisting queues and exchanges causes the data they contain to be written to disk. This incurs a performance hit, and the extent of that performance hit will be different for every application. You’ll need to determine exactly how you want to work with RabbitMQ and decide if this performance cost is worthwhile.

Binding the queue to the exchange

Once the queue and the exchange have been declared they must be linked, our bound together.

Queues are bound to exchanges by a common interface known as the “routing key”. A routing key tells the exchange where to send the message and which queue it belongs in.

A routing key is always optional (if you don’t specify one the exchange will route all matching messages to that queue).


$q->bind($rabbit['topic_exchange'], $rabbit['routing_key']);

With this little bit of code, we’re telling the exchange “for routing key X, send all messages to queue Y”.

Publishing our messages to RabbitMQ

Now we’re prepared for our final step: actually producing and publishing our message to RabbitMQ.

Remember from our first discussion that you publish to an exchange, and consume from a queue. The exchange will determine where to place the message based on the routing key.

In order to publish our message, we’ll use the following code:

$ex->publish($message, $rabbit['routing_key]);

Note that we’re publishing to the AMQPExchange object, NOT to the queue object. In fact, to publish, we don’t even need to instantiate AMQPQueue (but we did in this example, since you can’t publish messages to a queue without declaring one).

Testing if your code works

Newer versions of RabbitMQ include a web console as a plugin. You can read the directions for installing the plugin here.

Once you’ve published messages to your queue, you’ll see the number of pending messages start to climb in the web console.

Brandon Savage is the author of Mastering Object Oriented PHP and Practical Design Patterns in PHP

Posted on 5/29/2013 at 8:31 am
Categories: PHP

There are currently no comments.

« »

Copyright © 2023 by Brandon Savage. All rights reserved.