UniRx: Hot と Cold (1)
Rx の Hot, Cold について、以下のスライドが参考になります。
www.slideshare.net
このスライドではプログラミング言語として Swift を採用していますが、Rx は特定のプログラミング言語に依存しない概念です。
本記事では UniRx における実装を参照しながらスライドの各内容について見ていきます。
Subject は「Observable」である
Subject<T>
クラス定義をみると、
public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T> { // 実装は省略... }
となっています。ISubject<T>
の実装をみると、
public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T> { }
となっており、IObservable<T>
を実装しています。
よって、Subject<T>
は IObservable<T>
です。
各種 Operator は「Observable」である
たとえば Select
オペレーターの実装をみてみますと、
public static partial class Observable { public static IObservable<TR> Select<T, TR>(this IObservable<T> source, Func<T, int, TR> selector) { return new SelectObservable<T, TR>(source, selector); } }
SelectObservable
というクラスのインスタンスを返しているのがわかります。
SelectObservable
の定義をみると、
internal class SelectObservable<T, TR> : OperatorObservableBase<TR>, ISelect<TR> { }
となっており、親クラス OperatorObservableBase<TR>
の定義は、
public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T> { }
となっています。
OperatorObservableBase<T>
は IObservable<T>
を実装します。
よって、Select
オペレーターは IObservable<T>
です。
他のオペレーターについても Select
と非常によく似た構造になっています。
たとえば Where
オペレーターについてみてみますと、内部では WhereObservable<T>
クラスのインスタンスが作られますが、
この WhereObservable<T>
は、SelectObservable
と同様に OperatorObservableBase<T>
のサブクラスになっています。
Cold な Observable は Subscribe されるまで動作しない
Cold な Observable は Subscribe されるまで動作しません。
たとえば、以下のコードは実行しても何も出力されません。
var s = Observable.Range(1, 3) .Select(e => { Debug.Log(e); return e; });
Subscribe
するとストリームが流れるようになります。
var s = Observable.Range(1, 3) .Select(e => { Debug.Log(e); return e; }); s.Subscribe();
実行結果です。
1 2 3
Hot な Observable は Subscribe しなくても動作する
Cold は Subscribe されるまで動作しない
下のコードは実行しても何も出力されません。
var subject = new Subject<int>(); subject.Select(e => { Debug.Log(e); return e; }); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnCompleted();
Subject
は Hot ですが、Select
オペレーターは Cold であり、Subscribe
されるまで動作しないためです。
Hot は前の Cold を可動させる
var subject = new Subject<int>(); subject.Select(e => { Debug.Log(e); return e; }).Publish().Connect(); // Hot 変換 subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnCompleted();
実行結果です。
1 2 3
Hot 変換とは何か
Publish
メソッドを呼び出すとストリームが Hot になります。
具体的に内部ではどのような処理が行われるのか、ソースを見ていきます。
public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject) { return new ConnectableObservable<T>(source, subject); } public static IConnectableObservable<T> Publish<T>(this IObservable<T> source) { return source.Multicast(new Subject<T>()); }
Publish
メソッドを呼ぶと、ConnectableObservable<T>
クラスのインスタンスが作られます。
このクラスの Connect
メソッドを呼び出すと、内部で Subscribe
が呼ばれているのがわかります。
class ConnectableObservable<T> : IConnectableObservable<T> { readonly IObservable<T> source; readonly ISubject<T> subject; readonly object gate = new object(); Connection connection; public ConnectableObservable(IObservable<T> source, ISubject<T> subject) { this.source = source.AsObservable(); this.subject = subject; } public IDisposable Connect() { lock (gate) { // don't subscribe twice if (connection == null) { var subscription = source.Subscribe(subject); connection = new Connection(this, subscription); } return connection; } } }
つまり、(スライドにも記載ありますが)Hot な Observable は内部で Subscribe
を呼び出すことで Hot になるようです。
ここまででスライドの 62 of 115 まで見てきました。