UniRx: SubscribeWithState は Subscribe より効率がよい
以下のサンプルは、1 秒ごとにテキストを更新するサンプルです。
using System; using UniRx; using UnityEngine; using UnityEngine.UI; public class Main : MonoBehaviour { [SerializeField] private Text _text = null; void Start() { Observable.Interval(TimeSpan.FromSeconds(1f)) .Select(e => "Count " + e) .Subscribe(e => _text.text = e) .AddTo(this); } }
Subscribe
のところに注目してください。
.Subscribe(e => _text.text = e);
引数にクロージャを渡しています。このクロージャの生成コストを抑える手段があり、それはSubscribe
の代わりに SubscribeWithState
を使うことです。
void Start() { Observable.Interval(TimeSpan.FromSeconds(1f)) .Select(e => "Count " + e) // 外部変数を参照しないようになった(クロージャを排除できた) .SubscribeWithState(_text, (e, text) => text.text = e) .AddTo(this); }
UniRx ではこのパターンは SubscribeToText
というメソッドで定義されています。
public static partial class UnityUIComponentExtensions { public static IDisposable SubscribeToText(this IObservable<string> source, Text text) { return source.SubscribeWithState(text, (x, t) => t.text = x); } }
ですので、下のように書くことができます。
void Start() { Observable.Interval(TimeSpan.FromSeconds(1f)) .Select(e => "Count " + e) .SubscribeToText(_text) .AddTo(this); }
参考
- neue cc - Unityにおけるコルーチンの省メモリと高速化について、或いはUniRx 5.3.0でのその反映
SubscribeWithState
を使う上での UniRx 作者さんの解説があります。
- neue cc - Unityでのボクシングの殺し方、或いはラムダ式における見えないnewの見極め方
- クロージャおよび純粋な関数について、内部ではどのようなコードが生成されているかの解説があります。
UniRx: DistinctUntilChanged
Observable.DistinctUntilChanged
は同じ値を連続して流さないオペレータです。
using UniRx; using UniRx.Diagnostics; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = new[] {1, 2, 3, 2, 2, 2, 5, 5} .ToObservable() .DistinctUntilChanged(); s.Debug().Subscribe(); } }
実行結果です。
OnSubscribe OnNext(1) OnNext(2) OnNext(3) OnNext(2) OnNext(5) OnCompleted()
OnNext(2)
が 2 回出力されていますが、連続する重複値が流れないだけですので、ストリーム全体としては同じ値が複数回ながれることはあります。
IEqualityComparer を指定するサンプル
下のコードは、文字列の大文字と小文字を区別しないで比較するサンプルです。
using System; using System.Collections.Generic; using UniRx; using UniRx.Diagnostics; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = "abAaBbBb" .ToObservable() .DistinctUntilChanged(new CharEqualityComparer()); s.Debug().Subscribe(); } // 大文字小文字を区別せずに比較 class CharEqualityComparer : IEqualityComparer<char> { public bool Equals(char a, char b) { return char.ToLower(a).Equals(char.ToLower(b)); } public int GetHashCode(char obj) { return char.ToLower(obj).GetHashCode(); } } }
実行結果です。
OnSubscribe OnNext(a) OnNext(b) OnNext(A) OnNext(B) OnCompleted()
IEqualityComparer の実装方法については、UniRx には UnityEqualityComparer.cs に Vector2 や Color などの EqualityComparer の実装があり参考になります。
UniRx: Hot と Cold (2)
UniRx: Hot と Cold (1) - C#練習日記 の続きです。63 of 115 からです。
Cold な Observable は分岐できない
Cold な Observable は Observer をひとつしか持てません。
using System; using UniRx; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = Observable.Range(1, 3) .Do(e => Debug.Log(e)); s.Subscribe(); // 1 本目のストリーム s.Subscribe(); // 2 本目のストリーム } }
実行結果です。
1 // 1 本目のストリーム 2 3 1 // 2 本目のストリーム 2 3
このようにストリーム自体が 2 本になります。
Hot な Observable は分岐できる
Hot な Observable は Observer は複数持てます。
using System; using UniRx; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = Observable.Range(1, 3) .Do(e => Debug.Log(e)) .Publish(); s.Subscribe(); s.Subscribe(); s.Connect(); } }
実行結果です。
1 // ストリームは 1 本のみ 2 3
UniRx: Hot と Cold (1)
Rx の Hot, Cold について、以下のスライドが参考になります。
www.slideshare.net
このスライドではプログラミング言語として Swift を採用していますが、Rx は特定のプログラミング言語に依存しない概念です。
本記事では UniRx における実装を参照しながらスライドの各内容について見ていきます。
Subject は「Observable」である
Subject<T>
クラス定義をみると、
public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T> { // 実装は省略... }
となっています。ISubject<T>
の実装をみると、
public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T> { }
となっており、IObservable<T>
を実装しています。
よって、Subject<T>
は IObservable<T>
です。
各種 Operator は「Observable」である
たとえば Select
オペレーターの実装をみてみますと、
public static partial class Observable { public static IObservable<TR> Select<T, TR>(this IObservable<T> source, Func<T, int, TR> selector) { return new SelectObservable<T, TR>(source, selector); } }
SelectObservable
というクラスのインスタンスを返しているのがわかります。
SelectObservable
の定義をみると、
internal class SelectObservable<T, TR> : OperatorObservableBase<TR>, ISelect<TR> { }
となっており、親クラス OperatorObservableBase<TR>
の定義は、
public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T> { }
となっています。
OperatorObservableBase<T>
は IObservable<T>
を実装します。
よって、Select
オペレーターは IObservable<T>
です。
他のオペレーターについても Select
と非常によく似た構造になっています。
たとえば Where
オペレーターについてみてみますと、内部では WhereObservable<T>
クラスのインスタンスが作られますが、
この WhereObservable<T>
は、SelectObservable
と同様に OperatorObservableBase<T>
のサブクラスになっています。
Cold な Observable は Subscribe されるまで動作しない
Cold な Observable は Subscribe されるまで動作しません。
たとえば、以下のコードは実行しても何も出力されません。
var s = Observable.Range(1, 3) .Select(e => { Debug.Log(e); return e; });
Subscribe
するとストリームが流れるようになります。
var s = Observable.Range(1, 3) .Select(e => { Debug.Log(e); return e; }); s.Subscribe();
実行結果です。
1 2 3
Hot な Observable は Subscribe しなくても動作する
Cold は Subscribe されるまで動作しない
下のコードは実行しても何も出力されません。
var subject = new Subject<int>(); subject.Select(e => { Debug.Log(e); return e; }); subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnCompleted();
Subject
は Hot ですが、Select
オペレーターは Cold であり、Subscribe
されるまで動作しないためです。
Hot は前の Cold を可動させる
var subject = new Subject<int>(); subject.Select(e => { Debug.Log(e); return e; }).Publish().Connect(); // Hot 変換 subject.OnNext(1); subject.OnNext(2); subject.OnNext(3); subject.OnCompleted();
実行結果です。
1 2 3
Hot 変換とは何か
Publish
メソッドを呼び出すとストリームが Hot になります。
具体的に内部ではどのような処理が行われるのか、ソースを見ていきます。
public static IConnectableObservable<T> Multicast<T>(this IObservable<T> source, ISubject<T> subject) { return new ConnectableObservable<T>(source, subject); } public static IConnectableObservable<T> Publish<T>(this IObservable<T> source) { return source.Multicast(new Subject<T>()); }
Publish
メソッドを呼ぶと、ConnectableObservable<T>
クラスのインスタンスが作られます。
このクラスの Connect
メソッドを呼び出すと、内部で Subscribe
が呼ばれているのがわかります。
class ConnectableObservable<T> : IConnectableObservable<T> { readonly IObservable<T> source; readonly ISubject<T> subject; readonly object gate = new object(); Connection connection; public ConnectableObservable(IObservable<T> source, ISubject<T> subject) { this.source = source.AsObservable(); this.subject = subject; } public IDisposable Connect() { lock (gate) { // don't subscribe twice if (connection == null) { var subscription = source.Subscribe(subject); connection = new Connection(this, subscription); } return connection; } } }
つまり、(スライドにも記載ありますが)Hot な Observable は内部で Subscribe
を呼び出すことで Hot になるようです。
ここまででスライドの 62 of 115 まで見てきました。
UniRx: Distinct
Observable.Distinct
は、ストリームに重複値を流さないようにするオペレータです。
using UniRx; using UniRx.Diagnostics; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = new[] {1, 2, 3, 2, 2, 5, 5} .ToObservable() .Distinct(); s.Debug().Subscribe(); } }
実行結果です。
OnSubscribe OnNext(1) OnNext(2) OnNext(3) OnNext(5) OnCompleted()
重複する値はストリームには流れません。
Distict の実装を読む
public static IObservable<TSource> Distinct<TSource>(this IObservable<TSource> source) { #if !UniRxLibrary var comparer = UnityEqualityComparer.GetDefault<TSource>(); #else var comparer = EqualityComparer<TSource>.Default; #endif return new DistinctObservable<TSource>(source, comparer); }
実体は DistinctObservable<TSource>
です。
internal class DistinctObservable<T> : OperatorObservableBase<T> { readonly IObservable<T> source; readonly IEqualityComparer<T> comparer; public DistinctObservable(IObservable<T> source, IEqualityComparer<T> comparer) : base(source.IsRequiredSubscribeOnCurrentThread()) { this.source = source; this.comparer = comparer; } protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel) { return source.Subscribe(new Distinct(this, observer, cancel)); } }
ストリームに流れる値は Distinct
クラスで制御しています。
class Distinct : OperatorObserverBase<T, T> { readonly HashSet<T> hashSet; public Distinct(DistinctObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel) { hashSet = (parent.comparer == null) ? new HashSet<T>() : new HashSet<T>(parent.comparer); } public override void OnNext(T value) { var key = default(T); var isAdded = false; try { key = value; isAdded = hashSet.Add(key); } catch (Exception exception) { try { observer.OnError(exception); } finally { Dispose(); } return; } if (isAdded) { observer.OnNext(value); } } }
ストリームに流した値を HashSet<T>
で管理し、重複値を流さないようにしています。
参考
UniRx: コルーチンを IObservable 化する
Observable.FromCoroutineValue()
でコルーチンを IObservable
化することができます。
コルーチンの yield return の値が OnNext に渡ります。
using System; using System.Collections; using UniRx; using UniRx.Diagnostics; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = Observable.FromCoroutineValue<int>(G); s.Debug().Subscribe(); } IEnumerator G() { yield return 10; yield return 20; yield return 30; } }
実行結果です。
OnSubscribe OnNext(10) OnNext(20) OnNext(30) OnCompleted()
ちなみに、コルーチン内で yield return した値は全て同一フレーム内で流れてきます。
Time.frameCount
でフレーム値を表示するようにしてみます。
using System; using System.Collections; using UniRx; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = Observable.FromCoroutineValue<int>(G); s.Subscribe( e => Debug.Log($"e: {e} frame: {Time.frameCount}"), () => Debug.Log("OnCompleted")); } IEnumerator G() { yield return 10; yield return 20; yield return 30; } }
実行結果です。
e: 10 frame: 1 e: 20 frame: 1 e: 30 frame: 1 OnCompleted
コルーチン G()
の 3 つの値が全て Time.frameCount = 1
のときに流れてきているのが分かります。
UniRx: ToObservable() が From() に代わりになるのかな
Rx には From というオペレータがあります。
UniRx には From が定義されてないようですが、ToObservable() というのがあります。
このメソッドを使うと、IEnumerable<T>
を IObservable<T>
に変換できます。
using UniRx; using UniRx.Diagnostics; using UnityEngine; public class Main : MonoBehaviour { void Start() { var s = new[] {10, 20, 30}.ToObservable(); s.Debug().Subscribe(); } }
実行結果です。
[20:20:37:877] OnSubscribe [20:20:37:882] OnNext(10) [20:20:37:884] OnNext(20) [20:20:37:885] OnNext(30) [20:20:37:886] OnCompleted()
ToObservable()
は、コルーチンを Observable 化することもできます。
publishEveryYield が false の場合
void Start() { var s = G().ToObservable(publishEveryYield: false); s.Debug().Subscribe(); } IEnumerator G() { for (int i = 0; i < 3; i++) yield return null; }
実行結果です。
[20:30:57:423] OnSubscribe [20:30:57:822] OnNext(()) [20:30:57:824] OnCompleted()
publishEveryYield が true の場合
void Start() { var s = G().ToObservable(publishEveryYield: true); s.Debug().Subscribe(); } IEnumerator G() { for (int i = 0; i < 3; i++) yield return null; }
実行結果です。
[20:31:55:981] OnSubscribe [20:31:55:995] OnNext(()) [20:31:56:024] OnNext(()) [20:31:56:375] OnNext(()) [20:31:56:396] OnNext(()) [20:31:56:397] OnCompleted()
publishEveryYield が true の場合は、yield return が実行されるたびに OnNext されるようになります。
ログに出力されている 4 つの OnNext のうち、最初の 3 つの OnNext は yield return の実行分で、 最後のは、コルーチンが正常終了したときに通知される OnNext です。
コルーチンが yield return しなくても OnNext は呼ばれる
void Start() { var s = H().ToObservable(publishEveryYield: false); s.Debug().Subscribe(); } IEnumerator H() { yield break; }
実行結果です。
[20:43:58:160] OnSubscribe [20:43:58:172] OnNext(()) [20:43:58:174] OnCompleted()
コルーチンを Observable 化するとき、コルーチンが yield return せず、単に yield break するだけでも OnNext は呼ばれます。