UniRx: TakeUntil

TakeUntil は引数で渡した IObservable が最初のデータを通知したら、自身の通知をそこで終了させるオペレータです。

f:id:noriok:20181014175916p:plain

以下のサンプルは、1 秒ごとにデータを通知しますが、スペースキーが押されると、そこで通知を終了します。

using System;
using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    private Subject<Unit> _subject;
    
    void Start()
    {
        _subject = new Subject<Unit>();

        var s = Observable
            .Interval(TimeSpan.FromSeconds(1))
            .TakeUntil(_subject);
        s.Debug().Subscribe();
    }
    
    void Update()
    {
        // スペースキーを押すと、データ通知
        if (Input.GetKeyDown(KeyCode.Space))
        {
            Debug.Log("スペースキーを押しました");
            _subject.OnNext(Unit.Default);
            _subject.OnCompleted();            
        }
    }        
}

実行結果です。

[18:20:19:830] OnSubscribe
[18:20:21:037] OnNext(0)
[18:20:22:047] OnNext(1)
[18:20:23:063] OnNext(2)
[18:20:24:070] OnNext(3)
[18:20:24:417] スペースキーを押しました
[18:20:24:419] OnCompleted()

スペースキーを押すと、OnCompleted が呼ばれているのが確認できます。

RxJava には通知データを元に終了判定を行うオーバーロードがある

RxJava の takeUntil メソッド には、引数に述語関数を渡して、通知データを元に通知を終了するかを判定するオーバーロードが定義されています。

f:id:noriok:20181014180420p:plain

UniRx には実装が見当たらなかったので、自作してみました。

using System;
using UniRx;

public static class ObservableExtensions
{
    public static IObservable<T> TakeUntil<T>(this IObservable<T> source, Predicate<T> stopPredicate)
    {
        bool stopValueFound = false;
        T stopValue = default;
        var s = source.TakeWhile(e =>
        {
            if (stopPredicate(e))
            {
                stopValueFound = true;
                stopValue = e;
                return false;
            }
            return true;
        });

        return s.Concat(Observable.Create<T>(observer =>
        {
            if (stopValueFound) observer.OnNext(stopValue);
            observer.OnCompleted();
            return Disposable.Empty;
        }));
    }    
}

TakeWhilestopPredicate が true になるかをチェックします。 もし true になる場合は、その値を Concat で末尾に加えています。

お試しコード:

using UniRx;
using UniRx.Diagnostics;
using UnityEngine;

public class Main : MonoBehaviour
{
    void Start()
    {        
        // 偶数が流れてくるまでデータ通知
        var s = new[] { 1, 3, 5, 10, 2, 9, 0 }
            .ToObservable()
            .TakeUntil(e => e % 2 == 0);
        s.Debug().Subscribe();
        
        // 条件にマッチしないなら全てのデータを通知
        var t = Observable.Range(1, 3)
            .TakeUntil(e => e > 10);
        t.Debug().Subscribe();
    }    
}

実行結果です。

OnSubscribe
OnNext(1)
OnNext(3)
OnNext(5)
OnNext(10) // 偶数が流れてきたので、ここで通知を終了する
OnCompleted()
OnSubscribe
OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()

参考