Reactive Extensions 入門(2)

Reactive Extensions (以下 Rx) には中核を担うクラスがあります。その名も Observable。LINQ でおなじみの Enumerable を彷彿とさせる名前ですね。

Observable クラスは IObservable を生成するメソッドと、その IObservable 用の拡張メソッドを提供しています。名前だけでなく、提供する機能も似ていますね。他にもイベントや非同期処理で使うようなメソッドも提供していますが。

「Observable を制する者は Rx を制す」と、誰かが言った気がします。誰も言っていないなら、私が言ったことにします。今回は、その Observable クラスの IObservable を生成するメソッドをご紹介。

Return メソッド

渡した値を単純に通知する IObservable を生成します。単純な値を IObservable に変換して Rx で処理したいときに使っています。

var obs = Observable.Return(1);
obs.Subscribe(n => Console.WriteLine("Return1:{0}", n));

// [実行結果]
// Return1:1

Range メソッド

指定した範囲の値を通知する IObservable を生成します。Enumerable.Range に似ていますね。

var obs = Observable.Range(1, 5);
obs.Subscribe(n => Console.WriteLine("Range:{0}", n));

// [実行結果]
// Range:1
// Range:2
// Range:3
// Range:4
// Range:5

Repeat メソッド

第1引数で渡したデータを、第2引数で指定した回数繰り返して通知するような IObservable を生成します。

var obs = Observable.Repeat(new { Name = "Foo" }, 3);
obs.Subscribe(o => Console.WriteLine("Repeat:{0}", o.Name));

// [実行結果]
// Repeat:Foo
// Repeat:Foo
// Repeat:Foo

Generate メソッド

初期状態と、継続するか判定するデリゲートと、次の値を返すデリゲートと、IObservable に格納する値を返すデリゲートを指定します。
for 文を使って IObservable を生成するイメージ。分かりにくいですね。Range や Repeat よりも複雑な IObservable を生成できます。

var obs = Observable.Generate(
    0,
    n => n < 5,
    n => n + 1,
    n => new { Num = n });
obs.Subscribe(o => Console.WriteLine("Generate:{0}", o.Num));

// [実行結果]
// Generate:0
// Generate:1
// Generate:2
// Generate:3
// Generate:4

Case メソッド

複数用意された IObservable の中から、どれか1つを選択します。選択する方法もデリゲートで指定。
使いどころが難しいです。というか思いつきません。

// 3通りの IObservable<T> を用意
var sources = new Dictionary<int, IObservable<string>>()
{
    {0, Observable.Return("foo") },
    {1, Observable.Return("bar") },
    {2, Observable.Return("hoge") },
};

// 3つのうちどれか1つをランダムで選ぶ
Func<int> selector = () =>
{
    return new Random().Next(3);
};

var obs = Observable.Case(selector, sources);
obs.Subscribe(s => Console.WriteLine("Case1:{0}", s));
obs.Subscribe(s => Console.WriteLine("Case2:{0}", s));
obs.Subscribe(s => Console.WriteLine("Case3:{0}", s));

// [実行結果]
// Case1:foo
// Case2:foo
// Case3:bar

Throw メソッド

Subscribe すると例外を通知する IObservable を生成します。例外は Subscribe に渡したエラーハンドラの中で受け取れます。
テストくらいでしか使い道はなさそうです。

var obs = Observable.Throw<int>(new ArgumentOutOfRangeException("n", "hoge"));
obs.Subscribe(
    n => Console.WriteLine(n),
    ex => Console.WriteLine(ex.Message));

// [実行結果]
// hoge
// パラメーター名: n

Start メソッド

渡したデリゲートが返す値を非同期で通知するような IObservable を生成します。
Rx で非同期処理を行うときにお世話になることが多いメソッドです。

var obs = Observable.Start(() => new { Name = "Foo" });
obs.Subscribe(o => Console.WriteLine("Start:{0}", o.Name));

// [実行結果]
// Start:Foo

Defer メソッド

渡したファクトリを Subscribe するたびに実行するような IObservable を生成します。

var obs = Observable.Defer(() =>
{
    return Observable.Return(Guid.NewGuid());
});
obs.Subscribe(n => Console.WriteLine("Defer1:{0}", n));
obs.Subscribe(n => Console.WriteLine("Defer2:{0}", n));

// [実行結果]
// Defer1:da13a72d-0990-411e-a9f7-e9fded89fcbb
// Defer2:7d5c7702-6025-4d1b-8327-588e32b5ee49