Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit 437fc31

Browse files
[A] 基于Netty实现的单机版RPC
1 parent 79b8ced commit 437fc31

File tree

14 files changed

+593
-2
lines changed

14 files changed

+593
-2
lines changed

Spring-Netty/src/main/java/com/bruis/learnnetty/nio/demo01/NIOClient.java renamed to Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.bruis.learnnetty.nio.demo01;
1+
package com.bruis.learnnetty.nio;
22

33
import java.net.InetSocketAddress;
44
import java.nio.ByteBuffer;

Spring-Netty/src/main/java/com/bruis/learnnetty/nio/demo01/NIOServer.java renamed to Spring-Netty/src/main/java/com/bruis/learnnetty/nio/NIOServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.bruis.learnnetty.nio.demo01;
1+
package com.bruis.learnnetty.nio;
22

33
import java.net.InetSocketAddress;
44
import java.nio.ByteBuffer;
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.bruis.learnnetty.rpc.client;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.rpc.utils.RequestFuture;
5+
import com.bruis.learnnetty.rpc.utils.Response;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
9+
/**
10+
* @author lhy
11+
* @date 2022年2月11日
12+
*/
13+
public class ClientHandler extends ChannelInboundHandlerAdapter {
14+
@Override
15+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
16+
Response response = JSONObject.parseObject(msg.toString(), Response.class);
17+
RequestFuture.received(response);
18+
}
19+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.bruis.learnnetty.rpc.client;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.rpc.utils.RequestFuture;
5+
import io.netty.bootstrap.Bootstrap;
6+
import io.netty.buffer.PooledByteBufAllocator;
7+
import io.netty.channel.ChannelFuture;
8+
import io.netty.channel.ChannelInitializer;
9+
import io.netty.channel.ChannelOption;
10+
import io.netty.channel.EventLoopGroup;
11+
import io.netty.channel.nio.NioEventLoopGroup;
12+
import io.netty.channel.socket.nio.NioSocketChannel;
13+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
14+
import io.netty.handler.codec.LengthFieldPrepender;
15+
import io.netty.handler.codec.string.StringDecoder;
16+
import io.netty.handler.codec.string.StringEncoder;
17+
import org.springframework.util.StringUtils;
18+
19+
import java.nio.charset.Charset;
20+
21+
/**
22+
* @author lhy
23+
* @date 2022年2月16日
24+
*/
25+
public class NettyClient {
26+
public static EventLoopGroup group = null;
27+
public static Bootstrap bootstrap = null;
28+
public static ChannelFuture future = null;
29+
static {
30+
bootstrap = new Bootstrap();
31+
group = new NioEventLoopGroup();
32+
bootstrap.channel(NioSocketChannel.class);
33+
bootstrap.group(group);
34+
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
35+
final ClientHandler clientHandler = new ClientHandler();
36+
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
37+
@Override
38+
protected void initChannel(NioSocketChannel ch) throws Exception {
39+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,
40+
0, 4, 0, 4));
41+
ch.pipeline().addLast(new StringDecoder());
42+
ch.pipeline().addLast(clientHandler);
43+
ch.pipeline().addLast(new LengthFieldPrepender(4, false));
44+
ch.pipeline().addLast(new StringEncoder(Charset.forName("utf-8")));
45+
}
46+
});
47+
try {
48+
future = bootstrap.connect("127.0.0.1", 8080).sync();
49+
} catch (InterruptedException e) {
50+
e.printStackTrace();
51+
}
52+
}
53+
54+
/**
55+
* 说明:对于这个长连接的例子中,使用了静态化,即单链接、长连接,如果是多链接的话不可使用静态化,需使用线程池。
56+
* @param msg
57+
* @return
58+
*/
59+
public Object sendRequest(Object msg, String path) {
60+
try {
61+
RequestFuture request = new RequestFuture();
62+
request.setRequest(msg);
63+
request.setPath(path);
64+
String requestStr = JSONObject.toJSONString(request);
65+
future.channel().writeAndFlush(requestStr);
66+
myselfPrint("我阻塞了", null);
67+
return request.get();
68+
} catch (Exception e) {
69+
e.printStackTrace();
70+
throw e;
71+
}
72+
}
73+
public static void main(String[] args) {
74+
NettyClient nettyClient = new NettyClient();
75+
for (int i = 0; i < 10; i++) {
76+
Object result = nettyClient.sendRequest("hello-" + i, "getUserNameById");
77+
myselfPrint("拿到结果了", result);
78+
}
79+
}
80+
81+
public static void myselfPrint(String description, Object value) {
82+
StringBuilder builder = new StringBuilder();
83+
builder.append(Thread.currentThread().getName());
84+
if (!StringUtils.isEmpty(description)) {
85+
builder.append("-").append(description);
86+
}
87+
if (!StringUtils.isEmpty(value)) {
88+
builder.append("-").append(value);
89+
}
90+
System.out.println(builder.toString());
91+
}
92+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.bruis.learnnetty.rpc.controller;
2+
3+
import com.bruis.learnnetty.rpc.utils.Remote;
4+
import org.springframework.stereotype.Controller;
5+
6+
/**
7+
* @author lhy
8+
* @date 2022年2月17日
9+
*/
10+
@Controller
11+
public class UserController {
12+
@Remote(value = "getUserNameById")
13+
public Object getUserNameById(String userId) {
14+
System.out.println(Thread.currentThread().getName() + "-> 接受到请求:" + userId);
15+
return "做了业务处理了,结果是用户编号userId为" + userId + "的用户姓名为张三";
16+
}
17+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
4+
5+
/**
6+
* @author lhy
7+
* @date 2022年2月17日
8+
*/
9+
public class ApplicationMain {
10+
11+
private static volatile boolean running = true;
12+
13+
public static void main(String[] args) {
14+
try {
15+
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.bruis.learnnetty.rpc");
16+
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
17+
try {
18+
context.stop();
19+
} catch (Exception e) {}
20+
21+
synchronized (ApplicationMain.class) {
22+
running = false;
23+
ApplicationMain.class.notify();
24+
}
25+
}));
26+
context.start();
27+
} catch (Exception e) {
28+
e.printStackTrace();
29+
System.exit(1);
30+
}
31+
System.out.println("服务器已启动");
32+
synchronized (ApplicationMain.class) {
33+
try {
34+
ApplicationMain.class.wait();
35+
} catch (Exception e) {}
36+
}
37+
}
38+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import com.bruis.learnnetty.rpc.utils.Mediator;
4+
import com.bruis.learnnetty.rpc.utils.Remote;
5+
import org.springframework.context.ApplicationListener;
6+
import org.springframework.context.event.ContextRefreshedEvent;
7+
import org.springframework.core.Ordered;
8+
import org.springframework.stereotype.Component;
9+
import org.springframework.stereotype.Controller;
10+
11+
import java.lang.reflect.Method;
12+
import java.util.Map;
13+
14+
/**
15+
* @author lhy
16+
* @date 2022年2月17日
17+
*/
18+
@Component
19+
public class InitLoadRemoteMethod implements ApplicationListener<ContextRefreshedEvent>, Ordered {
20+
21+
@Override
22+
public void onApplicationEvent(ContextRefreshedEvent context) {
23+
// 获取Spring容器中带有@Controller的注解类
24+
Map<String, Object> controllerBeans = context.getApplicationContext()
25+
.getBeansWithAnnotation(Controller.class);
26+
for (String beanName : controllerBeans.keySet()) {
27+
Object beanObj = controllerBeans.get(beanName);
28+
// 获取这个bean的方法集合
29+
Method[] methods = beanObj.getClass().getMethods();
30+
for (Method method : methods) {
31+
// 判断这个方法是否带有@Remote注解
32+
if (method.isAnnotationPresent(Remote.class)) {
33+
Remote remote = method.getAnnotation(Remote.class);
34+
// 注解的值
35+
String remoteValue = remote.value();
36+
// 缓存这个类
37+
Mediator.MethodBean methodBean = new Mediator.MethodBean();
38+
methodBean.setBean(beanObj);
39+
methodBean.setMethod(method);
40+
// @Remote的value值作为key,MethodBean作为value
41+
Mediator.methodBeans.put(remoteValue, methodBean);
42+
}
43+
}
44+
}
45+
}
46+
47+
/**
48+
* 值越小优先级越高
49+
* @return
50+
*/
51+
@Override
52+
public int getOrder() {
53+
return -1;
54+
}
55+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import org.springframework.context.ApplicationListener;
4+
import org.springframework.context.event.ContextRefreshedEvent;
5+
import org.springframework.stereotype.Component;
6+
7+
/**
8+
* @author lhy
9+
* @date 2022年2月17日
10+
*/
11+
@Component
12+
public class NettyApplicationListener implements ApplicationListener<ContextRefreshedEvent> {
13+
@Override
14+
public void onApplicationEvent(ContextRefreshedEvent event) {
15+
// 开启额外线程启动Netty服务
16+
new Thread() {
17+
@Override
18+
public void run() {
19+
NettyServer.start();
20+
}
21+
}.start();
22+
}
23+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import io.netty.bootstrap.ServerBootstrap;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.ChannelOption;
7+
import io.netty.channel.EventLoopGroup;
8+
import io.netty.channel.nio.NioEventLoopGroup;
9+
import io.netty.channel.socket.SocketChannel;
10+
import io.netty.channel.socket.nio.NioServerSocketChannel;
11+
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
12+
import io.netty.handler.codec.LengthFieldPrepender;
13+
import io.netty.handler.codec.string.StringDecoder;
14+
import io.netty.handler.codec.string.StringEncoder;
15+
16+
/**
17+
* 基于短连接的Netty服务端
18+
*
19+
* @author lhy
20+
* @date 2022年2月11日
21+
*/
22+
public class NettyServer {
23+
public static void start() {
24+
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
25+
EventLoopGroup workerGroup = new NioEventLoopGroup();
26+
try {
27+
ServerBootstrap serverBootstrap = new ServerBootstrap();
28+
serverBootstrap.group(bossGroup, workerGroup);
29+
serverBootstrap.channel(NioServerSocketChannel.class);
30+
31+
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
32+
.childHandler(new ChannelInitializer<SocketChannel>() {
33+
@Override
34+
protected void initChannel(SocketChannel ch) throws Exception {
35+
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
36+
.addLast(new StringDecoder())
37+
.addLast(new ServerHandler())
38+
.addLast(new LengthFieldPrepender(4, false))
39+
.addLast(new StringEncoder());
40+
}
41+
});
42+
ChannelFuture future = serverBootstrap.bind(8080).sync();
43+
future.channel().closeFuture().sync();
44+
} catch (Exception e) {
45+
e.printStackTrace();
46+
} finally {
47+
bossGroup.shutdownGracefully();
48+
workerGroup.shutdownGracefully();
49+
}
50+
}
51+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.bruis.learnnetty.rpc.server;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.bruis.learnnetty.rpc.utils.Mediator;
5+
import com.bruis.learnnetty.rpc.utils.RequestFuture;
6+
import com.bruis.learnnetty.rpc.utils.Response;
7+
import io.netty.channel.ChannelHandler;
8+
import io.netty.channel.ChannelHandlerContext;
9+
import io.netty.channel.ChannelInboundHandlerAdapter;
10+
11+
/**
12+
* @author lhy
13+
* @date 2022年2月11日
14+
*/
15+
@ChannelHandler.Sharable
16+
public class ServerHandler extends ChannelInboundHandlerAdapter {
17+
/**
18+
* 接受客户端发送过来的请求
19+
* @param ctx
20+
* @param msg
21+
* @throws Exception
22+
*/
23+
@Override
24+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25+
RequestFuture request = JSONObject.parseObject(msg.toString(), RequestFuture.class);
26+
Response response = Mediator.process(request);
27+
ctx.channel().writeAndFlush(JSONObject.toJSONString(response));
28+
}
29+
}

0 commit comments

Comments
(0)

AltStyle によって変換されたページ (->オリジナル) /