UniRx: Observable.ReturnUnit のソースを読む
Observable.ReturnUnit
のサンプルコード
using System; using UniRx; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = Observable.ReturnUnit(); s.Subscribe(e => Debug.Log("e: " + e), () => Debug.Log("OnCompleted")); } }
実行結果です。
e: () OnCompleted
Observable.ReturnUnit
のソースを読む
Observable.ReturnUnit()
を Subscribe()
するとき、内部でどのような処理が行われるのか眺めてみます。
まずは Observable.ReturnUnit()
の内部を見てみます。
/// <summary> /// Same as Observable.Return(Unit.Default); but no allocate memory. /// </summary> public static IObservable<Unit> ReturnUnit() { return ImmutableReturnUnitObservable.Instance; }
ImmutableReturnUnitObservable.Instance
インスタンスが使い回されているようです。
次に ImmutableReturnUnitObservable
クラスの実装を眺めてみます。
internal class ImmutableReturnUnitObservable : IObservable<Unit>, IOptimizedObservable<Unit> { internal static ImmutableReturnUnitObservable Instance = new ImmutableReturnUnitObservable(); ImmutableReturnUnitObservable() { } public bool IsRequiredSubscribeOnCurrentThread() { return false; } public IDisposable Subscribe(IObserver<Unit> observer) { observer.OnNext(Unit.Default); observer.OnCompleted(); return Disposable.Empty; } }
ふむふむ。Subscribe()
するとすぐに OnNext()
と OnCompleted()
を順に呼び出しています。
ふと疑問。サンプルコードで呼び出している Subscribe()
は 2 引数なのですが、上の定義では 1 引数の定義しかありませんね。
2 引数を受け取る Subscribe()
はどこで定義されているのでしょうか。
コードを読むと、以下の拡張メソッドで定義されていました。
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted) { return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted)); }
ImmutableReturnUnitObservable
クラスの Subscribe()
の引数には、Observer.CreateSubscribeObserver()
の戻り値が渡されます。
Observer.CreateSubscribeObserver()
の内部では、引数で渡した onNext
, onCompleted
を呼び出す Subject<T>
のインスタンスを生成しています。
// same as AnonymousObserver... class Subscribe<T> : IObserver<T> { readonly Action<T> onNext; readonly Action<Exception> onError; readonly Action onCompleted; int isStopped = 0; public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted) { this.onNext = onNext; this.onError = onError; this.onCompleted = onCompleted; } public void OnNext(T value) { if (isStopped == 0) { onNext(value); } } public void OnCompleted() { if (Interlocked.Increment(ref isStopped) == 1) { onCompleted(); } } // (コード一部省略・・・) }
以上で、Observable.ReturnUnit()
を Subscribe()
するときの処理の流れがわかりました。
ポイントは、ImmutableReturnUnitObservable.Subscribe()
の処理でした(再掲)。
internal class ImmutableReturnUnitObservable : IObservable<Unit>, IOptimizedObservable<Unit> { public IDisposable Subscribe(IObserver<Unit> observer) { observer.OnNext(Unit.Default); observer.OnCompleted(); return Disposable.Empty; } }
Subscribe()
すると即座に OnNext()
, OnCompleted()
を呼び出します。
引数として渡された observer
の参照を掴んでいませんので、サンプルコード上で生成しているクロージャ(onNext
, onCompleted
)はリークしていないことも確認できました。