diff --git a/doc/source/development_watchers.rst b/doc/source/development_watchers.rst
new file mode 100644
index 0000000000..48e9310b17
--- /dev/null
+++ b/doc/source/development_watchers.rst
@@ -0,0 +1,126 @@
+================
+Auditor Watchers
+================
+
+--------
+Overview
+--------
+
+The duty of auditors is to guard Swift against corruption in the
+storage media. But because auditors crawl all objects, they can also
+be used to make Swift operate on every object. This is done through
+an API known as a "watcher".
+
+Watchers do not have any private view into the cluster.
+An operator can write a standalone program that walks the
+directories and performs any desired inspection or maintenance.
+What a watcher brings to the table is a framework to do the same
+job easily, under the resource restrictions already in place
+for the auditor.
+
+Operations performed by watchers are often site-specific, or else
+they would be incorporated into Swift already. However, the code in
+the tree provides a reference implementation for convenience.
+It is located in swift/obj/watchers/dark_data.py and implements
+the so-called "Dark Data Watcher".
+
+Currently, only the object auditor supports watchers.
+
+-------------
+The API class
+-------------
+
+The implementation of a watcher is a Python class that may look like this::
+
+    class MyWatcher(object):
+
+        def __init__(self, conf, logger, **kwargs):
+            pass
+
+        def start(self, audit_type, **kwargs):
+            pass
+
+        def see_object(self, object_metadata, data_file_path, **kwargs):
+            pass
+
+        def end(self, **kwargs):
+            pass
+
+Arguments to watcher methods are passed as keyword arguments,
+and methods are expected to accept new, unknown arguments.
+
+The method __init__() is used to save the configuration and the logger
+at the start of the plug-in.
+
+The method start() is invoked when the auditor starts a pass.
+It usually resets counters. The argument `audit_type` is a string,
+either `"ALL"` or `"ZBF"`, according to the type of the auditor running
+the watcher. Watchers that talk to the network tend to hang off the
+ALL-type auditor; the lightweight ones are okay with the ZBF-type.
+
+The method end() is the closing bracket for start(). It is typically
+used to log something, or dump some statistics.
+
+The method see_object() is called when the auditor has completed an
+audit of an object. This is where most of the work is done.
+
+The protocol for see_object() allows it to raise a special exception,
+QuarantineRequest. The auditor catches it and quarantines the object.
+In general, it's okay for watcher methods to throw exceptions, so
+an author of a watcher plugin does not have to catch them explicitly
+with a try: block; they can just be permitted to bubble up naturally.
+
+-------------------
+Loading the plugins
+-------------------
+
+The Swift auditor loads watcher classes from eggs, so it is necessary
+to wrap the class and provide it an entry point in the group
+"swift.object_audit_watcher"::
+
+    $ cat /usr/lib/python3.8/site-p*/mywatcher*egg-info/entry_points.txt
+    [swift.object_audit_watcher]
+    mywatcherentry = mywatcher:MyWatcher
+
+The operator tells the Swift auditor which plugins to load by adding
+them to object-server.conf in the section [object-auditor]. It is also
+possible to pass parameters, which arrive in the conf argument of
+__init__()::
+
+    [object-auditor]
+    watchers = mywatcher#mywatcherentry,swift#dark_data
+
+    [object-auditor:watcher:mywatcher#mywatcherentry]
+    myparam=testing2020
+
+Do not forget to remove the watcher from auditors when done.
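+
+If you distribute a watcher as its own package, the entry point above
+could be declared in its setup.py. The following is only a sketch: the
+package and module names are placeholders matching the example above,
+not anything shipped with Swift::
+
+    from setuptools import setup
+
+    setup(
+        name='mywatcher',
+        py_modules=['mywatcher'],
+        entry_points={
+            'swift.object_audit_watcher': [
+                'mywatcherentry = mywatcher:MyWatcher',
+            ],
+        },
+    )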
+
+Although the API itself is very lightweight, it is common for watchers
+to incur a significant performance penalty: they can talk to networked
+services or access additional objects.
+
+-----------------
+Dark Data Watcher
+-----------------
+
+The watcher API is still under development. Operators who need
+extensions are welcome to report any needs for more arguments
+to see_object(). For now, start by copying the provided template watcher
+swift/obj/watchers/dark_data.py and see if it is sufficient.
+
+The name "Dark Data" refers to the scientific hypothesis of Dark Matter,
+which supposes that the universe contains a lot of matter that we cannot
+observe. Dark data in Swift is the name for objects that are not
+accounted for in the containers.
+
+The experience of running large scale clusters suggests that Swift does
+not have any particular bugs that trigger the creation of dark data. So,
+this is an exercise in writing watchers, with a plausible function.
+
+When enabled, the Dark Data watcher definitely drags down the cluster's
+overall performance, as mentioned above. Of course, the load increase
+can be mitigated as usual, but at the expense of the total time taken
+by an auditor pass.
+
+Finally, keep in mind that the Dark Data watcher needs the container
+ring to operate, but runs on an object node. This can come up if the
+cluster has nodes separated by function.
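+
+For reference, a minimal configuration of this watcher might look like
+the following; the `action` values (`log` by default, or `delete`, or
+`quarantine`) are the ones recognized by the implementation in
+swift/obj/watchers/dark_data.py::
+
+    [object-auditor]
+    watchers = swift#dark_data
+
+    [object-auditor:watcher:swift#dark_data]
+    action=quarantine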
diff --git a/doc/source/index.rst b/doc/source/index.rst
index 425b447f7e..f9f23e464f 100644
--- a/doc/source/index.rst
+++ b/doc/source/index.rst
@@ -88,6 +88,7 @@ Developer Documentation
    development_auth
    development_middleware
    development_ondisk_backends
+   development_watchers

 Administrator Documentation
 ===========================
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index b4ae1c25ea..1e1d3b6eed 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -480,6 +480,27 @@ use = egg:swift#recon
 # to 86400 (1 day).
 # rsync_tempfile_timeout = auto

+# A comma-separated list of watcher entry points. This lets operators
+# programmatically see audited objects.
+#
+# The entry point group name is "swift.object_audit_watcher". If your
+# setup.py has something like this:
+#
+# entry_points={'swift.object_audit_watcher': [
+#     'some_watcher = some_module:Watcher']}
+#
+# then you would enable it with "watchers = some_package#some_watcher".
+# For example, the built-in reference implementation is enabled as
+# "watchers = swift#dark_data".
+#
+# watchers =
+
+# Watcher-specific parameters can be added after "object-auditor:watcher:"
+# like the following (note that entry points are qualified by package#):
+#
+# [object-auditor:watcher:swift#dark_data]
+# action=log
+
 [object-expirer]
 # If this true, this expirer will execute tasks from legacy expirer task queue,
 # at least one object server should run with dequeue_from_legacy = true
diff --git a/setup.cfg b/setup.cfg
index a8ee4f617d..19d86c6c5b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -133,6 +133,9 @@ swift.diskfile =
 replication.fs = swift.obj.diskfile:DiskFileManager
 erasure_coding.fs = swift.obj.diskfile:ECDiskFileManager

+swift.object_audit_watcher =
+    dark_data = swift.obj.watchers.dark_data:DarkDataWatcher
+
 [egg_info]
 tag_build =
 tag_date = 0
diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py
index 675da10f89..2849284c69 100644
--- a/swift/common/exceptions.py
+++ b/swift/common/exceptions.py
@@ -235,6 +235,10 @@ class UnknownSecretIdError(EncryptionException):
     pass


+class QuarantineRequest(SwiftException):
+    pass
+
+
 class ClientException(Exception):

     def __init__(self, msg, http_scheme='', http_host='', http_port='',
diff --git a/swift/common/utils.py b/swift/common/utils.py
index 1ace1beac7..e4637a6559 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -3071,6 +3071,27 @@ def readconf(conf_path, section_name=None, log_name=None, defaults=None,
     return conf


+def parse_prefixed_conf(conf_file, prefix):
+    """
+    Search the config file for any common-prefix sections and load those
+    sections to a dict mapping the after-prefix reference to options.
+
+    :param conf_file: the file name of the config to parse
+    :param prefix: the common prefix of the sections
+    :return: a dict mapping section reference (after the prefix) -> dict
+             of options
+    """
+
+    ret_config = {}
+    all_conf = readconf(conf_file)
+    for section, options in all_conf.items():
+        if not section.startswith(prefix):
+            continue
+        target_ref = section[len(prefix):]
+        ret_config[target_ref] = options
+    return ret_config
+
+
 def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
     """
     Ensure that a pickle file gets written to disk.  The file
diff --git a/swift/obj/auditor.py b/swift/obj/auditor.py
index 490582d640..dba12a073b 100644
--- a/swift/obj/auditor.py
+++ b/swift/obj/auditor.py
@@ -25,19 +25,23 @@ from contextlib import closing
 from eventlet import Timeout

 from swift.obj import diskfile, replicator
-from swift.common.utils import (
-    get_logger, ratelimit_sleep, dump_recon_cache, list_from_csv, listdir,
-    unlink_paths_older_than, readconf, config_auto_int_value, round_robin_iter)
 from swift.common.exceptions import DiskFileQuarantined, DiskFileNotExist,\
-    DiskFileDeleted, DiskFileExpired
+    DiskFileDeleted, DiskFileExpired, QuarantineRequest
 from swift.common.daemon import Daemon
 from swift.common.storage_policy import POLICIES
+from swift.common.utils import (
+    config_auto_int_value, dump_recon_cache, get_logger, list_from_csv,
+    listdir, load_pkg_resource, parse_prefixed_conf, ratelimit_sleep,
+    readconf, round_robin_iter, unlink_paths_older_than, PrefixLoggerAdapter)


 class AuditorWorker(object):
     """Walk through file system to audit objects"""

-    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0):
+    def __init__(self, conf, logger, rcache, devices, zero_byte_only_at_fps=0,
+                 watcher_defs=None):
+        if watcher_defs is None:
+            watcher_defs = {}
         self.conf = conf
         self.logger = logger
         self.devices = devices
@@ -95,6 +99,11 @@ class AuditorWorker(object):
         self.stats_buckets = dict(
             [(s, 0) for s in self.stats_sizes + ['OVER']])

+        self.watchers = [
+            WatcherWrapper(wdef['klass'], name, wdef['conf'], logger)
+            for name, wdef in watcher_defs.items()]
+        logger.debug("%d audit watcher(s) loaded", len(self.watchers))
+
     def create_recon_nested_dict(self, top_level_key, device_list, item):
         if device_list:
             device_key = ''.join(sorted(device_list))
@@ -114,6 +123,8 @@ class AuditorWorker(object):
                            '%(description)s)') %
                          {'mode': mode, 'audi_type': self.auditor_type,
                           'description': description})
+        for watcher in self.watchers:
+            watcher.start(self.auditor_type)
         begin = reported = time.time()
         self.total_bytes_processed = 0
         self.total_files_processed = 0
@@ -187,6 +198,8 @@ class AuditorWorker(object):
                'frate': self.total_files_processed / elapsed,
                'brate': self.total_bytes_processed / elapsed,
                'audit': time_auditing, 'audit_rate': time_auditing / elapsed})
+        for watcher in self.watchers:
+            watcher.end()
         if self.stats_sizes:
             self.logger.info(
                 _('Object audit stats: %s') % json.dumps(self.stats_buckets))
@@ -259,6 +272,15 @@ class AuditorWorker(object):
                                     incr_by=chunk_len)
                 self.bytes_processed += chunk_len
                 self.total_bytes_processed += chunk_len
+            for watcher in self.watchers:
+                try:
+                    watcher.see_object(
+                        metadata,
+                        df._ondisk_info['data_file'])
+                except QuarantineRequest:
+                    raise df._quarantine(
+                        df._data_file,
+                        "Requested by %s" % watcher.watcher_name)
         except DiskFileQuarantined as err:
             self.quarantines += 1
             self.logger.error(_('ERROR Object %(obj)s failed audit and was'
@@ -303,6 +325,20 @@ class ObjectAuditor(Daemon):
         self.rcache = join(self.recon_cache_path, "object.recon")
         self.interval = int(conf.get('interval', 30))

+        watcher_names = set(list_from_csv(conf.get('watchers', '')))
+        # Normally '__file__' is always in config, but tests often neglect it.
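+        # For example, given "watchers = swift#dark_data" above and a
+        # section "[object-auditor:watcher:swift#dark_data]" that holds
+        # "action=log", watcher_configs below would map
+        # 'swift#dark_data' -> {'action': 'log'}.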
+        watcher_configs = \
+            parse_prefixed_conf(conf['__file__'], 'object-auditor:watcher:') \
+            if '__file__' in conf else {}
+        self.watcher_defs = {}
+        for name in watcher_names:
+            self.logger.debug("Loading entry point '%s'", name)
+            wconf = dict(conf)
+            wconf.update(watcher_configs.get(name, {}))
+            self.watcher_defs[name] = {
+                'conf': wconf,
+                'klass': load_pkg_resource("swift.object_audit_watcher", name)}
+
     def _sleep(self):
         time.sleep(self.interval)

@@ -318,7 +354,8 @@ class ObjectAuditor(Daemon):
         device_dirs = kwargs.get('device_dirs')
         worker = AuditorWorker(self.conf, self.logger,
                                self.rcache, self.devices,
-                               zero_byte_only_at_fps=zero_byte_only_at_fps)
+                               zero_byte_only_at_fps=zero_byte_only_at_fps,
+                               watcher_defs=self.watcher_defs)
         worker.audit_all_objects(mode=mode, device_dirs=device_dirs)

     def fork_child(self, zero_byte_fps=False, sleep_between_zbf_scanner=False,
@@ -438,3 +475,62 @@ class ObjectAuditor(Daemon):
                                         **kwargs)
             except (Exception, Timeout) as err:
                 self.logger.exception(_('ERROR auditing: %s'), err)
+
+
+class WatcherWrapper(object):
+    """
+    Run the user-supplied watcher.
+
+    Simple and gets the job done. Note that we aren't doing anything
+    to isolate ourselves from hangs or file descriptor leaks
+    in the plugins.
+    """
+
+    def __init__(self, watcher_class, watcher_name, conf, logger):
+        self.watcher_name = watcher_name
+        self.watcher_in_error = False
+        self.logger = PrefixLoggerAdapter(logger, {})
+        self.logger.set_prefix('[audit-watcher %s] ' % watcher_name)
+
+        try:
+            self.watcher = watcher_class(conf, self.logger)
+        except (Exception, Timeout):
+            self.logger.exception('Error initializing watcher')
+            self.watcher_in_error = True
+
+    def start(self, audit_type):
+        if self.watcher_in_error:
+            return  # can't trust the state of the thing; bail
+        try:
+            self.watcher.start(audit_type=audit_type)
+        except (Exception, Timeout):
+            self.logger.exception('Error starting watcher')
+            self.watcher_in_error = True
+
+    def see_object(self, meta, data_file_path):
+        if self.watcher_in_error:
+            return  # can't trust the state of the thing; bail
+        kwargs = {'object_metadata': meta,
+                  'data_file_path': data_file_path}
+        try:
+            self.watcher.see_object(**kwargs)
+        except QuarantineRequest:
+            # Avoid extra logging.
+            raise
+        except (Exception, Timeout):
+            self.logger.exception(
+                'Error in see_object(meta=%r, data_file_path=%r)',
+                meta, data_file_path)
+            # Do *not* flag watcher as being in an error state; a failure
+            # to process one object shouldn't impact the ability to process
+            # others.
+
+    def end(self):
+        if self.watcher_in_error:
+            return  # can't trust the state of the thing; bail
+        kwargs = {}
+        try:
+            self.watcher.end(**kwargs)
+        except (Exception, Timeout):
+            self.logger.exception('Error ending watcher')
+            self.watcher_in_error = True
diff --git a/swift/obj/watchers/__init__.py b/swift/obj/watchers/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/swift/obj/watchers/dark_data.py b/swift/obj/watchers/dark_data.py
new file mode 100644
index 0000000000..524fccc3a6
--- /dev/null
+++ b/swift/obj/watchers/dark_data.py
@@ -0,0 +1,146 @@
+# Copyright (c) 2019 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#
+# This is an audit watcher that manages the dark data in the cluster.
+# Since the API for audit watchers is intended to use external plugins,
+# this code is invoked as if it were external: through pkg_resources.
+# Our setup.py comes pre-configured for convenience, but the operator has
+# to enable this watcher explicitly by adding DarkDataWatcher to watchers=
+# in object-server.conf. The default is off, as if this watcher did not
+# exist, which is for the best, because of its large performance impact.
+#
+
+import os
+import random
+import shutil
+
+from eventlet import Timeout
+
+from swift.common.direct_client import direct_get_container
+from swift.common.exceptions import ClientException, QuarantineRequest
+from swift.common.ring import Ring
+from swift.common.utils import split_path
+
+
+class ContainerError(Exception):
+    pass
+
+
+class DarkDataWatcher(object):
+    def __init__(self, conf, logger):
+
+        self.logger = logger
+
+        swift_dir = '/etc/swift'
+        self.container_ring = Ring(swift_dir, ring_name='container')
+        self.dark_data_policy = conf.get('action')
+        if self.dark_data_policy not in ['log', 'delete', 'quarantine']:
+            self.logger.warning(
+                "Dark data action %r unknown, defaults to action = 'log'" %
+                (self.dark_data_policy,))
+            self.dark_data_policy = 'log'
+
+    def start(self, audit_type, **other_kwargs):
+        self.is_zbf = audit_type == 'ZBF'
+        self.tot_unknown = 0
+        self.tot_dark = 0
+        self.tot_okay = 0
+
+    def policy_based_object_handling(self, data_file_path, metadata):
+        obj_path = metadata['name']
+
+        if self.dark_data_policy == "quarantine":
+            self.logger.info("quarantining dark data %s" % obj_path)
+            raise QuarantineRequest
+        elif self.dark_data_policy == "log":
+            self.logger.info("reporting dark data %s" % obj_path)
+        elif self.dark_data_policy == "delete":
+            obj_dir = os.path.dirname(data_file_path)
+            self.logger.info("deleting dark data %s" % obj_dir)
+            shutil.rmtree(obj_dir)
+
+    def see_object(self, object_metadata, data_file_path, **other_kwargs):
+
+        # No point in loading the container servers with unnecessary requests.
+        if self.is_zbf:
+            return
+
+        obj_path = object_metadata['name']
+        try:
+            obj_info = get_info_1(self.container_ring, obj_path, self.logger)
+        except ContainerError:
+            self.tot_unknown += 1
+            return
+
+        if obj_info is None:
+            self.tot_dark += 1
+            self.policy_based_object_handling(data_file_path, object_metadata)
+        else:
+            # OK, object is there, but in the future we might want to verify
+            # more. Watch out for versioned objects, EC, and all that.
+            self.tot_okay += 1
+
+    def end(self, **other_kwargs):
+        if self.is_zbf:
+            return
+        self.logger.info("total unknown %d ok %d dark %d" %
+                         (self.tot_unknown, self.tot_okay, self.tot_dark))
+
+
+#
+# Get the information for one object from the container servers
+#
+def get_info_1(container_ring, obj_path, logger):
+
+    path_comps = split_path(obj_path, 1, 3, True)
+    account_name = path_comps[0]
+    container_name = path_comps[1]
+    obj_name = path_comps[2]
+
+    container_part, container_nodes = \
+        container_ring.get_nodes(account_name, container_name)
+
+    if not container_nodes:
+        raise ContainerError()
+
+    # Perhaps we should do something about the way we select the container
+    # nodes. For now we just shuffle. It spreads the load, but it does not
+    # improve upon the case when some nodes are down, so the auditor slows
+    # to a crawl (if this plugin is enabled).
+    random.shuffle(container_nodes)
+
+    dark_flag = 0
+    for node in container_nodes:
+        try:
+            headers, objs = direct_get_container(
+                node, container_part, account_name, container_name,
+                prefix=obj_name, limit=1)
+        except (ClientException, Timeout):
+            # Something is wrong with that server, treat as an error.
+            continue
+        if not objs or objs[0]['name'] != obj_name:
+            dark_flag += 1
+            continue
+        return objs[0]
+
+    # We do not ask for a quorum of container servers to know the object.
+    # Even if one server knows the object, we return with the info above.
+    # So, we only end here when all servers either have no record of the
+    # object or error out. In such a case, even one non-error server means
+    # that the object is dark.
+    if dark_flag:
+        return None
+    raise ContainerError()
diff --git a/swift/proxy/server.py b/swift/proxy/server.py
index 46afe49b5c..a69531f9e1 100644
--- a/swift/proxy/server.py
+++ b/swift/proxy/server.py
@@ -35,7 +35,7 @@ from swift.common.ring import Ring
 from swift.common.utils import Watchdog, get_logger, \
     get_remote_client, split_path, config_true_value, generate_trans_id, \
     affinity_key_function, affinity_locality_predicate, list_from_csv, \
-    register_swift_info, readconf, config_auto_int_value
+    register_swift_info, parse_prefixed_conf, config_auto_int_value
 from swift.common.constraints import check_utf8, valid_api_version
 from swift.proxy.controllers import AccountController, ContainerController, \
     ObjectControllerRouter, InfoController
@@ -773,15 +773,8 @@ def parse_per_policy_config(conf):
     :return: a dict mapping policy reference -> dict of policy options
     :raises ValueError: if a policy config section has an invalid name
     """
-    policy_config = {}
-    all_conf = readconf(conf['__file__'])
     policy_section_prefix = conf['__name__'] + ':policy:'
-    for section, options in all_conf.items():
-        if not section.startswith(policy_section_prefix):
-            continue
-        policy_ref = section[len(policy_section_prefix):]
-        policy_config[policy_ref] = options
-    return policy_config
+    return parse_prefixed_conf(conf['__file__'], policy_section_prefix)


 def app_factory(global_conf, **local_conf):
diff --git a/test/probe/common.py b/test/probe/common.py
index 9277198f7b..a03083f1bd 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -416,6 +416,11 @@ class ProbeTest(unittest.TestCase):
     def tearDown(self):
         Manager(['all']).kill()

+    def assertLengthEqual(self, obj, length):
+        obj_len = len(obj)
+        self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
+            obj, obj_len, length))
+
     def device_dir(self, node):
         server_type, config_number = get_server_number(
             (node['ip'], node['port']), self.ipport2server)
diff --git a/test/probe/test_dark_data.py b/test/probe/test_dark_data.py
new file mode 100644
index 0000000000..19427e07d0
--- /dev/null
+++ b/test/probe/test_dark_data.py
@@ -0,0 +1,183 @@
+#!/usr/bin/python -u
+# Copyright (c) 2010-2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import unittest
+
+import os
+import uuid
+import shutil
+
+from datetime import datetime
+from six.moves.configparser import ConfigParser
+
+from test.probe.brain import BrainSplitter
+from test.probe.common import ReplProbeTest
+from swift.common import manager
+from swift.common.storage_policy import get_policy_string
+from swift.common.manager import Manager, Server
+from swift.common.utils import readconf
+
+
+CONF_SECTION = 'object-auditor:watcher:swift#dark_data'
+
+
+class TestDarkDataDeletion(ReplProbeTest):
+    # NB: could be 'quarantine' in another test
+    action = 'delete'
+
+    def setUp(self):
+        """
+        Reset all environment and start all servers.
+        """
+        super(TestDarkDataDeletion, self).setUp()
+
+        self.conf_dest = \
+            os.path.join('/tmp/',
+                         datetime.now().strftime('swift-%Y-%m-%d_%H-%M-%S-%f'))
+        os.mkdir(self.conf_dest)
+
+        object_server_dir = os.path.join(self.conf_dest, 'object-server')
+        os.mkdir(object_server_dir)
+
+        for conf_file in Server('object-auditor').conf_files():
+            config = readconf(conf_file)
+            if 'object-auditor' not in config:
+                continue  # *somebody* should be set up to run the auditor
+            config['object-auditor'].update(
+                {'watchers': 'swift#dark_data'})
+            # Note that this setdefault business may mean the watcher doesn't
+            # pick up DEFAULT values, but that (probably?) won't matter.
+            config.setdefault(CONF_SECTION, {}).update(
+                {'action': self.action})
+
+            parser = ConfigParser()
+            for section in ('object-auditor', CONF_SECTION):
+                parser.add_section(section)
+                for option, value in config[section].items():
+                    parser.set(section, option, value)
+
+            file_name = os.path.basename(conf_file)
+            if file_name.endswith('.d'):
+                # Work around conf.d setups (like you might see with VSAIO)
+                file_name = file_name[:-2]
+            with open(os.path.join(object_server_dir, file_name), 'w') as fp:
+                parser.write(fp)
+
+        self.container_name = 'container-%s' % uuid.uuid4()
+        self.object_name = 'object-%s' % uuid.uuid4()
+        self.brain = BrainSplitter(self.url, self.token, self.container_name,
+                                   self.object_name, 'object',
+                                   policy=self.policy)
+
+    def tearDown(self):
+        shutil.rmtree(self.conf_dest)
+
+    def gather_object_files_by_ext(self):
+        result = collections.defaultdict(set)
+        for node in self.brain.nodes:
+            for path, _, files in os.walk(os.path.join(
+                    self.device_dir(node),
+                    get_policy_string('objects', self.policy))):
+                for file in files:
+                    if file in ('.lock', 'hashes.pkl', 'hashes.invalid'):
+                        continue
+                    _, ext = os.path.splitext(file)
+                    result[ext].add(os.path.join(path, file))
+        return result
+
+    def test_dark_data(self):
+        self.brain.put_container()
+        self.brain.put_object()
+        self.brain.stop_handoff_half()
+        self.brain.delete_object()
+        Manager(['object-updater']).once()
+        Manager(['container-replicator']).once()
+
+        # Sanity check:
+        # * all containers are empty
+        # * primaries that are still up have two .ts files
+        # * primary that's down has one .data file
+        for index, (headers, items) in self.direct_get_container(
+                container=self.container_name).items():
+            self.assertEqual(headers['X-Container-Object-Count'], '0')
+            self.assertEqual(items, [])
+
+        files = self.gather_object_files_by_ext()
+        self.assertLengthEqual(files, 2)
+        self.assertLengthEqual(files['.ts'], 2)
+        self.assertLengthEqual(files['.data'], 1)
+
+        # Simulate a reclaim_age passing,
+        # so the tombstones all got cleaned up
+        for file_path in files['.ts']:
+            os.unlink(file_path)
+
+        # Old node gets reintroduced to the cluster
+        self.brain.start_handoff_half()
+        # ...so replication thinks it's got some work to do
+        Manager(['object-replicator']).once()
+
+        # Now we're back to *three* .data files
+        files = self.gather_object_files_by_ext()
+        self.assertLengthEqual(files, 1)
+        self.assertLengthEqual(files['.data'], 3)
+
+        # But that's OK, audit watchers to the rescue!
+        old_swift_dir = manager.SWIFT_DIR
+        manager.SWIFT_DIR = self.conf_dest
+        try:
+            Manager(['object-auditor']).once()
+        finally:
+            manager.SWIFT_DIR = old_swift_dir
+
+        # Verify that the policy was applied.
+        self.check_on_disk_files(files['.data'])
+
+    def check_on_disk_files(self, files):
+        for file_path in files:
+            # File's not there
+            self.assertFalse(os.path.exists(file_path))
+            # And it's not quarantined, either!
+            self.assertPathDoesNotExist(os.path.join(
+                file_path[:file_path.index('objects')], 'quarantined'))
+
+    def assertPathExists(self, path):
+        msg = "Expected path %r to exist, but it doesn't" % path
+        self.assertTrue(os.path.exists(path), msg)
+
+    def assertPathDoesNotExist(self, path):
+        msg = "Expected path %r to not exist, but it does" % path
+        self.assertFalse(os.path.exists(path), msg)
+
+
+class TestDarkDataQuarantining(TestDarkDataDeletion):
+    action = 'quarantine'
+
+    def check_on_disk_files(self, files):
+        for file_path in files:
+            # File's not there
+            self.assertPathDoesNotExist(file_path)
+            # Got quarantined
+            parts = file_path.split(os.path.sep)
+            quarantine_dir = parts[:parts.index('objects')] + ['quarantined']
+            quarantine_path = os.path.sep.join(
+                quarantine_dir + ['objects'] + parts[-2:])
+            self.assertPathExists(quarantine_path)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 01b7b8ccc4..267b733c58 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -242,11 +242,6 @@ class BaseTestContainerSharding(ReplProbeTest):
                               '\n  '.join(result['other']))
         return result

-    def assertLengthEqual(self, obj, length):
-        obj_len = len(obj)
-        self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
-            obj, obj_len, length))
-
     def assert_dict_contains(self, expected_items, actual_dict):
         ignored = set(expected_items) ^ set(actual_dict)
         filtered_actual = {k: actual_dict[k]
diff --git a/test/unit/obj/test_auditor.py b/test/unit/obj/test_auditor.py
index 2c7fe3dd94..14a15f9c2e 100644
--- a/test/unit/obj/test_auditor.py
+++ b/test/unit/obj/test_auditor.py
@@ -12,32 +12,35 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
 import unittest
+import json
 import mock
 import os
-import sys
+import pkg_resources
 import signal
-import time
 import string
+import sys
+import time
 import xattr
 from shutil import rmtree
 from tempfile import mkdtemp
 import textwrap
 from os.path import dirname, basename
-from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
-                       DEFAULT_TEST_EC_TYPE, skip_if_no_xattrs)
+from test.unit import (
+    debug_logger, DEFAULT_TEST_EC_TYPE,
+    make_timestamp_iter, patch_policies, skip_if_no_xattrs)
+from test.unit.obj.common import write_diskfile
 from swift.obj import auditor, replicator
+from swift.obj.watchers.dark_data import DarkDataWatcher
 from swift.obj.diskfile import (
     DiskFile, write_metadata, invalidate_hash, get_data_dir,
     DiskFileManager, ECDiskFileManager, AuditLocation, clear_auditor_status,
     get_auditor_status, HASH_FILE, HASH_INVALIDATIONS_FILE)
 from swift.common.utils import (
-    mkdirs, normalize_timestamp, Timestamp, readconf, md5)
+    mkdirs, normalize_timestamp, Timestamp, readconf, md5,
+    PrefixLoggerAdapter)
 from swift.common.storage_policy import (
     ECStoragePolicy, StoragePolicy, POLICIES, EC_POLICY)
-from test.unit.obj.common import write_diskfile

 _mocked_policies = [
     StoragePolicy(0, 'zero', False),
@@ -60,8 +63,33 @@ def works_only_once(callable_thing, exception):
     return only_once


-@patch_policies(_mocked_policies)
-class TestAuditor(unittest.TestCase):
+def no_audit_watchers(group, name=None):
+    if group == 'swift.object_audit_watcher':
+        return iter([])
+    else:
+        return pkg_resources.iter_entry_points(group, name)
+
+
+class FakeRing1(object):
+
+    def __init__(self, swift_dir, ring_name=None):
+        return
+
+    def get_nodes(self, *args, **kwargs):
+        x = 1
+        node1 = {'ip': '10.0.0.%s' % x,
+                 'replication_ip': '10.0.0.%s' % x,
+                 'port': 6200 + x,
+                 'replication_port': 6200 + x,
+                 'device': 'sda',
+                 'zone': x % 3,
+                 'region': x % 2,
+                 'id': x,
+                 'handoff_index': 1}
+        return (1, [node1])
+
+
+class TestAuditorBase(unittest.TestCase):

     def setUp(self):
         skip_if_no_xattrs()
@@ -113,7 +141,7 @@ class TestAuditor(unittest.TestCase):
         # diskfiles for policy 0, 1, 2
         self.disk_file = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
                                                   policy=POLICIES[0])
-        self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c',
+        self.disk_file_p1 = self.df_mgr.get_diskfile('sda', '0', 'a', 'c2',
                                                      'o', policy=POLICIES[1])
         self.disk_file_ec = self.ec_df_mgr.get_diskfile(
             'sda', '0', 'a', 'c', 'o', policy=POLICIES[2], frag_index=1)
@@ -121,6 +149,10 @@ class TestAuditor(unittest.TestCase):
     def tearDown(self):
         rmtree(os.path.dirname(self.testdir), ignore_errors=1)

+
+@patch_policies(_mocked_policies)
+class TestAuditor(TestAuditorBase):
+
     def test_worker_conf_parms(self):
         def check_common_defaults():
             self.assertEqual(auditor_worker.max_bytes_per_second, 10000000)
@@ -1532,5 +1564,163 @@ class TestAuditor(unittest.TestCase):
                          .format(outstanding_pids))


+@mock.patch('pkg_resources.iter_entry_points', no_audit_watchers)
+@patch_policies(_mocked_policies)
+class TestAuditWatchers(TestAuditorBase):
+
+    def setUp(self):
+        super(TestAuditWatchers, self).setUp()
+
+        timestamp = Timestamp(time.time())
+
+        data = b'0' * 1024
+        etag = md5()
+        with self.disk_file.create() as writer:
+            writer.write(data)
+            etag.update(data)
+            etag = etag.hexdigest()
+            metadata = {
+                'ETag': etag,
+                'X-Timestamp': timestamp.internal,
+                'Content-Length': str(len(data)),
+                'X-Object-Meta-Flavor': 'banana',
+            }
+            writer.put(metadata)
+
+        data = b'1' * 2048
+        etag = md5()
+        with self.disk_file_p1.create() as writer:
+            writer.write(data)
+            etag.update(data)
+            etag = etag.hexdigest()
+            metadata = {
+                'ETag': etag,
+                'X-Timestamp': timestamp.internal,
+                'Content-Length': str(len(data)),
+                'X-Object-Meta-Flavor': 'orange',
+            }
+            writer.put(metadata)
+
+    def test_watchers(self):
+
+        calls = []
+
+        class TestWatcher(object):
+            def __init__(self, conf, logger):
+                self._started = False
+                self._ended = False
+                calls.append(["__init__", conf, logger])
+
+                # Make sure the logger is capable of quacking like a logger
+                logger.debug("getting started")
+
+            def start(self, audit_type, **other_kwargs):
+                if self._started:
+                    raise Exception("don't call it twice")
+                self._started = True
+                calls.append(['start', audit_type])
+
+            def see_object(self, object_metadata,
+                           data_file_path, **other_kwargs):
+                calls.append(['see_object', object_metadata,
+                              data_file_path, other_kwargs])
+
+            def end(self, **other_kwargs):
+                if self._ended:
+                    raise Exception("don't call it twice")
+                self._ended = True
+                calls.append(['end'])
+
+        conf = self.conf.copy()
+        conf['watchers'] = 'test_watcher1'
+        conf['__file__'] = '/etc/swift/swift.conf'
+        ret_config = {'swift#dark_data': {'action': 'log'}}
+        with mock.patch('swift.obj.auditor.parse_prefixed_conf',
+                        return_value=ret_config), \
+                mock.patch('swift.obj.auditor.load_pkg_resource',
+                           side_effect=[TestWatcher]) as mock_load, \
+                mock.patch('swift.obj.auditor.get_logger',
+                           lambda *a, **kw: self.logger):
+            my_auditor = auditor.ObjectAuditor(conf)
+
+        self.assertEqual(mock_load.mock_calls, [
+            mock.call('swift.object_audit_watcher', 'test_watcher1'),
+        ])
+
+        my_auditor.run_audit(mode='once', zero_byte_fps=float("inf"))
+
+        self.assertEqual(len(calls), 5)
+
+        self.assertEqual(calls[0], ["__init__", conf, mock.ANY])
+        self.assertIsInstance(calls[0][2], PrefixLoggerAdapter)
+        self.assertIs(calls[0][2].logger, self.logger)
+
+        self.assertEqual(calls[1], ["start", "ZBF"])
+
+        self.assertEqual(calls[2][0], "see_object")
+        self.assertEqual(calls[3][0], "see_object")
+
+        # The order in which the auditor finds things on the filesystem is
+        # irrelevant; what matters is that it finds all the things.
+        calls[2:4] = sorted(calls[2:4], key=lambda item: item[1]['name'])
+
+        self.assertDictContainsSubset({'name': '/a/c/o',
+                                       'X-Object-Meta-Flavor': 'banana'},
+                                      calls[2][1])
+        self.assertIn('node/sda/objects/0/', calls[2][2])  # data_file_path
+        self.assertTrue(calls[2][2].endswith('.data'))  # data_file_path
+        self.assertEqual({}, calls[2][3])
+
+        self.assertDictContainsSubset({'name': '/a/c2/o',
+                                       'X-Object-Meta-Flavor': 'orange'},
+                                      calls[3][1])
+        self.assertIn('node/sda/objects-1/0/', calls[3][2])  # data_file_path
+        self.assertTrue(calls[3][2].endswith('.data'))  # data_file_path
+        self.assertEqual({}, calls[3][3])
+
+        self.assertEqual(calls[4], ["end"])
+
+        log_lines = self.logger.get_lines_for_level('debug')
+        self.assertIn(
+            "[audit-watcher test_watcher1] getting started",
+            log_lines)
+
+    def test_builtin_watchers(self):
+
+        conf = self.conf.copy()
+        conf['watchers'] = 'test_watcher1'
+        conf['__file__'] = '/etc/swift/swift.conf'
+        ret_config = {'swift#dark_data': {'action': 'log'}}
+        with mock.patch('swift.obj.auditor.parse_prefixed_conf',
+                        return_value=ret_config), \
+                mock.patch('swift.obj.auditor.load_pkg_resource',
+                           side_effect=[DarkDataWatcher]):
+            my_auditor = auditor.ObjectAuditor(conf, logger=self.logger)
+
+        def fake_direct_get_container(node, part, account, container,
+                                      prefix=None, limit=None):
+            self.assertEqual(part, 1)
+            self.assertEqual(limit, 1)
+            # The returned entry is not abbreviated, but is full of nonsense.
+            entry = {'bytes': 30968411,
+                     'hash': '60303f4122966fe5925f045eb52d1129',
+                     'name': '%s' % prefix,
+                     'content_type': 'video/mp4',
+                     'last_modified': '2017-08-15T03:30:57.693210'}
+            return {}, [entry]
+
+        with mock.patch('swift.obj.watchers.dark_data.Ring', FakeRing1), \
+                mock.patch("swift.obj.watchers.dark_data.direct_get_container",
+                           fake_direct_get_container):
+            my_auditor.run_audit(mode='once')
+
+        # N.B. We want to check for ok files instead of dark because
+        # if anything goes wrong inside, we want it to fail the test.
+        log_lines = self.logger.get_lines_for_level('info')
+        self.assertIn(
+            '[audit-watcher test_watcher1] total unknown 0 ok 2 dark 0',
+            log_lines)
+
+
 if __name__ == '__main__':
     unittest.main()

AltStyle によって変換されたページ (->オリジナル) /