9.3.3 Writing Kafka listeners

Aside from the unique method signatures for send() and sendDefault(), KafkaTemplate differs from JmsTemplate and RabbitTemplate in that it doesn’t offer any methods for receiving messages. That means the only way to consume messages from a Kafka topic using Spring is to write a message listener.

For Kafka, message listeners are defined as methods that are annotated with @KafkaListener. The @KafkaListener annotation is roughly analogous to @JmsListener and @RabbitListener and is used in much the same way. The next listing shows what your listener-based order receiver might look like if written for Kafka.

Listing 9.9 Receiving orders with @KafkaListener

java
package tacos.kitchen.messaging.kafka.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import tacos.TacoOrder;
import tacos.kitchen.KitchenUI;

@Component
public class OrderListener {

  private KitchenUI ui;

  @Autowired
  public OrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  @KafkaListener(topics="tacocloud.orders.topic")
  public void handle(TacoOrder order) {
    ui.displayOrder(order);
  }
}

The handle() method is annotated with @KafkaListener to indicate that it should be invoked when a message arrives in the topic named tacocloud.orders.topic. As it’s written in listing 9.9, only a TacoOrder (the payload) is given to handle(). But if you need additional metadata from the message, it can also accept a ConsumerRecord or Message object.

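For example, the following implementation of handle() accepts a ConsumerRecord so that you can log the partition and timestamp of the message. The snippet assumes that a logger named log is available in the class, for instance one provided by Lombok's @Slf4j: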

java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(
    TacoOrder order, ConsumerRecord<String, TacoOrder> record) {
  log.info("Received from partition {} with timestamp {}",
        record.partition(), record.timestamp());

  ui.displayOrder(order);
}

Similarly, you could ask for a Message instead of a ConsumerRecord and achieve the same thing, as shown here:

java
@KafkaListener(topics="tacocloud.orders.topic")
public void handle(TacoOrder order, Message<TacoOrder> message) {
  MessageHeaders headers = message.getHeaders();
  log.info("Received from partition {} with timestamp {}",
    // Newer Spring for Apache Kafka releases name this constant RECEIVED_PARTITION.
    headers.get(KafkaHeaders.RECEIVED_PARTITION_ID),
    headers.get(KafkaHeaders.RECEIVED_TIMESTAMP));
  ui.displayOrder(order);
}

It’s worth noting that the message payload is also available via ConsumerRecord.value() or Message.getPayload(). This means that you could ask for the TacoOrder through those objects instead of asking for it directly as a parameter to handle().
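
To illustrate that last point, here is a minimal sketch of a handler that takes only the record and reads the payload from it. So that it compiles with nothing but the JDK, it relies on hypothetical stand-ins that are not part of the original listing: StubRecord mimics just the partition(), timestamp(), and value() accessors of Kafka's ConsumerRecord, and TacoOrder and KitchenUI are reduced to the bare minimum. In a real Spring application, the method would carry the @KafkaListener annotation and accept a ConsumerRecord<String, TacoOrder>.

```java
// Hypothetical stand-in for tacos.TacoOrder, reduced to a single field.
class TacoOrder {
  final String deliveryName;

  TacoOrder(String deliveryName) {
    this.deliveryName = deliveryName;
  }
}

// Hypothetical stand-in for Kafka's ConsumerRecord<K, V>. It mirrors only
// the accessors used in this section, not the real class.
class StubRecord<K, V> {
  private final int partition;
  private final long timestamp;
  private final K key;
  private final V value;

  StubRecord(int partition, long timestamp, K key, V value) {
    this.partition = partition;
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
  }

  int partition() { return partition; }
  long timestamp() { return timestamp; }
  K key() { return key; }
  V value() { return value; }
}

// Hypothetical stand-in for tacos.kitchen.KitchenUI that remembers
// the last order it was asked to display.
class KitchenUI {
  TacoOrder lastDisplayed;

  void displayOrder(TacoOrder order) {
    lastDisplayed = order;
    System.out.println("Displaying order for " + order.deliveryName);
  }
}

class RecordOnlyOrderListener {
  private final KitchenUI ui;

  RecordOnlyOrderListener(KitchenUI ui) {
    this.ui = ui;
  }

  // In a Spring application this method would be annotated with
  // @KafkaListener(topics="tacocloud.orders.topic") and would take a
  // ConsumerRecord<String, TacoOrder> instead of the stand-in type.
  void handle(StubRecord<String, TacoOrder> record) {
    System.out.println("Received from partition " + record.partition()
        + " with timestamp " + record.timestamp());

    // The payload is read from the record rather than passed separately.
    TacoOrder order = record.value();
    ui.displayOrder(order);
  }
}
```

With a Message<TacoOrder> parameter, the equivalent step would be a call to message.getPayload().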