在Java 9新特性中,包含了由JSR166領導者Doug Lea提案的Flow API,它被包含在java.base模組,是定義在java.util.concurrent套件中精簡的Flow API,目的在於改進開發者的非同步串流資料處理,然而,Java平臺內部並沒有任何地方應用到Flow API,若實際看看API文件,也覺得它實際上只是個空殼?
關於Reactive Programming
現今對許多開發者而言,尤其是對前端開發者而言,聽到Reactive Programming應該不會覺得陌生,不少技術生態圈中都有著支援Reactive概念的框架,就ReactiveX而言,就有著JavaScript的RxJS,Java的RxJava、Go的RxGo等,清單可參考〈ReactiveX〉頁面(https://goo.gl/5mRm8Q)。簡單來說,Reactive Programming是一種程式設計典範,基於發布訂閱(Publish-subscribe)的概念,來處理非同步資料。
網路上有許多的資料,都解釋了Reactive Programming的概念,我先前專欄〈FRP與函數式〉,也曾經舉試算表軟體中欄位間基於公式的連動,作為例子——就實作上,我是將(事件)資料的迭代、轉換等底層細節隱藏起來;就應用上,更重要的是,從需求規格中辨識出高階資料流,結合函數式的概念,將資料流的處理意圖等突顯出來。
在Java這塊,社群的腳步總是遠快於Java標準本身,好不容易推出了純為安撫使用者,而新特性殘缺不全的Java 7之後,接下來,就是努力追趕社群企求的一些現代化特性。就像當Java 8好不容易跟上Lambda、Stream等函數式概念,並且提供了可基於同步概念實現非同步處理的CompletableFuture之後,社群開始追求下一塊關於Reactive的拼圖,而終於也在2015年一月,我們看到Doug Lea提出了Reactive的侯選規格(https://goo.gl/JGPkXM)。
如果你的身分之一是Java開發者,從未聽說過Reactive Programming,甚至,也沒使用過Java 8中的Lambda、Stream、CompletableFuture等API,可以先試著看看〈解析JDK8 Functional API〉(https://goo.gl/9zfiUq),然後,試著用函數式的概念來看看〈Java 8 Patterns〉(https://goo.gl/LPLnZz),此時,對於(Functional) Reactive Programming在Java中的實現與應用,就能有個粗淺的概念。
Reactive Streams Specification
當試著從RxJava瞭解Reactive Programming的實作與應用時,你會發現,它已經來到了RxJava 2.0。
根據RxJava的〈What's different in 2.0〉(https://goo.gl/7O2fgE)頁面,其一開始就談到,為了符合Reactive Streams Specification的規範,RxJava是整個重寫,而成為2.0。實際上,Reactive Streams Specification本身,亦是受到了RxJava 1.x許多影響,而演化出來的規格。
在〈Reactive Streams〉(https://goo.gl/BftrWt)中,一開始也指出,其主要目標之一,是為非同步串流的壓力處理(Back pressure),定義一個標準,它僅提供一個最小的介面、方法與協定集合,最終的DSL相關API被特意排除在外,留待並鼓勵廠商或社群,去各自實現。
就Java這部份具體來說,規格在org.reactivestreams套件下定義了Publisher、Subscriber、Subscription與Processor四個介面(詳細方法簽署等可參考https://goo.gl/ugyDSh),其中,Publisher實例會發布資料串流,接受Subscriber的訂閱,並建立一個Subscription實例代表該次訂閱,而在訂閱成功事件發生時,會呼叫Subscriber的onSubscribe,並傳入Subscription實例。
Subscription是Publisher、Subscriber之間溝通的橋樑,可以進行流量控制。這是為了避免訂閱者來不及消化資料流來源產生的資料,而引發事件的持續堆積而造成記憶體的滿溢,Subscriber可以透過傳入的Subscription,使用request(n)向Publisher請求n筆資料,或者是透過cancel(),要求Publisher停止傳送資料並清除資源。
而資料流可能被轉換,Processor同時扮演著Publisher與Subscriber(Processor繼承了這兩個介面),在最前端的Subscriber與最末端的Subscriber之間,可以串接多個Processor,每個Processor代表著整個資料流串的一個階段。
若開發者曾經使用過RxJava,可能想要知道的是,相同概念下1.x與2.0之間名稱的不同,而這部份,在RxJava的〈What's different in 2.0〉中有詳細的說明。
Java 9 Flow API
作為新特性之一,Java 9引入了Reactive Programming的概念,具體來說,是在java.util.concurrent.Flow類別中,定義了四個介面,它們遵守Reactive Streams Specification的規範,因此,各介面下實際的方法簽署,以及org.reactivestreams套件下的定義,是一樣的。
在Java 9中,Flow類別基本上僅作為一個名稱空間,除了管理四個介面之外,本身只定義了Publisher或Subscriber的緩衝預設值。Flow API中的唯一實作,就是java.util.concurrent.SubmissionPublisher類別,它實現了Publisher介面,在內部時,使用ForkJoinPool.commonPool()作為預設實作,以非同步地對Subscriber傳遞資料。
除了像〈Reactive Programming with JDK 9 Flow API〉(https://goo.gl/REmbvg)的介紹,有一些簡單場合,可直接使用SubmissionPublisher之外,SubmissionPublisher基本上是作為基礎類別,以便在繼承之後自行實現Publisher類別,或者是繼承之後同時實現Processor介面,以自行實作Processor類別。然而,Java 9中,並沒有其他任何SubmissionPublisher的子類別了,目前看來,Java 9也沒有在內部使用到Flow API。
如果真的要使用Java 9 Flow API,目前來說必須自行實作。SubmissionPublisher類別的API文件(https://goo.gl/QVCl9W),提供了一些範例,而在Flow類別的API文件(https://goo.gl/rIRuIo),則提供了直接實作Publisher與Subscriber介面的基本架構,其中,也包含了Subscription的實作,有助於瞭解它是怎麼在Publisher與Subscriber之間進行溝通。
有興趣的話,也可以看看SubmissionPublisher的原始碼。別忘了,在Java 9的模組化架構下,它已經被歸在java.base模組中,在打開原始碼壓縮檔之後,得在java/base目錄下,才能找到java.util.concurrent套件,以及底下相關的.java檔案。
等待實作品的Flow API
簡單來說,目前Java 9中的Flow API,還是個空殼,或者,另一說法,是像Doug Lea談到的——在CompletableFuture/CompletionStage,支援了以同步風格撰寫非同步程式,以及java.util.stream支援了群集的pull風格之後,Java還少了從主動源push資料的操作風格,支援這最小集合,有助於對主動資料源採用pull風格時,避免一些不愉快意外(unpleasant surprises)。
就目前而言,Java 9 Flow API還需等待實作品,而由於Flow API也僅出現在Java 9上,對於其他版本平臺,選擇支持Reactive Streams Specification的第三方程式庫,會是更好的選擇(例如RxJava 2.0),搭配retrolambda,以便使用lambda語法,將來真要遷移至Java 9,過程應會比較和緩。
當然,保守的Java納入了Reactive的概念,是一種象徵,代表著Reactive已是普及的開發選項之一。
而面對這類從社群進入標準平臺的典範,瞭解其發展過程,往往也是更為重要的一環。無論是等待支援Flow API的實作品,或者採取RxJava 2.0之類的程式庫,瞭解ReactiveX、Reactive Streams Specification與它們之間的關係,將有助於瞭解驅動演進的問題與需求所在,從而更能掌握何時、以及如何應用這樣的典範。
專欄作者
熱門新聞
2025-01-06
2025-01-07
2025-01-08
2025-01-06
2025-01-07
2025-01-07