Skip to content

9.2.2 Gửi tin nhắn với RabbitTemplate

Cốt lõi trong hỗ trợ của Spring đối với RabbitMQ là RabbitTemplate. RabbitTemplate tương tự như JmsTemplate và cung cấp một tập hợp các phương thức giống nhau. Tuy nhiên, như bạn sẽ thấy, có một vài khác biệt tinh tế phù hợp với cách hoạt động đặc trưng của RabbitMQ.

Về việc gửi tin nhắn với RabbitTemplate, các phương thức send()convertAndSend() tương ứng với các phương thức cùng tên trong JmsTemplate. Nhưng khác với JmsTemplate, vốn chỉ gửi tin nhắn đến một hàng đợi (queue) hoặc chủ đề (topic) cụ thể, các phương thức của RabbitTemplate gửi tin nhắn đến các exchange cùng với routing key. Dưới đây là một số phương thức tiêu biểu để gửi tin nhắn với RabbitTemplate:

java
// Send raw messages
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) 
                          throws AmqpException;

// Send messages converted from objects
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) 
                          throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;

// Send messages converted from objects with post-processing
void convertAndSend(Object message, MessagePostProcessor mPP) 
                          throws AmqpException;
void convertAndSend(String routingKey, Object message, 
                          MessagePostProcessor messagePostProcessor) 
                          throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, 
                          MessagePostProcessor messagePostProcessor) 
                          throws AmqpException;

Như bạn thấy, các phương thức này theo mẫu giống như trong JmsTemplate. Ba phương thức send() đầu tiên gửi một đối tượng Message thô. Ba phương thức convertAndSend() tiếp theo nhận một đối tượng và tự động chuyển đổi thành Message trước khi gửi. Ba phương thức convertAndSend() cuối cùng cũng tương tự, nhưng có thêm tham số MessagePostProcessor để xử lý thêm Message trước khi gửi.

Các phương thức này khác với JmsTemplate ở chỗ chúng nhận các chuỗi (String) để xác định exchangerouting key, thay vì tên đích (destination) hoặc đối tượng Destination. Các phương thức không chỉ định exchange sẽ gửi tin nhắn đến exchange mặc định. Tương tự, nếu không chỉ định routing key, tin nhắn sẽ được gửi với routing key mặc định.

Bây giờ hãy sử dụng RabbitTemplate để gửi đơn đặt hàng taco. Một cách là dùng phương thức send() như trong liệt kê 9.5. Tuy nhiên, trước khi gọi send(), bạn cần chuyển đổi đối tượng TacoOrder thành một Message. Việc này có thể khá tẻ nhạt nếu không nhờ vào phương thức getMessageConverter()RabbitTemplate cung cấp.

Liệt kê 9.5 Gửi tin nhắn với RabbitTemplate.send()

java
package tacos.messaging;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import
    org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tacos.Order;

@Service
public class RabbitOrderMessagingService
          implements OrderMessagingService {
  private RabbitTemplate rabbit;

  @Autowired
  public RabbitOrderMessagingService(RabbitTemplate rabbit) {
    this.rabbit = rabbit;
  }

  public void sendOrder(TacoOrder order) {
    MessageConverter converter = rabbit.getMessageConverter();
    MessageProperties props = new MessageProperties();
    Message message = converter.toMessage(order, props);
    rabbit.send("tacocloud.order", message);
  }
}

Khi đã có MessageConverter, việc chuyển TacoOrder thành Message rất đơn giản. Bạn có thể cung cấp các thuộc tính tin nhắn bằng một đối tượng MessageProperties, hoặc chỉ cần dùng một thể hiện mặc định nếu không cần cấu hình gì đặc biệt. Sau đó, chỉ cần gọi send(), truyền vào exchangerouting key (cả hai đều không bắt buộc). Trong ví dụ này, bạn chỉ định routing keytacocloud.order, vì vậy exchange mặc định sẽ được sử dụng.

Nói về exchange mặc định, tên của nó là "" (chuỗi rỗng), tương ứng với exchange mặc định được RabbitMQ tạo tự động. Tương tự, routing key mặc định là "" (việc định tuyến phụ thuộc vào cấu hình của exchange và các ràng buộc - bindings). Bạn có thể thay đổi mặc định này bằng cách cấu hình các thuộc tính spring.rabbitmq.template.exchangespring.rabbitmq.template.routing-key như sau:

yaml
spring:
  rabbitmq:
    template:
      exchange: tacocloud.orders
      routing-key: kitchens.central

Trong trường hợp này, tất cả tin nhắn được gửi mà không chỉ định exchange sẽ tự động được gửi đến exchange có tên là tacocloud.order. Nếu không chỉ định routing key trong lời gọi send() hoặc convertAndSend(), tin nhắn sẽ có routing keykitchens.central.

Tạo một đối tượng Message từ MessageConverter là đủ dễ, nhưng sẽ dễ hơn nữa nếu dùng convertAndSend() để để RabbitTemplate lo phần chuyển đổi cho bạn, như sau:

java
public void sendOrder(Order order) {
    rabbit.convertAndSend("tacocloud.order", order);
}

Theo mặc định, việc chuyển đổi tin nhắn được thực hiện với SimpleMessageConverter, có thể chuyển đổi các kiểu đơn giản (như String) và đối tượng Serializable thành Message. Tuy nhiên, Spring còn cung cấp nhiều bộ chuyển đổi khác cho RabbitTemplate, bao gồm:

  • Jackson2JsonMessageConverter —— Chuyển đổi đối tượng sang và từ JSON sử dụng thư viện Jackson 2
  • MarshallingMessageConverter —— Chuyển đổi sử dụng Spring MarshallerUnmarshaller
  • SerializerMessageConverter —— Chuyển đổi chuỗi và các đối tượng bất kỳ sử dụng abstraction Serializer/Deserializer của Spring
  • SimpleMessageConverter —— Chuyển đổi chuỗi, mảng byte, và các kiểu Serializable
  • ContentTypeDelegatingMessageConverter —— Ủy quyền việc chuyển đổi cho MessageConverter khác dựa trên header contentType
  • MessagingMessageConverter —— Ủy quyền việc chuyển đổi tin nhắn cho MessageConverter và xử lý header thông qua AmqpHeaderConverter

Nếu bạn muốn thay đổi bộ chuyển đổi, chỉ cần cấu hình một bean kiểu MessageConverter. Ví dụ, để chuyển đổi tin nhắn dưới dạng JSON, bạn có thể cấu hình Jackson2JsonMessageConverter như sau:

java
@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

Spring Boot autoconfiguration sẽ tự động phát hiện bean này và chèn vào RabbitTemplate, thay thế bộ chuyển đổi mặc định.

CẤU HÌNH THUỘC TÍNH TIN NHẮN

Tương tự như JMS, bạn có thể cần đặt một số header trong tin nhắn. Ví dụ, bạn cần gửi một header X_ORDER_SOURCE cho tất cả đơn hàng gửi từ website Taco Cloud. Khi tự tạo Message, bạn có thể đặt header thông qua đối tượng MessageProperties dùng với MessageConverter. Quay lại phương thức sendOrder() từ liệt kê 9.5, bạn chỉ cần thêm một dòng để đặt header:

java
public void sendOrder(TacoOrder order) {
  MessageConverter converter = rabbit.getMessageConverter();
  MessageProperties props = new MessageProperties();
  props.setHeader("X_ORDER_SOURCE", "WEB");
  Message message = converter.toMessage(order, props);
  rabbit.send("tacocloud.order", message);
}

Khi dùng convertAndSend(), bạn không thể truy cập trực tiếp vào MessageProperties. Tuy nhiên, MessagePostProcessor có thể giúp bạn, như ví dụ dưới đây:

java
public void sendOrder(TacoOrder order) {
  rabbit.convertAndSend("tacocloud.order.queue", order,
    new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message)
                  throws AmqpException {
        MessageProperties props = message.getMessageProperties();
        props.setHeader("X_ORDER_SOURCE", "WEB");
        return message;
      }
  });
}

Ở đây, bạn truyền cho convertAndSend() một lớp ẩn danh thực thi MessagePostProcessor. Trong phương thức postProcessMessage(), bạn lấy MessageProperties từ Message và gọi setHeader() để đặt giá trị cho header X_ORDER_SOURCE.

Bây giờ bạn đã biết cách gửi tin nhắn bằng RabbitTemplate, hãy chuyển sang phần nhận tin nhắn từ hàng đợi RabbitMQ.

Released under the MIT License.