Skip to content

14.2.2 Xử lý nhắn tin theo mô hình request-stream

Không phải tất cả các tương tác đều có một yêu cầu và một phản hồi duy nhất. Ví dụ, trong một kịch bản báo giá cổ phiếu, có thể hữu ích khi yêu cầu một luồng báo giá liên tục cho một mã cổ phiếu cụ thể. Trong mô hình request-response, phía client sẽ cần liên tục gửi yêu cầu để lấy giá hiện tại. Nhưng trong mô hình request-stream, phía client chỉ cần yêu cầu giá một lần và sau đó đăng ký (subscribe) để nhận các cập nhật định kỳ.

Để minh họa cho mô hình request-stream, chúng ta sẽ triển khai server và client cho kịch bản báo giá cổ phiếu. Đầu tiên, chúng ta cần định nghĩa một đối tượng có thể mang thông tin báo giá cổ phiếu. Lớp StockQuote trong đoạn mã dưới đây sẽ đảm nhận vai trò này.

Liệt kê 14.4 Một lớp mô hình đại diện cho báo giá cổ phiếu

java
package rsocket;
import java.math.BigDecimal;
import java.time.Instant;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class StockQuote {
    private String symbol;
    private BigDecimal price;
    private Instant timestamp;
}

Như bạn có thể thấy, một StockQuote mang theo mã cổ phiếu, giá và thời điểm mà giá đó có hiệu lực. Để đơn giản, chúng ta sử dụng Lombok để hỗ trợ việc tạo constructor và các phương thức truy cập.

Tiếp theo, hãy viết một controller để xử lý các yêu cầu báo giá cổ phiếu. Bạn sẽ thấy rằng StockQuoteController trong đoạn mã sau khá giống với GreetingController ở phần trước.

Liệt kê 14.5 Một RSocket controller để phát luồng báo giá cổ phiếu

java
package rsocket;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;

import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;

import reactor.core.publisher.Flux;

@Controller
public class StockQuoteController {
  
  @MessageMapping("stock/{symbol}")
  public Flux<StockQuote> getStockPrice(
      @DestinationVariable("symbol") String symbol) {
    return Flux
      .interval(Duration.ofSeconds(1))
      .map(i -> {
        BigDecimal price = BigDecimal.valueOf(Math.random() * 10);
      return new StockQuote(symbol, price, Instant.now());
    });
  }
}

Ở đây, phương thức getStockPrice() xử lý các yêu cầu đến tại route "stock/{symbol}", nhận mã cổ phiếu từ route với annotation @DestinationVariable. Để đơn giản, thay vì tra cứu giá cổ phiếu thực tế, giá sẽ được tính ngẫu nhiên (có thể đúng hoặc không đúng với sự biến động thực tế của một số cổ phiếu).

Điều đáng chú ý nhất ở getStockPrice() là nó trả về một Flux<StockQuote> thay vì Mono<StockQuote>. Đây là tín hiệu cho Spring biết rằng phương thức xử lý này hỗ trợ mô hình request-stream. Nội tại, Flux được tạo ra như một luồng định kỳ phát sự kiện mỗi giây, và sau đó được ánh xạ thành một Flux khác tạo ra các StockQuote ngẫu nhiên. Nói đơn giản, một yêu cầu duy nhất được xử lý bởi phương thức getStockPrice() sẽ trả về nhiều giá trị, mỗi giây một lần.

Client của một dịch vụ request-stream khá giống với client của một dịch vụ request-response. Sự khác biệt chính là thay vì gọi retrieveMono() trên requester, nó sẽ gọi retrieveFlux(). Client của dịch vụ báo giá cổ phiếu có thể trông như sau:

java
String stockSymbol = "XYZ";

RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
tcp
  .route("stock/{symbol}", stockSymbol)
  .retrieveFlux(StockQuote.class)
  .doOnNext(stockQuote -> {
    log.info(
        "Price of " + stockQuote.getSymbol() +
        " : " + stockQuote.getPrice() +
        " (at " + stockQuote.getTimestamp() + ")");
  })
  .subscribe();

Tại thời điểm này, chúng ta đã thấy cách tạo RSocket server và client xử lý các phản hồi đơn và nhiều phản hồi. Nhưng nếu server không có phản hồi nào để gửi hoặc client không cần phản hồi thì sao? Hãy xem cách xử lý mô hình giao tiếp fire-and-forget.

Released under the MIT License.