UniRx: Distinct
Observable.Distinct
は、ストリームに重複値を流さないようにするオペレータです。
using UniRx; using UniRx.Diagnostics; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = new[] {1, 2, 3, 2, 2, 5, 5} .ToObservable() .Distinct(); s.Debug().Subscribe(); } }
実行結果です。
OnSubscribe OnNext(1) OnNext(2) OnNext(3) OnNext(5) OnCompleted()
重複する値はストリームには流れません。
Distict の実装を読む
public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source) { #if !UniRxLibrary var comparer = UnityEqualityComparer.GetDefault<TSource>(); #else var comparer = EqualityComparer<TSource>.Default; #endif return new DistinctObservable<TSource>(source, comparer); }
実体は DistinctObservable<TSource>
です。
internal class DistinctObservable<T> : OperatorObservableBase<T> { readonly IObservable<T> source; readonly IEqualityComparer<T> comparer; public DistinctObservable(IObservable<T> source, IEqualityComparer<T> comparer) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.comparer = comparer; } protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel) { return source.Subscribe(new Distinct(this, observer, cancel)); } }
ストリームに流れる値は Distinct
クラスで制御しています。
class Distinct : OperatorObserverBase<T, T> { readonly HashSet<T> hashSet; public Distinct(DistinctObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel) { hashSet = (parent.comparer == null) ? new HashSet<T>() : new HashSet<T>(parent.comparer); } public override void OnNext(T value) { var key = default(T); var isAdded = false; try { key = value; isAdded = hashSet.Add(key); } catch (Exception exception) { try { observer.OnError(exception); } finally { Dispose(); } return; } if (isAdded) { observer.OnNext(value); } } }
ストリームに流した値を HashSet<T>
で管理し、重複値を流さないようにしています。