38

I am using both publishOn and subscribeOn on the same Flux, as follows:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
        .map(i -> i * 2)
        .log()
        .publishOn(Schedulers.elastic())
        .subscribeOn(Schedulers.parallel())
        .subscribe(elements::add);
    System.out.println("-------------------------------------");

However, when I use both, nothing is printed in the logs. When I use only publishOn, I get the following info logs:

*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

Is publishOn recommended over subscribeOn, or does it take precedence over it? What is the difference between the two, and when should each be used?

Oleh Dokuka
asked Jan 3, 2018 at 7:54
2

4 Answers

53

It took me some time to understand this, maybe because publishOn is usually explained before subscribeOn. Here is a hopefully simpler, layman's explanation.

subscribeOn means that the initial source emission, e.g. subscribe(), onSubscribe() and request(), runs on a worker of the specified scheduler (another thread). The same applies to everything that follows, for example onNext/onError/onComplete, map, etc. This happens no matter where subscribeOn() is placed in the chain.

If there is no publishOn in the fluent chain, that's it: everything runs on that thread.

But as soon as you call publishOn(), say in the middle of the chain, every operator after it runs on a worker of the scheduler supplied to that publishOn().

Here's an example:
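To make the "position does not matter" point concrete, here is a minimal sketch that records the thread seen inside map() with subscribeOn placed either before or after it. The class name, method name and the scheduler name "sub" are made up for illustration; only standard Reactor operators are used.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnPosition {

    // Returns the thread name observed inside map() for each of the 3 elements.
    static List<String> threadsSeenByMap(boolean subscribeOnFirst) {
        // Hypothetical scheduler name, chosen so its thread is easy to recognise.
        Scheduler scheduler = Schedulers.newSingle("sub");
        try {
            Flux<Integer> source = Flux.range(1, 3);
            Flux<String> chain;
            if (subscribeOnFirst) {
                // subscribeOn placed before map()
                chain = source.subscribeOn(scheduler)
                              .map(i -> Thread.currentThread().getName());
            } else {
                // subscribeOn placed after map()
                chain = source.map(i -> Thread.currentThread().getName())
                              .subscribeOn(scheduler);
            }
            // block() waits for completion, so the result is fully populated.
            return chain.collectList().block();
        } finally {
            scheduler.dispose();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsSeenByMap(true));
        System.out.println(threadsSeenByMap(false));
    }
}
```

In both cases the names should start with "sub", since there is no publishOn in either chain to switch threads afterwards.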

    Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
    Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
            System.out.println("Inside map the thread is " + Thread.currentThread().getName());
            return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

The result would be


1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

As you can see, the first doOnNext() and the map() that follows it run on the thread named subscribeOn_thread. That holds until a publishOn() appears in the chain: from there on, every subsequent operator runs on the scheduler supplied to that publishOn(), and this continues until another publishOn() is reached.

answered Dec 25, 2019 at 19:26

2 Comments

The code you have there is a painful brain teaser if one forgets that map executes once per cycle and not ahead of time. The fact that "Inside map" is always printed with the same thread name is an awesome explanation of how subscribeOn changes the thread of the emission, and how publishOn changes the thread of execution based on its position in the chain rather than on the moment it is called. Very nice.
What if subscribeOn() is not specified? Does the chain run on the original thread by default?
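On the comment above about leaving out subscribeOn(): as far as I know, the chain then runs on whichever thread calls subscribe() (or a blocking variant such as blockLast()), up to the first publishOn(). A minimal sketch, with a made-up class and method name:

```java
import reactor.core.publisher.Flux;

public class NoSubscribeOn {

    // Returns the name of the thread on which map() ran.
    static String threadSeenByMap() {
        // No subscribeOn and no publishOn: blockLast() subscribes on the
        // calling thread and Flux.just emits synchronously on that thread.
        return Flux.just(1)
                   .map(i -> Thread.currentThread().getName())
                   .blockLast();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("map:    " + threadSeenByMap());
    }
}
```

Both lines should print the same thread name. This matches the "Thread: main" lines in the publishOn-only example further down.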
17

Here is a short piece of documentation I found:

publishOn applies in the same way as any other operator, in the middle of the subscriber chain. It takes signals from downstream and replays them upstream while executing the callback on a worker from the associated Scheduler. Consequently, it affects where the subsequent operators will execute (until another publishOn is chained in).

subscribeOn applies to the subscription process, when that backward chain is constructed. As a consequence, no matter where you place the subscribeOn in the chain, it always affects the context of the source emission. However, this does not affect the behavior of subsequent calls to publishOn. They still switch the execution context for the part of the chain after them.

and

publishOn forces the next operator (and possibly subsequent operators after the next one) to run on a different thread. Similarly, subscribeOn forces the previous operator (and possibly operators prior to the previous one) to run on a different thread.
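Applied to the chain from the question, a likely reason nothing shows up in the logs is that subscribeOn makes subscribe() return immediately, and the work then happens on scheduler threads (daemon threads for the default Reactor schedulers), so the calling thread may finish before anything is emitted. The sketch below is one way to observe the output by waiting for termination. The class name and the latch are illustrative additions, and Schedulers.boundedElastic() is swapped in for Schedulers.elastic(), which is deprecated in recent Reactor versions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class WaitForAsyncChain {

    static List<Integer> run() {
        // Thread-safe list, because elements are added from a scheduler thread.
        List<Integer> elements = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flux.just(1, 2, 3, 4)
            .map(i -> i * 2)
            .log()                                    // upstream of publishOn: runs on the subscribeOn thread
            .publishOn(Schedulers.boundedElastic())   // everything below runs on a boundedElastic worker
            .subscribeOn(Schedulers.parallel())       // subscription and source emission happen here
            .doFinally(signal -> done.countDown())    // released on complete, error or cancel
            .subscribe(elements::add);

        try {
            // Keep the calling thread alive until the chain has terminated.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elements;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a test or a main method, blockLast() would be a simpler way to wait. The latch is used here only to keep the original subscribe(elements::add) call intact.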

answered Jan 3, 2018 at 8:17

1 Comment

To be precise, publishOn takes signals from upstream and replays them downstream, not the opposite.
4

publishOn is applied in the middle of a chain. It affects subsequent operators after publishOn - they will be executed on a thread picked from publishOn's scheduler.

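    // Assumed definition, not shown in the original answer; the name matches the output below.
    Scheduler schedulerA = Schedulers.newParallel("scheduler-a", 4);

    Flux.range(1, 2)
        .map(i -> {
            System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
            return i;
        })
        .publishOn(schedulerA)
        .map(i -> {
            System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
            return i;
        })
        .blockLast();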

The output is

 First map - (1), Thread: main
 First map - (2), Thread: main
 Second map - (1), Thread: scheduler-a-1
 Second map - (2), Thread: scheduler-a-1

If you place a subscribeOn in a chain, it affects the source emission for the entire chain:

    Flux.range(1, 2)
        .map(i -> {
            System.out.println(String.format("First map - (%s), Thread: %s", i, Thread.currentThread().getName()));
            return i;
        })
        .subscribeOn(schedulerA)
        .map(i -> {
            System.out.println(String.format("Second map - (%s), Thread: %s", i, Thread.currentThread().getName()));
            return i;
        })
        .blockLast();

The output is

 First map - (1), Thread: scheduler-a-1
 Second map - (1), Thread: scheduler-a-1
 First map - (2), Thread: scheduler-a-1
 Second map - (2), Thread: scheduler-a-1

reference article can be found here

answered Sep 9, 2023 at 9:21


2

The following is an excerpt from the excellent blog post https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers

publishOn

This is the basic operator you need when you want to hop threads. Incoming signals from its source are published on the given Scheduler, effectively switching threads to one of that scheduler’s workers.

This is valid for the onNext, onComplete and onError signals. That is, signals that flow from an upstream source to a downstream subscriber.

So in essence, every processing step that appears below this operator will execute on the new Scheduler s, until another operator switches again (eg. another publishOn).

    Flux.fromIterable(firstListOfUrls) // contains A, B and C
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url))
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));
    Flux.fromIterable(secondListOfUrls) // contains D and E
        .publishOn(Schedulers.boundedElastic())
        .map(url -> blockingWebClient.get(url))
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

Output

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
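The excerpt above says publishOn applies to onNext, onComplete and onError. The following sketch, assuming a hypothetical scheduler name "pub" and made-up class and method names, records which thread each callback runs on when all three hooks sit below publishOn. My understanding is that the onSubscribe hook still runs on the subscribing thread, while the data and completion signals run on the publishOn worker.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnSignals {

    // Returns thread names for { doOnSubscribe, doOnNext, doOnComplete }.
    static String[] observe() {
        // Hypothetical scheduler name, chosen so its thread is easy to recognise.
        Scheduler scheduler = Schedulers.newSingle("pub");
        String[] seen = new String[3];
        try {
            Flux.range(1, 3)
                .publishOn(scheduler)
                // All three hooks are placed below publishOn.
                .doOnSubscribe(s -> seen[0] = Thread.currentThread().getName())
                .doOnNext(i -> seen[1] = Thread.currentThread().getName())
                .doOnComplete(() -> seen[2] = Thread.currentThread().getName())
                .blockLast(); // subscribes on the calling thread and waits
        } finally {
            scheduler.dispose();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("onSubscribe: " + seen[0]);
        System.out.println("onNext:      " + seen[1]);
        System.out.println("onComplete:  " + seen[2]);
    }
}
```

Moving the subscription itself to another thread is what subscribeOn is for, as described next.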

subscribeOn

This operator changes where the subscribe method is executed. And since the subscribe signal flows upward, it directly influences where the source Flux subscribes and starts generating data.

As a consequence, it can seem to act on the parts of the reactive chain of operators upward and downward (as long as there is no publishOn thrown in the mix):

    final Flux<String> fetchUrls(List<String> urls) {
        return Flux.fromIterable(urls)
            .map(url -> blockingWebClient.get(url));
    }
    // sample code (A to E stand for URL strings):
    fetchUrls(List.of(A, B, C))
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from first list, got " + body));
    fetchUrls(List.of(D, E))
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(body -> System.out.println(Thread.currentThread().getName() + " from second list, got " + body));

Output

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
answered Oct 17, 2020 at 17:18

