UniTask + R3 subscribe awaiting other subscribers #589

Unanswered
Antoshidza asked this question in Q&A
I'm new to Cysharp projects and I wonder whether there is a way to subscribe to Observable<T> so that each callback awaits any callbacks that were invoked before it.

My case is a card game where I want to react to the end of a turn. One module wants to draw 2 cards every time, while another wants the player to select a new card as a reward. Both modules run asynchronously, but only the second can take more than one frame, because asking the player to select something in the GUI can take an indeterminate amount of time. In that case I want the subsequent modules to wait for as long as needed.

One solution would be to implement a custom List<Func<UniTask>> and subscribe manually, but I really want to stay consistent with UniTask + R3, because the whole project uses the query-like R3 syntax, e.g. .Pairwise().Where()...

UPD#0:
I wrote a simple unit test to try an approach I found here:

[Test]
public void SubscribeAwaitTest()
{
 var observable = new Subject<Unit>();
 var subs = new CompositeDisposable();
 observable
 .Select(_ => Observable.FromAsync(async _ => await AsyncHandle()))
 .Concat()
 .Select(_ => Observable.FromAsync(async _ => await AsyncHandle()))
 .Concat()
 .Subscribe()
 .AddTo(subs); // register the subscription so subs.Dispose() actually unsubscribes
 observable.OnNext(Unit.Default);
 
 Assert.AreEqual(1, _counter);
 
 subs.Dispose();
}

As I understand it, the trick is to convert the async callbacks to observables and sequentially .Concat() them onto the original one. But to keep it consistent with regular Rx syntax, I guess I should implement something like AwaitObserversSequnceObservable<T> : Observable<T>, which would do the .Concat() inside its .Subscribe() method. Is that a good way to go?

UPD#1:
For now I ended up with the following solution: I wrap Observable<T> in a container which holds the UniTask of the last invoked async callback. Client code just takes an AsyncCallbackObservableContainer and subscribes to it using extension methods. At least it works and makes it possible to keep using R3 syntax.

From my point of view, its two weakest parts are:

  • I need to expose AsyncCallbackObservableContainer<T> instead of a regular Observable<T>, which seems a little inconvenient
  • Each time an async callback runs, a UniTaskCompletionSource is created to control the callback flow, which I guess is a GC problem

using System;
using Cysharp.Threading.Tasks;
using R3;
namespace Source.Extensions.R3
{
 public class AsyncCallbackObservableContainer<T>
 {
 // used to prevent overlapping of awaitable callbacks
 private Cysharp.Threading.Tasks.UniTask _currentHandlerTask = Cysharp.Threading.Tasks.UniTask.CompletedTask;
 
 public Observable<T> CoreObservable { get; }
 public Cysharp.Threading.Tasks.UniTask CurrentHandlerTask
 {
 get => _currentHandlerTask;
 set
 {
 if (_currentHandlerTask.Status == UniTaskStatus.Pending)
 throw new Exception($"{nameof(CurrentHandlerTask)} can't be set if existing task isn't completed yet! Please make sure you await existing task.");
 _currentHandlerTask = value;
 }
 }
 public AsyncCallbackObservableContainer(Observable<T> coreObservable) 
 => CoreObservable = coreObservable;
 }
 public class AsyncHandler<T>
 {
 private readonly Func<T, Cysharp.Threading.Tasks.UniTask> _callback;
 public AsyncHandler(Func<T, Cysharp.Threading.Tasks.UniTask> callback)
 => _callback = callback;
 
 public async Cysharp.Threading.Tasks.UniTask Handle(AsyncCallbackObservableContainer<T> container, T value)
 {
 while (container.CurrentHandlerTask.Status == UniTaskStatus.Pending)
 await container.CurrentHandlerTask;
 
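 var completionSource = new UniTaskCompletionSource();
 container.CurrentHandlerTask = completionSource.Task;
 try
 {
 await _callback(value);
 }
 finally
 {
 // always release the slot, even if the callback throws, so later handlers are not blocked forever
 completionSource.TrySetResult();
 }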
 }
 }
 public static class SequentialAsyncSubscriptionExtensions
 {
 /// <summary>
 /// Subscribes an awaitable callback which is invoked in sequence with the other callbacks, without overlapping.
 /// </summary>
 public static IDisposable SubscribeAsync<T>(this AsyncCallbackObservableContainer<T> container, Func<T, Cysharp.Threading.Tasks.UniTask> callback)
 {
 var seqAsyncHandler = new AsyncHandler<T>(callback);
 return container.CoreObservable.Subscribe(value => seqAsyncHandler.Handle(container, value).Forget());
 }
 
 /// <summary>
 /// Subscribes an awaitable callback which is invoked in sequence with the other callbacks, without overlapping. Use this when you need to build on the core Observable from the container before subscribing.
 /// </summary>
 public static IDisposable SubscribeAsync<T>(this Observable<T> observable, AsyncCallbackObservableContainer<T> container, Func<T, Cysharp.Threading.Tasks.UniTask> callback)
 {
 var seqAsyncHandler = new AsyncHandler<T>(callback);
 return observable.Subscribe(value => seqAsyncHandler.Handle(container, value).Forget());
 }
 
 /// <summary>
 /// Subscribes a regular synchronous callback. Use it when you can guarantee your callback does not need to be awaited.
 /// </summary>
 public static IDisposable Subscribe<T>(this AsyncCallbackObservableContainer<T> container, Action<T> callback) 
 => container.CoreObservable.Subscribe(callback);
 }
}
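
Regarding the second weak point above (one completion source per callback run), here is a minimal sketch of an alternative, not a tested solution. It assumes a single shared queue that a pump drains one callback at a time. SequentialAsyncQueue, Enqueue and WhenIdle are hypothetical names that exist in neither R3 nor UniTask. It uses plain System.Threading.Tasks so that it stands alone, and it assumes everything runs on one thread (such as Unity's main thread).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of R3 or UniTask).
// Runs queued async callbacks strictly one at a time, in the order they were enqueued.
// Assumes all calls happen on one thread (e.g. Unity's main thread); it is not thread-safe.
public sealed class SequentialAsyncQueue
{
    private readonly Queue<Func<Task>> _pending = new Queue<Func<Task>>();
    private readonly Action<Exception> _onError;
    private Task _pump = Task.CompletedTask;
    private bool _running;

    // onError is optional; without it, exceptions from callbacks are swallowed.
    public SequentialAsyncQueue(Action<Exception> onError = null) => _onError = onError;

    // Completes once the queue has drained.
    public Task WhenIdle => _pump;

    public void Enqueue(Func<Task> callback)
    {
        _pending.Enqueue(callback);
        // Start a pump only if none is active; an active pump picks the new item up itself.
        if (!_running) _pump = PumpAsync();
    }

    private async Task PumpAsync()
    {
        _running = true;
        try
        {
            while (_pending.Count > 0)
            {
                var next = _pending.Dequeue();
                try { await next(); }
                catch (Exception ex) { _onError?.Invoke(ex); } // a failing callback must not stall the rest
            }
        }
        finally { _running = false; }
    }
}
```

Each module could then keep its usual R3 chain and end it with something like Subscribe(value => queue.Enqueue(() => handler(value))), where queue and handler are placeholders. With UniTask callbacks the delegate type would have to become Func<UniTask>, or the callback would need converting to a Task. Whether this actually allocates less than the container approach is an assumption that would need profiling: it still creates a closure per event and a state machine per pump run.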