Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 99059dc

Browse files
demo1彻底迭代重写
1 parent 3fdb4c1 commit 99059dc

File tree

4 files changed

+95
-57
lines changed

4 files changed

+95
-57
lines changed

‎chapter13 Reactive Streams/java9-reactive-demo1/src/main/java/com/dockerx/reactive/jdkaction/DockerXDemoApplication.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import java.util.List;
44
import java.util.concurrent.ExecutorService;
55
import java.util.concurrent.ForkJoinPool;
6-
import java.util.concurrent.SubmissionPublisher;
76
import java.util.concurrent.TimeUnit;
87
import java.util.stream.IntStream;
98

@@ -18,19 +17,18 @@ public static void main(String[] args) {
1817
Flow_submissionPublisher();
1918
}
2019

21-
private static void demoSubscribe(SubmissionPublisher<Integer> publisher, ExecutorServiceexecService, String subscriberName){
20+
private static void demoSubscribe(DockerXDemoPublisher<Integer> publisher, String subscriberName){
2221
DockerXDemoSubscriber<Integer> subscriber = new DockerXDemoSubscriber<>(4L,subscriberName);
23-
DockerXDemoSubscription subscription = new DockerXDemoSubscription(subscriber, execService);
24-
subscriber.onSubscribe(subscription);
22+
2523
publisher.subscribe(subscriber);
2624
}
2725

2826
private static void Flow_submissionPublisher() {
2927
ExecutorService execService = ForkJoinPool.commonPool();//Executors.newFixedThreadPool(3);
30-
try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()){
31-
demoSubscribe(publisher, execService, "One");
32-
demoSubscribe(publisher, execService, "Two");
33-
demoSubscribe(publisher, execService, "Three");
28+
try (DockerXDemoPublisher<Integer> publisher = new DockerXDemoPublisher<>()){
29+
demoSubscribe(publisher, "One");
30+
demoSubscribe(publisher, "Two");
31+
demoSubscribe(publisher, "Three");
3432
IntStream.range(1, 5).forEach(publisher::submit);
3533
} finally {
3634
try {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.dockerx.reactive.jdkaction;
2+
3+
import java.util.concurrent.*;
4+
5+
/**
6+
* @author Author 知秋
7+
* @email fei6751803@163.com
8+
* @time Created by Auser on 2017年12月24日 23:28.
9+
*/
10+
public class DockerXDemoPublisher<T> implements Flow.Publisher<T>, AutoCloseable {
11+
private final ExecutorService executor = ForkJoinPool.commonPool(); // daemon-based
12+
private CopyOnWriteArrayList<DockerXDemoSubscription> list = new CopyOnWriteArrayList();
13+
14+
15+
public void submit(T item) {
16+
list.forEach(e -> {
17+
e.subscriber.onNext(item);
18+
});
19+
}
20+
21+
public void close() {
22+
list.forEach(e -> {
23+
e.subscriber.onComplete();
24+
});
25+
}
26+
27+
@Override
28+
public void subscribe(Flow.Subscriber<? super T> subscriber) {
29+
subscriber.onSubscribe(new DockerXDemoSubscription(subscriber, executor));
30+
list.add(new DockerXDemoSubscription(subscriber, executor));
31+
32+
}
33+
34+
static class DockerXDemoSubscription<T> implements Flow.Subscription {
35+
private final Flow.Subscriber<? super T> subscriber;
36+
private final ExecutorService executor;
37+
private Future<?> future;
38+
private T item;
39+
private boolean completed;
40+
41+
42+
public DockerXDemoSubscription(Flow.Subscriber<? super T> subscriber, ExecutorService executor) {
43+
this.subscriber = subscriber;
44+
this.executor = executor;
45+
}
46+
47+
48+
@Override
49+
public void request(long n) {
50+
if (n != 0 && !completed) {
51+
if (n < 0) {
52+
IllegalArgumentException ex = new IllegalArgumentException();
53+
executor.execute(() -> subscriber.onError(ex));
54+
} else {
55+
future = executor.submit(() -> {
56+
subscriber.onNext(item);
57+
});
58+
}
59+
} else {
60+
subscriber.onComplete();
61+
}
62+
}
63+
64+
@Override
65+
public void cancel() {
66+
completed = true;
67+
if (future != null && !future.isCancelled()) {
68+
this.future.cancel(true);
69+
}
70+
}
71+
}
72+
}

‎chapter13 Reactive Streams/java9-reactive-demo1/src/main/java/com/dockerx/reactive/jdkaction/DockerXDemoSubscriber.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,34 @@ public class DockerXDemoSubscriber<T> implements Flow.Subscriber<T>{
1313
final long bufferSize;
1414
long count;
1515

16-
public DockerXDemoSubscriber(long bufferSize,String name) {
16+
public String getName() {
17+
return name;
18+
}
19+
20+
public Flow.Subscription getSubscription() {
21+
return subscription;
22+
}
23+
24+
public DockerXDemoSubscriber(long bufferSize, String name) {
1725
this.bufferSize = bufferSize;
1826
this.name = name;
1927
}
2028

2129
public void onSubscribe(Flow.Subscription subscription) {
2230
count = bufferSize - bufferSize / 2;// 当消费一半的时候重新请求
2331
(this.subscription = subscription).request(bufferSize);
32+
System.out.println("开始onSubscribe订阅");
33+
try {
34+
Thread.sleep(100);
35+
} catch (InterruptedException e) {
36+
e.printStackTrace();
37+
}
2438
}
2539

2640
public void onNext(T item) {
2741
if (--count <= 0) subscription.request(count = bufferSize - bufferSize / 2);
28-
System.out.println(name + " received: " + item);
42+
System.out.println(" ############# " + Thread.currentThread().getName()+" "+name+" "+item+" "+count + " ############# ");
43+
System.out.println(name + " received: " + item);
2944

3045

3146

‎chapter13 Reactive Streams/java9-reactive-demo1/src/main/java/com/dockerx/reactive/jdkaction/DockerXDemoSubscription.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /