Reactive Extensions 入門(5)

Reactive Extensions(以下 Rx)で個人的に注目していたものに、非同期呼び出しを IObservable に変換する機能があります。これに興味をもったから Rx に入門したといってもいいくらいです。
ちなみにここで言う「非同期呼び出し」は、「Beginなんとか」と「Endなんとか」を使った呼び出しのこと。今回はBeginInvoke/EndInvoke を使います。


例えば、次の関数を非同期で呼び出したいとき。

Func<string, string> hello = name =>
{
    Thread.Sleep(5000);
    return string.Format("Hello,{0}.", name);
};

BeginInvoke/EndInvoke を使って、フツーに書くとこうなります。

hello.BeginInvoke("ルフィ", delegate(IAsyncResult result)
{
    var s = hello.EndInvoke(result);
    Console.WriteLine(s);
}, null);

BeginInvoke に渡すコールバックを、匿名メソッドを使ってその場に書いているので、この段階ではシンプルです。

同じ事を Rx を使って実装します。Observable には FromAsyncPattern というメソッドがあって、このメソッドを使えば、非同期呼び出しを、IObservableを返すメソッドに変換できます。

Observable.FromAsyncPattern<string, string>(
        hello.BeginInvoke,
        hello.EndInvoke)("ゾロ")
    .Subscribe(s => Console.WriteLine(s));

…これだと Rx を使うメリットがわからないですね。


例えば非同期処理A・B・Cがあって、Aが終わったらB、Bが終わったらC、という風に非同期処理を順番に実行したい場合に役立ちます。

Func<string, string> hello = name =>
{
    Thread.Sleep(5000);
    return string.Format("Hello,{0}.", name);
};
Func<string, string> hi = name =>
{
    Thread.Sleep(2000);
    return string.Format("Hi,{0}.", name);
};
Func<string, string> afterNoon = name =>
{
    Thread.Sleep(3000);
    return string.Format("Good afternoon, {0}.", name);
};

この3つの関数を非同期に、かつ上から順に実行する場合、BeginInvoke/EndInvoke を使うとこうなります。

hello.BeginInvoke("ナミ", delegate(IAsyncResult result1)
{
    var s = hello.EndInvoke(result1);
    Console.WriteLine(s);

    hi.BeginInvoke("ウソップ", delegate(IAsyncResult result2)
    {
        var s2 = hi.EndInvoke(result2);
        Console.WriteLine(s2);

        afterNoon.BeginInvoke("サンジ", delegate(IAsyncResult result3)
        {
            var s3 = afterNoon.EndInvoke(result3);
            Console.WriteLine(s3);
        }, null);
    }, null);
}, null);

入れ子になっていて見づらいですね。たった3つでこれです。もっと数が増えたら…

f:id:griefworker:20110128224710j:image

そこで Rx の出番。FromAsyncPattern に SelectMany を組み合わせることで、コールバックを何重も入れ子にする必要がなくなります。

Observable.FromAsyncPattern<string, string>(
        hello.BeginInvoke,
        hello.EndInvoke)("チョッパー")
    .SelectMany(s =>
    {
        Console.WriteLine(s);
        return Observable.FromAsyncPattern<string, string>(
            hi.BeginInvoke,
            hi.EndInvoke)("ロビン");
    })
    .SelectMany(s =>
    {
        Console.WriteLine(s);
        return Observable.FromAsyncPattern<string, string>(
            afterNoon.BeginInvoke,
            afterNoon.EndInvoke)("フランキー");
    })
    .Subscribe(s => Console.WriteLine(s));

非同期処理の数が増えると、SelectMany が増えるだけ。「順番に呼び出している」というのがわかりやすいですね。