|
| 1 | +写监视器 |
| 2 | +==== |
| 3 | + |
| 4 | +这一节我们编写一个监视器:EventLogMonitor ,也就是用来接收事件的程序,用来代替 netcat 。EventLogMonitor 做下面事情: |
| 5 | + |
| 6 | +* 接收 LogEventBroadcaster 广播的 UDP DatagramPacket |
| 7 | +* 解码 LogEvent 消息 |
| 8 | +* 输出 LogEvent 消息 |
| 9 | + |
| 10 | +和之前一样,将实现自定义 ChannelHandler 的逻辑。图13.4描述了LogEventMonitor 的 ChannelPipeline 并表明了 LogEvent 的流经情况。 |
| 11 | + |
| 12 | + |
| 13 | + |
| 14 | +Figure 13.4 LogEventMonitor |
| 15 | + |
| 16 | +图中显示我们的两个自定义 ChannelHandlers,LogEventDecoder 和 LogEventHandler。首先是负责将网络上接收到的 DatagramPacket 解码到 LogEvent 消息。清单13.6显示了实现。 |
| 17 | + |
| 18 | +Listing 13.6 LogEventDecoder |
| 19 | + |
| 20 | + public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> { |
| 21 | + @Override |
| 22 | + protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception { |
| 23 | + ByteBuf data = datagramPacket.content(); //1 |
| 24 | + int i = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR); //2 |
| 25 | + String filename = data.slice(0, i).toString(CharsetUtil.UTF_8); //3 |
| 26 | + String logMsg = data.slice(i + 1, data.readableBytes()).toString(CharsetUtil.UTF_8); //4 |
| 27 | + |
| 28 | + LogEvent event = new LogEvent(datagramPacket.recipient(), System.currentTimeMillis(), |
| 29 | + filename,logMsg); //5 |
| 30 | + out.add(event); |
| 31 | + } |
| 32 | + } |
| 33 | + |
| 34 | +1. 获取 DatagramPacket 中数据的引用 |
| 35 | +2. 获取 SEPARATOR 的索引 |
| 36 | +3. 从数据中读取文件名 |
| 37 | +4. 读取数据中的日志消息 |
| 38 | +5. 构造新的 LogEvent 对象并将其添加到列表中 |
| 39 | + |
| 40 | +第二个 ChannelHandler 将执行一些首先创建的 LogEvent 消息。在这种情况下,我们只会写入 system.out。在真实的应用程序可能用到一个单独的日志文件或放到数据库。 |
| 41 | + |
| 42 | +下面的清单显示了 LogEventHandler。 |
| 43 | + |
| 44 | +Listing 13.7 LogEventHandler |
| 45 | + |
| 46 | + public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { //1 |
| 47 | + |
| 48 | + @Override |
| 49 | + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
| 50 | + cause.printStackTrace(); //2 |
| 51 | + ctx.close(); |
| 52 | + } |
| 53 | + |
| 54 | + @Override |
| 55 | + public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception { |
| 56 | + StringBuilder builder = new StringBuilder(); //3 |
| 57 | + builder.append(event.getReceivedTimestamp()); |
| 58 | + builder.append(" ["); |
| 59 | + builder.append(event.getSource().toString()); |
| 60 | + builder.append("] ["); |
| 61 | + builder.append(event.getLogfile()); |
| 62 | + builder.append("] : "); |
| 63 | + builder.append(event.getMsg()); |
| 64 | + |
| 65 | + System.out.println(builder.toString()); //4 |
| 66 | + } |
| 67 | + } |
| 68 | + |
| 69 | +1. 继承 SimpleChannelInboundHandler 用于处理 LogEvent 消息 |
| 70 | +2. 在异常时,输出消息并关闭 channel |
| 71 | +3. 建立一个 StringBuilder 并构建输出 |
| 72 | +4. 打印出 LogEvent 的数据 |
| 73 | + |
| 74 | +LogEventHandler 打印出 LogEvent 的一个易读的格式,包括以下: |
| 75 | + |
| 76 | +* 收到时间戳以毫秒为单位 |
| 77 | +* 发送方的 InetSocketAddress,包括IP地址和端口 |
| 78 | +* LogEvent 生成绝对文件名 |
| 79 | +* 实际的日志消息,代表在日志文件中一行 |
| 80 | + |
| 81 | +现在我们需要安装处理程序到 ChannelPipeline ,如图13.4所示。下一个清单显示了这是如何实现 LogEventMonitor 类的一部分。 |
| 82 | + |
| 83 | +Listing 13.8 LogEventMonitor |
| 84 | + |
| 85 | + public class LogEventMonitor { |
| 86 | + |
| 87 | + private final Bootstrap bootstrap; |
| 88 | + private final EventLoopGroup group; |
| 89 | + public LogEventMonitor(InetSocketAddress address) { |
| 90 | + group = new NioEventLoopGroup(); |
| 91 | + bootstrap = new Bootstrap(); |
| 92 | + bootstrap.group(group) //1 |
| 93 | + .channel(NioDatagramChannel.class) |
| 94 | + .option(ChannelOption.SO_BROADCAST, true) |
| 95 | + .handler(new ChannelInitializer<Channel>() { |
| 96 | + @Override |
| 97 | + protected void initChannel(Channel channel) throws Exception { |
| 98 | + ChannelPipeline pipeline = channel.pipeline(); |
| 99 | + pipeline.addLast(new LogEventDecoder()); //2 |
| 100 | + pipeline.addLast(new LogEventHandler()); |
| 101 | + } |
| 102 | + }).localAddress(address); |
| 103 | + |
| 104 | + } |
| 105 | + |
| 106 | + public Channel bind() { |
| 107 | + return bootstrap.bind().syncUninterruptibly().channel(); //3 |
| 108 | + } |
| 109 | + |
| 110 | + public void stop() { |
| 111 | + group.shutdownGracefully(); |
| 112 | + } |
| 113 | + |
| 114 | + public static void main(String[] args) throws Exception { |
| 115 | + if (args.length != 1) { |
| 116 | + throw new IllegalArgumentException("Usage: LogEventMonitor <port>"); |
| 117 | + } |
| 118 | + LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0]))); //4 |
| 119 | + try { |
| 120 | + Channel channel = monitor.bind(); |
| 121 | + System.out.println("LogEventMonitor running"); |
| 122 | + |
| 123 | + channel.closeFuture().await(); |
| 124 | + } finally { |
| 125 | + monitor.stop(); |
| 126 | + } |
| 127 | + } |
| 128 | + } |
| 129 | + |
| 130 | +1. 引导 NioDatagramChannel。设置 SO_BROADCAST socket 选项。 |
| 131 | +2. 添加 ChannelHandler 到 ChannelPipeline |
| 132 | +3. 绑定的通道。注意,在使用 DatagramChannel 是没有连接,因为这些 |
| 133 | +无连接 |
| 134 | +4. 构建一个新的 LogEventMonitor |
| 135 | + |
| 136 | + |
| 137 | + |
0 commit comments