Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 446464e

Browse files
test multiple event with order + leader
1 parent 6cd3031 commit 446464e

14 files changed

+573
-0
lines changed

‎multiple-event-demo/pom.xml

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>me.zaccoding</groupId>
8+
<artifactId>demo</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
13+
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
14+
<java.version>1.8</java.version>
15+
</properties>
16+
17+
<dependencies>
18+
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
19+
<dependency>
20+
<groupId>org.projectlombok</groupId>
21+
<artifactId>lombok</artifactId>
22+
<version>1.18.4</version>
23+
<scope>provided</scope>
24+
</dependency>
25+
26+
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
27+
<dependency>
28+
<groupId>com.google.guava</groupId>
29+
<artifactId>guava</artifactId>
30+
<version>27.0.1-jre</version>
31+
</dependency>
32+
33+
34+
<!-- https://mvnrepository.com/artifact/junit/junit -->
35+
<dependency>
36+
<groupId>junit</groupId>
37+
<artifactId>junit</artifactId>
38+
<version>4.12</version>
39+
<scope>test</scope>
40+
</dependency>
41+
</dependencies>
42+
43+
<build>
44+
<plugins>
45+
<plugin>
46+
<groupId>org.apache.maven.plugins</groupId>
47+
<artifactId>maven-compiler-plugin</artifactId>
48+
<configuration>
49+
<source>${java.version}</source>
50+
<target>>${java.version}</target>
51+
</configuration>
52+
</plugin>
53+
54+
<plugin>
55+
<artifactId>maven-assembly-plugin</artifactId>
56+
<version>2.4</version>
57+
<configuration>
58+
<finalName>demo</finalName>
59+
<archive>
60+
<manifest>
61+
<mainClass></mainClass>
62+
</manifest>
63+
</archive>
64+
<descriptorRefs>
65+
<descriptorRef>jar-with-dependencies</descriptorRef>
66+
</descriptorRefs>
67+
<appendAssemblyId>false</appendAssemblyId>
68+
</configuration>
69+
<executions>
70+
<execution>
71+
<id>make-assembly</id>
72+
<phase>package</phase>
73+
<goals>
74+
<goal>single</goal>
75+
</goals>
76+
</execution>
77+
</executions>
78+
</plugin>
79+
</plugins>
80+
</build>
81+
82+
83+
</project>
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package demo.blockchain;
2+
3+
import demo.event.BlockEvent;
4+
5+
/**
6+
* @GitHub : https://github.com/zacscoding
7+
*/
8+
public interface BlockEventListener {
9+
10+
void onBlock(BlockEvent blockEvent);
11+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package demo.blockchain;
2+
3+
/**
4+
* @GitHub : https://github.com/zacscoding
5+
*/
6+
public interface Blockchain {
7+
8+
void addListener(BlockEventListener listener);
9+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package demo.blockchain;
2+
3+
import demo.event.BlockEvent;
4+
import java.util.ArrayList;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.UUID;
9+
import java.util.concurrent.TimeUnit;
10+
11+
/**
12+
* @GitHub : https://github.com/zacscoding
13+
*/
14+
public class SimpleBlockchain extends Thread implements Blockchain {
15+
16+
private String networkName;
17+
private long bestBlockNumber;
18+
private Map<Long, String> blockStore;
19+
private List<BlockEventListener> listeners;
20+
21+
public SimpleBlockchain(String networkName) {
22+
this.bestBlockNumber = 0L;
23+
this.networkName = networkName;
24+
this.blockStore = new HashMap<>();
25+
this.listeners = new ArrayList<>();
26+
}
27+
28+
@Override
29+
public void addListener(BlockEventListener listener) {
30+
if (listener != null) {
31+
listeners.add(listener);
32+
}
33+
}
34+
35+
@Override
36+
public void run() {
37+
try {
38+
while (!Thread.currentThread().isInterrupted()) {
39+
BlockEvent newBlock = BlockEvent.builder()
40+
.networkName(networkName)
41+
.blockNumber(bestBlockNumber++)
42+
.hash(UUID.randomUUID().toString().replace("-", ""))
43+
.build();
44+
45+
blockStore.put(newBlock.getBlockNumber(), newBlock.getHash());
46+
47+
for (BlockEventListener listener : listeners) {
48+
listener.onBlock(newBlock);
49+
}
50+
51+
TimeUnit.SECONDS.sleep(5L);
52+
}
53+
} catch (InterruptedException e) {
54+
return;
55+
} catch (Exception e) {
56+
e.printStackTrace();
57+
}
58+
}
59+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package demo.db;
2+
3+
import demo.event.BlockEvent;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
import java.util.concurrent.atomic.AtomicInteger;
6+
7+
/**
8+
* @GitHub : https://github.com/zacscoding
9+
*/
10+
public class BlockchainStore {
11+
12+
private ConcurrentHashMap<Long, BlockEvent> blocks = new ConcurrentHashMap<>();
13+
private long bestBlockNumber = 0L;
14+
private AtomicInteger triedCount = new AtomicInteger(0);
15+
16+
public boolean save(BlockEvent blockEvent) {
17+
if (bestBlockNumber < blockEvent.getBlockNumber()) {
18+
this.bestBlockNumber = blockEvent.getBlockNumber();
19+
}
20+
triedCount.incrementAndGet();
21+
return blocks.putIfAbsent(blockEvent.getBlockNumber(), blockEvent) == null;
22+
}
23+
24+
public boolean existBlock(long blockNumber) {
25+
boolean result = blocks.containsKey(blockNumber);
26+
return result;
27+
}
28+
29+
public long getBestBlockNumber() {
30+
return bestBlockNumber;
31+
}
32+
33+
public int saveTriedCount() {
34+
return triedCount.get();
35+
}
36+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package demo.event;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
import lombok.Getter;
6+
import lombok.NoArgsConstructor;
7+
import lombok.Setter;
8+
import lombok.ToString;
9+
10+
/**
11+
* @GitHub : https://github.com/zacscoding
12+
*/
13+
@Getter
14+
@Setter
15+
@NoArgsConstructor
16+
@AllArgsConstructor
17+
@Builder
18+
@ToString
19+
public class BlockEvent {
20+
21+
private String networkName;
22+
private long blockNumber;
23+
private String hash;
24+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package demo.event;
2+
3+
import com.google.common.eventbus.AsyncEventBus;
4+
import com.google.common.eventbus.EventBus;
5+
import demo.event.BlockEvent;
6+
import demo.util.CollectorThreadFactory;
7+
import java.util.Objects;
8+
import java.util.concurrent.Executors;
9+
10+
/**
11+
* @GitHub : https://github.com/zacscoding
12+
*/
13+
public class BlockPublisher {
14+
15+
private EventBus asyncEventBus;
16+
17+
public BlockPublisher() {
18+
this.asyncEventBus = new AsyncEventBus("block-event-bus",
19+
Executors.newCachedThreadPool(new CollectorThreadFactory("block-publisher", true)));
20+
}
21+
22+
public void publish(final BlockEvent blockEvent) {
23+
if (blockEvent == null) {
24+
return;
25+
}
26+
27+
asyncEventBus.post(blockEvent);
28+
}
29+
30+
public void register(Object listener) {
31+
Objects.requireNonNull(listener, "listener must be not null");
32+
asyncEventBus.register(listener);
33+
}
34+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package demo.event;
2+
3+
import com.google.common.eventbus.AllowConcurrentEvents;
4+
import com.google.common.eventbus.Subscribe;
5+
import demo.db.BlockchainStore;
6+
import demo.event.BlockEvent;
7+
import demo.event.BlockPublisher;
8+
import demo.leader.LeaderSelector;
9+
import java.util.concurrent.TimeUnit;
10+
11+
/**
12+
* @GitHub : https://github.com/zacscoding
13+
*/
14+
public class BlockSaveListener {
15+
16+
private BlockchainStore blockchainStore;
17+
private LeaderSelector leaderSelector;
18+
19+
public BlockSaveListener(BlockchainStore blockchainStore, LeaderSelector leaderSelector,
20+
BlockPublisher blockPublisher) {
21+
22+
this.blockchainStore = blockchainStore;
23+
this.leaderSelector = leaderSelector;
24+
blockPublisher.register(this);
25+
}
26+
27+
@Subscribe
28+
@AllowConcurrentEvents
29+
public void onBlock(BlockEvent blockEvent) {
30+
String jobId = blockEvent.getNetworkName() + blockEvent.getBlockNumber();
31+
int order = leaderSelector.getJobOrder(jobId);
32+
33+
try {
34+
if (!leaderSelector.isTakenLeadership(order)) {
35+
int sleepSeconds = Math.min(order, 5);
36+
System.out.printf("[%s] Received block : %s but could not took leadership. so wait %d sec.\n",
37+
getThreadName(), blockEvent, sleepSeconds);
38+
TimeUnit.SECONDS.sleep(sleepSeconds);
39+
}
40+
} catch (InterruptedException e) {
41+
Thread.currentThread().interrupt();
42+
System.out.println("InterruptedException exception occur");
43+
return;
44+
}
45+
46+
if (blockchainStore.existBlock(blockEvent.getBlockNumber())) {
47+
System.out.printf("## [%s] Skip : %s because already stored\n", getThreadName(), blockEvent);
48+
return;
49+
}
50+
51+
if (blockchainStore.save(blockEvent)) {
52+
System.out.printf("[%s] Success to save %s\n", getThreadName(), blockEvent);
53+
}
54+
}
55+
56+
private String getThreadName() {
57+
Thread currentThread = Thread.currentThread();
58+
return currentThread.getName() + "-" + currentThread.getId();
59+
}
60+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package demo.leader;
2+
3+
/**
4+
* @GitHub : https://github.com/zacscoding
5+
*/
6+
public interface LeaderSelector {
7+
8+
int leaderNumber = 0;
9+
10+
/**
11+
* Get order about this job id that started with 0 and increase by 1 during cache time
12+
*/
13+
int getJobOrder(String jobId);
14+
15+
default boolean isTakenLeadership(int order) {
16+
return order == leaderNumber;
17+
}
18+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package demo.leader;
2+
3+
import com.google.common.cache.CacheBuilder;
4+
import com.google.common.cache.CacheLoader;
5+
import com.google.common.cache.LoadingCache;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
/**
10+
* @GitHub : https://github.com/zacscoding
11+
*/
12+
public class LocalCacheLeaderSelector implements LeaderSelector {
13+
14+
private LoadingCache<String, AtomicInteger> cache;
15+
16+
public LocalCacheLeaderSelector() {
17+
this.cache = CacheBuilder.newBuilder()
18+
.maximumSize(100)
19+
.expireAfterWrite(5000L, TimeUnit.MILLISECONDS)
20+
.build(
21+
new CacheLoader<String, AtomicInteger>() {
22+
@Override
23+
public AtomicInteger load(String key) {
24+
return new AtomicInteger(leaderNumber);
25+
}
26+
}
27+
);
28+
}
29+
30+
public int getJobOrder(String jobId) {
31+
return cache.getUnchecked(jobId).getAndIncrement();
32+
}
33+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /