みんからきりまで

きりみんです。

RxJavaについて調べた試した

RxJava Nightに向けてRxJavaについて調べ始めたものの、日本語の情報がほとんど無くて辛かったので調べた事をまとめました。
RxJavaやリアクティブプログラミングについては今までまったく知らなかったので、内容には間違いが含まれている可能性があります。予めご容赦頂いた上でご覧ください。


元ネタはほぼ
ReactiveX/RxJava/Wiki
Grokking RxJava
【翻訳】あなたが求めていたリアクティブプログラミング入門 - ninjinkun's diary
です。

RxJavaとは

RxJavaとはリアクティブプログラミングを行うためのライブラリであるRx(Reactive Extensions)のJVM版とのこと。
リアクティブプログラミングの概念についてはそれほどよく理解出来ていないので、上記の記事などを参照してください。


StreamAPI+非同期コールバックみたいな感じ。
イベントを定義したObservableインスタンスに対して短い関数をチェーンしていくストリーム的にイベント結果のデータを加工する処理を定義し、遅延実行・非同期コールバックを行う事が出来ます。

基本的な使い方

処理を実行して結果を伝えるObservableと、結果を受け取った時の処理を決めるObserverを使う。


まずは"Hello"と"world"という2つの文字列を結果として伝えるシンプルなObservable生成してみる。
java.util.Observableではないので注意

Observable<String> myObservable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("world!");
            subscriber.onCompleted();
        }
    }
);


次に結果を受け取る側であるObserverを生成する。

Observer<String> myObserver = new Observer<String>() {
    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
};


最後にObservableのsubscribeメソッドにObserverを渡して実行。
この時点で初めてObservableが実行され、onNextに渡した結果が順番にObserverに伝わる。

myObservable.subscribe(myObserver);

そして、
"Hello"
"world!"
が出力される。

もっと簡潔に書く

上記は丁寧な例だが、用意された様々なメソッドを活用する事でもっと簡潔に記述する事が出来る。
fromは受け取った配列やIteratorの要素を順番に結果として返すObservableを生成してくれる。
また、subscribeにはObserverの代わりに関数(Action1インターフェイスの実装)を一つだけ渡す事で、onNext時に関数を実行してくれる。

Observable<String> myObservable = Observable.from(new String[]{"Hello", "world!"});
myObservable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println(s);
    }
});


Java8ならAction1は関数型インターフェイスになるのでここまでの処理はラムダ式で一行になる。

Observable.from(new String[]{"Hello", "world!"}).subscribe(System.out::println);

Observableを新たなObservableに変換する

map

mapメソッドはObservableを別のObservableに変換する。
例えば先程のObservableから受け取りたいのが文字列そのものではなくて文字列の長さだったとする。
mapを使えば文字列を伝えるObservableを元に文字列の長さを伝える新たなObservableを簡単に生成する事が出来る。

Observable.from(new String[]{"Hello", "world!"})
        .map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println(i)
            }
        });

これは
"5"
"6"
を出力する。

flatMap

それでは配列の要素がネストしていたらどうだろう。
flatMapを使ってネストした要素を並列の結果として伝える事が出来る。
ここでflatMapのFunc1ではObservableを返しているが、flatMapはこのObservableを分解して並列な一つのObservableを生成してくれている。

String[][] helloAndGoodbye = {{"Hello", "world!"}, {"goodbye", "world!"}};
Observable.from(helloAndGoodbye)
        .flatMap(new Func1<String[], Observable<String>>() {
            @Override
            public Observable<String> call(String[] strings) {
                // ここに伝わるのは2つの配列。それをObservableにして返す。
                return Observable.from(strings);
            }
        })
        .map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                // ここに伝わるのは4つの文字列。
                return s.length();
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                 System.out.println(i)
            }
        });

"5"
"6"
"7"
"6"
が出力される。

marge

複数のObservableを合成するmargeを使用してflatMapを使わずに書くことも出来る。

String[][] helloAndGoodbye = {{"Hello", "world!"}, {"goodbye", "world!"}};
Observable.merge(Observable.from(helloAndGoodbye[0]), Observable.from(helloAndGoodbye[1]))
        .map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                 System.out.println(i)
            }
        });
filter

filterを使うと結果を取捨選択する事が出来る。
ここでは文字列が"world!"と一致するものを除外している。

String[][] helloAndGoodbye = {{"Hello", "world!"}, {"goodbye", "world!"}};
Observable.from(helloAndGoodbye)
        .flatMap(new Func1<String[], Observable<String>>() {
            @Override
            public Observable<String> call(String[] strings) {
                return Observable.from(strings);
            }
        })
        .filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return !s.equals("world!");
            }
        })
        .map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                 System.out.println(i)
            }
        });

"5"
"7"
が出力される。


ここまでをJava8のラムダ式で書くと下記のようになる。

String[][] helloAndGoodbye = {{"Hello", "world!"}, {"goodbye", "world!"}};
Observable.from(helloAndGoodbye)
        .flatMap(Observable::from)
        .filter(s -> !s.equals("world!"))
        .map(String::length)
        .subscribe(System.out::println);


無名クラスが連なっていると正直あんまり見やすいコードではないですが、ラムダ式にするとifやforを多用して処理を記述するのに比べかなり直感的なコードになりますね。
Wikiを参照して貰えば分かるかと思いますが、RxJavaには他にも相当数の便利な関数が用意されているようです。
例のような比較的単純な処理ではそこまで有り難みを感じないかもしれませんが、データの加工が複雑になればなるほど効果を発揮しそうな気がします。
また、RxJavaで処理を書くと自然と処理の単位が小さくなり入力・出力が明確になるため、メソッドとして切り出してテスト書くのがやりやすくなるというメリットもありそうです。


RXJavaにはAndroid用のモジュールも用意されているので、実際のAndroidアプリでのユーケースに合わせた例も書いてみたいと思います。