I need an operator that lets a stream of booleans act as a gate for another stream: values pass while the gate stream's latest value is true and are dropped while it is false. I would normally use Switch for this, but if the source stream is cold, Switch will resubscribe to it (and so recreate it) every time the gate changes, which I don't want.
I also want to clean up after myself, so the result should complete as soon as either the source or the gate completes.
public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    var s = source.Publish().RefCount();
    var g = gate.Publish().RefCount();
    var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
    var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
    var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);
    var flag = false;
    g.TakeUntil(anyCompleted).Subscribe(value => flag = value);
    return s.Where(_ => flag).TakeUntil(anyCompleted);
}
Besides the overall verbosity, I dislike that I subscribe to the gate even if the result is never subscribed to (in which case this operator should be a no-op). Is there a way to get rid of that subscribe?
I have also tried this implementation, but it's even worse when it comes to cleaning up after itself:
return Observable.Create<T>(
    o =>
    {
        var flag = false;
        gate.Subscribe(value => flag = value);
        return source.Subscribe(
            value =>
            {
                if (flag) o.OnNext(value);
            });
    });
These are the tests I'm using to check the implementation:
[TestMethod]
public void TestMethod1()
{
    var output = new List<int>();
    var source = new Subject<int>();
    var gate = new Subject<bool>();
    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));
    // the gate starts with false, so the source events are ignored
    source.OnNext(1);
    source.OnNext(2);
    source.OnNext(3);
    CollectionAssert.AreEqual(new int[0], output);
    // setting the gate to true will let the source events pass
    gate.OnNext(true);
    source.OnNext(4);
    CollectionAssert.AreEqual(new[] { 4 }, output);
    source.OnNext(5);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);
    // setting the gate to false stops source events from propagating again
    gate.OnNext(false);
    source.OnNext(6);
    source.OnNext(7);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);
    // completing the source also completes the result
    source.OnCompleted();
    CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}

[TestMethod]
public void TestMethod2()
{
    // completing the gate also completes the result
    var output = new List<int>();
    var source = new Subject<int>();
    var gate = new Subject<bool>();
    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));
    gate.OnCompleted();
    CollectionAssert.AreEqual(new[] { -1 }, output);
}
1 Answer
Based on the responses from StackOverflow [1], this is what I came up with; it passes both tests plus a third one where I verified that cold observables are only processed once:
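public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    // Publish(selector) shares a single subscription to source among all uses of ss.
    return source.Publish(
        ss =>
        {
            var gg = gate.Publish().RefCount();
            // Despite the name, Amb fires as soon as either ss or gg completes.
            var bothCompleted = Observable.Amb(ss.WhenCompleted(), gg.WhenCompleted());
            // While the gate is true, forward ss; while it is false, stay subscribed but drop values.
            // Note: gate (not gg) is subscribed here, so a cold gate gets a second subscription.
            return gate.Select(g => g ? ss : ss.IgnoreElements()).Switch().TakeUntil(bothCompleted);
        });
}

// Ignores all values and emits a single Unit when source completes.
private static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source) =>
    source.Select(_ => Unit.Default).IgnoreElements().Concat(Observable.Return(Unit.Default));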
[1] https://stackoverflow.com/questions/50666864/rx-net-gate-operator
This can be done with withLatestFrom and an appropriate filter and map. Here is an RxSwift implementation: gist.github.com/dtartaglia/1a70c4f7b8960d06bd7f1bfa81802cc3
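For reference, a rough Rx.NET equivalent of the withLatestFrom idea might look like the sketch below. It is not a translation of the linked gist. The class name GateExtensions and the operator name WhenLatest are made up for this example (the latter to avoid clashing with When above). It assumes a System.Reactive version that provides WithLatestFrom and a C# version with tuple support.

```csharp
using System;
using System.Reactive.Linq;

public static class GateExtensions
{
    // Hypothetical operator name; a sketch only, not the implementation from the answer above.
    public static IObservable<T> WhenLatest<T>(this IObservable<T> source, IObservable<bool> gate) =>
        source
            // Pair each source value with the most recent gate value.
            // Source values that arrive before the gate has produced anything are dropped.
            .WithLatestFrom(gate, (value, open) => (value, open))
            // Filter: keep only the values that arrived while the gate was open.
            .Where(pair => pair.open)
            // Map: unwrap the original source value.
            .Select(pair => pair.value);
}
```

Nothing is subscribed until the result itself is subscribed, and the source is subscribed only once per subscriber. This sketch completes when the source completes, but not when the gate completes, so it would need something like an extra TakeUntil on the gate's completion to pass TestMethod2.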