自分にとっての新技術を試すための個人プロジェクトでVert.xを使っているのだが、PostgresクライアントがTechnical Previewになったのでオンメモリに乗せていたデータをpostgresに乗せるようにしたところ、2重3重のネストがコードに現れるようになった。
本当にこのネスト地獄はすごい。JS以上だ。 https://t.co/z8XEuz0svq
— Kengo TODA (@Kengo_TODA) July 25, 2016
そこでRxJavaを利用することでコードの短縮化を図った。以前利用したReactive Streamsとは別物とのこと。
結果
Handlerベースの実装だとClassがだいたい100行だが、Observableベースの実装だと40行強で済んだ。ネスト数も抑えられている。
何が効果的だったのか
コードの見通しを良くしたのは例外処理。 Handler
(コールバック)内で処理が失敗したかどうか毎回確認していたのが、Stream API同様のmap処理に置き換えられた。
サービスを書いているとよくある「getConnectionしてトランザクションを開始してSELECTしてINSERTしてCOMMITする」という処理が、Handler
で書くと5重ネストと5回の例外処理に化けてしまう。ネストをメソッドチェーンに置き換えたうえで例外処理を一本化(主にSubscriber
側の処理になる)できるのは非常に見通しが良い。
// before postgreSQLClient.getConnection(connected -> { if (connected.failed()) { future.fail(connected.cause()); handler.handle(future); } else { SQLConnection con = connected.result();
// after return postgreSQLClient.getConnectionObservable() .flatMap(con -> {
使用感:DB操作をObservableとしてを扱うコードを書く上での課題
postgresクライアントのInstant
の扱いにバグがある模様。現時点ではString
として取り出す必要がある。Technical Previewだし仕方ない。
非同期APIなのでリソースの解放にtry-with-resourcesやLoanパターンが使えない点に注意が必要。例えばDB接続を使い終わった時点で接続を閉じるには、以下のように Observable#doAfterTerminate(Action0)
を使う必要がある。これなら処理が正常終了した時も異常終了した時も実行される。
return postgreSQLClient.getConnectionObservable().flatMap(con -> { return con.queryObservable( "SELECT id, uploaded_file_name, resolutions, generated FROM task") .doAfterTerminate(con::close); }).flatMap(selected -> {
なおリソース解放用に用意されているObservable#using()
を律儀に使おうとすると以下のようになってしまい、EclipseのJavaコンパイラだとうまくコンパイルできないことがある。第一引数にFunction0(Resource)
ではなくFunction0(Observable<Resource>)
を使えるなら、() -> con
などという無意味なコードを書かなくて済んだのだが……。
実装を見るとusing()
はdoAfterTerminate()
とは全く違う複雑な処理をしているので、doAfterTerminate()
で完全な代替になるとは思わないほうが良さそうだ。Observable#using()
を自然に使えるような書き方を探す必要があるが、Vert.xのサンプルプロジェクトには現状usingを使ったサンプルは無い。
return postgreSQLClient.getConnectionObservable().flatMap(con -> { return Observable.using( () -> con, connection -> connection.queryWithParamsObservable("SELECT uploaded_file_name, resolutions, generated FROM task WHERE id = ?", params), connection -> connection.close()); }).flatMap(selected -> {
もっと利用できそうな機能
今のところ更新系など「値を返す必要はないけど正常終了したかどうかだけ確認したい」場合にObservable<Void>
を返しているが、Single<Void>
を使うほうがシンプルかもしれない。複数の結果が帰ってくることがないとわかっている以上、それを型として明示したほうが何かと混乱がなさそう。
現在のプロジェクトではScheduler
を一切使っていないが、Vert.xと組み合わせて使う以上その必要性は薄そうなので、GUIアプリで試す必要があるかも。
まとめ:RxJavaは何に便利か?
自分が想像できるのは非同期I/Oを多く持つサーバで、多数のI/Oをマージしてクライアントにレスポンスを返すような処理を書くこと。もちろんブロッキングI/Oも混ぜることも可能(Vert.xの機能に寄せるかScheduler
を利用する)。Stream APIとほぼ同じ感覚で扱える。
またObserver
とSubscriber
で利用するスレッドを指定できるので、UI Threadを極力あけたい&イベントベースで書きたいAndroidのようなGUIアプリにおいても活躍が期待されているように見える。もうずっとSwingに触れていないがおそらくSwingでも使えるだろうし、JavaFxバインディングは公式に存在するようだ。