From dcd5a265f6f36551e0bc0feac0e08bfdd09301a3 Mon Sep 17 00:00:00 2001 From: Yan Xiao Date: 2024年9月26日 09:35:35 -0400 Subject: [PATCH] proxy-logging: Add real-time transfer bytes counters Currently we can get one proxy-logging transfer stat emission over the duration of the upload/download. We want another stat coming out of proxy-logging: something that gets emitted periodically as bytes are actually sent/received so we can get reasonably accurate point-in-time breakdowns of bandwidth usage. Co-Authored-By: Alistair Coles Co-Authored-By: Shreeya Deshpande Change-Id: Ideecd0aa58ddf091c9f25f15022a9066088f532b Signed-off-by: Yan Xiao --- etc/proxy-server.conf-sample | 14 +- swift/common/middleware/proxy_logging.py | 167 +- swift/common/wsgi.py | 2 +- test/unit/common/middleware/helpers.py | 14 +- .../common/middleware/test_proxy_logging.py | 1376 ++++++++++++++++- test/unit/common/test_wsgi.py | 12 + 6 files changed, 1528 insertions(+), 57 deletions(-) diff --git a/etc/proxy-server.conf-sample b/etc/proxy-server.conf-sample index e5cbf5335b..87d6b7b42b 100644 --- a/etc/proxy-server.conf-sample +++ b/etc/proxy-server.conf-sample @@ -1128,7 +1128,19 @@ use = egg:swift#proxy_logging # will be substituted with the appropriate values. For more information, see # https://docs.openstack.org/swift/latest/logs.html # log_msg_template = {client_ip} {remote_addr} {end_time.datetime} {method} {path} {protocol} {status_int} {referer} {user_agent} {auth_token} {bytes_recvd} {bytes_sent} {client_etag} {transaction_id} {headers} {request_time} {source} {log_info} {start_time} {end_time} {policy_index} {access_user_id} - +# The proxy_logging middleware attempts to translate s3api request paths to +# swift paths. To do so it needs to know which, if any, storage domains are +# configured for virtual-hosted style requests. This option should be a +# comma-separated list of host names, exactly the same as that set for the +# s3api middleware. +# storage_domain = +# +# A float value in seconds for how often the proxy server should emit statsd +# real-time buffer transfer bytes counter metrics for WSGI input or output. +# Negative value for not emitting such metrics. 0 for always emitting such +# metrics on every input or output call. Default is -1. +# statsd_emit_buffer_xfer_bytes_seconds = -1 +# # Note: Put before both ratelimit and auth in the pipeline. [filter:bulk] use = egg:swift#bulk diff --git a/swift/common/middleware/proxy_logging.py b/swift/common/middleware/proxy_logging.py index 54ac677bd6..8edac2843b 100644 --- a/swift/common/middleware/proxy_logging.py +++ b/swift/common/middleware/proxy_logging.py @@ -87,17 +87,20 @@ bandwidth usage will want to only sum up logs with no ``swift.source``. import os import time +from collections import ChainMap from swift.common.constraints import valid_api_version from swift.common.middleware.catch_errors import ByteEnforcer +from swift.common.middleware.s3api.utils import extract_bucket_and_key, \ + is_s3_req from swift.common.request_helpers import get_log_info from swift.common.swob import Request from swift.common.utils import (get_logger, get_remote_client, config_true_value, reiterate, close_if_possible, cap_length, - InputProxy, list_from_csv, get_policy_index, - split_path, StrAnonymizer, StrFormatTime, - LogStringFormatter) + InputProxy, list_from_csv, + get_policy_index, LogStringFormatter, + split_path, StrAnonymizer, StrFormatTime) from swift.common.statsd_client import get_labeled_statsd_client from swift.common.storage_policy import POLICIES @@ -105,6 +108,83 @@ from swift.common.registry import get_sensitive_headers, \ get_sensitive_params, register_sensitive_header +def statsd_metric_resp_labels(base_labels, status_int=None, policy_index=None): + # compose labels used for response metrics + extra_labels = {} + if policy_index is not None: + extra_labels['policy'] = policy_index + if status_int: + extra_labels['status'] = status_int + labels_source = ChainMap(extra_labels, base_labels) + return labels_source + + +class CallbackInputProxy(InputProxy): + """ + :param wsgi_input: file-like object to be wrapped + :param callback: a function or a callable that + accept args (chunk, eof), + and returns chunk or a modified chunk. + eof is ``True`` if there are no more bytes to + read from the wrapped input, ``False`` otherwise. + """ + def __init__(self, wsgi_input, callback): + super().__init__(wsgi_input) + self.callback = callback + + def chunk_update(self, chunk, eof, *arg, **kwargs): + return self.callback(chunk, eof) + + +class BufferXferEmitCallback(object): + def __init__(self, metric_name, labels, statsd, + emit_buffer_xfer_bytes_sec): + self.metric_name = metric_name + self.labels = labels + self.statsd = statsd + self.emit_buffer_xfer_bytes_sec = emit_buffer_xfer_bytes_sec + self.emit_bytes = 0 + self.next_emit_time = 0 + if self.emit_buffer_xfer_bytes_sec> 0: + self.next_emit_time = (time.time() + + self.emit_buffer_xfer_bytes_sec) + + def __call__(self, buffer, eof=False): + self._maybe_emit_stat(buffer, eof) + return buffer + + def _maybe_emit_stat(self, buffer, eof=False): + """ + Accumulate the length of ``buffer`` and periodically emit a stat + with the accumulated length. + + :param buffer: the buffer that has been read. + :param eof: if True, a stat is emitted immediately; otherwise a + stat will be emitted when ``next_emit_time`` has been reached. + """ + + if self.emit_buffer_xfer_bytes_sec < 0: + return + buffer_len = len(buffer) + self.emit_bytes += buffer_len + if not self.labels.get('account', None): + # tolerate no account, maybe it'll be there in time for next stat + return + + now = time.time() + if eof is False and self.next_emit_time> now: + return + + if self.emit_bytes != 0: + self.statsd.update_stats( + self.metric_name, + self.emit_bytes, + labels=self.labels, + ) + self.emit_bytes = 0 + self.next_emit_time = (now + self.emit_buffer_xfer_bytes_sec) + + class ProxyLoggingMiddleware(object): """ Middleware that logs Swift proxy requests in the swift log format. @@ -126,6 +206,7 @@ class ProxyLoggingMiddleware(object): # convert it now to prevent useless convertion later. self.anonymization_method = conf.get('log_anonymization_method', 'md5') self.anonymization_salt = conf.get('log_anonymization_salt', '') + self.storage_domains = list_from_csv(conf.get('storage_domain', '')) self.log_hdrs = config_true_value(conf.get( 'access_log_headers', conf.get('log_headers', 'no'))) @@ -167,6 +248,8 @@ class ProxyLoggingMiddleware(object): self.reveal_sensitive_prefix = int( conf.get('reveal_sensitive_prefix', 16)) self.check_log_msg_template_validity() + self.emit_buffer_xfer_bytes_sec = float( + conf.get('statsd_emit_buffer_xfer_bytes_seconds', -1)) def check_log_msg_template_validity(self): replacements = { @@ -398,9 +481,7 @@ class ProxyLoggingMiddleware(object): acc, cont, obj = None, None, None return acc, cont, obj - def get_metric_name_type(self, req): - swift_path = req.environ.get('swift.backend_path', req.path) - acc, cont, obj = self.get_aco_from_path(swift_path) + def get_resource_type_from_aco(self, req, acc, cont, obj): if obj: return 'object' if cont: @@ -409,23 +490,58 @@ class ProxyLoggingMiddleware(object): return 'account' return req.environ.get('swift.source') or 'UNKNOWN' + def get_resource_type(self, req): + swift_path = req.environ.get('swift.backend_path', req.path) + acc, cont, obj = self.get_aco_from_path(swift_path) + return self.get_resource_type_from_aco(req, acc, cont, obj) + def statsd_metric_method(self, method): return method if method in self.valid_methods else 'BAD_METHOD' def statsd_metric_name(self, req, status_int, metric_method): - stat_type = self.get_metric_name_type(req) - return '.'.join((stat_type, metric_method, str(status_int))) + resource_type = self.get_resource_type(req) + return '.'.join((resource_type, metric_method, str(status_int))) + + def update_swift_base_labels(self, req): + acc, cont, obj = self.get_aco_from_path(req.path) + base_labels = req.environ.get('swift.base_labels') + if base_labels is None: + # expected in the left-most proxy_logging instance + if acc is None and is_s3_req(req): + cont, obj = extract_bucket_and_key( + req, self.storage_domains, False) + + method = self.method_from_req(req) + metric_method = self.statsd_metric_method(method) + resource_type = self.get_resource_type_from_aco( + req, acc, cont, obj) + base_labels = { + 'method': metric_method, + } + if resource_type != 'UNKNOWN' or not is_s3_req(req): + base_labels['resource'] = resource_type + if acc: + base_labels['account'] = acc + if cont: + base_labels['container'] = cont + req.environ['swift.base_labels'] = base_labels + elif acc: + # expected in the right-most proxy_logging instance + resource_type = self.get_resource_type_from_aco( + req, acc, cont, obj) + base_labels.setdefault('account', acc) + base_labels.setdefault('resource', resource_type) def statsd_metric_name_policy(self, req, status_int, metric_method, policy_index): if policy_index is None: return None - stat_type = self.get_metric_name_type(req) - if stat_type == 'object': + resource_type = self.get_resource_type(req) + if resource_type == 'object': # The policy may not exist policy = POLICIES.get_by_index(policy_index) if policy: - return '.'.join((stat_type, 'policy', str(policy_index), + return '.'.join((resource_type, 'policy', str(policy_index), metric_method, str(status_int))) else: return None @@ -434,7 +550,7 @@ class ProxyLoggingMiddleware(object): def statsd_metric_labels(self, req, status_int, metric_method, acc=None, cont=None, policy_index=None): - resource_type = self.get_metric_name_type(req) + resource_type = self.get_resource_type(req) labels = { 'resource': resource_type, @@ -452,13 +568,24 @@ class ProxyLoggingMiddleware(object): return labels def __call__(self, env, start_response): + req = Request(env) + self.update_swift_base_labels(req) + if self.req_already_logged(env): return self.app(env, start_response) self.mark_req_logged(env) start_response_args = [None] - input_proxy = InputProxy(env['wsgi.input']) + + xfer_metric_name = 'swift_proxy_server_request_body_streaming_bytes' + xfer_labels = req.environ.get('swift.base_labels') + + statsd_emit_callback = BufferXferEmitCallback( + xfer_metric_name, xfer_labels, self.statsd, + self.emit_buffer_xfer_bytes_sec) + input_proxy = CallbackInputProxy(env['wsgi.input'], + statsd_emit_callback) env['wsgi.input'] = input_proxy start_time = time.time() @@ -486,7 +613,6 @@ class ProxyLoggingMiddleware(object): start_response_args[0][1].append( ('Content-Length', str(content_length))) - req = Request(env) method = self.method_from_req(req) if method == 'HEAD': content_length = 0 @@ -497,10 +623,11 @@ class ProxyLoggingMiddleware(object): resp_headers = dict(start_response_args[0][1]) start_response(*start_response_args[0]) + policy_index = get_policy_index(req.headers, resp_headers) + # Log timing information for time-to-first-byte (GET requests only) ttfb = 0.0 if method == 'GET': - policy_index = get_policy_index(req.headers, resp_headers) swift_path = req.environ.get('swift.backend_path', req.path) acc, cont, _ = self.get_aco_from_path(swift_path) labels = self.statsd_metric_labels( @@ -525,10 +652,19 @@ class ProxyLoggingMiddleware(object): labels=labels, ) + resp_xfer_labels = statsd_metric_resp_labels( + xfer_labels, status_int=wire_status_int, + policy_index=policy_index) + bytes_sent = 0 + statsd_emit_callback = BufferXferEmitCallback( + 'swift_proxy_server_response_body_streaming_bytes', + resp_xfer_labels, + self.statsd, self.emit_buffer_xfer_bytes_sec) try: for chunk in iterator: bytes_sent += len(chunk) + statsd_emit_callback(chunk) yield chunk except GeneratorExit: # generator was closed before we finished env['swift.proxy_logging_status'] = 499 @@ -537,6 +673,7 @@ class ProxyLoggingMiddleware(object): env['swift.proxy_logging_status'] = 500 raise finally: + statsd_emit_callback(b'', eof=True) env.setdefault('swift.proxy_logging_status', wire_status_int) status_int = status_int_for_logging() self.log_request( diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index 760a2215f4..f09d88eae8 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -1352,7 +1352,7 @@ def make_env(env, method=None, path=None, agent='Swift', query_string=None, 'swift.trans_id', 'swift.authorize_override', 'swift.authorize', 'HTTP_X_USER_ID', 'HTTP_X_PROJECT_ID', 'HTTP_REFERER', 'swift.infocache', - 'swift.shard_listing_history'): + 'swift.base_labels', 'swift.shard_listing_history'): if name in env: newenv[name] = env[name] if method: diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index bd844f4097..6225e1d136 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -193,8 +193,9 @@ class FakeSwift(object): container_existence_skip_cache = 0.0 account_existence_skip_cache = 0.0 - def __init__(self, capture_unexpected_calls=True): + def __init__(self, capture_unexpected_calls=True, test_read_size=-1): self.capture_unexpected_calls = capture_unexpected_calls + self.read_size = test_read_size self._calls = [] self._unclosed_req_keys = defaultdict(int) self._unread_req_paths = defaultdict(int) @@ -325,7 +326,14 @@ class FakeSwift(object): if (cont and not obj and method == 'UPDATE') or ( obj and method == 'PUT'): - call.body = b''.join(iter(env['wsgi.input'].read, b'')) + if self.read_size < 0: + call.body = b''.join(iter(env['wsgi.input'].read, b'')) + else: + call.body = b'' + buf = env['wsgi.input'].read(self.read_size) + while buf: + call.body += buf + buf = env['wsgi.input'].read(self.read_size) # simulate object PUT if method == 'PUT' and obj: @@ -377,7 +385,7 @@ class FakeSwift(object): resolve_ignore_range_header(req, headers) # range requests ought to work, hence conditional_response=True - if isinstance(body, list): + if not isinstance(body, (bytes, str)): resp = resp_class( req=req, headers=headers, app_iter=body, conditional_response=req.method in ('GET', 'HEAD'), diff --git a/test/unit/common/middleware/test_proxy_logging.py b/test/unit/common/middleware/test_proxy_logging.py index d6fa90ce56..f9900c9580 100644 --- a/test/unit/common/middleware/test_proxy_logging.py +++ b/test/unit/common/middleware/test_proxy_logging.py @@ -12,9 +12,11 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. +import itertools import logging from unittest import mock +import email import time import unittest from io import BytesIO @@ -22,23 +24,28 @@ from logging.handlers import SysLogHandler from urllib.parse import unquote -from swift.common.utils import get_swift_logger, split_path +from swift.common.utils import get_swift_logger, split_path, md5 from swift.common.middleware import proxy_logging from swift.common.registry import register_sensitive_header, \ register_sensitive_param, get_sensitive_headers -from swift.common.swob import Request, Response, HTTPServiceUnavailable +from swift.common.swob import Request, Response, HTTPServiceUnavailable, \ + HTTPCreated, HTTPOk from swift.common import constraints, registry, statsd_client from swift.common.storage_policy import StoragePolicy + from test.debug_logger import debug_logger, FakeStatsdClient, \ FakeLabeledStatsdClient, debug_labeled_statsd_client from test.unit import patch_policies -from test.unit.common.middleware.helpers import FakeAppThatExcepts, FakeSwift +from test.unit.common.middleware.helpers import FakeAppThatExcepts, \ + FakeSwift +from test.unit.common.middleware.s3api import FakeAuthApp, filter_factory \ + as s3api_filter_factory class FakeApp(object): def __init__(self, body=None, response_str='200 OK', policy_idx='0', - chunked=False, environ_updates=None): + chunked=False, environ_updates=None, read_callback=None): if body is None: body = [b'FAKE APP'] elif isinstance(body, bytes): @@ -49,6 +56,7 @@ class FakeApp(object): self.policy_idx = policy_idx self.chunked = chunked self.environ_updates = environ_updates or {} + self.read_callback = read_callback def __call__(self, env, start_response): try: @@ -67,10 +75,14 @@ class FakeApp(object): if is_container_or_object_req and self.policy_idx is not None: headers.append(('X-Backend-Storage-Policy-Index', - str(self.policy_idx))) + str(self.policy_idx))) start_response(self.response_str, headers) - while env['wsgi.input'].read(5): - pass + while True: + buf = env['wsgi.input'].read(5) + if self.read_callback is not None: + self.read_callback(len(buf)) + if not buf: + break # N.B. mw can set this anytime before the resp is finished env.update(self.environ_updates) return self.body @@ -112,12 +124,287 @@ class FakeAppReadline(object): return [b"FAKE APP"] +class PathRewritingApp: + """ + Rewrite request path, modifying the container part of the path, to emulate + the behavior of, for example, a multipart upload. + """ + # note: tests deliberately use this explicit rewriting middleware rather + # than relying on the behavior of other middleware that might change + def __init__(self, app, logger): + self.app = app + self.logger = logger + + def __call__(self, env, start_response): + orig_path = env['PATH_INFO'] + req = Request(env) + parts = req.split_path(4, rest_with_last=True) + parts[2] += '+segments' + env['PATH_INFO'] = '/' + '/'.join(parts) + try: + resp = req.get_response(self.app) + except Exception: + self.logger.exception('PathRewritingApp (re-raising)') + raise + env['PATH_INFO'] = orig_path + return resp(self.app, start_response) + + def start_response(*args): pass +class BaseTestProxyLogging(unittest.TestCase): + + def assertLabeledUpdateStats(self, exp_metrics_values_labels): + statsd_calls = self.statsd.calls['update_stats'] + for statsd_call in statsd_calls: + statsd_call[1]['labels'] = dict(statsd_call[1]['labels']) + exp_calls = [] + for metric, value, labels in exp_metrics_values_labels: + exp_calls.append(((metric, value), {'labels': labels})) + self.assertEqual(exp_calls, statsd_calls) + + def assertLabeledTimingStats(self, exp_metrics_values_labels): + statsd_calls = self.statsd.calls['timing'] + exp_calls = [] + for metric, value, labels in exp_metrics_values_labels: + exp_calls.append(((metric, mock.ANY), {'labels': labels})) + self.assertEqual(exp_calls, statsd_calls) + for i, (metric, value, labels) in enumerate(exp_metrics_values_labels): + self.assertAlmostEqual( + value, statsd_calls[i][0][1], places=4, msg=i) + + +class TestCallbackInputProxy(unittest.TestCase): + + def test_read_all(self): + callback = mock.MagicMock(return_value=b'xyz') + self.assertEqual( + b'xyz', proxy_logging.CallbackInputProxy( + BytesIO(b'abc'), callback).read()) + self.assertEqual([mock.call(b'abc', True)], callback.call_args_list) + + callback = mock.MagicMock(return_value=b'xyz') + self.assertEqual( + b'xyz', proxy_logging.CallbackInputProxy( + BytesIO(b'abc'), callback).read(-1)) + self.assertEqual([mock.call(b'abc', True)], callback.call_args_list) + + callback = mock.MagicMock(return_value=b'xyz') + self.assertEqual( + b'xyz', proxy_logging.CallbackInputProxy( + BytesIO(b'abc'), callback).read(None)) + self.assertEqual([mock.call(b'abc', True)], callback.call_args_list) + + def test_read_size(self): + callback = mock.MagicMock(side_effect=[b'a', b'bc', b'']) + cip = proxy_logging.CallbackInputProxy(BytesIO(b'abc'), callback) + self.assertEqual( + b'a', cip.read(1)) + self.assertEqual([mock.call(b'a', False)], callback.call_args_list) + self.assertEqual( + b'bc', cip.read(2)) + self.assertEqual( + [mock.call(b'a', False), mock.call(b'bc', False)], + callback.call_args_list) + self.assertEqual( + b'', cip.read()) + self.assertEqual([mock.call(b'a', False), mock.call(b'bc', False), + mock.call(b'', True)], callback.call_args_list) + + +class TestBufferXferEmitCallback(BaseTestProxyLogging): + + def setUp(self): + self.logger = debug_logger() + + def test_buffer_xfer_emit_callback(self): + conf = { + 'log_headers': 'yes', + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 0, + } + self.statsd = debug_labeled_statsd_client(conf) + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(), conf, logger=self.logger) + app.statsd = self.statsd + + labels = { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'} + callback = proxy_logging.BufferXferEmitCallback( + 'swift_proxy_example_metric', labels, app.statsd, + app.emit_buffer_xfer_bytes_sec) + callback('abcde') + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', 5, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ]) + callback('abcdef') + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', 5, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ('swift_proxy_example_metric', 6, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ]) + callback('abcdefg') + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', 5, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ('swift_proxy_example_metric', 6, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ('swift_proxy_example_metric', 7, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ]) + + def test_buffer_xfer_emit_callback_negative(self): + conf = { + 'log_headers': 'yes', + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': -1, + } + self.statsd = debug_labeled_statsd_client(conf) + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(), conf, logger=self.logger) + app.statsd = self.statsd + + labels = { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'} + callback = proxy_logging.BufferXferEmitCallback( + 'swift_proxy_example_metric', labels, app.statsd, + app.emit_buffer_xfer_bytes_sec) + callback('abcde') + self.assertLabeledUpdateStats([]) + callback('abcdef') + self.assertLabeledUpdateStats([]) + callback('abcdefg') + self.assertLabeledUpdateStats([]) + + def test_buffer_xfer_emit_callback_positive(self): + conf = { + 'log_headers': 'yes', + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 1000, + } + self.statsd = debug_labeled_statsd_client(conf) + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(), conf, logger=self.logger) + app.statsd = self.statsd + + labels = { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'} + callback = proxy_logging.BufferXferEmitCallback( + 'swift_proxy_example_metric', labels, app.statsd, + app.emit_buffer_xfer_bytes_sec) + callback('abcde') + # gotta wait for that emit delay + self.assertLabeledUpdateStats([]) + callback('abcdef') + # still no new stats + self.assertLabeledUpdateStats([]) + callback('abcdefg') + # no new stats + # eof always emits + callback('', eof=True) + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', 18, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ]) + + def test_buffer_eof(self): + buffers = 'abcd' + buffer_len = len(buffers) + conf = { + 'log_headers': 'yes', + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 1, + } + self.statsd = debug_labeled_statsd_client(conf) + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(), conf, logger=self.logger) + app.statsd = self.statsd + labels = { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'} + callback = proxy_logging.BufferXferEmitCallback( + 'swift_proxy_example_metric', labels, app.statsd, + app.emit_buffer_xfer_bytes_sec) + now = time.time() + with mock.patch('swift.common.middleware.proxy_logging.time.time', + return_value=now): + callback(buffers, eof=False) + self.assertLabeledUpdateStats([]) + with mock.patch('swift.common.middleware.proxy_logging.time.time', + return_value=now + 1): + callback(buffers, eof=False) + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', buffer_len * 2, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'})]) + + with mock.patch('swift.common.middleware.proxy_logging.time.time', + return_value=now + 1.5): + callback(buffers, eof=False) + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', buffer_len * 2, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'})]) + + with mock.patch('swift.common.middleware.proxy_logging.time.time', + return_value=now): + callback(buffers[:-1], eof=True) + self.assertLabeledUpdateStats([ + ('swift_proxy_example_metric', buffer_len * 2, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}), + ('swift_proxy_example_metric', buffer_len * 2 - 1, { + 'account': 'a', + 'container': 'c', + 'method': 'POST', + 'resource': 'container'}) + ]) + + @patch_policies([StoragePolicy(0, 'zero', False)]) -class TestProxyLogging(unittest.TestCase): +class TestProxyLogging(BaseTestProxyLogging): def setUp(self): self.logger = debug_logger() # really, this would come by way of base_prefix/tail_prefix in @@ -215,23 +502,6 @@ class TestProxyLogging(unittest.TestCase): ('host', 8125)), app.access_logger.statsd_client.sendto_calls) - def assertLabeledTimingStats(self, exp_metrics_values_labels): - statsd_calls = self.statsd.calls['timing'] - exp_calls = [] - for metric, value, labels in exp_metrics_values_labels: - exp_calls.append(((metric, mock.ANY), {'labels': labels})) - self.assertEqual(exp_calls, statsd_calls) - for i, (metric, value, labels) in enumerate(exp_metrics_values_labels): - self.assertAlmostEqual( - value, statsd_calls[i][0][1], places=4, msg=i) - - def assertLabeledUpdateStats(self, exp_metrics_values_labels): - statsd_calls = self.statsd.calls['update_stats'] - exp_calls = [] - for metric, value, labels in exp_metrics_values_labels: - exp_calls.append(((metric, value), {'labels': labels})) - self.assertEqual(exp_calls, statsd_calls) - def test_init_logger_and_legacy_statsd_options_log_prefix(self): conf = { 'log_headers': 'no', @@ -1115,7 +1385,7 @@ class TestProxyLogging(unittest.TestCase): req = Request.blank('/', environ={'REQUEST_METHOD': 'GET'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) headers = unquote(log_parts[14]).split('\n') self.assertIn('Host: localhost:80', headers) @@ -1132,7 +1402,7 @@ class TestProxyLogging(unittest.TestCase): 'Third': '3'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) headers = unquote(log_parts[14]).split('\n') self.assertIn('First: 1', headers) @@ -1152,7 +1422,7 @@ class TestProxyLogging(unittest.TestCase): 'wsgi.input': BytesIO(b'some stuff')}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[11], str(len('FAKE APP'))) self.assertEqual(log_parts[10], str(len('some stuff'))) @@ -1191,7 +1461,7 @@ class TestProxyLogging(unittest.TestCase): 'wsgi.input': BytesIO(b'some stuff')}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[11], str(len('FAKE APP'))) self.assertEqual(log_parts[10], str(len('some stuff'))) @@ -1225,7 +1495,7 @@ class TestProxyLogging(unittest.TestCase): 'wsgi.input': BytesIO(b'some stuff')}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[11], str(len('FAKE APP'))) self.assertEqual(log_parts[10], str(len('some stuff'))) @@ -1259,7 +1529,7 @@ class TestProxyLogging(unittest.TestCase): 'wsgi.input': BytesIO(b'some stuff\nsome other stuff\n')}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[11], str(len('FAKE APP'))) self.assertEqual(log_parts[10], str(len('some stuff\n'))) @@ -1282,6 +1552,1038 @@ class TestProxyLogging(unittest.TestCase): 'container': 'c'}) ]) + def test_init_storage_domain_default(self): + conf = {} + app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), conf) + self.assertEqual([], app.storage_domains) + + def test_init_storage_domain(self): + conf = {'storage_domain': 'domain'} + app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), conf) + self.assertEqual(['domain'], app.storage_domains) + + def test_init_storage_domain_list(self): + conf = {'storage_domain': 'domain,some.other.domain'} + app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), conf) + self.assertEqual(['domain', 'some.other.domain'], app.storage_domains) + + def _do_test_swift_base_labels(self, mw_conf, path, req_hdrs, + extra_environ=None): + req_environs = [] + + def fake_app(env, start_response): + req_environs.append(env) + return HTTPOk()(env, start_response) + + mw = proxy_logging.ProxyLoggingMiddleware( + fake_app, mw_conf, logger=self.logger) + + environ = { + 'REQUEST_METHOD': 'PUT', + 'HTTP_HOST': 'foo.domain', + } + if extra_environ: + environ.update(extra_environ) + req = Request.blank(path, environ=environ, headers=req_hdrs) + req.get_response(mw) + self.assertEqual(1, len(req_environs)) + return req_environs[0].get('swift.base_labels') + + def test_update_swift_base_labels_swift_request(self): + mw_conf = {} + req_hdrs = {} + self.assertEqual( + { + 'resource': 'account', + 'method': 'PUT', + 'account': 'a', + }, + self._do_test_swift_base_labels(mw_conf, '/v1/a', req_hdrs)) + + self.assertEqual( + { + 'resource': 'container', + 'method': 'PUT', + 'account': 'a', + 'container': 'c' + }, + self._do_test_swift_base_labels(mw_conf, '/v1/a/c', req_hdrs)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + self._do_test_swift_base_labels(mw_conf, '/v1/a/c/o', req_hdrs)) + + def test_update_swift_base_labels_swift_request_partial_existing(self): + # verify that existing container field is not updated + mw_conf = {} + req_hdrs = {} + extra_environ = { + 'swift.base_labels': { + 'resource': 'object', + 'method': 'PUT', + 'container': 'c', + }, + } + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/a', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/a/ccc', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/a/c/ooo', req_hdrs, extra_environ=extra_environ)) + + def test_update_swift_base_labels_swift_request_empty_existing(self): + # verify that missing container field is + # not set once base_labels exists + mw_conf = {} + req_hdrs = {} + extra_environ = { + 'swift.base_labels': { + 'resource': 'object', + 'method': 'PUT', + }, + } + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/a', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/a/c', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/a/c/o', req_hdrs, extra_environ=extra_environ)) + + def test_update_swift_base_labels_swift_request_complete_existing(self): + # verify that existing account and container fields are not replaced + mw_conf = {} + req_hdrs = {} + extra_environ = { + 'swift.base_labels': { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + } + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/aa', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'account': 'a', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/v1/aa/cc', req_hdrs, extra_environ=extra_environ)) + + def test_update_swift_base_labels_s3_request_partial_existing(self): + # verify that existing container field is not replaced by s3 field + mw_conf = {} + req_hdrs = { + 'Authorization': 'AWS test:tester:hmac', + 'Date': email.utils.formatdate(time.time() + 0), + } + extra_environ = { + 'swift.base_labels': { + 'resource': 'object', + 'method': 'PUT', + 'container': 'c', + }, + } + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/bucket', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'c', + }, + self._do_test_swift_base_labels( + mw_conf, '/bucket/obj', req_hdrs, extra_environ=extra_environ)) + + extra_environ = { + 'swift.base_labels': { + 'resource': 'object', + 'method': 'PUT', + }, + } + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + }, + self._do_test_swift_base_labels( + mw_conf, '/bucket', req_hdrs, extra_environ=extra_environ)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + }, + self._do_test_swift_base_labels( + mw_conf, '/bucket/obj', req_hdrs, extra_environ=extra_environ)) + + mw_conf = {'storage_domain': 'domain'} + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + }, + self._do_test_swift_base_labels( + mw_conf, '/bucket/obj', req_hdrs, extra_environ=extra_environ)) + + def test_update_swift_base_labels_s3_request(self): + mw_conf = {} + req_hdrs = { + 'Authorization': 'AWS test:tester:hmac', + 'Date': email.utils.formatdate(time.time() + 0), + } + self.assertEqual( + { + 'resource': 'container', + 'method': 'PUT', + 'container': 'bucket' + }, + self._do_test_swift_base_labels(mw_conf, '/bucket', req_hdrs)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'bucket', + }, + self._do_test_swift_base_labels(mw_conf, '/bucket/obj', req_hdrs)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'bucket', + }, + self._do_test_swift_base_labels(mw_conf, '/bucket/obj/x', + req_hdrs)) + + def test_update_swift_base_labels_s3_request_bucket_in_host(self): + mw_conf = {'storage_domain': 'domain'} + req_hdrs = { + 'Authorization': 'AWS test:tester:hmac', + 'Date': email.utils.formatdate(time.time() + 0), + } + self.assertEqual( + { + 'resource': 'container', + 'method': 'PUT', + 'container': 'foo' + }, + self._do_test_swift_base_labels(mw_conf, '/', req_hdrs)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'foo', + }, + self._do_test_swift_base_labels(mw_conf, '/obj', req_hdrs)) + + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'foo', + }, + self._do_test_swift_base_labels(mw_conf, '/obj/x', req_hdrs)) + + mw_conf = {'storage_domain': 'not-domain'} + self.assertEqual( + { + 'resource': 'object', + 'method': 'PUT', + 'container': 'bucket', + }, + self._do_test_swift_base_labels(mw_conf, '/bucket/obj', req_hdrs)) + + def _do_test_base_labels_end_to_end(self, orig_path, new_path=None, + req_hdrs=None): + # if new_path is given then pretend an s3api/auth middleware + # combination replaces the request path with new_path + mw_conf = {} + base_labels = [] + + def fake_mw(env, start_response): + base_labels.append(dict(env.get('swift.base_labels'))) + env['PATH_INFO'] = new_path or orig_path + return right_mw(env, start_response) + + def fake_app(env, start_response): + base_labels.append(dict(env.get('swift.base_labels'))) + return HTTPOk()(env, start_response) + + left_mw = proxy_logging.ProxyLoggingMiddleware( + fake_mw, mw_conf, logger=self.logger) + right_mw = proxy_logging.ProxyLoggingMiddleware( + fake_app, mw_conf, logger=self.logger) + + req = Request.blank(orig_path, headers=req_hdrs) + req.method = 'PUT' + req.get_response(left_mw) + self.assertEqual(2, len(base_labels)) + + return base_labels + + def test_base_labels_end_to_end_info(self): + base_labels = self._do_test_base_labels_end_to_end('/info') + + self.assertEqual( + [{'method': 'PUT', 'resource': 'UNKNOWN'}, + {'method': 'PUT', 'resource': 'UNKNOWN'}], + base_labels) + + def test_base_labels_end_to_end_account(self): + base_labels = self._do_test_base_labels_end_to_end('/v1/a') + + self.assertEqual( + [{'account': 'a', 'method': 'PUT', 'resource': 'account'}, + {'account': 'a', 'method': 'PUT', 'resource': 'account'}], + base_labels) + + def test_base_labels_end_to_end_container(self): + base_labels = self._do_test_base_labels_end_to_end('/v1/a/c') + + self.assertEqual([{'account': 'a', 'container': 'c', 'method': 'PUT', + 'resource': 'container'}, + {'account': 'a', 'container': 'c', 'method': 'PUT', + 'resource': 'container'}], + base_labels) + + def test_base_labels_end_to_end_object(self): + base_labels = self._do_test_base_labels_end_to_end('/v1/a/c/o') + + self.assertEqual( + [{'account': 'a', 'container': 'c', 'method': 'PUT', + 'resource': 'object'}, + {'account': 'a', 'container': 'c', 'method': 'PUT', + 'resource': 'object'}], + base_labels) + + def test_swift_base_labels_end_to_end_account_s3(self): + req_hdrs = { + 'Authorization': 'AWS test:tester:hmac', + 'Date': email.utils.formatdate(time.time() + 0), + } + + base_labels = self._do_test_base_labels_end_to_end( + '/', '/v1/a', req_hdrs) + self.assertEqual( + [{'method': 'PUT'}, + {'account': 'a', 'method': 'PUT', 'resource': 'account'}], + base_labels) + + def test_base_labels_end_to_end_container_s3(self): + req_hdrs = { + 'Authorization': 'AWS test:tester:hmac', + 'Date': email.utils.formatdate(time.time() + 0), + } + + base_labels = self._do_test_base_labels_end_to_end( + '/bucket', '/v1/a/bucket', req_hdrs) + self.assertEqual( + [{'container': 'bucket', 'method': 'PUT', 'resource': 'container'}, + {'account': 'a', 'container': 'bucket', 'method': 'PUT', + 'resource': 'container'}], + base_labels) + + def test_base_labels_end_to_end_object_s3(self): + req_hdrs = { + 'Authorization': 'AWS test:tester:hmac', + 'Date': email.utils.formatdate(time.time() + 0), + } + + base_labels = self._do_test_base_labels_end_to_end( + '/bucket/o', '/v1/a/bucket/o', + req_hdrs) + self.assertEqual( + [{'container': 'bucket', 'method': 'PUT', 'resource': 'object'}, + {'account': 'a', 'container': 'bucket', + 'method': 'PUT', 'resource': 'object'}], + base_labels) + + def _do_test_call_app(self, req, app): + status, headers, body_iter = req.call_application(app) + body = b''.join(body_iter) + return status, headers, body + + def test_xfer_stats_put(self): + buffer_str = (b'some stuff\n' + b'some other stuff\n' + b'some additional extra stuff\n') + buffer_len = len(buffer_str) + read_calls = 0 + read_bytes = 0 + + # statsd calls expected while the request body is being read... + # (these are in the form expected by assertLabeledUpdateStats) + exp_req_stats_per_iter = [] + nbytes = 0 + while nbytes + 5 <= buffer_len: + iter_stats = [ + ('swift_proxy_server_request_body_streaming_bytes', 5, { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container'}) + ] + exp_req_stats_per_iter.append(iter_stats) + + nbytes += 5 + if nbytes < buffer_len: + iter_stats = [ + ('swift_proxy_server_request_body_streaming_bytes', + buffer_len - nbytes, { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container'}) + ] + exp_req_stats_per_iter.append(iter_stats) + # statsd calls expected while the response is being handled... + expect_resp_stats = [ + ('swift_proxy_server_response_body_streaming_bytes', + len('FAKE APP'), { + 'resource': 'container', + 'method': 'PUT', + 'status': 200, + 'policy': '0', + 'account': 'a', + 'container': 'c'}), + ('swift_proxy_server_request_body_bytes', buffer_len, { + 'resource': 'container', + 'method': 'PUT', + 'status': 200, + 'account': 'a', + 'container': 'c'}), + ('swift_proxy_server_response_body_bytes', + len('FAKE APP'), { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container', + 'status': 200}), + ] + captured_req_stats_per_iter = [] + + def capture_stats(nbytes): + # capture stats emitted while the request body is being read + statsd_calls = self.statsd.calls['update_stats'] + metric_value_labels = [] + for statsd_call in statsd_calls: + metric_value_labels.append( + (statsd_call[0][0], statsd_call[0][1], + statsd_call[1]['labels'])) + if len(metric_value_labels)> 0: + captured_req_stats_per_iter.append(metric_value_labels) + nonlocal read_calls + nonlocal read_bytes + read_calls += 1 + read_bytes += nbytes + self.statsd.clear() + + conf = { + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 0, + } + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(read_callback=capture_stats), conf, logger=self.logger) + app.statsd = self.statsd + req = Request.blank( + '/v1/a/c', + environ={'REQUEST_METHOD': 'PUT', + 'wsgi.input': BytesIO(buffer_str), + }) + resp = app(req.environ, start_response) + self.assertEqual(b'FAKE APP', b''.join(resp)) + self.assertEqual(read_bytes, buffer_len) + self.assertEqual(read_calls, len(captured_req_stats_per_iter)) + + self.assertEqual(exp_req_stats_per_iter, + captured_req_stats_per_iter) + # note: the fake statsd was cleared after the request body was read so + # just has the response handling statsd calls... + self.assertLabeledUpdateStats(expect_resp_stats) + self.assertUpdateStats([ + ('container.PUT.200.xfer', + buffer_len + len('FAKE APP')), + ], app) + + def test_xfer_stats_get(self): + buffers = [b'some stuff\n', + b'some other stuff\n', + b'some additional stuff\n'] + buffer_len = sum(len(b) for b in buffers) + conf = { + 'log_headers': 'yes', + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 0, + } + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp( + body=buffers, + ), conf, logger=self.logger) + app.statsd = self.statsd + req = Request.blank( + '/v1/a/c', environ={'REQUEST_METHOD': 'GET'}) + resp = app(req.environ, start_response) + resp_body = b''.join(resp) + expected_resp = b''.join(buffers) + log_parts = self._log_parts(app) + self.assertEqual(log_parts[3], 'GET') + self.assertEqual(log_parts[4], '/v1/a/c') + self.assertEqual(log_parts[5], 'HTTP/1.0') + self.assertEqual(log_parts[6], '200') + self.assertEqual(resp_body, expected_resp) + self.assertEqual(log_parts[11], str(buffer_len)) + self.assertUpdateStats([ + ('container.GET.200.xfer', + buffer_len), + ], app) + self.assertLabeledUpdateStats([ + ('swift_proxy_server_response_body_streaming_bytes', 11, { + 'account': 'a', + 'container': 'c', + 'method': 'GET', + 'policy': '0', + 'resource': 'container', + 'status': 200}), + ('swift_proxy_server_response_body_streaming_bytes', 17, { + 'account': 'a', + 'container': 'c', + 'method': 'GET', + 'policy': '0', + 'resource': 'container', + 'status': 200}), + ('swift_proxy_server_response_body_streaming_bytes', 22, { + 'account': 'a', + 'container': 'c', + 'method': 'GET', + 'policy': '0', + 'resource': 'container', + 'status': 200}), + ('swift_proxy_server_request_body_bytes', 0, { + 'resource': 'container', + 'method': 'GET', + 'status': 200, + 'account': 'a', + 'container': 'c'}), + ('swift_proxy_server_response_body_bytes', buffer_len, { + 'resource': 'container', + 'method': 'GET', + 'status': 200, + 'account': 'a', + 'container': 'c'}), + ]) + + def test_xfer_stats_emit_frequency_put(self): + + conf = { + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 0.005, + } + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp(), conf, logger=self.logger) + app.statsd = self.statsd + buffer_str = (b'some stuff\n' + b'some other stuff\n' + b'some additional stuff and blah\n') + buffer_len = len(buffer_str) + req = Request.blank( + '/v1/a/c', + environ={'REQUEST_METHOD': 'PUT', + 'wsgi.input': BytesIO(buffer_str), + }) + with mock.patch( + 'time.time', + side_effect=(0.001 * i for i in itertools.count())): + resp = app(req.environ, start_response) + # exhaust generator + self.assertEqual(b'FAKE APP', b''.join(resp)) + log_parts = self._log_parts(app) + self.assertEqual(log_parts[11], str(len('FAKE APP'))) + self.assertEqual(log_parts[10], str(buffer_len)) + self.assertUpdateStats([ + ('container.PUT.200.xfer', + buffer_len + len('FAKE APP')), + ], app) + self.assertLabeledUpdateStats([ + ('swift_proxy_server_request_body_streaming_bytes', 20, { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container'}), + ('swift_proxy_server_request_body_streaming_bytes', 25, { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container'}), + ('swift_proxy_server_request_body_streaming_bytes', 14, { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container'}), + ('swift_proxy_server_response_body_streaming_bytes', + len('FAKE APP'), { + 'resource': 'container', + 'method': 'PUT', + 'status': 200, + 'policy': '0', + 'account': 'a', + 'container': 'c'}), + ('swift_proxy_server_request_body_bytes', buffer_len, { + 'resource': 'container', + 'method': 'PUT', + 'status': 200, + 'account': 'a', + 'container': 'c'}), + ('swift_proxy_server_response_body_bytes', 8, { + 'account': 'a', + 'container': 'c', + 'method': 'PUT', + 'resource': 'container', + 'status': 200}) + ]) + + def test_xfer_stats_emit_frequency_get(self): + + buffers = [b'some stuff\n', + b'some other stuff\n', + b'some additional stuff and all\n'] + buffer_len = sum(len(b) for b in buffers) + conf = { + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 0.005, + } + app = proxy_logging.ProxyLoggingMiddleware( + FakeApp( + body=buffers, + ), conf, logger=self.logger) + app.statsd = self.statsd + req = Request.blank( + '/v1/a/c', environ={'REQUEST_METHOD': 'GET'}) + with mock.patch( + 'time.time', + side_effect=(0.001 * i for i in itertools.count())): + resp = app(req.environ, start_response) + resp_body = b''.join(resp) + expected_resp = b''.join(buffers) + self.assertEqual(resp_body, expected_resp) + self.assertUpdateStats([ + ('container.GET.200.xfer', buffer_len), + ], app + ) + self.assertLabeledUpdateStats([ + ('swift_proxy_server_response_body_streaming_bytes', 58, { + 'account': 'a', + 'container': 'c', + 'method': 'GET', + 'policy': '0', + 'resource': 'container', + 'status': 200}), + ('swift_proxy_server_request_body_bytes', 0, { + 'resource': 'container', + 'method': 'GET', + 'status': 200, + 'account': 'a', + 'container': 'c'}), + ('swift_proxy_server_response_body_bytes', buffer_len, { + 'resource': 'container', + 'method': 'GET', + 'status': 200, + 'account': 'a', + 'container': 'c'}), + ]) + + def _make_logged_pipeline(self, storage_domain=None, rewrite_path=False): + # make a pipeline: + # proxy_logging s3api fake_auth [rewrite_path] proxy_logging fake_swift + fake_swift = FakeSwift(test_read_size=5) + app = proxy_logging.ProxyLoggingMiddleware(fake_swift, { + 'access_log_route': 'subrequest', + }, logger=self.logger) + if rewrite_path: + app = PathRewritingApp(app, self.logger) + app = FakeAuthApp(app) + app._pipeline_final_app = fake_swift + app = s3api_filter_factory({ + 'force_swift_request_proxy_log': False, + 'storage_domain': storage_domain, + })(app) + proxy_logging_conf = { + 'access_log_route': 'proxy_access', + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_buffer_xfer_bytes_seconds': 0, + 'storage_domain': storage_domain, + } + app = proxy_logging.ProxyLoggingMiddleware( + app, proxy_logging_conf, logger=self.logger) + app.statsd = self.statsd + return app, fake_swift + + def test_xfer_stats_put_s3api(self): + app, swift = self._make_logged_pipeline(rewrite_path=True) + buffer_str = (b'some stuff\n' + b'some other stuff\n' + b'some additional stuff\n') + buffer_len = len(buffer_str) + etag = md5(buffer_str, usedforsecurity=False).hexdigest() + last_modified = 'Fri, 01 Apr 2014 12:00:00 GMT' + + swift.register('PUT', '/v1/AUTH_test/bucket+segments/object', + HTTPCreated, + {'etag': etag, + 'last-modified': last_modified, + 'Content-Length': 0}, + None) + + date_header = email.utils.formatdate(time.time() + 0) + req = Request.blank( + '/bucket/object', + environ={ + 'REQUEST_METHOD': 'PUT', + 'wsgi.input': BytesIO(buffer_str), + }, + headers={ + 'Authorization': 'AWS test:tester:hmac', + 'Date': date_header, + }, + ) + + status, headers, body = self._do_test_call_app(req, app) + self.assertEqual('200 OK', status) + + self.assertEqual('/v1/AUTH_test/bucket/object', + req.environ['swift.backend_path']) + base_labels = req.environ.get('swift.base_labels') + self.assertIsNotNone(base_labels) + + self.assertEqual(swift.calls, [ + ('PUT', '/v1/AUTH_test/bucket+segments/object'), + ]) + + self.assertUpdateStats([ + ('object.PUT.200.xfer', buffer_len), + ('object.policy.0.PUT.200.xfer', buffer_len) + ], app) + + stats = [] + for i in range(10): + stats.append( + ('swift_proxy_server_request_body_streaming_bytes', 5, { + 'account': 'AUTH_test', + 'container': 'bucket', + 'method': 'PUT', + 'resource': 'object'})) + stats += [ + ('swift_proxy_server_request_body_bytes', buffer_len, { + 'resource': 'object', + 'method': 'PUT', + 'status': 200, + 'account': 'AUTH_test', + 'container': 'bucket', + 'policy': '0'}), + ('swift_proxy_server_response_body_bytes', 0, { + 'resource': 'object', + 'method': 'PUT', + 'status': 200, + 'account': 'AUTH_test', + 'container': 'bucket', + 'policy': '0'})] + self.assertLabeledUpdateStats(stats) + + req = Request.blank('/bucket/object', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': date_header}) + status, headers, body = self._do_test_call_app(req, app) + self.assertEqual(status.split()[0], '200') + + def test_xfer_stats_get_s3api(self): + app, swift = self._make_logged_pipeline() + buffers = [b'some stuff\n', + b'some other stuff\n', + b'some additional stuff\n'] + buffer_len = sum(len(b) for b in buffers) + last_modified = 'Fri, 01 Apr 2014 12:00:00 GMT' + date_header = email.utils.formatdate(time.time() + 0) + + swift.register('GET', '/v1/AUTH_test/bucket/object', + HTTPOk, + {'last-modified': last_modified}, + buffers) + + req = Request.blank( + '/bucket/object', + environ={'REQUEST_METHOD': 'GET'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': date_header}, + ) + status, headers, body = self._do_test_call_app(req, app) + self.assertEqual('200 OK', status) + + self.assertEqual('/v1/AUTH_test/bucket/object', + req.environ['swift.backend_path']) + base_labels = req.environ.get('swift.base_labels', None) + self.assertEqual(base_labels, { + 'resource': 'object', + 'method': 'GET', + 'account': 'AUTH_test', + 'container': 'bucket', + }) + + self.assertEqual(swift.calls, [ + ('GET', '/v1/AUTH_test/bucket/object'), + ]) + + self.assertUpdateStats([ + ('object.GET.200.xfer', buffer_len), + ('object.policy.0.GET.200.xfer', buffer_len) + ], app) + + self.assertLabeledUpdateStats([ + ('swift_proxy_server_response_body_streaming_bytes', 11, { + 'resource': 'object', + 'method': 'GET', + 'status': 200, + 'policy': '0', + 'account': 'AUTH_test', + 'container': 'bucket', + }), + ('swift_proxy_server_response_body_streaming_bytes', 17, { + 'resource': 'object', + 'method': 'GET', + 'status': 200, + 'policy': '0', + 'account': 'AUTH_test', + 'container': 'bucket', + }), + ('swift_proxy_server_response_body_streaming_bytes', 22, { + 'resource': 'object', + 'method': 'GET', + 'status': 200, + 'policy': '0', + 'account': 'AUTH_test', + 'container': 'bucket', + }), + ('swift_proxy_server_request_body_bytes', 0, { + 'resource': 'object', + 'method': 'GET', + 'status': 200, + 'account': 'AUTH_test', + 'container': 'bucket', + 'policy': '0'}), + ('swift_proxy_server_response_body_bytes', buffer_len, { + 'resource': 'object', + 'method': 'GET', + 'status': 200, + 'account': 'AUTH_test', + 'container': 'bucket', + 'policy': '0'}), + ]) + + def test_base_labels_put_swift(self): + app, swift = self._make_logged_pipeline() + buffer_str = (b'some stuff\n' + b'some other stuff\n' + b'some additional stuff\n') + etag = md5(buffer_str, usedforsecurity=False).hexdigest() + last_modified = 'Fri, 01 Apr 2014 12:00:00 GMT' + + swift.register('PUT', '/v1/AUTH_test/bucket/object', + HTTPCreated, + {'etag': etag, + 'last-modified': last_modified, + 'Content-Length': 0}, + None) + + date_header = email.utils.formatdate(time.time() + 0) + req = Request.blank( + '/v1/AUTH_test/bucket/object', + environ={ + 'REQUEST_METHOD': 'PUT', + 'wsgi.input': BytesIO(buffer_str), + }, + headers={ + 'Date': date_header, + }, + ) + + status, headers, body = self._do_test_call_app(req, app) + self.assertEqual('201 Created', status) + + base_labels = req.environ.get('swift.base_labels', None) + self.assertEqual(base_labels, { + 'resource': 'object', + 'method': 'PUT', + 'account': 'AUTH_test', + 'container': 'bucket', + }) + + self.assertEqual(swift.calls, [ + ('PUT', '/v1/AUTH_test/bucket/object'), + ]) + + def test_base_labels_put_s3api(self): + app, swift = self._make_logged_pipeline() + buffer_str = (b'some stuff\n' + b'some other stuff\n' + b'some additional stuff\n') + etag = md5(buffer_str, usedforsecurity=False).hexdigest() + last_modified = 'Fri, 01 Apr 2014 12:00:00 GMT' + + swift.register('PUT', '/v1/AUTH_test/bucket/object', + HTTPCreated, + {'etag': etag, + 'last-modified': last_modified, + 'Content-Length': 0}, + None) + + date_header = email.utils.formatdate(time.time() + 0) + req = Request.blank( + '/bucket/object', + environ={ + 'REQUEST_METHOD': 'PUT', + 'wsgi.input': BytesIO(buffer_str), + }, + headers={ + 'Authorization': 'AWS test:tester:hmac', + 'Date': date_header, + }, + ) + + status, headers, body = self._do_test_call_app(req, app) + self.assertEqual('200 OK', status) + + self.assertEqual('/v1/AUTH_test/bucket/object', + req.environ['swift.backend_path']) + base_labels = req.environ.get('swift.base_labels', None) + self.assertEqual(base_labels, { + 'resource': 'object', + 'method': 'PUT', + 'account': 'AUTH_test', + 'container': 'bucket', + }) + + self.assertEqual(swift.calls, [ + ('PUT', '/v1/AUTH_test/bucket/object'), + ]) + + def test_base_labels_put_s3api_storage_domain(self): + app, swift = self._make_logged_pipeline(storage_domain='domain') + buffer_str = (b'some stuff\n' + b'some other stuff\n' + b'some additional stuff\n') + + etag = md5(buffer_str, usedforsecurity=False).hexdigest() + last_modified = 'Fri, 01 Apr 2014 12:00:00 GMT' + + swift.register('PUT', '/v1/AUTH_test/ahost/object', + HTTPCreated, + {'etag': etag, + 'last-modified': last_modified, + 'Content-Length': 0}, + None) + + date_header = email.utils.formatdate(time.time() + 0) + req = Request.blank( + '/object', + environ={ + 'REQUEST_METHOD': 'PUT', + 'HTTP_HOST': 'ahost.domain', + 'wsgi.input': BytesIO(buffer_str), + }, + headers={ + 'Authorization': 'AWS test:tester:hmac', + 'Date': date_header, + }, + ) + + status, headers, body = self._do_test_call_app(req, app) + self.assertEqual('200 OK', status) + + self.assertEqual('/v1/AUTH_test/ahost/object', + req.environ['swift.backend_path']) + base_labels = req.environ.get('swift.base_labels', None) + self.assertEqual(base_labels, { + 'resource': 'object', + 'method': 'PUT', + 'account': 'AUTH_test', + 'container': 'ahost', + }) + + self.assertEqual(swift.calls, [ + ('PUT', '/v1/AUTH_test/ahost/object'), + ]) + def test_log_query_string(self): app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {}) app.access_logger = debug_logger() @@ -1289,7 +2591,7 @@ class TestProxyLogging(unittest.TestCase): 'QUERY_STRING': 'x=3'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(unquote(log_parts[4]), '/?x=3') @@ -1300,7 +2602,7 @@ class TestProxyLogging(unittest.TestCase): 'REMOTE_ADDR': '1.2.3.4'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[0], '1.2.3.4') # client ip self.assertEqual(log_parts[1], '1.2.3.4') # remote addr @@ -1330,7 +2632,7 @@ class TestProxyLogging(unittest.TestCase): 'REMOTE_ADDR': '1.2.3.4'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'CloseableBody', b''.join(resp)) self.assertTrue(body.closed) def test_chunked_response(self): @@ -1338,7 +2640,7 @@ class TestProxyLogging(unittest.TestCase): req = Request.blank('/') resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) def test_proxy_client_logging(self): app = proxy_logging.ProxyLoggingMiddleware(FakeApp(), {}) @@ -1349,7 +2651,7 @@ class TestProxyLogging(unittest.TestCase): 'HTTP_X_FORWARDED_FOR': '4.5.6.7,8.9.10.11'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[0], '4.5.6.7') # client ip self.assertEqual(log_parts[1], '1.2.3.4') # remote addr @@ -1362,7 +2664,7 @@ class TestProxyLogging(unittest.TestCase): 'HTTP_X_CLUSTER_CLIENT_IP': '4.5.6.7'}) resp = app(req.environ, start_response) # exhaust generator - [x for x in resp] + self.assertEqual(b'FAKE APP', b''.join(resp)) log_parts = self._log_parts(app) self.assertEqual(log_parts[0], '4.5.6.7') # client ip self.assertEqual(log_parts[1], '1.2.3.4') # remote addr diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 557ee99bc6..ce35fec772 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -1264,6 +1264,18 @@ class TestWSGI(unittest.TestCase): newenv = wsgi.make_env(oldenv) self.assertIs(newenv.get('swift.infocache'), oldenv['swift.infocache']) + def test_make_env_keeps_shard_listing_history(self): + oldenv = {'swift.shard_listing_history': []} + newenv = wsgi.make_env(oldenv) + self.assertIs(newenv.get('swift.shard_listing_history'), + oldenv['swift.shard_listing_history']) + + def test_make_env_keeps_base_labels(self): + oldenv = {'swift.base_labels': []} + newenv = wsgi.make_env(oldenv) + self.assertIs(newenv.get('swift.base_labels'), + oldenv['swift.base_labels']) + class CommonTestMixin(object):

AltStyle によって変換されたページ (->オリジナル) /