Kengo's blog

Technical articles about original projects, JVM, Static Analysis and TypeScript.

RxJavaはVert.x 3でSQL書くのに便利

自分にとっての新技術を試すための個人プロジェクトでVert.xを使っているのだが、PostgresクライアントがTechnical Previewになったのでオンメモリに乗せていたデータをpostgresに乗せるようにしたところ、2重3重のネストがコードに現れるようになった。

そこでRxJavaを利用することでコードの短縮化を図った。以前利用したReactive Streamsとは別物とのこと。

結果

Handlerベースの実装だとClassがだいたい100行だが、Observableベースの実装だと40行強で済んだ。ネスト数も抑えられている。

何が効果的だったのか

コードの見通しを良くしたのは例外処理。 Handler (コールバック)内で処理が失敗したかどうか毎回確認していたのが、Stream API同様のmap処理に置き換えられた。

サービスを書いているとよくある「getConnectionしてトランザクションを開始してSELECTしてINSERTしてCOMMITする」という処理が、Handlerで書くと5重ネストと5回の例外処理に化けてしまう。ネストをメソッドチェーンに置き換えたうえで例外処理を一本化(主にSubscriber側の処理になる)できるのは非常に見通しが良い。

// before
        postgreSQLClient.getConnection(connected -> {
            if (connected.failed()) {
                future.fail(connected.cause());
                handler.handle(future);
            } else {
                SQLConnection con = connected.result();
// after
        return postgreSQLClient.getConnectionObservable()
            .flatMap(con -> {

使用感:DB操作をObservableとしてを扱うコードを書く上での課題

postgresクライアントのInstantの扱いにバグがある模様。現時点ではStringとして取り出す必要がある。Technical Previewだし仕方ない。

非同期APIなのでリソースの解放にtry-with-resourcesやLoanパターンが使えない点に注意が必要。例えばDB接続を使い終わった時点で接続を閉じるには、以下のように Observable#doAfterTerminate(Action0)を使う必要がある。これなら処理が正常終了した時も異常終了した時も実行される。

        return postgreSQLClient.getConnectionObservable().flatMap(con -> {
            return con.queryObservable(
                    "SELECT id, uploaded_file_name, resolutions, generated FROM task")
                    .doAfterTerminate(con::close);
        }).flatMap(selected -> {

なおリソース解放用に用意されているObservable#using()を律儀に使おうとすると以下のようになってしまい、EclipseJavaコンパイラだとうまくコンパイルできないことがある。第一引数にFunction0(Resource)ではなくFunction0(Observable<Resource>)を使えるなら、() -> conなどという無意味なコードを書かなくて済んだのだが……。

実装を見るとusing()doAfterTerminate()とは全く違う複雑な処理をしているので、doAfterTerminate()で完全な代替になるとは思わないほうが良さそうだ。Observable#using()を自然に使えるような書き方を探す必要があるが、Vert.xのサンプルプロジェクトには現状usingを使ったサンプルは無い。

        return postgreSQLClient.getConnectionObservable().flatMap(con -> {
                return Observable.using(
                        () -> con,
                        connection -> connection.queryWithParamsObservable("SELECT uploaded_file_name, resolutions, generated FROM task WHERE id = ?", params),
                        connection -> connection.close());
        }).flatMap(selected -> {

もっと利用できそうな機能

今のところ更新系など「値を返す必要はないけど正常終了したかどうかだけ確認したい」場合にObservable<Void>を返しているが、Single<Void>を使うほうがシンプルかもしれない。複数の結果が帰ってくることがないとわかっている以上、それを型として明示したほうが何かと混乱がなさそう。

現在のプロジェクトではSchedulerを一切使っていないが、Vert.xと組み合わせて使う以上その必要性は薄そうなので、GUIアプリで試す必要があるかも。

まとめ:RxJavaは何に便利か?

自分が想像できるのは非同期I/Oを多く持つサーバで、多数のI/Oをマージしてクライアントにレスポンスを返すような処理を書くこと。もちろんブロッキングI/Oも混ぜることも可能(Vert.xの機能に寄せるかSchedulerを利用する)。Stream APIとほぼ同じ感覚で扱える。

またObserverSubscriber利用するスレッドを指定できるので、UI Threadを極力あけたい&イベントベースで書きたいAndroidのようなGUIアプリにおいても活躍が期待されているように見える。もうずっとSwingに触れていないがおそらくSwingでも使えるだろうし、JavaFxバインディングは公式に存在するようだ。