9.2.3 Nhận tin nhắn từ RabbitMQ
Bạn đã thấy rằng việc gửi tin nhắn với RabbitTemplate không khác nhiều so với JmsTemplate. Và thực tế, việc nhận tin nhắn từ một hàng đợi RabbitMQ cũng không khác mấy so với JMS.
Tương tự như với JMS, bạn có hai lựa chọn sau:
- Kéo tin nhắn từ hàng đợi bằng
RabbitTemplate - Nhận tin nhắn tự động thông qua phương thức được đánh dấu
@RabbitListener
Hãy bắt đầu với phương thức receive() của RabbitTemplate, sử dụng mô hình kéo (pull-based).
NHẬN TIN NHẮN VỚI RABBITTEMPLATE
RabbitTemplate cung cấp nhiều phương thức để kéo tin nhắn từ hàng đợi. Một vài phương thức hữu ích nhất được liệt kê dưới đây:
// Receive messages
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
// Receive objects converted from messages
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis)
throws AmqpException;
// Receive type-safe objects converted from messages
<T> T receiveAndConvert(ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type)
throws AmqpException;
<T> T receiveAndConvert(String queueName, long timeoutMillis,
ParameterizedTypeReference<T> type) throws AmqpException;Các phương thức này giống như hình ảnh phản chiếu của send() và convertAndSend() đã được mô tả ở phần trước. Trong khi send() được dùng để gửi đối tượng Message thô, thì receive() dùng để nhận đối tượng Message thô từ hàng đợi. Tương tự, receiveAndConvert() nhận tin nhắn và sử dụng bộ chuyển đổi để chuyển chúng thành các đối tượng miền (domain objects) trước khi trả về.
Tuy nhiên, có một vài điểm khác biệt rõ ràng trong chữ ký của các phương thức. Trước hết, không phương thức nào trong số này nhận exchange hoặc routing key làm tham số. Đó là bởi vì exchange và routing key chỉ dùng để định tuyến tin nhắn đến hàng đợi, nhưng một khi tin nhắn đã nằm trong hàng đợi, điểm đến tiếp theo sẽ là ứng dụng tiêu thụ kéo tin nhắn ra khỏi hàng đợi. Ứng dụng tiêu thụ không cần quan tâm đến exchange hay routing key. Điều duy nhất ứng dụng cần biết là tên hàng đợi.
Bạn cũng sẽ nhận thấy rằng nhiều phương thức cho phép truyền vào một giá trị long để chỉ định thời gian chờ nhận tin nhắn. Mặc định, thời gian chờ này là 0 mili giây, tức là lệnh gọi receive() sẽ trả về ngay lập tức, có thể với giá trị null nếu không có tin nhắn nào sẵn sàng. Đây là một sự khác biệt đáng chú ý so với cách hoạt động của receive() trong JmsTemplate. Khi truyền vào một giá trị timeout, receive() và receiveAndConvert() có thể chờ đến khi có tin nhắn hoặc đến khi timeout xảy ra. Tuy nhiên, ngay cả với timeout khác 0, mã của bạn vẫn phải sẵn sàng xử lý trường hợp trả về null.
Hãy xem cách áp dụng điều này vào thực tế. Đoạn mã sau đây cho thấy một triển khai mới của OrderReceiver dựa trên Rabbit, sử dụng RabbitTemplate để nhận đơn hàng.
Liệt kê 9.6 Kéo đơn hàng từ RabbitMQ bằng RabbitTemplate
package tacos.kitchen.messaging.rabbit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitOrderReceiver {
private RabbitTemplate rabbit;
private MessageConverter converter;
@Autowired
public RabbitOrderReceiver(RabbitTemplate rabbit) {
this.rabbit = rabbit;
this.converter = rabbit.getMessageConverter();
}
public TacoOrder receiveOrder() {
Message message = rabbit.receive("tacocloud.orders");
return message != null
? (TacoOrder) converter.fromMessage(message)
: null;
}
}Phương thức receiveOrder() là nơi mọi thứ diễn ra. Nó gọi receive() trên RabbitTemplate được tiêm vào để kéo một đơn hàng từ hàng đợi tacocloud.order. Không có timeout được truyền vào, nên bạn có thể giả định rằng lệnh gọi sẽ trả về ngay lập tức với một Message hoặc null. Nếu có Message, bạn sẽ dùng MessageConverter từ RabbitTemplate để chuyển đổi thành đối tượng TacoOrder. Nếu receive() trả về null, bạn cũng sẽ trả về null.
Tùy vào trường hợp sử dụng, bạn có thể chấp nhận một khoảng trễ nhỏ. Ví dụ, trong màn hình hiển thị tại nhà bếp Taco Cloud, bạn có thể chờ một lúc nếu chưa có đơn hàng nào. Giả sử bạn muốn chờ tối đa 30 giây trước khi bỏ cuộc, bạn có thể thay đổi receiveOrder() để truyền vào giá trị timeout là 30.000 mili giây như sau:
public TacoOrder receiveOrder() {
Message message = rabbit.receive("tacocloud.order.queue", 30000);
return message != null
? (TacoOrder) converter.fromMessage(message)
: null;
}Nếu bạn giống tôi, thấy việc hardcode một con số như vậy khá khó chịu. Bạn có thể nghĩ rằng nên tạo một lớp được đánh dấu @ConfigurationProperties để cấu hình timeout này qua thuộc tính Spring Boot. Tôi sẽ đồng ý với bạn — nếu Spring Boot không đã có sẵn thuộc tính cấu hình này. Để cấu hình timeout qua cấu hình, chỉ cần bỏ tham số timeout ra khỏi lời gọi receive() và đặt nó trong file cấu hình như sau:
spring:
rabbitmq:
template:
receive-timeout: 30000Quay lại phương thức receiveOrder(), bạn đã dùng bộ chuyển đổi từ RabbitTemplate để chuyển đổi đối tượng Message thành TacoOrder. Nhưng nếu RabbitTemplate đã có sẵn một MessageConverter, tại sao không để nó tự xử lý luôn? Đó chính là mục đích của phương thức receiveAndConvert(). Bạn có thể viết lại receiveOrder() như sau:
public TacoOrder receiveOrder() {
return (TacoOrder) rabbit.receiveAndConvert("tacocloud.order.queue");
}Đơn giản hơn nhiều, phải không? Điều duy nhất hơi phiền là việc ép kiểu từ Object sang TacoOrder. Có một cách tiếp cận khác an toàn hơn về kiểu dữ liệu: truyền vào một ParameterizedTypeReference để nhận trực tiếp TacoOrder như sau:
public TacoOrder receiveOrder() {
return rabbit.receiveAndConvert("tacocloud.order.queue",
new ParameterizedTypeReference<Order>() {});
}Việc này có tốt hơn ép kiểu không thì còn tùy, nhưng rõ ràng đây là cách an toàn hơn. Điều kiện duy nhất để sử dụng ParameterizedTypeReference với receiveAndConvert() là MessageConverter phải là một triển khai của SmartMessageConverter; Jackson2JsonMessageConverter là triển khai sẵn có duy nhất phù hợp.
Mô hình kéo tin nhắn do JmsTemplate cung cấp phù hợp với nhiều trường hợp, nhưng thường tốt hơn là viết code "lắng nghe" tin nhắn và tự động phản hồi khi chúng đến. Hãy xem cách viết các bean điều khiển theo sự kiện để xử lý tin nhắn RabbitMQ.
XỬ LÝ TIN NHẮN RABBITMQ VỚI LISTENER
Với các bean điều khiển theo sự kiện RabbitMQ, Spring cung cấp @RabbitListener, tương đương với @JmsListener. Để chỉ định rằng một phương thức nên được gọi khi có tin nhắn đến hàng đợi RabbitMQ, bạn chỉ cần đánh dấu phương thức đó với @RabbitListener.
Ví dụ sau đây minh họa một triển khai OrderReceiver dùng RabbitMQ, được đánh dấu để lắng nghe tin nhắn đơn hàng thay vì chủ động truy vấn bằng RabbitTemplate.
Listing 9.7 Khai báo một phương thức làm listener cho tin nhắn RabbitMQ
package tacos.kitchen.messaging.rabbit.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tacos.TacoOrder;
import tacos.kitchen.KitchenUI;
@Component
public class OrderListener {
private KitchenUI ui;
@Autowired
public OrderListener(KitchenUI ui) {
this.ui = ui;
}
@RabbitListener(queues = "tacocloud.order.queue")
public void receiveOrder(TacoOrder order) {
ui.displayOrder(order);
}
}Bạn sẽ thấy rằng đoạn mã này rất giống với đoạn mã trong liệt kê 9.4. Thật vậy, điểm duy nhất thay đổi là annotation listener — từ @JmsListener sang @RabbitListener. Dù @RabbitListener rất hữu ích, sự giống nhau này khiến tôi không còn nhiều điều mới để nói. Cả hai đều tuyệt vời để viết code phản ứng với tin nhắn được đẩy tới từ broker tương ứng — một JMS broker đối với @JmsListener và RabbitMQ đối với @RabbitListener.
Dù đoạn trước có vẻ không mấy hào hứng với @RabbitListener, nhưng tôi không hề đánh giá thấp nó. Thật ra, việc @RabbitListener hoạt động giống như @JmsListener là điều rất đáng mừng! Điều đó có nghĩa là bạn không cần học lại một mô hình lập trình hoàn toàn mới khi làm việc với RabbitMQ so với Artemis hay ActiveMQ. Điều tương tự cũng đúng khi so sánh RabbitTemplate với JmsTemplate.
Hãy giữ nguyên sự phấn khởi đó khi chúng ta kết thúc chương này bằng cách khám phá thêm một tùy chọn nhắn tin khác mà Spring hỗ trợ: Apache Kafka.
