RxJavaのFiltering Operatorの使い方をまとめてみた
ReactiveExtenstionの根本的な概念としてObservableとOperatorがあります。前回はRxJavaをより深く理解するためにObservableの生成メソッドをまとめてみました。
今回は前回に続いて、Observableから流れてきた値をフィルタするFiltering Operatorの使い方をサンプルを作りながら確認してみます。
Filtering Operator一覧
ReactiveXの公式ページでリストアップされているFiltering Operatorは以下のとおりです。
- Debounce
- Distinct
- ElementAt
- Filter
- First
- IgnoreElements
- Last
- Sample
- Skip
- SkipLast
- Take
- TakeLast
結構多いですが以下で1つずつ試してみます。
Debounce
出力元Observableで値を出力後、指定した時間経過後まで新たな出力が行われなかった場合に値を出力する。文章ではわかりづらいが、簡単に言うとタイマーのようなものと思えば良い。
出力元Observableから値が出力されたら指定した時間のタイマーが動き出し、タイマー作動中に他の値が出力されたら前の値のタイマーはキャンセルされ、新しい値に対するタイマーが起動する。タイマーが発火したらその値が出力される。
以下のサンプルはボタンクリック後、2秒間ボタンクリックを行わなければ値が出力される。2秒以内にボタンを再度クリックすると、クリック時点からまた2秒間のカウントが始まる。
Observable<Void> clickObservable = RxView.clicks(mButton); RxView.clicks(mButton).debounce(2, TimeUnit.SECONDS).subscribe(v -> { Log.d("AAA", "debounce detected"); });
Distinct
Observableから重複した値を取り除く。引数に重複判定用のメソッドを渡すことで任意の重複判定を行うこともできる。
Observable.just(1, 1, 2, 3, 2, 1, 3, 3, 4).distinct().subscribe(i -> { Log.d("AAA", "distinct : " + i); }); // distinct : 1 // distinct : 2 // distinct : 3 // distinct : 4
DistinctUntilChanged
Distinctのバリエーションの1つで、連続で同じ値が出力された場合は1つに圧縮されるが、別の値が出力された後に再び元の値が出力された場合、その値を出力するというオペレータ。
文章だけでは分かりにくいが、例えば[A, A, B, A, B, B, B, C]という値を出力するObservableがあった場合、DistinctUntilChangedでフィルタリングすると[A, B, A, B, C]という出力となる。
Observable.just(1, 1, 2, 3, 2, 1, 3, 3, 4).distinctUntilChanged().subscribe(i -> { Log.d("AAA", "distinctUntilChanged : " + i); }); // distinctUntilChanged : 1 // distinctUntilChanged : 2 // distinctUntilChanged : 3 // distinctUntilChanged : 2 // distinctUntilChanged : 1 // distinctUntilChanged : 3 // distinctUntilChanged : 4
ElementAt
指定したインデックス個目の値のみを出力する。
Observable.range(1, 10).elementAt(4).subscribe(i -> { Log.d("AAA", "elementAt(4) : " + i); }); // elementAt(4) : 5
ElementAtOrDefault
指定したインデックス個目の値のみを出力する。指定したインデックスが存在しない場合にデフォルト値を指定することができる。
Observable.range(1, 10).elementAtOrDefault(20, 100).subscribe(i -> { Log.d("AAA", "elementAtOrDefault(20, 100) : " + i); }); // elementAtOrDefault(20, 100) : 100
Filter
booleanを返すフィルタメソッドを渡すことで、メソッドの戻り値がtrueになる値のみを出力する。
Observable.range(1, 50).filter(num -> { return num % 10 == 0; }).subscribe(filtered -> { Log.d("AAA", "filter : " + filtered); }); // filter : 10 // filter : 20 // filter : 30 // filter : 40 // filter : 50
ofType
Filterの一種で、メソッドではなくクラスを渡すことで、そのクラスと一致する値のみを出力する。静的型付け言語の場合基本的に型は指定されているためあまり活用の機会はないかもしれないが、動的型付け言語の場合はもしかしたら便利なのかも?
以下のサンプルではintからIntegerへの暗黙的型変換が行われるため、range(1, 5)
で出力される値がIntegerと判定されて出力されている。しかしjust(1.0, 2.0, 3.0)
とした場合はDoubleへの暗黙的型変換が行われるため何も出力されない。
Observable.range(1, 5).ofType(Integer.class).subscribe(filtered -> { Log.d("AAA", "ofType(Integer) : " + filtered); }); // ofType(Integer) : 1 // ofType(Integer) : 2 // ofType(Integer) : 3 // ofType(Integer) : 4 // ofType(Integer) : 5
First
最初にObservableから流れてきた値のみを出力する。
Observable.range(1, 100).first().subscribe(i -> { Log.d("AAA", "first : " + i); }); // first : 1
First
では引数にbooleanを返すフィルタメソッドを渡すことで、最初にフィルタの条件に該当した値のみを出力することもできる。例えばFizzBuzzを判定するメソッドを渡すと以下のようになる。
Observable.range(1, 100).first(num -> { return fizz(num) && buzz(num); }).subscribe(firstMatched -> { Log.d("AAA", "first(FizzBuzz) : " + firstMatched); }); // first(FizzBuzz) : 15 private boolean fizz(int num) { return num % 3 == 0; } private boolean buzz(int num) { return num % 5 == 0; }
IgnoreElements
値は何も出力せずにObservableの終了イベントのみに反応する。つまりonError
かonComplete
のみが実行される。
Observable.range(1, 10).ignoreElements().subscribe( i -> { Log.d("AAA", "ignoreElements : onNext : " + i); }, e -> { Log.d("AAA", "ignoreElements : onError "); }, () -> { Log.d("AAA", "ignoreElements : onComplete"); } ); // ignoreElements : onComplete
Last
Observableから最後に出力された値のみを出力する。
Observable.range(1, 100).last().subscribe(i -> { Log.d("AAA", "last : " + i); }); // last : 100
First
と同じくこちらもbooleanを返すメソッドを渡すことで、その条件に合致する最後の値のみを出力することができる。First
と同様にFizzBuzzを判定するメソッドを渡すと以下のようになる。
Observable.range(1, 100).last(num -> { return fizz(num) && buzz(num); }).subscribe(firstMatched -> { Log.d("AAA", "last(FizzBuzz) : " + firstMatched); }); // last(FizzBuzz) : 90
Sample
出力元Observableから出力される値を指定した時間間隔ごとに出力する。いわゆるサンプリング。指定した間隔(ここでは3秒)ごとに、まだフィルタリングされていない最新の値があればそれを出力する。出力元Observableが終了したらサンプリング間隔に関わらず最後の値が出力される。
subscribe
直後にサンプリングが行われるのかと思いきや、ここではなぜか1秒経過後*1からサンプリングが開始されているように見える。最初の出力は0になるんじゃないのか?この辺りの挙動は謎。教えてエロい人。
Observable.interval(1, TimeUnit.SECONDS).take(10).sample(3, TimeUnit.SECONDS).subscribe(i -> { Log.d("AAA", "sample(3) : " + i); }); // sample(3) : 1 // sample(3) : 4 // sample(3) : 7 // sample(3) : 9
throttleLast
Sample
の別名。単なるエイリアスとのことなので割愛*2。
throttleFirst
Sample
は指定した間隔経過時に出力元Observableから値をフィルタリングするが、こちらは指定した間隔の最初に値をフィルタリングする。これについては公式ページのマーブルダイアグラムを見ればよくわかる。端的に言えば、Sample
はサンプリング間隔の最後でサンプリングを実行し、throttleFirst
はサンプリング間隔の最初でサンプリングを実行する。
Observable.interval(1, TimeUnit.SECONDS).take(10).throttleFirst(3, TimeUnit.SECONDS).subscribe(i -> { Log.d("AAA", "throttleFirst(3) : " + i); }); // throttleFirst(3) : 0 // throttleFirst(3) : 3 // throttleFirst(3) : 6 // throttleFirst(3) : 9
Skip
Observableから出力される値のうち、引数で指定した最初のn個を無視する。
Observable.range(1, 10).skip(5).subscribe(i -> { Log.d("AAA", "skip(5) : " + i); }); // skip(5) : 6 // skip(5) : 7 // skip(5) : 8 // skip(5) : 9 // skip(5) : 10
Skip
では時間を指定することもできる。時間(int)と単位(TimeUnit.SECONDS等)を指定すると、指定した時間が経過するまでの出力は無視される。以下は0~9を1秒毎に出力するObservableを8秒間Skipする例。
Observable.interval(1, TimeUnit.SECONDS).take(10).skip(8, TimeUnit.SECONDS).subscribe(i -> { Log.d("AAA", "skip(8, TimeUnit.SECONDS) : " + i); }); // skip(8, TimeUnit.SECONDS) : 7 // skip(8, TimeUnit.SECONDS) : 8 // skip(8, TimeUnit.SECONDS) : 9
SkipLast
Observableから出力される値のうち、引数で指定した最後のn個を無視する。
Observable.range(1, 10).skipLast(5).subscribe(i -> { Log.d("AAA", "skipLast(5) : " + i); }); // skipLast(5) : 1 // skipLast(5) : 2 // skipLast(5) : 3 // skipLast(5) : 4 // skipLast(5) : 5
SkipLast
では時間を指定することもできる。時間(int)と単位(TimeUnit.SECONDS等)を指定すると、出力元Observableが終了後、終了時点から引数で指定した時間以前の出力は無視される。以下は0~9を1秒毎に出力するObservableの最後8秒間をSkipする例。
Observable.interval(1, TimeUnit.SECONDS).take(10).skipLast(8, TimeUnit.SECONDS).subscribe(i -> { // Observable終了後に実行される Log.d("AAA", "skipLast(8, TimeUnit.SECONDS) : " + i); }); // skipLast(8, TimeUnit.SECONDS) : 0 // skipLast(8, TimeUnit.SECONDS) : 1
Take
出力元のObservableから指定した個数の値のみ出力する。
Observable.range(1, 10).take(3).subscribe(i -> { Log.d("AAA", "take(3) : " + i); }); // take(3) : 1 // take(3) : 2 // take(3) : 3
Skip
同様、時間を指定することもできる。時間(int)と単位(TimeUnit.SECONDS等)を指定すると、指定した時間の間に出力元Observableから出力された値のみをフィルタリングできる。
Observable.interval(1, TimeUnit.SECONDS).take(3, TimeUnit.SECONDS).subscribe(i -> { Log.d("AAA", "take(3, TimeUnit.SECONDS) : " + i); }); // take(3, TimeUnit.SECONDS) : 0 // take(3, TimeUnit.SECONDS) : 1 // take(3, TimeUnit.SECONDS) : 2
TakeLast
出力元Observableから出力された最後のn個の値を出力する。
Observable.range(1, 10).takeLast(3).subscribe(i -> { Log.d("AAA", "takeLast(3) : " + i); }); // takeLast(3) : 8 // takeLast(3) : 9 // takeLast(3) : 10
他のオペレータ同様、このメソッドも時間を指定することができる。時間を指定すると、出力元Observable終了後、終了直前の指定した時間分の値が出力される。
Observable.interval(1, TimeUnit.SECONDS).take(10).takeLast(3, TimeUnit.SECONDS).subscribe(i -> { Log.d("AAA", "takeLast(3, TimeUnit.SECONDS) : " + i); }); // takeLast(3, TimeUnit.SECONDS) : 6 // takeLast(3, TimeUnit.SECONDS) : 7 // takeLast(3, TimeUnit.SECONDS) : 8 // takeLast(3, TimeUnit.SECONDS) : 9
なおこの場合、終了直前3秒間に出力されるのは7, 8, 9なので7, 8, 9が出力されるかと思いきや6も出力されている。これは、9が出力された瞬間を起点に3秒前までの値を出力しているから(9(0秒) -> 8(1秒後) -> 7(2秒後) -> 6(3秒後))と思われる。
TakeLastBuffer
出力元Observable終了後、指定した個数の出力をListにバッファリングして出力する。
Observable.range(1, 10).takeLastBuffer(3).subscribe(list -> { Log.d("AAA", "takeLastBuffer(3) : " + list); }); // takeLastBuffer(3) : [8, 9, 10]
*1:intervalの最初の出力は0なので