Skip to main content
Code Review

Return to Question

replaced http://stackoverflow.com/ with https://stackoverflow.com/
Source Link

I have a couple of calls that I need to cancel a subscription based on a CancellationToken. I saw How to cancel an observable sequence How to cancel an observable sequence but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken token)
{
 var cancelationObservable =
 Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
 return source.TakeUntil(cancelationObservable);
}

Is there a built in way of Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

I have a couple of calls that I need to cancel a subscription based on a CancellationToken. I saw How to cancel an observable sequence but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken token)
{
 var cancelationObservable =
 Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
 return source.TakeUntil(cancelationObservable);
}

Is there a built in way of Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

I have a couple of calls that I need to cancel a subscription based on a CancellationToken. I saw How to cancel an observable sequence but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken token)
{
 var cancelationObservable =
 Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
 return source.TakeUntil(cancelationObservable);
}

Is there a built in way of Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

link formatting
Source Link
svick
  • 24.5k
  • 4
  • 53
  • 89

I have a couple of calls that I need to cancel a subscription based on a CancellationToken. I saw this http://stackoverflow.com/questions/6759833/how-to-cancel-an-observable-sequenceHow to cancel an observable sequence but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken token)
{
 var cancelationObservable =
 Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
 return source.TakeUntil(cancelationObservable);
}

Is there a built in way of Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

I have a couple of calls that I need to cancel a subscription based on a CancellationToken. I saw this http://stackoverflow.com/questions/6759833/how-to-cancel-an-observable-sequence but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken token)
{
 var cancelationObservable =
 Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
 return source.TakeUntil(cancelationObservable);
}

Is there a built in way of Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

I have a couple of calls that I need to cancel a subscription based on a CancellationToken. I saw How to cancel an observable sequence but I needed a more generic way. I created an extension method that seems to work.

public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken token)
{
 var cancelationObservable =
 Observable.Create<object>(observer => token.Register(() => observer.OnNext(null)));
 return source.TakeUntil(cancelationObservable);
}

Is there a built in way of Rx to cover this? Is Observable.Create the best way to trigger the pulse?

This seems so basic that I feel I may be missing something.

edited tags
Link
200_success
  • 145.5k
  • 22
  • 190
  • 479
Made code simpler since Token.Register returns an IDisposble.
Source Link
CharlesNRice
  • 4.4k
  • 16
  • 18
Loading
Source Link
CharlesNRice
  • 4.4k
  • 16
  • 18
Loading
lang-cs

AltStyle によって変換されたページ (->オリジナル) /