diff --git a/springboot-netty-sample/README.md b/springboot-netty-sample/README.md index ccb927c..d884eb3 100644 --- a/springboot-netty-sample/README.md +++ b/springboot-netty-sample/README.md @@ -4,6 +4,8 @@ image-20220124110420220 + + ##### Netty并不是只支持过NIO,但是不建议(depercate)阻塞I/O(BIO/OIO) - 连接数高的情况下:阻塞 -> 消耗源、效率低 @@ -31,6 +33,8 @@ BIO 下是 Thread-Per-Connection image-20220124112504631 + + ***Thread-Per-Connection:对应每个连接都有1个线程处理,1个线程同时处理:读取、解码、计算、编码、发送*** @@ -39,6 +43,8 @@ NIO 下是 Reactor image-20220124112753682 + + ***Reactor 多线程模式,由多个线程负责:读取、发送,由线程池负责处理:解码、计算、编码*** ***Reactor 主从多线程模式,由单独mainReactor 单线程负责接收请求,subReactor和 Reactor 多线程模式一致*** @@ -89,6 +95,146 @@ serverBootStrap.group(bossGroup, workerGroup); +##### Netty 支持主从 Reactor 源码分析 + +1.初始化 Main EventLoopGroup + +```java +public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { + + // main Event Loop Group + volatile EventLoopGroup group; + + .... + + // 初始化 mian Event Loop Group 方法 + public B group(EventLoopGroup group) { + ObjectUtil.checkNotNull(group, "group"); + if (this.group != null) { + throw new IllegalStateException("group set already"); + } + this.group = group; + return self(); + } + + .... +} +``` + +2. 初始化 Worker EventLoopGroup + +```java +public class ServerBootstrap extends AbstractBootstrap { + + // woker Events Loop Group + private volatile EventLoopGroup childGroup; + ..... + + public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { + super.group(parentGroup); + ObjectUtil.checkNotNull(childGroup, "childGroup"); + if (this.childGroup != null) { + throw new IllegalStateException("childGroup set already"); + } + // 初始化 worker Event Loop Group 方法 + this.childGroup = childGroup; + return this; + } + + .... +} +``` + +3. MainEventLoopGroup 和 WorkerEventLoop 绑定# bind(),并实现新建和初始化 SocketChannel 绑定到 MainEventLoopGroup中 + +```java +// 绑定 地址:端口 +public ChannelFuture bind(SocketAddress localAddress) { + validate(); + return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); +} + +// 绑定逻辑 +private ChannelFuture doBind(final SocketAddress localAddress) { + // 初始化 & 注册 MainEventLoopGroup + final ChannelFuture regFuture = initAndRegister(); + final Channel channel = regFuture.channel(); + .... +} + +// 初始化 & 注册 MainEventLoopGroup +final ChannelFuture initAndRegister() { + Channel channel = null; + try { + // 创建新的 ServerSocketChannel + channel = channelFactory.newChannel(); + // 初始化 ServerSocketChannel 中的 Handler + init(channel); + } catch (Throwable t) { + if (channel != null) { + // channel can be null if newChannel crashed (eg SocketException("too many open files")) + channel.unsafe().closeForcibly(); + // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor + return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); + } + // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor + return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); + } + + // 将 ServerSocketChannel 注册到 MainEventLoop 中 + // 因为端口和地址 只有1个,channel只能被注册一次,所以 MainEventLoopGroup 是单线程的 + ChannelFuture regFuture = config().group().register(channel); + if (regFuture.cause() != null) { + if (channel.isRegistered()) { + channel.close(); + } else { + channel.unsafe().closeForcibly(); + } + } + ... +} + +``` + +4. WorkerEventLoopGroup 和 SocketChannel 绑定关系 + +```java +private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { + @Override + @SuppressWarnings("unchecked") + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // 每次读取都是一个 SocketChannel + final Channel child = (Channel) msg; + + child.pipeline().addLast(childHandler); + + setChannelOptions(child, childOptions, logger); + + for (Entry, Object> e: childAttrs) { + child.attr((AttributeKey) e.getKey()).set(e.getValue()); + } + + try { + // 将 SocketChannel 注册到 workerEventLoopGroup中 + childGroup.register(child).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + forceClose(child, future.cause()); + } + } + }); + } catch (Throwable t) { + forceClose(child, t); + } + } +} +``` + + + + + #### 3.Netty 粘包/半包解决方案 关于半包的主要原因: @@ -100,6 +246,8 @@ serverBootStrap.group(bossGroup, workerGroup); + + 关于粘包的主要原因: - 发送方每次写入数据> 套接字缓冲区大小 @@ -162,3 +310,5 @@ TCP 是流式协议,消息无边界 + +

AltStyle によって変換されたページ (->オリジナル) /