|
| 1 | +Netty 编码器和解码器 |
| 2 | +==== |
| 3 | + |
| 4 | +Netty 的是一个复杂和先进的框架,但它不虚幻。当我们请求设置一些 key 为给定值,我们现在知道,Request 类的一个实例被创建来代表这个请求。但 Netty 并不知道 Request 对象是如何转成 Memcached 所期望的。Memcached 所期望的是字节序列;忽略使用的协议,数据在网络上传输永远是字节序列。 |
| 5 | + |
| 6 | +将 Request 对象转为 Memcached 所需的字节序列,Netty 需要用 MemcachedRequest 来编码成另外一种格式。 |
| 7 | + |
| 8 | +Netty 提供了一个抽象类称为MessageToByteEncoder。它提供了一个抽象方法,将一条消息(在本例中我们 MemcachedRequest 对象)转为字节。你显示什么信息实现通过使用 Java 泛型可以处理;例如 , MessageToByteEncoder<MemcachedRequest> 说这个编码器要编码的对象类型是 MemcachedRequest |
| 9 | + |
| 10 | +*MessageToByteEncoder 和 Java 泛型* |
| 11 | + |
| 12 | +*使用 MessageToByteEncoder 可以绑定特定的参数类型。如果你有多个不同的消息类型,在相同的编码器里,也可以使用MessageToByteEncoder<Object>,注意检查消息示例的类型即可* |
| 13 | + |
| 14 | +这也适用于解码器,除了解码器将一系列字节转换回一个对象。 |
| 15 | +这个 Netty 的提供了 ByteToMessageDecoder 类,而不是提供一个编码方法用来实现解码。在接下来的两个部分你看看如何实现一个 Memcached 解码器和编码器。在你做之前,然而,它的重要的意识到在使用 Netty 时,你不总是需要提供自己的编码器和解码器。只是现在因为没有 Netty 这样对 Memcached 的内置支持。 |
| 16 | + |
| 17 | +*编码器和解码器* |
| 18 | + |
| 19 | +*记住,编码器处理出站和译码器处理入站。这基本上意味着编码器将编码数据,写入远端。译码器将从远端读取处理数据。重要的是要记住,出站和入站是两个不同的方向。* |
| 20 | + |
| 21 | +请注意,我们的编码器和译码器不检查任何值最大大小保持实现简单。在实际实现中你最有可能想放入一些验证检查,使用 EncoderException 或DecoderException(或一个子类)如果检测到违反协议。 |
| 22 | + |
| 23 | + |
| 24 | +### 实现 Memcached 编码器 |
| 25 | + |
| 26 | +本节我们将简要介绍编码器的实现。正如我们提到的,编码器负责编码消息为一系列字节。这些字节可以通过网络发送到远端。为了发送请求,我们首先创建 MemcachedRequest 类,稍后编码器实现会编码为一系列字节。下面的清单显示了我们的 MemcachedRequest 类 |
| 27 | + |
| 28 | +Listing 14.1 Implementation of a Memcached request |
| 29 | + |
| 30 | + public class MemcachedRequest { //1 |
| 31 | + private static final Random rand = new Random(); |
| 32 | + private final int magic = 0x80;//fixed so hard coded |
| 33 | + private final byte opCode; //the operation e.g. set or get |
| 34 | + private final String key; //the key to delete, get or set |
| 35 | + private final int flags = 0xdeadbeef; //random |
| 36 | + private final int expires; //0 = item never expires |
| 37 | + private final String body; //if opCode is set, the value |
| 38 | + private final int id = rand.nextInt(); //Opaque |
| 39 | + private final long cas = 0; //data version check...not used |
| 40 | + private final boolean hasExtras; //not all ops have extras |
| 41 | + |
| 42 | + public MemcachedRequest(byte opcode, String key, String value) { |
| 43 | + this.opCode = opcode; |
| 44 | + this.key = key; |
| 45 | + this.body = value == null ? "" : value; |
| 46 | + this.expires = 0; |
| 47 | + //only set command has extras in our example |
| 48 | + hasExtras = opcode == Opcode.SET; |
| 49 | + } |
| 50 | + |
| 51 | + public MemcachedRequest(byte opCode, String key) { |
| 52 | + this(opCode, key, null); |
| 53 | + } |
| 54 | + |
| 55 | + public int magic() { //2 |
| 56 | + return magic; |
| 57 | + } |
| 58 | + |
| 59 | + public int opCode() { //3 |
| 60 | + return opCode; |
| 61 | + } |
| 62 | + |
| 63 | + public String key() { //4 |
| 64 | + return key; |
| 65 | + } |
| 66 | + |
| 67 | + public int flags() { //5 |
| 68 | + return flags; |
| 69 | + } |
| 70 | + |
| 71 | + public int expires() { //6 |
| 72 | + return expires; |
| 73 | + } |
| 74 | + |
| 75 | + public String body() { //7 |
| 76 | + return body; |
| 77 | + } |
| 78 | + |
| 79 | + public int id() { //8 |
| 80 | + return id; |
| 81 | + } |
| 82 | + |
| 83 | + public long cas() { //9 |
| 84 | + return cas; |
| 85 | + } |
| 86 | + |
| 87 | + public boolean hasExtras() { //10 |
| 88 | + return hasExtras; |
| 89 | + } |
| 90 | + } |
| 91 | + |
| 92 | +1. 这个类将会发送请求到 Memcached server |
| 93 | +2. 幻数,它可以用来标记文件或者协议的格式 |
| 94 | +3. opCode,反应了响应的操作已经创建了 |
| 95 | +4. 执行操作的 key |
| 96 | +5. 使用的额外的 flag |
| 97 | +6. 表明到期时间 |
| 98 | +7. body |
| 99 | +8. 请求的 id。这个id将在响应中回显。 |
| 100 | +9. compare-and-check 的值 |
| 101 | +10. 如果有额外的使用,将返回 true |
| 102 | + |
| 103 | +你如果想实现 Memcached 的其余部分协议,你只需要将 client.op*(op * 任何新的操作添加)转换为其中一个方法请求。我们需要两个更多的支持类,在下一个清单所示 |
| 104 | + |
| 105 | +Listing 14.2 Possible Memcached operation codes and response statuses |
| 106 | + |
| 107 | + public class Status { |
| 108 | + public static final short NO_ERROR = 0x0000; |
| 109 | + public static final short KEY_NOT_FOUND = 0x0001; |
| 110 | + public static final short KEY_EXISTS = 0x0002; |
| 111 | + public static final short VALUE_TOO_LARGE = 0x0003; |
| 112 | + public static final short INVALID_ARGUMENTS = 0x0004; |
| 113 | + public static final short ITEM_NOT_STORED = 0x0005; |
| 114 | + public static final short INC_DEC_NON_NUM_VAL = 0x0006; |
| 115 | + } |
| 116 | + public class Opcode { |
| 117 | + public static final byte GET = 0x00; |
| 118 | + public static final byte SET = 0x01; |
| 119 | + public static final byte DELETE = 0x04; |
| 120 | + } |
| 121 | + |
| 122 | +一个 Opcode 告诉 Memcached 要执行哪些操作。每个操作都由一个字节表示。同样的,当 Memcached 响应一个请求,响应头中包含两个字节代表响应状态。状态和 Opcode 类表示这些 Memcached 的构造。这些操作码可以使用当你构建一个新的 MemcachedRequest 指定哪个行动应该由它引发的。 |
| 123 | + |
| 124 | +但现在可以集中精力在编码器上: |
| 125 | + |
| 126 | +Listing 14.3 MemcachedRequestEncoder implementation |
| 127 | + |
| 128 | + public class MemcachedRequestEncoder extends |
| 129 | + MessageToByteEncoder<MemcachedRequest> { //1 |
| 130 | + @Override |
| 131 | + protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, |
| 132 | + ByteBuf out) throws Exception { //2 |
| 133 | + byte[] key = msg.key().getBytes(CharsetUtil.UTF_8); |
| 134 | + byte[] body = msg.body().getBytes(CharsetUtil.UTF_8); |
| 135 | + //total size of the body = key size + content size + extras size //3 |
| 136 | + int bodySize = key.length + body.length + (msg.hasExtras() ? 8 : 0); |
| 137 | + |
| 138 | + //write magic byte //4 |
| 139 | + out.writeByte(msg.magic()); |
| 140 | + //write opcode byte //5 |
| 141 | + out.writeByte(msg.opCode()); |
| 142 | + //write key length (2 byte) //6 |
| 143 | + out.writeShort(key.length); //key length is max 2 bytes i.e. a Java short //7 |
| 144 | + //write extras length (1 byte) |
| 145 | + int extraSize = msg.hasExtras() ? 0x08 : 0x0; |
| 146 | + out.writeByte(extraSize); |
| 147 | + //byte is the data type, not currently implemented in Memcached but required //8 |
| 148 | + out.writeByte(0); |
| 149 | + //next two bytes are reserved, not currently implemented but are required //9 |
| 150 | + out.writeShort(0); |
| 151 | + |
| 152 | + //write total body length ( 4 bytes - 32 bit int) //10 |
| 153 | + out.writeInt(bodySize); |
| 154 | + //write opaque ( 4 bytes) - a 32 bit int that is returned in the response //11 |
| 155 | + out.writeInt(msg.id()); |
| 156 | + |
| 157 | + //write CAS ( 8 bytes) |
| 158 | + out.writeLong(msg.cas()); //24 byte header finishes with the CAS //12 |
| 159 | + |
| 160 | + if (msg.hasExtras()) { |
| 161 | + //write extras (flags and expiry, 4 bytes each) - 8 bytes total //13 |
| 162 | + out.writeInt(msg.flags()); |
| 163 | + out.writeInt(msg.expires()); |
| 164 | + } |
| 165 | + //write key //14 |
| 166 | + out.writeBytes(key); |
| 167 | + //write value //15 |
| 168 | + out.writeBytes(body); |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | +1. 该类是负责编码 MemachedRequest 为一系列字节 |
| 173 | +2. 转换的 key 和实际请求的 body 到字节数组 |
| 174 | +3. 计算 body 大小 |
| 175 | +4. 写幻数到 ByteBuf 字节 |
| 176 | +5. 写 opCode 作为字节 |
| 177 | +6. 写 key 长度z作为 short |
| 178 | +7. 编写额外的长度作为字节 |
| 179 | +8. 写数据类型,这总是0,因为目前不是在 Memcached,但可用于使用 |
| 180 | +后来的版本 |
| 181 | +9. 为保留字节写为 short ,后面的 Memcached 版本可能使用 |
| 182 | +10. 写 body 的大小作为 long |
| 183 | +11. 写 opaque 作为 int |
| 184 | +12. 写 cas 作为 long。这个是头文件的最后部分,在 body 的开始 |
| 185 | +13. 编写额外的 flag 和到期时间为 int |
| 186 | +14. 写 key |
| 187 | +15. 这个请求完成后 写 body。 |
| 188 | + |
| 189 | +总结,编码器 使用 Netty 的 ByteBuf 处理请求,编码 MemcachedRequest 成一套正确排序的字节。详细步骤为: |
| 190 | + |
| 191 | + |
| 192 | +* 写幻数字节。 |
| 193 | +* 写 opcode 字节。 |
| 194 | +* 写 key 长度(2字节)。 |
| 195 | +* 写额外的长度(1字节)。 |
| 196 | +* 写数据类型(1字节)。 |
| 197 | +* 为保留字节写 null 字节(2字节)。 |
| 198 | +* 写 body 长度(4字节- 32位整数)。 |
| 199 | +* 写 opaque(4个字节,一个32位整数在响应中返回)。 |
| 200 | +* 写 CAS(8个字节)。 |
| 201 | +* 写 额外的(flag 和 到期,4字节)= 8个字节 |
| 202 | +* 写 key |
| 203 | +* 写 值 |
| 204 | + |
| 205 | +无论你放入什么到输出缓冲区( 调用 ByteBuf) Netty 的将向服务器发送被写入请求。下一节将展示如何进行反向通过解码器工作。 |
| 206 | + |
| 207 | +### 实现 Memcached 编码器 |
| 208 | + |
| 209 | +将 MemcachedRequest 对象转为 字节序列,Memcached 仅需将字节转到响应对象返回即可。 |
| 210 | + |
| 211 | +先见一个 POJO: |
| 212 | + |
| 213 | +Listing 14.7 Implementation of a MemcachedResponse |
| 214 | + |
| 215 | + public class MemcachedResponse { //1 |
| 216 | + private final byte magic; |
| 217 | + private final byte opCode; |
| 218 | + private byte dataType; |
| 219 | + private final short status; |
| 220 | + private final int id; |
| 221 | + private final long cas; |
| 222 | + private final int flags; |
| 223 | + private final int expires; |
| 224 | + private final String key; |
| 225 | + private final String data; |
| 226 | + |
| 227 | + public MemcachedResponse(byte magic, byte opCode, |
| 228 | + byte dataType, short status, |
| 229 | + int id, long cas, |
| 230 | + int flags, int expires, String key, String data) { |
| 231 | + this.magic = magic; |
| 232 | + this.opCode = opCode; |
| 233 | + this.dataType = dataType; |
| 234 | + this.status = status; |
| 235 | + this.id = id; |
| 236 | + this.cas = cas; |
| 237 | + this.flags = flags; |
| 238 | + this.expires = expires; |
| 239 | + this.key = key; |
| 240 | + this.data = data; |
| 241 | + } |
| 242 | + |
| 243 | + public byte magic() { //2 |
| 244 | + return magic; |
| 245 | + } |
| 246 | + |
| 247 | + public byte opCode() { //3 |
| 248 | + return opCode; |
| 249 | + } |
| 250 | + |
| 251 | + public byte dataType() { //4 |
| 252 | + return dataType; |
| 253 | + } |
| 254 | + |
| 255 | + public short status() { //5 |
| 256 | + return status; |
| 257 | + } |
| 258 | + |
| 259 | + public int id() { //6 |
| 260 | + return id; |
| 261 | + } |
| 262 | + |
| 263 | + public long cas() { //7 |
| 264 | + return cas; |
| 265 | + } |
| 266 | + |
| 267 | + public int flags() { //8 |
| 268 | + return flags; |
| 269 | + } |
| 270 | + |
| 271 | + public int expires() { //9 |
| 272 | + return expires; |
| 273 | + } |
| 274 | + |
| 275 | + public String key() { //10 |
| 276 | + return key; |
| 277 | + } |
| 278 | + |
| 279 | + public String data() { //11 |
| 280 | + return data; |
| 281 | + } |
| 282 | + } |
| 283 | + |
| 284 | +1. 该类,代表从 Memcached 服务器返回的响应 |
| 285 | +2. 幻数 |
| 286 | +3. opCode,这反映了创建操作的响应 |
| 287 | +4. 数据类型,这表明这个是基于二进制还是文本 |
| 288 | +5. 响应的状态,这表明如果请求是成功的 |
| 289 | +6. 惟一的 id |
| 290 | +7. compare-and-set 值 |
| 291 | +8. 使用额外的 flag |
| 292 | +9. 表示该值存储的一个有效期 |
| 293 | +10. 响应创建的 key |
| 294 | +11. 实际数据 |
| 295 | + |
| 296 | +下面为 MemcachedResponseDecoder, 使用了 ByteToMessageDecoder 基类,用于将 字节序列转为 MemcachedResponse |
| 297 | + |
| 298 | +Listing 14.4 MemcachedResponseDecoder class |
| 299 | + |
| 300 | + public class MemcachedResponseDecoder extends ByteToMessageDecoder { //1 |
| 301 | + private enum State { //2 |
| 302 | + Header, |
| 303 | + Body |
| 304 | + } |
| 305 | + |
| 306 | + private State state = State.Header; |
| 307 | + private int totalBodySize; |
| 308 | + private byte magic; |
| 309 | + private byte opCode; |
| 310 | + private short keyLength; |
| 311 | + private byte extraLength; |
| 312 | + private short status; |
| 313 | + private int id; |
| 314 | + private long cas; |
| 315 | + |
| 316 | + @Override |
| 317 | + protected void decode(ChannelHandlerContext ctx, ByteBuf in, |
| 318 | + List<Object> out) { |
| 319 | + switch (state) { //3 |
| 320 | + case Header: |
| 321 | + if (in.readableBytes() < 24) { |
| 322 | + return;//response header is 24 bytes //4 |
| 323 | + } |
| 324 | + magic = in.readByte(); //5 |
| 325 | + opCode = in.readByte(); |
| 326 | + keyLength = in.readShort(); |
| 327 | + extraLength = in.readByte(); |
| 328 | + in.skipBytes(1); |
| 329 | + status = in.readShort(); |
| 330 | + totalBodySize = in.readInt(); |
| 331 | + id = in.readInt(); //referred to in the protocol spec as opaque |
| 332 | + cas = in.readLong(); |
| 333 | + |
| 334 | + state = State.Body; |
| 335 | + case Body: |
| 336 | + if (in.readableBytes() < totalBodySize) { |
| 337 | + return; //until we have the entire payload return //6 |
| 338 | + } |
| 339 | + int flags = 0, expires = 0; |
| 340 | + int actualBodySize = totalBodySize; |
| 341 | + if (extraLength > 0) { //7 |
| 342 | + flags = in.readInt(); |
| 343 | + actualBodySize -= 4; |
| 344 | + } |
| 345 | + if (extraLength > 4) { //8 |
| 346 | + expires = in.readInt(); |
| 347 | + actualBodySize -= 4; |
| 348 | + } |
| 349 | + String key = ""; |
| 350 | + if (keyLength > 0) { //9 |
| 351 | + ByteBuf keyBytes = in.readBytes(keyLength); |
| 352 | + key = keyBytes.toString(CharsetUtil.UTF_8); |
| 353 | + actualBodySize -= keyLength; |
| 354 | + } |
| 355 | + ByteBuf body = in.readBytes(actualBodySize); //10 |
| 356 | + String data = body.toString(CharsetUtil.UTF_8); |
| 357 | + out.add(new MemcachedResponse( //1 |
| 358 | + magic, |
| 359 | + opCode, |
| 360 | + status, |
| 361 | + id, |
| 362 | + cas, |
| 363 | + flags, |
| 364 | + expires, |
| 365 | + key, |
| 366 | + data |
| 367 | + )); |
| 368 | + |
| 369 | + state = State.Header; |
| 370 | + } |
| 371 | + |
| 372 | + } |
| 373 | + } |
| 374 | + |
| 375 | +1. 类负责创建的 MemcachedResponse 读取字节 |
| 376 | +2. 代表当前解析状态,这意味着我们需要解析的头或 body |
| 377 | +3. 根据解析状态切换 |
| 378 | +4. 如果不是至少24个字节是可读的,它不可能读整个头部,所以返回这里,等待再通知一次数据准备阅读 |
| 379 | +5. 阅读所有头的字段 |
| 380 | +6. 检查是否足够的数据是可读用来读取完整的响应的 body。长度是从头读取 |
| 381 | +7. 检查如果有任何额外的 flag 用于读,如果是这样做 |
| 382 | +8. 检查如果响应包含一个 expire 字段,有就读它 |
| 383 | +9. 检查响应是否包含一个 key ,有就读它 |
| 384 | +10. 读实际的 body 的 payload |
| 385 | +11. 从前面读取字段和数据构造一个新的 MemachedResponse |
| 386 | + |
| 387 | +所以在实现发生了什么事?我们知道一个 Memcached 响应有24位头;我们不知道是否所有数据,响应将被包含在输入 ByteBuf ,当解码方法调用时。这是因为底层网络堆栈可能将数据分解成块。所以确保我们只解码当我们有足够的数据,这段代码检查是否可用可读的字节的数量至少是24。一旦我们有24个字节,我们可以确定整个消息有多大,因为这个信息包含在24位头。 |
| 388 | + |
| 389 | +当我们解码整个消息,我们创建一个 MemcachedResponse 并将其添加到输出列表。任何对象添加到该列表将被转发到下一个ChannelInboundHandler 在 ChannelPipeline,因此允许处理。 |
| 390 | + |
0 commit comments