UniRx: Hot と Cold (1)

Rx の Hot, Cold について、以下のスライドが参考になります。

www.slideshare.net

このスライドではプログラミング言語として Swift を採用していますが、Rx は特定のプログラミング言語に依存しない概念です。
本記事では UniRx における実装を参照しながらスライドの各内容について見ていきます。

Subject は「Observable」である

Subject<T> クラス定義をみると、

public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T>
{
    // 実装は省略...
}

となっています。ISubject<T> の実装をみると、

public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>
{
}

となっており、IObservable<T> を実装しています。

よって、Subject<T>IObservable<T> です。

各種 Operator は「Observable」である

たとえば Select オペレーターの実装をみてみますと、

public static partial class Observable
{       
    public static IObservable<TR> Select<T, TR>(this IObservable<T> source, Func<T, int, TR> selector)
    {
        return new SelectObservable<T, TR>(source, selector);
    }
}    

SelectObservable というクラスのインスタンスを返しているのがわかります。

SelectObservable の定義をみると、

internal class SelectObservable<T, TR> : OperatorObservableBase<TR>, ISelect<TR>
{
}

となっており、親クラス OperatorObservableBase<TR> の定義は、

public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
{
}

となっています。

OperatorObservableBase<T>IObservable<T> を実装します。
よって、Select オペレーターは IObservable<T> です。

他のオペレーターについても Select と非常によく似た構造になっています。
たとえば Where オペレーターについてみてみますと、内部では WhereObservable<T> クラスのインスタンスが作られますが、 この WhereObservable<T> は、SelectObservable と同様に OperatorObservableBase<T> のサブクラスになっています。

Cold な Observable は Subscribe されるまで動作しない

Cold な Observable は Subscribe されるまで動作しません。
たとえば、以下のコードは実行しても何も出力されません。

var s = Observable.Range(1, 3)
    .Select(e =>
    {
        Debug.Log(e);
        return e;
    });

Subscribe するとストリームが流れるようになります。

var s = Observable.Range(1, 3)
    .Select(e =>
    {
        Debug.Log(e);
        return e;
    });
s.Subscribe();

実行結果です。

1
2
3

Hot な Observable は Subscribe しなくても動作する

Cold は Subscribe されるまで動作しない

下のコードは実行しても何も出力されません。

var subject = new Subject<int>();
subject.Select(e =>
{
    Debug.Log(e);
    return e;
});

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

Subject は Hot ですが、Select オペレーターは Cold であり、Subscribe されるまで動作しないためです。

f:id:noriok:20180916190513p:plain

Hot は前の Cold を可動させる

var subject = new Subject<int>();
subject.Select(e =>
{
    Debug.Log(e);
    return e;
}).Publish().Connect(); // Hot 変換

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

実行結果です。

1
2
3

f:id:noriok:20180916190521p:plain

Hot 変換とは何か

Publish メソッドを呼び出すとストリームが Hot になります。
具体的に内部ではどのような処理が行われるのか、ソースを見ていきます。

public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject)
{
    return new ConnectableObservable<T>(source, subject);
}

public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
{
    return source.Multicast(new Subject<T>());
}

Publish メソッドを呼ぶと、ConnectableObservable<T> クラスのインスタンスが作られます。
このクラスの Connect メソッドを呼び出すと、内部で Subscribe が呼ばれているのがわかります。

class ConnectableObservable<T> : IConnectableObservable<T>
{
    readonly IObservable<T> source;
    readonly ISubject<T> subject;
    readonly object gate = new object();
    Connection connection;

    public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
    {
        this.source = source.AsObservable();
        this.subject = subject;
    }

    public IDisposable Connect()
    {
        lock (gate)
        {
            // don't subscribe twice
            if (connection == null)
            {
                var subscription = source.Subscribe(subject);
                connection = new Connection(this, subscription);
            }

            return connection;
        }
    }
}    

つまり、(スライドにも記載ありますが)Hot な Observable は内部で Subscribe を呼び出すことで Hot になるようです。

ここまででスライドの 62 of 115 まで見てきました。