Migrate encrypter tests to assert FakeSwiftCall attributes
Make encrypter unit test assertions more explicit by using assertions on the named attributes of FakeSwiftCall rather than assertions on position-indexed call tuples. Change-Id: I871ddcc4ba559e7e4c0d0e28464780c6cd669797
This commit is contained in:
1 changed files with 77 additions and 77 deletions
@@ -96,13 +96,12 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(plaintext_etag, resp.headers['Etag'])
# verify metadata items
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual('PUT', self.app.calls[0][0])
actual_footers = self.app.call_list[0].footers
actual_headers = self.app.call_list[0].headers
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path))
# verify body crypto meta
actual = actual_footers['X-Object-Sysmeta-Crypto-Body-Meta']
actual = call.footers['X-Object-Sysmeta-Crypto-Body-Meta']
actual = json.loads(urllib.parse.unquote_plus(actual))
self.assertEqual(Crypto().cipher, actual['cipher'])
self.assertEqual(FAKE_IV, base64.b64decode(actual['iv']))
@@ -116,10 +115,10 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(fetch_crypto_keys()['id'], actual['key_id'])
# verify etag
self.assertEqual(ciphertext_etag, actual_footers['Etag'])
self.assertEqual(ciphertext_etag, call.footers['Etag'])
encrypted_etag, _junk, etag_meta = \
actual_footers['X-Object-Sysmeta-Crypto-Etag'].partition(
call.footers['X-Object-Sysmeta-Crypto-Etag'].partition(
'; swift_meta=')
# verify crypto_meta was appended to this etag
self.assertTrue(etag_meta)
@@ -134,7 +133,7 @@ class TestEncrypter(unittest.TestCase):
# verify etag MAC for conditional requests
actual_hmac = base64.b64decode(
actual_footers['X-Object-Sysmeta-Crypto-Etag-Mac'])
call.footers['X-Object-Sysmeta-Crypto-Etag-Mac'])
exp_hmac = hmac.new(
object_key,
plaintext_etag.encode('ascii'),
@@ -143,8 +142,8 @@ class TestEncrypter(unittest.TestCase):
# verify encrypted etag for container update
self.assertIn(
'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers)
parts = actual_footers[
'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers)
parts = call.footers[
'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1)
self.assertEqual(2, len(parts))
@@ -165,18 +164,18 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(exp_etag, base64.b64decode(parts[0]))
# content-type is not encrypted
self.assertEqual('text/plain', actual_headers['Content-Type'])
self.assertEqual('text/plain', call.headers['Content-Type'])
# user meta is encrypted
self._verify_user_metadata(
actual_headers, 'Test', 'encrypt me', object_key)
call.headers, 'Test', 'encrypt me', object_key)
self._verify_user_metadata(
actual_headers, 'Etag', 'not to be confused with the Etag!',
call.headers, 'Etag', 'not to be confused with the Etag!',
object_key)
# sysmeta is not encrypted
self.assertEqual('do not encrypt me',
actual_headers['X-Object-Sysmeta-Test'])
call.headers['X-Object-Sysmeta-Test'])
# verify object is encrypted by getting direct from the app
get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
@@ -205,30 +204,30 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('201 Created', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual('PUT', self.app.calls[0][0])
req_hdrs = self.app.headers[0]
actual_footers = self.app.call_list[0].footers
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual('PUT', call.method)
# verify that there is no body crypto meta
self.assertNotIn('X-Object-Sysmeta-Crypto-Meta', actual_footers)
self.assertNotIn('X-Object-Sysmeta-Crypto-Meta', call.footers)
# verify etag is md5 of plaintext
self.assertEqual(MD5_OF_EMPTY_STRING, actual_footers['Etag'])
self.assertEqual(MD5_OF_EMPTY_STRING, call.footers['Etag'])
# verify there is no etag crypto meta
self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', actual_footers)
self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', call.footers)
# verify there is no container update override for etag
self.assertNotIn(
'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers)
'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers)
# user meta is still encrypted
self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', object_key)
self._verify_user_metadata(
req_hdrs, 'Etag', 'not to be confused with the Etag!',
call.headers, 'Test', 'encrypt me', object_key)
self._verify_user_metadata(
call.headers, 'Etag', 'not to be confused with the Etag!',
object_key)
# sysmeta is not encrypted
self.assertEqual('do not encrypt me',
req_hdrs['X-Object-Sysmeta-Test'])
call.headers['X-Object-Sysmeta-Test'])
# verify object is empty by getting direct from the app
get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
@@ -273,26 +272,26 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(plaintext_etag, resp.headers['Etag'])
# verify metadata items
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual('PUT', self.app.calls[0][0])
actual_footers = self.app.call_list[0].footers
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path))
# verify that other middleware's footers made it to app, including any
# container update overrides but nothing Etag-related
other_footers.pop('Etag')
other_footers.pop('X-Object-Sysmeta-Container-Update-Override-Etag')
for k, v in other_footers.items():
self.assertEqual(v, actual_footers[k])
self.assertEqual(v, call.footers[k])
# verify encryption footers are ok
encrypted_etag, _junk, etag_meta = \
actual_footers['X-Object-Sysmeta-Crypto-Etag'].partition(
call.footers['X-Object-Sysmeta-Crypto-Etag'].partition(
'; swift_meta=')
self.assertTrue(etag_meta)
actual_meta = json.loads(urllib.parse.unquote_plus(etag_meta))
self.assertEqual(Crypto().cipher, actual_meta['cipher'])
self.assertEqual(ciphertext_etag, actual_footers['Etag'])
self.assertEqual(ciphertext_etag, call.footers['Etag'])
actual = base64.b64decode(encrypted_etag)
etag_iv = base64.b64decode(actual_meta['iv'])
exp_etag = encrypt(plaintext_etag.encode('ascii'), object_key, etag_iv)
@@ -300,8 +299,8 @@ class TestEncrypter(unittest.TestCase):
# verify encrypted etag for container update
self.assertIn(
'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers)
parts = actual_footers[
'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers)
parts = call.footers[
'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1)
self.assertEqual(2, len(parts))
@@ -321,7 +320,7 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(exp_etag, base64.b64decode(parts[0]))
# verify body crypto meta
actual = actual_footers['X-Object-Sysmeta-Crypto-Body-Meta']
actual = call.footers['X-Object-Sysmeta-Crypto-Body-Meta']
actual = json.loads(urllib.parse.unquote_plus(actual))
self.assertEqual(Crypto().cipher, actual['cipher'])
self.assertEqual(FAKE_IV, base64.b64decode(actual['iv']))
@@ -364,14 +363,14 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual(plaintext_etag, resp.headers['Etag'])
# verify metadata items
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0])
actual_footers = self.app.call_list[0].footers
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path))
# verify encrypted etag for container update
self.assertIn(
'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers)
parts = actual_footers[
'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers)
parts = call.footers[
'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1)
self.assertEqual(2, len(parts))
cont_key = fetch_crypto_keys()['container']
@@ -418,14 +417,14 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('201 Created', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0])
req_hdrs = self.app.headers[0]
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path))
self.assertIn(
'X-Object-Sysmeta-Container-Update-Override-Etag', req_hdrs)
'X-Object-Sysmeta-Container-Update-Override-Etag', call.headers)
self.assertEqual(
override_etag,
req_hdrs['X-Object-Sysmeta-Container-Update-Override-Etag'])
call.headers['X-Object-Sysmeta-Container-Update-Override-Etag'])
def test_PUT_with_empty_etag_override_in_headers(self):
self._test_PUT_with_empty_etag_override_in_headers(b'body')
@@ -455,14 +454,14 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('201 Created', resp.status)
self.assertEqual(plaintext_etag, resp.headers['Etag'])
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0])
actual_footers = self.app.call_list[0].footers
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path))
self.assertIn(
'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers)
'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers)
self.assertEqual(
override_etag,
actual_footers['X-Object-Sysmeta-Container-Update-Override-Etag'])
call.footers['X-Object-Sysmeta-Container-Update-Override-Etag'])
def test_PUT_with_empty_etag_override_in_footers(self):
self._test_PUT_with_empty_etag_override_in_footers(b'body')
@@ -655,18 +654,18 @@ class TestEncrypter(unittest.TestCase):
self.assertNotIn('Etag', resp.headers)
# verify metadata items
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual('POST', self.app.calls[0][0])
req_hdrs = self.app.headers[0]
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual(('POST', '/v1/a/c/o'), (call.method, call.path))
# user meta is encrypted
self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', key)
self._verify_user_metadata(call.headers, 'Test', 'encrypt me', key)
# unless it had no value
self.assertEqual('', req_hdrs['X-Object-Meta-Test2'])
self.assertEqual('', call.headers['X-Object-Meta-Test2'])
# sysmeta is not encrypted
self.assertEqual('do not encrypt me',
req_hdrs['X-Object-Sysmeta-Test'])
call.headers['X-Object-Sysmeta-Test'])
def _test_no_user_metadata(self, method):
# verify that x-object-transient-sysmeta-crypto-meta is not set when
@@ -700,19 +699,19 @@ class TestEncrypter(unittest.TestCase):
resp = req.get_response(encrypter.Encrypter(app, {}))
self.assertEqual('200 OK', resp.status)
self.assertEqual(1, len(app.calls), app.calls)
self.assertEqual(method, app.calls[0][0])
actual_headers = app.headers[0]
self.assertEqual(1, len(app.call_list), app.calls)
call = app.call_list[0]
self.assertEqual(method, call.method)
# verify the alternate etag location has been specified
if match_header_value and match_header_value != '*':
self.assertIn('X-Backend-Etag-Is-At', actual_headers)
self.assertIn('X-Backend-Etag-Is-At', call.headers)
self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
actual_headers['X-Backend-Etag-Is-At'])
call.headers['X-Backend-Etag-Is-At'])
# verify etags have been supplemented with masked values
self.assertIn(match_header_name, actual_headers)
actual_etags = set(actual_headers[match_header_name].split(', '))
self.assertIn(match_header_name, call.headers)
actual_etags = set(call.headers[match_header_name].split(', '))
# masked values for secret_id None
key = fetch_crypto_keys()['object']
masked_etags = [
@@ -769,14 +768,14 @@ class TestEncrypter(unittest.TestCase):
resp = req.get_response(self.encrypter)
self.assertEqual('200 OK', resp.status)
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual(method, self.app.calls[0][0])
actual_headers = self.app.headers[0]
self.assertIn('X-Backend-Etag-Is-At', actual_headers)
self.assertEqual(1, len(self.app.call_list), self.app.calls)
call = self.app.call_list[0]
self.assertEqual((method, '/v1/a/c/o'), (call.method, call.path))
self.assertIn('X-Backend-Etag-Is-At', call.headers)
self.assertEqual(
'X-Object-Sysmeta-Other-Etag,X-Object-Sysmeta-Crypto-Etag-Mac',
actual_headers['X-Backend-Etag-Is-At'])
actual_etags = set(actual_headers[match_header_name].split(', '))
call.headers['X-Backend-Etag-Is-At'])
actual_etags = set(call.headers[match_header_name].split(', '))
self.assertIn('"an etag"', actual_etags)
def test_GET_if_match_with_existing_etag_is_at_header(self):
@@ -805,19 +804,19 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('200 OK', resp.status)
self.assertEqual(1, len(self.app.calls), self.app.calls)
self.assertEqual(method, self.app.calls[0][0])
actual_headers = self.app.headers[0]
self.assertIn('X-Backend-Etag-Is-At', actual_headers)
call = self.app.call_list[0]
self.assertEqual((method, '/v1/a/c/o'), (call.method, call.path))
self.assertIn('X-Backend-Etag-Is-At', call.headers)
self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac',
actual_headers['X-Backend-Etag-Is-At'])
call.headers['X-Backend-Etag-Is-At'])
self.assertIn('"%s"' % bytes_to_wsgi(base64.b64encode(
hmac.new(key, b'an etag', hashlib.sha256).digest())),
actual_headers['If-Match'])
self.assertIn('"another etag"', actual_headers['If-None-Match'])
call.headers['If-Match'])
self.assertIn('"another etag"', call.headers['If-None-Match'])
self.assertIn('"%s"' % bytes_to_wsgi(base64.b64encode(
hmac.new(key, b'another etag', hashlib.sha256).digest())),
actual_headers['If-None-Match'])
call.headers['If-None-Match'])
def test_GET_etag_is_at_not_duplicated(self):
self._test_etag_is_at_not_duplicated('GET')
@@ -952,9 +951,10 @@ class TestEncrypter(unittest.TestCase):
self.assertEqual('201 Created', resp.status)
# verify that other middleware's footers made it to app
actual_footers = self.app.call_list[0].footers
self.assertEqual(1, len(self.app.call_list))
call = self.app.call_list[0]
for k, v in other_footers.items():
self.assertEqual(v, actual_footers[k])
self.assertEqual(v, call.footers[k])
# verify object is NOT encrypted by getting direct from the app
get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})
Reference in New Issue
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.