From d9883d083409baac3db44e1db14bf3c79a75f411 Mon Sep 17 00:00:00 2001 From: Jianjian Huo Date: 2024年3月20日 13:06:35 -0700 Subject: [PATCH] proxy: use cooperative tokens to coalesce updating shard range requests into backend The cost of memcache misses could be deadly. For example, when updating shard range cache query miss, PUT requests would have to query the backend to figure out which shard to upload the objects. And when a lot of requests are sending to the backend at the same time, this could easily overload the root containers and cause a lot of 500/503 errors; and when proxy-servers receive responses of all those 200 backend shard range queries, they could in turn try to write the same shard range data into memcached servers at the same time, and cause memcached to return OOM failures too. We have seen cache misses frequently to updating shard range cache in production, due to Memcached out-of-memory and cache evictions. To cope with those kind of situations, a memcached based cooperative token mechanism can be added into proxy-server to coalesce lots of in-flight backend requests into a few: when updating shard range cache misses, only the first few of requests will get global cooperative tokens and then be able to fetch updating shard ranges from backend container servers. And the following cache miss requests will wait for cache filling to finish, instead of all querying the backend container servers. This will prevent a flood of backend requests to overload both container servers and memcached servers. Drive-by fix: when memcache is not available, object controller will only need to retrieve a specific shard range from the container server to send the update request to. 
Co-Authored-By: Clay Gerrard Co-Authored-By: Tim Burke Co-Authored-By: Yan Xiao Co-Authored-By: Shreeya Deshpande Signed-off-by: Jianjian Huo Change-Id: I38c11b7aae8c4112bb3d671fa96012ab0c44d5a2 --- etc/proxy-server.conf-sample | 13 + swift/common/utils/__init__.py | 7 + swift/common/utils/config.py | 15 + swift/proxy/controllers/base.py | 57 +- swift/proxy/controllers/container.py | 2 +- swift/proxy/controllers/obj.py | 112 ++- swift/proxy/server.py | 23 +- test/unit/common/utils/test_config.py | 27 + test/unit/proxy/controllers/test_base.py | 24 +- test/unit/proxy/controllers/test_obj.py | 877 ++++++++++++++++++++++- test/unit/proxy/test_server.py | 127 +++- 11 files changed, 1199 insertions(+), 85 deletions(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index 701b514acf..c0b832daaa 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -207,6 +207,19 @@ use = egg:swift#proxy # container_listing_shard_ranges_skip_cache_pct = 0.0 # account_existence_skip_cache_pct = 0.0 # +# Use cooperative token on updating namespace cache to coalesce the requests +# which fetch updating namespaces from the backend and set them in memcached. +# Number of cooperative tokens per each token session, 0 means to disable the +# usage of cooperative token and directly talk to the backend and memcache. +# namespace_cache_tokens_per_session = 3 +# +# The average time spent on getting updating namespaces from the container +# servers, this will be used as basic unit for cooperative token to figure out +# intervals for the retries when requests didn't acquire a token and are +# waiting for other requests to fill in the cache; and a cooperative token +# session (`token_ttl`) will be 10 times of this value. 
+# namespace_avg_backend_fetch_time = 0.3 +# # object_chunk_size = 65536 # client_chunk_size = 65536 # diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index 22a09c4499..ce6318e1f6 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -108,6 +108,7 @@ from swift.common.utils.config import ( # noqa non_negative_float, non_negative_int, config_positive_int_value, + config_positive_float_value, config_float_value, config_auto_int_value, config_percent_value, @@ -3887,6 +3888,12 @@ class NamespaceBoundList(object): return False return self.bounds == other.bounds + def __len__(self): + """ + Return the number of namespaces in the NamespaceBoundList. + """ + return len(self.bounds) + @classmethod def parse(cls, namespaces): """ diff --git a/swift/common/utils/config.py b/swift/common/utils/config.py index f56d89f0fd..d070648b28 100644 --- a/swift/common/utils/config.py +++ b/swift/common/utils/config.py @@ -83,6 +83,21 @@ def config_positive_int_value(value): return result +def config_positive_float_value(value): + """ + Returns positive float value if it can be cast by float() and it's a + float> 0. (not including zero) Raises ValueError otherwise. + """ + try: + result = float(value) + if result <= 0: + raise ValueError() + except (TypeError, ValueError): + raise ValueError( + 'Config option must be a positive float number, not "%s".' % value) + return result + + def config_float_value(value, minimum=None, maximum=None): try: val = float(value) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index a2cba72816..34f4688231 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -860,6 +860,34 @@ def _get_info_from_caches(app, env, account, container=None): return info, cache_state +def namespace_bounds_to_list(bounds): + """ + This function converts the namespaces bounds to ``NamespaceBoundList``. 
+ + :param bounds: a list of namespaces bounds (tuple of lower and name). + :returns: the object instance of ``NamespaceBoundList``; None if ``bounds`` + is None or empty. + """ + ns_bound_list = None + if bounds: + ns_bound_list = NamespaceBoundList(bounds) + return ns_bound_list + + +def namespace_list_to_bounds(ns_bound_list): + """ + This function converts ``NamespaceBoundList`` to the namespaces bounds. + + :param ns_bound_list: an object instance of ``NamespaceBoundList``. + :returns: a list of namespaces bounds (tuple of lower and name); None if + ``ns_bound_list`` is None or empty. + """ + bounds = None + if ns_bound_list: + bounds = ns_bound_list.bounds + return bounds + + def get_namespaces_from_cache(req, cache_key, skip_chance): """ Get cached namespaces from infocache or memcache. @@ -880,8 +908,6 @@ def get_namespaces_from_cache(req, cache_key, skip_chance): # then try get them from memcache memcache = cache_from_env(req.environ, True) - if not memcache: - return None, 'disabled' if skip_chance and random.random() < skip_chance: return None, 'skip' try: @@ -891,11 +917,8 @@ def get_namespaces_from_cache(req, cache_key, skip_chance): bounds = None cache_state = 'error' - if bounds: - ns_bound_list = NamespaceBoundList(bounds) - infocache[cache_key] = ns_bound_list - else: - ns_bound_list = None + ns_bound_list = namespace_bounds_to_list(bounds) + infocache[cache_key] = ns_bound_list return ns_bound_list, cache_state @@ -905,22 +928,32 @@ def set_namespaces_in_cache(req, cache_key, ns_bound_list, time): :param req: a :class:`swift.common.swob.Request` object. :param cache_key: the cache key for both infocache and memcache. - :param ns_bound_list: a :class:`swift.common.utils.NamespaceBoundList`. + :param ns_bound_list: a :class:`swift.common.utils.NamespaceBoundList`; + should NOT be None nor empty. :param time: how long the namespaces should remain in memcache. :return: the cache_state. 
""" + if cache_key.startswith('shard-updating'): + raise ValueError('shard-updating cache should use ' + 'CooperativeNamespaceCachePopulator') infocache = req.environ.setdefault('swift.infocache', {}) infocache[cache_key] = ns_bound_list memcache = cache_from_env(req.environ, True) - if memcache and ns_bound_list: + if memcache: + bounds = namespace_list_to_bounds(ns_bound_list) try: - memcache.set(cache_key, ns_bound_list.bounds, time=time, - raise_on_error=True) + memcache.set(cache_key, bounds, time=time, raise_on_error=True) except MemcacheConnectionError: cache_state = 'set_error' else: cache_state = 'set' else: + # N.B. get_namespaces_from_cache is used for both types of namespace + # cache objects (updating and listing), and both code paths only call + # that helper if memcache is enabled. But this function is now only + # used to set cache *listing* namespace objects over in + # ContainerController, so if that code path learns to not call it when + # memcache is disabled this func could also drop cache_state=disabled cache_state = 'disabled' return cache_state @@ -1177,6 +1210,7 @@ class GetterBase(object): server type. :param logger: a logger instance. """ + def __init__(self, app, req, node_iter, partition, policy, path, backend_headers, node_timeout, resource_type, logger=None): @@ -1360,6 +1394,7 @@ class GetOrHeadHandler(GetterBase): :param policy: the policy instance, or None if Account or Container. :param logger: a logger instance. """ + def __init__(self, app, req, server_type, node_iter, partition, path, backend_headers, concurrency=1, policy=None, logger=None): newest = config_true_value(req.headers.get('x-newest', 'f')) diff --git a/swift/proxy/controllers/container.py b/swift/proxy/controllers/container.py index dcbbcc7851..7fc43f3dad 100644 --- a/swift/proxy/controllers/container.py +++ b/swift/proxy/controllers/container.py @@ -182,7 +182,7 @@ class ContainerController(Controller): :param req: the request object. 
:param namespaces: a list of :class:`~swift.common.utils.Namespace` - objects. + objects; must not be None or empty. :return: a list of :class:`~swift.common.utils.Namespace` objects. """ cache_key = get_cache_key(self.account_name, self.container_name, diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 7ac2d70564..80a94235e2 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -45,7 +45,8 @@ from swift.common.utils import ( normalize_delete_at_timestamp, public, document_iters_to_http_response_body, parse_content_range, quorum_size, reiterate, close_if_possible, safe_json_loads, md5, - NamespaceBoundList, CooperativeIterator) + NamespaceBoundList, CooperativeIterator, cache_from_env, + CooperativeCachePopulator) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation from swift.common import constraints @@ -66,7 +67,7 @@ from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation, update_headers, bytes_to_skip, ByteCountEnforcer, \ record_cache_op_metrics, get_cache_key, GetterBase, GetterSource, \ is_good_source, NodeIter, get_namespaces_from_cache, \ - set_namespaces_in_cache + namespace_bounds_to_list, namespace_list_to_bounds from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ @@ -161,6 +162,45 @@ class ObjectControllerRouter(object): return self.policy_to_controller_cls[int(policy)] +class CooperativeNamespaceCachePopulator(CooperativeCachePopulator): + """ + CooperativeCachePopulator to fetch updating namespaces from backend + container cooperatively using cooperative token and memcached. 
+ """ + + def __init__(self, ctrl, account, container, req, cache_key): + infocache = req.environ.setdefault('swift.infocache', {}) + memcache = cache_from_env(req.environ, True) + cache_ttl = ctrl.app.recheck_updating_shard_ranges + avg_backend_fetch_time = ctrl.app.namespace_avg_backend_fetch_time + num_tokens = ctrl.app.namespace_cache_tokens_per_session + labels = { + 'resource': 'shard_updating', + } + if account is not None: + labels['account'] = account + if container is not None: + labels['container'] = container + super().__init__( + ctrl.app, infocache, memcache, cache_key, cache_ttl, + avg_backend_fetch_time, num_tokens, labels=labels + ) + self.ctrl = ctrl + self.account = account + self.container = container + self.req = req + + def cache_encoder(self, ns_bound_list): + return namespace_list_to_bounds(ns_bound_list) + + def cache_decoder(self, bounds): + return namespace_bounds_to_list(bounds) + + def do_fetch_backend(self): + return self.ctrl._get_backend_updating_namespaces( + self.req, self.account, self.container) + + class BaseObjectController(Controller): """Base WSGI controller for object requests.""" server_type = 'Object' @@ -281,7 +321,7 @@ class BaseObjectController(Controller): """Handler for HTTP HEAD requests.""" return self.GETorHEAD(req) - def _get_updating_namespaces( + def _do_get_updating_namespaces( self, req, account, container, includes=None): """ Fetch namespaces in 'updating' states from given `account/container`. @@ -310,7 +350,7 @@ class BaseObjectController(Controller): def _get_update_shard_caching_disabled(self, req, account, container, obj): """ - Fetch all updating shard ranges for the given root container when + Fetch the corresponding updating shard range for the given object when all caching is disabled. :param req: original Request instance. 
@@ -321,7 +361,7 @@ class BaseObjectController(Controller): or None if the update should go back to the root """ # legacy behavior requests container server for includes=obj - namespaces, response = self._get_updating_namespaces( + namespaces, response = self._do_get_updating_namespaces( req, account, container, includes=obj) record_cache_op_metrics( self.logger, self.server_type.lower(), 'shard_updating', @@ -329,6 +369,21 @@ class BaseObjectController(Controller): # there will be only one Namespace in the list if any return namespaces[0] if namespaces else None + def _get_backend_updating_namespaces(self, req, account, container): + """ + Retrieve the updating namespaces from the backend. + + :param req: original Request instance. + :param account: account from which namespaces should be fetched. + :param container: container from which namespaces should be fetched. + :return: a tuple of (NamespaceBoundList, response). + """ + # pull full set of updating namespaces from backend + namespaces, backend_response = self._do_get_updating_namespaces( + req, account, container) + ns_bound_list = NamespaceBoundList.parse(namespaces) + return ns_bound_list, backend_response + def _get_update_shard(self, req, account, container, obj): """ Find the appropriate shard range for an object update. 
@@ -344,36 +399,42 @@ class BaseObjectController(Controller): :return: an instance of :class:`swift.common.utils.Namespace`, or None if the update should go back to the root """ - if not self.app.recheck_updating_shard_ranges: + memcache = cache_from_env(req.environ, True) + if not self.app.recheck_updating_shard_ranges or not memcache: # caching is disabled return self._get_update_shard_caching_disabled( req, account, container, obj) # caching is enabled, try to get from caches - response = None cache_key = get_cache_key(account, container, shard='updating') skip_chance = self.app.container_updating_shard_ranges_skip_cache ns_bound_list, get_cache_state = get_namespaces_from_cache( req, cache_key, skip_chance) + response = None if not ns_bound_list: - # namespaces not found in either infocache or memcache so pull full - # set of updating shard ranges from backend - namespaces, response = self._get_updating_namespaces( - req, account, container) - if namespaces: - # only store the list of namespace lower bounds and names into - # infocache and memcache. - ns_bound_list = NamespaceBoundList.parse(namespaces) - set_cache_state = set_namespaces_in_cache( - req, cache_key, ns_bound_list, - self.app.recheck_updating_shard_ranges) + # namespaces not found in memcache or cache was skipped, so pull + # the full set of updating shard ranges from the backend and set in + # the memcache with the usage of cooperative token. + cache_populator = CooperativeNamespaceCachePopulator( + self, account, container, req, cache_key) + ns_bound_list = cache_populator.fetch_data() + if cache_populator.set_cache_state: + # record the general cache set metrics. 
record_cache_op_metrics( self.logger, self.server_type.lower(), 'shard_updating', - set_cache_state, None) - if set_cache_state == 'set': - self.logger.info( - 'Caching updating shards for %s (%d shards)', - cache_key, len(namespaces)) + cache_populator.set_cache_state, None) + # TODO: use enum to unify 'set_cache_state' in existing + # 'set_namespaces_in_cache' and CooperativeCachePopulator, and + # convert existing usages of response to just status code. + if cache_populator.set_cache_state == 'set': + message = "Caching updating shards for %s (%d shards)" % ( + cache_key, len(ns_bound_list)) + if cache_populator.token_acquired: + message += " with a finished token" + self.logger.info(message) + response = cache_populator.backend_resp + + # record the general cache get metrics. record_cache_op_metrics( self.logger, self.server_type.lower(), 'shard_updating', get_cache_state, response) @@ -1130,6 +1191,7 @@ class ECAppIter(object): :param logger: a logger """ + def __init__(self, path, policy, internal_parts_iters, range_specs, fa_length, obj_length, logger): self.path = path @@ -1693,6 +1755,7 @@ class Putter(object): :param logger: a Logger instance :param chunked: boolean indicating if the request encoding is chunked """ + def __init__(self, conn, node, resp, path, connect_duration, watchdog, write_timeout, send_exception_handler, logger, chunked=False): @@ -1853,6 +1916,7 @@ class MIMEPutter(Putter): An HTTP PUT request that supports streaming. """ + def __init__(self, conn, node, resp, path, connect_duration, watchdog, write_timeout, send_exception_handler, logger, mime_boundary, multiphase=False): @@ -2095,6 +2159,7 @@ class ECGetResponseBucket(object): A helper class to encapsulate the properties of buckets in which fragment getters and alternate nodes are collected. 
""" + def __init__(self, policy, timestamp): """ :param policy: an instance of ECStoragePolicy @@ -2228,6 +2293,7 @@ class ECGetResponseCollection(object): This class encapsulates logic for selecting the best bucket from the collection, and for choosing alternate nodes. """ + def __init__(self, policy): """ :param policy: an instance of ECStoragePolicy diff --git a/swift/proxy/server.py b/swift/proxy/server.py index 8646cc5bfb..250dd29b21 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -37,9 +37,10 @@ from swift.common.utils import Watchdog, get_logger, \ affinity_key_function, affinity_locality_predicate, list_from_csv, \ parse_prefixed_conf, config_auto_int_value, node_to_string, \ config_request_node_count_value, config_percent_value, cap_length, \ - parse_options + parse_options, non_negative_int, config_positive_float_value from swift.common.registry import register_swift_info from swift.common.constraints import check_utf8, valid_api_version +from swift.common.statsd_client import get_labeled_statsd_client from swift.proxy.controllers import AccountController, ContainerController, \ ObjectControllerRouter, InfoController from swift.proxy.controllers.base import get_container_info, \ @@ -53,6 +54,9 @@ from swift.common.exceptions import APIVersionError from swift.common.wsgi import run_wsgi from swift.obj import expirer +DEFAULT_NAMESPACE_AVG_BACKEND_FETCH_TIME = 0.3 # seconds +DEFAULT_NAMESPACE_CACHE_TOKENS_PER_SESSION = 3 # 3 tokens per session + # List of entry points for mandatory middlewares. # @@ -196,7 +200,7 @@ class Application(object): """WSGI application for the proxy server.""" def __init__(self, conf, logger=None, account_ring=None, - container_ring=None): + container_ring=None, statsd=None): # This is for the sake of tests which instantiate an Application # directly rather than via loadapp(). 
self._pipeline_final_app = self @@ -208,6 +212,7 @@ class Application(object): statsd_tail_prefix='proxy-server') else: self.logger = logger + self.statsd = statsd or get_labeled_statsd_client(conf, self.logger) self.backend_user_agent = 'proxy-server %s' % os.getpid() swift_dir = conf.get('swift_dir', '/etc/swift') @@ -249,6 +254,20 @@ class Application(object): 'container_listing_shard_ranges_skip_cache_pct', 0)) self.account_existence_skip_cache = config_percent_value( conf.get('account_existence_skip_cache_pct', 0)) + self.namespace_avg_backend_fetch_time = \ + config_positive_float_value( + conf.get( + 'namespace_avg_backend_fetch_time', + DEFAULT_NAMESPACE_AVG_BACKEND_FETCH_TIME + ) + ) + self.namespace_cache_tokens_per_session = \ + non_negative_int( + conf.get( + 'namespace_cache_tokens_per_session', + DEFAULT_NAMESPACE_CACHE_TOKENS_PER_SESSION + ) + ) self.allow_account_management = \ config_true_value(conf.get('allow_account_management', 'no')) self.container_ring = container_ring or Ring(swift_dir, diff --git a/test/unit/common/utils/test_config.py b/test/unit/common/utils/test_config.py index 6598408672..7d0b531a60 100644 --- a/test/unit/common/utils/test_config.py +++ b/test/unit/common/utils/test_config.py @@ -150,6 +150,33 @@ class TestUtilsConfig(unittest.TestCase): else: self.assertEqual(expected, rv) + def test_config_positive_float_value(self): + # Test cases that should pass + for value, expected in ( + (99, 99.0), + (99.01, 99.01), + ('99', 99.0), + ('99.01', 99.01), + (1, 1.0), + ('0.00001', 0.00001), + ): + actual = config.config_positive_float_value(value) + self.assertEqual(expected, actual) + + # Test cases that should raise ValueError + for value in ( + 0, '0', 0.0, '0.0', + -99, -99.01, + '-99', '-99.01', + None, 'not-a-float' + ): + with self.assertRaises(ValueError) as cm: + config.config_positive_float_value(value) + expected_msg = ( + 'Config option must be a positive float number, not "%s".' 
% + value) + self.assertEqual(expected_msg, str(cm.exception)) + def test_config_float_value(self): for args, expected in ( ((99, None, None), 99.0), diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index 493553db4a..1b54fd7f70 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -200,12 +200,6 @@ class BaseTest(unittest.TestCase): @patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())]) class TestFuncs(BaseTest): - def test_get_namespaces_from_cache_disabled(self): - cache_key = 'shard-updating-v2/a/c/' - req = Request.blank('a/c') - actual = get_namespaces_from_cache(req, cache_key, 0) - self.assertEqual((None, 'disabled'), actual) - def test_get_namespaces_from_cache_miss(self): cache_key = 'shard-updating-v2/a/c/' req = Request.blank('a/c') @@ -273,7 +267,7 @@ class TestFuncs(BaseTest): self.assertEqual((None, 'error'), actual) def test_set_namespaces_in_cache_disabled(self): - cache_key = 'shard-updating-v2/a/c/' + cache_key = 'shard-testing-v2/a/c/' ns_bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']]) req = Request.blank('a/c') actual = set_namespaces_in_cache(req, cache_key, ns_bound_list, 123) @@ -282,7 +276,7 @@ class TestFuncs(BaseTest): req.environ['swift.infocache']) def test_set_namespaces_in_cache_ok(self): - cache_key = 'shard-updating-v2/a/c/' + cache_key = 'shard-testing-v2/a/c/' ns_bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']]) req = Request.blank('a/c') req.environ['swift.cache'] = self.cache @@ -294,7 +288,7 @@ class TestFuncs(BaseTest): self.assertEqual(123, self.cache.times.get(cache_key)) def test_set_namespaces_in_cache_infocache_exists(self): - cache_key = 'shard-updating-v2/a/c/' + cache_key = 'shard-testing-v2/a/c/' ns_bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']]) req = Request.blank('a/c') req.environ['swift.infocache'] = {'already': 'exists'} @@ -304,7 +298,7 @@ class 
TestFuncs(BaseTest): req.environ['swift.infocache']) def test_set_namespaces_in_cache_error(self): - cache_key = 'shard-updating-v2/a/c/' + cache_key = 'shard-testing-v2/a/c/' ns_bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']]) req = Request.blank('a/c') req.environ['swift.cache'] = self.cache @@ -314,6 +308,16 @@ class TestFuncs(BaseTest): self.assertEqual(ns_bound_list, req.environ['swift.infocache'].get(cache_key)) + def test_set_namespaces_in_cache_raises_exception(self): + cache_key = 'shard-updating-v2/a/c/' + ns_bound_list = NamespaceBoundList([['', 'sr1'], ['k', 'sr2']]) + req = Request.blank('a/c') + with self.assertRaises(ValueError) as cm: + set_namespaces_in_cache(req, cache_key, ns_bound_list, 123) + self.assertIn('shard-updating cache should use ' + 'CooperativeNamespaceCachePopulator', + str(cm.exception)) + def test_get_info_zero_recheck(self): mock_cache = mock.Mock() mock_cache.get.return_value = None diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index 4e9fbd6d44..74cb470e14 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -26,6 +26,7 @@ from contextlib import contextmanager import json from unittest import mock +import eventlet from eventlet import Timeout, sleep from eventlet.queue import Empty @@ -46,7 +47,7 @@ from swift.proxy.controllers.base import \ NodeIter from swift.common.storage_policy import POLICIES, ECDriverError, \ StoragePolicy, ECStoragePolicy -from swift.common.swob import Request, wsgi_to_str +from swift.common.swob import Request, Response, wsgi_to_str from test.debug_logger import debug_logger from test.unit import ( FakeRing, fake_http_connect, patch_policies, SlowBody, FakeStatus, @@ -8014,9 +8015,11 @@ class TestGetUpdateShard(BaseObjectControllerMixin, unittest.TestCase): self.assertFalse(self.app.logger.get_lines_for_level('error')) def test_get_update_shard_cache_not_available(self): - # verify case when 
memcache is not available + # when memcache is not available, object controller will only need to + # retrieve a specific shard range from the container server to send the + # update request to. req = Request.blank('/v1/a/c/o', method='PUT') - body, resp_headers = self._create_response_data(self.shard_ranges) + body, resp_headers = self._create_response_data([self.shard_ranges[1]]) with mocked_http_conn( 200, 200, body_iter=iter([b'', body]), headers=resp_headers) as fake_conn: @@ -8031,7 +8034,9 @@ class TestGetUpdateShard(BaseObjectControllerMixin, unittest.TestCase): self.assertEqual('a/c', captured[1]['path'][7:]) params = sorted(captured[1]['qs'].split('&')) self.assertEqual( - ['format=json', 'states=updating'], params) + ['format=json', 'includes=' + quote(self.item), 'states=updating'], + params + ) captured_hdrs = captured[1]['headers'] self.assertEqual('shard', captured_hdrs.get('X-Backend-Record-Type')) self.assertEqual('namespace', @@ -8039,7 +8044,7 @@ class TestGetUpdateShard(BaseObjectControllerMixin, unittest.TestCase): self.assertIsNone(self.memcache.get('shard-updating-v2/a/c')) exp_ns = Namespace(self.shard_ranges[1].name, self.shard_ranges[1].lower, - self.shard_ranges[2].lower) + self.shard_ranges[1].upper) self.assertEqual(exp_ns, actual) self.assertFalse(self.app.logger.get_lines_for_level('error')) @@ -8116,7 +8121,7 @@ class TestGetUpdatingNamespacesErrors(BaseObjectControllerMixin, resp_headers = {'X-Backend-Record-Type': 'shard'} with mocked_http_conn(200, 200, body_iter=iter([b'', body]), headers=resp_headers): - actual, resp = self.ctrl._get_updating_namespaces( + actual, resp = self.ctrl._do_get_updating_namespaces( req, 'a', 'c', '1_test') self.assertEqual(200, resp.status_int) self.assertIsNone(actual) @@ -8174,7 +8179,7 @@ class TestGetUpdatingNamespacesErrors(BaseObjectControllerMixin, body = json.dumps([dict(sr)]).encode('ascii') with mocked_http_conn( 200, 200, body_iter=iter([b'', body])): - actual, resp = 
self.ctrl._get_updating_namespaces( + actual, resp = self.ctrl._do_get_updating_namespaces( req, 'a', 'c', '1_test') self.assertEqual(200, resp.status_int) self.assertIsNone(actual) @@ -8192,7 +8197,7 @@ class TestGetUpdatingNamespacesErrors(BaseObjectControllerMixin, with mocked_http_conn( 200, 200, body_iter=iter([b'', body]), headers=headers): - actual, resp = self.ctrl._get_updating_namespaces( + actual, resp = self.ctrl._do_get_updating_namespaces( req, 'a', 'c', '1_test') self.assertEqual(200, resp.status_int) self.assertIsNone(actual) @@ -8205,7 +8210,7 @@ class TestGetUpdatingNamespacesErrors(BaseObjectControllerMixin, def test_get_namespaces_request_failed(self): req = Request.blank('/v1/a/c/o', method='PUT') with mocked_http_conn(200, 404, 404, 404): - actual, resp = self.ctrl._get_updating_namespaces( + actual, resp = self.ctrl._do_get_updating_namespaces( req, 'a', 'c', '1_test') self.assertEqual(404, resp.status_int) self.assertIsNone(actual) @@ -8224,5 +8229,859 @@ class TestGetUpdatingNamespacesErrors(BaseObjectControllerMixin, self.assertFalse(warning_lines[1:]) +@patch_policies([ + StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()), + StoragePolicy(1, 'one', object_ring=FakeRing()), +]) +class TestCooperativeToken(BaseObjectControllerMixin, unittest.TestCase): + """ + Test suite for cooperative token functionality in object controllers + """ + + def setUp(self): + super(TestCooperativeToken, self).setUp() + # Import needed modules from test_server.py context + from test.debug_logger import debug_labeled_statsd_client + + conf = { + 'log_statsd_host': 'host', + 'log_statsd_port': 8125, + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_legacy': True, + } + self.statsd = debug_labeled_statsd_client(conf) + # Reset the application with statsd + self.app = PatchedObjControllerApp( + conf, account_ring=FakeRing(), + container_ring=FakeRing(), logger=self.logger) + self.app.statsd = self.statsd + self.logger.clear() + + def 
_check_request(self, request, method=None, path=None, headers=None, + params=None): + """Helper method to check request attributes""" + if method: + self.assertEqual(request['method'], method) + if path: + self.assertEqual(request['path'], path) + if headers: + for header, value in headers.items(): + self.assertEqual(request['headers'][header], value) + if params: + actual_params = dict(parse_qsl(request['qs'], + keep_blank_values=True)) + self.assertEqual(actual_params, params) + + def test_get_backend_updating_shard_with_cooperative_token_configs(self): + conf = {} + self.app = proxy_server.Application( + conf, + logger=self.logger, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.assertEqual(self.app.namespace_avg_backend_fetch_time, 0.3) + self.assertEqual(self.app.namespace_cache_tokens_per_session, 3) + + conf = {'namespace_cache_tokens_per_session': '0'} + self.app = proxy_server.Application( + conf, + logger=self.logger, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.assertEqual(self.app.namespace_avg_backend_fetch_time, 0.3) + self.assertEqual(self.app.namespace_cache_tokens_per_session, 0) + + conf = {'namespace_avg_backend_fetch_time': 0.2, + 'namespace_cache_tokens_per_session': 1} + self.app = proxy_server.Application( + conf, + logger=self.logger, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.assertEqual(self.app.namespace_avg_backend_fetch_time, 0.2) + self.assertEqual(self.app.namespace_cache_tokens_per_session, 1) + + conf = {'namespace_avg_backend_fetch_time': 0.2, + 'namespace_cache_tokens_per_session': 1} + self.app = proxy_server.Application( + conf, + logger=self.logger, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.assertEqual(self.app.namespace_avg_backend_fetch_time, 0.2) + self.assertEqual(self.app.namespace_cache_tokens_per_session, 1) + + def test_get_backend_updating_shard_with_cooperative_token_acquired(self): + # verify that the request to get updating shard from the 
container + # backend works with cooperative token acquired. + # reset the router post patch_policies + conf = {} + self.app = proxy_server.Application( + conf, + logger=self.logger, + statsd=self.statsd, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + + def do_test(method, sharding_state): + self.app.logger.clear() # clean capture state + self.app.statsd.clear() + req = Request.blank( + '/v1/a/c/o', {'swift.cache': FakeMemcache()}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + + cache_key = 'shard-updating-v2/a/c' + token_key = "_cache_token/%s" % cache_key + if not random.choice([True, False]): + # Add some randomization to this test case. If True, this + # request which gets updating shard will be the first to + # acquire a token; otherwise, it would be the second one. + req.environ['swift.cache'].incr(token_key) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, cont shard GET, obj POSTs + status_codes = (200, 200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + with mocked_http_conn(*status_codes, headers=resp_headers, + body=body) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + stats = self.app.logger.statsd_client.get_stats_counts() + self.assertEqual( + { + "account.info.cache.miss.200": 1, + "account.info.infocache.hit": 2, + "container.info.cache.miss.200": 1, + "container.info.infocache.hit": 1, + 
"object.shard_updating.cache.miss.200": 1, + "object.shard_updating.cache.set": 1, + }, + stats, + ) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'backend_reqs'), + ('set_cache_state', 'set'), + ('token', 'with_token'), + ('status', 200)), + )): 1, + }, stats) + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) + info_lines = self.logger.get_lines_for_level('info') + self.assertIn( + 'Caching updating shards for shard-updating-v2/a/c (3 shards)' + ' with a finished token', + info_lines) + + backend_requests = fake_conn.requests + account_request = backend_requests[0] + self._check_request( + account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + self._check_request( + container_request, method='HEAD', path='/sda/0/a/c') + container_request_shard = backend_requests[2] + self._check_request( + container_request_shard, method='GET', path='/sda/0/a/c', + params={'states': 'updating', 'format': 'json'}, + headers={'X-Backend-Record-Type': 'shard'}) + + self.assertIn(cache_key, req.environ['swift.cache'].store) + cached_namespaces = NamespaceBoundList.parse(shard_ranges) + self.assertEqual( + req.environ['swift.cache'].store[cache_key], + cached_namespaces.bounds) + self.assertIn(cache_key, req.environ.get('swift.infocache')) + self.assertEqual( + req.environ['swift.infocache'][cache_key].bounds, + cached_namespaces.bounds) + + # make sure backend requests included expected container headers + for (i, device), request in zip( + enumerate(['sda', 'sdb', 'sdc']), backend_requests[3:]): + expectations = { + 'method': method, + 'path': f'/{device}/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Quoted-Container-Path': + 
shard_ranges[1].name, + 'X-Container-Device': device, + 'X-Container-Host': '10.0.0.%d:100%d' % (i, i), + }, + } + self._check_request(request, **expectations) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + + def test_get_backend_updating_shard_wo_cooperative_token_acquired(self): + # verify that the request to get updating shard from the container + # backend will be served out of memcached when other requests have + # grabbed all available cooperative tokens and filled the updating + # shard ranges into the memcache. + # reset the router post patch_policies + conf = { + 'namespace_cache_tokens_per_session': 2, + 'namespace_avg_backend_fetch_time': 0.05, + } + self.app = proxy_server.Application( + conf, + logger=self.logger, + statsd=self.statsd, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + cache_key = 'shard-updating-v2/a/c' + token_key = "_cache_token/%s" % cache_key + + def do_test(method, sharding_state): + retries = 0 + + class CustomizedFakeCache(FakeMemcache): + def get(self, key, raise_on_error=False): + nonlocal retries + if key != cache_key: + return super(CustomizedFakeCache, self).get(key) + + retries += 1 + if retries < 4: + return super(CustomizedFakeCache, self).get( + "NOT_EXISTED_YET") + else: + return super(CustomizedFakeCache, self).get(key) + + self.app.logger.clear() # clean capture state + self.app.statsd.clear() + req = Request.blank( + '/v1/a/c/o', {'swift.cache': CustomizedFakeCache()}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, obj POSTs + status_codes = (200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 
'X-Backend-Record-Type': 'shard'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + cached_namespaces = NamespaceBoundList.parse(shard_ranges) + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + + # Preset 'token_key' to be value of 3, then make this request of + # getting updating shard to not able to acquire a token. + req.environ['swift.cache'].incr(token_key, 3) + # Preset the cache value, but only available after 4 retries. + req.environ['swift.cache'].set(cache_key, cached_namespaces.bounds) + + with mock.patch('swift.common.utils.eventlet.sleep'), \ + mocked_http_conn(*status_codes, headers=resp_headers, + body=body) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(4, retries) + self.assertEqual(resp.status_int, 202) + stats = self.app.logger.statsd_client.get_stats_counts() + self.assertEqual({'account.info.cache.miss.200': 1, + 'account.info.infocache.hit': 1, + 'container.info.cache.miss.200': 1, + 'container.info.infocache.hit': 1, + 'object.shard_updating.cache.miss': 1, + }, + stats) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): 1, + }, stats) + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) + + backend_requests = fake_conn.requests + account_request = backend_requests[0] + self._check_request( + account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + self._check_request( + container_request, method='HEAD', path='/sda/0/a/c') + + self.assertIn(cache_key, req.environ['swift.cache'].store) + 
self.assertEqual( + req.environ['swift.cache'].store[cache_key], + cached_namespaces.bounds) + + # make sure backend requests included expected container headers + for (i, device), request in zip( + enumerate(['sda', 'sdb', 'sdc']), backend_requests[2:]): + expectations = { + 'method': method, + 'path': f'/{device}/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Quoted-Container-Path': + shard_ranges[1].name, + 'X-Container-Device': device, + 'X-Container-Host': '10.0.0.%d:100%d' % (i, i), + }, + } + self._check_request(request, **expectations) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + + def test_get_backend_updating_shard_wo_token_lack_retries(self): + # verify that the request to get updating shard from the container + # backend will be served out of memcached when other requests have + # grabbed all available cooperative tokens. Due to simulated busy + # eventlet scheduler, this request would be underserved and only get + # two normal retries during the token session, but eventually get data + # from cache by use of the forced retry. 
+ # reset the router post patch_policies + conf = { + 'namespace_avg_backend_fetch_time': 0.005, + } + self.app = proxy_server.Application( + conf, + logger=self.logger, + statsd=self.statsd, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.app.obj_controller_router = proxy_server.ObjectControllerRouter() + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 3600 + cache_key = 'shard-updating-v2/a/c' + token_key = "_cache_token/%s" % cache_key + + def do_test(method, sharding_state): + retries = 0 + + class CustomizedFakeCache(FakeMemcache): + def get(self, key, raise_on_error=False): + nonlocal retries + if key != cache_key: + return super(CustomizedFakeCache, self).get(key) + + retries += 1 + if retries < 3: + return super(CustomizedFakeCache, self).get( + "NOT_EXISTED_YET") + else: + return super(CustomizedFakeCache, self).get(key) + + self.app.logger.clear() # clean capture state + self.app.statsd.clear() + req = Request.blank( + '/v1/a/c/o', {'swift.cache': CustomizedFakeCache()}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, obj POSTs + status_codes = (200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + cached_namespaces = NamespaceBoundList.parse(shard_ranges) + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + + # Preset 'token_key' to be value of 3, then make this request of + # getting updating shard to not able to acquire a token. 
+ req.environ['swift.cache'].incr(token_key, 3) + # Preset the cache value, but only available after 4 retries. + req.environ['swift.cache'].set(cache_key, cached_namespaces.bounds) + + with mock.patch( + 'swift.proxy.controllers.obj.time.time') as mock_time, \ + mock.patch('swift.common.utils.sleep') as mock_sleep, \ + mocked_http_conn( + *status_codes, headers=resp_headers, body=body) \ + as fake_conn: + mock_time.side_effect = itertools.count(4000.99, 1.0) + resp = req.get_response(self.app) + + # our populator only sleeps once, when it wakes up we're past the + # deadline and make one more try! + self.assertEqual([mock.call(0.005 * 1.5)], + mock_sleep.call_args_list) + # N.B. one of these memcache.get "attempts" happens *before* + # coop-populator; it's very first "retry" is already after cuttoff! + self.assertEqual(3, retries) + self.assertEqual(resp.status_int, 202) + stats = self.app.logger.statsd_client.get_stats_counts() + self.assertEqual({'account.info.cache.miss.200': 1, + 'account.info.infocache.hit': 1, + 'container.info.cache.miss.200': 1, + 'container.info.infocache.hit': 1, + 'object.shard_updating.cache.miss': 1}, + stats) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', True)), + )): 1, + }, stats) + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) + + backend_requests = fake_conn.requests + account_request = backend_requests[0] + self._check_request( + account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + self._check_request( + container_request, method='HEAD', path='/sda/0/a/c') + + self.assertIn(cache_key, req.environ['swift.cache'].store) + self.assertEqual( + req.environ['swift.cache'].store[cache_key], + cached_namespaces.bounds) + + # make sure backend requests included 
expected container headers + for (i, device), request in zip( + enumerate(['sda', 'sdb', 'sdc']), backend_requests[2:]): + expectations = { + 'method': method, + 'path': f'/{device}/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Quoted-Container-Path': + shard_ranges[1].name, + 'X-Container-Device': device, + 'X-Container-Host': '10.0.0.%d:100%d' % (i, i), + }, + } + self._check_request(request, **expectations) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + + def test_get_backend_updating_shard_with_cooperative_token_timeout(self): + # verify that the request to get updating shard from the container + # backend works with cooperative token timeout. + conf = { + 'namespace_avg_backend_fetch_time': 0.01, + } + self.app = proxy_server.Application( + conf, + logger=self.logger, + statsd=self.statsd, + account_ring=FakeRing(), + container_ring=FakeRing()) + self.app.obj_controller_router = proxy_server.ObjectControllerRouter() + self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes + self.app.recheck_updating_shard_ranges = 3600 + cache_key = 'shard-updating-v2/a/c' + token_key = "_cache_token/%s" % cache_key + + def do_test(method, sharding_state): + + class CustomizedFakeCache(FakeMemcache): + def get(self, key, raise_on_error=False): + if key != cache_key: + return super(CustomizedFakeCache, self).get(key) + # all fail forever - just like real memcache! 
+ return super(CustomizedFakeCache, self).get( + "NOT_EXISTED_YET") + + self.app.logger.clear() # clean capture state + self.app.statsd.clear() + req = Request.blank( + '/v1/a/c/o', {'swift.cache': CustomizedFakeCache()}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + + # Preset 'token_key' to be value of 3+, then make this request of + # getting updating shard to not able to acquire a token. + req.environ['swift.cache'].incr(token_key, 30) + + # we want the container_info response to say policy index of 1 and + # sharding state + # acc HEAD, cont HEAD, cont shard GET, obj POSTs + status_codes = (200, 200, 200, 202, 202, 202) + resp_headers = {'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': sharding_state, + 'X-Backend-Record-Type': 'shard'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + with mocked_http_conn(*status_codes, headers=resp_headers, + body=body) as fake_conn: + resp = req.get_response(self.app) + + self.assertEqual(resp.status_int, 202) + stats = self.app.logger.statsd_client.get_stats_counts() + self.assertEqual( + { + 'account.info.cache.miss.200': 1, + 'account.info.infocache.hit': 2, + 'container.info.cache.miss.200': 1, + 'container.info.infocache.hit': 1, + 'object.shard_updating.cache.miss.200': 1, + 'object.shard_updating.cache.set': 1, + }, + stats + ) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', False), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + self.assertEqual([], 
self.app.logger.log_dict['set_statsd_prefix']) + info_lines = self.logger.get_lines_for_level('info') + self.assertIn( + 'Caching updating shards for shard-updating-v2/a/c (3 shards)', + info_lines) + + backend_requests = fake_conn.requests + account_request = backend_requests[0] + self._check_request( + account_request, method='HEAD', path='/sda/0/a') + container_request = backend_requests[1] + self._check_request( + container_request, method='HEAD', path='/sda/0/a/c') + container_request_shard = backend_requests[2] + self._check_request( + container_request_shard, method='GET', path='/sda/0/a/c', + params={'states': 'updating', 'format': 'json'}, + headers={'X-Backend-Record-Type': 'shard'}) + + self.assertIn(cache_key, req.environ['swift.cache'].store) + cached_namespaces = NamespaceBoundList.parse(shard_ranges) + self.assertEqual( + req.environ['swift.cache'].store[cache_key], + cached_namespaces.bounds) + self.assertIn(cache_key, req.environ.get('swift.infocache')) + self.assertEqual( + req.environ['swift.infocache'][cache_key].bounds, + cached_namespaces.bounds) + + # make sure backend requests included expected container headers + for (i, device), request in zip( + enumerate(['sda', 'sdb', 'sdc']), backend_requests[3:]): + expectations = { + 'method': method, + 'path': f'/{device}/0/a/c/o', + 'headers': { + 'X-Container-Partition': '0', + 'Host': 'localhost:80', + 'Referer': '%s http://localhost/v1/a/c/o' % method, + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Quoted-Container-Path': + shard_ranges[1].name, + 'X-Container-Device': device, + 'X-Container-Host': '10.0.0.%d:100%d' % (i, i), + }, + } + self._check_request(request, **expectations) + + do_test('POST', 'sharding') + do_test('POST', 'sharded') + do_test('DELETE', 'sharding') + do_test('DELETE', 'sharded') + do_test('PUT', 'sharding') + do_test('PUT', 'sharded') + + def test_get_backend_updating_shard_concurrent_reqs_cooperatively(self): + self.memcache = FakeMemcache() + self.logger.clear() 
+ self.statsd.clear() + conf = { + 'namespace_cache_use_token': 'True', + 'namespace_avg_backend_fetch_time': 0.003, + } + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + cache_key = 'shard-updating-v2/a/c' + + def delayed_fetch_backend(self): + eventlet.sleep(0.0005) + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + resp = Response(body=body, status=200) + return NamespaceBoundList.parse(shard_ranges), resp + + def worker(method, unique_path): + app = proxy_server.Application( + conf, + logger=self.logger, + statsd=self.statsd, + account_ring=FakeRing(), + container_ring=FakeRing()) + app.obj_controller_router = proxy_server.ObjectControllerRouter() + app.sort_nodes = lambda nodes, *args, **kwargs: nodes + app.recheck_updating_shard_ranges = 3600 + req = Request.blank( + unique_path, {'swift.cache': self.memcache}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + resp = req.get_response(app) + self.assertEqual(resp.status_int, 200) + + num_processes = 10 + status_codes = (200, 200, 200, 200, 200) * num_processes + # we want the container_info response to say policy index of 1 and + # sharding state + resp_headers = { + 'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': 'sharding', + 'X-Backend-Record-Type': 'shard' + } + with mocked_http_conn(*status_codes, headers=resp_headers), \ + mock.patch( + 'swift.proxy.controllers.obj.' 
+ 'CooperativeNamespaceCachePopulator.do_fetch_backend', + delayed_fetch_backend): + pool = eventlet.GreenPool() + for i in range(num_processes): + pool.spawn(worker, 'POST', '/v1/a/c/o' + str(i)) + pool.waitall() + + stats = self.app.logger.statsd_client.get_stats_counts() + self.assertEqual( + { + "account.info.cache.miss.200": num_processes, + "account.info.infocache.hit": num_processes, + "container.info.cache.miss.200": num_processes, + "container.info.infocache.hit": num_processes, + "object.shard_updating.cache.set": 3, + "object.shard_updating.cache.miss.200": 3, + 'object.shard_updating.cache.miss': num_processes - 3, + }, + stats, + ) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('account', 'a'), + ('container', 'c'), + ('resource', 'shard_updating'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 3, + ('swift_coop_cache', frozenset(( + ('account', 'a'), + ('container', 'c'), + ('resource', 'shard_updating'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): num_processes - 3 + }, stats) + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) + self.assertIn(cache_key, self.memcache.store) + cached_namespaces = NamespaceBoundList.parse(shard_ranges) + self.assertEqual( + self.memcache.store[cache_key], cached_namespaces.bounds) + + def test_get_backend_updating_shard_concurrent_reqs_with_failures(self): + # Tests token-based cooperative caching resilience when 1-2 of the 3 + # token winners fail to fetch shard ranges (503 errors) during 100 + # concurrent requests. Verifies that other token winners successfully + # retry and cache results, serving the remaining 97-99 requests from + # cache without additional backend calls, demonstrating proper failure + # handling. 
+ self.memcache = FakeMemcache() + self.logger.clear() + conf = {'namespace_cache_use_token': 'True'} + shard_ranges = [ + utils.ShardRange( + '.shards_a/c_not_used', utils.Timestamp.now(), '', 'l'), + utils.ShardRange( + '.shards_a/c_shard', utils.Timestamp.now(), 'l', 'u'), + utils.ShardRange( + '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), + ] + cache_key = 'shard-updating-v2/a/c' + failures = random.randint(1, 2) + failures_injected = 0 + + def delayed_fetch_backend(self): + nonlocal failures_injected + eventlet.sleep(0.2) + body = json.dumps([ + dict(shard_range) + for shard_range in shard_ranges]).encode('ascii') + resp = Response(body=body, status=200) + if failures_injected < failures: + failures_injected += 1 + return None, Response(status=503) + else: + return NamespaceBoundList.parse(shard_ranges), resp + + def worker(method, unique_path): + app = proxy_server.Application( + conf, + logger=self.logger, + statsd=self.statsd, + account_ring=FakeRing(), + container_ring=FakeRing()) + app.obj_controller_router = proxy_server.ObjectControllerRouter() + app.sort_nodes = lambda nodes, *args, **kwargs: nodes + app.recheck_updating_shard_ranges = 3600 + req = Request.blank( + unique_path, {'swift.cache': self.memcache}, + method=method, body='', headers={'Content-Type': 'text/plain'}) + resp = req.get_response(app) + self.assertEqual(resp.status_int, 200) + + num_processes = 100 + status_codes = ([200, 200, 200, 200, 200] * num_processes) + # we want the container_info response to say policy index of 1 and + # sharding state + resp_headers = { + 'X-Backend-Storage-Policy-Index': 1, + 'x-backend-sharding-state': 'sharding', + 'X-Backend-Record-Type': 'shard' + } + with mocked_http_conn(*status_codes, headers=resp_headers), \ + mock.patch( + 'swift.proxy.controllers.obj.' 
+ 'CooperativeNamespaceCachePopulator.do_fetch_backend', + delayed_fetch_backend): + pool = eventlet.GreenPool() + for i in range(num_processes): + pool.spawn(worker, 'POST', '/v1/a/c/o' + str(i)) + pool.waitall() + + stats = self.app.logger.statsd_client.get_stats_counts() + expected = { + 'account.info.cache.miss.200': num_processes, + 'account.info.infocache.hit': num_processes, + 'container.info.cache.miss.200': num_processes, + 'container.info.infocache.hit': num_processes, + 'object.shard_updating.cache.miss.503': failures, + 'object.shard_updating.cache.set': 3 - failures, + 'object.shard_updating.cache.miss.200': 3 - failures, + 'object.shard_updating.cache.miss': num_processes - 3, + } + self.assertEqual(expected, stats) + + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('account', 'a'), + ('container', 'c'), + ('resource', 'shard_updating'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('status', 503)), + )): failures, + ('swift_coop_cache', frozenset(( + ('account', 'a'), + ('container', 'c'), + ('resource', 'shard_updating'), + ('event', 'backend_reqs'), + ('set_cache_state', 'set'), + ('token', 'with_token'), + ('status', 200)), + )): 3 - failures, + ('swift_coop_cache', frozenset(( + ('account', 'a'), + ('container', 'c'), + ('resource', 'shard_updating'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): num_processes - 3 + }, stats) + self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) + self.assertIn(cache_key, self.memcache.store) + cached_namespaces = NamespaceBoundList.parse(shard_ranges) + self.assertEqual( + self.memcache.store[cache_key], cached_namespaces.bounds) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 307807f1b1..e92ab06209 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -50,7 +50,8 @@ 
from io import BytesIO from urllib.parse import quote, parse_qsl from test import listen_zero -from test.debug_logger import debug_logger, FakeStatsdClient +from test.debug_logger import debug_logger, FakeStatsdClient, \ + debug_labeled_statsd_client from test.unit import ( connect_tcp, readuntil2crlfs, fake_http_connect, FakeRing, FakeMemcache, patch_policies, write_fake_ring, mocked_http_conn, @@ -2568,9 +2569,17 @@ class TestReplicatedObjectController( skip_if_no_xattrs() _test_servers[0].error_limiter.stats.clear() # clear out errors self.logger = debug_logger('proxy-ut') + conf = { + 'log_statsd_host': 'host', + 'log_statsd_port': 8125, + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_legacy': True, + } + self.statsd = debug_labeled_statsd_client(conf) self.app = proxy_server.Application( - None, + conf, logger=self.logger, + statsd=self.statsd, account_ring=FakeRing(), container_ring=FakeRing()) self.policy = POLICIES[0] @@ -4380,9 +4389,11 @@ class TestReplicatedObjectController( self.app.obj_controller_router = proxy_server.ObjectControllerRouter() self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes self.app.recheck_updating_shard_ranges = 3600 + self.app.namespace_cache_tokens_per_session = 0 def do_test(method, sharding_state): self.app.logger.clear() # clean capture state + self.app.statsd.clear() req = Request.blank( '/v1/a/c/o', {'swift.cache': FakeMemcache()}, method=method, body='', headers={'Content-Type': 'text/plain'}) @@ -4411,13 +4422,26 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.statsd_client.get_stats_counts() - self.assertEqual({'account.info.cache.miss.200': 1, - 'account.info.infocache.hit': 2, - 'container.info.cache.miss.200': 1, - 'container.info.infocache.hit': 1, - 'object.shard_updating.cache.miss.200': 1, - 'object.shard_updating.cache.set': 1}, - stats) + self.assertEqual(stats, { + 'account.info.cache.miss.200': 1, + 'account.info.infocache.hit': 2, + 
'container.info.cache.miss.200': 1, + 'container.info.infocache.hit': 1, + 'object.shard_updating.cache.miss.200': 1, + 'object.shard_updating.cache.set': 1 + }) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'backend_reqs'), + ('set_cache_state', 'set'), + ('token', 'disabled'), + ('status', 200)), + )): 1, + }, stats) self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) info_lines = self.logger.get_lines_for_level('info') self.assertIn( @@ -4603,10 +4627,13 @@ class TestReplicatedObjectController( utils.ShardRange( '.shards_a/c_nope', utils.Timestamp.now(), 'u', ''), ] + cache = FakeMemcache() infocache = { 'shard-updating-v2/a/c': NamespaceBoundList.parse(shard_ranges)} - req = Request.blank('/v1/a/c/o', {'swift.infocache': infocache}, + req = Request.blank('/v1/a/c/o', + {'swift.cache': cache, + 'swift.infocache': infocache}, method=method, body='', headers={'Content-Type': 'text/plain'}) @@ -4625,9 +4652,9 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.statsd_client.get_stats_counts() - self.assertEqual({'account.info.cache.disabled.200': 1, + self.assertEqual({'account.info.cache.miss.200': 1, 'account.info.infocache.hit': 1, - 'container.info.cache.disabled.200': 1, + 'container.info.cache.miss.200': 1, 'container.info.infocache.hit': 1, 'object.shard_updating.infocache.hit': 1}, stats) # verify statsd prefix is not mutated @@ -4692,9 +4719,11 @@ class TestReplicatedObjectController( self.app.sort_nodes = lambda nodes, *args, **kwargs: nodes self.app.recheck_updating_shard_ranges = 3600 self.app.container_updating_shard_ranges_skip_cache = 0.001 + self.app.namespace_cache_tokens_per_session = 0 def do_test(method, sharding_state): self.app.logger.clear() # clean capture state + self.app.statsd.clear() cached_shard_ranges = [ 
utils.ShardRange( '.shards_a/c_nope', utils.Timestamp.now(), '', 'l'), @@ -4773,16 +4802,29 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.statsd_client.get_stats_counts() - self.assertEqual({'account.info.cache.miss.200': 1, - 'account.info.infocache.hit': 1, - 'container.info.cache.miss.200': 1, - 'container.info.infocache.hit': 2, - 'object.shard_updating.cache.hit': 1, - 'container.info.cache.hit': 1, - 'account.info.cache.hit': 1, - 'object.shard_updating.cache.skip.200': 1, - 'object.shard_updating.cache.set': 1}, - stats) + self.assertEqual(stats, { + 'account.info.cache.miss.200': 1, + 'account.info.infocache.hit': 1, + 'container.info.cache.miss.200': 1, + 'container.info.infocache.hit': 2, + 'object.shard_updating.cache.hit': 1, + 'container.info.cache.hit': 1, + 'account.info.cache.hit': 1, + 'object.shard_updating.cache.skip.200': 1, + 'object.shard_updating.cache.set': 1 + }) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'backend_reqs'), + ('set_cache_state', 'set'), + ('token', 'disabled'), + ('status', 200)), + )): 1, + }, stats) # verify statsd prefix is not mutated self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) @@ -4854,6 +4896,18 @@ class TestReplicatedObjectController( 'object.shard_updating.cache.error.200': 1, 'object.shard_updating.cache.set': 2 }) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'backend_reqs'), + ('set_cache_state', 'set'), + ('token', 'disabled'), + ('status', 200)), + )): 2, + }, stats) do_test('POST', 'sharding') do_test('POST', 'sharded') @@ -4874,9 +4928,11 @@ class TestReplicatedObjectController( self.app.sort_nodes = lambda nodes, *args, **kwargs: 
nodes self.app.recheck_updating_shard_ranges = 3600 self.app.container_updating_shard_ranges_skip_cache = 0.001 + self.app.namespace_cache_tokens_per_session = 0 def do_test(method, sharding_state): self.app.logger.clear() # clean capture state + self.app.statsd.clear() # simulate memcache error when setting updating namespaces; # expect 4 memcache sets: account info, container info, container # info again from namespaces GET subrequest, namespaces @@ -4908,13 +4964,26 @@ class TestReplicatedObjectController( self.assertEqual(resp.status_int, 202) stats = self.app.logger.statsd_client.get_stats_counts() - self.assertEqual({'account.info.cache.miss.200': 1, - 'account.info.infocache.hit': 2, - 'container.info.cache.miss.200': 1, - 'container.info.infocache.hit': 1, - 'object.shard_updating.cache.skip.200': 1, - 'object.shard_updating.cache.set_error': 1}, - stats) + self.assertEqual(stats, { + 'account.info.cache.miss.200': 1, + 'account.info.infocache.hit': 2, + 'container.info.cache.miss.200': 1, + 'container.info.infocache.hit': 1, + 'object.shard_updating.cache.skip.200': 1, + 'object.shard_updating.cache.set_error': 1 + }) + stats = self.app.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'shard_updating'), + ('account', 'a'), + ('container', 'c'), + ('event', 'backend_reqs'), + ('set_cache_state', 'set_error'), + ('token', 'disabled'), + ('status', 200)), + )): 1, + }, stats) # verify statsd prefix is not mutated self.assertEqual([], self.app.logger.log_dict['set_statsd_prefix']) # sanity check: namespaces not in cache
