Home

Tornado源码分析笔记(2)

IOLoop就是 tornado 的事件循环,是一个全局的单例,通过IOLoop.current()或IOLoop.instance()访问。IOLoop.current()内部调用的也是instance()方法。所以我们来看instance():

class IOLoop(Configurable):
    ...
    # Global lock for creating global IOLoop instance
    _instance_lock = threading.Lock()
    _current = threading.local()

    @staticmethod
    def instance():
        """Return the global ``IOLoop`` instance, creating it on first use.

        ``IOLoop._instance`` is checked once without the lock and then again
        while holding ``_instance_lock`` before ``IOLoop()`` is constructed.
        """
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    # New instance after double check
                    IOLoop._instance = IOLoop()
        return IOLoop._instance

IOLoop没有__init__()函数,所以我们要去看它的父类Configurable:

class Configurable(object):
    """Base class whose constructor can hand back a different implementation.

    Subclasses are expected to provide ``configurable_base()``,
    ``configured_class()`` and ``initialize()``; ``__new__`` calls all three.
    """
    __impl_class = None
    __impl_kwargs = None

    def __new__(cls, *args, **kwargs):
        """Create an instance of the implementation class and initialize it.

        When ``cls`` is the configurable base itself, the class returned by
        ``cls.configured_class()`` is instantiated and any saved
        ``__impl_kwargs`` are used as default keyword arguments; otherwise
        ``cls`` is instantiated directly.  Explicit ``kwargs`` override the
        saved defaults.
        """
        base = cls.configurable_base()
        init_kwargs = {}
        if cls is base:
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls
        init_kwargs.update(kwargs)
        instance = super(Configurable, cls).__new__(impl)
        # Set-up goes through initialize() with the merged keyword arguments.
        instance.initialize(*args, **init_kwargs)
        return instance

IOLoop.configurable_base()返回的就是IOLoop自身,所以进入configured_class()

@classmethod
def configured_class(cls):
    """Returns the currently configured class.

    If no implementation class has been recorded yet, the result of
    ``cls.configurable_default()`` is stored on the configurable base and
    returned.
    """
    base = cls.configurable_base()
    if cls.__impl_class is None:
        base.__impl_class = cls.configurable_default()
    return base.__impl_class

IOLoop.configured_class()在__impl_class尚未设置时,会调用configurable_default()来决定实际使用的实现类,并返回这个类:

@classmethod
def configurable_default(cls):
    """Return the default IOLoop implementation class for this platform.

    ``EPollIOLoop`` is chosen when the ``select`` module has ``epoll``,
    ``KQueueIOLoop`` when it has ``kqueue``, and ``SelectIOLoop`` otherwise.
    Each implementation module is imported only in the branch that uses it.
    """
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop

在这里我们看到,通过判断select模块有无“epoll”、“kqueue”属性,在不同平台上实例化不同的IOLoop实例。EPollIOLoop、KQueueIOLoop、SelectIOLoop都继承于PollIOLoop并封装了统一的接口。在PollIOLoop中可以看到:

def add_handler(self, fd, handler, events):
    """Register ``handler`` to be called for ``events`` on ``fd``.

    The ``(obj, wrapped handler)`` pair is stored in ``self._handlers`` under
    the fd returned by ``self.split_fd``, and that fd is registered with
    ``self._impl`` for ``events`` combined with ``self.ERROR``.
    """
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))
    self._impl.register(fd, events | self.ERROR)
def update_handler(self, fd, events):
    """Change the events watched on ``fd``.

    Calls ``self._impl.modify`` with the fd returned by ``self.split_fd`` and
    ``events`` combined with ``self.ERROR``.
    """
    fd, obj = self.split_fd(fd)
    self._impl.modify(fd, events | self.ERROR)
def remove_handler(self, fd):
    """Stop watching ``fd``.

    Drops the fd from ``self._handlers`` and ``self._events`` (missing entries
    are ignored) and unregisters it from ``self._impl``.  Any exception from
    ``unregister`` is logged at debug level instead of being propagated.
    """
    fd, obj = self.split_fd(fd)
    self._handlers.pop(fd, None)
    self._events.pop(fd, None)
    try:
        self._impl.unregister(fd)
    except Exception:
        gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

如果要实现自己的 tornado 异步函数,可以调用上面三个方法,在IOLoop中注册 fd 和回调即可。回到__new__函数,接下来就是创建实例instance然后初始化。接下来我们到了重头戏——start()

start方法的实现在PollIOLoop,有近200行,不过可以分为几个部分

def start(self):
    """Run the event loop.

    Raises ``RuntimeError`` if the loop is already running.  If
    ``self._stopped`` is already set on entry, the flag is cleared and the
    method returns without entering the loop.
    """
    # Initialization
    if self._running:
        raise RuntimeError("IOLoop is already running")
    self._setup_logging()
    if self._stopped:
        self._stopped = False
        return
    # Remember the previous per-thread current loop so it can be restored
    # when this loop exits.
    old_current = getattr(IOLoop._current, "instance", None)
    IOLoop._current.instance = self
    self._thread_ident = thread.get_ident()
    self._running = True
    ...
    try:
        # The event loop proper starts here
        while True:
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            with self._callback_lock:
                callbacks = self._callbacks
                self._callbacks = []
            # Add any timeouts that have come due to the callback list
            due_timeouts = []
            if self._timeouts:
                now = self.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                        # The timeout was cancelled. Note that the
                        # cancellation check is repeated below for timeouts
                        # that are cancelled by another timeout or callback.
                        heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512
                        and self._cancellations > (len(self._timeouts) >> 1)):
                    # Clean up the timeout queue when it gets large and it's
                    # more than half cancellations.
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)
            # Run the collected callbacks via self._run_callback
            for callback in callbacks:
                self._run_callback(callback)
            # Run every timeout that has come due
            for timeout in due_timeouts:
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            # Closures may be holding on to a lot of memory, so allow
            # them to be freed before we go into our poll wait.
            callbacks = callback = due_timeouts = timeout = None
            ...

在循环的开始,先执行self._callbacks列表里面的回调函数,还有到达 deadline 的 timeout。timeout 类保存了self.deadline和self.callback,并重载了__lt__和__le__方法,所以IOLoop._timeouts能使用优先队列heap来对它们排序,从而找出到期的timeout并执行它们的回调函数。回到事件循环,执行完所有callbacks和due_timeouts后,poll终于出场了:

...
# (continued from the previous excerpt: still inside ``while True:``)
            if self._callbacks:
                # Callbacks were queued in self._callbacks while running the
                # ones above, so poll must not wait: use a timeout of 0.
                poll_timeout = 0.0
            # Otherwise, if self._timeouts is non-empty, wait in poll only
            # until the nearest timeout's deadline.
            elif self._timeouts:
                poll_timeout = self._timeouts[0].deadline - self.time()
                poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
            else:
                # No timeouts and no callbacks, so use the default.
                poll_timeout = _POLL_TIMEOUT
            if not self._running:
                break
            if self._blocking_signal_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            try:
                # Finally, poll for I/O events
                event_pairs = self._impl.poll(poll_timeout)
            except Exception as e:
                # An interrupted poll (EINTR) just restarts the loop iteration.
                if errno_from_exception(e) == errno.EINTR:
                    continue
                else:
                    raise
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    # Look up the handler registered for this fd and run it
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None
    finally:
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
        IOLoop._current.instance = old_current
        if old_wakeup_fd is not None:
            signal.set_wakeup_fd(old_wakeup_fd)

总结一下,tornado 的IOLoop是基于poll的事件驱动模型。IOLoop是单线程的,避免了多进程/多线程模型上下文切换的损耗,在 IO 密集的情况下有很好的性能。但如果在IOLoop循环中 callback 的执行时间太长,会阻塞掉整个 IOLoop,造成性能的急剧下降,所以在实践中要十分注意。

AltStyle によって変換されたページ (->オリジナル) /