RSS2.0

Java で Akka Stream を遊んでみる

Akka Stream は並列/分散処理を実現するためのライブラリのひとつです。他のライブラリ同様に、処理全体をいくつかの処理単位に分割し、複数箇所に独立して存在するマシンで並列に処理することができますが、Akka Stream は大きな特徴として、その処理単位を扱うバッファスペースを固定長(bounded buffer space)として実現しているという点があります。

普通、分散処理では独立したマシン同士が処理データや処理結果を共有するためにメッセージ送信をしあうのですが、そこで課題となるのが受取ったメッセージの保管方法です。処理するデータを受け取ったとしても、自分の担当する処理内容が他よりも時間のかかる内容だった場合、受け取ったデータを捌ききれずに滞留してしまうことになります。このため、受け取ったデータの保管する領域を可変長にしてすべて保管できるようにしたり(この場合、保管領域が無尽蔵に必要となってしまう)、固定長の領域に補完しきれない分は捨ててしまったり(データの消失がありうる)といった手段が取られることがあります。

しかし Akka Stream は、いくつかの工夫によって、処理単位を扱う領域であるバッファスペースを固定長としたまま、持ちきれなかったデータを捨てることなくメッセージングを実現しています。

今回は最初ということで、Akka Stream の立ち入った話には触れずに公式ドキュメントの Java 向け Quickstart Guide の内容をさらってみたいと思います。
2 年前の年末くらいに一度触っていたのですが、それからバージョンが上がって使い勝手が良くなっているようなので、その辺の調査も兼ねた感じですね。

Akka Stream の Graph の構成要素

まずは用語を整理しておきましょう。公式ドキュメントの Basics and working with Flows ページを参考に話を進めます。さらに、このページには Akka Stream 全体に関わる用語の定義が記載されていますが、ここで触れるのは今回の内容に関係する部分にとどめます。

まず、Akka Stream で実現する処理全体を Stream と呼びます。そして Stream は任意の数の処理内容の基本単位である Element によって構成されます。
例えば、全 100 巻の長編小説から特定の登場人物が登場するページ数を集計するという要望があったとすると、その 100 冊の本を与えて集計結果を取り出すまでの処理全体が Stream にあたります。Element は好きなように設計できるので、1 冊毎にページ数を集計する Element を設計してもいいですし、さらに細分化して 1 冊内の集計対象ページの抽出する Element と抽出したページ数の合算する Element を切り離して設計してもよいです。基本的に処理を並列/分散する単位がこの Element になります。

Element はその動作の特徴から更に 3 つに細分化され分類されています。他の Element からデータを受け取るのではなく自らデータを発信するだけの Source、逆に受け取ったデータを他の Element には渡すことのない Sink、他の Element からデータを受取り他の Element に渡す Flow です。先ほどの小説の例では、100 冊の小説データを Stream 上に流し始めるのが Source、巻毎ページ毎に登場人物の名前を抽出したり集計したりするのが Flow、最後に各 Flow から数値をとりまとめて最終的な集計結果とするのが Sink になります。

どの Element も前後の別の Element と連携するためにデータのやり取りをする必要があるため、すべてが連なって Stream を構成していることがわかると思います。そして各 Element が担当している処理内容や、関わる他の Element、他の Element とどのようなやりとりをするのかを表した見取図のようなものが Graph と呼ばれています。Graph はそのまま絵のような意味に聞こえますが、Akka Stream ではクラスとして設計されていますので、実際にコードとして書いていくことになります。

Akka Stream の依存性設定

実際に Akka Stream でコーディングするにあたって、まずはプロジェクトにライブラリを追加します。
Maven リポジトリに上がっている Akka Stream のバージョンは、現時点は 2.5.9 が最新版になります。
maven プロジェクトの場合は pom.xml に以下のように依存性を追加します。
<!-- Akka Stream -->
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_2.12</artifactId>
  <version>2.5.9</version>
</dependency>

Akka Stream を動かしてみる

さっそく Akka Stream でコードを書いてみます。
以下は 1 〜 100 までの数値を生成して標準出力に出力する Stream です。
Source は 1 ~ 100 の Integer 値を生成して下流に渡し、Sink は受け取った Integer 値を System.out.println() でコンソールに出力します。
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class AkkaStreamSample01 {

    public static void main(String[] argv) {
        // Akka Stream の処理系を用意する
        final ActorSystem system = ActorSystem.create("AkkaStreamSample");
        final Materializer materializer = ActorMaterializer.create(system);

        // Akka Stream で処理する Graph を用意する

        // Source は 1 〜 100 の Integer を生み出す
        final Source<Integer, NotUsed> source = Source.range(1, 100);
        // Sink は標準出力へ受け取った値を出力する
        final Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(i -> System.out.println(Thread.currentThread().getId() + ": " + i));

        // Source を Sink につなげて Graph にする
        final RunnableGraph<NotUsed> graph = source.to(sink);

        // Akka Stream の処理系を使って、実際に Graph に定義された処理を実行する
        final NotUsed done = graph.run(materializer);

        // 実行後には Akka Stream の処理系を終了する
        system.terminate();
    }

}
ソースコードは公式ドキュメントの Quick Start ガイドの内容に手を加えたものです。イメージをつかみやすいよう、Source や Sink、Graph をそれぞれインスタンス化しています。

ActorSystem と Materializer クラスについては、現時点では Stream を動かすための処理エンジンに相当するもの、という程度の認識で良いと思います。以降の Graph の定義とは別に、ActorSystem や Materializer のインスタンス生成が必要で、それは Graph 本体とは分離されている、ということですね。

次に Source と Sink をそれぞれ作ります。Source はファクトリーメソッドである Source.range() を使うことで、指定の範囲の連番を生成する Source を簡単に作ることができます。Sink も Sink.foreach() メソッドに入力値を処理するラムダ式を渡すことで、そのラムダ式を実行する Sink を作っています。

Source クラスの to() メソッドで Sink につなぐと Graph が完成します。正確にはこれは RunnableGraph クラスのインスタンスになります。
この Graph の run() メソッドに引数として Materializer を渡して実行することで、実際に Graph を織りなす Element の処理内容が動くことになります。

実行後には ActorySystem クラスの terminate() メソッドで、ActorySystem 自体を終了します。

これと同じ処理内容を以下のようにも書くことができます。
    public static void main(String[] argv) {
        // Akka Stream の処理系を用意する
        final ActorSystem system = ActorSystem.create("AkkaStreamSample");
        final Materializer materializer = ActorMaterializer.create(system);

        // Akka Stream で処理する Graph を用意する

        // Source は 1 〜 100 の Integer を生み出す
        final Source<Integer, NotUsed> source = Source.range(1, 100);

        // Akka Stream の処理系を使って、実際に Graph に定義された処理を実行する
        // Sink は標準出力へ受け取った値を出力する
        // Source を Sink につないだことで得られる RunnableGraph に対して run() を実行し、実処理を行う
        final CompletionStage<Done> done = source.runWith(Sink.foreach(i -> System.out.println(Thread.currentThread().getId() + ": " + i)), materializer);

        // 実行後には Akka Stream の処理系を終了する
        done.thenRun(() -> system.terminate());
    }
Source クラスの runWith() メソッドは、Source と Sink をつなげつつ同時に Materializer での処理実行までを行うことができます。また返り値の CompletionStage には thenRun() メソッドが用意されていて、Stream 処理実行後に行う ActorSystem の終了もラムダ式として渡すことができます。

裏では Sink や Graph を同じように生成していますが、コード量はこちらのほうが少なくて済みます。
公式ドキュメントの Quick Start ガイドに記載されている内容はこんなコードなのですが、Stream を構成する Element について意識したほうが良いと思い、最初は詳細なコードを書いてみました。

Graph をまとめて別の Graph にする

Akka Stream の Graph は、任意の数の別の Graph を内包することができます。言い換えるなら、ある Graph に Element を追加して別の Graph を作ることができます。これは Graph 設計上での利便性や、設計した Graph の再利用性を高める上でとても有利な特徴となります。

以下のソースコードでは、既存の Sink を拡張して別の Sink を作成したり、Source にいくつかの Flow を追加して全体をまとめて 1 つの Sink としたりしています。Graph の処理としては、2 の累乗を 100 個作り出し、それらのうち 1024 に等しくない値に絞り込み、残りの値をすべてファイルに出力しています。結果が出力されるファイルは result.txt です。
import java.math.BigInteger;
import java.nio.file.Paths;
import java.util.concurrent.CompletionStage;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

public class AkkaStreamSample02 {

    public static void main(String[] argv) {
        // Akka Stream の処理系を用意する
        final ActorSystem system = ActorSystem.create("AkkaStreamSample");
        final Materializer materializer = ActorMaterializer.create(system);

        // すでに作成済みの Source に処理を追加して新たな Source を作成する

        // 1 〜 100 の Integer を生み出す Source を作成する
        final Source<Integer, NotUsed> otherSource = Source.range(1, 100);
        // 1 ~ 100 の Integer を生み出す Source を使って、100 個の 2 の累乗を生み出す Source を作成する
        final Source<BigInteger, NotUsed> source = otherSource.map(n -> BigInteger.valueOf(2).pow(n));


        // 複数の Flow をつなぎ合わせた Sink を新たな Sink として作成する

        final Sink<BigInteger, CompletionStage<IOResult>> sink = Flow
                // BigInteger 型のデータを処理する Flow として Sink を作成する
                .of(BigInteger.class)
                // 受け取ったデータを 1 行ずつ出力するために、1 行分の文字列に変換する Flow を通す
                .map(s -> s.toString() + "\n")
                // 受け取ったデータをファイル出力を行う Sink につなげるために ByteString 型に変換する Flow を通す
                .map(n -> ByteString.fromString(n.toString()))
                // ファイル出力を行う Sink につなぐ
                .toMat(FileIO.toPath(Paths.get("result.txt")), Keep.right());


        // 作成済みの Sink と Flow から Graph を作成する

        final CompletionStage<IOResult> result = source
                // 受け取った数値のうち、1024 でない数に絞り込む Flow を通す
                .filter(i -> !i.equals(BigInteger.valueOf(1024)))
                // ファイル出力する Sink につないで Graph を実行する
                .runWith(sink, materializer);

        // Graph に対して run() を実行し、実処理を開始する
        result.thenRun(() -> system.terminate());
    }

}
このソースコードには 2 つの Source が登場します。1 つめの otherSource は、先程の例と同じく 1 〜 100 までの Integer 値を生み出す Source です。2 つめの source は 1 つめの otherSource につながれていて、otherSource から渡された値を指数とする 2 の累乗を生成します。厳密に数値を生成しているのは source ですが、otherSource は source を内包しているため、これ自体も Source であることになります。そしてソースコード上でも、他の Source を内包する Source を Source 型インスタンスとして作ることができます。

次に変数 sink について見てみます。これは型が Sink 型であることからもわかるとおり、ソースコード上は Sink として宣言されています。しかし sink に代入されている値は単純な Sink 型インスタンスではなく、Flow クラスの map() メソッドが呼ばれることでいくつかの Flow がつなげられていることがわかります。
全体としてファイル出力を担当するこの Graph の中で、最後に行き着くのは FileIO.toPath() メソッドによって返される Sink インスタンスです。これは渡された値を引数のパスのファイルに出力します。そしてこの Sink の手前に、受け取った値のデータ型を調整したり、テキストファイルに出力するための整形をしたりする Flow がつながれています。

以降のコードでは、これらの構造化された Graph をつなぎあわせて最終的な Graph を作り、実行しています。2 の累乗を 100 個作り出す Source に、1024 でない数字のみに絞り込む Flow をつなげ、ファイル出力する Sink につなげています。

ここまで見てきた中で重要となるのは、Graph を構成する Source も Flow も Sink も別の更に小さな Graph として独立して実装できることです。各 Element を他の Element に影響されない形で実装できるということは、各 Element をその目的に即した形できれいに設計でき、そして設計した Element を再利用可能な部品として使っていけるということになります。
  Akka Stream  コメント (0)  2018/02/16 17:18:54


公開範囲:
プロフィール HN: ももかん
ゲーム作ったり雑談書いたり・・・していた時期が私にもありました。
カレンダー
<<2018, 11>>
28293031123
45678910
11121314151617
18192021222324
2526272829301