-
-
Notifications
You must be signed in to change notification settings - Fork 938
UniTask + R3 subscribe awaiting other subscribers #589
-
I'm new to Cysharp projects and I wonder is there a way to subscribe to Observable<T>
but callback must await any other called callbacks which was invoked before it.
I'm in case where for cards game I want to somehow react to the end of turn, one module wants to draw 2 cards every time, but another wants player to select new card as a reward and while both modules execution is async only second one can take more then 1 frame, because asking player to select something on GUI might take undetermined amount of time. I want next modules to await as long as need in such case.
One of the solution would be just implement custom List<Func<UniTask>>
and do manual subscription, but I really want to keep consistent with UniTask + R3, because whole project uses this query-like R3 syntax like: .Pairwise().Where()...
UPD#0:
I've wrote simple unit test to try some approach I've found here
[Test] public void SubscribeAwaitTest() { var observable = new Subject<Unit>(); var subs = new CompositeDisposable(); observable .Select(_ => Observable.FromAsync(async _ => await AsyncHandle())) .Concat() .Select(_ => Observable.FromAsync(async _ => await AsyncHandle())) .Concat() .Subscribe(); observable.OnNext(Unit.Default); Assert.AreEqual(1, _counter); subs.Dispose(); }
As I understand the trick here is to convert async callbacks to observables and sequentially .Concat()
them to original one. But to make it consistent to regular Rx syntax I guess I should implement something like AwaitObserversSequnceObservable<T> : Observable<T>
which will do .Concat()
inside .Subscribe()
method. Is it a good way to go?
UPD#1:
For now I ended up with next solution: I wrap Observable<T>
in container which holds UniTask
of last invoked async UniTask
callback. Client code should just take AsyncCallbackObservableContainer
and subscribe to it using extensions methods. At least it works and give a possibility to use R3 syntax.
From my point of view two weakest parts of it are:
- I need to expose
AsyncCallbackObservableContainer<T>
instead of regularObservable<T>
which seems a little inconvenient - For each async callback running
UniTaskCompletionSource
created to control async callback flow, which I guess is GC problem
using System; using Cysharp.Threading.Tasks; using R3; namespace Source.Extensions.R3 { public class AsyncCallbackObservableContainer<T> { // used to prevent overlapping of awaitable callbacks private Cysharp.Threading.Tasks.UniTask _currentHandlerTask = Cysharp.Threading.Tasks.UniTask.CompletedTask; public Observable<T> CoreObservable { get; } public Cysharp.Threading.Tasks.UniTask CurrentHandlerTask { get => _currentHandlerTask; set { if (_currentHandlerTask.Status == UniTaskStatus.Pending) throw new Exception($"{nameof(CurrentHandlerTask)} can't be set if existing task isn't completed yet! Please make sure you await existing task."); _currentHandlerTask = value; } } public AsyncCallbackObservableContainer(Observable<T> coreCoreObservable) => CoreObservable = coreCoreObservable; } public class AsyncHandler<T> { private readonly Func<T, Cysharp.Threading.Tasks.UniTask> _callback; public AsyncHandler(Func<T, Cysharp.Threading.Tasks.UniTask> callback) => _callback = callback; public async Cysharp.Threading.Tasks.UniTask Handle(AsyncCallbackObservableContainer<T> container, T value) { while (container.CurrentHandlerTask.Status == UniTaskStatus.Pending) await container.CurrentHandlerTask; var completionSource = new UniTaskCompletionSource(); container.CurrentHandlerTask = completionSource.Task; await _callback(value); completionSource.TrySetResult(); } } public static class SequentialAsyncSubscriptionExtensions { /// <summary> /// Subscribe awaitable callback which will be invoked in a sequence with another callbacks without overlapping. /// </summary> public static IDisposable SubscribeAsync<T>(this AsyncCallbackObservableContainer<T> container, Func<T, Cysharp.Threading.Tasks.UniTask> callback) { var seqAsyncHandler = new AsyncHandler<T>(callback); return container.CoreObservable.Subscribe(value => seqAsyncHandler.Handle(container, value).Forget()); } /// <summary> /// Subscribe awaitable callback which will be invoked in a sequence with another callbacks without overlapping. Use this, when you need to setup core Observable from container /// </summary> public static IDisposable SubscribeAsync<T>(this Observable<T> observable, AsyncCallbackObservableContainer<T> container, Func<T, Cysharp.Threading.Tasks.UniTask> callback) { var seqAsyncHandler = new AsyncHandler<T>(callback); return observable.Subscribe(value => seqAsyncHandler.Handle(container, value).Forget()); } /// <summary> /// Subscribe regular synchronous callback. Use it when you can guarantee your callback shouldn't be awaited. /// </summary> public static IDisposable Subscribe<T>(this AsyncCallbackObservableContainer<T> container, Action<T> callback) => container.CoreObservable.Subscribe(callback); } }
Beta Was this translation helpful? Give feedback.