Reactive Extensions 入門(3)

Reactive Extensions (以下 Rx) の Observable クラスには、IObservable 用の拡張メソッドが大量に用意されています。インテリセンスに表示されるメソッドが多いのなんの。その中には Where や Select といった、LINQ でお馴染みのメソッドも数多く見られます。

拡張メソッドを使って何が嬉しいかというと、Subscribe で通知する内容をいじれること。フィルタリングしたり、変換したり、合成したりと自由自在。私自身、触り始めたばかりなので、嬉しい場面にはまだ出会っていなんですけどね。本格的に開発で使うようになれば出会うでしょう。きっと。

とにかく拡張メソッドの数が膨大なので、よく使いそうなものだけを、独断と偏見で紹介します。LINQ で使っていたメソッドばかりですけど、残りのメソッドは追々。

Where メソッド

IObservable が通知する内容をフィルタリングします。LINQ で使っている Where と使い方は同じです。

Observable.Range(1, 5)
          .Where(n => n % 2 != 0)
          .Subscribe(n => Console.WriteLine("Where:{0}", n));

// [実行結果]
// Where:1
// Where:3
// Whare:5

Select メソッド

IObservable が通知する内容を変換します。下は数値を文字列に変換する単純なサンプル。

Observable.Range(1, 5)
          .Select(n => "test" + n)
          .Subscribe(s => Console.WriteLine("Select:{0}", s));

// [実行結果]
// Select:test1
// Select:test2
// Select:test3
// Select:test4
// Select:test5

SelectMany メソッド

SelectMany は渡された関数が返す IObservable を1つにまとめます。

Select の戻り値で IObservable を返すと、メソッドチェインの次に渡されるデータが、IObservable> にされてしまいます。そこで SelectMany の出番。

SelectMany に渡すハンドラ内では、 IObservable を返す処理(サービス呼び出しとか) を書いたりしています。下は単純に IObservable を生成して返しているだけですけど。

Observable.Range(1, 3)
          .SelectMany(n => Observable.Range(1, n))
          .Subscribe(n => Console.WriteLine("SelectMany:{0}", n));

// [実行結果]
// SelectMany:1
// SelectMany:1
// SelectMany:2
// SelectMany:1
// SelectMany:2
// SelectMany:3

Aggregate メソッド

IObservable が通知する内容を集約します。合計を計算するときにも使えますし、1つの文字列として結合するときに使ったりもします。
下は5の階乗を出力するサンプル。

Observable.Range(1, 5)
          .Aggregate((x, y) => x * y)
          .Subscribe(n => Console.WriteLine("Aggregate:{0}", n));

// [実行結果]
// Aggregate:120

Zip メソッド

2 つの IObservable が通知する内容を合成します。通知の数が揃っていない場合、少ない方に揃えられてしまうので、要注意。

Observable.Range(1, 5)
          .Zip(Observable.Range(1, 3),
               (n, right) => string.Format("left={0}, right={1}", n, right))
          .Subscribe(s => Console.WriteLine("Zip:{0}", s));

// [実行結果]
// Zip:left=1, right=1
// Zip:left=2, right=2
// Zip:left=3, right=3