Skip to content

14.2.2 Handling request-stream messaging

Not all interactions feature a single request and a single response. In a stock quote scenario, for example, it may be useful to request a stream of stock quotes for a given stock symbol. In a request-response model, the client would need to repeatedly poll for the current stock price. But in a request-stream model, the client need ask for the stock price only once and then subscribe to a stream of periodic updates.

To illustrate the request-stream model, let’s implement the server and client for the stock quote scenario. First, we’ll need to define an object that can carry the stock quote information. The StockQuote class in the next listing will serve this purpose.

Listing 14.4 A model class representing a stock quote

java
package rsocket;
import java.math.BigDecimal;
import java.time.Instant;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class StockQuote {
    private String symbol;
    private BigDecimal price;
    private Instant timestamp;
}

As you can see, a StockQuote carries the stock symbol, the price, and a timestamp that the price was valid. For brevity’s sake, we’re using Lombok to help with constructors and accessor methods.

Now let’s write a controller to handle requests for stock quotes. You’ll find that the StockQuoteController in the next snippet is quite similar to the GreetingController from the previous section.

Listing 14.5 An RSocket controller to stream stock quotes

java
package rsocket;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import reactor.core.publisher.Flux;

@Controller
public class StockQuoteController {
  
  @MessageMapping("stock/{symbol}")
  public Flux<StockQuote> getStockPrice(
      @DestinationVariable("symbol") String symbol) {
    return Flux
      .interval(Duration.ofSeconds(1))
      .map(i -> {
        BigDecimal price = BigDecimal.valueOf(Math.random() * 10);
      return new StockQuote(symbol, price, Instant.now());
    });
  }
}

Here, the getStockPrice() method handles incoming requests on the "stock/{symbol}" route, accepting the stock symbol from the route with the @DestinationVariable annotation. For simplicity’s sake, rather than look up actual stock prices, the price is calculated as a random value (which may or may not accurately model the volatility of some actual stocks).

What’s most notable about getStockPrice(), however, is that it returns a Flux<StockQuote> instead of a Mono<StockQuote>. This is a clue to Spring that this handler method supports the request-stream model. Internally, the Flux is created initially as an interval that fires every one second, but that Flux is mapped to another Flux that produces the random StockQuote. Put simply, a single request handled by the getStockPrice() method returns multiple values, once every second.

A client of a request-stream service is similar to one for a request-response service. The only key difference is that instead of calling retrieveMono() on the requester, it should call retreiveFlux(). The client of the stock quote service might look like this:

java
String stockSymbol = "XYZ";

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp
  .route("stock/{symbol}", stockSymbol)
  .retrieveFlux(StockQuote.class)
  .doOnNext(stockQuote -> {
    log.info(
        "Price of " + stockQuote.getSymbol() +
        " : " + stockQuote.getPrice() +
        " (at " + stockQuote.getTimestamp() + ")");
  })
  .subscribe();

At this point, we’ve seen how to create RSocket servers and clients that handle single and multiple responses. But what if the server doesn’t have a response to send or the client doesn’t need a response? Let’s see how to deal with the fire-and-forget communication model.

Released under the MIT License.