diff --git a/etc/log-processing.conf-sample b/etc/log-processing.conf-sample
index 2d9293c4c5..cc7ea6e276 100644
--- a/etc/log-processing.conf-sample
+++ b/etc/log-processing.conf-sample
@@ -14,7 +14,7 @@
 swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
 container_name = log_data
 source_filename_format = %Y%m%d%H*
-class_path = swift.stats.access_processor
+class_path = swift.stats.access_processor.AccessLogProcessor
 # service ips is for client ip addresses that should be counted as servicenet
 # service_ips =
 
@@ -23,5 +23,5 @@ class_path = swift.stats.access_processor
 swift_account = AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31
 container_name = account_stats
 source_filename_format = %Y%m%d%H*
-class_path = swift.stats.stats_processor
+class_path = swift.stats.stats_processor.StatsLogProcessor
 # account_server_conf = /etc/swift/account-server.conf
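
Note for reviewers: the class_path values above now name a concrete class rather than a module. A minimal sketch of how such a dotted path is turned into a plugin instance; this mirrors the loading loop in LogProcessor.__init__ in the new module below, and load_plugin_class is an illustrative helper, not part of the patch:

    # Sketch only: resolve "package.module.ClassName" into a class object,
    # exactly as LogProcessor.__init__ does for each [log-processor-*] section.
    def load_plugin_class(class_path):
        import_target, class_name = class_path.rsplit('.', 1)
        module = __import__(import_target, fromlist=[import_target])
        return getattr(module, class_name)

    # e.g.:
    # klass = load_plugin_class('swift.stats.stats_processor.StatsLogProcessor')
    # plugin = klass(plugin_conf)
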
diff --git a/swift/stats/log_processor.py b/swift/stats/log_processor.py
new file mode 100644
index 0000000000..b802ba4404
--- /dev/null
+++ b/swift/stats/log_processor.py
@@ -0,0 +1,226 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ConfigParser import ConfigParser
+import multiprocessing
+import Queue
+import tempfile
+import time
+import zlib
+
+from swift.common.exceptions import ChunkReadTimeout
+from swift.common.internal_proxy import InternalProxy
+from swift.common.utils import get_logger
+
+
+class BadFileDownload(Exception):
+    pass
+
+
+class LogProcessor(object):
+    '''Load the configured plugins and process log files with them.'''
+
+    def __init__(self, conf, logger):
+        stats_conf = conf.get('log-processor', {})
+
+        working_dir = stats_conf.get('working_dir', '/tmp/swift/')
+        if working_dir.endswith('/') and len(working_dir) > 1:
+            working_dir = working_dir[:-1]
+        self.working_dir = working_dir
+        proxy_server_conf_loc = stats_conf.get('proxy_server_conf',
+                                               '/etc/swift/proxy-server.conf')
+        try:
+            c = ConfigParser()
+            c.read(proxy_server_conf_loc)
+            proxy_server_conf = dict(c.items('proxy-server'))
+        except Exception:
+            proxy_server_conf = None
+        self.proxy_server_conf = proxy_server_conf
+        if isinstance(logger, tuple):
+            self.logger = get_logger(*logger)
+        else:
+            self.logger = logger
+        # internal proxy used to read log data out of the cluster
+        self.private_proxy = InternalProxy(self.proxy_server_conf,
+                                           self.logger)
+
+        # load the processing plugins
+        self.plugins = {}
+        plugin_prefix = 'log-processor-'
+        for section in (x for x in conf if x.startswith(plugin_prefix)):
+            plugin_name = section[len(plugin_prefix):]
+            plugin_conf = conf.get(section, {})
+            self.plugins[plugin_name] = plugin_conf
+            import_target, class_name = plugin_conf['class_path'].rsplit('.', 1)
+            module = __import__(import_target, fromlist=[import_target])
+            klass = getattr(module, class_name)
+            self.plugins[plugin_name]['instance'] = klass(plugin_conf)
+
+    def process_one_file(self, plugin_name, account, container, object_name):
+        # get an iter of the object data
+        compressed = object_name.endswith('.gz')
+        stream = self.get_object_data(account, container, object_name,
+                                      compressed=compressed)
+        # look up the correct plugin and send the stream to it
+        return self.plugins[plugin_name]['instance'].process(stream,
+                                                             object_name)
+
+    def get_data_list(self, start_date=None, end_date=None,
+                      listing_filter=None):
+        total_list = []
+        for plugin_name, plugin in self.plugins.items():
+            account = plugin['swift_account']
+            container = plugin['container_name']
+            listing = self.get_container_listing(account, container,
+                                                 start_date, end_date,
+                                                 listing_filter)
+            for name in listing:
+                # (plugin_name, account, container, object_name) matches the
+                # argument list of process_one_file()
+                total_list.append((plugin_name, account, container, name))
+        return total_list
+
+    def get_container_listing(self, swift_account, container_name,
+                              start_date=None, end_date=None,
+                              listing_filter=None):
+        '''
+        Get a container listing, filtered by start_date, end_date, and
+        listing_filter. Dates, if given, should be in YYYYMMDDHH format.
+        '''
+        search_key = None
+        if start_date is not None:
+            date_parts = []
+            try:
+                year, start_date = start_date[:4], start_date[4:]
+                if year:
+                    date_parts.append(year)
+                month, start_date = start_date[:2], start_date[2:]
+                if month:
+                    date_parts.append(month)
+                day, start_date = start_date[:2], start_date[2:]
+                if day:
+                    date_parts.append(day)
+                hour, start_date = start_date[:2], start_date[2:]
+                if hour:
+                    date_parts.append(hour)
+            except IndexError:
+                pass
+            else:
+                search_key = '/'.join(date_parts)
+        end_key = None
+        if end_date is not None:
+            date_parts = []
+            try:
+                year, end_date = end_date[:4], end_date[4:]
+                if year:
+                    date_parts.append(year)
+                month, end_date = end_date[:2], end_date[2:]
+                if month:
+                    date_parts.append(month)
+                day, end_date = end_date[:2], end_date[2:]
+                if day:
+                    date_parts.append(day)
+                hour, end_date = end_date[:2], end_date[2:]
+                if hour:
+                    date_parts.append(hour)
+            except IndexError:
+                pass
+            else:
+                end_key = '/'.join(date_parts)
+        container_listing = self.private_proxy.get_container_list(
+                                swift_account,
+                                container_name,
+                                marker=search_key)
+        results = []
+        if container_listing is not None:
+            if listing_filter is None:
+                listing_filter = set()
+            for item in container_listing:
+                name = item['name']
+                if end_key and name > end_key:
+                    break
+                if name not in listing_filter:
+                    results.append(name)
+        return results
+
+    def get_object_data(self, swift_account, container_name, object_name,
+                        compressed=False):
+        '''Read an object and yield its lines.'''
+        o = self.private_proxy.get_object(swift_account,
+                                          container_name,
+                                          object_name)
+        tmp_file = tempfile.TemporaryFile(dir=self.working_dir)
+        with tmp_file as f:
+            bad_file = False
+            try:
+                for chunk in o:
+                    f.write(chunk)
+            except ChunkReadTimeout:
+                bad_file = True
+            if bad_file:
+                raise BadFileDownload()
+            f.flush()
+            f.seek(0)  # rewind to start reading
+            last_part = ''
+            # magic in the following zlib.decompressobj argument is courtesy of
+            # http://stackoverflow.com/questions/2423866/python-decompressing-gzip-chunk-by-chunk
+            d = zlib.decompressobj(16 + zlib.MAX_WBITS)
+            for chunk in iter(lambda: f.read(16384), ''):
+                if compressed:
+                    try:
+                        chunk = d.decompress(chunk)
+                    except zlib.error:
+                        raise BadFileDownload()  # bad compressed data
+                parts = chunk.split('\n')
+                parts[0] = last_part + parts[0]
+                for part in parts[:-1]:
+                    yield part
+                last_part = parts[-1]
+            if last_part:
+                yield last_part
+
+
+def multiprocess_collate(processor_args,
+                         start_date=None,
+                         end_date=None,
+                         listing_filter=None):
+    '''Get a listing of files and yield hourly data from them.'''
+    p = LogProcessor(*processor_args)
+    all_files = p.get_data_list(start_date, end_date, listing_filter)
+
+    p.logger.info('loaded %d files to process' % len(all_files))
+
+    if not all_files:
+        # no work to do
+        return
+
+    # leave one CPU for the parent, but always start at least one worker
+    worker_count = max(multiprocessing.cpu_count() - 1, 1)
+    workers = []
+    in_queue = multiprocessing.Queue()
+    out_queue = multiprocessing.Queue()
+    for _ in range(worker_count):
+        worker = multiprocessing.Process(target=collate_worker,
+                                         args=(processor_args,
+                                               in_queue,
+                                               out_queue))
+        worker.start()
+        workers.append(worker)
+    for x in all_files:
+        in_queue.put(x)
+    for _ in range(worker_count):
+        # one sentinel per worker tells it to exit
+        in_queue.put(None)
+    count = 0
+    while True:
+        try:
+            item, data = out_queue.get_nowait()
+            count += 1
+            if data:
+                yield item, data
+            if count >= len(all_files):
+                # this implies that one result will come from every request
+                break
+        except Queue.Empty:
+            time.sleep(.1)
+    for worker in workers:
+        worker.join()
+
+
+def collate_worker(processor_args, in_queue, out_queue):
+    '''worker process for multiprocess_collate'''
+    p = LogProcessor(*processor_args)
+    while True:
+        try:
+            item = in_queue.get_nowait()
+            if item is None:
+                break
+        except Queue.Empty:
+            time.sleep(.1)
+        else:
+            # item is (plugin_name, account, container, object_name)
+            ret = p.process_one_file(*item)
+            out_queue.put((item, ret))
\ No newline at end of file
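
For reviewers, a rough sketch of how the new module is meant to be driven. The daemon that will do this is not part of this patch; the conf dict shape, the logger tuple, and the account/date values are assumptions based on LogProcessor.__init__ and the sample config above:

    # Hypothetical driver, not part of this patch.  `conf` mirrors the
    # sections in etc/log-processing.conf-sample; on a real cluster the
    # proxy_server_conf option must point at a working proxy-server.conf.
    from swift.stats.log_processor import LogProcessor, multiprocess_collate

    conf = {
        'log-processor': {'working_dir': '/tmp/swift'},
        'log-processor-stats': {
            'swift_account': 'AUTH_7abbc116-8a07-4b63-819d-02715d3e0f31',
            'container_name': 'account_stats',
            'source_filename_format': '%Y%m%d%H*',
            'class_path': 'swift.stats.stats_processor.StatsLogProcessor',
        },
    }
    processor_args = (conf, (conf['log-processor'], 'log-processor'))

    # Single process: list the hour's files and process them one by one.
    p = LogProcessor(*processor_args)
    for plugin_name, account, container, obj in p.get_data_list('2010090100',
                                                                 '2010090123'):
        data = p.process_one_file(plugin_name, account, container, obj)

    # Or fan the same work out across worker processes.
    for item, data in multiprocess_collate(processor_args,
                                           start_date='2010090100',
                                           end_date='2010090123'):
        pass  # collate `data` keyed by `item` here
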
diff --git a/swift/stats/stats_processor.py b/swift/stats/stats_processor.py
new file mode 100644
index 0000000000..793963ff04
--- /dev/null
+++ b/swift/stats/stats_processor.py
@@ -0,0 +1,55 @@
+# Copyright (c) 2010 OpenStack, LLC.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class StatsLogProcessor(object):
+    '''Process the uploaded account stats log files.'''
+
+    def __init__(self, conf):
+        # optional filter: only aggregate data for this one account
+        self.account_name = conf.get('account_name')
+
+    def process(self, obj_stream, object_name):
+        '''Generate hourly groupings of data from one stats log file.'''
+        account_totals = {}
+        year, month, day, hour, _ = object_name.split('/')
+        for line in obj_stream:
+            if not line:
+                continue
+            try:
+                (account,
+                 container_count,
+                 object_count,
+                 bytes_used,
+                 created_at) = line.split(',')
+                account = account.strip('"')
+                if self.account_name and self.account_name != account:
+                    continue
+                container_count = int(container_count.strip('"'))
+                object_count = int(object_count.strip('"'))
+                bytes_used = int(bytes_used.strip('"'))
+                aggr_key = (account, year, month, day, hour)
+                d = account_totals.get(aggr_key, {})
+                d['count'] = d.setdefault('count', 0) + 1
+                d['container_count'] = d.setdefault('container_count', 0) + \
+                                       container_count
+                d['object_count'] = d.setdefault('object_count', 0) + \
+                                    object_count
+                d['bytes_used'] = d.setdefault('bytes_used', 0) + \
+                                  bytes_used
+                d['created_at'] = created_at
+                account_totals[aggr_key] = d
+            except (IndexError, ValueError):
+                # bad line data; skip it
+                pass
+        return account_totals
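
To make the expected input concrete, a small illustration of what StatsLogProcessor.process consumes and produces. The field layout follows the line.split(',') parsing above, while the exact quoting and the year/month/day/hour object naming are assumptions, not guaranteed by this patch:

    # Illustration only: two lines from a hypothetical account-stats CSV,
    # stored under an object name of the form year/month/day/hour/<file>.
    from swift.stats.stats_processor import StatsLogProcessor

    lines = [
        '"AUTH_test",12,42,1024,1283299200.00',
        '"AUTH_test",12,43,2048,1283299260.00',
    ]
    totals = StatsLogProcessor({}).process(iter(lines),
                                           '2010/09/01/00/account_stats')
    # totals == {('AUTH_test', '2010', '09', '01', '00'):
    #            {'count': 2, 'container_count': 24, 'object_count': 85,
    #             'bytes_used': 3072, 'created_at': '1283299260.00'}}
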