UniRx: SubscribeWithState は Subscribe より効率がよい

以下のサンプルは、1 秒ごとにテキストを更新するサンプルです。

using System;
using UniRx;
using UnityEngine;
using UnityEngine.UI;

public class Main : MonoBehaviour
{
    [SerializeField] private Text _text = null;

    void Start()
    {
        Observable.Interval(TimeSpan.FromSeconds(1f))
            .Select(e => "Count " + e)
            .Subscribe(e => _text.text = e)
            .AddTo(this);
    }
}

Subscribe のところに注目してください。

.Subscribe(e => _text.text = e);

引数にクロージャを渡しています。このクロージャの生成コストを抑える手段があり、それはSubscribe の代わりに SubscribeWithState を使うことです。

void Start()
{
    Observable.Interval(TimeSpan.FromSeconds(1f))
        .Select(e => "Count " + e)
        // 外部変数を参照しないようになった(クロージャを排除できた)
        .SubscribeWithState(_text, (e, text) => text.text = e)
        .AddTo(this);
}

UniRx ではこのパターンは SubscribeToText というメソッドで定義されています。

public static partial class UnityUIComponentExtensions
{
    public static IDisposable SubscribeToText(this IObservable<string> source, Text text)
    {
        return source.SubscribeWithState(text, (x, t) => t.text = x);
    }
}

ですので、下のように書くことができます。

void Start()
{
    Observable.Interval(TimeSpan.FromSeconds(1f))
        .Select(e => "Count " + e)
        .SubscribeToText(_text)            
        .AddTo(this);
}

参考

UniRx: DistinctUntilChanged

Observable.DistinctUntilChanged は同じ値を連続して流さないオペレータです。

f:id:noriok:20180917143151p:plain

using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = new[] {1, 2, 3, 2, 2, 2, 5, 5}
            .ToObservable()
            .DistinctUntilChanged();

        s.Debug().Subscribe();
    }
}

実行結果です。

OnSubscribe
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(2)
OnNext(5)
OnCompleted()

OnNext(2) が 2 回出力されていますが、連続する重複値が流れないだけですので、ストリーム全体としては同じ値が複数回ながれることはあります。

IEqualityComparer を指定するサンプル

下のコードは、文字列の大文字と小文字を区別しないで比較するサンプルです。

using System;
using System.Collections.Generic;
using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = "abAaBbBb"
            .ToObservable()
            .DistinctUntilChanged(new CharEqualityComparer());

        s.Debug().Subscribe();
    }

    // 大文字小文字を区別せずに比較
    class CharEqualityComparer : IEqualityComparer<char>
    {
        public bool Equals(char a, char b)
        {
            return char.ToLower(a).Equals(char.ToLower(b));
        }

        public int GetHashCode(char obj)
        {
            return char.ToLower(obj).GetHashCode();
        }
    } 
}

実行結果です。

OnSubscribe
OnNext(a)
OnNext(b)
OnNext(A)
OnNext(B)
OnCompleted()

IEqualityComparer の実装方法については、UniRx には UnityEqualityComparer.cs に Vector2 や Color などの EqualityComparer の実装があり参考になります。

UniRx: Hot と Cold (2)

UniRx: Hot と Cold (1) - C#練習日記 の続きです。63 of 115 からです。

Cold な Observable は分岐できない

Cold な Observable は Observer をひとつしか持てません。

using System;
using UniRx;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = Observable.Range(1, 3)
            .Do(e => Debug.Log(e));

        s.Subscribe(); // 1 本目のストリーム
        s.Subscribe(); // 2 本目のストリーム
    }
}

実行結果です。

1 // 1 本目のストリーム
2
3
1 // 2 本目のストリーム
2
3

このようにストリーム自体が 2 本になります。

f:id:noriok:20180917142610p:plain

Hot な Observable は分岐できる

Hot な Observable は Observer は複数持てます。

using System;
using UniRx;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = Observable.Range(1, 3)
            .Do(e => Debug.Log(e))
            .Publish();

        s.Subscribe();
        s.Subscribe();

        s.Connect();
    }
}

実行結果です。

1 // ストリームは 1 本のみ
2
3

f:id:noriok:20180917142623p:plain

UniRx: Hot と Cold (1)

Rx の Hot, Cold について、以下のスライドが参考になります。

www.slideshare.net

このスライドではプログラミング言語として Swift を採用していますが、Rx は特定のプログラミング言語に依存しない概念です。
本記事では UniRx における実装を参照しながらスライドの各内容について見ていきます。

Subject は「Observable」である

Subject<T> クラス定義をみると、

public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T>
{
    // 実装は省略...
}

となっています。ISubject<T> の実装をみると、

public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>
{
}

となっており、IObservable<T> を実装しています。

よって、Subject<T>IObservable<T> です。

各種 Operator は「Observable」である

たとえば Select オペレーターの実装をみてみますと、

public static partial class Observable
{       
    public static IObservable<TR> Select<T, TR>(this IObservable<T> source, Func<T, int, TR> selector)
    {
        return new SelectObservable<T, TR>(source, selector);
    }
}    

SelectObservable というクラスのインスタンスを返しているのがわかります。

SelectObservable の定義をみると、

internal class SelectObservable<T, TR> : OperatorObservableBase<TR>, ISelect<TR>
{
}

となっており、親クラス OperatorObservableBase<TR> の定義は、

public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
{
}

となっています。

OperatorObservableBase<T>IObservable<T> を実装します。
よって、Select オペレーターは IObservable<T> です。

他のオペレーターについても Select と非常によく似た構造になっています。
たとえば Where オペレーターについてみてみますと、内部では WhereObservable<T> クラスのインスタンスが作られますが、 この WhereObservable<T> は、SelectObservable と同様に OperatorObservableBase<T> のサブクラスになっています。

Cold な Observable は Subscribe されるまで動作しない

Cold な Observable は Subscribe されるまで動作しません。
たとえば、以下のコードは実行しても何も出力されません。

var s = Observable.Range(1, 3)
    .Select(e =>
    {
        Debug.Log(e);
        return e;
    });

Subscribe するとストリームが流れるようになります。

var s = Observable.Range(1, 3)
    .Select(e =>
    {
        Debug.Log(e);
        return e;
    });
s.Subscribe();

実行結果です。

1
2
3

Hot な Observable は Subscribe しなくても動作する

Cold は Subscribe されるまで動作しない

下のコードは実行しても何も出力されません。

var subject = new Subject<int>();
subject.Select(e =>
{
    Debug.Log(e);
    return e;
});

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

Subject は Hot ですが、Select オペレーターは Cold であり、Subscribe されるまで動作しないためです。

f:id:noriok:20180916190513p:plain

Hot は前の Cold を可動させる

var subject = new Subject<int>();
subject.Select(e =>
{
    Debug.Log(e);
    return e;
}).Publish().Connect(); // Hot 変換

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

実行結果です。

1
2
3

f:id:noriok:20180916190521p:plain

Hot 変換とは何か

Publish メソッドを呼び出すとストリームが Hot になります。
具体的に内部ではどのような処理が行われるのか、ソースを見ていきます。

public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject)
{
    return new ConnectableObservable<T>(source, subject);
}

public static IConnectableObservable<T> Publish<T>(this IObservable<T> source)
{
    return source.Multicast(new Subject<T>());
}

Publish メソッドを呼ぶと、ConnectableObservable<T> クラスのインスタンスが作られます。
このクラスの Connect メソッドを呼び出すと、内部で Subscribe が呼ばれているのがわかります。

class ConnectableObservable<T> : IConnectableObservable<T>
{
    readonly IObservable<T> source;
    readonly ISubject<T> subject;
    readonly object gate = new object();
    Connection connection;

    public ConnectableObservable(IObservable<T> source, ISubject<T> subject)
    {
        this.source = source.AsObservable();
        this.subject = subject;
    }

    public IDisposable Connect()
    {
        lock (gate)
        {
            // don't subscribe twice
            if (connection == null)
            {
                var subscription = source.Subscribe(subject);
                connection = new Connection(this, subscription);
            }

            return connection;
        }
    }
}    

つまり、(スライドにも記載ありますが)Hot な Observable は内部で Subscribe を呼び出すことで Hot になるようです。

ここまででスライドの 62 of 115 まで見てきました。

UniRx: Distinct

Observable.Distinct は、ストリームに重複値を流さないようにするオペレータです。

f:id:noriok:20180916185429p:plain

using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = new[] {1, 2, 3, 2, 2, 5, 5}
            .ToObservable()
            .Distinct();

        s.Debug().Subscribe();
    }
}

実行結果です。

OnSubscribe
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(5)
OnCompleted()

重複する値はストリームには流れません。

Distict の実装を読む

public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source)
{
#if !UniRxLibrary
    var comparer = UnityEqualityComparer.GetDefault<TSource>();
#else
    var comparer = EqualityComparer<TSource>.Default;
#endif

    return new DistinctObservable<TSource>(source, comparer);
}

実体は DistinctObservable<TSource> です。

internal class DistinctObservable<T> : OperatorObservableBase<T>
{
    readonly IObservable<T> source;
    readonly IEqualityComparer<T> comparer;

    public DistinctObservable(IObservable<T> source, IEqualityComparer<T> comparer)
        : base(source.IsRequiredSubscribeOnCurrentThread())
    {
        this.source = source;
        this.comparer = comparer;
    }

    protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
    {
        return source.Subscribe(new Distinct(this, observer, cancel));
    }
}

ストリームに流れる値は Distinct クラスで制御しています。

class Distinct : OperatorObserverBase<T, T>
{
    readonly HashSet<T> hashSet;

    public Distinct(DistinctObservable<T> parent, IObserver<T> observer, IDisposable cancel)
        : base(observer, cancel)
    {
        hashSet = (parent.comparer == null)
            ? new HashSet<T>()
            : new HashSet<T>(parent.comparer);
    }

    public override void OnNext(T value)
    {
        var key = default(T);
        var isAdded = false;
        try
        {
            key = value;
            isAdded = hashSet.Add(key);
        }
        catch (Exception exception)
        {
            try { observer.OnError(exception); } finally { Dispose(); }
            return;
        }

        if (isAdded)
        {
            observer.OnNext(value);
        }
    }
}

ストリームに流した値を HashSet<T> で管理し、重複値を流さないようにしています。

参考

UniRx: コルーチンを IObservable 化する

Observable.FromCoroutineValue() でコルーチンを IObservable 化することができます。 コルーチンの yield return の値が OnNext に渡ります。

using System;
using System.Collections;
using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = Observable.FromCoroutineValue<int>(G);
        s.Debug().Subscribe();
    }
    
    IEnumerator G()
    {        
        yield return 10;
        yield return 20;
        yield return 30;
    }
}

実行結果です。

OnSubscribe
OnNext(10)
OnNext(20)
OnNext(30)
OnCompleted()

ちなみに、コルーチン内で yield return した値は全て同一フレーム内で流れてきます。

Time.frameCount でフレーム値を表示するようにしてみます。

using System;
using System.Collections;
using UniRx;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = Observable.FromCoroutineValue<int>(G);
        s.Subscribe(
            e => Debug.Log($"e: {e} frame: {Time.frameCount}"),
            () => Debug.Log("OnCompleted"));
    }
    
    IEnumerator G()
    {        
        yield return 10;
        yield return 20;
        yield return 30;
    }
}

実行結果です。

e: 10 frame: 1
e: 20 frame: 1
e: 30 frame: 1
OnCompleted

コルーチン G() の 3 つの値が全て Time.frameCount = 1 のときに流れてきているのが分かります。

UniRx: ToObservable() が From() に代わりになるのかな

Rx には From というオペレータがあります。 UniRx には From が定義されてないようですが、ToObservable() というのがあります。 このメソッドを使うと、IEnumerable<T>IObservable<T> に変換できます。

using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {
        var s = new[] {10, 20, 30}.ToObservable();
        s.Debug().Subscribe();      
    }
}

実行結果です。

[20:20:37:877] OnSubscribe
[20:20:37:882] OnNext(10)
[20:20:37:884] OnNext(20)
[20:20:37:885] OnNext(30)
[20:20:37:886] OnCompleted()

ToObservable() は、コルーチンを Observable 化することもできます。

publishEveryYield が false の場合

void Start()
{
  var s = G().ToObservable(publishEveryYield: false);
  s.Debug().Subscribe();
}

IEnumerator G()
{
  for (int i = 0; i < 3; i++)
    yield return null;
}

実行結果です。

[20:30:57:423] OnSubscribe
[20:30:57:822] OnNext(())
[20:30:57:824] OnCompleted()

publishEveryYield が true の場合

void Start()
{
  var s = G().ToObservable(publishEveryYield: true);
  s.Debug().Subscribe();
}

IEnumerator G()
{       
  for (int i = 0; i < 3; i++)
    yield return null;
}

実行結果です。

[20:31:55:981] OnSubscribe
[20:31:55:995] OnNext(())
[20:31:56:024] OnNext(())
[20:31:56:375] OnNext(())
[20:31:56:396] OnNext(())
[20:31:56:397] OnCompleted()

publishEveryYield が true の場合は、yield return が実行されるたびに OnNext されるようになります。

ログに出力されている 4 つの OnNext のうち、最初の 3 つの OnNext は yield return の実行分で、 最後のは、コルーチンが正常終了したときに通知される OnNext です。

コルーチンが yield return しなくても OnNext は呼ばれる

void Start()
{
  var s = H().ToObservable(publishEveryYield: false);
  s.Debug().Subscribe();
}

IEnumerator H()
{       
  yield break;
}

実行結果です。

[20:43:58:160] OnSubscribe
[20:43:58:172] OnNext(())
[20:43:58:174] OnCompleted()

コルーチンを Observable 化するとき、コルーチンが yield return せず、単に yield break するだけでも OnNext は呼ばれます。

参考