diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst
index c9f81854bf..8653375d03 100644
--- a/doc/source/deployment_guide.rst
+++ b/doc/source/deployment_guide.rst
@@ -378,6 +378,13 @@ mb_per_sync         512           On PUT requests, sync file every n MB
 keep_cache_size     5242880       Largest object size to keep in buffer cache
 keep_cache_private  false         Allow non-public objects to stay in kernel's
                                   buffer cache
+threads_per_disk    0             Size of the per-disk thread pool used for
+                                  performing disk I/O. The default of 0 means
+                                  to not use a per-disk thread pool. It is
+                                  recommended to keep this value small, as
+                                  large values can result in high read
+                                  latencies due to large queue depths. A good
+                                  starting point is 4 threads per disk.
 ================== ============= ===========================================
 
 [object-replicator]
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index bc12a78c6e..588ec5dc2c 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -83,6 +83,8 @@ use = egg:swift#object
 # verbs, set to "False". Unless you have a separate replication network, you
 # should not specify any value for "replication_server".
 # replication_server = False
+# A value of 0 means "don't use thread pools". A reasonable starting point is 4.
+# threads_per_disk = 0
 
 [filter:healthcheck]
 use = egg:swift#healthcheck
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 5d168ce9d4..31a07de451 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -22,6 +22,7 @@ import os
 import pwd
 import re
 import sys
+import threading as stdlib_threading
 import time
 import uuid
 import functools
@@ -34,6 +35,7 @@ import ctypes.util
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError, \
     RawConfigParser
 from optparse import OptionParser
+from Queue import Queue, Empty
 from tempfile import mkstemp, NamedTemporaryFile
 try:
     import simplejson as json
@@ -46,7 +48,8 @@ import itertools
 
 import eventlet
 import eventlet.semaphore
-from eventlet import GreenPool, sleep, Timeout, tpool
+from eventlet import GreenPool, sleep, Timeout, tpool, greenthread, \
+    greenio, event
 from eventlet.green import socket, threading
 import netifaces
 import codecs
@@ -1814,3 +1817,164 @@ def tpool_reraise(func, *args, **kwargs):
     if isinstance(resp, BaseException):
         raise resp
     return resp
+
+
+class ThreadPool(object):
+    BYTE = 'a'.encode('utf-8')
+
+    """
+    Perform blocking operations in background threads.
+
+    Call its methods from within greenlets to green-wait for results without
+    blocking the eventlet reactor (hopefully).
+    """
+    def __init__(self, nthreads=2):
+        self.nthreads = nthreads
+        self._run_queue = Queue()
+        self._result_queue = Queue()
+        self._threads = []
+
+        if nthreads <= 0:
+            return
+
+        # We spawn a greenthread whose job it is to pull results from the
+        # worker threads via a real Queue and send them to eventlet Events so
+        # that the calling greenthreads can be awoken.
+        #
+        # Since each OS thread has its own collection of greenthreads, it
+        # doesn't work to have the worker thread send stuff to the event, as
+        # it then notifies its own thread-local eventlet hub to wake up, which
+        # doesn't do anything to help out the actual calling greenthread over
+        # in the main thread.
+        #
+        # Thus, each worker sticks its results into a result queue and then
+        # writes a byte to a pipe, signaling the result-consuming greenlet (in
+        # the main thread) to wake up and consume results.
+        #
+        # This is all stuff that eventlet.tpool does, but that code can't have
+        # multiple instances instantiated. Since the object server uses one
+        # pool per disk, we have to reimplement this stuff.
+        _raw_rpipe, self.wpipe = os.pipe()
+        self.rpipe = greenio.GreenPipe(_raw_rpipe, 'rb', bufsize=0)
+
+        for _junk in xrange(nthreads):
+            thr = stdlib_threading.Thread(
+                target=self._worker,
+                args=(self._run_queue, self._result_queue))
+            thr.daemon = True
+            thr.start()
+            self._threads.append(thr)
+
+        # This is the result-consuming greenthread that runs in the main OS
+        # thread, as described above.
+        self._consumer_coro = greenthread.spawn_n(self._consume_results,
+                                                  self._result_queue)
+
+    def _worker(self, work_queue, result_queue):
+        """
+        Pulls an item from the queue and runs it, then puts the result into
+        the result queue. Repeats forever.
+
+        :param work_queue: queue from which to pull work
+        :param result_queue: queue into which to place results
+        """
+        while True:
+            item = work_queue.get()
+            ev, func, args, kwargs = item
+            try:
+                result = func(*args, **kwargs)
+                result_queue.put((ev, True, result))
+            except BaseException, err:
+                result_queue.put((ev, False, err))
+            finally:
+                work_queue.task_done()
+                os.write(self.wpipe, self.BYTE)
+
+    def _consume_results(self, queue):
+        """
+        Runs as a greenthread in the same OS thread as callers of
+        run_in_thread().
+
+        Takes results from the worker OS threads and sends them to the waiting
+        greenthreads.
+        """
+        while True:
+            try:
+                self.rpipe.read(1)
+            except ValueError:
+                # can happen at process shutdown when pipe is closed
+                break
+
+            while True:
+                try:
+                    ev, success, result = queue.get(block=False)
+                except Empty:
+                    break
+
+                try:
+                    if success:
+                        ev.send(result)
+                    else:
+                        ev.send_exception(result)
+                finally:
+                    queue.task_done()
+
+    def run_in_thread(self, func, *args, **kwargs):
+        """
+        Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
+        until results are available.
+
+        Exceptions thrown will be reraised in the calling thread.
+
+        If the threadpool was initialized with nthreads=0, just calls
+        func(*args, **kwargs).
+
+        :returns: result of calling func
+        :raises: whatever func raises
+        """
+        if self.nthreads <= 0:
+            return func(*args, **kwargs)
+
+        ev = event.Event()
+        self._run_queue.put((ev, func, args, kwargs), block=False)
+
+        # blocks this greenlet (and only *this* greenlet) until the real
+        # thread calls ev.send().
+        result = ev.wait()
+        return result
+
+    def _run_in_eventlet_tpool(self, func, *args, **kwargs):
+        """
+        Really run something in an external thread, even if we haven't got any
+        threads of our own.
+        """
+        def inner():
+            try:
+                return (True, func(*args, **kwargs))
+            except (Timeout, BaseException) as err:
+                return (False, err)
+
+        success, result = tpool.execute(inner)
+        if success:
+            return result
+        else:
+            raise result
+
+    def force_run_in_thread(self, func, *args, **kwargs):
+        """
+        Runs func(*args, **kwargs) in a thread. Blocks the current greenlet
+        until results are available.
+
+        Exceptions thrown will be reraised in the calling thread.
+
+        If the threadpool was initialized with nthreads=0, uses eventlet.tpool
+        to run the function. This is in contrast to run_in_thread(), which
+        will (in that case) simply execute func in the calling thread.
+
+        :returns: result of calling func
+        :raises: whatever func raises
+        """
+        if self.nthreads <= 0:
+            return self._run_in_eventlet_tpool(func, *args, **kwargs)
+        else:
+            return self.run_in_thread(func, *args, **kwargs)
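A note on the intended calling pattern: code invokes the pool from inside a greenlet and green-waits on the result, so other greenlets keep servicing requests while a worker OS thread performs the blocking I/O. The sketch below is illustrative only; the read_index helper and its path are made up and not part of this patch.

    from swift.common.utils import ThreadPool

    pool = ThreadPool(nthreads=4)      # e.g. one pool per disk

    def read_index(path):              # hypothetical blocking helper
        with open(path, 'rb') as fp:
            return fp.read()

    # Blocks only the calling greenlet; exceptions raised by read_index are
    # re-raised here. With nthreads=0 this would simply call read_index inline.
    data = pool.run_in_thread(read_index, '/srv/node/sda1/objects/index')

    # Same call shape, but guaranteed to leave the reactor thread even when
    # nthreads=0 (it falls back to eventlet.tpool in that case).
    data = pool.force_run_in_thread(read_index, '/srv/node/sda1/objects/index')

This is why DiskFile below defaults to ThreadPool(nthreads=0): existing callers keep their current inline behaviour unless the operator turns on threads_per_disk.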
diff --git a/swift/obj/server.py b/swift/obj/server.py
index 5fd7b94e0e..999da25e0d 100644
--- a/swift/obj/server.py
+++ b/swift/obj/server.py
@@ -21,6 +21,7 @@ import errno
 import os
 import time
 import traceback
+from collections import defaultdict
 from datetime import datetime
 from hashlib import md5
 from tempfile import mkstemp
@@ -28,13 +29,13 @@ from urllib import unquote
 from contextlib import contextmanager
 
 from xattr import getxattr, setxattr
-from eventlet import sleep, Timeout, tpool
+from eventlet import sleep, Timeout
 
 from swift.common.utils import mkdirs, normalize_timestamp, public, \
     storage_directory, hash_path, renamer, fallocate, fsync, fdatasync, \
     split_path, drop_buffer_cache, get_logger, write_pickle, \
     config_true_value, validate_device_partition, timing_stats, \
-    tpool_reraise
+    ThreadPool
 from swift.common.bufferedhttp import http_connect
 from swift.common.constraints import check_object_creation, check_mount, \
     check_float, check_utf8
@@ -100,12 +101,13 @@ class DiskWriter(object):
     requests. Serves as the context manager object for DiskFile's writer()
     method.
     """
-    def __init__(self, disk_file, fd, tmppath):
+    def __init__(self, disk_file, fd, tmppath, threadpool):
         self.disk_file = disk_file
         self.fd = fd
         self.tmppath = tmppath
         self.upload_size = 0
         self.last_sync = 0
+        self.threadpool = threadpool
 
     def write(self, chunk):
         """
@@ -113,16 +115,21 @@ class DiskWriter(object):
         :param chunk: the chunk of data to write as a string object
         """
-        while chunk:
-            written = os.write(self.fd, chunk)
-            self.upload_size += written
-            chunk = chunk[written:]
-            # For large files sync every 512MB (by default) written
-            diff = self.upload_size - self.last_sync
-            if diff >= self.disk_file.bytes_per_sync:
-                tpool.execute(fdatasync, self.fd)
-                drop_buffer_cache(self.fd, self.last_sync, diff)
-                self.last_sync = self.upload_size
+
+        def _write_entire_chunk(chunk):
+            while chunk:
+                written = os.write(self.fd, chunk)
+                self.upload_size += written
+                chunk = chunk[written:]
+
+        self.threadpool.run_in_thread(_write_entire_chunk, chunk)
+
+        # For large files sync every 512MB (by default) written
+        diff = self.upload_size - self.last_sync
+        if diff >= self.disk_file.bytes_per_sync:
+            self.threadpool.force_run_in_thread(fdatasync, self.fd)
+            drop_buffer_cache(self.fd, self.last_sync, diff)
+            self.last_sync = self.upload_size
 
     def put(self, metadata, extension='.data'):
         """
@@ -136,22 +143,27 @@ class DiskWriter(object):
         assert self.tmppath is not None
         timestamp = normalize_timestamp(metadata['X-Timestamp'])
         metadata['name'] = self.disk_file.name
-        # Write the metadata before calling fsync() so that both data and
-        # metadata are flushed to disk.
-        write_metadata(self.fd, metadata)
-        # We call fsync() before calling drop_cache() to lower the amount of
-        # redundant work the drop cache code will perform on the pages (now
-        # that after fsync the pages will be all clean).
-        tpool.execute(fsync, self.fd)
-        # From the Department of the Redundancy Department, make sure we
-        # call drop_cache() after fsync() to avoid redundant work (pages
-        # all clean).
-        drop_buffer_cache(self.fd, 0, self.upload_size)
-        invalidate_hash(os.path.dirname(self.disk_file.datadir))
-        # After the rename completes, this object will be available for other
-        # requests to reference.
-        renamer(self.tmppath,
-                os.path.join(self.disk_file.datadir, timestamp + extension))
+
+        def finalize_put():
+            # Write the metadata before calling fsync() so that both data and
+            # metadata are flushed to disk.
+            write_metadata(self.fd, metadata)
+            # We call fsync() before calling drop_cache() to lower the amount
+            # of redundant work the drop cache code will perform on the pages
+            # (now that after fsync the pages will be all clean).
+            fsync(self.fd)
+            # From the Department of the Redundancy Department, make sure
+            # we call drop_cache() after fsync() to avoid redundant work
+            # (pages all clean).
+            drop_buffer_cache(self.fd, 0, self.upload_size)
+            invalidate_hash(os.path.dirname(self.disk_file.datadir))
+            # After the rename completes, this object will be available for
+            # other requests to reference.
+            renamer(self.tmppath,
+                    os.path.join(self.disk_file.datadir,
+                                 timestamp + extension))
+
+        self.threadpool.force_run_in_thread(finalize_put)
         self.disk_file.metadata = metadata
@@ -169,12 +181,15 @@ class DiskFile(object):
     :param disk_chunk_size: size of chunks on file reads
     :param bytes_per_sync: number of bytes between fdatasync calls
     :param iter_hook: called when __iter__ returns a chunk
+    :param threadpool: thread pool in which to do blocking operations
+    :raises DiskFileCollision: on md5 collision
     """
 
     def __init__(self, path, device, partition, account, container, obj,
                  logger, keep_data_fp=False, disk_chunk_size=65536,
-                 bytes_per_sync=(512 * 1024 * 1024), iter_hook=None):
+                 bytes_per_sync=(512 * 1024 * 1024), iter_hook=None,
+                 threadpool=None):
         self.disk_chunk_size = disk_chunk_size
         self.bytes_per_sync = bytes_per_sync
         self.iter_hook = iter_hook
@@ -195,6 +210,7 @@ class DiskFile(object):
         self.quarantined_dir = None
         self.keep_cache = False
         self.suppress_file_closing = False
+        self.threadpool = threadpool or ThreadPool(nthreads=0)
         if not os.path.exists(self.datadir):
             return
         files = sorted(os.listdir(self.datadir), reverse=True)
@@ -240,7 +256,8 @@ class DiskFile(object):
                 self.started_at_0 = True
                 self.iter_etag = md5()
             while True:
-                chunk = self.fp.read(self.disk_chunk_size)
+                chunk = self.threadpool.run_in_thread(
+                    self.fp.read, self.disk_chunk_size)
                 if chunk:
                     if self.iter_etag:
                         self.iter_etag.update(chunk)
@@ -366,7 +383,7 @@ class DiskFile(object):
                 fallocate(fd, size)
             except OSError:
                 raise DiskFileNoSpace()
-            yield DiskWriter(self, fd, tmppath)
+            yield DiskWriter(self, fd, tmppath, self.threadpool)
         finally:
             try:
                 os.close(fd)
@@ -396,13 +413,16 @@ class DiskFile(object):
         :param timestamp: timestamp to compare with each file
         """
         timestamp = normalize_timestamp(timestamp)
-        for fname in os.listdir(self.datadir):
-            if fname < timestamp:
-                try:
-                    os.unlink(os.path.join(self.datadir, fname))
-                except OSError, err:    # pragma: no cover
-                    if err.errno != errno.ENOENT:
-                        raise
+
+        def _unlinkold():
+            for fname in os.listdir(self.datadir):
+                if fname < timestamp:
+                    try:
+                        os.unlink(os.path.join(self.datadir, fname))
+                    except OSError, err:    # pragma: no cover
+                        if err.errno != errno.ENOENT:
+                            raise
+        self.threadpool.run_in_thread(_unlinkold)
 
     def _drop_cache(self, fd, offset, length):
         """Method for no-oping buffer cache drop method."""
@@ -418,8 +438,8 @@ class DiskFile(object):
                   directory otherwise None
         """
         if not (self.is_deleted() or self.quarantined_dir):
-            self.quarantined_dir = quarantine_renamer(self.device_path,
-                                                      self.data_file)
+            self.quarantined_dir = self.threadpool.run_in_thread(
+                quarantine_renamer, self.device_path, self.data_file)
             self.logger.increment('quarantines')
             return self.quarantined_dir
@@ -436,7 +456,8 @@ class DiskFile(object):
         try:
             file_size = 0
             if self.data_file:
-                file_size = os.path.getsize(self.data_file)
+                file_size = self.threadpool.run_in_thread(
+                    os.path.getsize, self.data_file)
                 if 'Content-Length' in self.metadata:
                     metadata_size = int(self.metadata['Content-Length'])
                     if file_size != metadata_size:
@@ -486,6 +507,9 @@ class ObjectController(object):
             allowed_methods = ['DELETE', 'PUT', 'HEAD', 'GET', 'POST']
         self.replication_server = replication_server
         self.allowed_methods = allowed_methods
+        self.threads_per_disk = int(conf.get('threads_per_disk', '0'))
+        self.threadpools = defaultdict(
+            lambda: ThreadPool(nthreads=self.threads_per_disk))
         default_allowed_headers = '''
             content-disposition,
             content-encoding,
@@ -547,7 +571,8 @@ class ObjectController(object):
         async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)
         ohash = hash_path(account, container, obj)
         self.logger.increment('async_pendings')
-        write_pickle(
+        self.threadpools[objdevice].run_in_thread(
+            write_pickle,
             {'op': op, 'account': account, 'container': container, 'obj': obj,
              'headers': headers_out},
             os.path.join(async_dir, ohash[-3:], ohash + '-' +
@@ -688,7 +713,8 @@ class ObjectController(object):
         disk_file = DiskFile(self.devices, device, partition, account,
                              container, obj, self.logger,
                              disk_chunk_size=self.disk_chunk_size,
-                             bytes_per_sync=self.bytes_per_sync)
+                             bytes_per_sync=self.bytes_per_sync,
+                             threadpool=self.threadpools[device])
         if disk_file.is_deleted() or disk_file.is_expired():
             return HTTPNotFound(request=request)
         try:
@@ -746,7 +772,8 @@ class ObjectController(object):
         disk_file = DiskFile(self.devices, device, partition, account,
                              container, obj, self.logger,
                              disk_chunk_size=self.disk_chunk_size,
-                             bytes_per_sync=self.bytes_per_sync)
+                             bytes_per_sync=self.bytes_per_sync,
+                             threadpool=self.threadpools[device])
         old_delete_at = int(disk_file.metadata.get('X-Delete-At') or 0)
         orig_timestamp = disk_file.metadata.get('X-Timestamp')
         upload_expiration = time.time() + self.max_upload_time
@@ -831,6 +858,7 @@ class ObjectController(object):
                              container, obj, self.logger, keep_data_fp=True,
                              disk_chunk_size=self.disk_chunk_size,
                              bytes_per_sync=self.bytes_per_sync,
+                             threadpool=self.threadpools[device],
                              iter_hook=sleep)
         if disk_file.is_deleted() or disk_file.is_expired():
             if request.headers.get('if-match') == '*':
@@ -913,7 +941,8 @@ class ObjectController(object):
         disk_file = DiskFile(self.devices, device, partition, account,
                              container, obj, self.logger,
                              disk_chunk_size=self.disk_chunk_size,
-                             bytes_per_sync=self.bytes_per_sync)
+                             bytes_per_sync=self.bytes_per_sync,
+                             threadpool=self.threadpools[device])
         if disk_file.is_deleted() or disk_file.is_expired():
             return HTTPNotFound(request=request)
         try:
@@ -958,7 +987,8 @@ class ObjectController(object):
         disk_file = DiskFile(self.devices, device, partition, account,
                              container, obj, self.logger,
                              disk_chunk_size=self.disk_chunk_size,
-                             bytes_per_sync=self.bytes_per_sync)
+                             bytes_per_sync=self.bytes_per_sync,
+                             threadpool=self.threadpools[device])
         if 'x-if-delete-at' in request.headers and \
                 int(request.headers['x-if-delete-at']) != \
                 int(disk_file.metadata.get('X-Delete-At') or 0):
@@ -1006,7 +1036,8 @@ class ObjectController(object):
         if not os.path.exists(path):
             mkdirs(path)
         suffixes = suffix.split('-') if suffix else []
-        _junk, hashes = tpool_reraise(get_hashes, path, recalculate=suffixes)
+        _junk, hashes = self.threadpools[device].force_run_in_thread(
+            get_hashes, path, recalculate=suffixes)
         return Response(body=pickle.dumps(hashes))
 
     def __call__(self, env, start_response):
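Worth noting: ObjectController keys the pools by device name, so the first request that touches a given disk creates its pool and every later request reuses it. A rough sketch of that behaviour, with device names invented purely for illustration:

    from collections import defaultdict
    from swift.common.utils import ThreadPool

    threads_per_disk = 4
    threadpools = defaultdict(lambda: ThreadPool(nthreads=threads_per_disk))

    # Lazy creation: the pool for 'sda1' appears on first use and is then
    # shared, so all blocking I/O for that disk funnels through one small pool.
    assert threadpools['sda1'] is threadpools['sda1']
    assert threadpools['sda1'] is not threadpools['sdb1']

With threads_per_disk left at its default of 0, every pool created this way degenerates to the inline/tpool behaviour described above, so the change is a no-op unless explicitly enabled.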
diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py
index 80d661f655..cf41f057d4 100644
--- a/test/unit/common/test_utils.py
+++ b/test/unit/common/test_utils.py
@@ -27,9 +27,9 @@ import re
 import socket
 import sys
 from textwrap import dedent
+import threading
 import time
 import unittest
-from threading import Thread
 from Queue import Queue, Empty
 from getpass import getuser
 from shutil import rmtree
@@ -1582,7 +1582,7 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
         self.sock.bind(('localhost', 0))
         self.port = self.sock.getsockname()[1]
         self.queue = Queue()
-        self.reader_thread = Thread(
+        self.reader_thread = threading.Thread(
         self.reader_thread.setDaemon(1)
         self.reader_thread.start()
@@ -1866,5 +1866,91 @@ class TestStatsdLoggingDelegation(unittest.TestCase):
         self.assertEquals(called, [12345])
 
 
+class TestThreadpool(unittest.TestCase):
+
+    def _thread_id(self):
+        return threading.current_thread().ident
+
+    def _capture_args(self, *args, **kwargs):
+        return {'args': args, 'kwargs': kwargs}
+
+    def _raise_valueerror(self):
+        return int('fishcakes')
+
+    def test_run_in_thread_with_threads(self):
+        tp = utils.ThreadPool(1)
+
+        my_id = self._thread_id()
+        other_id = tp.run_in_thread(self._thread_id)
+        self.assertNotEquals(my_id, other_id)
+
+        result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
+        self.assertEquals(result, {'args': (1, 2),
+                                   'kwargs': {'bert': 'ernie'}})
+
+        caught = False
+        try:
+            tp.run_in_thread(self._raise_valueerror)
+        except ValueError:
+            caught = True
+        self.assertTrue(caught)
+
+    def test_force_run_in_thread_with_threads(self):
+        # with nthreads > 0, force_run_in_thread looks just like run_in_thread
+        tp = utils.ThreadPool(1)
+
+        my_id = self._thread_id()
+        other_id = tp.force_run_in_thread(self._thread_id)
+        self.assertNotEquals(my_id, other_id)
+
+        result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
+        self.assertEquals(result, {'args': (1, 2),
+                                   'kwargs': {'bert': 'ernie'}})
+
+        caught = False
+        try:
+            tp.force_run_in_thread(self._raise_valueerror)
+        except ValueError:
+            caught = True
+        self.assertTrue(caught)
+
+    def test_run_in_thread_without_threads(self):
+        # with zero threads, run_in_thread doesn't actually do so
+        tp = utils.ThreadPool(0)
+
+        my_id = self._thread_id()
+        other_id = tp.run_in_thread(self._thread_id)
+        self.assertEquals(my_id, other_id)
+
+        result = tp.run_in_thread(self._capture_args, 1, 2, bert='ernie')
+        self.assertEquals(result, {'args': (1, 2),
+                                   'kwargs': {'bert': 'ernie'}})
+
+        caught = False
+        try:
+            tp.run_in_thread(self._raise_valueerror)
+        except ValueError:
+            caught = True
+        self.assertTrue(caught)
+
+    def test_force_run_in_thread_without_threads(self):
+        # with zero threads, force_run_in_thread uses eventlet.tpool
+        tp = utils.ThreadPool(0)
+
+        my_id = self._thread_id()
+        other_id = tp.force_run_in_thread(self._thread_id)
+        self.assertNotEquals(my_id, other_id)
+
+        result = tp.force_run_in_thread(self._capture_args, 1, 2, bert='ernie')
+        self.assertEquals(result, {'args': (1, 2),
+                                   'kwargs': {'bert': 'ernie'}})
+
+        caught = False
+        try:
+            tp.force_run_in_thread(self._raise_valueerror)
+        except ValueError:
+            caught = True
+        self.assertTrue(caught)
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py
index cb98f3efb8..cb56897eaf 100755
--- a/test/unit/obj/test_server.py
+++ b/test/unit/obj/test_server.py
@@ -49,13 +49,15 @@ class TestDiskFile(unittest.TestCase):
         self.testdir = os.path.join(mkdtemp(), 'tmp_test_obj_server_DiskFile')
         mkdirs(os.path.join(self.testdir, 'sda1', 'tmp'))
 
-        def fake_exe(*args, **kwargs):
-            pass
+        self._real_tpool_execute = tpool.execute
+        def fake_exe(meth, *args, **kwargs):
+            return meth(*args, **kwargs)
         tpool.execute = fake_exe
 
     def tearDown(self):
         """ Tear down for testing swift.object_server.ObjectController """
         rmtree(os.path.dirname(self.testdir))
+        tpool.execute = self._real_tpool_execute
 
     def _create_test_file(self, data, keep_data_fp=True):
         df = object_server.DiskFile(self.testdir, 'sda1', '0', 'a', 'c', 'o',
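For completeness, enabling the new behaviour is a one-line change in the object server's configuration. The snippet below is illustrative: the section name follows the standard object-server.conf layout rather than anything introduced by this patch, and 4 is only the starting point suggested in the deployment guide text above, not a requirement.

    [app:object-server]
    use = egg:swift#object
    # 0 (the default) keeps the old inline-I/O behaviour; small values such as
    # 4 are recommended to avoid deep per-disk queues and high read latencies.
    threads_per_disk = 4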