浅谈 Thin 的事件驱动模型

{% include related/rack.md %}

在上一篇文章中我们已经介绍了 WEBrick 的实现,它的 handler 是写在 Rack 工程中的,而在这篇文章介绍的 webserver thin 的 Rack 处理器也是写在 Rack 中的;与 WEBrick 相同,Thin 的实现也非常简单,官方对它的介绍是:

A very fast & simple Ruby web server.

它将 MongrelEventMachineRack 三者进行组合,在其中起到胶水的作用,所以在理解 Thin 的实现的过程中我们也需要分析 EventMachine 到底是如何工作的。

Thin 的实现

在这一节中我们将从源代码来分析介绍 Thin 的实现原理,因为部分代码仍然是在 Rack 工程中实现的,所以我们要从 Rack 工程的代码开始理解 Thin 的实现。

从 Rack 开始

Thin 的处理器 Rack::Handler::Thin 与其他遵循 Rack 协议的 webserver 一样都实现了 .run 方法,接受 Rack 应用和 options 作为输入:

module Rack
 module Handler
 class Thin
 def self.run(app, options={})
 environment = ENV['RACK_ENV'] || 'development'
 default_host = environment == 'development' ? 'localhost' : '0.0.0.0'
 host = options.delete(:Host) || default_host
 port = options.delete(:Port) || 8080
 args = [host, port, app, options]
 args.pop if ::Thin::VERSION::MAJOR < 1 && ::Thin::VERSION::MINOR < 8
 server = ::Thin::Server.new(*args)
 yield server if block_given?
 server.start
 end
 end
 end
end

上述方法仍然会从 options 中取出 ip 地址和端口号,然后初始化一个 Thin::Server 的实例后,执行 #start 方法在 8080 端口上监听来自用户的请求。

初始化服务

Thin 服务的初始化由以下的代码来处理,首先会处理在 Rack::Handler::Thin.run 中传入的几个参数 hostportappoptions,将 Rack 应用存储在临时变量中:

From: lib/thin/server.rb @ line 100:
Owner: Thin::Server
def initialize(*args, &block)
 host, port, options = DEFAULT_HOST, DEFAULT_PORT, {}
 args.each do |arg|
 case arg
 when 0.class, /^\d+$/ then port = arg.to_i
 when String then host = arg
 when Hash then options = arg
 else
 @app = arg if arg.respond_to?(:call)
 end
 end
 @backend = select_backend(host, port, options)
 @backend.server = self
 @backend.maximum_connections = DEFAULT_MAXIMUM_CONNECTIONS
 @backend.maximum_persistent_connections = DEFAULT_MAXIMUM_PERSISTENT_CONNECTIONS
 @backend.timeout = options[:timeout] || DEFAULT_TIMEOUT
 @app = Rack::Builder.new(&block).to_app if block
end

在初始化服务的过程中,总共只做了三件事情,处理参数、选择并配置 backend,创建新的应用:

thin-initialize-serve

处理参数的过程自然不用多说,只是这里判断的方式并不是按照顺序处理的,而是按照参数的类型;在初始化器的最后,如果向初始化器传入了 block,那么就会使用 Rack::Builder 和 block 中的代码初始化一个新的 Rack 应用。

选择后端

在选择后端时 Thin 使用了 #select_backend 方法,这里使用 case 语句替代多个 ifelse,也是一个我们可以使用的小技巧:

From: lib/thin/server.rb @ line 261:
Owner: Thin::Server
def select_backend(host, port, options)
 case
 when options.has_key?(:backend)
 raise ArgumentError, ":backend must be a class" unless options[:backend].is_a?(Class)
 options[:backend].new(host, port, options)
 when options.has_key?(:swiftiply)
 Backends::SwiftiplyClient.new(host, port, options)
 when host.include?('/')
 Backends::UnixServer.new(host)
 else
 Backends::TcpServer.new(host, port)
 end
end

在大多数时候,我们只会选择 UnixServerTcpServer 两种后端中的一个,而后者又是两者中使用更为频繁的后端:

From: lib/thin/backends/tcp_server.rb @ line 8:
Owner: Thin::Backends::TcpServer
def initialize(host, port)
 @host = host
 @port = port
 super()
end
From: lib/thin/backends/base.rb @ line 47:
Owner: Thin::Backends::Base
def initialize
 @connections = {}
 @timeout = Server::DEFAULT_TIMEOUT
 @persistent_connection_count = 0
 @maximum_connections = Server::DEFAULT_MAXIMUM_CONNECTIONS
 @maximum_persistent_connections = Server::DEFAULT_MAXIMUM_PERSISTENT_CONNECTIONS
 @no_epoll = false
 @ssl = nil
 @threaded = nil
 @started_reactor = false
end

初始化的过程中只是对属性设置默认值,比如 hostport 以及超时时间等等,并没有太多值得注意的代码。

启动服务

在启动服务时会直接调用 TcpServer#start 方法并在其中传入一个用于处理信号的 block:

From: lib/thin/server.rb @ line 152:
Owner: Thin::Server
def start
 raise ArgumentError, 'app required' unless @app
 log_info "Thin web server (v#{VERSION::STRING} codename #{VERSION::CODENAME})"
 log_debug "Debugging ON"
 trace "Tracing ON"
 log_info "Maximum connections set to #{@backend.maximum_connections}"
 log_info "Listening on #{@backend}, CTRL+C to stop"
 @backend.start { setup_signals if @setup_signals }
end

虽然这里的 backend 其实已经被选择成了 TcpServer,但是该子类并没有覆写 #start 方法,这里执行的方法其实是从父类继承的:

From: lib/thin/backends/base.rb @ line 60:
Owner: Thin::Backends::Base
def start
 @stopping = false
 starter = proc do
 connect
 yield if block_given?
 @running = true
 end
 # Allow for early run up of eventmachine.
 if EventMachine.reactor_running?
 starter.call
 else
 @started_reactor = true
 EventMachine.run(&starter)
 end
end

上述方法在构建一个 starter block 之后,将该 block 传入 EventMachine.run 方法,随后执行的 #connect 会启动一个 EventMachine 的服务器用于处理用户的网络请求:

From: lib/thin/backends/tcp_server.rb @ line 15:
Owner: Thin::Backends::TcpServer
def connect
 @signature = EventMachine.start_server(@host, @port, Connection, &method(:initialize_connection))
 binary_name = EventMachine.get_sockname( @signature )
 port_name = Socket.unpack_sockaddr_in( binary_name )
 @port = port_name[0]
 @host = port_name[1]
 @signature
end

在 EventMachine 的文档中,.start_server 方法被描述成一个在指定的地址和端口上初始化 TCP 服务的方法,正如这里所展示的,它经常在 .run 方法的 block 中执行;该方法的参数 Connection 作为处理 TCP 请求的类,会实现不同的方法接受各种各样的回调,传入的 initialize_connection block 会在有请求需要处理时对 Connection 对象进行初始化:

Connection 对象继承自 EventMachine::Connection,是 EventMachine 与外界的接口,在 EventMachine 中的大部分事件都会调用 Connection 的一个实例方法来传递数据和参数。

From: lib/thin/backends/base.rb @ line 145:
Owner: Thin::Backends::Base
def initialize_connection(connection)
 connection.backend = self
 connection.app = @server.app
 connection.comm_inactivity_timeout = @timeout
 connection.threaded = @threaded
 connection.start_tls(@ssl_options) if @ssl
 if @persistent_connection_count < @maximum_persistent_connections
 connection.can_persist!
 @persistent_connection_count += 1
 end
 @connections[connection.__id__] = connection
end

处理请求的连接

Connection 类中有很多的方法 #post_init#receive_data 方法等等都是由 EventMachine 在接收到请求时调用的,当 Thin 的服务接收到来自客户端的数据时就会调用 #receive_data 方法:

From: lib/thin/connection.rb @ line 36:
Owner: Thin::Connection
def receive_data(data)
 @idle = false
 trace data
 process if @request.parse(data)
rescue InvalidRequest => e
 log_error("Invalid request", e)
 post_process Response::BAD_REQUEST
end

在这里我们看到了与 WEBrick 在处理来自客户端的原始数据时使用的方法 #parse,它会解析客户端请求的原始数据并执行 #process 来处理 HTTP 请求:

From: lib/thin/connection.rb @ line 47:
Owner: Thin::Connection
def process
 if threaded?
 @request.threaded = true
 EventMachine.defer { post_process(pre_process) }
 else
 @request.threaded = false
 post_process(pre_process)
 end
end

如果当前的连接允许并行处理多个用户的请求,那么就会在 EventMachine.defer 的 block 中执行两个方法 #pre_process#post_process:

From: lib/thin/connection.rb @ line 63:
Owner: Thin::Connection
def pre_process
 @request.remote_address = remote_address
 @request.async_callback = method(:post_process)
 response = AsyncResponse
 catch(:async) do
 response = @app.call(@request.env)
 end
 response
rescue Exception => e
 unexpected_error(e)
 can_persist? && @request.persistent? ? Response::PERSISTENT_ERROR : Response::ERROR
end

#pre_process 中没有做太多的事情,只是调用了 Rack 应用的 #call 方法,得到一个三元组 response,在这之后将这个数组传入 #post_process 方法:

From: lib/thin/connection.rb @ line 95:
Owner: Thin::Connection
def post_process(result)
 return unless result
 result = result.to_a
 return if result.first == AsyncResponse.first
 @response.status, @response.headers, @response.body = *result
 @response.each do |chunk|
 send_data chunk
 end
rescue Exception => e
 unexpected_error(e)
 close_connection
ensure
 if @response.body.respond_to?(:callback) && @response.body.respond_to?(:errback)
 @response.body.callback { terminate_request }
 @response.body.errback { terminate_request }
 else
 terminate_request unless result && result.first == AsyncResponse.first
 end
end

#post_response 方法将传入的数组赋值给 responsestatusheadersbody 这三部分,在这之后通过 #send_data 方法将 HTTP 响应以块的形式写回 Socket;写回结束后可能会调用对应的 callback 并关闭持有的 requestresponse 两个实例变量。

上述方法中调用的 #send_data 继承自 EventMachine::Connection 类。

小结

到此为止,我们对于 Thin 是如何处理来自用户的 HTTP 请求的就比较清楚了,我们可以看到 Thin 本身并没有做一些类似解析 HTTP 数据包以及发送数据的问题,它使用了来自 Rack 和 EventMachine 两个开源框架中很多已有的代码逻辑,确实只做了一些胶水的事情。

对于 Rack 是如何工作的我们在前面的文章 谈谈 Rack 协议与实现 中已经介绍过了;虽然我们看到了很多与 EventMachine 相关的代码,但是到这里我们仍然对 EventMachine 不是太了解。

EventMachine 和 Reactor 模式

为了更好地理解 Thin 的工作原理,在这里我们会介绍一个 EventMachine 和 Reactor 模式。

EventMachine 其实是一个使用 Ruby 实现的事件驱动的并行框架,它使用 Reactor 模式提供了事件驱动的 IO 模型,如果你对 Node.js 有所了解的话,那么你一定对事件驱动这个词并不陌生,EventMachine 的出现主要是为了解决两个核心问题:

  • 为生产环境提供更高的可伸缩性、更好的性能和稳定性;
  • 为上层提供了一些能够减少高性能的网络编程复杂性的 API;

其实 EventMachine 的主要作用就是将所有同步的 IO 都变成异步的,调度都通过事件来进行,这样用于监听用户请求的进程不会被其他代码阻塞,能够同时为更多的客户端提供服务;在这一节中,我们需要了解一下在 Thin 中使用的 EventMachine 中几个常用方法的实现。

启动事件循环

EventMachine 其实就是一个事件循环(Event Loop),当我们想使用 EventMachine 来处理某些任务时就一定需要调用 .run 方法启动这个事件循环来接受外界触发的各种事件:

From: lib/eventmachine.rb @ line 149:
Owner: #<Class:EventMachine>
def self.run blk=nil, tail=nil, &block
 # ...
 begin
 @reactor_pid = Process.pid
 @reactor_running = true
 initialize_event_machine
 (b = blk || block) and add_timer(0, b)
 if @next_tick_queue && !@next_tick_queue.empty?
 add_timer(0) { signal_loopbreak }
 end
 @reactor_thread = Thread.current
 run_machine
 ensure
 until @tails.empty?
 @tails.pop.call
 end
 release_machine
 cleanup_machine
 @reactor_running = false
 @reactor_thread = nil
 end
end

在这里我们会使用 .initialize_event_machine 初始化当前的事件循环,其实也就是一个全局的 Reactor 的单例,最终会执行 Reactor#initialize_for_run 方法:

From: lib/em/pure_ruby.rb @ line 522:
Owner: EventMachine::Reactor
def initialize_for_run
 @running = false
 @stop_scheduled = false
 @selectables ||= {}; @selectables.clear
 @timers = SortedSet.new # []
 set_timer_quantum(0.1)
 @current_loop_time = Time.now
 @next_heartbeat = @current_loop_time + HeartbeatInterval
end

在启动事件循环的过程中,它还会将传入的 block 与一个 interval 为 0 的键组成键值对存到 @timers 字典中,所有加入的键值对都会在大约 interval 的时间过后执行一次 block。

随后执行的 #run_machine 在最后也会执行 Reactor#run 方法,该方法中包含一个 loop 语句,也就是我们一直说的事件循环:

From: lib/em/pure_ruby.rb @ line 540:
Owner: EventMachine::Reactor
def run
 raise Error.new( "already running" ) if @running
 @running = true
 begin
 open_loopbreaker
 loop {
 @current_loop_time = Time.now
 break if @stop_scheduled
 run_timers
 break if @stop_scheduled
 crank_selectables
 break if @stop_scheduled
 run_heartbeats
 }
 ensure
 close_loopbreaker
 @selectables.each {|k, io| io.close}
 @selectables.clear
 @running = false
 end
end

在启动事件循环之间会在 #open_loopbreaker 中创建一个 LoopbreakReader 的实例绑定在 127.0.0.1 和随机的端口号组成的地址上,然后开始运行事件循环。

reactor-eventloop

在事件循环中,Reactor 总共需要执行三部分的任务,分别是执行定时器、处理 Socket 上的事件以及运行心跳方法。

无论是运行定时器还是执行心跳方法其实都非常简单,只要与当前时间进行比较,如果到了触发的时间就调用正确的方法或者回调,最后的 #crank_selectables 方法就是用于处理 Socket 上读写事件的方法了:

From: lib/em/pure_ruby.rb @ line 540:
Owner: EventMachine::Reactor
def crank_selectables
 readers = @selectables.values.select { |io| io.select_for_reading? }
 writers = @selectables.values.select { |io| io.select_for_writing? }
 s = select(readers, writers, nil, @timer_quantum)
 s and s[1] and s[1].each { |w| w.eventable_write }
 s and s[0] and s[0].each { |r| r.eventable_read }
 @selectables.delete_if {|k,io|
 if io.close_scheduled?
 io.close
 begin
 EventMachine::event_callback io.uuid, ConnectionUnbound, nil
 rescue ConnectionNotBound; end
 true
 end
 }
end

上述方法会在 Socket 变成可读或者可写时执行 #eventable_write#eventable_read 执行事件的回调,我们暂时放下这两个方法,先来了解一下 EventMachine 是如何启动服务的。

启动服务

在启动服务的过程中,最重要的目的就是创建一个 Socket 并绑定在指定的 ip 和端口上,在实现这个目的的过程中,我们使用了以下的几个方法,首先是 EventMachine.start_server:

From: lib/eventmachine.rb @ line 516:
Owner: #<Class:EventMachine>
def self.start_server server, port=nil, handler=nil, *args, &block
 port = Integer(port)
 klass = klass_from_handler(Connection, handler, *args)
 s = if port
 start_tcp_server server, port
 else
 start_unix_server server
 end
 @acceptors[s] = [klass, args, block]
 s
end

该方法其实使我们在使用 EventMachine 时常见的接口,只要我们想要启动一个新的 TCP 或者 UNIX 服务器,就可以上述方法,在这里会根据端口号是否存在,选择执行 .start_tcp_server 或者 .start_unix_server 创建一个新的 Socket 并存储在 @acceptors 中:

From: lib/em/pure_ruby.rb @ line 184:
Owner: #<Class:EventMachine>
def self.start_tcp_server host, port
 (s = EvmaTCPServer.start_server host, port) or raise "no acceptor"
 s.uuid
end

EventMachine.start_tcp_server 在这里也只做了个『转发』方法的作用的,直接调用 EvmaTCPServer.start_server 创建一个新的 Socket 对象并绑定到传入的 <host, port> 上:

From: lib/em/pure_ruby.rb @ line 1108:
Owner: #<Class:EventMachine::EvmaTCPServer>
def self.start_server host, port
 sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 )
 sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true )
 sd.bind( Socket.pack_sockaddr_in( port, host ))
 sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough.
 EvmaTCPServer.new sd
end

方法的最后会创建一个新的 EvmaTCPServer 实例的过程中,我们需要通过 #fcntl 将 Socket 变成非阻塞式的:

From: lib/em/pure_ruby.rb @ line 687:
Owner: EventMachine::Selectable
def initialize io
 @io = io
 @uuid = UuidGenerator.generate
 @is_server = false
 @last_activity = Reactor.instance.current_loop_time
 m = @io.fcntl(Fcntl::F_GETFL, 0)
 @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m)
 @close_scheduled = false
 @close_requested = false
 se = self; @io.instance_eval { @my_selectable = se }
 Reactor.instance.add_selectable @io
end

不只是 EvmaTCPServer,所有的 Selectable 子类在初始化的最后都会将新的 Socket 以 uuid 为键存储到 Reactor 单例对象的 @selectables 字典中:

From: lib/em/pure_ruby.rb @ line 532:
Owner: EventMachine::Reactor
def add_selectable io
 @selectables[io.uuid] = io
end

在整个事件循环的大循环中,这里存入的所有 Socket 都会被 #select 方法监听,在响应的事件发生时交给合适的回调处理,作者在 Redis 中的事件循环 一文中也介绍过非常相似的处理过程。

eventmachine-select

所有的 Socket 都会存储在一个 @selectables 的哈希中并由 #select 方法监听所有的读写事件,一旦相应的事件触发就会通过 eventable_read 或者 eventable_write 方法来响应该事件。

处理读写事件

所有的读写事件都是通过 Selectable 和它的子类来处理的,在 EventMachine 中,总共有以下的几种子类:

selectable-and-subclasses

所有处理服务端读写事件的都是 Selectable 的子类,也就是 EvmaTCPServerEvmaUNIXServer,而所有处理客户端读写事件的都是 StreamObject 的子类 EvmaTCPServerEvmaUNIXClient

当我们初始化的绑定在 <host, port> 上的 Socket 对象监听到了来自用户的 TCP 请求时,当前的 Socket 就会变得可读,事件循环中的 #select 方法就会调用 EvmaTCPClient#eventable_read 通知由一个请求需要处理:

From: lib/em/pure_ruby.rb @ line 1130:
Owner: EventMachine::EvmaTCPServer
def eventable_read
 begin
 10.times {
 descriptor, peername = io.accept_nonblock
 sd = EvmaTCPClient.new descriptor
 sd.is_server = true
 EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid
 }
 rescue Errno::EWOULDBLOCK, Errno::EAGAIN
 end
end

在这里会尝试多次 #accept_non_block 当前的 Socket 并会创建一个 TCP 的客户端对象 EvmaTCPClient,同时通过 .event_callback 方法发送 ConnectionAccepted 消息。

EventMachine::event_callback 就像是一个用于处理所有事件的中心方法,所有的回调都要通过这个中继器进行调度,在实现上就是一个庞大的 ifelse 语句,里面处理了 EventMachine 中可能出现的 10 种状态和操作:

event-callback

大多数事件在触发时,都会从 @conns 中取出相应的 Connection 对象,最后执行合适的方法来处理,而这里触发的 ConnectionAccepted 事件是通过以下的代码来处理的:

From: lib/eventmachine.rb @ line 1462:
Owner: #<Class:EventMachine>
def self.event_callback conn_binding, opcode, data
 if opcode == # ...
 # ...
 elsif opcode == ConnectionAccepted
 accep, args, blk = @acceptors[conn_binding]
 raise NoHandlerForAcceptedConnection unless accep
 c = accep.new data, *args
 @conns[data] = c
 blk and blk.call(c)
 c
 else
 # ...
 end
end

上述的 accep 变量就是我们在 Thin 调用 .start_server 时传入的 Connection 类,在这里我们初始化了一个新的实例,同时以 Socket 的 uuid 作为键存到 @conns 中。

在这之后 #select 方法就会监听更多 Socket 上的事件了,当这个 “accept” 后创建的 Socket 接收到数据时,就会触发下面的 #eventable_read 方法:

From: lib/em/pure_ruby.rb @ line 1130:
Owner: EventMachine::StreamObject
def eventable_read
 @last_activity = Reactor.instance.current_loop_time
 begin
 if io.respond_to?(:read_nonblock)
 10.times {
 data = io.read_nonblock(4096)
 EventMachine::event_callback uuid, ConnectionData, data
 }
 else
 data = io.sysread(4096)
 EventMachine::event_callback uuid, ConnectionData, data
 end
 rescue Errno::EAGAIN, Errno::EWOULDBLOCK, SSLConnectionWaitReadable
 rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError, Errno::EPIPE, OpenSSL::SSL::SSLError
 @close_scheduled = true
 EventMachine::event_callback uuid, ConnectionUnbound, nil
 end
end

方法会从 Socket 中读取数据并通过 .event_callback 发送 ConnectionData 事件:

From: lib/eventmachine.rb @ line 1462:
Owner: #<Class:EventMachine>
def self.event_callback conn_binding, opcode, data
 if opcode == # ...
 # ...
 elsif opcode == ConnectionData
 c = @conns[conn_binding] or raise ConnectionNotBound, "received data #{data} for unknown signature: #{conn_binding}"
 c.receive_data data
 else
 # ...
 end
end

从上述方法对 ConnectionData 事件的处理就可以看到通过传入 Socket 的 uuid 和数据,就可以找到上面初始化的 Connection 对象,#receive_data 方法就能够将数据传递到上层,让用户在自定义的 Connection 中实现自己的处理逻辑,这也就是 Thin 需要覆写 #receive_data 方法来接受数据的原因了。

当 Thin 以及 Rack 应用已经接收到了来自用户的请求、完成处理并返回之后经过一系列复杂的调用栈就会执行 Connection#send_data 方法:

From: lib/em/connection.rb @ line 324:
Owner: EventMachine::Connection
def send_data data
 data = data.to_s
 size = data.bytesize if data.respond_to?(:bytesize)
 size ||= data.size
 EventMachine::send_data @signature, data, size
end
From: lib/em/pure_ruby.rb @ line 172:
Owner: #<Class:EventMachine>
def self.send_data target, data, datalength
 selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target"
 selectable.send_data data
end
From: lib/em/pure_ruby.rb @ line 851:
Owner: EventMachine::StreamObject
def send_data data
 unless @close_scheduled or @close_requested or !data or data.length <= 0
 @outbound_q << data.to_s
 end
end

经过一系列同名方法的调用,在调用栈末尾的 StreamObject#send_data 中,将所有需要写入的数据全部加入 @outbound_q 中,这其实就是一个待写入数据的队列。

当 Socket 变得可写之后,就会由 #select 方法触发 #eventable_write@outbound_q 队列中的数据通过 #write_nonblock 或者 syswrite 写入 Socket,也就是将请求返回给客户端。

From: lib/em/pure_ruby.rb @ line 823:
Owner: EventMachine::StreamObject
def eventable_write
 @last_activity = Reactor.instance.current_loop_time
 while data = @outbound_q.shift do
 begin
 data = data.to_s
 w = if io.respond_to?(:write_nonblock)
 io.write_nonblock data
 else
 io.syswrite data
 end
 if w < data.length
 @outbound_q.unshift data[w..-1]
 break
 end
 rescue Errno::EAGAIN, SSLConnectionWaitReadable, SSLConnectionWaitWritable
 @outbound_q.unshift data
 break
 rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE, OpenSSL::SSL::SSLError
 @close_scheduled = true
 @outbound_q.clear
 end
 end
end

关闭 Socket

当数据写入时发生了 EOFError 或者其他错误时就会将 close_scheduled 标记为 true,在随后的事件循环中会关闭 Socket 并发送 ConnectionUnbound 事件:

From: lib/em/pure_ruby.rb @ line 540:
Owner: EventMachine::Reactor
def crank_selectables
 # ...
 @selectables.delete_if {|k,io|
 if io.close_scheduled?
 io.close
 begin
 EventMachine::event_callback io.uuid, ConnectionUnbound, nil
 rescue ConnectionNotBound; end
 true
 end
 }
end

.event_callback 在处理 ConnectionUnbound 事件时会在 @conns 中将结束的 Connection 剔除:

def self.event_callback conn_binding, opcode, data
 if opcode == ConnectionUnbound
 if c = @conns.delete( conn_binding )
 c.unbind
 io = c.instance_variable_get(:@io)
 begin
 io.close
 rescue Errno::EBADF, IOError
 end
 elsif c = @acceptors.delete( conn_binding )
 else
 raise ConnectionNotBound, "received ConnectionUnbound for an unknown signature: #{conn_binding}"
 end
 elsif opcode = 1
 #...
 end
end

在这之后会调用 Connection#unbind 方法,再次执行 #close 确保 Socket 连接已经断掉了。

小结

EventMachine 在处理用户的请求时,会通过一个事件循环和一个中心化的事件处理中心 .event_callback 来响应所有的事件,你可以看到在使用 EventMachine 时所有的响应都是异步的,尤其是对 Socket 的读写,所有外部的输入在 EventMachine 看来都是一个事件,它们会被 EventMachine 选择合适的处理器进行转发。

I/O 模型

Thin 本身其实没有实现任何的 I/O 模型,它通过对 EventMachine 进行封装,使用了其事件驱动的特点,为上层提供了处理并发 I/O 的 Reactor 模型,在不同的阶段有着不同的工作流程,在启动 Thin 的服务时,Thin 会直接通过 .start_server 创建一个 Socket 监听一个 <host, port> 组成的元组:

thin-start-server

当服务启动之后,就可以接受来自客户端的 HTTP 请求了,处理 HTTP 请求总共需要三个模块的合作,分别是 EventMachine、Thin 以及 Rack 应用:

thin-handle-request

在上图中省略了 Rack 的处理部分,不过对于其他部分的展示还是比较详细的,EventMachine 负责对 TCP Socket 进行监听,在发生事件时通过 .event_callback 进行处理,将消息转发给位于 Thin 中的 Connection,该类以及模块负责处理 HTTP 协议相关的内容,将整个请求包装成一个 env 对象,调用 #call 方法。

在这时就开始了返回响应的逻辑了,#call 方法会返回一个三元组,经过 Thin 中的 #send_data 最终将数据写入 outbound_q 队列中等待处理:

thin-send-response

EventMachine 会通过一个事件循环,使用 #select 监听当前 Socket 的可读写状态,并在合适的时候触发 #eventable_writeoutbound_q 队列中读取数据写入 Socket,在写入结束后 Socket 就会被关闭,整个请求的响应也就结束了。

thin-io-model

Thin 使用了 EventMachine 作为底层处理 TCP 协议的框架,提供了事件驱动的 I/O 模型,也就是我们理解的 Reactor 模型,对于每一个 HTTP 请求都会创建一个对应的 Connection 对象,所有的事件都由 EventMachine 来派发,最大程度做到了 I/O 的读写都是异步的,不会阻塞当前的线程,这也是 Thin 以及 Node.js 能够并发处理大量请求的原因。

总结

Thin 作为一个 Ruby 社区中简单的 webserver,其实本身没有做太多的事情,只是使用了 EventMachine 提供的事件驱动的 I/O 模型,为上层提供了更加易用的 API,相比于其他同步处理请求的 webserver,Reactor 模式的优点就是 Thin 的优点,主程序只负责监听事件和分发事件,一旦涉及到 I/O 的工作都尽量使用回调的方式处理,当回调完成后再发送通知,这种方式能够减少进程的等待时间,时刻都在处理用户的请求和事件。

相关文章

{% include related/rack.md %}

Reference

wechat-account-qrcode

转载申请

知识共享许可协议
本作品采用知识共享署名 4.0 国际许可协议进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,可适当缩放并在引用处附上图片所在的文章链接。

Go 语言设计与实现

各位读者朋友,很高兴大家通过本博客学习 Go 语言,感谢一路相伴! 《Go语言设计与实现》 的纸质版图书已经上架京东,本书目前已经四印,印数超过 10,000 册,有需要的朋友请点击 链接 或者下面的图片购买。

golang-book-intro

文章图片

你可以在 技术文章配图指南 中找到画图的方法和素材。