Kengo's blog

Technical articles about original projects, JVM, Static Analysis and TypeScript.

CompletableFutureの何が嬉しいか、そしてReactorへ……

DBにクエリを2つ投げて合成する処理

例えばユーザーIDから所有するリソースを検索してその情報を返すAPI Stream<Resource> find(long userId) を実装するとします。Resourceは大量に帰ってくる可能性があるので、List<Resource>ではなくStream<Resource>として扱います。実装はどうなるでしょうか:

// blocking API (wrong)
try (UserDao userDao = daoFactory.createUserDao();
    ResourceDao resourceDao = daoFactory.createResourceDao();) {
  Stream<Long> resourceIds = userDao.search(userId); // ページング処理を隠蔽
  return resourceIds.map(resourceId -> {
    Resource resource = resourceDao.find(resourceId);
    return resource;
  }); // Stream<Result>
}

これは正常に動作しません。メソッドを抜ける前にtry-with-finallyブロックがDAOを閉じてしまうからです。 呼び出し元がStream<Resource>を閉じてくれることを期待して、以下のようなコードになるでしょう:

// blocking API
UserDao userDao = daoFactory.createUserDao();
ResourceDao resourceDao = daoFactory.createResourceDao();
Stream<Long> resourceIds = userDao.search(userId); // ページング処理を隠蔽

// Streamが閉じられたらDAOも閉じる
resourceIds.onClose(userDao::close);
resourceIds.onClose(resourceDao::close);

return resourceIds.map(resourceId -> {
  Resource resource = resourceDao.find(resourceId);
  return resource;
}); // Stream<Result>

このように処理を遅延するコードはFutureでは書けません。Java8で登場したCompletableFutureを使うことになります。例えばUserResourceがひとつだけ結びつく状態なら以下のように書けます:

// CompletableFuture
UserDao userDao = daoFactory.createUserDao();
ResourceDao resourceDao = daoFactory.createResourceDao();
CompletableFuture<Long> future = userDao.search(userId).whenComplete( ... );

return future.thenApply(resourceId -> {
  Resource resource = resourceDao.find(resourceId);
  return resource;
}).whenComplete( ... ); // CompletableFuture<Result>

が、Streamのように複数の値を返すモノに使うのは複雑な工夫が必要ですし、やりようによってはすべての値を一度にオンメモリに乗せてしまいます。 私の知る限りではReactorフレームワークやRxJavaの採用がこの問題へのスマートな解になります。例えばFlux#using()を使うと以下のように書けます:

// Reactor
Flux<Long> resourceIds = Flux.using(daoFactory::createUserDao, userDao -> {
  return userDao.search(userId);
}, UserDao::close);

return resourceIds.flatMap(resourceId -> {
  Mono<Resource> resource = Mono.using(daoFactory::createResourceDao, resourceDao -> {
    resourceDao.find(resourceId);
  }, ResourceDao::close);
  return resource; // Flux<Resource>
});

関連記事

blog.kengo-toda.jp