UniRx: Subject への購読とそのキャンセル処理を眺める

Subject クラスへの購読と、その購読のキャンセル処理について以下のサンプルコードで見ていきます。

using UniRx;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var subject = new Subject<int>();
        // 購読する
        var disposable = subject.Subscribe();
        // 購読をキャンセルする
        disposable.Dispose();
    }
}

Subject の、

  • Subscribe() を呼び出すと、内部でどのような処理が行われるか
  • Subscribe() から返される IDisposableDispose() を呼び出すと、どのような処理が行われるか

を見ていきます。

Subscribe() メソッドは、拡張メソッドとして定義されています。

public static IDisposable Subscribe<T>(this IObservable<T> source)
{
    return source.Subscribe(UniRx.InternalUtil.ThrowObserver<T>.Instance);
}

次に Subject.Subscribe() が呼ばれます。

public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T>
{
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");

        var ex = default(Exception);

        lock (observerLock)
        {
            ThrowIfDisposed();
            if (!isStopped)
            {
                var listObserver = outObserver as ListObserver<T>;
                if (listObserver != null)
                {
                    outObserver = listObserver.Add(observer);
                }
                else
                {
                    var current = outObserver;
                    if (current is EmptyObserver<T>)
                    {
                        outObserver = observer;
                    }
                    else
                    {
                        outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
                    }
                }

                return new Subscription(this, observer);
            }

            ex = lastError;
        }

        if (ex != null)
        {
            observer.OnError(ex);
        }
        else
        {
            observer.OnCompleted();
        }

        return Disposable.Empty;
    }
}

Subject クラス内部で ListObserver という IObserver を格納するためのコンテナを持っており、そのコンテナに引数で渡された observer を格納しています。興味深い特徴として、ListObserver に新たな observer を追加するたびに、新しい ListObserver が作られます。

ListObserver がそのような仕組みになっている理由について、UniRx 作者さんのブログにそれらしい記述を見つけたのですが、自分の理解不足のためかよく理解できませんでした。

次に Subscribe() メソッドの戻り値の Subscription クラスの実装を見てみます。

class Subscription : IDisposable
{
    readonly object gate = new object();
    Subject<T> parent;
    IObserver<T> unsubscribeTarget;

    public Subscription(Subject<T> parent, IObserver<T> unsubscribeTarget)
    {
        this.parent = parent;
        this.unsubscribeTarget = unsubscribeTarget;
    }

    public void Dispose()
    {
        lock (gate)
        {
            if (parent != null)
            {
                lock (parent.observerLock)
                {
                    var listObserver = parent.outObserver as ListObserver<T>;
                    if (listObserver != null)
                    {
                        parent.outObserver = listObserver.Remove(unsubscribeTarget);
                    }
                    else
                    {
                        parent.outObserver = EmptyObserver<T>.Instance;
                    }

                    unsubscribeTarget = null;
                    parent = null;
                }
            }
        }
    }
}

Subscription.Dispose() を呼び出すと、ListObserver に登録した observer が削除される仕組みになっています。

まとめると、以下の処理が行われています。

  • Subject.Subscribe(IObserver<T> observer) を呼ぶと、引数の observer を ListObserver に登録
  • SubscriptionDispose() を呼ぶと、ListObserver から observer を削除