10.2.1 Configuring integration flows in Java

Message channels are the means by which messages move through an integration pipeline, as shown in figure 10.2. They’re the pipes that connect all the other parts of Spring Integration plumbing together.

Figure 10.2 Message channels are conduits through which data flows between other components in an integration flow.

Spring Integration provides several channel implementations, including the following:

  • PublishSubscribeChannel: Messages published into a PublishSubscribeChannel are passed on to one or more consumers. If multiple consumers exist, all of them receive the message.
  • QueueChannel: Messages published into a QueueChannel are stored in a queue until pulled by a consumer in first in, first out (FIFO) order. If multiple consumers exist, only one of them receives each message.
  • PriorityChannel: Like QueueChannel, but rather than pulling messages in FIFO order, consumers pull them based on the message priority header.
  • RendezvousChannel: Like QueueChannel, except that the sender blocks until a consumer receives the message, effectively synchronizing the sender with the consumer.
  • DirectChannel: Like PublishSubscribeChannel, but sends each message to a single consumer by invoking that consumer in the same thread as the sender. This allows transactions to span the channel.
  • ExecutorChannel: Similar to DirectChannel, but message dispatch occurs via a TaskExecutor, in a thread separate from the sender. This channel type doesn’t support transactions that span the channel.
  • FluxMessageChannel: A Reactive Streams Publisher message channel based on Project Reactor’s Flux. (We’ll talk more about Reactive Streams, Reactor, and Flux in chapter 11.)
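
To make the delivery differences among these channel types more concrete, here is a minimal plain-JDK sketch. It is not Spring Integration code: the ChannelSemanticsSketch class, its Msg type, and its method names are all hypothetical, and java.util.concurrent queues stand in for the channels. It also assumes that a larger priority value means "pull first," which is a choice made for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Plain-JDK model of channel delivery semantics; not Spring Integration code.
final class ChannelSemanticsSketch {

  /** Hypothetical stand-in for a message: a payload plus a priority header. */
  static final class Msg {
    final String payload;
    final int priority;

    Msg(String payload, int priority) {
      this.payload = payload;
      this.priority = priority;
    }
  }

  /** Pulls everything from the queue, as a single polling consumer would. */
  static List<String> drain(BlockingQueue<Msg> queue) {
    List<String> received = new ArrayList<>();
    for (Msg next = queue.poll(); next != null; next = queue.poll()) {
      received.add(next.payload);
    }
    return received;
  }

  /** QueueChannel-like: messages come out in the order they went in. */
  static List<String> fifoOrder(List<Msg> sent) {
    return drain(new LinkedBlockingQueue<Msg>(sent));
  }

  /** PriorityChannel-like: larger priority values are pulled first (assumption). */
  static List<String> priorityOrder(List<Msg> sent) {
    Comparator<Msg> highestFirst = (a, b) -> Integer.compare(b.priority, a.priority);
    BlockingQueue<Msg> queue = new PriorityBlockingQueue<Msg>(11, highestFirst);
    queue.addAll(sent);
    return drain(queue);
  }

  /** PublishSubscribeChannel-like: every subscriber receives every message. */
  static List<List<String>> broadcast(List<Msg> sent, int subscribers) {
    List<List<String>> inboxes = new ArrayList<>();
    for (int i = 0; i < subscribers; i++) {
      inboxes.add(new ArrayList<>());
    }
    for (Msg msg : sent) {
      for (List<String> inbox : inboxes) {
        inbox.add(msg.payload);
      }
    }
    return inboxes;
  }

  public static void main(String[] args) {
    List<Msg> sent = Arrays.asList(new Msg("a", 1), new Msg("b", 9), new Msg("c", 5));
    System.out.println(fifoOrder(sent));      // [a, b, c]
    System.out.println(priorityOrder(sent));  // [b, c, a]
    System.out.println(broadcast(sent, 2));   // [[a, b, c], [a, b, c]]
  }
}
```

The same three messages come back in send order from the FIFO queue, in priority order from the priority queue, and in full to each subscriber in the broadcast case, which mirrors the distinctions drawn in the list above.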

In both the Java configuration and Java DSL styles, input channels are automatically created, with DirectChannel as the default. But if you want to use a different channel implementation, you’ll need to explicitly declare the channel as a bean and reference it in the integration flow. For example, to declare a PublishSubscribeChannel, you’d declare the following @Bean method:

```java
@Bean
public MessageChannel orderChannel() {
  return new PublishSubscribeChannel();
}
```

Then you’d reference this channel by name in the integration flow definition. For example, if the channel were being consumed by a service activator bean, you’d reference it in the inputChannel attribute of @ServiceActivator like so:

```java
@ServiceActivator(inputChannel="orderChannel")
```
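
For context, the annotation sits on a method of a Spring-managed bean. The following is a minimal sketch, not a definitive implementation: the OrderLogger class, its logOrder method, and the String payload type are hypothetical, and it assumes Spring Integration's annotation support is enabled (as Spring Boot's autoconfiguration normally does).

```java
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

// Hypothetical component; the class and method names are illustrative only.
@Component
public class OrderLogger {

  // Invoked for each message arriving on the channel bean named "orderChannel".
  // Assumes the message payload is (or can be converted to) a String.
  @ServiceActivator(inputChannel = "orderChannel")
  public void logOrder(String payload) {
    System.out.println("Received order: " + payload);
  }
}
```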

Or, if you’re using the Java DSL configuration style, you’d reference it with a call to channel() as follows:

```java
@Bean
public IntegrationFlow orderFlow() {
  return IntegrationFlows
    ...
    .channel("orderChannel")
    ...
    .get();
}
```
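
To show where the channel() call might sit in a complete definition, here is one possible standalone sketch. It assumes Spring Integration 5.x (where the IntegrationFlows factory is available); the OrderFlowConfig class name, the incomingOrders input channel name, and the printing handler are hypothetical placeholders, not the steps elided above.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;

// Hypothetical configuration class for illustration.
@Configuration
public class OrderFlowConfig {

  // Explicitly declared so the flow uses publish-subscribe rather than the
  // default DirectChannel.
  @Bean
  public MessageChannel orderChannel() {
    return new PublishSubscribeChannel();
  }

  @Bean
  public IntegrationFlow orderFlow() {
    return IntegrationFlows
        .from("incomingOrders")       // hypothetical input channel name
        .channel("orderChannel")      // refers to the bean declared above
        .handle(message ->            // placeholder handler that prints the payload
            System.out.println("Order: " + message.getPayload()))
        .get();
  }
}
```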

It’s important to note that if you’re using QueueChannel, the consumers must be configured with a poller. For instance, suppose that you’ve declared a QueueChannel bean like this:

```java
@Bean
public MessageChannel orderChannel() {
  return new QueueChannel();
}
```

You’d need to make sure that the consumer is configured to poll the channel for messages. In the case of a service activator, the @ServiceActivator annotation might look like this:

```java
@ServiceActivator(inputChannel="orderChannel",
        poller=@Poller(fixedRate="1000"))
```

In this example, the service activator polls the channel named orderChannel once every second (1,000 ms).
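
A roughly equivalent poller can be configured in the Java DSL style. The sketch below assumes Spring Integration 5.x and uses the DSL's Pollers factory; the PolledOrderFlowConfig class, the polledOrderFlow bean, and the printing handler are hypothetical names chosen for illustration.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

// Hypothetical configuration class for illustration.
@Configuration
public class PolledOrderFlowConfig {

  @Bean
  public MessageChannel orderChannel() {
    return new QueueChannel();
  }

  @Bean
  public IntegrationFlow polledOrderFlow() {
    // Placeholder handler that just prints each payload.
    MessageHandler printOrder =
        message -> System.out.println("Order: " + message.getPayload());

    return IntegrationFlows
        .from("orderChannel")
        // Poll the QueueChannel at a fixed rate of 1,000 ms, mirroring the
        // fixedRate="1000" setting in the annotation-based example.
        .handle(printOrder, e -> e.poller(Pollers.fixedRate(1000)))
        .get();
  }
}
```

Without a poller on the consuming endpoint, nothing would pull messages out of the QueueChannel, so they would simply accumulate in the queue.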
