14.2.4 Gửi tin nhắn hai chiều
Trong tất cả các mô hình giao tiếp mà chúng ta đã thấy cho đến nay, phía client gửi một yêu cầu đơn, và server phản hồi với không, một hoặc nhiều phản hồi. Trong mô hình request-stream, server có thể stream lại nhiều phản hồi đến client, nhưng client vẫn bị giới hạn chỉ gửi một yêu cầu duy nhất. Nhưng tại sao server lại được hưởng toàn bộ "niềm vui"? Tại sao client không thể gửi nhiều yêu cầu?
Đó là lúc mô hình giao tiếp channel phát huy tác dụng. Trong mô hình channel, client có thể stream nhiều yêu cầu tới server, và server cũng có thể stream nhiều phản hồi lại, tạo nên một cuộc trò chuyện hai chiều giữa cả hai phía. Đây là mô hình linh hoạt nhất của RSocket, mặc dù cũng là phức tạp nhất.
Để minh họa cách làm việc với giao tiếp RSocket channel trong Spring, chúng ta sẽ tạo một dịch vụ tính tiền boa trên hóa đơn, nhận một Flux yêu cầu và phản hồi bằng một Flux phản hồi. Trước tiên, chúng ta cần định nghĩa các đối tượng mô hình đại diện cho yêu cầu và phản hồi. Lớp GratuityIn bên dưới đại diện cho yêu cầu được gửi bởi client và nhận bởi server.
Danh sách 14.8 Một mô hình đại diện cho yêu cầu boa đầu vào
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityIn {
private BigDecimal billTotal;
private int percent;
}GratuityIn chứa hai thông tin quan trọng để tính tiền boa: tổng hóa đơn và phần trăm boa. Lớp GratuityOut được trình bày tiếp theo đại diện cho phản hồi, lặp lại các giá trị trong GratuityIn, cùng với thuộc tính gratuity chứa số tiền boa được tính toán.
Danh sách 14.9 Một mô hình đại diện cho phản hồi boa đầu ra
package rsocket;
import java.math.BigDecimal;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class GratuityOut {
private BigDecimal billTotal;
private int percent;
private BigDecimal gratuity;
}GratuityController trong danh sách mã tiếp theo xử lý yêu cầu tính boa và trông khá giống với các controller mà chúng ta đã viết ở phần trước trong chương này.
Danh sách 14.10 Một RSocket controller xử lý nhiều tin nhắn trên channel
package rsocket;
import java.math.BigDecimal;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Controller
@Slf4j
public class GratuityController {
@MessageMapping("gratuity")
public Flux<GratuityOut> calculate(Flux<GratuityIn> gratuityInFlux) {
return gratuityInFlux
.doOnNext(in -> log.info("Calculating gratuity: " + in))
.map(in -> {
double percentAsDecimal = in.getPercent() / 100.0;
BigDecimal gratuity = in.getBillTotal()
.multiply(BigDecimal.valueOf(percentAsDecimal));
return new GratuityOut(in.getBillTotal(), in.getPercent(), gratuity);
});
}
}Tuy nhiên, có một điểm khác biệt đáng kể giữa ví dụ này và các ví dụ trước đó: không chỉ trả về một Flux, đoạn mã này còn chấp nhận một Flux làm đầu vào. Tương tự như mô hình request-stream, Flux trả về cho phép controller stream nhiều giá trị đến client. Nhưng tham số Flux đầu vào mới là điểm khác biệt chính giữa mô hình channel và mô hình request-stream. Tham số Flux này cho phép controller xử lý một luồng các yêu cầu từ client đi vào phương thức xử lý.
Phía client của mô hình channel chỉ khác với phía client của mô hình request-stream ở chỗ nó gửi một Flux<GratuityIn> đến server thay vì Mono<GratuityIn>, như thể hiện dưới đây.
Danh sách 14.11 Một client gửi và nhận nhiều tin nhắn qua một channel mở
RSocketRequester tcp = requesterBuilder.tcp("localhost", 7000);
Flux<GratuityIn> gratuityInFlux =
Flux.fromArray(new GratuityIn[] {
new GratuityIn(new BigDecimal(35.50), 18),
new GratuityIn(new BigDecimal(10.00), 15),
new GratuityIn(new BigDecimal(23.25), 20),
new GratuityIn(new BigDecimal(52.75), 18),
new GratuityIn(new BigDecimal(80.00), 15)
})
.delayElements(Duration.ofSeconds(1));
tcp
.route("gratuity")
.data(gratuityInFlux)
.retrieveFlux(GratuityOut.class)
.subscribe(out ->
log.info(out.getPercent() + "% gratuity on "
+ out.getBillTotal() + " is "
+ out.getGratuity()));Trong trường hợp này, Flux<GratuityIn> được tạo tĩnh bằng phương thức fromArray(), nhưng nó cũng có thể là một Flux được tạo từ bất kỳ nguồn dữ liệu nào, chẳng hạn dữ liệu được lấy từ repository phản ứng.
Bạn có thể đã nhận ra một quy luật trong cách các kiểu phản ứng được chấp nhận và trả về bởi các phương thức xử lý controller phía server quyết định mô hình giao tiếp RSocket được hỗ trợ. Bảng 14.1 tóm tắt mối quan hệ giữa kiểu đầu vào/đầu ra của server và các mô hình giao tiếp của RSocket.
Bảng 14.1 Mô hình RSocket được hỗ trợ được xác định bởi tham số và kiểu trả về của phương thức xử lý
| RSocket model | Handler paramemter | Handler returns |
|---|---|---|
| Request-response | Mono | Mono |
| Request-stream | Mono | Flux |
| Fire-and-forget | Mono | Mono<Void> |
| Channel | Flux | Flux |
Bạn có thể thắc mắc liệu server có thể chấp nhận một Flux và trả về một Mono không. Câu trả lời ngắn gọn là: không thể. Dù bạn có thể tưởng tượng việc xử lý nhiều yêu cầu từ một Flux đầu vào và phản hồi bằng Mono<Void> như một sự pha trộn kỳ lạ giữa mô hình channel và fire-and-forget, thì không có mô hình RSocket nào tương ứng với kịch bản đó. Vì vậy, nó không được hỗ trợ.
