UniRx: Observable.ReturnUnit のソースを読む

Observable.ReturnUnit のサンプルコード

using System;
using UniRx;
using UnityEngine;

public class Main : MonoBehaviour
{   
    void Start()
    {
        var s = Observable.ReturnUnit();
        s.Subscribe(e => Debug.Log("e: " + e), () => Debug.Log("OnCompleted"));
    }
}

実行結果です。

e: ()
OnCompleted

Observable.ReturnUnit のソースを読む

Observable.ReturnUnit()Subscribe() するとき、内部でどのような処理が行われるのか眺めてみます。

まずは Observable.ReturnUnit() の内部を見てみます。

/// <summary>
/// Same as Observable.Return(Unit.Default); but no allocate memory.
/// </summary>
public static IObservable<Unit> ReturnUnit()
{
    return ImmutableReturnUnitObservable.Instance;
}

ImmutableReturnUnitObservable.Instance インスタンスが使い回されているようです。

次に ImmutableReturnUnitObservable クラスの実装を眺めてみます。

internal class ImmutableReturnUnitObservable : IObservable<Unit>, IOptimizedObservable<Unit>
{
    internal static ImmutableReturnUnitObservable Instance = new ImmutableReturnUnitObservable();

    ImmutableReturnUnitObservable()
    {

    }

    public bool IsRequiredSubscribeOnCurrentThread()
    {
        return false;
    }

    public IDisposable Subscribe(IObserver<Unit> observer)
    {
        observer.OnNext(Unit.Default);
        observer.OnCompleted();
        return Disposable.Empty;
    }
}

ふむふむ。Subscribe() するとすぐに OnNext() と OnCompleted() を順に呼び出しています。

ふと疑問。サンプルコードで呼び出している Subscribe() は 2 引数なのですが、上の定義では 1 引数の定義しかありませんね。 2 引数を受け取る Subscribe() はどこで定義されているのでしょうか。

コードを読むと、以下の拡張メソッドで定義されていました。

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
{
    return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted));
}

ImmutableReturnUnitObservable クラスの Subscribe() の引数には、Observer.CreateSubscribeObserver() の戻り値が渡されます。

Observer.CreateSubscribeObserver() の内部では、引数で渡した onNext, onCompleted を呼び出す Subject<T>インスタンスを生成しています。

// same as AnonymousObserver...
class Subscribe<T> : IObserver<T>
{
    readonly Action<T> onNext;
    readonly Action<Exception> onError;
    readonly Action onCompleted;

    int isStopped = 0;

    public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value)
    {
        if (isStopped == 0)
        {
            onNext(value);
        }
    }

    public void OnCompleted()
    {
        if (Interlocked.Increment(ref isStopped) == 1)
        {
            onCompleted();
        }
    }
    
    // (コード一部省略・・・)
}

以上で、Observable.ReturnUnit()Subscribe() するときの処理の流れがわかりました。

ポイントは、ImmutableReturnUnitObservable.Subscribe() の処理でした(再掲)。

internal class ImmutableReturnUnitObservable : IObservable<Unit>, IOptimizedObservable<Unit>
{
    public IDisposable Subscribe(IObserver<Unit> observer)
    {
        observer.OnNext(Unit.Default);
        observer.OnCompleted();
        return Disposable.Empty;
    }
}

Subscribe() すると即座に OnNext(), OnCompleted() を呼び出します。 引数として渡された observer の参照を掴んでいませんので、サンプルコード上で生成しているクロージャ(onNext, onCompleted)はリークしていないことも確認できました。

参考