UniRx: Distinct

Observable.Distinct は、ストリームに重複値を流さないようにするオペレータです。

f:id:noriok:20180916185429p:plain

using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = new[] {1, 2, 3, 2, 2, 5, 5}
            .ToObservable()
            .Distinct();

        s.Debug().Subscribe();
    }
}

実行結果です。

OnSubscribe
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(5)
OnCompleted()

重複する値はストリームには流れません。

Distict の実装を読む

public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source)
{
#if !UniRxLibrary
    var comparer = UnityEqualityComparer.GetDefault<TSource>();
#else
    var comparer = EqualityComparer<TSource>.Default;
#endif

    return new DistinctObservable<TSource>(source, comparer);
}

実体は DistinctObservable<TSource> です。

internal class DistinctObservable<T> : OperatorObservableBase<T>
{
    readonly IObservable<T> source;
    readonly IEqualityComparer<T> comparer;

    public DistinctObservable(IObservable<T> source, IEqualityComparer<T> comparer)
        : base(source.IsRequiredSubscribeOnCurrentThread())
    {
        this.source = source;
        this.comparer = comparer;
    }

    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
        return source.Subscribe(new Distinct(this, observer, cancel));
    }
}

ストリームに流れる値は Distinct クラスで制御しています。

class Distinct : OperatorObserverBase<T, T>
{
    readonly HashSet<T> hashSet;

    public Distinct(DistinctObservable<T> parent, IObserver<T> observer, IDisposable cancel)
        : base(observer, cancel)
    {
        hashSet = (parent.comparer == null)
            ? new HashSet<T>()
            : new HashSet<T>(parent.comparer);
    }

    public override void OnNext(T value)
    {
        var key = default(T);
        var isAdded = false;
        try
        {
            key = value;
            isAdded = hashSet.Add(key);
        }
        catch (Exception exception)
        {
            try { observer.OnError(exception); } finally { Dispose(); }
            return;
        }

        if (isAdded)
        {
            observer.OnNext(value);
        }
    }
}

ストリームに流した値を HashSet<T> で管理し、重複値を流さないようにしています。

参考