9.3.2 Gửi tin nhắn với KafkaTemplate
Ở nhiều khía cạnh, KafkaTemplate tương tự như các đối tượng tương ứng của JMS và RabbitMQ. Tuy nhiên, đồng thời nó cũng có nhiều điểm khác biệt. Điều này sẽ trở nên rõ ràng khi bạn xem xét các phương thức dùng để gửi tin nhắn, như được mô tả dưới đây:
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
Long timestamp, K key, V data);Điều đầu tiên bạn có thể nhận thấy là không có phương thức convertAndSend(). Đó là vì KafkaTemplate được định kiểu bằng generics và có thể xử lý trực tiếp các kiểu domain khi gửi tin nhắn. Theo một cách nào đó, tất cả các phương thức send() đều thực hiện vai trò của convertAndSend().
Bạn cũng sẽ nhận thấy rằng có một số tham số trong send() và sendDefault() khác khá nhiều so với những gì bạn đã sử dụng trong JMS và Rabbit. Khi gửi tin nhắn trong Kafka, bạn có thể chỉ định các tham số sau để kiểm soát cách tin nhắn được gửi:
- Tên topic để gửi tin nhắn đến (bắt buộc đối với
send()) - Partition mà bạn muốn ghi tin nhắn vào (không bắt buộc)
- Key để đính kèm trong bản ghi (không bắt buộc)
- Timestamp (không bắt buộc; mặc định là
System.currentTimeMillis()) - Payload (bắt buộc)
Topic và payload là hai tham số quan trọng nhất. Partition và key không ảnh hưởng nhiều đến cách bạn sử dụng KafkaTemplate, ngoại trừ việc đó là thông tin bổ sung được cung cấp như các tham số cho send() và sendDefault(). Trong phạm vi tài liệu này, chúng ta sẽ tập trung vào việc gửi payload tới một topic nhất định mà không cần lo lắng về partition hay key.
Đối với phương thức send(), bạn cũng có thể chọn gửi một ProducerRecord, về cơ bản là một kiểu dữ liệu gói gọn tất cả các tham số nói trên trong một đối tượng. Bạn cũng có thể gửi một đối tượng Message, nhưng làm như vậy sẽ đòi hỏi bạn phải chuyển đổi các đối tượng domain của mình thành Message. Thông thường, sẽ dễ dàng hơn nếu bạn dùng một trong các phương thức khác thay vì tạo và gửi một ProducerRecord hoặc đối tượng Message.
Sử dụng KafkaTemplate và phương thức send() của nó, bạn có thể viết một phiên bản triển khai của OrderMessagingService dựa trên Kafka. Đoạn mã dưới đây minh họa cách triển khai đó:
Listing 9.8 Gửi đơn hàng bằng KafkaTemplate
package tacos.messaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import tacos.TacoOrder;
@Service
public class KafkaOrderMessagingService
implements OrderMessagingService {
private KafkaTemplate<String, TacoOrder> kafkaTemplate;
@Autowired
public KafkaOrderMessagingService(
KafkaTemplate<String, TacoOrder> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void sendOrder(TacoOrder order) {
kafkaTemplate.send("tacocloud.orders.topic", order);
}
}Trong phiên bản mới này của OrderMessagingService, phương thức sendOrder() sử dụng phương thức send() của KafkaTemplate đã được inject để gửi một đối tượng TacoOrder tới topic có tên tacocloud.orders.topic. Ngoại trừ từ "Kafka" được lặp lại trong mã, phần code này không khác gì so với phần bạn đã viết cho JMS và Rabbit. Và cũng giống như các phiên bản khác của OrderMessagingService, nó có thể được inject vào OrderApiController và được dùng để gửi đơn hàng qua Kafka khi người dùng gửi đơn hàng qua endpoint /api/orders.
Cho đến khi bạn tạo một phiên bản Kafka của trình nhận tin nhắn, bạn sẽ cần một công cụ console để xem nội dung đã gửi. Có một số công cụ quản lý Kafka rất phổ biến, bao gồm Offset Explorer (trước đây là Kafka Tool) và Confluent’s Apache Kafka UI
Nếu bạn thiết lập một topic mặc định, bạn có thể đơn giản hóa phương thức sendOrder() một chút. Trước tiên, thiết lập topic mặc định của bạn là tacocloud.orders.topic bằng cách thêm cấu hình sau vào file cấu hình:
spring:
kafka:
bootstrap-servers:
- localhost:9092
template:
default-topic: tacocloud.orders.topicSau đó, trong phương thức sendOrder(), bạn có thể gọi sendDefault() thay vì send() và không cần chỉ định tên topic, như sau:
@Override
public void sendOrder(TacoOrder order) {
kafkaTemplate.sendDefault(order);
}Giờ đây bạn đã viết xong phần gửi tin nhắn, hãy chuyển sự chú ý sang việc viết mã để nhận các tin nhắn đó từ Kafka.
