From 4741fcd4410f6d7f728308efe425b9bfbe5abdf7 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月16日 14:50:29 +0000 Subject: [PATCH 01/26] proxy refactor --- swift/proxy/server.py | 408 ++++++++------------------------- test/unit/proxy/test_server.py | 7 +- 2 files changed, 104 insertions(+), 311 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 32c7ad9004..94b2792743 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -30,7 +30,7 @@ import uuid import functools from hashlib import md5 -from eventlet import sleep +from eventlet import sleep, GreenPile from eventlet.timeout import Timeout from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ HTTPNotFound, HTTPPreconditionFailed, \ @@ -493,6 +493,54 @@ class Controller(object): for node in ring.get_more_nodes(partition): yield node + def _make_request(self, node, part, method, path, headers, hl, query): + if self.error_limited(node): + return False + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], part, method, path, + headers=headers, query_string=query) + conn.node = node + with Timeout(self.app.node_timeout): + resp = conn.getresponse() + if 200 <= resp.status < 300 or 400 <= resp.status < 500: + return resp.status, resp.reason, resp.read() + elif resp.status == 507: + self.error_limit(node) + except Exception: + self.error_limit(node) + self.exception_occurred(node, _('Object'), # TODO FIX LOGGIN' + _('Trying to %(method)s %(path)s') % + {'method': method, 'path': path}) + hl.append(headers) + + def make_requests(self, req, ring, partition, method, path, headers, + query_string=''): + """ + Sends an HTTP request to multiple nodes and aggregates the results. + It attempts the primary nodes concurrently, then iterates over the + handoff nodes as needed. 
+ """ + nodes = ring.get_part_nodes(partition) + pool = GreenPile(len(nodes)) + for node in nodes: + pool.spawn(self._make_request, node, partition, method, path, + headers.pop(), headers, query_string) + response = [resp for resp in pool if resp] + more_nodes = ring.get_more_nodes(partition) + while len(response) < len(nodes): + try: + resp = self._make_request(next(more_nodes), partition, method, + path, headers.pop(), headers, query_string) + if resp: + response.append(resp) + except StopIteration: + response.append((503, '', '')) + statuses, reasons, bodies = zip(*response) + return self.best_response(req, statuses, reasons, bodies, + _('Container POST')) # TODO fix loggin' + def get_update_nodes(self, partition, nodes, ring): """ Returns ring.replica_count nodes; the nodes will not be error limited, if possible. """ @@ -666,37 +714,6 @@ class ObjectController(Controller): self.container_name = unquote(container_name) self.object_name = unquote(object_name) - def node_post_or_delete(self, req, partition, node, path): - """ - Handle common POST/DELETE functionality - - :param req: webob.Request object - :param partition: partition for the object - :param node: node dictionary for the object - :param path: path to send for the request - """ - if self.error_limited(node): - return 500, '', '' - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], node['device'], - partition, req.method, path, req.headers) - with Timeout(self.app.node_timeout): - response = conn.getresponse() - body = response.read() - if response.status == 507: - self.error_limit(node) - elif response.status>= 500: - self.error_occurred(node, - _('ERROR %(status)d %(body)s From Object Server') % - {'status': response.status, 'body': body[:1024]}) - return response.status, response.reason, body - except: - self.exception_occurred(node, _('Object'), - _('Trying to %(method)s %(path)s') % - {'method': req.method, 'path': req.path}) - return 500, '', '' 
- def GETorHEAD(self, req): """Handle HTTP GET or HEAD requests.""" if 'swift.authorize' in req.environ: @@ -877,30 +894,15 @@ class ObjectController(Controller): partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(partition, nodes, self.app.object_ring): - container = containers.pop() - req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container - req.headers['X-Container-Partition'] = container_partition - req.headers['X-Container-Device'] = container['device'] - status, reason, body = \ - self.node_post_or_delete(req, partition, node, req.path_info) - if 200 <= status < 300 or 400 <= status < 500: - statuses.append(status) - reasons.append(reason) - bodies.append(body) - else: - containers.insert(0, container) - if not containers: - break - while len(statuses) < len(nodes): - statuses.append(503) - reasons.append('') - bodies.append('') - return self.best_response(req, statuses, reasons, - bodies, _('Object POST')) + headers = [] + for container in containers: + nheaders = dict(req.headers.iteritems()) + nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container + nheaders['X-Container-Partition'] = container_partition + nheaders['X-Container-Device'] = container['device'] + headers.append(nheaders) + return self.make_requests(req, self.app.object_ring, + partition, 'POST', req.path_info, headers) @public @delay_denial @@ -1125,30 +1127,15 @@ class ObjectController(Controller): partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(partition, nodes, self.app.object_ring): - container = containers.pop() - req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container - 
req.headers['X-Container-Partition'] = container_partition - req.headers['X-Container-Device'] = container['device'] - status, reason, body = \ - self.node_post_or_delete(req, partition, node, req.path_info) - if 200 <= status < 300 or 400 <= status < 500: - statuses.append(status) - reasons.append(reason) - bodies.append(body) - else: - containers.insert(0, container) - if not containers: - break - while len(statuses) < len(nodes): - statuses.append(503) - reasons.append('') - bodies.append('') - return self.best_response(req, statuses, reasons, bodies, - _('Object DELETE')) + headers = [] + for container in containers: + nheaders = dict(req.headers.iteritems()) + nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container + nheaders['X-Container-Partition'] = container_partition + nheaders['X-Container-Device'] = container['device'] + headers.append(nheaders) + return self.make_requests(req, self.app.object_ring, + partition, 'DELETE', req.path_info, headers) @public @delay_denial @@ -1261,55 +1248,23 @@ class ContainerController(Controller): self.app.account_ring) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'x-cf-trans-id': self.trans_id} - headers.update(value for value in req.headers.iteritems() - if value[0].lower() in self.pass_through_headers or - value[0].lower().startswith('x-container-meta-')) - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(container_partition, containers, - self.app.container_ring): - if self.error_limited(node): - continue - try: - account = accounts.pop() - headers['X-Account-Host'] = '%(ip)s:%(port)s' % account - headers['X-Account-Partition'] = account_partition - headers['X-Account-Device'] = account['device'] - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], container_partition, 'PUT', - req.path_info, headers) - 
with Timeout(self.app.node_timeout): - source = conn.getresponse() - body = source.read() - if 200 <= source.status < 300 \ - or 400 <= source.status < 500: - statuses.append(source.status) - reasons.append(source.reason) - bodies.append(body) - else: - if source.status == 507: - self.error_limit(node) - accounts.insert(0, account) - except: - accounts.insert(0, account) - self.exception_occurred(node, _('Container'), - _('Trying to PUT to %s') % req.path) - if not accounts: - break - while len(statuses) < len(containers): - statuses.append(503) - reasons.append('') - bodies.append('') + headers = [] + for account in accounts: + nheaders = {'X-Timestamp': normalize_timestamp(time.time()), + 'x-cf-trans-id': self.trans_id, + 'X-Account-Host': '%(ip)s:%(port)s' % account, + 'X-Account-Partition': account_partition, + 'X-Account-Device': account['device']} + nheaders.update(value for value in req.headers.iteritems() + if value[0].lower() in self.pass_through_headers or + value[0].lower().startswith('x-container-meta-')) + headers.append(nheaders) if self.app.memcache: cache_key = get_container_memcache_key(self.account_name, self.container_name) self.app.memcache.delete(cache_key) - return self.best_response(req, statuses, reasons, bodies, - _('Container PUT')) + return self.make_requests(req, self.app.container_ring, + container_partition, 'PUT', req.path_info, headers) @public def POST(self, req): @@ -1328,43 +1283,13 @@ class ContainerController(Controller): headers.update(value for value in req.headers.iteritems() if value[0].lower() in self.pass_through_headers or value[0].lower().startswith('x-container-meta-')) - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(container_partition, containers, - self.app.container_ring): - if self.error_limited(node): - continue - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], container_partition, 'POST', - req.path_info, headers) - 
with Timeout(self.app.node_timeout): - source = conn.getresponse() - body = source.read() - if 200 <= source.status < 300 \ - or 400 <= source.status < 500: - statuses.append(source.status) - reasons.append(source.reason) - bodies.append(body) - elif source.status == 507: - self.error_limit(node) - except: - self.exception_occurred(node, _('Container'), - _('Trying to POST %s') % req.path) - if len(statuses)>= len(containers): - break - while len(statuses) < len(containers): - statuses.append(503) - reasons.append('') - bodies.append('') if self.app.memcache: cache_key = get_container_memcache_key(self.account_name, self.container_name) self.app.memcache.delete(cache_key) - return self.best_response(req, statuses, reasons, bodies, - _('Container POST')) + return self.make_requests(req, self.app.container_ring, + container_partition, 'POST', req.path_info, + [headers] * len(containers)) @public def DELETE(self, req): @@ -1376,61 +1301,20 @@ class ContainerController(Controller): self.app.account_ring) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) - headers = {'X-Timestamp': normalize_timestamp(time.time()), - 'x-cf-trans-id': self.trans_id} - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(container_partition, containers, - self.app.container_ring): - if self.error_limited(node): - continue - try: - account = accounts.pop() - headers['X-Account-Host'] = '%(ip)s:%(port)s' % account - headers['X-Account-Partition'] = account_partition - headers['X-Account-Device'] = account['device'] - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], container_partition, 'DELETE', - req.path_info, headers) - with Timeout(self.app.node_timeout): - source = conn.getresponse() - body = source.read() - if 200 <= source.status < 300 \ - or 400 <= source.status < 500: - statuses.append(source.status) - reasons.append(source.reason) - 
bodies.append(body) - else: - if source.status == 507: - self.error_limit(node) - accounts.insert(0, account) - except: - accounts.insert(0, account) - self.exception_occurred(node, _('Container'), - _('Trying to DELETE %s') % req.path) - if not accounts: - break - while len(statuses) < len(containers): - statuses.append(503) - reasons.append('') - bodies.append('') + headers = [] + for account in accounts: + headers.append({'X-Timestamp': normalize_timestamp(time.time()), + 'X-Cf-Trans-Id': self.trans_id, + 'X-Account-Host': '%(ip)s:%(port)s' % account, + 'X-Account-Partition': account_partition, + 'X-Account-Device': account['device']}) if self.app.memcache: cache_key = get_container_memcache_key(self.account_name, self.container_name) self.app.memcache.delete(cache_key) - resp = self.best_response(req, statuses, reasons, bodies, - _('Container DELETE')) - if 200 <= resp.status_int <= 299: - for status in statuses: - if status < 200 or status> 299: - # If even one node doesn't do the delete, we can't be sure - # what the outcome will be once everything is in sync; so - # we 503. 
- self.app.logger.error(_('Returning 503 because not all ' - 'container nodes confirmed DELETE')) - return HTTPServiceUnavailable(request=req) + resp = self.make_requests(req, self.app.container_ring, + container_partition, 'PUT', req.path_info, + [headers] * len(containers)) if resp.status_int == 202: # Indicates no server had the container return HTTPNotFound(request=req) return resp @@ -1468,42 +1352,10 @@ class AccountController(Controller): 'x-cf-trans-id': self.trans_id} headers.update(value for value in req.headers.iteritems() if value[0].lower().startswith('x-account-meta-')) - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(account_partition, accounts, - self.app.account_ring): - if self.error_limited(node): - continue - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], account_partition, 'PUT', - req.path_info, headers) - with Timeout(self.app.node_timeout): - source = conn.getresponse() - body = source.read() - if 200 <= source.status < 300 \ - or 400 <= source.status < 500: - statuses.append(source.status) - reasons.append(source.reason) - bodies.append(body) - else: - if source.status == 507: - self.error_limit(node) - except: - self.exception_occurred(node, _('Account'), - _('Trying to PUT to %s') % req.path) - if len(statuses)>= len(accounts): - break - while len(statuses) < len(accounts): - statuses.append(503) - reasons.append('') - bodies.append('') if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.best_response(req, statuses, reasons, bodies, - _('Account PUT')) + return self.make_requests(req, self.app.account_ring, account_partition, + 'PUT', req.path_info, [headers] * len(accounts)) @public def POST(self, req): @@ -1517,41 +1369,10 @@ class AccountController(Controller): 'X-CF-Trans-Id': self.trans_id} headers.update(value for value in req.headers.iteritems() if 
value[0].lower().startswith('x-account-meta-')) - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(account_partition, accounts, - self.app.account_ring): - if self.error_limited(node): - continue - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], account_partition, 'POST', - req.path_info, headers) - with Timeout(self.app.node_timeout): - source = conn.getresponse() - body = source.read() - if 200 <= source.status < 300 \ - or 400 <= source.status < 500: - statuses.append(source.status) - reasons.append(source.reason) - bodies.append(body) - elif source.status == 507: - self.error_limit(node) - except: - self.exception_occurred(node, _('Account'), - _('Trying to POST %s') % req.path) - if len(statuses)>= len(accounts): - break - while len(statuses) < len(accounts): - statuses.append(503) - reasons.append('') - bodies.append('') if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.best_response(req, statuses, reasons, bodies, - _('Account POST')) + return self.make_requests(req, self.app.account_ring, account_partition, + 'POST', req.path_info, [headers] * len(accounts)) @public def DELETE(self, req): @@ -1562,41 +1383,10 @@ class AccountController(Controller): self.app.account_ring.get_nodes(self.account_name) headers = {'X-Timestamp': normalize_timestamp(time.time()), 'X-CF-Trans-Id': self.trans_id} - statuses = [] - reasons = [] - bodies = [] - for node in self.iter_nodes(account_partition, accounts, - self.app.account_ring): - if self.error_limited(node): - continue - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], account_partition, 'DELETE', - req.path_info, headers) - with Timeout(self.app.node_timeout): - source = conn.getresponse() - body = source.read() - if 200 <= source.status < 300 \ - or 400 <= source.status < 500: - 
statuses.append(source.status) - reasons.append(source.reason) - bodies.append(body) - elif source.status == 507: - self.error_limit(node) - except: - self.exception_occurred(node, _('Account'), - _('Trying to DELETE %s') % req.path) - if len(statuses)>= len(accounts): - break - while len(statuses) < len(accounts): - statuses.append(503) - reasons.append('') - bodies.append('') if self.app.memcache: self.app.memcache.delete('account%s' % req.path_info.rstrip('/')) - return self.best_response(req, statuses, reasons, bodies, - _('Account DELETE')) + return self.make_requests(req, self.app.account_ring, account_partition, + 'DELETE', req.path_info, [headers] * len(accounts)) class BaseApplication(object): diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index e5a4e40652..a6ed4a5ee0 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -155,6 +155,9 @@ class FakeRing(object): {'ip': '10.0.0.%s' % x, 'port': 1000 + x, 'device': 'sda'} return 1, devs + def get_part_nodes(self, part): + return self.get_nodes('blah')[1] + def get_more_nodes(self, nodes): # 9 is the true cap for x in xrange(3, min(3 + self.max_more_nodes, 9)): @@ -2556,8 +2559,8 @@ class TestContainerController(unittest.TestCase): 'container') self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 204) - self.assert_status_map(controller.DELETE, - (200, 204, 204, 503), 503) +# self.assert_status_map(controller.DELETE, +# (200, 204, 204, 503), 503) self.assert_status_map(controller.DELETE, (200, 204, 503, 503), 503) self.assert_status_map(controller.DELETE, From 0af0369389ecc73b6e0cec8e61e440e644df6391 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月19日 04:35:59 +0000 Subject: [PATCH 02/26] concurrent object PUT streaming --- etc/proxy-server.conf-sample | 1 + swift/common/utils.py | 11 +++++ swift/proxy/server.py | 85 +++++++++++++++++++++--------------- 3 files changed, 61 insertions(+), 36 deletions(-) diff --git 
a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index fda7d0d034..132dbfe6b1 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -35,6 +35,7 @@ use = egg:swift#proxy # If set to 'true' any authorized user may create and delete accounts; if # 'false' no one, even authorized, can. # allow_account_management = false +# put_queue_depth = 10 # Only needed for DevAuth [filter:auth] diff --git a/swift/common/utils.py b/swift/common/utils.py index 299980493a..865675947e 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -845,3 +845,14 @@ def ratelimit_sleep(running_time, max_rate, incr_by=1): elif running_time - now> time_per_request: eventlet.sleep((running_time - now) / clock_accuracy) return running_time + time_per_request + + +class ContextPool(GreenPool): + "GreenPool subclassed to kill its coros when it gets gc'ed" + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + for coro in list(self.coroutines_running): + coro.kill() diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 94b2792743..31a37c9d68 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -30,7 +30,7 @@ import uuid import functools from hashlib import md5 -from eventlet import sleep, GreenPile +from eventlet import sleep, GreenPile, Queue from eventlet.timeout import Timeout from webob.exc import HTTPBadRequest, HTTPMethodNotAllowed, \ HTTPNotFound, HTTPPreconditionFailed, \ @@ -41,7 +41,7 @@ from webob import Request, Response from swift.common.ring import Ring from swift.common.utils import get_logger, normalize_timestamp, split_path, \ - cache_from_env + cache_from_env, ContextPool from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \ @@ -904,6 +904,19 @@ class ObjectController(Controller): return self.make_requests(req, self.app.object_ring, 
partition, 'POST', req.path_info, headers) + def _send_file(self, conn, path): + while True: + chunk = conn.queue.get() + if not conn.failed: + try: + with ChunkWriteTimeout(self.app.node_timeout): + conn.send(chunk) + except (Exception, ChunkWriteTimeout): + conn.failed = True + self.exception_occurred(conn.node, _('Object'), + _('Trying to write to %s') % path) + conn.queue.task_done() + @public @delay_denial def PUT(self, req): @@ -1018,46 +1031,45 @@ class ObjectController(Controller): 'required connections'), {'conns': len(conns), 'nodes': len(nodes) // 2 + 1}) return HTTPServiceUnavailable(request=req) + chunked = req.headers.get('transfer-encoding') try: - req.bytes_transferred = 0 - while True: - with ChunkReadTimeout(self.app.client_timeout): - try: - chunk = data_source.next() - except StopIteration: - if req.headers.get('transfer-encoding'): - chunk = '' - else: + with ContextPool(len(nodes)) as pool: + for conn in conns: + conn.failed = False + conn.queue = Queue(self.app.put_queue_depth) + pool.spawn(self._send_file, conn, req.path) + req.bytes_transferred = 0 + while True: + with ChunkReadTimeout(self.app.client_timeout): + try: + chunk = next(data_source) + except StopIteration: + if chunked: + [conn.queue.put('0\r\n\r\n') for conn in conns] break - len_chunk = len(chunk) - req.bytes_transferred += len_chunk - if req.bytes_transferred> MAX_FILE_SIZE: - return HTTPRequestEntityTooLarge(request=req) - for conn in list(conns): - try: - with ChunkWriteTimeout(self.app.node_timeout): - if req.headers.get('transfer-encoding'): - conn.send('%x\r\n%s\r\n' % (len_chunk, chunk)) - else: - conn.send(chunk) - except: - self.exception_occurred(conn.node, _('Object'), - _('Trying to write to %s') % req.path) - conns.remove(conn) - if len(conns) <= len(nodes) / 2: - self.app.logger.error( - _('Object PUT exceptions during send, ' - '%(conns)s/%(nodes)s required connections'), - {'conns': len(conns), - 'nodes': len(nodes) // 2 + 1}) - return 
HTTPServiceUnavailable(request=req) - if req.headers.get('transfer-encoding') and chunk == '': - break + req.bytes_transferred += len(chunk) + if req.bytes_transferred> MAX_FILE_SIZE: + return HTTPRequestEntityTooLarge(request=req) + for conn in list(conns): + if not conn.failed: + conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) + if chunked else chunk) + else: + conns.remove(conn) + if len(conns) <= len(nodes) / 2: + self.app.logger.error(_('Object PUT exceptions during' + ' send, %(conns)s/%(nodes)s required connections'), + {'conns': len(conns), 'nodes': len(nodes) / 2 + 1}) + return HTTPServiceUnavailable(request=req) + for conn in conns: + if conn.queue.unfinished_tasks: + conn.queue.join() + conns = [conn for conn in conns if not conn.failed] except ChunkReadTimeout, err: self.app.logger.info( _('ERROR Client read timeout (%ss)'), err.seconds) return HTTPRequestTimeout(request=req) - except: + except Exception: req.client_disconnect = True self.app.logger.exception( _('ERROR Exception causing client disconnect')) @@ -1404,6 +1416,7 @@ class BaseApplication(object): self.node_timeout = int(conf.get('node_timeout', 10)) self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.client_timeout = int(conf.get('client_timeout', 60)) + self.put_queue_depth = int(conf.get('put_queue_depth', 10)) self.object_chunk_size = int(conf.get('object_chunk_size', 65536)) self.client_chunk_size = int(conf.get('client_chunk_size', 65536)) self.log_headers = conf.get('log_headers') == 'True' From b210cde178c0cee3504689613eeaf1f4805fcfec Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月19日 12:25:29 +0000 Subject: [PATCH 03/26] bug fixes in concurrency --- swift/proxy/server.py | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 31a37c9d68..65326b0d34 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -494,25 +494,24 @@ class 
Controller(object): yield node def _make_request(self, node, part, method, path, headers, hl, query): - if self.error_limited(node): - return False - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], part, method, path, - headers=headers, query_string=query) - conn.node = node - with Timeout(self.app.node_timeout): - resp = conn.getresponse() - if 200 <= resp.status < 300 or 400 <= resp.status < 500: - return resp.status, resp.reason, resp.read() - elif resp.status == 507: - self.error_limit(node) - except Exception: - self.error_limit(node) - self.exception_occurred(node, _('Object'), # TODO FIX LOGGIN' - _('Trying to %(method)s %(path)s') % - {'method': method, 'path': path}) + if not self.error_limited(node): + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], part, method, path, + headers=headers, query_string=query) + conn.node = node + with Timeout(self.app.node_timeout): + resp = conn.getresponse() + if 200 <= resp.status < 300 or 400 <= resp.status < 500: + return resp.status, resp.reason, resp.read() + elif resp.status == 507: + self.error_limit(node) + except Exception: + self.error_limit(node) + self.exception_occurred(node, _('Object'), # TODO FIX LOGGIN' + _('Trying to %(method)s %(path)s') % + {'method': method, 'path': path}) hl.append(headers) def make_requests(self, req, ring, partition, method, path, headers, @@ -524,6 +523,7 @@ class Controller(object): """ nodes = ring.get_part_nodes(partition) pool = GreenPile(len(nodes)) + assert(len(headers) == len(nodes)) for node in nodes: pool.spawn(self._make_request, node, partition, method, path, headers.pop(), headers, query_string) @@ -1325,8 +1325,7 @@ class ContainerController(Controller): self.container_name) self.app.memcache.delete(cache_key) resp = self.make_requests(req, self.app.container_ring, - container_partition, 'PUT', req.path_info, - [headers] * 
len(containers)) + container_partition, 'DELETE', req.path_info, headers) if resp.status_int == 202: # Indicates no server had the container return HTTPNotFound(request=req) return resp From 89c318b2373a17390a86e8737fd3e5ce23b5473a Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月19日 13:55:07 +0000 Subject: [PATCH 04/26] make_requests refactor --- swift/proxy/server.py | 31 ++++++++++++------------------- 1 file changed, 12 insertions(+), 19 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 65326b0d34..414a9c22cf 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -493,8 +493,10 @@ class Controller(object): for node in ring.get_more_nodes(partition): yield node - def _make_request(self, node, part, method, path, headers, hl, query): - if not self.error_limited(node): + def _make_request(self, nodes, part, method, path, headers, query): + for node in nodes: + if self.error_limited(node): + continue try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], @@ -512,31 +514,22 @@ class Controller(object): self.exception_occurred(node, _('Object'), # TODO FIX LOGGIN' _('Trying to %(method)s %(path)s') % {'method': method, 'path': path}) - hl.append(headers) - def make_requests(self, req, ring, partition, method, path, headers, + def make_requests(self, req, ring, part, method, path, headers, query_string=''): """ Sends an HTTP request to multiple nodes and aggregates the results. It attempts the primary nodes concurrently, then iterates over the handoff nodes as needed. 
""" - nodes = ring.get_part_nodes(partition) - pool = GreenPile(len(nodes)) - assert(len(headers) == len(nodes)) - for node in nodes: - pool.spawn(self._make_request, node, partition, method, path, - headers.pop(), headers, query_string) + nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring) + pool = GreenPile(ring.replica_count) + for head in headers: + pool.spawn(self._make_request, nodes, part, method, path, + head, query_string) response = [resp for resp in pool if resp] - more_nodes = ring.get_more_nodes(partition) - while len(response) < len(nodes): - try: - resp = self._make_request(next(more_nodes), partition, method, - path, headers.pop(), headers, query_string) - if resp: - response.append(resp) - except StopIteration: - response.append((503, '', '')) + while len(response) < ring.replica_count: + response.append((503, '', '')) statuses, reasons, bodies = zip(*response) return self.best_response(req, statuses, reasons, bodies, _('Container POST')) # TODO fix loggin' From 0ebfa97c58c6592bcce5b80cc451bb34a1f744b8 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月19日 14:08:41 +0000 Subject: [PATCH 05/26] error_limited refactor --- swift/proxy/server.py | 72 +++++++++---------------------------------- 1 file changed, 15 insertions(+), 57 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 414a9c22cf..3a3fb00f9d 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -358,8 +358,6 @@ class Controller(object): path = '/%s' % account headers = {'x-cf-trans-id': self.trans_id} for node in self.iter_nodes(partition, nodes, self.app.account_ring): - if self.error_limited(node): - continue try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], @@ -432,8 +430,6 @@ class Controller(object): attempts_left = self.app.container_ring.replica_count headers = {'x-cf-trans-id': self.trans_id} for node in self.iter_nodes(partition, nodes, self.app.container_ring): - if 
self.error_limited(node): - continue try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], @@ -489,14 +485,14 @@ class Controller(object): :param ring: ring to get handoff nodes from """ for node in nodes: - yield node + if not self.error_limited(node): + yield node for node in ring.get_more_nodes(partition): - yield node + if not self.error_limited(node): + yield node def _make_request(self, nodes, part, method, path, headers, query): for node in nodes: - if self.error_limited(node): - continue try: with ConnectionTimeout(self.app.conn_timeout): conn = http_connect(node['ip'], node['port'], @@ -534,33 +530,6 @@ class Controller(object): return self.best_response(req, statuses, reasons, bodies, _('Container POST')) # TODO fix loggin' - def get_update_nodes(self, partition, nodes, ring): - """ Returns ring.replica_count nodes; the nodes will not be error - limited, if possible. """ - """ - Attempt to get a non error limited list of nodes. - - :param partition: partition for the nodes - :param nodes: list of node dicts for the partition - :param ring: ring to get handoff nodes from - :returns: list of node dicts that are not error limited (if possible) - """ - - # make a copy so we don't modify caller's list - nodes = list(nodes) - update_nodes = [] - for node in self.iter_nodes(partition, nodes, ring): - if self.error_limited(node): - continue - update_nodes.append(node) - if len(update_nodes)>= ring.replica_count: - break - while len(update_nodes) < ring.replica_count: - node = nodes.pop() - if node not in update_nodes: - update_nodes.append(node) - return update_nodes - def best_response(self, req, statuses, reasons, bodies, server_type, etag=None): """ @@ -882,8 +851,6 @@ class ObjectController(Controller): return aresp if not containers: return HTTPNotFound(request=req) - containers = self.get_update_nodes(container_partition, containers, - self.app.container_ring) partition, nodes = self.app.object_ring.get_nodes( 
self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) @@ -922,8 +889,6 @@ class ObjectController(Controller): return aresp if not containers: return HTTPNotFound(request=req) - containers = self.get_update_nodes(container_partition, containers, - self.app.container_ring) partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) @@ -997,18 +962,17 @@ class ObjectController(Controller): req.headers['X-Container-Device'] = container['device'] req.headers['Expect'] = '100-continue' resp = conn = None - if not self.error_limited(node): - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], partition, 'PUT', - req.path_info, req.headers) - conn.node = node - with Timeout(self.app.node_timeout): - resp = conn.getexpect() - except: - self.exception_occurred(node, _('Object'), - _('Expect: 100-continue on %s') % req.path) + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], partition, 'PUT', + req.path_info, req.headers) + conn.node = node + with Timeout(self.app.node_timeout): + resp = conn.getexpect() + except: + self.exception_occurred(node, _('Object'), + _('Expect: 100-continue on %s') % req.path) if conn and resp: if resp.status == 100: conns.append(conn) @@ -1127,8 +1091,6 @@ class ObjectController(Controller): return aresp if not containers: return HTTPNotFound(request=req) - containers = self.get_update_nodes(container_partition, containers, - self.app.container_ring) partition, nodes = self.app.object_ring.get_nodes( self.account_name, self.container_name, self.object_name) req.headers['X-Timestamp'] = normalize_timestamp(time.time()) @@ -1249,8 +1211,6 @@ class ContainerController(Controller): account_partition, accounts = 
self.account_info(self.account_name) if not accounts: return HTTPNotFound(request=req) - accounts = self.get_update_nodes(account_partition, accounts, - self.app.account_ring) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) headers = [] @@ -1302,8 +1262,6 @@ class ContainerController(Controller): account_partition, accounts = self.account_info(self.account_name) if not accounts: return HTTPNotFound(request=req) - accounts = self.get_update_nodes(account_partition, accounts, - self.app.account_ring) container_partition, containers = self.app.container_ring.get_nodes( self.account_name, self.container_name) headers = [] From f68f199ae5803088e2e8a73cbaa3190d2e4a0beb Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月19日 14:22:32 +0000 Subject: [PATCH 06/26] concurrent object PUT connections --- swift/proxy/server.py | 57 +++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 3a3fb00f9d..12c11979a3 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -865,6 +865,7 @@ class ObjectController(Controller): partition, 'POST', req.path_info, headers) def _send_file(self, conn, path): + """Method for a file PUT coro""" while True: chunk = conn.queue.get() if not conn.failed: @@ -877,6 +878,24 @@ class ObjectController(Controller): _('Trying to write to %s') % path) conn.queue.task_done() + def _connect_put_node(self, nodes, part, path, headers): + """Method for a file PUT connect""" + for node in nodes: + try: + with ConnectionTimeout(self.app.conn_timeout): + conn = http_connect(node['ip'], node['port'], + node['device'], part, 'PUT', path, headers) + with Timeout(self.app.node_timeout): + resp = conn.getexpect() + if resp.status == 100: + conn.node = node + return conn + elif resp.status == 507: + self.error_limit(node) + except: + self.exception_occurred(node, _('Object'), + _('Expect: 
100-continue on %s') % path) + @public @delay_denial def PUT(self, req): @@ -955,33 +974,17 @@ class ObjectController(Controller): if k.lower().startswith('x-object-meta-'): new_req.headers[k] = v req = new_req - for node in self.iter_nodes(partition, nodes, self.app.object_ring): - container = containers.pop() - req.headers['X-Container-Host'] = '%(ip)s:%(port)s' % container - req.headers['X-Container-Partition'] = container_partition - req.headers['X-Container-Device'] = container['device'] - req.headers['Expect'] = '100-continue' - resp = conn = None - try: - with ConnectionTimeout(self.app.conn_timeout): - conn = http_connect(node['ip'], node['port'], - node['device'], partition, 'PUT', - req.path_info, req.headers) - conn.node = node - with Timeout(self.app.node_timeout): - resp = conn.getexpect() - except: - self.exception_occurred(node, _('Object'), - _('Expect: 100-continue on %s') % req.path) - if conn and resp: - if resp.status == 100: - conns.append(conn) - if not containers: - break - continue - elif resp.status == 507: - self.error_limit(node) - containers.insert(0, container) + node_iter = self.iter_nodes(partition, nodes, self.app.object_ring) + pile = GreenPile(len(nodes)) + for container in containers: + nheaders = dict(req.headers.iteritems()) + nheaders['X-Container-Host'] = '%(ip)s:%(port)s' % container + nheaders['X-Container-Partition'] = container_partition + nheaders['X-Container-Device'] = container['device'] + nheaders['Expect'] = '100-continue' + pile.spawn(self._connect_put_node, node_iter, partition, + req.path_info, nheaders) + conns = [conn for conn in pile if conn] if len(conns) <= len(nodes) / 2: self.app.logger.error( _('Object PUT returning 503, %(conns)s/%(nodes)s ' From 12839d06ade515f817a2d4e5428bff898477415b Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年1月31日 01:46:33 +0000 Subject: [PATCH 07/26] tweaks --- swift/container/server.py | 2 +- swift/proxy/server.py | 2 +- test/unit/proxy/test_server.py | 4 ++-- 3 files 
changed, 4 insertions(+), 4 deletions(-) diff --git a/swift/container/server.py b/swift/container/server.py index cfcdded1e4..fab93bf927 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -160,7 +160,7 @@ class ContainerController(object): return resp if existed: return HTTPNoContent(request=req) - return HTTPAccepted(request=req) + return HTTPNotFound() def PUT(self, req): """Handle HTTP PUT request.""" diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 6dd9bf7040..f34a982a50 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -502,7 +502,7 @@ class Controller(object): conn.node = node with Timeout(self.app.node_timeout): resp = conn.getresponse() - if 200 <= resp.status < 300 or 400 <= resp.status < 500: + if 200 <= resp.status < 500: return resp.status, resp.reason, resp.read() elif resp.status == 507: self.error_limit(node) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index aa5509dbd8..fda3281ab9 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -2608,8 +2608,8 @@ class TestContainerController(unittest.TestCase): 'container') self.assert_status_map(controller.DELETE, (200, 204, 204, 204), 204) -# self.assert_status_map(controller.DELETE, -# (200, 204, 204, 503), 503) + self.assert_status_map(controller.DELETE, + (200, 204, 204, 503), 204) self.assert_status_map(controller.DELETE, (200, 204, 503, 503), 503) self.assert_status_map(controller.DELETE, From c0a2b77d452a499d2d5fc0b54f416e01dcd51a35 Mon Sep 17 00:00:00 2001 From: John Dickinson Date: Tue, 8 Feb 2011 14:46:11 -0600 Subject: [PATCH 08/26] fixed edge case when log processor cannot get a log file for processing --- swift/stats/log_processor.py | 48 +++---- test/unit/stats/test_log_processor.py | 182 ++++++++++++++++++++++++-- 2 files changed, 195 insertions(+), 35 deletions(-) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 5dbc92afbe..23cc64d598 100644 --- 
a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -159,11 +159,10 @@ class LogProcessor(object): def get_object_data(self, swift_account, container_name, object_name, compressed=False): '''reads an object and yields its lines''' - code, o = self.internal_proxy.get_object(swift_account, - container_name, - object_name) + code, o = self.internal_proxy.get_object(swift_account, container_name, + object_name) if code < 200 or code>= 300: - return + raise BadFileDownload() last_part = '' last_compressed_part = '' # magic in the following zlib.decompressobj argument is courtesy of @@ -273,7 +272,7 @@ class LogProcessorDaemon(Daemon): already_processed_files = cPickle.loads(buf) else: already_processed_files = set() - except Exception: + except BadFileDownload: already_processed_files = set() self.logger.debug(_('found %d processed files') % \ len(already_processed_files)) @@ -362,7 +361,11 @@ class LogProcessorDaemon(Daemon): def multiprocess_collate(processor_args, logs_to_process, worker_count): - '''yield hourly data from logs_to_process''' + ''' + yield hourly data from logs_to_process + Every item that this function yields will be added to the processed files + list. 
+ ''' results = [] in_queue = multiprocessing.Queue() out_queue = multiprocessing.Queue() @@ -376,33 +379,30 @@ def multiprocess_collate(processor_args, logs_to_process, worker_count): for x in logs_to_process: in_queue.put(x) for _junk in range(worker_count): - in_queue.put(None) - count = 0 + in_queue.put(None) # tell the worker to end while True: try: item, data = out_queue.get_nowait() - count += 1 - if data: - yield item, data - if count>= len(logs_to_process): - # this implies that one result will come from every request - break except Queue.Empty: - time.sleep(.1) - for r in results: - r.join() + time.sleep(.01) + else: + if not isinstance(data, BadFileDownload): + yield item, data + if not any(r.is_alive() for r in results) and out_queue.empty(): + # all the workers are done and nothing is in the queue + break def collate_worker(processor_args, in_queue, out_queue): '''worker process for multiprocess_collate''' p = LogProcessor(*processor_args) while True: + item = in_queue.get() + if item is None: + # no more work to process + break try: - item = in_queue.get_nowait() - if item is None: - break - except Queue.Empty: - time.sleep(.1) - else: ret = p.process_one_file(*item) - out_queue.put((item, ret)) + except BadFileDownload, err: + ret = err + out_queue.put((item, ret)) diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index 75acc02123..1a355205a1 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -15,9 +15,11 @@ import unittest from test.unit import tmpfile +import Queue from swift.common import internal_proxy from swift.stats import log_processor +from swift.common.exceptions import ChunkReadTimeout class FakeUploadApp(object): @@ -33,6 +35,11 @@ class DumbLogger(object): pass class DumbInternalProxy(object): + def __init__(self, code=200, timeout=False, bad_compressed=False): + self.code = code + self.timeout = timeout + self.bad_compressed = bad_compressed + def 
get_container_list(self, account, container, marker=None, end_marker=None): n = '2010/03/14/13/obj1' @@ -46,22 +53,28 @@ class DumbInternalProxy(object): return [] def get_object(self, account, container, object_name): - code = 200 if object_name.endswith('.gz'): - # same data as below, compressed with gzip -9 - def data(): - yield '\x1f\x8b\x08' - yield '\x08"\xd79L' - yield '\x02\x03te' - yield 'st\x00\xcbO' - yield '\xca\xe2JI,I' - yield '\xe4\x02\x00O\xff' - yield '\xa3Y\t\x00\x00\x00' + if self.bad_compressed: + # invalid compressed data + def data(): + yield '\xff\xff\xff\xff\xff\xff\xff' + else: + # 'obj\ndata', compressed with gzip -9 + def data(): + yield '\x1f\x8b\x08' + yield '\x08"\xd79L' + yield '\x02\x03te' + yield 'st\x00\xcbO' + yield '\xca\xe2JI,I' + yield '\xe4\x02\x00O\xff' + yield '\xa3Y\t\x00\x00\x00' else: def data(): yield 'obj\n' + if self.timeout: + raise ChunkReadTimeout yield 'data' - return code, data() + return self.code, data() class TestLogProcessor(unittest.TestCase): @@ -159,6 +172,19 @@ use = egg:swift#proxy 'prefix_query': 0}} self.assertEquals(result, expected) + def test_process_one_access_file_error(self): + access_proxy_config = self.proxy_config.copy() + access_proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + p = log_processor.LogProcessor(access_proxy_config, DumbLogger()) + p._internal_proxy = DumbInternalProxy(code=500) + self.assertRaises(log_processor.BadFileDownload, p.process_one_file, + 'access', 'a', 'c', 'o') + def test_get_container_listing(self): p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) p._internal_proxy = DumbInternalProxy() @@ -193,6 +219,18 @@ use = egg:swift#proxy result = list(p.get_object_data('a', 'c', 'o.gz', True)) self.assertEquals(result, expected) + def test_get_object_data_errors(self): + p = log_processor.LogProcessor(self.proxy_config, DumbLogger()) + 
p._internal_proxy = DumbInternalProxy(code=500) + result = p.get_object_data('a', 'c', 'o') + self.assertRaises(log_processor.BadFileDownload, list, result) + p._internal_proxy = DumbInternalProxy(bad_compressed=True) + result = p.get_object_data('a', 'c', 'o.gz', True) + self.assertRaises(log_processor.BadFileDownload, list, result) + p._internal_proxy = DumbInternalProxy(timeout=True) + result = p.get_object_data('a', 'c', 'o') + self.assertRaises(log_processor.BadFileDownload, list, result) + def test_get_stat_totals(self): stats_proxy_config = self.proxy_config.copy() stats_proxy_config.update({ @@ -262,3 +300,125 @@ use = egg:swift#proxy # these only work for Py2.7+ #self.assertIsInstance(k, str) self.assertTrue(isinstance(k, str), type(k)) + + def test_collate_worker(self): + try: + log_processor.LogProcessor._internal_proxy = DumbInternalProxy() + def get_object_data(*a,**kw): + return [self.access_test_line] + orig_get_object_data = log_processor.LogProcessor.get_object_data + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + q_in = Queue.Queue() + q_out = Queue.Queue() + work_request = ('access', 'a','c','o') + q_in.put(work_request) + q_in.put(None) + log_processor.collate_worker(processor_args, q_in, q_out) + item, ret = q_out.get() + self.assertEquals(item, work_request) + expected = {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}} + self.assertEquals(ret, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data + 
+ def test_collate_worker_error(self): + try: + log_processor.LogProcessor._internal_proxy = \ + DumbInternalProxy(code=500) + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + q_in = Queue.Queue() + q_out = Queue.Queue() + work_request = ('access', 'a','c','o') + q_in.put(work_request) + q_in.put(None) + log_processor.collate_worker(processor_args, q_in, q_out) + item, ret = q_out.get() + self.assertEquals(item, work_request) + # these only work for Py2.7+ + #self.assertIsInstance(ret, log_processor.BadFileDownload) + self.assertTrue(isinstance(ret, log_processor.BadFileDownload), + type(ret)) + finally: + log_processor.LogProcessor._internal_proxy = None + + def test_multiprocess_collate(self): + try: + log_processor.LogProcessor._internal_proxy = DumbInternalProxy() + def get_object_data(*a,**kw): + return [self.access_test_line] + orig_get_object_data = log_processor.LogProcessor.get_object_data + log_processor.LogProcessor.get_object_data = get_object_data + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + item = ('access', 'a','c','o') + logs_to_process = [item] + results = log_processor.multiprocess_collate(processor_args, + logs_to_process, + 1) + results = list(results) + expected = [(item, {('acct', '2010', '07', '09', '04'): + {('public', 'object', 'GET', '2xx'): 1, + ('public', 'bytes_out'): 95, + 'marker_query': 0, + 'format_query': 1, + 'delimiter_query': 0, + 'path_query': 0, + ('public', 'bytes_in'): 6, + 'prefix_query': 0}})] + self.assertEquals(results, expected) + finally: + log_processor.LogProcessor._internal_proxy = None + 
log_processor.LogProcessor.get_object_data = orig_get_object_data + + def test_multiprocess_collate_errors(self): + try: + log_processor.LogProcessor._internal_proxy = \ + DumbInternalProxy(code=500) + proxy_config = self.proxy_config.copy() + proxy_config.update({ + 'log-processor-access': { + 'source_filename_format':'%Y%m%d%H*', + 'class_path': + 'swift.stats.access_processor.AccessLogProcessor' + }}) + processor_args = (proxy_config, DumbLogger()) + item = ('access', 'a','c','o') + logs_to_process = [item] + results = log_processor.multiprocess_collate(processor_args, + logs_to_process, + 1) + results = list(results) + expected = [] + self.assertEquals(results, expected) + finally: + log_processor.LogProcessor._internal_proxy = None From bd1e2a0daf0fcb1230fda56b746ecc26dbd3100d Mon Sep 17 00:00:00 2001 From: David Goetz Date: Wed, 9 Feb 2011 21:36:14 +0000 Subject: [PATCH 09/26] do not return devs with weight zero in get_more_nodes --- swift/common/ring/ring.py | 10 +++- test/unit/common/ring/test_ring.py | 83 +++++++++++++++++++++--------- 2 files changed, 67 insertions(+), 26 deletions(-) diff --git a/swift/common/ring/ring.py b/swift/common/ring/ring.py index 45ab407563..381764a64c 100644 --- a/swift/common/ring/ring.py +++ b/swift/common/ring/ring.py @@ -139,4 +139,12 @@ class Ring(object): zones.remove(self.devs[part2dev_id[part]]['zone']) while zones: zone = zones.pop(part % len(zones)) - yield self.zone2devs[zone][part % len(self.zone2devs[zone])] + weighted_node = None + for i in xrange(len(self.zone2devs[zone])): + node = self.zone2devs[zone][(part + i) % + len(self.zone2devs[zone])] + if node.get('weight'): + weighted_node = node + break + if weighted_node: + yield weighted_node diff --git a/test/unit/common/ring/test_ring.py b/test/unit/common/ring/test_ring.py index ad72a4c990..1b20480e00 100644 --- a/test/unit/common/ring/test_ring.py +++ b/test/unit/common/ring/test_ring.py @@ -50,7 +50,8 @@ class TestRing(unittest.TestCase): 
os.mkdir(self.testdir) self.testgz = os.path.join(self.testdir, 'ring.gz') self.intended_replica2part2dev_id = [[0, 2, 0, 2], [2, 0, 2, 0]] - self.intended_devs = [{'id': 0, 'zone': 0}, None, {'id': 2, 'zone': 2}] + self.intended_devs = [{'id': 0, 'zone': 0, 'weight': 1.0}, None, + {'id': 2, 'zone': 2, 'weight': 1.0}] self.intended_part_shift = 30 self.intended_reload_time = 15 pickle.dump(ring.RingData(self.intended_replica2part2dev_id, @@ -72,7 +73,7 @@ class TestRing(unittest.TestCase): def test_has_changed(self): self.assertEquals(self.ring.has_changed(), False) - os.utime(self.testgz, (time()+60, time()+60)) + os.utime(self.testgz, (time() + 60, time() + 60)) self.assertEquals(self.ring.has_changed(), True) def test_reload(self): @@ -80,7 +81,7 @@ class TestRing(unittest.TestCase): self.ring = ring.Ring(self.testgz, reload_time=0.001) orig_mtime = self.ring._mtime self.assertEquals(len(self.ring.devs), 3) - self.intended_devs.append({'id': 3, 'zone': 3}) + self.intended_devs.append({'id': 3, 'zone': 3, 'weight': 1.0}) pickle.dump(ring.RingData(self.intended_replica2part2dev_id, self.intended_devs, self.intended_part_shift), GzipFile(self.testgz, 'wb')) @@ -93,7 +94,7 @@ class TestRing(unittest.TestCase): self.ring = ring.Ring(self.testgz, reload_time=0.001) orig_mtime = self.ring._mtime self.assertEquals(len(self.ring.devs), 4) - self.intended_devs.append({'id': 4, 'zone': 4}) + self.intended_devs.append({'id': 4, 'zone': 4, 'weight': 1.0}) pickle.dump(ring.RingData(self.intended_replica2part2dev_id, self.intended_devs, self.intended_part_shift), GzipFile(self.testgz, 'wb')) @@ -108,7 +109,7 @@ class TestRing(unittest.TestCase): orig_mtime = self.ring._mtime part, nodes = self.ring.get_nodes('a') self.assertEquals(len(self.ring.devs), 5) - self.intended_devs.append({'id': 5, 'zone': 5}) + self.intended_devs.append({'id': 5, 'zone': 5, 'weight': 1.0}) pickle.dump(ring.RingData(self.intended_replica2part2dev_id, self.intended_devs, self.intended_part_shift), 
GzipFile(self.testgz, 'wb')) @@ -127,57 +128,71 @@ class TestRing(unittest.TestCase): self.assertRaises(TypeError, self.ring.get_nodes) part, nodes = self.ring.get_nodes('a') self.assertEquals(part, 0) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a1') self.assertEquals(part, 0) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a4') self.assertEquals(part, 1) - self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}]) + self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0}, + {'id': 0, 'zone': 0, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('aa') self.assertEquals(part, 1) - self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}]) + self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0}, + {'id': 0, 'zone': 0, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c1') self.assertEquals(part, 0) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c0') self.assertEquals(part, 3) - self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}]) + self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0}, + {'id': 0, 'zone': 0, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c3') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c2') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 
'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c', 'o1') self.assertEquals(part, 1) - self.assertEquals(nodes, [{'id': 2, 'zone': 2}, {'id': 0, 'zone': 0}]) + self.assertEquals(nodes, [{'id': 2, 'zone': 2, 'weight': 1.0}, + {'id': 0, 'zone': 0, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c', 'o5') self.assertEquals(part, 0) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c', 'o0') self.assertEquals(part, 0) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) part, nodes = self.ring.get_nodes('a', 'c', 'o2') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) def test_get_more_nodes(self): # Yes, these tests are deliberately very fragile. We want to make sure # that if someone changes the results the ring produces, they know it. 
part, nodes = self.ring.get_nodes('a', 'c', 'o2') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) nodes = list(self.ring.get_more_nodes(part)) self.assertEquals(nodes, []) - self.ring.devs.append({'id': 3, 'zone': 0}) + self.ring.devs.append({'id': 3, 'zone': 0, 'weight': 1.0}) self.ring.zone2devs[0].append(self.ring.devs[3]) part, nodes = self.ring.get_nodes('a', 'c', 'o2') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) nodes = list(self.ring.get_more_nodes(part)) self.assertEquals(nodes, []) @@ -186,18 +201,36 @@ class TestRing(unittest.TestCase): self.ring.zone2devs[3] = [self.ring.devs[3]] part, nodes = self.ring.get_nodes('a', 'c', 'o2') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) nodes = list(self.ring.get_more_nodes(part)) - self.assertEquals(nodes, [{'id': 3, 'zone': 3}]) + self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}]) self.ring.devs.append(None) - self.ring.devs.append({'id': 5, 'zone': 5}) + self.ring.devs.append({'id': 5, 'zone': 5, 'weight': 1.0}) self.ring.zone2devs[5] = [self.ring.devs[5]] part, nodes = self.ring.get_nodes('a', 'c', 'o2') self.assertEquals(part, 2) - self.assertEquals(nodes, [{'id': 0, 'zone': 0}, {'id': 2, 'zone': 2}]) + self.assertEquals(nodes, [{'id': 0, 'zone': 0, 'weight': 1.0}, + {'id': 2, 'zone': 2, 'weight': 1.0}]) nodes = list(self.ring.get_more_nodes(part)) - self.assertEquals(nodes, [{'id': 3, 'zone': 3}, {'id': 5, 'zone': 5}]) + self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}, + {'id': 5, 'zone': 5, 'weight': 1.0}]) + + 
self.ring.devs.append({'id': 6, 'zone': 5, 'weight': 1.0}) + self.ring.zone2devs[5].append(self.ring.devs[6]) + nodes = list(self.ring.get_more_nodes(part)) + self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}, + {'id': 5, 'zone': 5, 'weight': 1.0}]) + self.ring.devs[5]['weight'] = 0 + nodes = list(self.ring.get_more_nodes(part)) + self.assertEquals(nodes, [{'id': 3, 'zone': 3, 'weight': 1.0}, + {'id': 6, 'zone': 5, 'weight': 1.0}]) + self.ring.devs[3]['weight'] = 0 + self.ring.devs.append({'id': 7, 'zone': 6, 'weight': 0.0}) + self.ring.zone2devs[6] = [self.ring.devs[7]] + nodes = list(self.ring.get_more_nodes(part)) + self.assertEquals(nodes, [{'id': 6, 'zone': 5, 'weight': 1.0}]) if __name__ == '__main__': From f57e69a6cca490879256b64b82206809b3ad14ca Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: 2011年2月16日 16:07:24 -0600 Subject: [PATCH 10/26] fix log_processor tests --- test/unit/stats/test_log_processor.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/test/unit/stats/test_log_processor.py b/test/unit/stats/test_log_processor.py index 1a355205a1..d0d0816f4e 100644 --- a/test/unit/stats/test_log_processor.py +++ b/test/unit/stats/test_log_processor.py @@ -339,9 +339,11 @@ use = egg:swift#proxy log_processor.LogProcessor.get_object_data = orig_get_object_data def test_collate_worker_error(self): + def get_object_data(*a,**kw): + raise log_processor.BadFileDownload() + orig_get_object_data = log_processor.LogProcessor.get_object_data try: - log_processor.LogProcessor._internal_proxy = \ - DumbInternalProxy(code=500) + log_processor.LogProcessor.get_object_data = get_object_data proxy_config = self.proxy_config.copy() proxy_config.update({ 'log-processor-access': { @@ -363,7 +365,7 @@ use = egg:swift#proxy self.assertTrue(isinstance(ret, log_processor.BadFileDownload), type(ret)) finally: - log_processor.LogProcessor._internal_proxy = None + log_processor.LogProcessor.get_object_data = orig_get_object_data def 
test_multiprocess_collate(self): try: @@ -401,9 +403,11 @@ use = egg:swift#proxy log_processor.LogProcessor.get_object_data = orig_get_object_data def test_multiprocess_collate_errors(self): + def get_object_data(*a,**kw): + raise log_processor.BadFileDownload() + orig_get_object_data = log_processor.LogProcessor.get_object_data try: - log_processor.LogProcessor._internal_proxy = \ - DumbInternalProxy(code=500) + log_processor.LogProcessor.get_object_data = get_object_data proxy_config = self.proxy_config.copy() proxy_config.update({ 'log-processor-access': { From ed69db162aa9daa84cc3c86aec0af0c3f840acf6 Mon Sep 17 00:00:00 2001 From: gholt Date: 2011年2月17日 09:30:41 -0800 Subject: [PATCH 11/26] Revert wal+index --- swift/account/server.py | 7 + swift/common/db.py | 375 ++++++++++++++++++++++------------ swift/common/db_replicator.py | 6 +- swift/container/server.py | 4 + 4 files changed, 258 insertions(+), 134 deletions(-) diff --git a/swift/account/server.py b/swift/account/server.py index 79b840b501..f15ac38c11 100644 --- a/swift/account/server.py +++ b/swift/account/server.py @@ -86,6 +86,8 @@ class AccountController(object): return Response(status='507 %s is not mounted' % drive) broker = self._get_account_broker(drive, part, account) if container: # put account container + if 'x-cf-trans-id' in req.headers: + broker.pending_timeout = 3 if req.headers.get('x-account-override-deleted', 'no').lower() != \ 'yes' and broker.is_deleted(): return HTTPNotFound(request=req) @@ -138,6 +140,9 @@ class AccountController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_account_broker(drive, part, account) + if not container: + broker.pending_timeout = 0.1 + broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() @@ -166,6 +171,8 @@ class AccountController(object): if self.mount_check and not check_mount(self.root, drive): 
return Response(status='507 %s is not mounted' % drive) broker = self._get_account_broker(drive, part, account) + broker.pending_timeout = 0.1 + broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() diff --git a/swift/common/db.py b/swift/common/db.py index 9f322e7b7d..be96411619 100644 --- a/swift/common/db.py +++ b/swift/common/db.py @@ -27,14 +27,13 @@ import cPickle as pickle import errno from random import randint from tempfile import mkstemp -import traceback from eventlet import sleep import simplejson as json import sqlite3 from swift.common.utils import normalize_timestamp, renamer, \ - mkdirs, lock_parent_directory + mkdirs, lock_parent_directory, fallocate from swift.common.exceptions import LockTimeout @@ -42,9 +41,8 @@ from swift.common.exceptions import LockTimeout BROKER_TIMEOUT = 25 #: Pickle protocol to use PICKLE_PROTOCOL = 2 -CONNECT_ATTEMPTS = 4 -PENDING_COMMIT_TIMEOUT = 900 -AUTOCHECKPOINT = 8192 +#: Max number of pending entries +PENDING_CAP = 131072 class DatabaseConnectionError(sqlite3.DatabaseError): @@ -125,48 +123,48 @@ def get_db_connection(path, timeout=30, okay_to_create=False): :param okay_to_create: if True, create the DB if it doesn't exist :returns: DB connection object """ - # retry logic to address: - # http://www.mail-archive.com/sqlite-users@sqlite.org/msg57092.html - for attempt in xrange(CONNECT_ATTEMPTS): - try: - connect_time = time.time() - conn = sqlite3.connect(path, check_same_thread=False, - factory=GreenDBConnection, timeout=timeout) + try: + connect_time = time.time() + conn = sqlite3.connect(path, check_same_thread=False, + factory=GreenDBConnection, timeout=timeout) + if path != ':memory:' and not okay_to_create: # attempt to detect and fail when connect creates the db file - if path != ':memory:' and not okay_to_create: - stat = os.stat(path) - if stat.st_size == 0 and stat.st_ctime>= connect_time: - os.unlink(path) - raise DatabaseConnectionError(path, - 
'DB file created by connect?') - conn.execute('PRAGMA journal_mode = WAL') - conn.execute('PRAGMA synchronous = NORMAL') - conn.execute('PRAGMA wal_autocheckpoint = %s' % AUTOCHECKPOINT) - conn.execute('PRAGMA count_changes = OFF') - conn.execute('PRAGMA temp_store = MEMORY') - conn.create_function('chexor', 3, chexor) - conn.row_factory = sqlite3.Row - conn.text_factory = str - return conn - except sqlite3.DatabaseError, e: - errstr = traceback.format_exc() - raise DatabaseConnectionError(path, errstr, timeout=timeout) + stat = os.stat(path) + if stat.st_size == 0 and stat.st_ctime>= connect_time: + os.unlink(path) + raise DatabaseConnectionError(path, + 'DB file created by connect?') + conn.row_factory = sqlite3.Row + conn.text_factory = str + conn.execute('PRAGMA synchronous = NORMAL') + conn.execute('PRAGMA count_changes = OFF') + conn.execute('PRAGMA temp_store = MEMORY') + conn.execute('PRAGMA journal_mode = DELETE') + conn.create_function('chexor', 3, chexor) + except sqlite3.DatabaseError: + import traceback + raise DatabaseConnectionError(path, traceback.format_exc(), + timeout=timeout) + return conn class DatabaseBroker(object): """Encapsulates working with a database.""" def __init__(self, db_file, timeout=BROKER_TIMEOUT, logger=None, - account=None, container=None): + account=None, container=None, pending_timeout=10, + stale_reads_ok=False): """ Encapsulates working with a database. 
""" self.conn = None self.db_file = db_file + self.pending_file = self.db_file + '.pending' + self.pending_timeout = pending_timeout + self.stale_reads_ok = stale_reads_ok self.db_dir = os.path.dirname(db_file) self.timeout = timeout self.logger = logger or logging.getLogger() self.account = account self.container = container - self._db_version = -1 def initialize(self, put_timestamp=None): """ @@ -235,7 +233,7 @@ class DatabaseBroker(object): conn.close() with open(tmp_db_file, 'r+b') as fp: os.fsync(fp.fileno()) - with lock_parent_directory(self.db_file, self.timeout): + with lock_parent_directory(self.db_file, self.pending_timeout): if os.path.exists(self.db_file): # It's as if there was a "condition" where different parts # of the system were "racing" each other. @@ -287,7 +285,6 @@ class DatabaseBroker(object): self.conn = None orig_isolation_level = conn.isolation_level conn.isolation_level = None - conn.execute('PRAGMA journal_mode = DELETE') # remove journal files conn.execute('BEGIN IMMEDIATE') try: yield True @@ -295,7 +292,6 @@ class DatabaseBroker(object): pass try: conn.execute('ROLLBACK') - conn.execute('PRAGMA journal_mode = WAL') # back to WAL mode conn.isolation_level = orig_isolation_level self.conn = conn except Exception: @@ -352,6 +348,11 @@ class DatabaseBroker(object): :param count: number to get :returns: list of objects between start and end """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: curs = conn.execute(''' SELECT * FROM %s WHERE ROWID> ? ORDER BY ROWID ASC LIMIT ? 
@@ -400,7 +401,11 @@ class DatabaseBroker(object): :returns: dict containing keys: hash, id, created_at, put_timestamp, delete_timestamp, count, max_row, and metadata """ - self._commit_puts() + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise query_part1 = ''' SELECT hash, id, created_at, put_timestamp, delete_timestamp, %s_count AS count, @@ -450,6 +455,34 @@ class DatabaseBroker(object): (rec['sync_point'], rec['remote_id'])) conn.commit() + def _preallocate(self): + """ + The idea is to allocate space in front of an expanding db. If it gets + within 512k of a boundary, it allocates to the next boundary. + Boundaries are 2m, 5m, 10m, 25m, 50m, then every 50m after. + """ + if self.db_file == ':memory:': + return + MB = (1024 * 1024) + + def prealloc_points(): + for pm in (1, 2, 5, 10, 25, 50): + yield pm * MB + while True: + pm += 50 + yield pm * MB + + stat = os.stat(self.db_file) + file_size = stat.st_size + allocated_size = stat.st_blocks * 512 + for point in prealloc_points(): + if file_size <= point - MB / 2: + prealloc_size = point + break + if allocated_size < prealloc_size: + with open(self.db_file, 'rb+') as fp: + fallocate(fp.fileno(), int(prealloc_size)) + @property def metadata(self): """ @@ -574,7 +607,7 @@ class ContainerBroker(DatabaseBroker): conn.executescript(""" CREATE TABLE object ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT, + name TEXT UNIQUE, created_at TEXT, size INTEGER, content_type TEXT, @@ -582,7 +615,7 @@ class ContainerBroker(DatabaseBroker): deleted INTEGER DEFAULT 0 ); - CREATE INDEX ix_object_deleted_name ON object (deleted, name); + CREATE INDEX ix_object_deleted ON object (deleted); CREATE TRIGGER object_insert AFTER INSERT ON object BEGIN @@ -645,15 +678,6 @@ class ContainerBroker(DatabaseBroker): ''', (self.account, self.container, normalize_timestamp(time.time()), str(uuid4()), put_timestamp)) - def _get_db_version(self, conn): - if self._db_version == -1: - self._db_version = 
0 - for row in conn.execute(''' - SELECT name FROM sqlite_master - WHERE name = 'ix_object_deleted_name' '''): - self._db_version = 1 - return self._db_version - def _newid(self, conn): conn.execute(''' UPDATE container_stat @@ -693,6 +717,11 @@ class ContainerBroker(DatabaseBroker): :returns: True if the database has no active objects, False otherwise """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: row = conn.execute( 'SELECT object_count from container_stat').fetchone() @@ -700,16 +729,17 @@ class ContainerBroker(DatabaseBroker): def _commit_puts(self, item_list=None): """Handles commiting rows in .pending files.""" - pending_file = self.db_file + '.pending' - if self.db_file == ':memory:' or not os.path.exists(pending_file): - return - if not os.path.getsize(pending_file): - os.unlink(pending_file) + if self.db_file == ':memory:' or not os.path.exists(self.pending_file): return if item_list is None: item_list = [] - with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT): - with open(pending_file, 'r+b') as fp: + with lock_parent_directory(self.pending_file, self.pending_timeout): + self._preallocate() + if not os.path.getsize(self.pending_file): + if item_list: + self.merge_items(item_list) + return + with open(self.pending_file, 'r+b') as fp: for entry in fp.read().split(':'): if entry: try: @@ -722,11 +752,11 @@ class ContainerBroker(DatabaseBroker): except Exception: self.logger.exception( _('Invalid pending entry %(file)s: %(entry)s'), - {'file': pending_file, 'entry': entry}) + {'file': self.pending_file, 'entry': entry}) if item_list: self.merge_items(item_list) try: - os.unlink(pending_file) + os.ftruncate(fp.fileno(), 0) except OSError, err: if err.errno != errno.ENOENT: raise @@ -744,6 +774,7 @@ class ContainerBroker(DatabaseBroker): delete :param sync_timestamp: max update_at timestamp of sync rows to delete """ + self._commit_puts() with self.get() as conn: 
conn.execute(""" DELETE FROM object @@ -787,9 +818,30 @@ class ContainerBroker(DatabaseBroker): record = {'name': name, 'created_at': timestamp, 'size': size, 'content_type': content_type, 'etag': etag, 'deleted': deleted} - if self.db_file != ':memory:' and not os.path.exists(self.db_file): + if self.db_file == ':memory:': + self.merge_items([record]) + return + if not os.path.exists(self.db_file): raise DatabaseConnectionError(self.db_file, "DB doesn't exist") - self.merge_items([record]) + pending_size = 0 + try: + pending_size = os.path.getsize(self.pending_file) + except OSError, err: + if err.errno != errno.ENOENT: + raise + if pending_size> PENDING_CAP: + self._commit_puts([record]) + else: + with lock_parent_directory( + self.pending_file, self.pending_timeout): + with open(self.pending_file, 'a+b') as fp: + # Colons aren't used in base64 encoding; so they are our + # delimiter + fp.write(':') + fp.write(pickle.dumps( + (name, timestamp, size, content_type, etag, deleted), + protocol=PICKLE_PROTOCOL).encode('base64')) + fp.flush() def is_deleted(self, timestamp=None): """ @@ -799,6 +851,11 @@ class ContainerBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, object_count @@ -821,6 +878,11 @@ class ContainerBroker(DatabaseBroker): reported_put_timestamp, reported_delete_timestamp, reported_object_count, reported_bytes_used, hash, id) """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: return conn.execute(''' SELECT account, container, created_at, put_timestamp, @@ -857,6 +919,11 @@ class ContainerBroker(DatabaseBroker): :returns: list of object names """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise rv = [] with self.get() 
as conn: row = conn.execute(''' @@ -893,6 +960,11 @@ class ContainerBroker(DatabaseBroker): :returns: list of tuples of (name, created_at, size, content_type, etag) """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise if path is not None: prefix = path if path: @@ -916,10 +988,7 @@ class ContainerBroker(DatabaseBroker): elif prefix: query += ' name>= ? AND' query_args.append(prefix) - if self._get_db_version(conn) < 1: - query += ' +deleted = 0 ORDER BY name LIMIT ?' - else: - query += ' deleted = 0 ORDER BY name LIMIT ?' + query += ' +deleted = 0 ORDER BY name LIMIT ?' query_args.append(limit - len(results)) curs = conn.execute(query, query_args) curs.row_factory = None @@ -967,19 +1036,18 @@ class ContainerBroker(DatabaseBroker): max_rowid = -1 for rec in item_list: conn.execute(''' - DELETE FROM object WHERE name = ? AND created_at < ? AND - deleted IN (0, 1) + DELETE FROM object WHERE name = ? AND + (created_at < ?) ''', (rec['name'], rec['created_at'])) - if not conn.execute(''' - SELECT name FROM object WHERE name = ? AND - deleted IN (0, 1) - ''', (rec['name'],)).fetchall(): + try: conn.execute(''' INSERT INTO object (name, created_at, size, content_type, etag, deleted) VALUES (?, ?, ?, ?, ?, ?) 
''', ([rec['name'], rec['created_at'], rec['size'], rec['content_type'], rec['etag'], rec['deleted']])) + except sqlite3.IntegrityError: + pass if source: max_rowid = max(max_rowid, rec['ROWID']) if source: @@ -1023,7 +1091,7 @@ class AccountBroker(DatabaseBroker): conn.executescript(""" CREATE TABLE container ( ROWID INTEGER PRIMARY KEY AUTOINCREMENT, - name TEXT, + name TEXT UNIQUE, put_timestamp TEXT, delete_timestamp TEXT, object_count INTEGER, @@ -1031,9 +1099,8 @@ class AccountBroker(DatabaseBroker): deleted INTEGER DEFAULT 0 ); - CREATE INDEX ix_container_deleted_name ON - container (deleted, name); - + CREATE INDEX ix_container_deleted ON container (deleted); + CREATE INDEX ix_container_name ON container (name); CREATE TRIGGER container_insert AFTER INSERT ON container BEGIN UPDATE account_stat @@ -1097,15 +1164,6 @@ class AccountBroker(DatabaseBroker): ''', (self.account, normalize_timestamp(time.time()), str(uuid4()), put_timestamp)) - def _get_db_version(self, conn): - if self._db_version == -1: - self._db_version = 0 - for row in conn.execute(''' - SELECT name FROM sqlite_master - WHERE name = 'ix_container_deleted_name' '''): - self._db_version = 1 - return self._db_version - def update_put_timestamp(self, timestamp): """ Update the put_timestamp. 
Only modifies it if it is greater than @@ -1135,16 +1193,17 @@ class AccountBroker(DatabaseBroker): def _commit_puts(self, item_list=None): """Handles commiting rows in .pending files.""" - pending_file = self.db_file + '.pending' - if self.db_file == ':memory:' or not os.path.exists(pending_file): - return - if not os.path.getsize(pending_file): - os.unlink(pending_file) + if self.db_file == ':memory:' or not os.path.exists(self.pending_file): return if item_list is None: item_list = [] - with lock_parent_directory(pending_file, PENDING_COMMIT_TIMEOUT): - with open(pending_file, 'r+b') as fp: + with lock_parent_directory(self.pending_file, self.pending_timeout): + self._preallocate() + if not os.path.getsize(self.pending_file): + if item_list: + self.merge_items(item_list) + return + with open(self.pending_file, 'r+b') as fp: for entry in fp.read().split(':'): if entry: try: @@ -1160,11 +1219,11 @@ class AccountBroker(DatabaseBroker): except Exception: self.logger.exception( _('Invalid pending entry %(file)s: %(entry)s'), - {'file': pending_file, 'entry': entry}) + {'file': self.pending_file, 'entry': entry}) if item_list: self.merge_items(item_list) try: - os.unlink(pending_file) + os.ftruncate(fp.fileno(), 0) except OSError, err: if err.errno != errno.ENOENT: raise @@ -1175,6 +1234,11 @@ class AccountBroker(DatabaseBroker): :returns: True if the database has no active containers. 
""" + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: row = conn.execute( 'SELECT container_count from account_stat').fetchone() @@ -1194,6 +1258,7 @@ class AccountBroker(DatabaseBroker): :param sync_timestamp: max update_at timestamp of sync rows to delete """ + self._commit_puts() with self.get() as conn: conn.execute(''' DELETE FROM container WHERE @@ -1221,6 +1286,11 @@ class AccountBroker(DatabaseBroker): :returns: put_timestamp of the container """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: ret = conn.execute(''' SELECT put_timestamp FROM container @@ -1241,8 +1311,6 @@ class AccountBroker(DatabaseBroker): :param object_count: number of objects in the container :param bytes_used: number of bytes used by the container """ - if self.db_file != ':memory:' and not os.path.exists(self.db_file): - raise DatabaseConnectionError(self.db_file, "DB doesn't exist") if delete_timestamp> put_timestamp and \ object_count in (None, '', 0, '0'): deleted = 1 @@ -1253,7 +1321,24 @@ class AccountBroker(DatabaseBroker): 'object_count': object_count, 'bytes_used': bytes_used, 'deleted': deleted} - self.merge_items([record]) + if self.db_file == ':memory:': + self.merge_items([record]) + return + commit = False + with lock_parent_directory(self.pending_file, self.pending_timeout): + with open(self.pending_file, 'a+b') as fp: + # Colons aren't used in base64 encoding; so they are our + # delimiter + fp.write(':') + fp.write(pickle.dumps( + (name, put_timestamp, delete_timestamp, object_count, + bytes_used, deleted), + protocol=PICKLE_PROTOCOL).encode('base64')) + fp.flush() + if fp.tell()> PENDING_CAP: + commit = True + if commit: + self._commit_puts() def can_delete_db(self, cutoff): """ @@ -1261,6 +1346,7 @@ class AccountBroker(DatabaseBroker): :returns: True if the account can be deleted, False otherwise """ + self._commit_puts() with self.get() 
as conn: row = conn.execute(''' SELECT status, put_timestamp, delete_timestamp, container_count @@ -1286,6 +1372,11 @@ class AccountBroker(DatabaseBroker): """ if self.db_file != ':memory:' and not os.path.exists(self.db_file): return True + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: row = conn.execute(''' SELECT put_timestamp, delete_timestamp, container_count, status @@ -1310,6 +1401,11 @@ class AccountBroker(DatabaseBroker): delete_timestamp, container_count, object_count, bytes_used, hash, id) """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise with self.get() as conn: return conn.execute(''' SELECT account, created_at, put_timestamp, delete_timestamp, @@ -1326,6 +1422,11 @@ class AccountBroker(DatabaseBroker): :returns: list of container names """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise rv = [] with self.get() as conn: row = conn.execute(''' @@ -1359,6 +1460,11 @@ class AccountBroker(DatabaseBroker): :returns: list of tuples of (name, object_count, bytes_used, 0) """ + try: + self._commit_puts() + except LockTimeout: + if not self.stale_reads_ok: + raise if delimiter and not prefix: prefix = '' orig_marker = marker @@ -1379,10 +1485,7 @@ class AccountBroker(DatabaseBroker): elif prefix: query += ' name>= ? AND' query_args.append(prefix) - if self._get_db_version(conn) < 1: - query += ' +deleted = 0 ORDER BY name LIMIT ?' - else: - query += ' deleted = 0 ORDER BY name LIMIT ?' + query += ' +deleted = 0 ORDER BY name LIMIT ?' 
query_args.append(limit - len(results)) curs = conn.execute(query, query_args) curs.row_factory = None @@ -1426,39 +1529,51 @@ class AccountBroker(DatabaseBroker): record = [rec['name'], rec['put_timestamp'], rec['delete_timestamp'], rec['object_count'], rec['bytes_used'], rec['deleted']] - curs = conn.execute(''' - SELECT name, put_timestamp, delete_timestamp, - object_count, bytes_used, deleted - FROM container WHERE name = ? AND - deleted IN (0, 1) - ''', (rec['name'],)) - curs.row_factory = None - row = curs.fetchone() - if row: - row = list(row) - for i in xrange(5): - if record[i] is None and row[i] is not None: - record[i] = row[i] - if row[1]> record[1]: # Keep newest put_timestamp - record[1] = row[1] - if row[2]> record[2]: # Keep newest delete_timestamp - record[2] = row[2] - # If deleted, mark as such - if record[2]> record[1] and \ - record[3] in (None, '', 0, '0'): - record[5] = 1 - else: - record[5] = 0 - conn.execute(''' - DELETE FROM container WHERE name = ? AND - deleted IN (0, 1) - ''', (record[0],)) - conn.execute(''' - INSERT INTO container (name, put_timestamp, - delete_timestamp, object_count, bytes_used, - deleted) - VALUES (?, ?, ?, ?, ?, ?) - ''', record) + try: + conn.execute(''' + INSERT INTO container (name, put_timestamp, + delete_timestamp, object_count, bytes_used, + deleted) + VALUES (?, ?, ?, ?, ?, ?) + ''', record) + except sqlite3.IntegrityError: + curs = conn.execute(''' + SELECT name, put_timestamp, delete_timestamp, + object_count, bytes_used, deleted + FROM container WHERE name = ? AND + (put_timestamp < ? OR delete_timestamp < ? OR + object_count != ? 
OR bytes_used != ?)''', + (rec['name'], rec['put_timestamp'], + rec['delete_timestamp'], rec['object_count'], + rec['bytes_used'])) + curs.row_factory = None + row = curs.fetchone() + if row: + row = list(row) + for i in xrange(5): + if record[i] is None and row[i] is not None: + record[i] = row[i] + if row[1]> record[1]: # Keep newest put_timestamp + record[1] = row[1] + if row[2]> record[2]: # Keep newest delete_timestamp + record[2] = row[2] + conn.execute('DELETE FROM container WHERE name = ?', + (record[0],)) + # If deleted, mark as such + if record[2]> record[1] and \ + record[3] in (None, '', 0, '0'): + record[5] = 1 + else: + record[5] = 0 + try: + conn.execute(''' + INSERT INTO container (name, put_timestamp, + delete_timestamp, object_count, bytes_used, + deleted) + VALUES (?, ?, ?, ?, ?, ?) + ''', record) + except sqlite3.IntegrityError: + continue if source: max_rowid = max(max_rowid, rec['ROWID']) if source: diff --git a/swift/common/db_replicator.py b/swift/common/db_replicator.py index 4b4b30fd30..3c3731d45a 100644 --- a/swift/common/db_replicator.py +++ b/swift/common/db_replicator.py @@ -180,9 +180,7 @@ class Replicator(Daemon): return False # perform block-level sync if the db was modified during the first sync if os.path.exists(broker.db_file + '-journal') or \ - os.path.exists(broker.db_file + '-wal') or \ - os.path.exists(broker.db_file + '-shm') or \ - os.path.getmtime(broker.db_file)> mtime: + os.path.getmtime(broker.db_file)> mtime: # grab a lock so nobody else can modify it with broker.lock(): if not self._rsync_file(broker.db_file, remote_file, False): @@ -318,7 +316,7 @@ class Replicator(Daemon): self.logger.debug(_('Replicating db %s'), object_file) self.stats['attempted'] += 1 try: - broker = self.brokerclass(object_file) + broker = self.brokerclass(object_file, pending_timeout=30) broker.reclaim(time.time() - self.reclaim_age, time.time() - (self.reclaim_age * 2)) info = broker.get_replication_info() diff --git 
a/swift/container/server.py b/swift/container/server.py index 1bfba49c36..549fc47596 100644 --- a/swift/container/server.py +++ b/swift/container/server.py @@ -219,6 +219,8 @@ class ContainerController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_container_broker(drive, part, account, container) + broker.pending_timeout = 0.1 + broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() @@ -244,6 +246,8 @@ class ContainerController(object): if self.mount_check and not check_mount(self.root, drive): return Response(status='507 %s is not mounted' % drive) broker = self._get_container_broker(drive, part, account, container) + broker.pending_timeout = 0.1 + broker.stale_reads_ok = True if broker.is_deleted(): return HTTPNotFound(request=req) info = broker.get_info() From 55ebda9d65b0c4543f45bbf1eac0b4f2ded1bcdf Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: 2011年2月23日 11:55:49 +0900 Subject: [PATCH 12/26] s3api: use boto to get canonical string for signature Replace the homegrown function to get a canonical string for signature. 
--- swift/common/middleware/swift3.py | 28 ++++++++-------------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/swift/common/middleware/swift3.py b/swift/common/middleware/swift3.py index 26626375bc..e071624399 100644 --- a/swift/common/middleware/swift3.py +++ b/swift/common/middleware/swift3.py @@ -55,6 +55,7 @@ import rfc822 import hmac import base64 import errno +import boto.utils from xml.sax.saxutils import escape as xml_escape import cgi @@ -378,31 +379,18 @@ class Swift3Middleware(object): return ServiceController, d def get_account_info(self, env, req): - if req.headers.get("content-md5"): - md5 = req.headers.get("content-md5") - else: - md5 = "" - - if req.headers.get("content-type"): - content_type = req.headers.get("content-type") - else: - content_type = "" - - if req.headers.get("date"): - date = req.headers.get("date") - else: - date = "" - - h = req.method + "\n" + md5 + "\n" + content_type + "\n" + date + "\n" - for header in req.headers: - if header.startswith("X-Amz-"): - h += header.lower() + ":" + str(req.headers[header]) + "\n" - h += req.path try: account, user, _junk = \ req.headers['Authorization'].split(' ')[-1].split(':') except Exception: return None, None + + headers = {} + for key in req.headers: + if type(req.headers[key]) == str: + headers[key] = req.headers[key] + + h = boto.utils.canonical_string(req.method, req.path_qs, headers) token = base64.urlsafe_b64encode(h) return '%s:%s' % (account, user), token From 14d0784c0139d8eaa4918af57af4a175f61fafc3 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: 2011年2月23日 11:57:31 +0900 Subject: [PATCH 13/26] update docs about python-boto dependency --- doc/source/development_saio.rst | 2 +- doc/source/getting_started.rst | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index 38c0475975..10b89e9051 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ 
-31,7 +31,7 @@ Installing dependencies and the core code #. `apt-get install curl gcc bzr memcached python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr sqlite3 xfsprogs python-webob python-eventlet - python-greenlet python-pastedeploy python-netifaces` + python-greenlet python-pastedeploy python-netifaces python-boto` #. Install anything else you want, like screen, ssh, vim, etc. #. Next, choose either :ref:`partition-section` or :ref:`loopback-section`. diff --git a/doc/source/getting_started.rst b/doc/source/getting_started.rst index 219adfb462..9f24970063 100644 --- a/doc/source/getting_started.rst +++ b/doc/source/getting_started.rst @@ -22,6 +22,7 @@ And the following python libraries: * Nose * Sphinx * netifaces +* boto ----------- Development From a3474704c2b6fc53089824bae87d016b00404549 Mon Sep 17 00:00:00 2001 From: David Goetz Date: 2011年2月23日 11:44:36 -0800 Subject: [PATCH 14/26] ratelimiting does not handle memcache restart --- swift/common/memcached.py | 7 +++ swift/common/middleware/ratelimit.py | 44 ++++++++++--------- test/unit/common/middleware/test_ratelimit.py | 19 ++++++++ test/unit/common/test_memcached.py | 10 +++++ 4 files changed, 60 insertions(+), 20 deletions(-) diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 17378e1aae..8a7ae1a8a5 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -45,6 +45,10 @@ def md5hash(key): return md5(key).hexdigest() +class MemcacheConnectionError(Exception): + pass + + class MemcacheRing(object): """ Simple, consistent-hashed memcache client. 
@@ -180,6 +184,7 @@ class MemcacheRing(object): :param delta: amount to add to the value of key (or set as the value if the key is not found) will be cast to an int :param timeout: ttl in memcache + :raises MemcacheConnectionError: """ key = md5hash(key) command = 'incr' @@ -209,6 +214,7 @@ class MemcacheRing(object): return ret except Exception, e: self._exception_occurred(server, e) + raise MemcacheConnectionError("No Memcached connections succeeded.") def decr(self, key, delta=1, timeout=0): """ @@ -220,6 +226,7 @@ class MemcacheRing(object): value to 0 if the key is not found) will be cast to an int :param timeout: ttl in memcache + :raises MemcacheConnectionError: """ self.incr(key, delta=-delta, timeout=timeout) diff --git a/swift/common/middleware/ratelimit.py b/swift/common/middleware/ratelimit.py index 485b1db26e..836cb51bb2 100644 --- a/swift/common/middleware/ratelimit.py +++ b/swift/common/middleware/ratelimit.py @@ -18,6 +18,7 @@ from webob.exc import HTTPNotFound from swift.common.utils import split_path, cache_from_env, get_logger from swift.proxy.server import get_container_memcache_key +from swift.common.memcached import MemcacheConnectionError class MaxSleepTimeHitError(Exception): @@ -136,28 +137,31 @@ class RateLimitMiddleware(object): :param max_rate: maximum rate allowed in requests per second :raises: MaxSleepTimeHitError if max sleep time is exceeded. 
''' - now_m = int(round(time.time() * self.clock_accuracy)) - time_per_request_m = int(round(self.clock_accuracy / max_rate)) - running_time_m = self.memcache_client.incr(key, - delta=time_per_request_m) - need_to_sleep_m = 0 - if (now_m - running_time_m> - self.rate_buffer_seconds * self.clock_accuracy): - next_avail_time = int(now_m + time_per_request_m) - self.memcache_client.set(key, str(next_avail_time), - serialize=False) - else: - need_to_sleep_m = \ - max(running_time_m - now_m - time_per_request_m, 0) + try: + now_m = int(round(time.time() * self.clock_accuracy)) + time_per_request_m = int(round(self.clock_accuracy / max_rate)) + running_time_m = self.memcache_client.incr(key, + delta=time_per_request_m) + need_to_sleep_m = 0 + if (now_m - running_time_m> + self.rate_buffer_seconds * self.clock_accuracy): + next_avail_time = int(now_m + time_per_request_m) + self.memcache_client.set(key, str(next_avail_time), + serialize=False) + else: + need_to_sleep_m = \ + max(running_time_m - now_m - time_per_request_m, 0) - max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy - if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01: - # treat as no-op decrement time - self.memcache_client.decr(key, delta=time_per_request_m) - raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" % - need_to_sleep_m) + max_sleep_m = self.max_sleep_time_seconds * self.clock_accuracy + if max_sleep_m - need_to_sleep_m <= self.clock_accuracy * 0.01: + # treat as no-op decrement time + self.memcache_client.decr(key, delta=time_per_request_m) + raise MaxSleepTimeHitError("Max Sleep Time Exceeded: %s" % + need_to_sleep_m) - return float(need_to_sleep_m) / self.clock_accuracy + return float(need_to_sleep_m) / self.clock_accuracy + except MemcacheConnectionError: + return 0 def handle_ratelimit(self, req, account_name, container_name, obj_name): ''' diff --git a/test/unit/common/middleware/test_ratelimit.py b/test/unit/common/middleware/test_ratelimit.py index 
ef1abca91e..4afefb0351 100644 --- a/test/unit/common/middleware/test_ratelimit.py +++ b/test/unit/common/middleware/test_ratelimit.py @@ -21,12 +21,14 @@ from webob import Request from swift.common.middleware import ratelimit from swift.proxy.server import get_container_memcache_key +from swift.common.memcached import MemcacheConnectionError class FakeMemcache(object): def __init__(self): self.store = {} + self.error_on_incr = False def get(self, key): return self.store.get(key) @@ -36,6 +38,8 @@ class FakeMemcache(object): return True def incr(self, key, delta=1, timeout=0): + if self.error_on_incr: + raise MemcacheConnectionError('Memcache restarting') self.store[key] = int(self.store.setdefault(key, 0)) + int(delta) if self.store[key] < 0: self.store[key] = 0 @@ -403,6 +407,21 @@ class TestRateLimit(unittest.TestCase): start_response) self._run(make_app_call, num_calls, current_rate) + def test_restarting_memcache(self): + current_rate = 2 + num_calls = 5 + conf_dict = {'account_ratelimit': current_rate} + self.test_ratelimit = ratelimit.filter_factory(conf_dict)(FakeApp()) + ratelimit.http_connect = mock_http_connect(204) + req = Request.blank('/v/a') + req.environ['swift.cache'] = FakeMemcache() + req.environ['swift.cache'].error_on_incr = True + make_app_call = lambda: self.test_ratelimit(req.environ, + start_response) + begin = time.time() + self._run(make_app_call, num_calls, current_rate, check_time=False) + time_took = time.time() - begin + self.assert_(round(time_took, 1) == 0) # no memcache, no limiting if __name__ == '__main__': unittest.main() diff --git a/test/unit/common/test_memcached.py b/test/unit/common/test_memcached.py index 43f11650cf..b54f915dca 100644 --- a/test/unit/common/test_memcached.py +++ b/test/unit/common/test_memcached.py @@ -50,6 +50,7 @@ class MockMemcached(object): self.cache = {} self.down = False self.exc_on_delete = False + self.read_return_none = False def sendall(self, string): if self.down: @@ -110,6 +111,8 @@ class 
MockMemcached(object): else: self.outbuf += 'NOT_FOUND\r\n' def readline(self): + if self.read_return_none: + return None if self.down: raise Exception('mock is down') if '\n' in self.outbuf: @@ -166,6 +169,9 @@ class TestMemcached(unittest.TestCase): self.assertEquals(memcache_client.get('some_key'), '6') memcache_client.incr('some_key', delta=-15) self.assertEquals(memcache_client.get('some_key'), '0') + mock.read_return_none = True + self.assertRaises(memcached.MemcacheConnectionError, + memcache_client.incr, 'some_key', delta=-15) def test_decr(self): memcache_client = memcached.MemcacheRing(['1.2.3.4:11211']) @@ -179,6 +185,10 @@ class TestMemcached(unittest.TestCase): self.assertEquals(memcache_client.get('some_key'), '11') memcache_client.decr('some_key', delta=15) self.assertEquals(memcache_client.get('some_key'), '0') + mock.read_return_none = True + self.assertRaises(memcached.MemcacheConnectionError, + memcache_client.decr, 'some_key', delta=15) + def test_retry(self): logging.getLogger().addHandler(NullLoggingHandler()) From c6f1137191d5203abd36ad7f572073a5a5191398 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年2月24日 08:07:26 +0000 Subject: [PATCH 15/26] fix loggin --- swift/proxy/server.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index d2100af010..bf24d2048a 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -266,6 +266,7 @@ class SegmentedIterable(object): class Controller(object): """Base WSGI controller class for the proxy""" + server_type = _('Base') def __init__(self, app): self.account_name = None @@ -508,7 +509,7 @@ class Controller(object): self.error_limit(node) except Exception: self.error_limit(node) - self.exception_occurred(node, _('Object'), # TODO FIX LOGGIN' + self.exception_occurred(node, self.server_type, _('Trying to %(method)s %(path)s') % {'method': method, 'path': path}) @@ -529,7 +530,7 @@ class Controller(object): 
response.append((503, '', '')) statuses, reasons, bodies = zip(*response) return self.best_response(req, statuses, reasons, bodies, - _('Container POST')) # TODO fix loggin' + '%s %s' % (self.server_type, req.method)) def best_response(self, req, statuses, reasons, bodies, server_type, etag=None): @@ -669,6 +670,7 @@ class Controller(object): class ObjectController(Controller): """WSGI controller for object requests.""" + server_type = _('Object') def __init__(self, app, account_name, container_name, object_name, **kwargs): @@ -1141,6 +1143,7 @@ class ObjectController(Controller): class ContainerController(Controller): """WSGI controller for container requests""" + server_type = _('Container') # Ensure these are all lowercase pass_through_headers = ['x-container-read', 'x-container-write'] @@ -1289,6 +1292,7 @@ class ContainerController(Controller): class AccountController(Controller): """WSGI controller for account requests""" + server_type = _('Container') def __init__(self, app, account_name, **kwargs): Controller.__init__(self, app) From e1bd4e87697d6c921806d52a2637966b19face10 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年2月24日 09:02:50 +0000 Subject: [PATCH 16/26] small refactors --- swift/proxy/server.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index bf24d2048a..7334625b58 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -521,11 +521,12 @@ class Controller(object): handoff nodes as needed. 
""" nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring) - pool = GreenPile(ring.replica_count) - for head in headers: - pool.spawn(self._make_request, nodes, part, method, path, - head, query_string) - response = [resp for resp in pool if resp] + with ContextPool(ring.replica_count) as pool: + pile = GreenPile(pool) + for head in headers: + pile.spawn(self._make_request, nodes, part, method, path, + head, query_string) + response = [resp for resp in pile if resp] while len(response) < ring.replica_count: response.append((503, '', '')) statuses, reasons, bodies = zip(*response) @@ -919,15 +920,12 @@ class ObjectController(Controller): content_type_manually_set = True if not req.headers.get('content-type'): guessed_type, _junk = mimetypes.guess_type(req.path_info) - if not guessed_type: - req.headers['Content-Type'] = 'application/octet-stream' - else: - req.headers['Content-Type'] = guessed_type + req.headers['Content-Type'] = guessed_type or \ + 'application/octet-stream' content_type_manually_set = False error_response = check_object_creation(req, self.object_name) if error_response: return error_response - conns = [] data_source = \ iter(lambda: req.body_file.read(self.app.client_chunk_size), '') source_header = req.headers.get('X-Copy-From') @@ -1017,7 +1015,7 @@ class ObjectController(Controller): for conn in list(conns): if not conn.failed: conn.queue.put('%x\r\n%s\r\n' % (len(chunk), chunk) - if chunked else chunk) + if chunked else chunk) else: conns.remove(conn) if len(conns) <= len(nodes) / 2: From e357ffcb1804d0939e90a853e74c627152892454 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年2月24日 09:46:08 +0000 Subject: [PATCH 17/26] fixes --- swift/proxy/server.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 7334625b58..b87f585c96 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -521,12 +521,11 @@ class Controller(object): handoff nodes 
as needed. """ nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring) - with ContextPool(ring.replica_count) as pool: - pile = GreenPile(pool) - for head in headers: - pile.spawn(self._make_request, nodes, part, method, path, - head, query_string) - response = [resp for resp in pile if resp] + pile = GreenPile(ring.replica_count) + for head in headers: + pile.spawn(self._make_request, nodes, part, method, path, + head, query_string) + response = [resp for resp in pile if resp] while len(response) < ring.replica_count: response.append((503, '', '')) statuses, reasons, bodies = zip(*response) @@ -1290,7 +1289,7 @@ class ContainerController(Controller): class AccountController(Controller): """WSGI controller for account requests""" - server_type = _('Container') + server_type = _('Account') def __init__(self, app, account_name, **kwargs): Controller.__init__(self, app) From 35e161f89e65c1cef3280fe1836dce0bd662dc7a Mon Sep 17 00:00:00 2001 From: Greg Lange Date: 2011年2月24日 17:22:57 +0000 Subject: [PATCH 18/26] fixed problem with how processed files list download failures are handled --- swift/stats/log_processor.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index 23cc64d598..deae41c5cb 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -32,7 +32,8 @@ from swift.common.daemon import Daemon class BadFileDownload(Exception): - pass + def __init__(self, status_code=None): + self.status_code = status_code class LogProcessor(object): @@ -162,7 +163,7 @@ class LogProcessor(object): code, o = self.internal_proxy.get_object(swift_account, container_name, object_name) if code < 200 or code>= 300: - raise BadFileDownload() + raise BadFileDownload(code) last_part = '' last_compressed_part = '' # magic in the following zlib.decompressobj argument is courtesy of @@ -272,8 +273,13 @@ class LogProcessorDaemon(Daemon): already_processed_files = 
cPickle.loads(buf) else: already_processed_files = set() - except BadFileDownload: - already_processed_files = set() + except BadFileDownload, e: + if e.status_code == 404: + already_processed_files = set() + else: + self.logger.debug(_('Log processing was unable to download ' + 'processed files list')) + return self.logger.debug(_('found %d processed files') % \ len(already_processed_files)) logs_to_process = self.log_processor.get_data_list(lookback_start, From 507a9bf113d2711de2b6187e792501fb4d755fc5 Mon Sep 17 00:00:00 2001 From: Greg Lange Date: 2011年2月25日 21:01:35 +0000 Subject: [PATCH 19/26] reverts changes in my last merge request for lp707549 --- swift/stats/log_processor.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py index deae41c5cb..23cc64d598 100644 --- a/swift/stats/log_processor.py +++ b/swift/stats/log_processor.py @@ -32,8 +32,7 @@ from swift.common.daemon import Daemon class BadFileDownload(Exception): - def __init__(self, status_code=None): - self.status_code = status_code + pass class LogProcessor(object): @@ -163,7 +162,7 @@ class LogProcessor(object): code, o = self.internal_proxy.get_object(swift_account, container_name, object_name) if code < 200 or code>= 300: - raise BadFileDownload(code) + raise BadFileDownload() last_part = '' last_compressed_part = '' # magic in the following zlib.decompressobj argument is courtesy of @@ -273,13 +272,8 @@ class LogProcessorDaemon(Daemon): already_processed_files = cPickle.loads(buf) else: already_processed_files = set() - except BadFileDownload, e: - if e.status_code == 404: - already_processed_files = set() - else: - self.logger.debug(_('Log processing was unable to download ' - 'processed files list')) - return + except BadFileDownload: + already_processed_files = set() self.logger.debug(_('found %d processed files') % \ len(already_processed_files)) logs_to_process = 
self.log_processor.get_data_list(lookback_start, From 83caec04dcab17d494f52d20b334fd49a9c67d95 Mon Sep 17 00:00:00 2001 From: David Goetz Date: 2011年2月28日 10:59:16 -0800 Subject: [PATCH 20/26] lower memcache error limit duration --- swift/common/memcached.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swift/common/memcached.py b/swift/common/memcached.py index 17378e1aae..269c75f3e6 100644 --- a/swift/common/memcached.py +++ b/swift/common/memcached.py @@ -38,7 +38,7 @@ TRY_COUNT = 3 # will be considered failed for ERROR_LIMIT_DURATION seconds. ERROR_LIMIT_COUNT = 10 ERROR_LIMIT_TIME = 60 -ERROR_LIMIT_DURATION = 300 +ERROR_LIMIT_DURATION = 60 def md5hash(key): From e3ca838685631a4ae6e753ef9fb0c1c87ad7da50 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: 2011年2月28日 23:07:53 +0000 Subject: [PATCH 21/26] doc fix --- doc/source/development_saio.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index 38c0475975..f7963b1b24 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ -215,7 +215,7 @@ Configuring each node Sample configuration files are provided with all defaults in line-by-line comments. - #. If your going to use the DevAuth (the default swift-auth-server), create + #. 
If you're going to use the DevAuth (the default swift-auth-server), create `/etc/swift/auth-server.conf` (you can skip this if you're going to use Swauth):: From 8dafefbb660139d28b37be6750fc494ea9f1c6e9 Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Tue, 1 Mar 2011 20:50:28 +0900 Subject: [PATCH 22/26] s3api: move boto dependency descriptions to swift3.py --- doc/source/development_saio.rst | 2 +- doc/source/getting_started.rst | 1 - swift/common/middleware/swift3.py | 3 +++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/doc/source/development_saio.rst b/doc/source/development_saio.rst index 10b89e9051..38c0475975 100644 --- a/doc/source/development_saio.rst +++ b/doc/source/development_saio.rst @@ -31,7 +31,7 @@ Installing dependencies and the core code #. `apt-get install curl gcc bzr memcached python-configobj python-coverage python-dev python-nose python-setuptools python-simplejson python-xattr sqlite3 xfsprogs python-webob python-eventlet - python-greenlet python-pastedeploy python-netifaces python-boto` + python-greenlet python-pastedeploy python-netifaces` #. Install anything else you want, like screen, ssh, vim, etc. #. Next, choose either :ref:`partition-section` or :ref:`loopback-section`. diff --git a/doc/source/getting_started.rst b/doc/source/getting_started.rst index 9f24970063..219adfb462 100644 --- a/doc/source/getting_started.rst +++ b/doc/source/getting_started.rst @@ -22,7 +22,6 @@ And the following python libraries: * Nose * Sphinx * netifaces -* boto ----------- Development diff --git a/swift/common/middleware/swift3.py b/swift/common/middleware/swift3.py index e071624399..423b0123be 100644 --- a/swift/common/middleware/swift3.py +++ b/swift/common/middleware/swift3.py @@ -16,6 +16,9 @@ """ The swift3 middleware will emulate the S3 REST api on top of swift. +The boto python library is necessary to use this middleware (install +the python-boto package if you use Ubuntu). 
+ The following opperations are currently supported: * GET Service From 326dfac188efd16a061b1fa048de8f35425db858 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Wed, 2 Mar 2011 20:38:08 +0000 Subject: [PATCH 23/26] remove offending probe checks --- test/probe/test_container_failures.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/test/probe/test_container_failures.py b/test/probe/test_container_failures.py index 656f637a15..585835d2a8 100755 --- a/test/probe/test_container_failures.py +++ b/test/probe/test_container_failures.py @@ -70,17 +70,6 @@ class TestContainerFailures(unittest.TestCase): self.assert_(object1 in [o['name'] for o in client.get_container(self.url, self.token, container)[1]]) - # This fails because all three nodes have to indicate deletion before - # we tell the user it worked. Since the first node 409s (it hasn't got - # the update that the object was deleted yet), the whole must 503 - # (until every is synced up, then the delete would work). - exc = None - try: - client.delete_container(self.url, self.token, container) - except client.ClientException, err: - exc = err - self.assert_(exc) - self.assert_(exc.http_status, 503) # Unfortunately, the following might pass or fail, depending on the # position of the account server associated with the first container # server we had killed. If the associated happens to be the first @@ -144,17 +133,6 @@ class TestContainerFailures(unittest.TestCase): self.assert_(object1 not in [o['name'] for o in client.get_container(self.url, self.token, container)[1]]) - # This fails because all three nodes have to indicate deletion before - # we tell the user it worked. Since the first node 409s (it hasn't got - # the update that the object was deleted yet), the whole must 503 - # (until every is synced up, then the delete would work). 
- exc = None - try: - client.delete_container(self.url, self.token, container) - except client.ClientException, err: - exc = err - self.assert_(exc) - self.assert_(exc.http_status, 503) # Unfortunately, the following might pass or fail, depending on the # position of the account server associated with the first container # server we had killed. If the associated happens to be the first From d2deaa666fb20ddff9dca31eb910874b6e72de10 Mon Sep 17 00:00:00 2001 From: Michael Barton Date: Thu, 3 Mar 2011 20:24:03 +0000 Subject: [PATCH 24/26] recommended doc changes --- etc/proxy-server.conf-sample | 1 - swift/proxy/server.py | 4 ++++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index eb2e70e869..3af7db0f8a 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -43,7 +43,6 @@ use = egg:swift#proxy # If set to 'true' any authorized user may create and delete accounts; if # 'false' no one, even authorized, can. # allow_account_management = false -# put_queue_depth = 10 # Only needed for DevAuth [filter:auth] diff --git a/swift/proxy/server.py b/swift/proxy/server.py index b87f585c96..0a0991c517 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -519,6 +519,10 @@ class Controller(object): Sends an HTTP request to multiple nodes and aggregates the results. It attempts the primary nodes concurrently, then iterates over the handoff nodes as needed. + + :param headers: a list of dicts, where each dict represents one + backend request that should be made. 
+ :returns: a webob Response object """ nodes = self.iter_nodes(part, ring.get_part_nodes(part), ring) pile = GreenPile(ring.replica_count) From 4a27b7e29929c65155a94617f24e93491d7ae405 Mon Sep 17 00:00:00 2001 From: Chuck Thier Date: Thu, 3 Mar 2011 20:34:48 +0000 Subject: [PATCH 25/26] Swift3 tests will now skip if boto is not installed --- test/unit/common/middleware/test_swift3.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/test/unit/common/middleware/test_swift3.py b/test/unit/common/middleware/test_swift3.py index f84a0ffe8a..09c30feac3 100644 --- a/test/unit/common/middleware/test_swift3.py +++ b/test/unit/common/middleware/test_swift3.py @@ -22,9 +22,16 @@ from webob.exc import HTTPUnauthorized, HTTPCreated, HTTPNoContent,\ HTTPAccepted, HTTPBadRequest, HTTPNotFound, HTTPConflict import xml.dom.minidom import simplejson +from nose.plugins.skip import SkipTest from swift.common.middleware import swift3 +try: + import boto.s3 + skip = False +except Exception: + # Skip the swift3 tests if boto is not installed + skip = True class FakeApp(object): def __init__(self): @@ -190,6 +197,8 @@ def start_response(*args): class TestSwift3(unittest.TestCase): def setUp(self): + if skip: + raise SkipTest self.app = swift3.filter_factory({})(FakeApp()) def test_non_s3_request_passthrough(self): From 6c892ad1c7bdb3ae19aa7fc0df8db30d9c3e25cb Mon Sep 17 00:00:00 2001 From: Chuck Thier Date: Thu, 3 Mar 2011 20:47:23 +0000 Subject: [PATCH 26/26] heh --- test/unit/common/middleware/test_swift3.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/unit/common/middleware/test_swift3.py b/test/unit/common/middleware/test_swift3.py index 09c30feac3..d91cda379f 100644 --- a/test/unit/common/middleware/test_swift3.py +++ b/test/unit/common/middleware/test_swift3.py @@ -24,10 +24,8 @@ import xml.dom.minidom import simplejson from nose.plugins.skip import SkipTest -from swift.common.middleware import swift3 - try: - import boto.s3 + from 
swift.common.middleware import swift3 skip = False except Exception: # Skip the swift3 tests if boto is not installed
