\$\begingroup\$

I'm working on a WPF app which uses ReactiveUI and Rx. Part of the workflow watches two data sources (the ReferenceData and PredictedData properties on a View Model) and an area of those data sources (the FocusArea property), which is used to show a line graph of that data.

Sometimes there can be a lot of data (and sometimes both data sources change at the same time), and building the line graphs can take a while, so it's done on a background thread. I'm also using the Ramer–Douglas–Peucker algorithm to approximate the graph for display (the ReduceGraphPoints method, which returns Task<List<Point>>). The final graph is only updated once the line data for both data streams has been calculated and simplified.

Once the calculation is started, the Loading property of the View Model is updated so we can mark the current graph as about-to-be-replaced.

The code I have for that bit of functionality at the moment is:

    this.WhenAny(me => me.ReferenceData,
                 me => me.PredictedData,
                 me => me.FocusArea,
                 (refData, predictedData, area) => new { ReferenceData = refData.Value, PredictedData = predictedData.Value, Area = area.Value })
        .Where(x => !x.Area.IsEmpty)
        .Do(_ => Loading = true)
        .Throttle(TimeSpan.FromMilliseconds(50))
        .Select(x => Observable.FromAsync(token => Task.Run(() =>
        {
            var refPoints = x.ReferenceData.GetPointsInRange(x.Area.Range).ToList();
            var reducedRef = ReduceGraphPoints(refPoints, token);
            var predictedPoints = x.PredictedData.GetPointsInRange(x.Area.Range).ToList();
            var reducedPredicted = ReduceGraphPoints(predictedPoints, token);
            return new { ReferencePoints = reducedRef.Result, PredictedPoints = reducedPredicted.Result };
        }, token)))
        .Switch()
        .Subscribe(x =>
        {
            ReferencePoints = x.ReferencePoints;
            PredictedPoints = x.PredictedPoints;
            Loading = false;
        });

So I'm looking for feedback from any Rx experts on this: things like the use of the Do method, and the cancellation token. Most of the methods called use ThrowIfCancellationRequested on the token, but I'm ignoring the cancellation exception because, if the work has been cancelled, Switch() will already be watching the next IObservable and won't care about the old one. Is that the best way of doing this?

asked Mar 3, 2014 at 17:10
\$\endgroup\$

2 Answers

\$\begingroup\$

When Switch unsubscribes from the previous Observable to connect to the new one, FromAsync should signal the cancellation token and also correctly eat the OperationCanceledException thrown by ThrowIfCancellationRequested.

While it'd probably be better to try to use a ReactiveCommand to model the Loading bool (which might not be easy given the cancellation and Switch), if this code works for you it should be fine.

Make sure to specify RxApp.MainThreadScheduler for the Throttle, though; it's more efficient.

answered Mar 4, 2014 at 3:04
\$\endgroup\$
\$\begingroup\$
    return new { ReferencePoints = reducedRef.Result, PredictedPoints = reducedPredicted.Result };

You can use await here, so that you're not unnecessarily blocking a thread while the two reductions run. This would require you to change the lambda passed to Task.Run to async and then change the line to:

    return new { ReferencePoints = await reducedRef, PredictedPoints = await reducedPredicted };
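As a rough sketch of how that change might sit inside the FromAsync/Switch stage, here is a small console program. It assumes the System.Reactive NuGet package is referenced. ReduceAsync is a hypothetical stand-in for ReduceGraphPoints, and the Subject<int> stands in for the WhenAny source; neither comes from the original code.

```csharp
// Sketch only: requires the System.Reactive NuGet package (assumption).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

static class Demo
{
    // Hypothetical stand-in for ReduceGraphPoints: keeps every other point.
    static Task<List<int>> ReduceAsync(List<int> points, CancellationToken token) =>
        Task.Run(() =>
        {
            token.ThrowIfCancellationRequested();
            return points.Where((_, i) => i % 2 == 0).ToList();
        }, token);

    static void Main()
    {
        // Stand-in for the WhenAny(...) source in the question.
        var requests = new Subject<int>();
        using var done = new ManualResetEventSlim();

        using var subscription = requests
            .Select(n => Observable.FromAsync(token => Task.Run(async () =>
            {
                // Start both reductions before awaiting either of them.
                var reference = ReduceAsync(Enumerable.Range(0, n).ToList(), token);
                var predicted = ReduceAsync(Enumerable.Range(n, n).ToList(), token);

                // await frees the pool thread instead of blocking on .Result.
                return new { Reference = await reference, Predicted = await predicted };
            }, token)))
            .Switch() // unsubscribing from the old inner observable cancels its token
            .Subscribe(x =>
            {
                Console.WriteLine($"{x.Reference.Count} / {x.Predicted.Count}");
                done.Set();
            });

        requests.OnNext(10);
        done.Wait(); // keep the process alive until the result arrives
    }
}
```

Because both tasks are started before the first await, they still run concurrently, as they did with the .Result version.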
answered Mar 10, 2014 at 16:48
\$\endgroup\$
