Play! Framework RabbitMQ Module
Felipe Oliveira
http://mashup.fm
http://playframework.info
http://geeks.aretotally.in
http://twitter.com/_felipera
1) Installation
Under dependencies.yml:
require:
- play -> rabbitmq
2) RabbitMQ
Download RabbitMQ from http://www.rabbitmq.com/server.html.
Unzip the distribution, go to folder sbin and run “./rabbitmq-server”.
It should be running on port 5762 by default. The default user “guest” and password is “guest”.
3) Configuration
rabbitmq.host=localhost
rabbitmq.port=5672
rabbitmq.vhost=/
rabbitmq.user=guest
rabbitmq.password=guest
rabbitmq.exchangeType=direct
rabbitmq.durable=true
rabbitmq.autoAck=false
rabbitmq.basicQos=true
rabbitmq.retries=5
rabbitmq.msgmapper=json
rabbitmq.host -> I am sure you can guess this one
rabbitmq.port -> I am sure you can guess this one
rabbitmq.vhost -> If you don’t know what this is I recommend reading the Admin Guide for RabbitMQ (http://www.rabbitmq.com/admin-guide.html)
rabbitmq.user -> I am sure you can guess this one. guest is the default user on a fresh RabbitMQ install.
rabbitmq.password -> I am sure you can guess this one. guest is the default value on a fresh RabbitMQ install.
rabbitmq.exchangeType -> These are the different exchangeTypes provided by RabbitMQ like direct, fanout, etc. If you are not familiar with them, please checkout RabbitMQ’s doc (http://www.rabbitmq.com/getstarted.html).
rabbitmq.durable -> Durable message. You most likely want to keep that as true if your app requires reliability in your message deliveries.
rabbitmq.autoAck -> If true you are basically turning off any form of retry so you probabily don’t want that.
rabbitmq.basicQos -> Please read RabbitMQ’s documentation for more information on basicQos (https://dev.rabbitmq.com/wiki/BasicQosDesign?version=15b1bab0ac73).
rabbitmq.retries -> Max number of retries per message. This number can be overwritten on a queue level by overriding the retries() method on your consumer class.
rabbitmq.msgmapper -> Possible values are “pojo” or “json”. These are the different implementations used to store and retrieve messages, “json” uses Jackson’s ObjectMapper and “pojo” uses Java’s serialization.
4) Define Message that will be used by the Queue (just a simple POJO)
public class SampleMessage implements Serializable {
private String field1;
private String field2;
public SampleMessage() {
}
public SampleMessage(String field1, String field2) {
super();
this.field1 = field1;
this.field2 = field2;
}
public String getField1() {
return field1;
}
public void setField1(String field1) {
this.field1 = field1;
}
public String getField2() {
return field2;
}
public void setField2(String field2) {
this.field2 = field2;
}
@Override
public String toString() {
return “SampleMessage [field1=” + field1 + “, field2=” + field2 + “]”;
}
}
5) Publish a Message
public static void publish(SampleMessage q) {
RabbitMQPublisher.publish(“myQueue”, q);
render(q);
}
6) Creating a Message Consumer
@OnApplicationStart(async = true)
public class RabbitMQSampleConsumer extends RabbitMQConsumer
protected void consume(SampleMessage message) {
System.out.println(“******************************”);
System.out.println(“* Message Consumed: ” + message);
System.out.println(“******************************”);
}
protected String queue() {
return “myQueue”;
}
protected String routingKey() {
return this.queue();
}
protected int retries() {
// This is the default value defined by “rabbitmq.retries” on
// application.conf (please override if you need a new value)
return RabbitMQPlugin.retries();
}
protected Class getMessageType() {
return SampleMessage.class;
}
}
- Please note this is a Play! job so you can start it manualy or you can use the other annotations provided by Play! like @On or @Every. More information available at http://www.playframework.org/documentation/1.2/jobs.
- There are some cases where you don’t want to retry re-processing the message, for example in cases you know you will keep getting the same error no matter how many times you retry that message.
First of all, we avoid infiniate loops by retrying only a certain amount of times, defined by the method retries() on the consumer class.
For the cases where you don’t want to retry at all simply throw a RabbitMQNotRetriableException. The RabbitMQConsumer base class will catch it and acknowledge to RabbitMQ, instead of trying to reprocess the message.
- Each consumer has a method routingKey() which you can optionally overwrite to setup a Topic Exchange with RabbitMQ. Same thing on the producer.
7) Firehose – Another way to publish messages in batch
@OnApplicationStart(async = true)
public class RabbitMQSampleFirehose extends RabbitMQFirehose
public int count = 0;
protected List
if ( count >= 10 ) {
return null;
}
List
for (int i = 0; i < n; i++) {
results.add(new SampleMessage(“field1”, “field2”));
count++;
}
return results;
}
protected int batchSize() {
return 2;
}
protected String queueName() {
return “myQueue”;
}
}
- Please note this is a Play! job so you can start it manualy or you can use the other annotations provided by Play! like @On or @Every. More information available at http://www.playframework.org/documentation/1.2/jobs.
8) Live Stats and Live Streaming
Add the module routes in your routes.conf
- /rabbitmq/ module:rabbitmq
Live Stats and Live Streaming of messages produced and consumed are available at http://localhost:9000/rabbitmq/.
Live Stats uses Ajax to display the number of successful messages produced and consumed to a queue, errors as well.
Live Streaming uses WebSockets (requires newer browsers) to stream messages consumed and produced.
- To find out more about WebSockets in the Play! Framework, please visit http://www.playframework.org/documentation/1.2/asynchronous.
Source Code
The source code is available on Github at https://github.com/feliperazeek/play-rabbitmq.
Changelog
May 11th 2011 – Version 0.0.5: Fixed bugs on stats and added average time to stats service, stats page for a queue and also to the WebSocket live streaming.
May 18th 2011 – Version 0.0.6: Adding routingKey to Publisher, Consumer, Firehose, RabbitMQPlugin, etc to add support for RabbitMQ Topic Exchange.
May 26th 2011 – Version 0.0.8: Adding more stats
May 28th 2011 – Version 0.0.9: Adding ability for consumers to pause.