14.2.4 Sending messages bidirectionally
In all of the communication models we’ve seen thus far, the client sends a single request, and the server responds with zero, one, or many responses. In the request-stream model, the server was able to stream back multiple responses to the client, but the client was still limited to sending only a single request. But why should the server have all of the fun? Why can’t the client send multiple requests?
That’s where the channel communication model comes in handy. In the channel communication model, the client can stream multiple requests to the server, which may also stream back multiple responses in a bidirectional conversation between both sides. It’s the most flexible of RSocket’s communication models, although also the most complex.
To demonstrate how to work with RSocket channel communication in Spring, let’s create a service that calculates gratuity on a bill, receiving a Flux of requests and responding with a Flux of responses. First, we’ll need to define the model objects that represent the request and the response. The GratuityIn class, shown next, represents the request sent by the client and received by the server.
Listing 14.8 A model representing an inbound gratuity request
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityIn {
  private BigDecimal billTotal;
  private int percent;
}
GratuityIn carries two essential pieces of information required to calculate gratuity: the bill total and a percentage. The GratuityOut class shown in the next code snippet represents the response, echoing the values given in GratuityIn, along with a gratuity property containing the calculated gratuity amount.
Listing 14.9 A model representing an outbound gratuity response
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityOut {
  private BigDecimal billTotal;
  private int percent;
  private BigDecimal gratuity;
}
The GratuityController in the next code listing handles the gratuity request and looks a lot like the controllers we’ve written earlier in this chapter.
Listing 14.10 An RSocket controller that handles multiple messages on a channel
package rsocket;
import java.math.BigDecimal;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class GratuityController {
  @MessageMapping("gratuity")
  public Flux<GratuityOut> calculate(Flux<GratuityIn> gratuityInFlux) {
    return gratuityInFlux
        .doOnNext(in -> log.info("Calculating gratuity: {}", in))
        .map(in -> {
          // Convert the whole-number percent (for example, 18) to a fraction (0.18).
          double percentAsDecimal = in.getPercent() / 100.0;
          BigDecimal gratuity = in.getBillTotal()
              .multiply(BigDecimal.valueOf(percentAsDecimal));
          return new GratuityOut(in.getBillTotal(), in.getPercent(), gratuity);
        });
  }
}
There is, however, one significant difference between this controller and the earlier ones: not only does this code return a Flux, but it also accepts a Flux as input. As with the request-stream model, the Flux returned enables the controller to stream multiple values to the client. But the Flux parameter is what differentiates the channel model from the request-stream model. The incoming Flux lets the handler method process a stream of requests from the client rather than a single request.
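The map() step in Listing 14.10 converts the percentage to a double before multiplying. As a variation, and not the approach the listing takes, the following sketch keeps the whole calculation in BigDecimal and rounds the result to two decimal places, which is often preferable when working with currency. GratuityMath and its gratuity() method are hypothetical names introduced here purely for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper for illustration; it is not part of the chapter's listings.
final class GratuityMath {

    private static final BigDecimal ONE_HUNDRED = BigDecimal.valueOf(100);

    // Computes billTotal * percent / 100, rounded half-up to two decimal places.
    static BigDecimal gratuity(BigDecimal billTotal, int percent) {
        return billTotal
            .multiply(BigDecimal.valueOf(percent))
            .divide(ONE_HUNDRED, 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        BigDecimal bill = new BigDecimal("35.50");
        // prints "18% gratuity on 35.50 is 6.39"
        System.out.println("18% gratuity on " + bill + " is " + gratuity(bill, 18));
    }
}
```

A method like this could be called from inside the map() lambda in place of the inline arithmetic, assuming two-decimal rounding is what the application wants.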
The client side of the channel model differs from the client of the request-stream model only in that it sends a Flux<GratuityIn> to the server instead of a Mono<GratuityIn>, as shown here.
Listing 14.11 A client that sends and receives multiple messages over an open channel
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
Flux<GratuityIn> gratuityInFlux =
    Flux.fromArray(new GratuityIn[] {
        // The String constructor keeps each decimal value exactly as written.
        new GratuityIn(new BigDecimal("35.50"), 18),
        new GratuityIn(new BigDecimal("10.00"), 15),
        new GratuityIn(new BigDecimal("23.25"), 20),
        new GratuityIn(new BigDecimal("52.75"), 18),
        new GratuityIn(new BigDecimal("80.00"), 15)
    })
    .delayElements(Duration.ofSeconds(1));  // emit one request per second
tcp
    .route("gratuity")
    .data(gratuityInFlux)
    .retrieveFlux(GratuityOut.class)
    .subscribe(out ->
        log.info(out.getPercent() + "% gratuity on "
            + out.getBillTotal() + " is "
            + out.getGratuity()));
In this case, the Flux<GratuityIn> is created statically using the fromArray() method, but it could be a Flux created from any source of data, perhaps retrieved from a reactive data repository.
You may have observed a pattern in how the reactive types accepted and returned by the server controller’s handler methods determine the RSocket communication model supported. Table 14.1 summarizes the relationship between the server’s input/output types and the RSocket communication models.
Table 14.1 The supported RSocket model is determined by the handler method’s parameter and return types.
| RSocket model | Handler parameter | Handler returns |
|---|---|---|
| Request-response | Mono | Mono |
| Request-stream | Mono | Flux |
| Fire-and-forget | Mono | Mono<Void> |
| Channel | Flux | Flux |
You may wonder whether it’s possible for a server to accept a Flux and return a Mono. In short, that’s not an option. Although you may imagine handling multiple requests on an incoming Flux and responding with a Mono<Void> in a weird mashup of the channel and fire-and-forget models, there is no RSocket model that maps to that scenario. Therefore, it’s not supported.
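Table 14.1 can also be read as a lookup from a handler’s parameter and return types to a communication model. The following plain-Java sketch encodes the table that way. ReactiveKind, RSocketModel, and ModelTable are hypothetical names used only for illustration and are not part of Spring or RSocket. Combinations the table doesn’t list, such as a Flux parameter with a Mono<Void> return, yield an empty result.

```java
import java.util.Optional;

// Hypothetical types for illustration only; Spring and RSocket define no such classes.
enum ReactiveKind { MONO, MONO_VOID, FLUX }

enum RSocketModel { REQUEST_RESPONSE, REQUEST_STREAM, FIRE_AND_FORGET, CHANNEL }

final class ModelTable {

    // Returns the model from Table 14.1 for the given handler signature,
    // or Optional.empty() when the table has no row for that combination.
    static Optional<RSocketModel> modelFor(ReactiveKind parameter, ReactiveKind returns) {
        if (parameter == ReactiveKind.MONO) {
            switch (returns) {
                case MONO:      return Optional.of(RSocketModel.REQUEST_RESPONSE);
                case FLUX:      return Optional.of(RSocketModel.REQUEST_STREAM);
                case MONO_VOID: return Optional.of(RSocketModel.FIRE_AND_FORGET);
            }
        }
        if (parameter == ReactiveKind.FLUX && returns == ReactiveKind.FLUX) {
            return Optional.of(RSocketModel.CHANNEL);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // prints "Optional[CHANNEL]"
        System.out.println(modelFor(ReactiveKind.FLUX, ReactiveKind.FLUX));
        // prints "Optional.empty" because the table has no Flux-in, Mono<Void>-out row
        System.out.println(modelFor(ReactiveKind.FLUX, ReactiveKind.MONO_VOID));
    }
}
```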
