PyParallel alpha

PyParallel

An experimental, proof-of-concept fork of Python 3
designed to optimally exploit multiple CPU cores, fast
SSDs, NUMA architectures and 10Gb+ Ethernet networks.

Removes the limitation of the Global Interpreter Lock (GIL)
without needing to remove it at all.

Async I/O meets multicore.

PyParallel combines the concurrency benefits afforded by single-threaded async I/O
architectures with the performance benefits afforded by simultaneous multi-threading.

Scales linearly with cores and I/O bandwidth.

PyParallel exhibits excellent linear scaling across all cores when sufficient client
load can be generated, maintaining low-latency and high-throughput under load.

Includes PyParallel-compatible versions of NumPy, datrie, pyodbc, IPython, IPython Notebook, Pandas, Cython and many more.

Load large NumPy arrays or datrie tries in main memory once, then access them
efficiently from multiple cores.
Or, connect to an ODBC database and access data in parallel.

Ultra-low latency, high concurrency, maximal throughput.

All delivered from a single Python process that dynamically adjusts its
behavior to maximise the performance of underlying hardware.

[フレーム]

Performance

Linear Scaling

PyParallel's throughput (HTTP requests per second) scales linearly as concurrency
increases, indicating optimal use of multiple cores. Additionally, percentile
latencies show very desirable properties across the entire range.

Benchmarks

PyParallel fares well against the competition. In the following benchmark,
PyParallel (Fast) leverages the new SSE-accelerated HTTP parser and
other optimizations, whereas PyParallel (Slow) uses HTTP parser written
entirely in Python. A concurrency of 8 was used for the test. All times are
in microseconds.

Presentations

For more information about how PyParallel was implemented, see the following presentations.

Parallelizing the Python Interpreter: An alternate approach to async

PyParallel: How we removed the GIL and exploited all cores (without needing to remove the GIL at all)

Parallelism and Concurrency with Python

March, 2013
(35 pages)

November, 2013
(153 pages)

May, 2014
(41 pages)

A short presentation given to Python core developers at PyCon in March, 2013.

The first major public announcement of PyParallel, delivered at PyData NYC in November 2013. Deep-dive into the background of PyParallel, with focus on asynchronous I/O and event-driven socket servers.

A higher-level look at the notions of parallelism and concurrency within the Python ecosystem. Delivered at Bank of America developer summit day in NYC, May 2014.

Examples

Some real-life examples of PyParallel.

  • Plaintext (Fast)
  • Plaintext (Slow)
  • TEFB
  • Parallelopedia

# PyParallel (Fast)
# (Uses picohttpparser's SSE-accelerated HTTP parser.)
import parallel
class Plaintext:
 http11 = True
 def plaintext(self, transport, data):
 return b'Hello, World!'
server = parallel.server('0.0.0.0', 8080)
parallel.register(transport=server, protocol=Plaintext)
parallel.run()

# PyParallel (Slow)
# (Uses Python-based HTTP parser)
import parallel
from parallel.http.server import (
 router,
 make_routes,
 text_response,
 HttpServer,
)
routes = make_routes()
route = router(routes)
class Plaintext(HttpServer):
 @route
 def plaintext(self, request):
 return text_response(request, text='Hello, World!')
server = parallel.server('0.0.0.0', 8080)
parallel.register(transport=server, protocol=Plaintext)
parallel.run()

# Implementation of entire TEFB suite in pure Python.
#===============================================================================
# Imports
#===============================================================================
import sys
import json
import pyodbc
import parallel
from ctk.util import (
 try_int,
 Dict,
)
from parallel import (
 rdtsc,
 sys_stats,
 socket_stats,
 memory_stats,
 context_stats,
 enable_heap_override,
 disable_heap_override,
 call_from_main_thread,
 call_from_main_thread_and_wait,
)
from parallel.http.server import (
 quote_html,
 router,
 make_routes,
 text_response,
 html_response,
 json_serialization,
 Request,
 HttpServer,
)
# It can be a pain setting up the environment to load the debug version of
# numpy, so, if we can't import it, just default to normal Python random.
np = None
if not sys.executable.endswith('_d.exe'):
 try:
 import numpy as np
 randint = np.random.randint
 randints = lambda size: randint(0, high=10000, size=size)
 randints2d = lambda size: randint(0, high=10000, size=(2, size))
 print("Using NumPy random facilities.")
 except ImportError:
 pass
if not np:
 import random
 randint = random.randint
 randints = lambda size: [ randint(1, 10000) for _ in range(0, size) ]
 randints2d = lambda size: [
 (x, y) for (x, y) in zip(randints(size), randints(size))
 ]
 print("Couldn't load NumPy; reverting to normal CPython random facilities.")
#===============================================================================
# Aliases
#===============================================================================
#===============================================================================
# Globals/Templates
#===============================================================================
localhost_connect_string = (
 'Driver={SQL Server};'
 'Server=localhost;'
 'Database=hello_world;'
 'Uid=benchmarkdbuser;'
 'Pwd=B3nchmarkDBPass;'
)
#===============================================================================
# Helpers
#===============================================================================
#===============================================================================
# Classes
#===============================================================================
class Fortune:
 _bytes = [
 (b'fortune: No such file or directory', 1),
 (b"A computer scientist is someone who "
 b"fixes things that aren't broken.", 2),
 (b'After enough decimal places, nobody gives a damn.', 3),
 (b'A bad random number generator: 1, 1, 1, 1, '
 b'1, 4.33e+67, 1, 1, 1', 4),
 (b'A computer program does what you tell it to do, '
 b'not what you want it to do.', 5),
 (b'Emacs is a nice operating system, but I prefer UNIX. '
 b'\xe2\x80\x94 Tom Christaensen', 6),
 (b'Any program that runs right is obsolete.', 7),
 (b'A list is only as strong as its weakest link. '
 b'\xe2\x80\x94 Donald Knuth', 8),
 (b'Feature: A bug with seniority.', 9),
 (b'Computers make very fast, very accurate mistakes.', 10),
 (b'<script>alert("This should not be '
 b'displayed in a browser alert box.");</script>', 11),
 (b'\xe3\x83\x95\xe3\x83\xac\xe3\x83\xbc\xe3\x83\xa0'
 b'\xe3\x83\xaf\xe3\x83\xbc\xe3\x82\xaf\xe3\x81\xae'
 b'\xe3\x83\x99\xe3\x83\xb3\xe3\x83\x81\xe3\x83\x9e'
 b'\xe3\x83\xbc\xe3\x82\xaf', 12),
 ]
 fortunes = [ (r[0].decode('utf-8'), r[1]) for r in _bytes ]
 header = (
 '<!DOCTYPE html>'
 '<html>'
 '<head>'
 '<title>Fortunes</title>'
 '</head>'
 '<body>'
 '<table>'
 '<tr>'
 '<th>id</th>'
 '<th>message</th>'
 '</tr>'
 )
 row = '<tr><td>%d</td><td>%s</td></tr>'
 footer = (
 '</table>'
 '</body>'
 '</html>'
 )
 sql = 'select message, id from fortune'
 @classmethod
 def prepare_fortunes(cls, fortunes):
 fortunes = [ (f[0], f[1]) for f in fortunes ]
 fortunes.append(('Begin. The rest is easy.', 0))
 fortunes.sort()
 return fortunes
 @classmethod
 def render(cls, fortunes):
 fortunes = cls.prepare_fortunes(fortunes)
 row = cls.row
 return ''.join((
 cls.header,
 ''.join([ row % (f[1], quote_html(f[0])) for f in fortunes ]),
 cls.footer,
 ))
 @classmethod
 def render_raw(cls):
 return cls.render(cls.fortunes)
 @classmethod
 def render_db(cls, connect_string=None):
 fortunes = cls.load_from_db(connect_string)
 return cls.render(fortunes)
 @classmethod
 def load_from_db(cls, connect_string=None):
 cs = connect_string or localhost_connect_string
 con = pyodbc.connect(cs)
 cur = con.cursor()
 cur.execute(cls.sql)
 return cur.fetchall()
 @classmethod
 def json_from_db(cls, connect_string=None):
 results = Fortune.load_from_db(connect_string)
 results = Fortune.prepare_fortunes(results)
 fortunes = { r[1]: r[0] for r in results }
 return json.dumps(fortunes)
routes = make_routes()
route = router(routes)
class TefbHttpServer(HttpServer):
 routes = routes
 db_sql = 'select id, randomNumber from world where id = ?'
 update_sql = 'update world set randomNumber = ? where id = ?'
 fortune_sql = 'select * from fortune'
 @route
 def json(self, request):
 json_serialization(request, obj={'message': 'Hello, World'})
 @route
 def plaintext(self, request):
 text_response(request, text='Hello, World!')
 @route
 def fortunes(self, request):
 return html_response(request, Fortune.render_db(self.connect_string))
 @route
 def fortunes_raw(self, request):
 return html_response(request, Fortune.render_raw())
 @route
 def db(self, request):
 con = pyodbc.connect(self.connect_string)
 cur = con.cursor()
 cur.execute(self.db_sql, (randint(1, 10000)))
 results = cur.fetchall()
 db = {
 'id': results[0][0],
 'randomNumber': results[0][1],
 }
 return json_serialization(request, db)
 @route
 def queries(self, request):
 count = try_int(request.query.get('queries')) or 1
 if count < 1:
 count = 1
 elif count > 500:
 count = 500
 con = pyodbc.connect(self.connect_string)
 cur = con.cursor()
 ids = randints(count)
 results = []
 for npi in ids:
 i = int(npi)
 cur.execute(self.db_sql, i)
 r = cur.fetchall()
 results.append({'id': r[0][0], 'randomNumber': r[0][1]})
 return json_serialization(request, results)
 @route
 def updates(self, request):
 count = try_int(request.query.get('queries')) or 1
 if count < 1:
 count = 1
 elif count > 500:
 count = 500
 con = pyodbc.connect(self.connect_string)
 cur = con.cursor()
 ints2d = randints2d(count)
 results = []
 updates = []
 for npi in ints2d:
 (i, rn) = (int(npi[0]), int(npi[1]))
 cur.execute(self.db_sql, i)
 o = cur.fetchall()
 results.append((rn, i))
 updates.append({'id': i, 'randomNumber': rn})
 cur.executemany(self.update_sql, results)
 cur.commit()
 return json_serialization(request, updates)
 @route
 def stats(self, request):
 stats = {
 'system': dict(sys_stats()),
 'server': dict(socket_stats(request.transport.parent)),
 'memory': dict(memory_stats()),
 'contexts': dict(context_stats()),
 'elapsed': request.transport.elapsed(),
 'thread': parallel.thread_seq_id(),
 }
 json_serialization(request, stats)
 @route
 def hello(self, request, *args, **kwds):
 j = { 'args': args, 'kwds': kwds }
 return json_serialization(request, j)
 @route
 def shutdown(self, request):
 request.transport.shutdown_server()
 json_serialization(request, obj={'message': 'Shutdown'})
 @route
 def elapsed(self, request):
 obj = { 'elapsed': request.transport.elapsed() }
 json_serialization(request, obj)
plaintext_http11_response = (
 'HTTP/1.1 200 OK\r\n'
 'Server: PyParallel Web Server v0.1\r\n'
 'Date: 2015年5月16日 15:21:34 GMT\r\n'
 'Content-Type: text/plain;charset=utf-8\r\n'
 'Content-Length: 15\r\n'
 '\r\n'
 'Hello, World!\r\n'
)
class CheatingPlaintextHttpServer:
 initial_bytes_to_send = plaintext_http11_response
 next_bytes_to_send = plaintext_http11_response
# vim:set ts=8 sw=4 sts=4 tw=80 et:

"""
A very simple wiki search HTTP server that demonstrates useful techniques
afforded by PyParallel: the ability to load large reference data structures
into memory, and then query them as part of incoming request processing in
parallel.
"""
#===============================================================================
# Imports
#===============================================================================
import json
import parallel
import socket
import datrie
import string
import urllib
import numpy as np
from collections import (
 defaultdict,
)
from numpy import (
 uint64,
)
from parallel import (
 rdtsc,
 sys_stats,
 socket_stats,
 memory_stats,
 context_stats,
 call_from_main_thread,
 call_from_main_thread_and_wait,
 CachingBehavior,
)
from parallel.http.server import (
 router,
 make_routes,
 Request,
 HttpServer,
 RangedRequest,
)
from os.path import (
 join,
 exists,
 abspath,
 dirname,
 normpath,
)
def join_path(*args):
 return abspath(normpath(join(*args)))
#===============================================================================
# Configurables -- Change These!
#===============================================================================
# Change this to the directory containing the downloaded files.
#DATA_DIR = r'd:\data'
DATA_DIR = join_path(dirname(__file__), 'data')
# If you want to change the hostname listened on from the default (which will
# resolve to whatever IP address the computer name resolves to), do so here.
HOSTNAME = socket.gethostname()
# E.g.:
# HOSTNAME = 'localhost'
IPADDR = '0.0.0.0'
PORT = 8080
#===============================================================================
# Constants
#===============================================================================
# This file is huge when unzipped -- ~53GB. Although, granted, it is the
# entire Wikipedia in a single file. The bz2 version is much smaller, but
# still pretty huge. Search the web for instructions on how to download
# from one of the wiki mirrors, then bunzip2 it, then place in the same
# data directory.
WIKI_XML_NAME = 'enwiki-20150205-pages-articles.xml'
WIKI_XML_PATH = join_path(DATA_DIR, WIKI_XML_NAME)
# The following two files can be downloaded from
# http://download.pyparallel.org.
# This is a trie keyed by every <title>xxx</title> in the wiki XML; the value
# is a 64-bit byte offset within the file where the title was found.
# Specifically, it is the offset where the '<' bit of the <title> was found.
TITLES_TRIE_PATH = join_path(DATA_DIR, 'titles.trie')
# And because this file is so huge and the data structure takes so long to
# load, we have another version that was created for titles starting with Z
# and z (I picked 'z' as I figured it would have the least-ish titles). (This
# file was created via the save_titles_startingwith_z() method below.)
ZTITLES_TRIE_PATH = join_path(DATA_DIR, 'ztitles.trie')
# This is a sorted numpy array of uint64s representing the byte offset values
# in the trie. When given the byte offset of a title derived from a trie
# lookup, we can find the byte offset of where the next title starts within
# the xml file. That allows us to isolate the required byte range from the
# xml file where the particular title is defined. Such a byte range can be
# satisfied with a ranged HTTP request.
TITLES_OFFSETS_NPY_PATH = join_path(DATA_DIR, 'titles_offsets.npy')
#===============================================================================
# Aliases
#===============================================================================
uint64_7 = uint64(7)
uint64_11 = uint64(11)
#===============================================================================
# Globals
#===============================================================================
#wiki_xml = parallel.open(WIKI_XML_PATH, 'r', caching=CachingBehavior.RandomAccess)
offsets = np.load(TITLES_OFFSETS_NPY_PATH)
# Use the smaller one if the larger one doesn't exist.
if not exists(TITLES_TRIE_PATH):
 TRIE_PATH = ZTITLES_TRIE_PATH
else:
 TRIE_PATH = TITLES_TRIE_PATH
print("About to load titles trie, this will take a while...")
titles = datrie.Trie.load(TRIE_PATH)
#===============================================================================
# Misc Helpers
#===============================================================================
def save_titles_startingwith_z():
 # Ok, the 11GB trie that takes 2 minutes to load is painful to develop
 # with; let's whip up a little helper that just works on titles starting
 # with 'Z'.
 path = TITLES_TRIE_PATH.replace('titles.', 'ztitles.')
 allowed = (string.printable + string.punctuation)
 ztrie = datrie.Trie(allowed)
 for c in ('Z', 'z'):
 for (key, value) in titles.items(c):
 if key in ztrie:
 existing = ztrie[key]
 for v in value:
 if v not in existing:
 existing.append(v)
 existing.sort()
 else:
 ztrie[key] = value
 ztrie.save(path)
def json_serialization(request=None, obj=None):
 """
 Helper method for converting a dict `obj` into a JSON response for the
 incoming `request`.
 """
 transport = None
 if not request:
 request = Request(transport=None, data=None)
 else:
 transport = request.transport
 if not obj:
 obj = {}
 #parallel.debug('obj: %r' % obj)
 response = request.response
 response.code = 200
 response.message = 'OK'
 response.content_type = 'application/json; charset=UTF-8'
 response.body = json.dumps(obj)
 return request
def text_serialization(request=None, text=None):
 transport = None
 if not request:
 request = Request(transport=None, data=None)
 else:
 transport = request.transport
 if not text:
 text = 'Hello, World!'
 response = request.response
 response.code = 200
 response.message = 'OK'
 response.content_type = 'text/plain; charset=UTF-8'
 response.body = text
 return request
#===============================================================================
# Offset Helpers
#===============================================================================
# Three implementations of the same functionality: given a key, look up all
# items in the trie starting with that key, then return the relevant offsets
# for each one, such that a client can then issue a ranged HTTP request for
# the bytes returned.
# >>> results = titles.items('Ap')
# >>> len(results)
# 16333
#
# So, how long does it take to construct the result set for 16,333 hits?
# (That is, 16,333 Wikipedia pages whose page title starts with 'Ap'.)
#
# >>> from ctk.util import timer
# >>> with timer.timeit():
# ... _ = dumps(get_page_offsets_for_key2('Ap'))
# ...
# 274ms
# >>> with timer.timeit():
# ... _ = dumps(get_page_offsets_for_key('Ap'))
# ...
# 278ms
# >>> with timer.timeit():
# ... _ = dumps(get_page_offsets_for_key3('Ap'))
# ...
# 256ms
def get_page_offsets_for_key(search_string):
 items = titles.items(search_string)
 results = defaultdict(list)
 for (key, value) in items:
 for v in value:
 o = uint64(v if v > 0 else v*-1)
 ix = offsets.searchsorted(o, side='right')
 results[key].append((int(o-uint64_7), int(offsets[ix]-uint64_11)))
 return results or None
def get_page_offsets_for_key2(search_string):
 items = titles.items(search_string)
 if not items:
 return None
 results = [ [None, None, None] for _ in range(0, len(items)) ]
 assert len(results) == len(items), (len(results), len(items))
 for (i, pair) in enumerate(items):
 (key, value) = pair
 for (j, v) in enumerate(value):
 rx = i + j
 o = uint64(v if v > 0 else v*-1)
 ix = offsets.searchsorted(o, side='right')
 results[rx][0] = key
 results[rx][1] = int(o-uint64_7)
 results[rx][2] = int(offsets[ix]-uint64_11)
 return results
def get_page_offsets_for_key3(search_string):
 results = []
 items = titles.items(search_string)
 if not items:
 return results
 for (key, value) in items:
 v = value[0]
 o = uint64(v if v > 0 else v*-1)
 ix = offsets.searchsorted(o, side='right')
 results.append((key, int(o-uint64_7), int(offsets[ix]-uint64_11)))
 return results
#===============================================================================
# Web Helpers
#===============================================================================
def exact_title(title):
 if title in titles:
 return json.dumps([[title, ] + [ t for t in titles[title] ]])
 else:
 return json.dumps([])
#===============================================================================
# Classes
#===============================================================================
routes = make_routes()
route = router(routes)
class WikiServer(HttpServer):
 routes = routes
 @route
 def wiki(self, request, name, **kwds):
 # Do an exact lookup if we find a match.
 if name not in titles:
 return self.error(request, 404)
 o = titles[name][0]
 o = uint64(o if o > 0 else o*-1)
 ix = offsets.searchsorted(o, side='right')
 start = int(o-uint64_7)
 end = int(offsets[ix]-uint64_11)
 range_request = '%d-%d' % (start, end)
 request.range = RangedRequest(range_request)
 request.response.content_type = 'text/xml; charset=utf-8'
 return self.sendfile(request, WIKI_XML_PATH)
 @route
 def offsets(self, request, name, limit=None):
 if not name:
 return self.error(request, 400, "Missing name")
 if len(name) < 3:
 return self.error(request, 400, "Name too short (< 3 chars)")
 return json_serialization(request, get_page_offsets_for_key3(name))
 @route
 def xml(self, request, *args, **kwds):
 if not request.range:
 return self.error(request, 400, "Ranged-request required.")
 else:
 request.response.content_type = 'text/xml; charset=utf-8'
 return self.sendfile(request, WIKI_XML_PATH)
 @route
 def stats(self, request, *args, **kwds):
 stats = {
 'system': dict(sys_stats()),
 'server': dict(socket_stats(request.transport.parent)),
 'memory': dict(memory_stats()),
 'contexts': dict(context_stats()),
 'elapsed': request.transport.elapsed(),
 'thread': parallel.thread_seq_id(),
 }
 if args:
 name = args[0]
 if name in stats:
 stats = { name: stats[name] }
 return json_serialization(request, stats)
 @route
 def hello(self, request, *args, **kwds):
 j = { 'args': args, 'kwds': kwds }
 return json_serialization(request, j)
 @route
 def title(self, request, name, *args, **kwds):
 items = titles.items(name)
 return json_serialization(request, items)
 @route
 def elapsed(self, request, *args, **kwds):
 obj = { 'elapsed': request.transport.elapsed() }
 return json_serialization(obj)
 @route
 def json(self, request, *args, **kwds):
 return json_serialization(request, {'message': 'Hello, World!'})
 @route
 def plaintext(self, request, *args, **kwds):
 return text_serialization(request, text='Hello, World!')
#===============================================================================
# Main
#===============================================================================
def main():
 server = parallel.server(IPADDR, PORT)
 parallel.register(transport=server, protocol=WikiServer)
 parallel.run_once()
 return server
if __name__ == '__main__':
 server = main()
 parallel.run()
# vim:set ts=8 sw=4 sts=4 tw=78 et:

Download

PyParallel is alpha level quality and should not be used in production.
All interfaces and implementation details are subject to change.

Windows 7/8/10 64-bit

PyParallel was created by Trent Nelson

Follow @trentnelson [フレーム]

Sponsored in part by Continuum Analytics

Site design pilfered unrepentantly from Joe Walnes's websocketd website.

AltStyle によって変換されたページ (->オリジナル) /