io.reactivex.rxjava3.subjects

Class ReplaySubject<T>

  • Type Parameters:
    T - the value type
    All Implemented Interfaces:
    ObservableSource<T>, Observer<T>


    public final class ReplaySubject<T>
    extends Subject<T>
    Replays events (in a configurable bounded or unbounded manner) to current and late Observers.

    This subject does not have a public constructor by design; a new empty instance of this ReplaySubject can be created via the following create methods that allow specifying the retention policy for items:

    • create() - creates an empty, unbounded ReplaySubject that caches all items and the terminal event it receives.

    • create(int) - creates an empty, unbounded ReplaySubject with a hint about how many total items one expects to retain.
    • createWithSize(int) - creates an empty, size-bound ReplaySubject that retains at most the given number of the latest items it receives.

    • createWithTime(long, TimeUnit, Scheduler) - creates an empty, time-bound ReplaySubject that retains items no older than the specified time amount.

    • createWithTimeAndSize(long, TimeUnit, Scheduler, int) - creates an empty, time- and size-bound ReplaySubject that retains at most the given number of items that are also not older than the specified time amount.

    Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls result in a NullPointerException being thrown, and the subject's state is not changed.
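
    The null rule can be illustrated with a minimal sketch, assuming RxJava 3 is on the classpath. The class name NullRejectionSketch is hypothetical and only serves the illustration; test() is the standard Observable helper that subscribes a TestObserver.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
final class NullRejectionSketch {

    /** Returns the items a late Observer sees after a null was rejected. */
    static List<String> run() {
        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("one");
        try {
            subject.onNext(null); // violates Rule 2.13
        } catch (NullPointerException expected) {
            // The subject stays active and still holds "one".
        }
        subject.onNext("two");
        subject.onComplete();
        return subject.test().values();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    The rejected call neither terminates the subject nor adds anything to its cache, so the late Observer here should see only "one" and "two" followed by completion.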

    Since a ReplaySubject is an Observable, it does not support backpressure.

    When this ReplaySubject is terminated via onError(Throwable) or onComplete(), late Observers will receive the retained/cached items first (if any) followed by the respective terminal event. If the ReplaySubject has a time bound, the age of the retained/cached items is still considered when replaying, so it is possible that no items are emitted before the terminal event.

    Once an Observer has subscribed, it will receive items continuously from that point on. Bounds only affect how many past items a new Observer will receive before it catches up with the live event feed.
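
    As a rough sketch of this catch-up behavior (assuming RxJava 3; the class and method names below are hypothetical), the same late subscription is run against an unbounded and a size-bound subject. Only the replayed prefix differs; the live items arrive in both cases.

```java
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
final class CatchUpSketch {

    /** Emits 1..5, subscribes a late Observer, then emits 6 and 7 live. */
    static List<Integer> lateObserverSees(ReplaySubject<Integer> subject) {
        for (int i = 1; i <= 5; i++) {
            subject.onNext(i);
        }
        TestObserver<Integer> late = subject.test(); // subscribes after 1..5
        subject.onNext(6);
        subject.onNext(7);
        return late.values();
    }

    public static void main(String[] args) {
        // Unbounded: the whole history is replayed before the live items.
        System.out.println(lateObserverSees(ReplaySubject.<Integer>create()));
        // Size-bound (2): only the last two past items are replayed.
        System.out.println(lateObserverSees(ReplaySubject.<Integer>createWithSize(2)));
    }
}
```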

    Even though ReplaySubject implements the Observer interface, calling onSubscribe is not required (Rule 2.12) if the subject is used as a standalone source. However, calling onSubscribe after the ReplaySubject reached its terminal state will result in the given Disposable being disposed immediately.
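
    A small sketch of the onSubscribe behavior, assuming RxJava 3 and its Disposable.empty() factory; the class name is hypothetical. A Disposable handed over while the subject is active is left alone, whereas one handed over after termination is disposed right away.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.ReplaySubject;

// Hypothetical illustration class, not part of RxJava.
final class OnSubscribeAfterTerminalSketch {

    /** Returns {disposed state while active, disposed state after termination}. */
    static boolean[] run() {
        ReplaySubject<String> subject = ReplaySubject.create();

        Disposable early = Disposable.empty();
        subject.onSubscribe(early);              // optional call on an active subject
        boolean earlyDisposed = early.isDisposed();

        subject.onComplete();                    // subject is now terminated

        Disposable late = Disposable.empty();
        subject.onSubscribe(late);               // disposed immediately
        return new boolean[] { earlyDisposed, late.isDisposed() };
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println(result[0] + " " + result[1]);
    }
}
```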

    Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
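
    The following sketch shows one way to feed a ReplaySubject from several threads through toSerialized(), assuming RxJava 3. The class name, thread count and item count are hypothetical choices for the illustration.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.Subject;

// Hypothetical illustration class, not part of RxJava.
final class SerializedSketch {

    /** Emits from several threads and returns how many items were cached. */
    static int run(int threads, int perThread) {
        // The serialized wrapper makes concurrent onNext calls safe.
        Subject<Integer> subject = ReplaySubject.<Integer>create().toSerialized();

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    subject.onNext(i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        subject.onComplete();
        // A late Observer replays everything the producers emitted.
        return subject.test().values().size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```

    Without the toSerialized() wrapper, the concurrent onNext calls above would break the serialization requirement and the outcome would be undefined.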

    This ReplaySubject supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers() as well as means to read the retained/cached items in a non-blocking and thread-safe manner via hasValue(), getValue(), getValues() or getValues(Object[]).
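
    A brief sketch of the state-peeking methods on an unbounded subject, assuming RxJava 3; the class name is hypothetical. It records what the methods report before and after completion.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.Arrays;

// Hypothetical illustration class, not part of RxJava.
final class StatePeekSketch {

    /** Collects the answers of the peeking methods into one line of text. */
    static String run() {
        ReplaySubject<String> subject = ReplaySubject.create();
        StringBuilder log = new StringBuilder();

        log.append(subject.hasValue()).append(' ');                   // nothing cached yet
        subject.onNext("one");
        subject.onNext("two");
        log.append(subject.getValue()).append(' ');                   // latest cached item
        log.append(Arrays.toString(subject.getValues())).append(' '); // snapshot of all items
        log.append(subject.hasComplete()).append(' ');                // still active

        subject.onComplete();
        log.append(subject.hasComplete()).append(' ');
        log.append(subject.hasThrowable()).append(' ');
        log.append(subject.hasObservers());                           // nobody subscribed
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```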

    Note that due to concurrency requirements, a size- and time-bounded ReplaySubject may hold strong references to more source emissions than specified while it isn't terminated yet. Use cleanupBuffer() to allow such inaccessible items to be cleaned up by GC once no consumer references them anymore.

    Scheduler:
    ReplaySubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked. Time-bound ReplaySubjects use the Scheduler given to their create methods as the time source for timestamping received items for the age checks.
    Error handling:
    When onError(Throwable) is called, the ReplaySubject enters into a terminal state and emits the same Throwable instance to the last set of Observers. During this emission, if one or more Observers dispose their respective Disposables, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Observers cancel at once). If there were no Observers subscribed to this ReplaySubject when onError(Throwable) was called, the global error handler is not invoked.
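
    A minimal sketch of the error path, assuming RxJava 3; the class name and the exception used are hypothetical. It covers only the common case in which no Observer disposes during the emission, so the global error handler is not involved.

```java
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.ReplaySubject;

// Hypothetical illustration class, not part of RxJava.
final class ErrorReplaySketch {

    /** Returns true if the subject reports the very same Throwable instance. */
    static boolean run() {
        ReplaySubject<Integer> subject = ReplaySubject.create();
        IllegalStateException failure = new IllegalStateException("boom");

        TestObserver<Integer> early = subject.test(); // subscribed before the error
        subject.onNext(1);
        subject.onError(failure);                     // terminal state
        TestObserver<Integer> late = subject.test();  // subscribed after the error

        // Both Observers get the cached item, then the same Throwable instance.
        early.assertValues(1);
        early.assertError(t -> t == failure);
        late.assertValues(1);
        late.assertError(t -> t == failure);

        return subject.hasThrowable() && subject.getThrowable() == failure;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```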

    Example usage:

     
     ReplaySubject<Object> subject = ReplaySubject.create();
     subject.onNext("one");
     subject.onNext("two");
     subject.onNext("three");
     subject.onComplete();
     // both of the following will get the onNext/onComplete calls from above
     subject.subscribe(observer1);
     subject.subscribe(observer2);
      
    • Method Detail

      • create

        @CheckReturnValue
         @NonNull
        public static <T> @NonNull ReplaySubject<T> create()
        Creates an unbounded replay subject.

        The internal buffer is backed by an ArrayList and starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with the create(int) overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.

        Type Parameters:
        T - the type of items observed and emitted by the Subject
        Returns:
        the created subject
      • create

        @CheckReturnValue
         @NonNull
        public static <T> @NonNull ReplaySubject<T> create(int capacityHint)
        Creates an unbounded replay subject with the specified initial buffer capacity.

        Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the ReplaySubject to preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.

        Type Parameters:
        T - the type of items observed and emitted by the Subject
        Parameters:
        capacityHint - the initial buffer capacity
        Returns:
        the created subject
        Throws:
        IllegalArgumentException - if capacityHint is non-positive
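
        As a short sketch (assuming RxJava 3; the class and method names are hypothetical), the capacity hint only sizes the initial buffer. It does not cap how many items are retained, and a non-positive hint is rejected.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;

// Hypothetical illustration class, not part of RxJava.
final class CapacityHintSketch {

    /** Emits the given number of items and returns how many a late Observer gets. */
    static int replayedCount(int capacityHint, int items) {
        ReplaySubject<Integer> subject = ReplaySubject.create(capacityHint);
        for (int i = 0; i < items; i++) {
            subject.onNext(i);
        }
        return subject.test().values().size();
    }

    /** Returns true if a hint of zero is rejected with IllegalArgumentException. */
    static boolean rejectsNonPositive() {
        try {
            ReplaySubject.<Integer>create(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(replayedCount(4, 10)); // the buffer grows past the hint
        System.out.println(rejectsNonPositive());
    }
}
```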
      • createWithSize

        @CheckReturnValue
         @NonNull
        public static <T> @NonNull ReplaySubject<T> createWithSize(int maxSize)
        Creates a size-bounded replay subject.

        In this setting, the ReplaySubject holds at most maxSize items in its internal buffer and discards the oldest item when a new one arrives at a full buffer.

        When observers subscribe to a terminated ReplaySubject, they are guaranteed to see at most maxSize onNext events followed by a termination event.

        If an observer subscribes while the ReplaySubject is active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the mean time. In other words, once an Observer subscribes, it will receive items without gaps in the sequence.

        Type Parameters:
        T - the type of items observed and emitted by the Subject
        Parameters:
        maxSize - the maximum number of buffered items
        Returns:
        the created subject
        Throws:
        IllegalArgumentException - if maxSize is non-positive
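
        The terminated case can be sketched as follows, assuming RxJava 3; the class name is hypothetical. Three items pass through a subject bounded to two, and an Observer subscribing after completion gets only the last two.

```java
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
final class SizeBoundTerminatedSketch {

    /** Returns what an Observer sees when it subscribes after completion. */
    static List<String> run() {
        ReplaySubject<String> subject = ReplaySubject.createWithSize(2);
        subject.onNext("a"); // evicted once "c" arrives
        subject.onNext("b");
        subject.onNext("c");
        subject.onComplete();

        TestObserver<String> late = subject.test();
        late.assertComplete(); // the terminal event follows the replayed items
        return late.values();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```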
      • createWithTime

        @CheckReturnValue
         @NonNull
        public static <T> @NonNull ReplaySubject<T> createWithTime(long maxAge,
         @NonNull TimeUnit unit,
         @NonNull Scheduler scheduler)
        Creates a time-bounded replay subject.

        In this setting, the ReplaySubject internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.

        Once the subject is terminated, observers subscribing to it will receive items that remained in the buffer after the terminal event, regardless of their age.

        If an observer subscribes while the ReplaySubject is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the mean time. In other words, once an observer subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

        Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If an observer subscribes at T=11, it will find an empty ReplaySubject with just an onComplete notification.

        Type Parameters:
        T - the type of items observed and emitted by the Subject
        Parameters:
        maxAge - the maximum age of the contained items
        unit - the time unit of maxAge
        scheduler - the Scheduler that provides the current time
        Returns:
        the created subject
        Throws:
        NullPointerException - if unit or scheduler is null
        IllegalArgumentException - if maxAge is non-positive
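
        A sketch of the time bound on an active subject, assuming RxJava 3 and its TestScheduler as a manually advanced clock; the class name and the chosen times are hypothetical.

```java
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
final class TimeBoundSketch {

    /** Returns what an Observer subscribing at T=6s sees with a 5s max age. */
    static List<String> run() {
        TestScheduler clock = new TestScheduler(); // virtual time source
        ReplaySubject<String> subject =
                ReplaySubject.createWithTime(5, TimeUnit.SECONDS, clock);

        subject.onNext("a");                        // stamped at T=0
        clock.advanceTimeBy(3, TimeUnit.SECONDS);
        subject.onNext("b");                        // stamped at T=3
        clock.advanceTimeBy(3, TimeUnit.SECONDS);   // now T=6

        // "a" is 6 seconds old and skipped; "b" is 3 seconds old and replayed.
        TestObserver<String> late = subject.test();
        subject.onNext("c");                        // live item, always delivered
        return late.values();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

        Driving the clock by hand keeps the sketch deterministic; with a real Scheduler the same checks depend on wall-clock timing.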
      • createWithTimeAndSize

        @CheckReturnValue
         @NonNull
        public static <T> @NonNull ReplaySubject<T> createWithTimeAndSize(long maxAge,
         @NonNull TimeUnit unit,
         @NonNull Scheduler scheduler,
         int maxSize)
        Creates a time- and size-bounded replay subject.

        In this setting, the ReplaySubject internally tags each received item with a timestamp value supplied by the Scheduler and holds at most maxSize items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its size limit.

        When observers subscribe to a terminated ReplaySubject, they observe the items that remained in the buffer after the terminal notification, regardless of their age, but at most maxSize items.

        If an observer subscribes while the ReplaySubject is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time or size constraint in the mean time. In other words, once an observer subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

        Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If an observer subscribes at T=11, it will find an empty ReplaySubject with just an onComplete notification.

        Type Parameters:
        T - the type of items observed and emitted by the Subject
        Parameters:
        maxAge - the maximum age of the contained items
        unit - the time unit of maxAge
        scheduler - the Scheduler that provides the current time
        maxSize - the maximum number of buffered items
        Returns:
        the created subject
        Throws:
        NullPointerException - if unit or scheduler is null
        IllegalArgumentException - if maxAge or maxSize is non-positive
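
        Both bounds can be seen working together in the following sketch, assuming RxJava 3 and its TestScheduler; the class name and the chosen limits are hypothetical. The first snapshot is trimmed by the size bound, the second by the age bound.

```java
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
final class TimeAndSizeBoundSketch {

    /** Returns two replay snapshots: one at T=0 and one at T=11s. */
    static List<List<Integer>> run() {
        TestScheduler clock = new TestScheduler(); // virtual time source
        ReplaySubject<Integer> subject =
                ReplaySubject.createWithTimeAndSize(10, TimeUnit.SECONDS, clock, 2);

        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3); // size bound: only 2 and 3 stay replayable

        // Copy the values, because the TestObserver stays subscribed
        // and would also collect later items.
        List<Integer> sizeTrimmed = new ArrayList<>(subject.test().values());

        clock.advanceTimeBy(11, TimeUnit.SECONDS); // 2 and 3 are now too old
        subject.onNext(4);                         // stamped at T=11

        List<Integer> ageTrimmed = new ArrayList<>(subject.test().values());

        List<List<Integer>> result = new ArrayList<>();
        result.add(sizeTrimmed);
        result.add(ageTrimmed);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```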
      • subscribeActual

        protected void subscribeActual(Observer<? super T> observer)
        Description copied from class: Observable
        Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.

        There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.

        Specified by:
        subscribeActual in class Observable<T>
        Parameters:
        observer - the incoming Observer, never null
      • hasObservers

        @CheckReturnValue
        public boolean hasObservers()
        Description copied from class: Subject
        Returns true if the subject has any Observers.

        The method is thread-safe.

        Specified by:
        hasObservers in class Subject<T>
        Returns:
        true if the subject has any Observers
      • getThrowable

        @Nullable
         @CheckReturnValue
        public @Nullable Throwable getThrowable()
        Description copied from class: Subject
        Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.

        The method is thread-safe.

        Specified by:
        getThrowable in class Subject<T>
        Returns:
        the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
      • getValue

        @Nullable
         @CheckReturnValue
        public T getValue()
        Returns a single value the Subject currently has or null if no such value exists.

        The method is thread-safe.

        Returns:
        a single value the Subject currently has or null if no such value exists
      • cleanupBuffer

        public void cleanupBuffer()
        Makes sure the item cached by the head node in a bounded ReplaySubject is released (as it is never part of a replay).

        By default, live bounded buffers will remember one item before the currently receivable one to ensure subscribers can always receive a continuous sequence of items. A terminated ReplaySubject automatically releases this inaccessible item.

        The method must be called sequentially, similar to the standard onXXX methods.

        History: 2.1.11 - experimental

        Since:
        2.2
      • getValues

        @CheckReturnValue
        public Object[] getValues()
        Returns an Object array containing a snapshot of all values of the Subject.

        The method is thread-safe.

        Returns:
        the array containing the snapshot of all values of the Subject
      • getValues

        @CheckReturnValue
        public T[] getValues(T[] array)
        Returns a typed array containing a snapshot of all values of the Subject.

        The method follows the conventions of Collection.toArray by setting the array element after the last value to null (if the capacity permits).

        The method is thread-safe.

        Parameters:
        array - the target array to copy values into if it fits
        Returns:
        the given array if the values fit into it or a new array containing all values
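
        The Collection.toArray-style contract can be sketched as follows, assuming RxJava 3; the class name and the placeholder "x" entries are hypothetical. A large enough array is reused and null-terminated, while a too-small array is replaced by a new one.

```java
import io.reactivex.rxjava3.subjects.ReplaySubject;
import java.util.Arrays;

// Hypothetical illustration class, not part of RxJava.
final class GetValuesArraySketch {

    /** Describes the results of getValues(T[]) for a roomy and a tiny array. */
    static String run() {
        ReplaySubject<String> subject = ReplaySubject.create();
        subject.onNext("a");
        subject.onNext("b");

        // Roomy target: reused, with the slot after the last value set to null.
        String[] roomy = { "x", "x", "x", "x" };
        String[] filled = subject.getValues(roomy);

        // Tiny target: cannot hold the values, so a new array is returned.
        String[] tiny = new String[0];
        String[] grown = subject.getValues(tiny);

        return (filled == roomy) + " " + Arrays.toString(filled)
                + " " + (grown != tiny) + " " + Arrays.toString(grown);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```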
      • hasComplete

        @CheckReturnValue
        public boolean hasComplete()
        Description copied from class: Subject
        Returns true if the subject has reached a terminal state through a complete event.

        The method is thread-safe.

        Specified by:
        hasComplete in class Subject<T>
        Returns:
        true if the subject has reached a terminal state through a complete event
        See Also:
        Subject.hasThrowable()
      • hasValue

        @CheckReturnValue
        public boolean hasValue()
        Returns true if the subject has any value.

        The method is thread-safe.

        Returns:
        true if the subject has any value