11.1.1 Định nghĩa Reactive Streams
Reactive Streams là một sáng kiến được khởi xướng vào cuối năm 2013 bởi các kỹ sư đến từ Netflix, Lightbend và Pivotal (công ty đứng sau Spring). Reactive Streams hướng đến việc cung cấp một tiêu chuẩn cho xử lý luồng dữ liệu bất đồng bộ với cơ chế phản hồi áp lực (nonblocking backpressure).
Chúng ta đã đề cập đến tính chất bất đồng bộ của lập trình phản ứng; đây chính là yếu tố cho phép thực hiện các tác vụ song song để đạt được khả năng mở rộng cao hơn. Backpressure là cơ chế mà qua đó các thành phần tiêu thụ dữ liệu có thể tránh bị quá tải bởi các nguồn dữ liệu phát ra quá nhanh, bằng cách thiết lập giới hạn về lượng dữ liệu mà chúng sẵn sàng xử lý.
Java Streams vs Reactive Streams
Có nhiều điểm tương đồng giữa Java streams và Reactive Streams. Đầu tiên, cả hai đều có từ "streams" trong tên. Cả hai cũng đều cung cấp API theo phong cách hàm để làm việc với dữ liệu. Thực tế, như bạn sẽ thấy sau khi tìm hiểu về Reactor, chúng thậm chí chia sẻ nhiều thao tác giống nhau.
Tuy nhiên, Java streams thường là đồng bộ và làm việc với tập dữ liệu hữu hạn. Chúng về cơ bản là một cách để lặp qua một tập hợp với các hàm.
Reactive Streams hỗ trợ xử lý bất đồng bộ với tập dữ liệu có kích thước bất kỳ, kể cả vô hạn. Chúng xử lý dữ liệu theo thời gian thực, khi dữ liệu sẵn có, với cơ chế backpressure để tránh làm quá tải bộ tiêu thụ.
Mặt khác, API Flow trong JDK 9 tương ứng với Reactive Streams. Các kiểu
Flow.Publisher,Flow.Subscriber,Flow.Subscription, vàFlow.Processortrong JDK 9 ánh xạ trực tiếp tớiPublisher,Subscriber,Subscription, vàProcessortrong Reactive Streams. Tuy nhiên, API Flow của JDK 9 không phải là một hiện thực chính thức của Reactive Streams.
Đặc tả Reactive Streams có thể được tóm gọn bằng bốn giao diện: Publisher, Subscriber, Subscription, và Processor. Một Publisher tạo ra dữ liệu và gửi dữ liệu đó đến Subscriber thông qua một Subscription. Giao diện Publisher khai báo một phương thức duy nhất là subscribe(), qua đó một Subscriber có thể đăng ký với Publisher, như minh họa sau:
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}Sau khi một Subscriber đã đăng ký, nó có thể nhận các sự kiện từ Publisher. Những sự kiện này được gửi thông qua các phương thức trong giao diện Subscriber như sau:
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}Những sự kiện này được gửi thông qua các phương thức trong giao diện Subscriber như sau: Sự kiện đầu tiên mà Subscriber nhận được là cuộc gọi đến onSubscribe(). Khi Publisher gọi onSubscribe(), nó truyền một đối tượng Subscription đến Subscriber. Thông qua Subscription, Subscriber có thể quản lý đăng ký của mình như sau:
public interface Subscription {
void request(long n);
void cancel();
}Subscriber có thể gọi request() để yêu cầu dữ liệu được gửi, hoặc có thể gọi cancel() để cho biết rằng nó không còn muốn nhận dữ liệu nữa và hủy đăng ký. Khi gọi request(), Subscriber truyền vào một giá trị kiểu long để chỉ rõ số lượng dữ liệu mà nó sẵn sàng chấp nhận. Đây là lúc backpressure phát huy tác dụng, ngăn Publisher gửi quá nhiều dữ liệu vượt khả năng xử lý của Subscriber. Sau khi Publisher đã gửi hết số lượng dữ liệu được yêu cầu, Subscriber có thể gọi request() thêm lần nữa để yêu cầu thêm dữ liệu.
Sau khi Subscriber đã yêu cầu dữ liệu, dữ liệu sẽ bắt đầu chảy qua luồng. Với mỗi phần tử được phát hành bởi Publisher, phương thức onNext() sẽ được gọi để chuyển dữ liệu tới Subscriber. Nếu có lỗi xảy ra, onError() sẽ được gọi. Nếu Publisher không còn dữ liệu nào để gửi và sẽ không tạo thêm dữ liệu mới, nó sẽ gọi onComplete() để thông báo cho Subscriber rằng quá trình đã hoàn tất.
Về giao diện Processor, nó là sự kết hợp của Subscriber và Publisher, như minh họa sau:
public interface Processor<T, R>
extends Subscriber<T>, Publisher<R> {}Là một Subscriber, Processor sẽ nhận dữ liệu và xử lý theo một cách nào đó. Sau đó, nó sẽ chuyển sang vai trò Publisher để phát hành kết quả tới các Subscriber của nó.
Như bạn có thể thấy, đặc tả Reactive Streams khá đơn giản. Rất dễ hình dung cách xây dựng một pipeline xử lý dữ liệu bắt đầu với một Publisher, đẩy dữ liệu qua một hoặc nhiều Processor, rồi cuối cùng chuyển kết quả tới một Subscriber.
Tuy nhiên, các giao diện Reactive Streams không hỗ trợ việc kết hợp các luồng theo cách hàm. Project Reactor là một hiện thực của đặc tả Reactive Streams và cung cấp một API hàm để kết hợp các luồng phản ứng. Như bạn sẽ thấy trong các chương tiếp theo, Reactor là nền tảng cho mô hình lập trình phản ứng của Spring. Trong phần còn lại của chương này, chúng ta sẽ khám phá (và có thể nói là rất thú vị) về Project Reactor.
