Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 0a2fb6e

Browse files
在此多处细节迭代
1 parent 99059dc commit 0a2fb6e

File tree

3 files changed

+14
-9
lines changed

3 files changed

+14
-9
lines changed

‎chapter13 Reactive Streams/java9-reactive-demo1/src/main/java/com/dockerx/reactive/jdkaction/DockerXDemoApplication.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
public class DockerXDemoApplication {
1515

1616
public static void main(String[] args) {
17-
Flow_submissionPublisher();
17+
Flow_customsubmissionPublisher();
1818
}
1919

2020
private static void demoSubscribe(DockerXDemoPublisher<Integer> publisher, String subscriberName){
@@ -23,7 +23,7 @@ private static void demoSubscribe(DockerXDemoPublisher<Integer> publisher, Strin
2323
publisher.subscribe(subscriber);
2424
}
2525

26-
private static void Flow_submissionPublisher() {
26+
private static void Flow_customsubmissionPublisher() {
2727
ExecutorService execService = ForkJoinPool.commonPool();//Executors.newFixedThreadPool(3);
2828
try (DockerXDemoPublisher<Integer> publisher = new DockerXDemoPublisher<>()){
2929
demoSubscribe(publisher, "One");

‎chapter13 Reactive Streams/java9-reactive-demo1/src/main/java/com/dockerx/reactive/jdkaction/DockerXDemoPublisher.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,16 @@ public class DockerXDemoPublisher<T> implements Flow.Publisher<T>, AutoCloseable
1313

1414

1515
public void submit(T item) {
16+
System.out.println("***************** 开始发布元素 item: "+item+" *****************");
1617
list.forEach(e -> {
17-
e.subscriber.onNext(item);
18+
e.future=executor.submit(() -> { e.subscriber.onNext(item);});
19+
1820
});
1921
}
2022

2123
public void close() {
2224
list.forEach(e -> {
23-
e.subscriber.onComplete();
25+
e.future=executor.submit(() -> { e.subscriber.onComplete();});
2426
});
2527
}
2628

‎chapter13 Reactive Streams/java9-reactive-demo1/src/main/java/com/dockerx/reactive/jdkaction/DockerXDemoSubscriber.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public DockerXDemoSubscriber(long bufferSize, String name) {
2727
}
2828

2929
public void onSubscribe(Flow.Subscription subscription) {
30-
count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
30+
//count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
3131
(this.subscription = subscription).request(bufferSize);
3232
System.out.println("开始onSubscribe订阅");
3333
try {
@@ -38,11 +38,14 @@ public void onSubscribe(Flow.Subscription subscription) {
3838
}
3939

4040
public void onNext(T item) {
41-
if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2);
42-
System.out.println(" ############# " + Thread.currentThread().getName()+" "+name+" "+item+" "+count+ " ############# ");
41+
//if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2);
42+
System.out.println(" ############# " + Thread.currentThread().getName()+" name: "+name+" item: "+item+ " ############# ");
4343
System.out.println(name + " received: " + item);
44-
45-
44+
try {
45+
Thread.sleep(10);
46+
} catch (InterruptedException e) {
47+
e.printStackTrace();
48+
}
4649

4750
}
4851

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /