diff --git a/test/unit/common/middleware/crypto/test_encrypter.py b/test/unit/common/middleware/crypto/test_encrypter.py index a511efb706..79ffc3886d 100644 --- a/test/unit/common/middleware/crypto/test_encrypter.py +++ b/test/unit/common/middleware/crypto/test_encrypter.py @@ -96,13 +96,12 @@ class TestEncrypter(unittest.TestCase): self.assertEqual(plaintext_etag, resp.headers['Etag']) # verify metadata items - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual('PUT', self.app.calls[0][0]) - actual_footers = self.app.call_list[0].footers - actual_headers = self.app.call_list[0].headers + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path)) # verify body crypto meta - actual = actual_footers['X-Object-Sysmeta-Crypto-Body-Meta'] + actual = call.footers['X-Object-Sysmeta-Crypto-Body-Meta'] actual = json.loads(urllib.parse.unquote_plus(actual)) self.assertEqual(Crypto().cipher, actual['cipher']) self.assertEqual(FAKE_IV, base64.b64decode(actual['iv'])) @@ -116,10 +115,10 @@ class TestEncrypter(unittest.TestCase): self.assertEqual(fetch_crypto_keys()['id'], actual['key_id']) # verify etag - self.assertEqual(ciphertext_etag, actual_footers['Etag']) + self.assertEqual(ciphertext_etag, call.footers['Etag']) encrypted_etag, _junk, etag_meta = \ - actual_footers['X-Object-Sysmeta-Crypto-Etag'].partition( + call.footers['X-Object-Sysmeta-Crypto-Etag'].partition( '; swift_meta=') # verify crypto_meta was appended to this etag self.assertTrue(etag_meta) @@ -134,7 +133,7 @@ class TestEncrypter(unittest.TestCase): # verify etag MAC for conditional requests actual_hmac = base64.b64decode( - actual_footers['X-Object-Sysmeta-Crypto-Etag-Mac']) + call.footers['X-Object-Sysmeta-Crypto-Etag-Mac']) exp_hmac = hmac.new( object_key, plaintext_etag.encode('ascii'), @@ -143,8 +142,8 @@ class TestEncrypter(unittest.TestCase): # verify encrypted etag for container update self.assertIn( - 'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers) - parts = actual_footers[ + 'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers) + parts = call.footers[ 'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1) self.assertEqual(2, len(parts)) @@ -165,18 +164,18 @@ class TestEncrypter(unittest.TestCase): self.assertEqual(exp_etag, base64.b64decode(parts[0])) # content-type is not encrypted - self.assertEqual('text/plain', actual_headers['Content-Type']) + self.assertEqual('text/plain', call.headers['Content-Type']) # user meta is encrypted self._verify_user_metadata( - actual_headers, 'Test', 'encrypt me', object_key) + call.headers, 'Test', 'encrypt me', object_key) self._verify_user_metadata( - actual_headers, 'Etag', 'not to be confused with the Etag!', + call.headers, 'Etag', 'not to be confused with the Etag!', object_key) # sysmeta is not encrypted self.assertEqual('do not encrypt me', - actual_headers['X-Object-Sysmeta-Test']) + call.headers['X-Object-Sysmeta-Test']) # verify object is encrypted by getting direct from the app get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'}) @@ -205,30 +204,30 @@ class TestEncrypter(unittest.TestCase): self.assertEqual('201 Created', resp.status) self.assertEqual(plaintext_etag, resp.headers['Etag']) - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual('PUT', self.app.calls[0][0]) - req_hdrs = self.app.headers[0] - actual_footers = self.app.call_list[0].footers + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual('PUT', call.method) # verify that there is no body crypto meta - self.assertNotIn('X-Object-Sysmeta-Crypto-Meta', actual_footers) + self.assertNotIn('X-Object-Sysmeta-Crypto-Meta', call.footers) # verify etag is md5 of plaintext - self.assertEqual(MD5_OF_EMPTY_STRING, actual_footers['Etag']) + self.assertEqual(MD5_OF_EMPTY_STRING, call.footers['Etag']) # verify there is no etag crypto meta - self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', actual_footers) + self.assertNotIn('X-Object-Sysmeta-Crypto-Etag', call.footers) # verify there is no container update override for etag self.assertNotIn( - 'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers) + 'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers) # user meta is still encrypted - self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', object_key) self._verify_user_metadata( - req_hdrs, 'Etag', 'not to be confused with the Etag!', + call.headers, 'Test', 'encrypt me', object_key) + self._verify_user_metadata( + call.headers, 'Etag', 'not to be confused with the Etag!', object_key) # sysmeta is not encrypted self.assertEqual('do not encrypt me', - req_hdrs['X-Object-Sysmeta-Test']) + call.headers['X-Object-Sysmeta-Test']) # verify object is empty by getting direct from the app get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'}) @@ -273,26 +272,26 @@ class TestEncrypter(unittest.TestCase): self.assertEqual(plaintext_etag, resp.headers['Etag']) # verify metadata items - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual('PUT', self.app.calls[0][0]) - actual_footers = self.app.call_list[0].footers + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path)) # verify that other middleware's footers made it to app, including any # container update overrides but nothing Etag-related other_footers.pop('Etag') other_footers.pop('X-Object-Sysmeta-Container-Update-Override-Etag') for k, v in other_footers.items(): - self.assertEqual(v, actual_footers[k]) + self.assertEqual(v, call.footers[k]) # verify encryption footers are ok encrypted_etag, _junk, etag_meta = \ - actual_footers['X-Object-Sysmeta-Crypto-Etag'].partition( + call.footers['X-Object-Sysmeta-Crypto-Etag'].partition( '; swift_meta=') self.assertTrue(etag_meta) actual_meta = json.loads(urllib.parse.unquote_plus(etag_meta)) self.assertEqual(Crypto().cipher, actual_meta['cipher']) - self.assertEqual(ciphertext_etag, actual_footers['Etag']) + self.assertEqual(ciphertext_etag, call.footers['Etag']) actual = base64.b64decode(encrypted_etag) etag_iv = base64.b64decode(actual_meta['iv']) exp_etag = encrypt(plaintext_etag.encode('ascii'), object_key, etag_iv) @@ -300,8 +299,8 @@ class TestEncrypter(unittest.TestCase): # verify encrypted etag for container update self.assertIn( - 'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers) - parts = actual_footers[ + 'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers) + parts = call.footers[ 'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1) self.assertEqual(2, len(parts)) @@ -321,7 +320,7 @@ class TestEncrypter(unittest.TestCase): self.assertEqual(exp_etag, base64.b64decode(parts[0])) # verify body crypto meta - actual = actual_footers['X-Object-Sysmeta-Crypto-Body-Meta'] + actual = call.footers['X-Object-Sysmeta-Crypto-Body-Meta'] actual = json.loads(urllib.parse.unquote_plus(actual)) self.assertEqual(Crypto().cipher, actual['cipher']) self.assertEqual(FAKE_IV, base64.b64decode(actual['iv'])) @@ -364,14 +363,14 @@ class TestEncrypter(unittest.TestCase): self.assertEqual(plaintext_etag, resp.headers['Etag']) # verify metadata items - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0]) - actual_footers = self.app.call_list[0].footers + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path)) # verify encrypted etag for container update self.assertIn( - 'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers) - parts = actual_footers[ + 'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers) + parts = call.footers[ 'X-Object-Sysmeta-Container-Update-Override-Etag'].rsplit(';', 1) self.assertEqual(2, len(parts)) cont_key = fetch_crypto_keys()['container'] @@ -418,14 +417,14 @@ class TestEncrypter(unittest.TestCase): self.assertEqual('201 Created', resp.status) self.assertEqual(plaintext_etag, resp.headers['Etag']) - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0]) - req_hdrs = self.app.headers[0] + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path)) self.assertIn( - 'X-Object-Sysmeta-Container-Update-Override-Etag', req_hdrs) + 'X-Object-Sysmeta-Container-Update-Override-Etag', call.headers) self.assertEqual( override_etag, - req_hdrs['X-Object-Sysmeta-Container-Update-Override-Etag']) + call.headers['X-Object-Sysmeta-Container-Update-Override-Etag']) def test_PUT_with_empty_etag_override_in_headers(self): self._test_PUT_with_empty_etag_override_in_headers(b'body') @@ -455,14 +454,14 @@ class TestEncrypter(unittest.TestCase): self.assertEqual('201 Created', resp.status) self.assertEqual(plaintext_etag, resp.headers['Etag']) - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual(('PUT', '/v1/a/c/o'), self.app.calls[0]) - actual_footers = self.app.call_list[0].footers + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual(('PUT', '/v1/a/c/o'), (call.method, call.path)) self.assertIn( - 'X-Object-Sysmeta-Container-Update-Override-Etag', actual_footers) + 'X-Object-Sysmeta-Container-Update-Override-Etag', call.footers) self.assertEqual( override_etag, - actual_footers['X-Object-Sysmeta-Container-Update-Override-Etag']) + call.footers['X-Object-Sysmeta-Container-Update-Override-Etag']) def test_PUT_with_empty_etag_override_in_footers(self): self._test_PUT_with_empty_etag_override_in_footers(b'body') @@ -655,18 +654,18 @@ class TestEncrypter(unittest.TestCase): self.assertNotIn('Etag', resp.headers) # verify metadata items - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual('POST', self.app.calls[0][0]) - req_hdrs = self.app.headers[0] + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual(('POST', '/v1/a/c/o'), (call.method, call.path)) # user meta is encrypted - self._verify_user_metadata(req_hdrs, 'Test', 'encrypt me', key) + self._verify_user_metadata(call.headers, 'Test', 'encrypt me', key) # unless it had no value - self.assertEqual('', req_hdrs['X-Object-Meta-Test2']) + self.assertEqual('', call.headers['X-Object-Meta-Test2']) # sysmeta is not encrypted self.assertEqual('do not encrypt me', - req_hdrs['X-Object-Sysmeta-Test']) + call.headers['X-Object-Sysmeta-Test']) def _test_no_user_metadata(self, method): # verify that x-object-transient-sysmeta-crypto-meta is not set when @@ -700,19 +699,19 @@ class TestEncrypter(unittest.TestCase): resp = req.get_response(encrypter.Encrypter(app, {})) self.assertEqual('200 OK', resp.status) - self.assertEqual(1, len(app.calls), app.calls) - self.assertEqual(method, app.calls[0][0]) - actual_headers = app.headers[0] + self.assertEqual(1, len(app.call_list), app.calls) + call = app.call_list[0] + self.assertEqual(method, call.method) # verify the alternate etag location has been specified if match_header_value and match_header_value != '*': - self.assertIn('X-Backend-Etag-Is-At', actual_headers) + self.assertIn('X-Backend-Etag-Is-At', call.headers) self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac', - actual_headers['X-Backend-Etag-Is-At']) + call.headers['X-Backend-Etag-Is-At']) # verify etags have been supplemented with masked values - self.assertIn(match_header_name, actual_headers) - actual_etags = set(actual_headers[match_header_name].split(', ')) + self.assertIn(match_header_name, call.headers) + actual_etags = set(call.headers[match_header_name].split(', ')) # masked values for secret_id None key = fetch_crypto_keys()['object'] masked_etags = [ @@ -769,14 +768,14 @@ class TestEncrypter(unittest.TestCase): resp = req.get_response(self.encrypter) self.assertEqual('200 OK', resp.status) - self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual(method, self.app.calls[0][0]) - actual_headers = self.app.headers[0] - self.assertIn('X-Backend-Etag-Is-At', actual_headers) + self.assertEqual(1, len(self.app.call_list), self.app.calls) + call = self.app.call_list[0] + self.assertEqual((method, '/v1/a/c/o'), (call.method, call.path)) + self.assertIn('X-Backend-Etag-Is-At', call.headers) self.assertEqual( 'X-Object-Sysmeta-Other-Etag,X-Object-Sysmeta-Crypto-Etag-Mac', - actual_headers['X-Backend-Etag-Is-At']) - actual_etags = set(actual_headers[match_header_name].split(', ')) + call.headers['X-Backend-Etag-Is-At']) + actual_etags = set(call.headers[match_header_name].split(', ')) self.assertIn('"an etag"', actual_etags) def test_GET_if_match_with_existing_etag_is_at_header(self): @@ -805,19 +804,19 @@ class TestEncrypter(unittest.TestCase): self.assertEqual('200 OK', resp.status) self.assertEqual(1, len(self.app.calls), self.app.calls) - self.assertEqual(method, self.app.calls[0][0]) - actual_headers = self.app.headers[0] - self.assertIn('X-Backend-Etag-Is-At', actual_headers) + call = self.app.call_list[0] + self.assertEqual((method, '/v1/a/c/o'), (call.method, call.path)) + self.assertIn('X-Backend-Etag-Is-At', call.headers) self.assertEqual('X-Object-Sysmeta-Crypto-Etag-Mac', - actual_headers['X-Backend-Etag-Is-At']) + call.headers['X-Backend-Etag-Is-At']) self.assertIn('"%s"' % bytes_to_wsgi(base64.b64encode( hmac.new(key, b'an etag', hashlib.sha256).digest())), - actual_headers['If-Match']) - self.assertIn('"another etag"', actual_headers['If-None-Match']) + call.headers['If-Match']) + self.assertIn('"another etag"', call.headers['If-None-Match']) self.assertIn('"%s"' % bytes_to_wsgi(base64.b64encode( hmac.new(key, b'another etag', hashlib.sha256).digest())), - actual_headers['If-None-Match']) + call.headers['If-None-Match']) def test_GET_etag_is_at_not_duplicated(self): self._test_etag_is_at_not_duplicated('GET') @@ -952,9 +951,10 @@ class TestEncrypter(unittest.TestCase): self.assertEqual('201 Created', resp.status) # verify that other middleware's footers made it to app - actual_footers = self.app.call_list[0].footers + self.assertEqual(1, len(self.app.call_list)) + call = self.app.call_list[0] for k, v in other_footers.items(): - self.assertEqual(v, actual_footers[k]) + self.assertEqual(v, call.footers[k]) # verify object is NOT encrypted by getting direct from the app get_req = Request.blank('/v1/a/c/o', environ={'REQUEST_METHOD': 'GET'})

AltStyle によって変換されたページ (->オリジナル) /