UniRx: Observable.ReturnUnit のソースを読む
Observable.ReturnUnit のサンプルコード
using System; using UniRx; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = Observable.ReturnUnit(); s.Subscribe(e => Debug.Log("e: " + e), () => Debug.Log("OnCompleted")); } }
実行結果です。
e: () OnCompleted
Observable.ReturnUnit のソースを読む
Observable.ReturnUnit() を Subscribe() するとき、内部でどのような処理が行われるのか眺めてみます。
まずは Observable.ReturnUnit() の内部を見てみます。
/// <summary> /// Same as Observable.Return(Unit.Default); but no allocate memory. /// </summary> public static IObservable<Unit> ReturnUnit() { return ImmutableReturnUnitObservable.Instance; }
ImmutableReturnUnitObservable.Instance インスタンスが使い回されているようです。
次に ImmutableReturnUnitObservable クラスの実装を眺めてみます。
internal class ImmutableReturnUnitObservable : IObservable<Unit>, IOptimizedObservable<Unit> { internal static ImmutableReturnUnitObservable Instance = new ImmutableReturnUnitObservable(); ImmutableReturnUnitObservable() { } public bool IsRequiredSubscribeOnCurrentThread() { return false; } public IDisposable Subscribe(IObserver<Unit> observer) { observer.OnNext(Unit.Default); observer.OnCompleted(); return Disposable.Empty; } }
ふむふむ。Subscribe() するとすぐに OnNext() と OnCompleted() を順に呼び出しています。
ふと疑問。サンプルコードで呼び出している Subscribe() は 2 引数なのですが、上の定義では 1 引数の定義しかありませんね。
2 引数を受け取る Subscribe() はどこで定義されているのでしょうか。
コードを読むと、以下の拡張メソッドで定義されていました。
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted) { return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted)); }
ImmutableReturnUnitObservable クラスの Subscribe() の引数には、Observer.CreateSubscribeObserver() の戻り値が渡されます。
Observer.CreateSubscribeObserver() の内部では、引数で渡した onNext, onCompleted を呼び出す Subject<T> のインスタンスを生成しています。
// same as AnonymousObserver... class Subscribe<T> : IObserver<T> { readonly Action<T> onNext; readonly Action<Exception> onError; readonly Action onCompleted; int isStopped = 0; public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted) { this.onNext = onNext; this.onError = onError; this.onCompleted = onCompleted; } public void OnNext(T value) { if (isStopped == 0) { onNext(value); } } public void OnCompleted() { if (Interlocked.Increment(ref isStopped) == 1) { onCompleted(); } } // (コード一部省略・・・) }
以上で、Observable.ReturnUnit() を Subscribe() するときの処理の流れがわかりました。
ポイントは、ImmutableReturnUnitObservable.Subscribe() の処理でした(再掲)。
internal class ImmutableReturnUnitObservable : IObservable<Unit>, IOptimizedObservable<Unit> { public IDisposable Subscribe(IObserver<Unit> observer) { observer.OnNext(Unit.Default); observer.OnCompleted(); return Disposable.Empty; } }
Subscribe() すると即座に OnNext(), OnCompleted() を呼び出します。
引数として渡された observer の参照を掴んでいませんので、サンプルコード上で生成しているクロージャ(onNext, onCompleted)はリークしていないことも確認できました。