9.2.3 Receiving messages from RabbitMQ
You’ve seen that sending messages with RabbitTemplate doesn’t differ much from sending messages with JmsTemplate. And as it turns out, receiving messages from a RabbitMQ queue isn’t very different from receiving them with JMS.
As with JMS, you have the following two choices:
- Pulling messages from a queue with RabbitTemplate
- Having messages pushed to a @RabbitListener-annotated method
Let’s start by looking at the pull-based RabbitTemplate.receive() method.
RECEIVING MESSAGES WITH RABBITTEMPLATE
RabbitTemplate comes with several methods for pulling messages from a queue. A few of the most useful ones are listed here:
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis)
    throws AmqpException;
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type)
    throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type)
    throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type)
    throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
    ParameterizedTypeReference<T> type) throws AmqpException;

These methods are the mirror images of the send() and convertAndSend() methods described earlier. Whereas send() is used to send raw Message objects, receive() receives raw Message objects from a queue. Likewise, receiveAndConvert() receives messages and uses a message converter to convert them into domain objects before returning them.
But a few obvious differences occur in the method signatures. First, none of these methods take an exchange or routing key as a parameter. That’s because exchanges and routing keys are used to route messages to queues, but once the messages are in the queue, their next destination is the consumer who pulls them off the queue. Consuming applications needn’t concern themselves with exchanges or routing keys. A queue is the only thing the consuming applications need to know about.
You’ll also notice that many of the methods accept a long parameter to indicate a time-out for receiving the messages. By default, the receive time-out is 0 milliseconds. That is, a call to receive() will return immediately, potentially with a null value if no messages are available. This is a marked difference from how the receive() methods behave in JmsTemplate. By passing in a time-out value, you can have the receive() and receiveAndConvert() methods block until a message arrives or until the time-out expires. But even with a non-zero time-out, your code will need to be ready to deal with a null return.
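To make the null-handling point concrete, here is a minimal sketch of the same receive contract using only the JDK. It is an analogy, not Spring AMQP code: InMemoryOrderQueue and NullSafeReceiver are hypothetical names invented for this illustration, and a BlockingQueue stands in for the broker queue. Only zero and positive time-outs are modeled.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broker queue; not part of Spring AMQP.
class InMemoryOrderQueue {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    void publish(String order) {
        queue.add(order);
    }

    // Models the contract described above: a time-out of 0 returns at once,
    // a positive time-out blocks for at most that long, and null means
    // that nothing arrived. Negative time-outs are not modeled here.
    String receive(long timeoutMillis) {
        try {
            return timeoutMillis > 0
                ? queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)
                : queue.poll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }
}

class NullSafeReceiver {
    // Wraps the possibly-null result so callers must handle "no message".
    static Optional<String> receiveOrder(InMemoryOrderQueue queue, long timeoutMillis) {
        return Optional.ofNullable(queue.receive(timeoutMillis));
    }
}
```

Wrapping the result in an Optional is one possible design choice, not something RabbitTemplate does for you. The point is simply that the caller has to account for the empty case whether or not a time-out is given.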
Let’s see how you can put this in action. The next listing shows a new Rabbit-based implementation of OrderReceiver that uses RabbitTemplate to receive orders.
Listing 9.6 Pulling orders from RabbitMQ with RabbitTemplate
package tacos.kitchen.messaging.rabbit;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import tacos.TacoOrder;

@Component
public class RabbitOrderReceiver {

  private RabbitTemplate rabbit;
  private MessageConverter converter;

  @Autowired
  public RabbitOrderReceiver(RabbitTemplate rabbit) {
    this.rabbit = rabbit;
    // Reuse the converter that the template is already configured with
    this.converter = rabbit.getMessageConverter();
  }

  public TacoOrder receiveOrder() {
    // No time-out given, so this returns immediately (possibly with null)
    Message message = rabbit.receive("tacocloud.order.queue");
    return message != null
        ? (TacoOrder) converter.fromMessage(message)
        : null;
  }

}

The receiveOrder() method is where all of the action takes place. It makes a call to the receive() method on the injected RabbitTemplate to pull an order from the tacocloud.order.queue queue. It provides no time-out value, so you can assume only that the call returns immediately with either a Message or null. If a Message is returned, you use the MessageConverter from the RabbitTemplate to convert the Message to a TacoOrder. On the other hand, if receive() returns null, you’ll return a null.
Depending on the use case, you may be able to tolerate a small delay. In the Taco Cloud kitchen’s overhead display, for example, you can possibly wait a while if no orders are available. Let’s say you decide to wait up to 30 seconds before giving up. Then the receiveOrder() method can be changed to pass a 30,000 millisecond delay to receive() as follows:
public TacoOrder receiveOrder() {
  Message message = rabbit.receive("tacocloud.order.queue", 30000);
  return message != null
      ? (TacoOrder) converter.fromMessage(message)
      : null;
}

If you’re like me, seeing a hardcoded number like that gives you a bit of discomfort. You might be thinking that it’d be a good idea to create a @ConfigurationProperties-annotated class so you could configure that time-out with a Spring Boot configuration property. I’d agree with you, if it weren’t for the fact that Spring Boot already offers such a configuration property. If you want to set the time-out via configuration, simply remove the time-out value in the call to receive() and set it in your configuration with the spring.rabbitmq.template.receive-timeout property like so:
spring:
  rabbitmq:
    template:
      receive-timeout: 30000

Back in the receiveOrder() method, notice that you had to use the message converter from RabbitTemplate to convert the incoming Message object to a TacoOrder object. But if the RabbitTemplate is carrying a message converter around, why can’t it do the conversion for you? That’s precisely what the receiveAndConvert() method is for. Using receiveAndConvert(), you can rewrite receiveOrder() like this:
public TacoOrder receiveOrder() {
  return (TacoOrder) rabbit.receiveAndConvert("tacocloud.order.queue");
}

That’s a lot simpler, isn’t it? The only troubling thing I see is the cast from Object to TacoOrder. There’s an alternative to casting, though. Instead, you can pass a ParameterizedTypeReference to receiveAndConvert() to receive a TacoOrder object directly as follows:
public TacoOrder receiveOrder() {
  // The type argument must match the method's return type, TacoOrder
  return rabbit.receiveAndConvert("tacocloud.order.queue",
      new ParameterizedTypeReference<TacoOrder>() {});
}

It’s debatable whether that’s better than casting, but it is a more type-safe approach than casting. The only requirement to using a ParameterizedTypeReference with receiveAndConvert() is that the message converter must be an implementation of SmartMessageConverter; Jackson2JsonMessageConverter is the only out-of-the-box implementation to choose from.
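If the trailing {} after new ParameterizedTypeReference<...>() looks odd, it may help to see why it’s there. The braces create an anonymous subclass, and unlike a plain generic instance, a subclass records its generic superclass in a form that reflection can read at runtime. The following is a simplified sketch of that idea using only the JDK. TypeRef and TypeRefDemo are hypothetical names made up for this illustration; this is not Spring’s actual implementation, only the general technique (often called a super type token) that it relies on.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical, simplified stand-in for a type-reference class.
abstract class TypeRef<T> {
    private final Type type;

    protected TypeRef() {
        // An anonymous subclass such as new TypeRef<List<String>>() {}
        // has TypeRef<List<String>> as its generic superclass, so the
        // type argument survives erasure and can be read back here.
        ParameterizedType superclass =
            (ParameterizedType) getClass().getGenericSuperclass();
        this.type = superclass.getActualTypeArguments()[0];
    }

    Type getType() {
        return type;
    }
}

class TypeRefDemo {
    public static void main(String[] args) {
        // The braces matter: without them there is no subclass to inspect.
        Type captured = new TypeRef<List<String>>() {}.getType();
        System.out.println(captured.getTypeName());
    }
}
```

A converter that is handed such a reference can learn the full target type, including any generic parameters, which a plain Class object cannot express.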
The pull model offered by RabbitTemplate fits a lot of use cases, but often it’s better to have code that listens for messages and that’s invoked when messages arrive. Let’s see how you can write message-driven beans that respond to RabbitMQ messages.
HANDLING RABBITMQ MESSAGES WITH LISTENERS
For message-driven RabbitMQ beans, Spring offers @RabbitListener, the RabbitMQ counterpart to @JmsListener. To specify that a method should be invoked when a message arrives in a RabbitMQ queue, annotate a bean’s method with @RabbitListener.
For example, the following listing shows a RabbitMQ implementation of OrderReceiver that’s annotated to listen for order messages rather than to poll for them with RabbitTemplate.
Listing 9.7 Declaring a method as a RabbitMQ message listener
package tacos.kitchen.messaging.rabbit.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import tacos.TacoOrder;
import tacos.kitchen.KitchenUI;

@Component
public class OrderListener {

  private KitchenUI ui;

  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  @RabbitListener(queues = "tacocloud.order.queue")
  public void receiveOrder(TacoOrder order) {
    ui.displayOrder(order);
  }

}

You’ll no doubt notice that this looks remarkably like the code from listing 9.4. Indeed, the only thing that changed was the listener annotation, from @JmsListener to @RabbitListener. As wonderful as @RabbitListener is, this near duplication of code leaves me with little to say about @RabbitListener that I haven’t already said about @JmsListener. They’re both great for writing code that responds to messages that are pushed to them from their respective brokers: a JMS broker for @JmsListener and a RabbitMQ broker for @RabbitListener.
Although you may sense a lack of enthusiasm about @RabbitListener in that previous paragraph, be certain that isn’t my intent. In truth, the fact that @RabbitListener works much like @JmsListener is actually quite exciting! It means you don’t need to learn a completely different programming model when working with RabbitMQ vs. Artemis or ActiveMQ. The same excitement holds true for the similarities between RabbitTemplate and JmsTemplate.
Let’s hold on to that excitement as we wrap up this chapter by looking at one more messaging option supported by Spring: Apache Kafka.
