Javadocをよく読む(CompletableFuture, CompletionStage編)

はじめに

Javadocしっかり読んでみるシリーズ第3弾です。
(第1弾(Executor編),第2弾(ThreadPoolExecutor編))

今回は、Java8で並行処理関連で追加された以下の2クラスのJavadocを読んでみます。

  • CompletionStage
  • CompletableFuture

CompletionStageとは

ちょっと長いJavadocなので最初だけ抜粋します。

CompletionStageが完了したときにアクションの実行または値の計算を行う、非同期の可能性がある計算のステージです。ステージはその計算が終了したときに完了しますが、これに続いて他の依存ステージがトリガーされる場合があります。

小難しい雰囲気の説明ですが、このインタフェースの特徴をざっくりまとめると以下3点がポイントです。

  • 非同期にできる
  • 計算のどこかの段階を表す
  • 完了後に、他の段階(ステージ)をトリガできる

CompletableFutureとは

CompletionStageとFutureの実装クラスです。 Javadocの説明は以下の通りです。

明示的に(その値とステータスを設定して)完了できるFutureです。その完了時に発生する依存関数およびアクションをサポートし、CompletionStageとして使用できます。

ということで、Javadocのままですが、以下の特徴を持つクラスです。

  • Futureの1つで
  • CompletionStageでもある

なので、とりあえずCompletionStageをおさえておけばこちらもOKです。

では、さっそくサンプルコード….の前に、これらのインターフェース(クラス)の用途について考えてみます。

CompletionStageを使えそうな場面

CompletionStageの特徴から考えてみると、

  • 時間のかかるタスク(非同期で実行させたい)で
  • いくつかのタスクに分離させたくて
  • 前のタスク完了をトリガに別のタスクを実行したい

といった場面で使えそうです。

サンプルプログラム仕様

先ほど考えた場面を踏まえて、サンプルは以下動作仕様のプログラムとしてみます。

動作仕様

  • Webから指定銘柄の株価情報テーブル(一定期間分)をダウンロード
  • ダウンロードが完了したら、株価情報テーブルから終値の列のみ抽出
  • 抽出した終値リストの平均値をプリント

ソースコード

このプログラムのポイントは以下3点です。

  • supplyAsyncで非同期に株価情報テーブルのダウンロードを実行
  • ↑の完了をトリガにthenApplyで株価情報テーブルから終値のリストを抽出
  • ↑の完了をトリガにthenAcceptで終値のリストから平均値を計算しプリント
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.net.URL;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

/**
 * yahooファイナンスの株価情報を取得し、終値の平均値を出力するプログラム
 */
public class Main {
    /**
     * Google/Amazon/Appleの数年分の株価情報を取得し終値の平均値を出力する
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        System.out.println(Thread.currentThread().getName() + ": main() started.");
        //Googleの株価
        CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> downloadStockInfo("GOOG"))
                                                 .thenApply(Main::extractClosePrices)
                                                 .thenAccept(Main::printAveragePrice);
        //Amazonの株価
        CompletableFuture cf2 = CompletableFuture.supplyAsync(() -> downloadStockInfo("AMZN"))
                                                 .thenApply(Main::extractClosePrices)
                                                 .thenAccept(Main::printAveragePrice);
        //Appleの株価
        CompletableFuture cf3 = CompletableFuture.supplyAsync(() -> downloadStockInfo("AAPL"))
                                                 .thenApply(Main::extractClosePrices)
                                                 .thenAccept(Main::printAveragePrice);
        System.out.println(Thread.currentThread().getName() + ": main() running.");
        cf1.get();
        cf2.get();
        cf3.get();
        System.out.println(Thread.currentThread().getName() + ": main() end.");
    }
    }

    /**
     * YahooファイナンスのURLから指定した銘柄の数年分の株価情報を取得する
     * @param ticker 銘柄のID
     * @return 株価情報テーブル
     */
    private static Stream downloadStockInfo(final String ticker) {
        try {
            URL url = new URL("https://ichart.finance.yahoo.com/table.csv?s=" + ticker);
            BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()));
            System.out.println(Thread.currentThread().getName() + ": downloadStockInfo() executed. ticker " + ticker);
            return reader.lines();
        } catch(Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * 株価情報テーブルから終値を抽出する
     * @param stockInfo 株価情報テーブル
     * @return 株価情報テーブルから抽出した終値
     */
    private static Stream extractClosePrices(Stream stockInfo) {
        Stream closePricesStr = stockInfo.skip(1).map(x -> x.split(",")[4]);
        Stream closePrices = closePricesStr.map(BigDecimal::new);
        System.out.println(Thread.currentThread().getName() + ": getClosePrice() executed.");
        return closePrices;
    }

    /**
     * 終値の平均値を計算し出力する
     * @param closePrices 終値の数年分のリスト
     */
    private static void printAveragePrice(Stream closePrices){
        Double average = closePrices.mapToDouble(BigDecimal::doubleValue).average().getAsDouble();
        System.out.println(Thread.currentThread().getName() + ": printAveragePrice() executed. avg " + average + ".");
    }
}

実行結果

main: main() started.
main: main() running.
ForkJoinPool.commonPool-worker-1: downloadStockInfo() executed. ticker GOOG
ForkJoinPool.commonPool-worker-1: getClosePrice() executed.
ForkJoinPool.commonPool-worker-3: downloadStockInfo() executed. ticker AAPL
ForkJoinPool.commonPool-worker-3: getClosePrice() executed.
ForkJoinPool.commonPool-worker-2: downloadStockInfo() executed. ticker AMZN
ForkJoinPool.commonPool-worker-2: getClosePrice() executed.
ForkJoinPool.commonPool-worker-2: printAveragePrice() executed. avg 151.64508035651642.
ForkJoinPool.commonPool-worker-1: printAveragePrice() executed. avg 551.9024963535911.
ForkJoinPool.commonPool-worker-3: printAveragePrice() executed. avg 99.3297423291363.
main: main() end.

この実行結果のポイントは、以下の3点です。

  • 各銘柄の株価情報ダウンロードが異なるスレッドで、並列に非同期で実行されている
  • downloadStackInfo()の完了をトリガにgetClosePrice()が実行されている
  • getClosePrice()の完了をトリガにprintAveragePrice()が実行されている

まとめ

CompletableFutureとCompletableStageを使うとこんな感じで非同期の処理をメソッドチェーンで簡潔にスッキリ書くことができます。
 

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

CAPTCHA