diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 7184c1fb26..25cf917efa 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -507,6 +507,12 @@ use = egg:swift#xprofile # The default is 12 hours (12 x 60 x 60) # recon_sharded_timeout = 43200 # +# Maximum amount of time in seconds after sharding has been started on a shard +# container and before it's considered as timeout. After this amount of time, +# sharder will warn that a container DB has not completed sharding. +# The default is 48 hours (48 x 60 x 60) +# container_sharding_timeout = 172800 +# # Large databases tend to take a while to work with, but we want to make sure # we write down our progress. Use a larger-than-normal broker timeout to make # us less likely to bomb out on a LockTimeout. diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 466325a100..a316b67496 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -709,6 +709,8 @@ class ContainerSharderConf(object): 'recon_candidates_limit', int, 5) self.recon_sharded_timeout = get_val( 'recon_sharded_timeout', int, 43200) + self.container_sharding_timeout = get_val( + 'container_sharding_timeout', int, 172800) self.conn_timeout = get_val( 'conn_timeout', float, 5) self.auto_shard = get_val( @@ -910,36 +912,59 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator): category['top'] = candidates def _record_sharding_progress(self, broker, node, error): - own_shard_range = broker.get_own_shard_range() db_state = broker.get_db_state() - if (db_state in (UNSHARDED, SHARDING, SHARDED) - and own_shard_range.state in (ShardRange.SHARDING, - ShardRange.SHARDED)): - if db_state == SHARDED: - contexts = CleavingContext.load_all(broker) - if not contexts: - return - context_ts = max(float(ts) for c, ts in contexts) - if context_ts + self.recon_sharded_timeout \ - < Timestamp.now().timestamp: - # last context timestamp too old for the - # broker to be recorded - return + if db_state not in (UNSHARDED, SHARDING, SHARDED): + return + own_shard_range = broker.get_own_shard_range() + if own_shard_range.state not in ( + ShardRange.SHARDING, ShardRange.SHARDED, + ShardRange.SHRINKING, ShardRange.SHRUNK): + return - info = self._make_stats_info(broker, node, own_shard_range) - info['state'] = own_shard_range.state_text - info['db_state'] = broker.get_db_state() - states = [ShardRange.FOUND, ShardRange.CREATED, - ShardRange.CLEAVED, ShardRange.ACTIVE] - shard_ranges = broker.get_shard_ranges(states=states) - state_count = {} - for state in states: - state_count[ShardRange.STATES[state]] = 0 - for shard_range in shard_ranges: - state_count[shard_range.state_text] += 1 - info.update(state_count) - info['error'] = error and str(error) - self._append_stat('sharding_in_progress', 'all', info) + if db_state == SHARDED: + contexts = CleavingContext.load_all(broker) + if not contexts: + return + context_ts = max(float(ts) for c, ts in contexts) + if context_ts + self.recon_sharded_timeout \ + < float(Timestamp.now()): + # last context timestamp too old for the + # broker to be recorded + return + + info = self._make_stats_info(broker, node, own_shard_range) + info['state'] = own_shard_range.state_text + info['db_state'] = broker.get_db_state() + states = [ShardRange.FOUND, ShardRange.CREATED, + ShardRange.CLEAVED, ShardRange.ACTIVE] + shard_ranges = broker.get_shard_ranges(states=states) + state_count = {} + for state in states: + state_count[ShardRange.STATES[state]] = 0 + for shard_range in shard_ranges: + state_count[shard_range.state_text] += 1 + info.update(state_count) + info['error'] = error and str(error) + self._append_stat('sharding_in_progress', 'all', info) + + if broker.sharding_required() and ( + own_shard_range.epoch is not None) and ( + float(own_shard_range.epoch) + + self.container_sharding_timeout < + time.time()): + # Note: There is no requirement that own_shard_range.epoch equals + # the time at which the own_shard_range was merged into the + # container DB, which predicates sharding starting. But s-m-s-r and + # auto-sharding do set epoch and then merge, so we use it to tell + # whether sharding has been taking too long or not. + self.logger.warning( + 'Cleaving has not completed in %.2f seconds since %s.' + ' Container DB file and path: %s (%s), DB state: %s,' + ' own_shard_range state: %s, state count of shard ranges: %s' % + (time.time() - float(own_shard_range.epoch), + own_shard_range.epoch.isoformat, broker.db_file, + quote(broker.path), db_state, + own_shard_range.state_text, str(state_count))) def _report_stats(self): # report accumulated stats since start of one sharder cycle diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 7d44c00aa1..58b1e742a3 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -114,6 +114,21 @@ class BaseTestSharder(unittest.TestCase): self.assertNotEqual(old_db_id, broker.get_info()['id']) # sanity check return broker + def _make_shrinking_broker(self, account='.shards_a', container='shard_c', + lower='here', upper='there', objects=None): + # caller should merge any acceptor range(s) into returned broker + broker = self._make_broker(account=account, container=container) + for obj in objects or []: + broker.put_object(*obj) + own_shard_range = ShardRange( + broker.path, next(self.ts_iter), lower, upper, + state=ShardRange.SHRINKING, epoch=next(self.ts_iter)) + broker.merge_shard_ranges([own_shard_range]) + broker.set_sharding_sysmeta('Root', 'a/c') + self.assertFalse(broker.is_root_container()) # sanity check + self.assertTrue(broker.set_sharding_state()) + return broker + def _make_shard_ranges(self, bounds, state=None, object_count=0, timestamp=Timestamp.now(), **kwargs): if not isinstance(state, (tuple, list)): @@ -1922,22 +1937,14 @@ class TestSharder(BaseTestSharder): # 'unique' ensures fresh dbs on each test iteration unique[0] += 1 - broker = self._make_broker(account='.shards_a', - container='donor_%s' % unique[0]) - own_shard_range = ShardRange( - broker.path, next(self.ts_iter), 'h', 'w', - state=ShardRange.SHRINKING, epoch=next(self.ts_iter)) - broker.merge_shard_ranges([own_shard_range]) - broker.set_sharding_sysmeta('Root', 'a/c') - self.assertFalse(broker.is_root_container()) # sanity check - objects = [ ('i', self.ts_encoded(), 3, 'text/plain', 'etag_t', 0, 0), ('m', self.ts_encoded(), 33, 'text/plain', 'etag_m', 0, 0), ('w', self.ts_encoded(), 100, 'text/plain', 'etag_w', 0, 0), ] - for obj in objects: - broker.put_object(*obj) + broker = self._make_shrinking_broker( + container='donor_%s' % unique[0], lower='h', upper='w', + objects=objects) acceptor_epoch = next(self.ts_iter) acceptors = [ ShardRange('.shards_a/acceptor_%s_%s' % (unique[0], bounds[1]), @@ -1966,7 +1973,6 @@ class TestSharder(BaseTestSharder): db_hash[-3:], db_hash, db_name)) broker.merge_shard_ranges(acceptors) - broker.set_sharding_state() # run cleave with mock_timestamp_now_with_iter(self.ts_iter): @@ -1978,15 +1984,15 @@ class TestSharder(BaseTestSharder): context = CleavingContext.load(broker) self.assertTrue(context.misplaced_done) self.assertEqual(expect_delete, context.cleaving_done) + own_sr = broker.get_own_shard_range() if exp_progress_acceptors: expected_cursor = exp_progress_acceptors[-1].upper_str else: - expected_cursor = own_shard_range.lower_str + expected_cursor = own_sr.lower_str self.assertEqual(expected_cursor, context.cursor) self.assertEqual(3, context.cleave_to_row) self.assertEqual(3, context.max_row) self.assertEqual(SHARDING, broker.get_db_state()) - own_sr = broker.get_own_shard_range() if expect_delete and len(acceptor_bounds) == 1: self.assertTrue(own_sr.deleted) self.assertEqual(ShardRange.SHRUNK, own_sr.state) @@ -2754,24 +2760,18 @@ class TestSharder(BaseTestSharder): self.assertEqual(0, context.ranges_todo) def test_cleave_shrinking_to_active_root_range(self): - broker = self._make_broker(account='.shards_a', container='shard_c') - broker.put_object( - 'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0) - # a donor previously shrunk to own... + broker = self._make_shrinking_broker(account='.shards_a', + container='shard_c') deleted_range = ShardRange( '.shards/other', next(self.ts_iter), 'here', 'there', deleted=True, state=ShardRange.SHRUNK, epoch=next(self.ts_iter)) - own_shard_range = ShardRange( - broker.path, next(self.ts_iter), 'here', '', - state=ShardRange.SHRINKING, epoch=next(self.ts_iter)) # root is the acceptor... root = ShardRange( 'a/c', next(self.ts_iter), '', '', state=ShardRange.ACTIVE, epoch=next(self.ts_iter)) - broker.merge_shard_ranges([deleted_range, own_shard_range, root]) + broker.merge_shard_ranges([deleted_range, root]) broker.set_sharding_sysmeta('Root', 'a/c') self.assertFalse(broker.is_root_container()) # sanity check - self.assertTrue(broker.set_sharding_state()) # expect cleave to the root with self._mock_sharder() as sharder: @@ -2818,12 +2818,9 @@ class TestSharder(BaseTestSharder): # if shrinking shard has both active root and active other acceptor, # verify that shard only cleaves to one of them; # root will sort before acceptor if acceptor.upper==MAX - broker = self._make_broker(account='.shards_a', container='shard_c') - broker.put_object( - 'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0) - own_shard_range = ShardRange( - broker.path, next(self.ts_iter), 'here', 'there', - state=ShardRange.SHRINKING, epoch=next(self.ts_iter)) + objects = ( + ('here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0),) + broker = self._make_shrinking_broker(objects=objects) # active acceptor with upper bound == MAX acceptor = ShardRange( '.shards/other', next(self.ts_iter), 'here', '', deleted=False, @@ -2832,10 +2829,9 @@ class TestSharder(BaseTestSharder): root = ShardRange( 'a/c', next(self.ts_iter), '', '', state=ShardRange.ACTIVE, epoch=next(self.ts_iter)) - broker.merge_shard_ranges([own_shard_range, acceptor, root]) + broker.merge_shard_ranges([acceptor, root]) broker.set_sharding_sysmeta('Root', 'a/c') self.assertFalse(broker.is_root_container()) # sanity check - self.assertTrue(broker.set_sharding_state()) # expect cleave to the root acceptor.upper = '' @@ -2859,12 +2855,9 @@ class TestSharder(BaseTestSharder): # if shrinking shard has both active root and active other acceptor, # verify that shard only cleaves to one of them; # root will sort after acceptor if acceptor.upper

AltStyle によって変換されたページ (->オリジナル) /