node源码粗读(12):通过`net.createConnection`来看socket的Event Emitter的实现
发布于 8 年前 作者 xtx1130 3773 次浏览 来自 分享

这篇文章主要从net.createConnection入手,详细讲解socket常用的connet、data、error、close等事件是如何实现的。

socket的创建

相信用过net.createConnection都比较了解了。这个API会创建一个客户端的socket链接:

net.createConnection(options[, connectListener])

其中connectListener将被添加为返回 socket 上的 ‘connect’ 事件上的监听器。 简单了解了API,我们直奔./lib/net.js来看一下是如何实现的:

// ./lib/net.js connect构造函数
function connect(...args) {
 var normalized = normalizeArgs(args);
 var options = normalized[0];
 // ...
 var socket = new Socket(options);
 // ...
 return Socket.prototype.connect.call(socket, normalized);
}

首先我们关注一下new Socket(options)的实现:

const {
 TCP,
 TCPConnectWrap,
 constants: TCPConstants
} = process.binding('tcp_wrap');
// ... socket构造函数
function Socket(options) {
 // ...
 this._hadError = false;
 this._handle = null;
 this._parent = null;
 this._host = null;
 this[kLastWriteQueueSize] = 0;
 this[kTimeout] = null;
 // ...
 stream.Duplex.call(this, options);
 this.allowHalfOpen = Boolean(allowHalfOpen);
 if (options.handle) {
 this._handle = options.handle; // private
 this[async_id_symbol] = getNewAsyncId(this._handle);
 } else if (options.fd !== undefined) {
 const { fd } = options;
 this._handle = createHandle(fd, false);
 this._handle.open(fd);
 this[async_id_symbol] = this._handle.getAsyncId();
 // ...
 }
 initSocketHandle(this);
 // ...
 }
 // ...
}
util.inherits(Socket, stream.Duplex);
// ... createHandle 函数
function createHandle(fd, is_server) {
 const type = TTYWrap.guessHandleType(fd);
 // ...
 if (type === 'TCP') {
 return new TCP(
 is_server ? TCPConstants.SERVER : TCPConstants.SOCKET
 );
 }
 throw new ERR_INVALID_FD_TYPE(type);
}

通过上面这一段代码我们可以得到两个关键的信息:

  • this._handle = createHandle(fd, false);TCP的实例
  • stream.Duplex.call(this, options); socket的实例继承了stream.Duplex的属性和方法 接下来是connect:
Socket.prototype.connect = function(...args) {
 if (pipe) {
 // ...
 } else {
 lookupAndConnect(this, options);
 }
}
/// connect调用的核心函数
function internalConnect(
 self, address, port, addressType, localAddress, localPort) {
 if (addressType === 6 || addressType === 4) {
 const req = new TCPConnectWrap();
 req.oncomplete = afterConnect;
 req.address = address;
 req.port = port;
 req.localAddress = localAddress;
 req.localPort = localPort;
 if (addressType === 4)
 err = self._handle.connect(req, address, port);
 // ...
 } else {
 // ...
 }
}

里面需要注意的地方:

  • self._handle.connect(req, address, port);这一段是真正的实现socket连接的地方
  • req.oncomplete = afterConnect; 这一段是连接成功之后的回调

this._handle和’connect’事件

在上一节我们知道this._handleTCP的实例,进而我们长驱直入tcp_wrap.cc:

void TCPWrap::Initialize(Local<Object> target,
 Local<Value> unused,
 Local<Context> context) {
 // ...
 Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
 Local<String> tcpString = FIXED_ONE_BYTE_STRING(env->isolate(), "TCP");
 t->SetClassName(tcpString);
 t->InstanceTemplate()->SetInternalFieldCount(1);
 t->InstanceTemplate()->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "reading"),
 Boolean::New(env->isolate(), false));
 t->InstanceTemplate()->Set(env->owner_string(), Null(env->isolate()));
 t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));
 t->InstanceTemplate()->Set(env->onconnection_string(), Null(env->isolate()));
 // ...
 env->SetProtoMethod(t, "connect", Connect);
 // ...
}
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) {
 // ...
 Environment* env = Environment::GetCurrent(args);
 int type_value = args[0].As<Int32>()->Value();
 TCPWrap::SocketType type = static_cast<TCPWrap::SocketType>(type_value);
 ProviderType provider;
 switch (type) {
 case SOCKET:
 provider = PROVIDER_TCPWRAP;
 break;
 case SERVER:
 provider = PROVIDER_TCPSERVERWRAP;
 break;
 default:
 UNREACHABLE();
 }
 new TCPWrap(env, args.This(), provider);
}
TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider)
 : ConnectionWrap(env, object, provider) {
 int r = uv_tcp_init(env->event_loop(), &handle_);
 CHECK_EQ(r, 0); // How do we proxy this error up to javascript?
 // Suggestion: uv_tcp_init() returns void.
}

通过上面的代码,我们可以清晰地看到TCPWrap向js层抛出的构造函数TCP,重点注意下

env->SetProtoMethod(t, "connect", Connect);

这里对TCP的实例添加了connect方法,意即this._handle.connect调用的为TCPWrap::Connect:

void TCPWrap::Connect(const FunctionCallbackInfo<Value>& args) {
 // ...
 if (err == 0) {
 AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
 ConnectWrap* req_wrap =
 new ConnectWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_TCPCONNECTWRAP);
 err = req_wrap->Dispatch(uv_tcp_connect,
 &wrap->handle_,
 reinterpret_cast<const sockaddr*>(&addr),
 AfterConnect);
 // ...
 }
 args.GetReturnValue().Set(err);
}

通过这段代码可以清晰地看到,uv_tcp_connect的cb为AfterConnect,libuv中的逻辑就不做详细介绍了,有兴趣的可以观光一下tcp.cuv__tcp_connect函数,最终会走入到uv__io_poll的轮回中。我们重点看一下AfterConnect,代码在connection_wrap.cc中:

void ConnectionWrap<WrapType, UVType>::AfterConnect(uv_connect_t* req,
 int status) {
 // ...
 req_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv);
 delete req_wrap;
}

可以看到最终调用的函数为oncomplete,这时又回到了net.js中:

const req = new TCPConnectWrap();
req.oncomplete = afterConnect;

afterConnect的定义中有这样一句:

function afterConnect(status, handle, req, readable, writable) {
 // ...
 if (status === 0) {
 // ...
 self.emit('connect');
 self.emit('ready');
 }
}

最终在这里emit出了connect方法,即连接成功的回调,也就是net.createConnection(options[, connectListener])connectListener函数最终被emit触发的地方。

stream和’data’事件

不知道读者还有没有记得,文中的第一节曾分析过:

stream.Duplex.call(this, options); socket的实例继承了stream.Duplex的属性和方法

没错,'data’事件就是在stream.Duplex中emit的,在上一篇文章中我曾介绍过stream的写入过程,而在这里值stream的读取过程,刚才在分析TCPWrap的时候,在c++中构造js的TCP构造函数时候有这样一句:

t->InstanceTemplate()->Set(env->onread_string(), Null(env->isolate()));

由于socket继承了stream.Duplex,整个的溯源过程就不详细查找了,最终onread的触发在stream_base.cc中:

void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
 Local<Value> argv[] = {
 Integer::New(env->isolate(), nread),
 buf
 };
 // ...
 wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
}

而在js中的触发则在net.js:

function onread(nread, buffer) {
 // ...
 if (nread > 0) {
 // ...
 var ret = self.push(buffer);
 // ...
 }
}

push方法则是从stream继承来的,视线转移到_stream_readable.js中:

Readable.prototype.push = function(chunk, encoding) {
 // ...
 return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
}

readableAddChunk函数的作用是对流不断进行拼接并在过程中进行容错处理:

function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
 // ...
 if (addToFront) {
 if (state.endEmitted)
 stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
 else
 addChunk(stream, state, chunk, true);
 } else if (state.ended) {
 stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
 } else if (state.destroyed) {
 return false;
 } else {
 state.reading = false;
 if (state.decoder && !encoding) {
 chunk = state.decoder.write(chunk);
 if (state.objectMode || chunk.length !== 0)
 addChunk(stream, state, chunk, false);
 else
 maybeReadMore(stream, state);
 } else {
 addChunk(stream, state, chunk, false);
 }
}

在这里我只截取了关键代码,**可以看到在addChunk的同时,如果出错会立刻emit出’error’事件。在上游net.js中还有一些情况会emit 'error’事件,就不做详尽分析了。**接下来我们看下addChunk:

function addChunk(stream, state, chunk, addToFront) {
 if (state.flowing && state.length === 0 && !state.sync) {
 state.awaitDrain = 0;
 stream.emit('data', chunk);
 } else {
 // update the buffer info.
 state.length += state.objectMode ? 1 : chunk.length;
 if (addToFront)
 state.buffer.unshift(chunk);
 else
 state.buffer.push(chunk);
 if (state.needReadable)
 emitReadable(stream);
 }
 maybeReadMore(stream, state);
}

通过stream.emit('data', chunk);会emit出’data’事件,并且把chunk传入到参数中。

总结

至此,socket的’connect’、‘data’、'error’事件便分析完毕了。有意思的是,这三个事件的来源不尽相同:

  • 'connect’是通过TCPWrap类及一系列函数触发的emit
  • 'data’则是和StreamBase类相关
  • 'error’则相当于全程的兜底,不管在哪里出了问题,总会emit出’error’事件。

by 小菜
原文地址:https://github.com/xtx1130/blog/issues/26 欢迎watch和star。如果文中有表述错误的地方还请大神斧正

1 回复
回到顶部

AltStyle によって変換されたページ (->オリジナル) /