Skip to content

9.3.3 Viết Kafka listeners

Ngoài các chữ ký phương thức đặc biệt cho send()sendDefault(), KafkaTemplate khác với JmsTemplateRabbitTemplate ở chỗ nó không cung cấp bất kỳ phương thức nào để nhận tin nhắn. Điều đó có nghĩa là cách duy nhất để tiêu thụ các tin nhắn từ một topic Kafka trong Spring là viết một listener (trình lắng nghe) cho tin nhắn.

Đối với Kafka, các listener được định nghĩa là các phương thức được chú thích với @KafkaListener. Chú thích @KafkaListener tương tự với @JmsListener@RabbitListener và được sử dụng theo cách gần giống nhau. Đoạn mã tiếp theo cho thấy cách bạn có thể viết một listener để nhận đơn hàng nếu sử dụng Kafka.

Liệt kê 9.9 Nhận đơn hàng với @KafkaListener

java
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.Order;
import tacos.kitchen.KitchenUI;

@Component
public class OrderListener {

  private KitchenUI ui;

  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  @KafkaListener(topics="tacocloud.orders.topic")
  public void handle(TacoOrder order) {
    ui.displayOrder(order);
  }
}

Phương thức handle() được chú thích với @KafkaListener để chỉ ra rằng nó sẽ được gọi khi một tin nhắn đến trong topic có tên tacocloud.orders.topic. Như được viết trong liệt kê 9.9, chỉ có một TacoOrder (phần payload) được truyền vào handle(). Nhưng nếu bạn cần thêm siêu dữ liệu (metadata) từ tin nhắn, bạn cũng có thể chấp nhận một đối tượng ConsumerRecord hoặc Message.

Ví dụ, đoạn triển khai handle() dưới đây chấp nhận một ConsumerRecord để bạn có thể ghi log phân vùng và timestamp của tin nhắn:

java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(
    TacoOrder order, ConsumerRecord<String, TacoOrder> record) {
  log.info("Received from partition {} with timestamp {}",
        record.partition(), record.timestamp());

  ui.displayOrder(order);
}

Tương tự, bạn có thể yêu cầu một Message thay vì ConsumerRecord và đạt được cùng mục đích, như được hiển thị dưới đây:

java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(Order order, Message<Order> message) {
  MessageHeaders headers = message.getHeaders();
  log.info("Received from partition {} with timestamp {}",
    headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
    headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  ui.displayOrder(order);
}

Đáng chú ý là phần payload của tin nhắn cũng có thể được truy cập thông qua ConsumerRecord.value() hoặc Message.getPayload(). Điều này có nghĩa là bạn có thể nhận TacoOrder thông qua các đối tượng đó thay vì nhận trực tiếp như một tham số trong handle().

Released under the MIT License.