diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 2abf761dbd..05a47cde0a 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -32,8 +32,8 @@ from eventlet.support.greenlets import GreenletExit
 
 from swift.common.ring import Ring
 from swift.common.utils import whataremyips, unlink_older_than, lock_path, \
-        compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
-        rsync_ip, mkdirs
+    compute_eta, get_logger, write_pickle, renamer, dump_recon_cache, \
+    rsync_ip, mkdirs, TRUE_VALUES
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@@ -204,7 +204,7 @@ def get_hashes(partition_dir, recalculate=[], do_listdir=False,
     if modified:
         with lock_path(partition_dir):
             if not os.path.exists(hashes_file) or \
-                os.path.getmtime(hashes_file) == mtime:
+                        os.path.getmtime(hashes_file) == mtime:
                 write_pickle(
                     hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
     return hashed, hashes
@@ -247,9 +247,9 @@ class ObjectReplicator(Daemon):
         self.logger = get_logger(conf, log_route='object-replicator')
         self.devices_dir = conf.get('devices', '/srv/node')
         self.mount_check = conf.get('mount_check', 'true').lower() in \
-            ('true', 't', '1', 'on', 'yes', 'y')
+            TRUE_VALUES
         self.vm_test_mode = conf.get(
-            'vm_test_mode', 'no').lower() in ('yes', 'true', 'on', '1')
+            'vm_test_mode', 'no').lower() in TRUE_VALUES
         self.swift_dir = conf.get('swift_dir', '/etc/swift')
         self.port = int(conf.get('bind_port', 6000))
         self.concurrency = int(conf.get('concurrency', 1))
@@ -278,8 +278,9 @@ class ObjectReplicator(Daemon):
         ret_val = None
         try:
             with Timeout(self.rsync_timeout):
-                proc = subprocess.Popen(args, stdout=subprocess.PIPE,
-                    stderr=subprocess.STDOUT)
+                proc = subprocess.Popen(args,
+                                        stdout=subprocess.PIPE,
+                                        stderr=subprocess.STDOUT)
                 results = proc.stdout.read()
                 ret_val = proc.wait()
         except Timeout:
@@ -298,7 +299,7 @@ class ObjectReplicator(Daemon):
                 self.logger.error(result)
         if ret_val:
             self.logger.error(_('Bad rsync return code: %(args)s -> %(ret)d'),
-                {'args': str(args), 'ret': ret_val})
+                              {'args': str(args), 'ret': ret_val})
         elif results:
             self.logger.info(
                 _("Successful rsync of %(src)s at %(dst)s (%(time).03f)"),
@@ -384,13 +385,15 @@ class ObjectReplicator(Daemon):
                 success = self.rsync(node, job, suffixes)
                 if success:
                     with Timeout(self.http_timeout):
-                        http_connect(node['ip'], node['port'],
+                        http_connect(
+                            node['ip'], node['port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '/' + '-'.join(suffixes),
-                            headers={'Content-Length': '0'}).getresponse().read()
+                            headers={'Content-Length': '0'}).\
+                            getresponse().read()
                 responses.append(success)
-            if not suffixes or (len(responses) == \
-                    len(job['nodes']) and all(responses)):
+            if not suffixes or (len(responses) ==
+                                len(job['nodes']) and all(responses)):
                 self.logger.info(_("Removing partition: %s"), job['path'])
                 tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
         except (Exception, Timeout):
@@ -409,49 +412,57 @@ class ObjectReplicator(Daemon):
         self.logger.increment('partition.update.count.%s' % (job['device'],))
         begin = time.time()
         try:
-            hashed, local_hash = tpool_reraise(get_hashes, job['path'],
-                do_listdir=(self.replication_count % 10) == 0,
-                reclaim_age=self.reclaim_age)
+            hashed, local_hash = tpool_reraise(
+                get_hashes, job['path'],
+                do_listdir=(self.replication_count % 10) == 0,
+                reclaim_age=self.reclaim_age)
             self.suffix_hash += hashed
             self.logger.update_stats('suffix.hashes', hashed)
             attempts_left = len(job['nodes'])
-            nodes = itertools.chain(job['nodes'],
-                self.object_ring.get_more_nodes(int(job['partition'])))
+            nodes = itertools.chain(
+                job['nodes'],
+                self.object_ring.get_more_nodes(int(job['partition'])))
             while attempts_left > 0:
                 # If this throws StopIterator it will be caught way below
                 node = next(nodes)
                 attempts_left -= 1
                 try:
                     with Timeout(self.http_timeout):
-                        resp = http_connect(node['ip'], node['port'],
-                                node['device'], job['partition'], 'REPLICATE',
+                        resp = http_connect(
+                            node['ip'], node['port'],
+                            node['device'], job['partition'], 'REPLICATE',
                             '', headers={'Content-Length': '0'}).getresponse()
                     if resp.status == HTTP_INSUFFICIENT_STORAGE:
                         self.logger.error(_('%(ip)s/%(device)s responded'
-                            ' as unmounted'), node)
+                                            ' as unmounted'), node)
                         attempts_left += 1
                         continue
                     if resp.status != HTTP_OK:
                         self.logger.error(_("Invalid response %(resp)s "
-                                "from %(ip)s"),
-                                {'resp': resp.status, 'ip': node['ip']})
+                                            "from %(ip)s"),
+                                          {'resp': resp.status,
+                                           'ip': node['ip']})
                         continue
                     remote_hash = pickle.loads(resp.read())
                     del resp
                     suffixes = [suffix for suffix in local_hash if
-                                local_hash[suffix] != remote_hash.get(suffix, -1)]
+                                local_hash[suffix] !=
+                                remote_hash.get(suffix, -1)]
                     if not suffixes:
                         continue
-                    hashed, recalc_hash = tpool_reraise(get_hashes,
+                    hashed, recalc_hash = tpool_reraise(
+                        get_hashes,
                         job['path'], recalculate=suffixes,
                         reclaim_age=self.reclaim_age)
                     self.logger.update_stats('suffix.hashes', hashed)
                     local_hash = recalc_hash
                     suffixes = [suffix for suffix in local_hash if
-                                local_hash[suffix] != remote_hash.get(suffix, -1)]
+                                local_hash[suffix] !=
+                                remote_hash.get(suffix, -1)]
                     self.rsync(node, job, suffixes)
                     with Timeout(self.http_timeout):
-                        conn = http_connect(node['ip'], node['port'],
+                        conn = http_connect(
+                            node['ip'], node['port'],
                             node['device'], job['partition'], 'REPLICATE',
                             '/' + '-'.join(suffixes),
                             headers={'Content-Length': '0'})
@@ -460,7 +471,7 @@ class ObjectReplicator(Daemon):
                     self.logger.update_stats('suffix.syncs', len(suffixes))
                 except (Exception, Timeout):
                     self.logger.exception(_("Error syncing with node: %s") %
-                        node)
+                                          node)
             self.suffix_count += len(local_hash)
         except (Exception, Timeout):
             self.logger.exception(_("Error syncing partition"))
@@ -475,29 +486,34 @@ class ObjectReplicator(Daemon):
         if self.replication_count:
             elapsed = (time.time() - self.start) or 0.000001
             rate = self.replication_count / elapsed
-            self.logger.info(_("%(replicated)d/%(total)d (%(percentage).2f%%)"
-                " partitions replicated in %(time).2fs (%(rate).2f/sec, "
-                "%(remaining)s remaining)"),
+            self.logger.info(
+                _("%(replicated)d/%(total)d (%(percentage).2f%%)"
+                  " partitions replicated in %(time).2fs (%(rate).2f/sec, "
+                  "%(remaining)s remaining)"),
                 {'replicated': self.replication_count, 'total': self.job_count,
                  'percentage': self.replication_count * 100.0 / self.job_count,
                  'time': time.time() - self.start, 'rate': rate,
                  'remaining': '%d%s' % compute_eta(self.start,
-                    self.replication_count, self.job_count)})
+                                                   self.replication_count,
+                                                   self.job_count)})
             if self.suffix_count:
-                self.logger.info(_("%(checked)d suffixes checked - "
-                    "%(hashed).2f%% hashed, %(synced).2f%% synced"),
+                self.logger.info(
+                    _("%(checked)d suffixes checked - "
+                      "%(hashed).2f%% hashed, %(synced).2f%% synced"),
                     {'checked': self.suffix_count,
                      'hashed': (self.suffix_hash * 100.0) / self.suffix_count,
                      'synced': (self.suffix_sync * 100.0) / self.suffix_count})
                 self.partition_times.sort()
-                self.logger.info(_("Partition times: max %(max).4fs, "
-                    "min %(min).4fs, med %(med).4fs"),
+                self.logger.info(
+                    _("Partition times: max %(max).4fs, "
+                      "min %(min).4fs, med %(med).4fs"),
                     {'max': self.partition_times[-1],
                      'min': self.partition_times[0],
                      'med': self.partition_times[
-                        len(self.partition_times) // 2]})
+                         len(self.partition_times) // 2]})
         else:
-            self.logger.info(_("Nothing replicated for %s seconds."),
+            self.logger.info(
+                _("Nothing replicated for %s seconds."),
                 (time.time() - self.start))
 
     def kill_coros(self):
@@ -538,7 +554,8 @@ class ObjectReplicator(Daemon):
         jobs = []
         ips = whataremyips()
         for local_dev in [dev for dev in self.object_ring.devs
-                if dev and dev['ip'] in ips and dev['port'] == self.port]:
+                          if dev and dev['ip'] in ips and
+                          dev['port'] == self.port]:
             dev_path = join(self.devices_dir, local_dev['device'])
             obj_path = join(dev_path, 'objects')
             tmp_path = join(dev_path, 'tmp')
@@ -563,11 +580,12 @@ class ObjectReplicator(Daemon):
                         self.object_ring.get_part_nodes(int(partition))
                     nodes = [node for node in part_nodes
                             if node['id'] != local_dev['id']]
-                    jobs.append(dict(path=job_path,
-                        device=local_dev['device'],
-                        nodes=nodes,
-                        delete=len(nodes) > len(part_nodes) - 1,
-                        partition=partition))
+                    jobs.append(
+                        dict(path=job_path,
+                             device=local_dev['device'],
+                             nodes=nodes,
+                             delete=len(nodes) > len(part_nodes) - 1,
+                             partition=partition))
                 except ValueError, OSError:
                     continue
         random.shuffle(jobs)
@@ -598,7 +616,7 @@ class ObjectReplicator(Daemon):
                     continue
                 if not self.check_ring():
                     self.logger.info(_("Ring change detected. Aborting "
-                        "current replication pass."))
+                                       "current replication pass."))
                     return
                 if job['delete']:
                     self.run_pool.spawn(self.update_deleted, job)
@@ -638,5 +656,5 @@ class ObjectReplicator(Daemon):
             dump_recon_cache({'object_replication_time': total},
                              self.rcache, self.logger)
             self.logger.debug(_('Replication sleeping for %s seconds.'),
-                    self.run_pause)
+                              self.run_pause)
             sleep(self.run_pause)
