Tropical Software Observations

30 August 2010

Posted by Teo Choong Ping

at 4:20 PM


Labels: , ,

RabbitMQ: PubSub Messaging with Groovy

RabbitMQ typically send "direct" message between the queue and the listeners -- if you have 5 messages put on the queue with 3 listeners, then each of the listener will get one of these messages. However if you need to have all the listeners get the same message, you need to set the exchange to "fanout".

The explanation of using rabbitmq's fanout feature may be well described in the FAQ, but it's not clear in code. So I hope someone will find the below groovy code helpful to see how to use rabbitmq fanout for pubsub topology messaging.

The below code is a basic message publisher to send message to rabbitmq queue. Note the exchange declared as "fanout" exchange. The queue binding may not be useful in fanout use case as all the messages will be send out to all listening consumers.

import com.rabbitmq.client.*

def factory = new ConnectionFactory();
Connection conn = factory.newConnection();
def channel = conn.createChannel()

String exchangeName = "foExchange"
String routingKey = "foo"
String queueName = "fooQueue"

channel.exchangeDeclare(exchangeName, "fanout", true)
channel.queueDeclare(queueName, true, false, false, null);

(1..3).each { n ->
    channel.basicPublish(exchangeName, routingKey,  MessageProperties.PERSISTENT_TEXT_PLAIN, 
                        " (${n}) test testing test [${new Date()}]".getBytes() );

println " Fin "

Now lets look at the consumer side to consume messages in fanout use case.In a fanout setup, the consumer doesn't have to do anything special. If you don't specify any particular queue name, it will get all messages from the exchange. You can however continue to received message from any particular queue if you bind to it.

String exchangeName = "fooExchange"
String routingKey = "foo"
//String queueName = "fooQueue"

String queueName = channel.queueDeclare().getQueue()
channel.queueBind(queueName, exchangeName, "#");
println " Queue: ${queueName} "

boolean noAck = false;
def consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, noAck, consumer);
boolean running = true

while(running) {

    QueueingConsumer.Delivery delivery;
    try {
        delivery = consumer.nextDelivery();
        println new String(delivery.body)
    } catch (InterruptedException ie) {
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);