\$\begingroup\$

I need an operator that lets a stream of booleans act as a gate for another stream: values pass while the gate's latest value is true and are dropped while it is false. I would normally use Switch for this, but if the source stream is cold, Switch resubscribes to it (recreating it) every time the gate changes, which I don't want.

I also want to clean up after myself, so the result should complete as soon as either the source or the gate completes.

    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        var s = source.Publish().RefCount();
        var g = gate.Publish().RefCount();
        var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
        var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
        var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);
        var flag = false;
        g.TakeUntil(anyCompleted).Subscribe(value => flag = value);
        return s.Where(_ => flag).TakeUntil(anyCompleted);
    }

Besides the overall verbosity, I dislike that I subscribe to the gate even if the result is never subscribed to (in which case this operator should be a no-op). Is there a way to get rid of that subscribe?

I have also tried this implementation, but it's even worse when it comes to cleaning up after itself:

    return Observable.Create<T>(
        o =>
        {
            var flag = false;
            gate.Subscribe(value => flag = value);
            return source.Subscribe(
                value =>
                {
                    if (flag) o.OnNext(value);
                });
        });
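A minimal sketch of how this `Observable.Create` variant might clean up after itself, assuming that completion or an error from either stream should end the result: both subscriptions are returned in a `CompositeDisposable`, and `OnError`/`OnCompleted` are forwarded from both streams. The names `WhenCreate` and `GateCreateDemo` are hypothetical, and `flag` is not synchronized, so this assumes the source and the gate never signal concurrently.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GateCreateExtensions
{
    // Hypothetical name, chosen so it does not clash with When.
    public static IObservable<T> WhenCreate<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        return Observable.Create<T>(o =>
        {
            var flag = false; // the gate stays closed until it emits true

            // Nothing is subscribed until the result itself is subscribed to.
            var gateSubscription = gate.Subscribe(value => flag = value, o.OnError, o.OnCompleted);
            var sourceSubscription = source.Subscribe(
                value => { if (flag) o.OnNext(value); },
                o.OnError,
                o.OnCompleted);

            // Disposing the result, or its termination, releases both subscriptions.
            return new CompositeDisposable(gateSubscription, sourceSubscription);
        });
    }
}

public static class GateCreateDemo
{
    public static List<int> Run()
    {
        var output = new List<int>();
        var source = new Subject<int>();
        var gate = new Subject<bool>();
        source.WhenCreate(gate).Subscribe(output.Add, () => output.Add(-1));

        source.OnNext(1);   // dropped: gate has not opened yet
        gate.OnNext(true);
        source.OnNext(2);   // passes
        gate.OnNext(false);
        source.OnNext(3);   // dropped
        gate.OnCompleted(); // completes the result
        source.OnNext(4);   // ignored: the result has already completed
        return output;
    }
}
```

Because everything happens inside the `Observable.Create` callback, the gate is only subscribed to when the result is, and a cold source is subscribed to once per subscriber of the result.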

These are the tests I'm using to check the implementation:

    [TestMethod]
    public void TestMethod1()
    {
        var output = new List<int>();
        var source = new Subject<int>();
        var gate = new Subject<bool>();
        var result = source.When(gate);
        result.Subscribe(output.Add, () => output.Add(-1));

        // the gate starts with false, so the source events are ignored
        source.OnNext(1);
        source.OnNext(2);
        source.OnNext(3);
        CollectionAssert.AreEqual(new int[0], output);

        // setting the gate to true will let the source events pass
        gate.OnNext(true);
        source.OnNext(4);
        CollectionAssert.AreEqual(new[] { 4 }, output);
        source.OnNext(5);
        CollectionAssert.AreEqual(new[] { 4, 5 }, output);

        // setting the gate to false stops source events from propagating again
        gate.OnNext(false);
        source.OnNext(6);
        source.OnNext(7);
        CollectionAssert.AreEqual(new[] { 4, 5 }, output);

        // completing the source also completes the result
        source.OnCompleted();
        CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
    }

    [TestMethod]
    public void TestMethod2()
    {
        // completing the gate also completes the result
        var output = new List<int>();
        var source = new Subject<int>();
        var gate = new Subject<bool>();
        var result = source.When(gate);
        result.Subscribe(output.Add, () => output.Add(-1));
        gate.OnCompleted();
        CollectionAssert.AreEqual(new[] { -1 }, output);
    }
Jamal
asked Jun 2, 2018 at 22:01
\$\endgroup\$

1 Answer

\$\begingroup\$

Based on the responses on Stack Overflow [1], this is what I came up with. It passes both tests, plus a third one in which I verified that cold observables are only subscribed to once:

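    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
    {
        // Publish with a selector shares a single subscription to source,
        // so a cold source is not recreated each time the gate changes.
        return source.Publish(
            ss =>
            {
                // Share the gate too, so it is used for both switching and completion.
                var gg = gate.Publish().RefCount();
                // Fires as soon as either the source or the gate completes.
                var eitherCompleted = Observable.Amb(ss.WhenCompleted(), gg.WhenCompleted());
                return gg.Select(g => g ? ss : ss.IgnoreElements()).Switch().TakeUntil(eitherCompleted);
            });
    }

    // Emits a single Unit when the source completes and nothing before that.
    private static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source) =>
        source.Select(_ => Unit.Default).IgnoreElements().Concat(Observable.Return(Unit.Default));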
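
The third test is not shown here. A sketch of what such a check might look like, assuming a `Defer`-wrapped subject as the cold source and a counter that is incremented on every subscription (`ColdSourceDemo`, `feed`, and `subscriptions` are hypothetical names). The operator is repeated in compact form, with the gate shared through `gg`, only so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class GateExtensions
{
    // Compact copy of the operator; the gate is shared through gg (an assumption of this sketch).
    public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate) =>
        source.Publish(ss =>
        {
            var gg = gate.Publish().RefCount();
            var eitherCompleted = Observable.Amb(ss.WhenCompleted(), gg.WhenCompleted());
            return gg.Select(g => g ? ss : ss.IgnoreElements()).Switch().TakeUntil(eitherCompleted);
        });

    private static IObservable<Unit> WhenCompleted<T>(this IObservable<T> source) =>
        source.Select(_ => Unit.Default).IgnoreElements().Concat(Observable.Return(Unit.Default));
}

public static class ColdSourceDemo
{
    public static (int Subscriptions, List<int> Output) Run()
    {
        var subscriptions = 0;
        var feed = new Subject<int>();

        // Cold wrapper: the factory runs, and the counter goes up, on every subscription.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return feed.AsObservable();
        });

        var gate = new Subject<bool>();
        var output = new List<int>();

        using (source.When(gate).Subscribe(output.Add))
        {
            // Toggle the gate several times; a plain Switch over the cold
            // source would resubscribe each time the gate opens.
            gate.OnNext(true);
            feed.OnNext(1);  // passes
            gate.OnNext(false);
            feed.OnNext(2);  // dropped
            gate.OnNext(true);
            feed.OnNext(3);  // passes
        }

        return (subscriptions, output);
    }
}
```

In an MSTest method, the returned values could be checked with `Assert.AreEqual(1, result.Subscriptions)` and `CollectionAssert.AreEqual(new[] { 1, 3 }, result.Output)`.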

[1] https://stackoverflow.com/questions/50666864/rx-net-gate-operator

answered Jun 4, 2018 at 13:21
\$\endgroup\$
