
9.3.2 Sending messages with KafkaTemplate

In many ways, KafkaTemplate is similar to its JMS and RabbitMQ counterparts, yet it also differs from them in important respects. The differences become apparent when you look at its methods for sending messages, shown here:

java
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
        Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic,
        Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
        K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition,
        Long timestamp, K key, V data);

The first thing you may have noticed is that there are no convertAndSend() methods. That’s because KafkaTemplate is typed with generics and is able to deal with domain types directly when sending messages. In a way, all of the send() methods are doing the job of convertAndSend().

You may also have noticed that several parameters to send() and sendDefault() are quite different from what you used with JMS and RabbitMQ. When sending messages in Kafka, you can specify the following parameters to guide how the message is sent:

  • The topic to which to send the message (required for send())
  • A partition of the topic to which to write the message (optional)
  • A key to send on the record (optional)
  • A timestamp (optional; defaults to System.currentTimeMillis())
  • The payload (required)

The topic and payload are the two most important parameters. Partitions and keys have little effect on how you use KafkaTemplate, aside from being extra information provided as parameters to send() and sendDefault(). For our purposes, we’re going to focus on sending the message payload to a given topic and not worry ourselves with partitions and keys.
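
Even so, it helps to know why a key exists at all. When no partition is given, Kafka's default partitioner derives one from a hash of the key, so records that share a key land on the same partition. The following is a simplified sketch of that idea only. It uses String.hashCode() rather than the murmur2 hash that Kafka applies to the serialized key bytes, and the class name, method name, and partition count are hypothetical.

```java
// Simplified illustration of key-based partition selection.
// Assumption: Kafka's real partitioner hashes the serialized key bytes
// with murmur2; String.hashCode() is used here only to show the idea.
final class PartitionSketch {

  private PartitionSketch() {}

  // Maps a key to a partition in the range [0, numPartitions).
  static int partitionFor(String key, int numPartitions) {
    if (numPartitions <= 0) {
      throw new IllegalArgumentException("numPartitions must be positive");
    }
    // floorMod keeps the result non-negative even for negative hash codes.
    return Math.floorMod(key.hashCode(), numPartitions);
  }
}
```

Calling PartitionSketch.partitionFor("order-42", 3) twice yields the same partition both times, which is the property that lets consumers see all records for one key in order.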

For the send() method, you can also choose to send a ProducerRecord, which is little more than a type that captures all of the preceding parameters in a single object. You can also send a Message object, but doing so would require you to convert your domain objects into a Message. Generally, it’s easier to use one of the other methods rather than to create and send a ProducerRecord or Message object.

Using the KafkaTemplate and its send() method, you can write a Kafka-based implementation of OrderMessagingService. The following listing shows what such an implementation might look like.

Listing 9.8 Sending orders with KafkaTemplate

java
package tacos.messaging;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import tacos.TacoOrder;

@Service
public class KafkaOrderMessagingService
                  implements OrderMessagingService {

  private final KafkaTemplate<String, TacoOrder> kafkaTemplate;

  @Autowired
  public KafkaOrderMessagingService(
        KafkaTemplate<String, TacoOrder> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }

  @Override
  public void sendOrder(TacoOrder order) {
    kafkaTemplate.send("tacocloud.orders.topic", order);
  }
}

In this new implementation of OrderMessagingService, the sendOrder() method uses the send() method of the injected KafkaTemplate to send a TacoOrder to the topic named tacocloud.orders.topic. Except for the word “Kafka” scattered throughout the code, this isn’t much different from the code you wrote for JMS and RabbitMQ. And, just like those other implementations of OrderMessagingService, it can be injected into OrderApiController and used to send orders through Kafka when orders are placed via the /api/orders endpoint.
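
Note that sendOrder() ignores the value returned by send(), so a failed send would go unnoticed. The send methods are asynchronous and return a future; in Spring for Apache Kafka 3.0 and later, that future is a CompletableFuture rather than the ListenableFuture shown earlier. The sketch below shows only the callback pattern. It uses a JDK CompletableFuture and a hypothetical fakeSend() method in place of a real KafkaTemplate, so it runs without a broker.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of reacting to an asynchronous send result.
// Assumption: fakeSend() is a hypothetical stand-in for KafkaTemplate.send();
// it completes immediately instead of talking to a broker.
final class SendCallbackSketch {

  // Collects outcome messages so callers can inspect what happened.
  final List<String> log = new CopyOnWriteArrayList<>();

  // Pretends to send; fails when the payload is null.
  CompletableFuture<String> fakeSend(String topic, String payload) {
    if (payload == null) {
      return CompletableFuture.failedFuture(
          new IllegalArgumentException("payload is required"));
    }
    return CompletableFuture.completedFuture(topic + ":" + payload);
  }

  // Sends and records success or failure once the future completes.
  void sendAndLog(String topic, String payload) {
    fakeSend(topic, payload).whenComplete((result, ex) -> {
      if (ex != null) {
        log.add("FAILED " + ex.getMessage());
      } else {
        log.add("SENT " + result);
      }
    });
  }
}
```

With a real KafkaTemplate on a version that returns CompletableFuture, the same whenComplete() callback would attach directly to the value returned by send(). On earlier versions, ListenableFuture offers addCallback() for the same purpose.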

Until we create a Kafka implementation of the message receiver, you’ll need a console to view what was sent. There are several management consoles available for Kafka, including Offset Explorer ( https://www.kafkatool.com/ ) and Confluent’s Apache Kafka UI ( https://www.confluent.io/product/confluent-platform/gui-driven-management-and-monitoring/ ).
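
Before anything shows up in such a console, the producer must be able to serialize a TacoOrder. Spring Boot configures String serializers for keys and values by default, so unless you have already configured one elsewhere, one option is to switch the value serializer to Spring Kafka's JsonSerializer. The fragment below is a sketch under the assumption that JSON is an acceptable wire format for your orders and that Jackson is on the classpath:

```yaml
spring:
  kafka:
    producer:
      # Assumption: orders are sent as JSON; adjust if you use another format.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```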

If you set a default topic, you can simplify the sendOrder() method slightly. First, set the spring.kafka.template.default-topic property to tacocloud.orders.topic, as follows:

yaml
spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    template:
      default-topic: tacocloud.orders.topic

Then, in the sendOrder() method, you can call sendDefault() instead of send() and not specify the topic name, as shown here:

java
@Override
public void sendOrder(TacoOrder order) {
  kafkaTemplate.sendDefault(order);
}
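
Conceptually, sendDefault() is send() with the topic filled in from configuration. The following in-memory sketch illustrates that relationship. It is not KafkaTemplate: the class and its fields are hypothetical stand-ins that record what would have been sent instead of contacting a broker.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory stand-in showing how a default topic shortens send calls.
// Assumption: this is not KafkaTemplate; the class name and fields are hypothetical.
final class DefaultTopicSketch<V> {

  private final String defaultTopic;

  // Each entry is "topic=payload", in the order the sends happened.
  final List<String> sent = new ArrayList<>();

  DefaultTopicSketch(String defaultTopic) {
    this.defaultTopic = defaultTopic;
  }

  // Explicit-topic variant, comparable to send(topic, data).
  void send(String topic, V data) {
    sent.add(topic + "=" + data);
  }

  // Default-topic variant, comparable to sendDefault(data):
  // it simply supplies the configured topic.
  void sendDefault(V data) {
    if (defaultTopic == null) {
      throw new IllegalStateException("no default topic configured");
    }
    send(defaultTopic, data);
  }
}
```

Given a sketch constructed with "tacocloud.orders.topic", calling send("tacocloud.orders.topic", order) and sendDefault(order) records identical entries, which is why the two versions of sendOrder() are interchangeable once the property is set.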

Now that your message-sending code has been written, let’s turn our attention to writing code that will receive those messages from Kafka.
