そうですね。例えば2つのクエリをDBに投げて結果を合成する、みたいなことを書いてみると実感しやすいです。Statementをcloseする処理ももちろん書くのです。
— 達者でな (@Kengo_TODA) March 14, 2020
DBにクエリを2つ投げて合成する処理
例えばユーザーIDから所有するリソースを検索してその情報を返すAPI Stream<Resource> find(long userId)
を実装するとします。Resource
は大量に帰ってくる可能性があるので、List<Resource>
ではなくStream<Resource>
として扱います。実装はどうなるでしょうか:
// blocking API (wrong) try (UserDao userDao = daoFactory.createUserDao(); ResourceDao resourceDao = daoFactory.createResourceDao();) { Stream<Long> resourceIds = userDao.search(userId); // ページング処理を隠蔽 return resourceIds.map(resourceId -> { Resource resource = resourceDao.find(resourceId); return resource; }); // Stream<Result> }
これは正常に動作しません。メソッドを抜ける前にtry-with-finally
ブロックがDAOを閉じてしまうからです。
呼び出し元がStream<Resource>
を閉じてくれることを期待して、以下のようなコードになるでしょう:
// blocking API UserDao userDao = daoFactory.createUserDao(); ResourceDao resourceDao = daoFactory.createResourceDao(); Stream<Long> resourceIds = userDao.search(userId); // ページング処理を隠蔽 // Streamが閉じられたらDAOも閉じる resourceIds.onClose(userDao::close); resourceIds.onClose(resourceDao::close); return resourceIds.map(resourceId -> { Resource resource = resourceDao.find(resourceId); return resource; }); // Stream<Result>
このように処理を遅延するコードはFutureでは書けません。Java8で登場したCompletableFutureを使うことになります。例えばUser
にResource
がひとつだけ結びつく状態なら以下のように書けます:
// CompletableFuture UserDao userDao = daoFactory.createUserDao(); ResourceDao resourceDao = daoFactory.createResourceDao(); CompletableFuture<Long> future = userDao.search(userId).whenComplete( ... ); return future.thenApply(resourceId -> { Resource resource = resourceDao.find(resourceId); return resource; }).whenComplete( ... ); // CompletableFuture<Result>
が、Stream
のように複数の値を返すモノに使うのは複雑な工夫が必要ですし、やりようによってはすべての値を一度にオンメモリに乗せてしまいます。
私の知る限りではReactorフレームワークやRxJavaの採用がこの問題へのスマートな解になります。例えばFlux#using()を使うと以下のように書けます:
// Reactor Flux<Long> resourceIds = Flux.using(daoFactory::createUserDao, userDao -> { return userDao.search(userId); }, UserDao::close); return resourceIds.flatMap(resourceId -> { Mono<Resource> resource = Mono.using(daoFactory::createResourceDao, resourceDao -> { resourceDao.find(resourceId); }, ResourceDao::close); return resource; // Flux<Resource> });