s3api: Implement object versioning API

Translate AWS S3 Object Versioning API requests to native Swift Object
Versioning API, speficially:
 * bucket versioning status
 * bucket versioned objects listing params
 * object GETorHEAD & DELETE versionId
 * multi_delete versionId
Change-Id: I8296681b61996e073b3ba12ad46f99042dc15c37
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
This commit is contained in:
karen chan
2017年01月27日 07:43:37 -08:00
committed by Tim Burke
parent 2759d5d51c
commit 6097660f0c

View File

@@ -16,8 +16,11 @@
import logging
import os
import unittest
import uuid
import time
import boto3
from botocore.exceptions import ClientError
from six.moves import urllib
from swift.common.utils import config_true_value
@@ -80,11 +83,14 @@ def get_s3_client(user=1, signature_version='s3v4', addressing_style='path'):
path -- produces URLs like ``http(s)://host.domain/bucket/key``
virtual -- produces URLs like ``http(s)://bucket.host.domain/key``
'''
endpoint = get_opt_or_error('endpoint')
scheme = urllib.parse.urlsplit(endpoint).scheme
if scheme not in ('http', 'https'):
raise ConfigError('unexpected scheme in endpoint: %r; '
'expected http or https' % scheme)
endpoint = get_opt('endpoint', None)
if endpoint:
scheme = urllib.parse.urlsplit(endpoint).scheme
if scheme not in ('http', 'https'):
raise ConfigError('unexpected scheme in endpoint: %r; '
'expected http or https' % scheme)
else:
scheme = None
region = get_opt('region', 'us-east-1')
access_key = get_opt_or_error('access_key%d' % user)
secret_key = get_opt_or_error('secret_key%d' % user)
@@ -112,6 +118,9 @@ def get_s3_client(user=1, signature_version='s3v4', addressing_style='path'):
)
TEST_PREFIX = 's3api-test-'
class BaseS3TestCase(unittest.TestCase):
# Default to v4 signatures (as aws-cli does), but subclasses can override
signature_version = 's3v4'
@@ -121,15 +130,77 @@ class BaseS3TestCase(unittest.TestCase):
return get_s3_client(user, cls.signature_version)
@classmethod
def clear_bucket(cls, client, bucket):
for key in client.list_objects(Bucket=bucket).get('Contents', []):
client.delete_key(Bucket=bucket, Key=key['Name'])
def _remove_all_object_versions_from_bucket(cls, client, bucket_name):
resp = client.list_object_versions(Bucket=bucket_name)
objs_to_delete = (resp.get('Versions', []) +
resp.get('DeleteMarkers', []))
while objs_to_delete:
multi_delete_body = {
'Objects': [
{'Key': obj['Key'], 'VersionId': obj['VersionId']}
for obj in objs_to_delete
],
'Quiet': False,
}
del_resp = client.delete_objects(Bucket=bucket_name,
Delete=multi_delete_body)
if any(del_resp.get('Errors', [])):
raise Exception('Unable to delete %r' % del_resp['Errors'])
if not resp['IsTruncated']:
break
key_marker = resp['NextKeyMarker']
version_id_marker = resp['NextVersionIdMarker']
resp = client.list_object_versions(
Bucket=bucket_name, KeyMarker=key_marker,
VersionIdMarker=version_id_marker)
objs_to_delete = (resp.get('Versions', []) +
resp.get('DeleteMarkers', []))
@classmethod
def clear_bucket(cls, client, bucket_name):
timeout = time.time() + 10
backoff = 0.1
cls._remove_all_object_versions_from_bucket(client, bucket_name)
try:
client.delete_bucket(Bucket=bucket_name)
except ClientError as e:
if 'BucketNotEmpty' not in str(e):
raise
# Something's gone sideways. Try harder
client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Suspended'})
while True:
cls._remove_all_object_versions_from_bucket(
client, bucket_name)
# also try some version-unaware operations...
for key in client.list_objects(Bucket=bucket_name).get(
'Contents', []):
client.delete_object(Bucket=bucket_name, Key=key['Key'])
# *then* try again
try:
client.delete_bucket(Bucket=bucket_name)
except ClientError as e:
if 'BucketNotEmpty' not in str(e):
raise
if time.time() > timeout:
raise Exception('Timeout clearing %r' % bucket_name)
time.sleep(backoff)
backoff *= 2
else:
break
def create_name(self, slug):
return '%s%s-%s' % (TEST_PREFIX, slug, uuid.uuid4().hex)
@classmethod
def clear_account(cls, client):
for bucket in client.list_buckets()['Buckets']:
if not bucket['Name'].startswith(TEST_PREFIX):
# these tests run against real s3 accounts
continue
cls.clear_bucket(client, bucket['Name'])
client.delete_bucket(Bucket=bucket['Name'])
def tearDown(self):
client = self.get_s3_client(1)

View File

@@ -14,7 +14,6 @@
# limitations under the License.
import unittest
import uuid
from test.s3api import BaseS3TestCase, ConfigError
@@ -43,7 +42,7 @@ class TestGetServiceSigV4(BaseS3TestCase):
def test_service_with_buckets(self):
c = self.get_s3_client(1)
buckets = [str(uuid.uuid4()) for _ in range(5)]
buckets = [self.create_name('bucket%s' % i) for i in range(5)]
for bucket in buckets:
c.create_bucket(Bucket=bucket)
@@ -65,7 +64,7 @@ class TestGetServiceSigV4(BaseS3TestCase):
c2 = self.get_s3_client(2)
except ConfigError as err:
raise unittest.SkipTest(str(err))
buckets2 = [str(uuid.uuid4()) for _ in range(2)]
buckets2 = [self.create_name('bucket%s' % i) for i in range(2)]
for bucket in buckets2:
c2.create_bucket(Bucket=bucket)
self.assertEqual(sorted(buckets2), [

View File

@@ -0,0 +1,758 @@
# Copyright (c) 2019 SwiftStack, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import hashlib
from collections import defaultdict
from botocore.exceptions import ClientError
import six
from swift.common.header_key_dict import HeaderKeyDict
from test.s3api import BaseS3TestCase
def retry(f, timeout=10):
timelimit = time.time() + timeout
while True:
try:
f()
except (ClientError, AssertionError):
if time.time() > timelimit:
raise
continue
else:
break
class TestObjectVersioning(BaseS3TestCase):
maxDiff = None
def setUp(self):
self.client = self.get_s3_client(1)
self.bucket_name = self.create_name('versioning')
resp = self.client.create_bucket(Bucket=self.bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
def enable_versioning():
resp = self.client.put_bucket_versioning(
Bucket=self.bucket_name,
VersioningConfiguration={'Status': 'Enabled'})
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
retry(enable_versioning)
def tearDown(self):
resp = self.client.put_bucket_versioning(
Bucket=self.bucket_name,
VersioningConfiguration={'Status': 'Suspended'})
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.clear_bucket(self.client, self.bucket_name)
super(TestObjectVersioning, self).tearDown()
def test_setup(self):
bucket_name = self.create_name('new-bucket')
resp = self.client.create_bucket(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
expected_location = '/%s' % bucket_name
self.assertEqual(expected_location, resp['Location'])
headers = HeaderKeyDict(resp['ResponseMetadata']['HTTPHeaders'])
self.assertEqual('0', headers['content-length'])
self.assertEqual(expected_location, headers['location'])
# get versioning
resp = self.client.get_bucket_versioning(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertNotIn('Status', resp)
# put versioning
versioning_config = {
'Status': 'Enabled',
}
resp = self.client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration=versioning_config)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
# ... now it's enabled
def check_status():
resp = self.client.get_bucket_versioning(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
try:
self.assertEqual('Enabled', resp['Status'])
except KeyError:
self.fail('Status was not in %r' % resp)
retry(check_status)
# send over some bogus junk
versioning_config['Status'] = 'Disabled'
with self.assertRaises(ClientError) as ctx:
self.client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration=versioning_config)
expected_err = 'An error occurred (MalformedXML) when calling the ' \
'PutBucketVersioning operation: The XML you provided was ' \
'not well-formed or did not validate against our published schema'
self.assertEqual(expected_err, str(ctx.exception))
# disable it
versioning_config['Status'] = 'Suspended'
resp = self.client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration=versioning_config)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
# ... now it's disabled again
def check_status():
resp = self.client.get_bucket_versioning(Bucket=bucket_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('Suspended', resp['Status'])
retry(check_status)
def test_upload_fileobj_versioned(self):
obj_data = self.create_name('some-data').encode('ascii')
obj_etag = hashlib.md5(obj_data).hexdigest()
obj_name = self.create_name('versioned-obj')
self.client.upload_fileobj(six.BytesIO(obj_data),
self.bucket_name, obj_name)
# object is in the listing
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % obj_etag,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# object version listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % obj_etag,
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# overwrite the object
new_obj_data = self.create_name('some-new-data').encode('ascii')
new_obj_etag = hashlib.md5(new_obj_data).hexdigest()
self.client.upload_fileobj(six.BytesIO(new_obj_data),
self.bucket_name, obj_name)
# new object is in the listing
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % new_obj_etag,
'Key': obj_name,
'Size': len(new_obj_data),
'StorageClass': 'STANDARD',
}], objs)
# both object versions in the versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % new_obj_etag,
'IsLatest': True,
'Key': obj_name,
'Size': len(new_obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % obj_etag,
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
def test_delete_versioned_objects(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(six.BytesIO(obj_data),
self.bucket_name, obj_name)
# only one object appears in the listing
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# but everything is layed out in the object versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[1],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[2],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# we can delete a specific version
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[1])
# and that just pulls it out of the versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[2],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# ... but the current listing is unaffected
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# OTOH, if you delete specifically the latest version
# we can delete a specific version
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
# the versions listing has a new IsLatest
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
obj.pop('VersionId')
self.assertEqual([{
'ETag': '"%s"' % etags[2],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# and the stack pops
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual([{
'ETag': '"%s"' % etags[2],
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
def test_delete_versioned_deletes(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(six.BytesIO(obj_data),
self.bucket_name, obj_name)
# and make a delete marker
self.client.delete_object(Bucket=self.bucket_name, Key=obj_name)
# current listing is empty
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
self.assertEqual([], objs)
# but everything is in layed out in the versions listing
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': '"%s"' % etag,
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
} for etag in etags], objs)
# ... plus the delete markers
delete_markers = resp.get('DeleteMarkers', [])
marker_versions = []
for marker in delete_markers:
marker.pop('LastModified')
marker.pop('Owner')
marker_versions.append(marker.pop('VersionId'))
self.assertEqual([{
'Key': obj_name,
'IsLatest': is_latest,
} for is_latest in (True, False, False)], delete_markers)
# delete an old delete markers
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=marker_versions[2])
# since IsLatest is still marker we'll raise NoSuchKey
with self.assertRaises(ClientError) as caught:
resp = self.client.get_object(Bucket=self.bucket_name,
Key=obj_name)
expected_err = 'An error occurred (NoSuchKey) when calling the ' \
'GetObject operation: The specified key does not exist.'
self.assertEqual(expected_err, str(caught.exception))
# now delete the delete marker (IsLatest)
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=marker_versions[0])
# most recent version is now latest
resp = self.client.get_object(Bucket=self.bucket_name,
Key=obj_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# now delete the IsLatest object version
resp = self.client.delete_object(Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
# and object is deleted again
with self.assertRaises(ClientError) as caught:
resp = self.client.get_object(Bucket=self.bucket_name,
Key=obj_name)
expected_err = 'An error occurred (NoSuchKey) when calling the ' \
'GetObject operation: The specified key does not exist.'
self.assertEqual(expected_err, str(caught.exception))
# delete marker IsLatest
resp = self.client.list_object_versions(Bucket=self.bucket_name)
delete_markers = resp.get('DeleteMarkers', [])
for marker in delete_markers:
marker.pop('LastModified')
marker.pop('Owner')
self.assertEqual([{
'Key': obj_name,
'IsLatest': True,
'VersionId': marker_versions[1],
}], delete_markers)
def test_multipart_upload(self):
obj_name = self.create_name('versioned-obj')
obj_data = b'data'
mu = self.client.create_multipart_upload(
Bucket=self.bucket_name,
Key=obj_name)
part_md5 = self.client.upload_part(
Bucket=self.bucket_name,
Key=obj_name,
UploadId=mu['UploadId'],
PartNumber=1,
Body=obj_data)['ETag']
complete_response = self.client.complete_multipart_upload(
Bucket=self.bucket_name,
Key=obj_name,
UploadId=mu['UploadId'],
MultipartUpload={'Parts': [
{'PartNumber': 1, 'ETag': part_md5},
]})
obj_etag = complete_response['ETag']
delete_response = self.client.delete_object(
Bucket=self.bucket_name,
Key=obj_name)
marker_version_id = delete_response['VersionId']
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': obj_etag,
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
markers = resp.get('DeleteMarkers', [])
for marker in markers:
marker.pop('LastModified')
marker.pop('Owner')
self.assertEqual([{
'IsLatest': True,
'Key': obj_name,
'VersionId': marker_version_id,
}], markers)
# Can still get the old version
resp = self.client.get_object(
Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual(obj_etag, resp['ETag'])
delete_response = self.client.delete_object(
Bucket=self.bucket_name,
Key=obj_name,
VersionId=versions[0])
resp = self.client.list_object_versions(Bucket=self.bucket_name)
self.assertEqual([], resp.get('Versions', []))
markers = resp.get('DeleteMarkers', [])
for marker in markers:
marker.pop('LastModified')
marker.pop('Owner')
self.assertEqual([{
'IsLatest': True,
'Key': obj_name,
'VersionId': marker_version_id,
}], markers)
def test_get_versioned_object(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
# TODO: pull etag from response instead
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj_name)
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual([{
'ETag': '"%s"' % etags[0],
'IsLatest': True,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[1],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}, {
'ETag': '"%s"' % etags[2],
'IsLatest': False,
'Key': obj_name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
}], objs)
# un-versioned get_object returns IsLatest
resp = self.client.get_object(Bucket=self.bucket_name, Key=obj_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# but you can get any object by version
for i, version in enumerate(versions):
resp = self.client.get_object(
Bucket=self.bucket_name, Key=obj_name, VersionId=version)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[i], resp['ETag'])
# and head_object works about the same
resp = self.client.head_object(Bucket=self.bucket_name, Key=obj_name)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
self.assertEqual(versions[0], resp['VersionId'])
for version, etag in zip(versions, etags):
resp = self.client.head_object(
Bucket=self.bucket_name, Key=obj_name, VersionId=version)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual(version, resp['VersionId'])
self.assertEqual('"%s"' % etag, resp['ETag'])
def test_get_versioned_object_invalid_params(self):
with self.assertRaises(ClientError) as ctx:
self.client.list_object_versions(Bucket=self.bucket_name,
KeyMarker='',
VersionIdMarker='bogus')
expected_err = 'An error occurred (InvalidArgument) when calling ' \
'the ListObjectVersions operation: Invalid version id specified'
self.assertEqual(expected_err, str(ctx.exception))
with self.assertRaises(ClientError) as ctx:
self.client.list_object_versions(
Bucket=self.bucket_name,
VersionIdMarker='a' * 32)
expected_err = 'An error occurred (InvalidArgument) when calling ' \
'the ListObjectVersions operation: A version-id marker cannot ' \
'be specified without a key marker.'
self.assertEqual(expected_err, str(ctx.exception))
def test_get_versioned_object_key_marker(self):
obj00_name = self.create_name('00-versioned-obj')
obj01_name = self.create_name('01-versioned-obj')
names = [obj00_name] * 3 + [obj01_name] * 3
latest = [True, False, False, True, False, False]
etags = []
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, '"%s"' % hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj01_name)
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, '"%s"' % hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj00_name)
resp = self.client.list_object_versions(Bucket=self.bucket_name)
versions = []
objs = []
for o in resp.get('Versions', []):
versions.append(o['VersionId'])
objs.append({
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
})
expected = [{
'Key': name,
'VersionId': version,
'IsLatest': is_latest,
'ETag': etag,
} for name, etag, version, is_latest in zip(
names, etags, versions, latest)]
self.assertEqual(expected, objs)
# on s3 this makes expected[0]['IsLatest'] magicaly change to False?
# resp = self.client.list_object_versions(Bucket=self.bucket_name,
# KeyMarker='',
# VersionIdMarker=versions[0])
# objs = [{
# 'Key': o['Key'],
# 'VersionId': o['VersionId'],
# 'IsLatest': o['IsLatest'],
# 'ETag': o['ETag'],
# } for o in resp.get('Versions', [])]
# self.assertEqual(expected, objs)
# KeyMarker skips past that key
resp = self.client.list_object_versions(Bucket=self.bucket_name,
KeyMarker=obj00_name)
objs = [{
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
} for o in resp.get('Versions', [])]
self.assertEqual(expected[3:], objs)
# KeyMarker with VersionIdMarker skips past that version
resp = self.client.list_object_versions(Bucket=self.bucket_name,
KeyMarker=obj00_name,
VersionIdMarker=versions[0])
objs = [{
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
} for o in resp.get('Versions', [])]
self.assertEqual(expected[1:], objs)
# KeyMarker with bogus version skips past that key
resp = self.client.list_object_versions(
Bucket=self.bucket_name,
KeyMarker=obj00_name,
VersionIdMarker=versions[4])
objs = [{
'Key': o['Key'],
'VersionId': o['VersionId'],
'IsLatest': o['IsLatest'],
'ETag': o['ETag'],
} for o in resp.get('Versions', [])]
self.assertEqual(expected[3:], objs)
def test_list_objects(self):
etags = defaultdict(list)
for i in range(3):
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags[obj_name].insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj_name)
# both unversioned list_objects responses are similar
expected = []
for name, obj_etags in sorted(etags.items()):
expected.append({
'ETag': '"%s"' % obj_etags[0],
'Key': name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
})
resp = self.client.list_objects(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
# one difference seems to be the Owner key
self.assertEqual({'DisplayName', 'ID'},
set(obj.pop('Owner').keys()))
self.assertEqual(expected, objs)
resp = self.client.list_objects_v2(Bucket=self.bucket_name)
objs = resp.get('Contents', [])
for obj in objs:
obj.pop('LastModified')
self.assertEqual(expected, objs)
# versioned listings has something for everyone
expected = []
for name, obj_etags in sorted(etags.items()):
is_latest = True
for etag in obj_etags:
expected.append({
'ETag': '"%s"' % etag,
'IsLatest': is_latest,
'Key': name,
'Size': len(obj_data),
'StorageClass': 'STANDARD',
})
is_latest = False
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
obj.pop('LastModified')
obj.pop('Owner')
versions.append(obj.pop('VersionId'))
self.assertEqual(expected, objs)
def test_copy_object(self):
etags = []
obj_name = self.create_name('versioned-obj')
for i in range(3):
obj_data = self.create_name('some-data-%s' % i).encode('ascii')
etags.insert(0, hashlib.md5(obj_data).hexdigest())
self.client.upload_fileobj(
six.BytesIO(obj_data), self.bucket_name, obj_name)
resp = self.client.list_object_versions(Bucket=self.bucket_name)
objs = resp.get('Versions', [])
versions = []
for obj in objs:
versions.append(obj.pop('VersionId'))
# CopySource can just be Bucket/Key string
first_target = self.create_name('target-obj1')
copy_resp = self.client.copy_object(
Bucket=self.bucket_name, Key=first_target,
CopySource='%s/%s' % (self.bucket_name, obj_name))
self.assertEqual(versions[0], copy_resp['CopySourceVersionId'])
# and you'll just get the most recent version
resp = self.client.head_object(Bucket=self.bucket_name,
Key=first_target)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# or you can be more explicit
explicit_target = self.create_name('target-%s' % versions[0])
copy_source = {'Bucket': self.bucket_name, 'Key': obj_name,
'VersionId': versions[0]}
copy_resp = self.client.copy_object(
Bucket=self.bucket_name, Key=explicit_target,
CopySource=copy_source)
self.assertEqual(versions[0], copy_resp['CopySourceVersionId'])
# and you still get the same thing
resp = self.client.head_object(Bucket=self.bucket_name,
Key=explicit_target)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[0], resp['ETag'])
# but you can also copy from a specific version
version_target = self.create_name('target-%s' % versions[2])
copy_source['VersionId'] = versions[2]
copy_resp = self.client.copy_object(
Bucket=self.bucket_name, Key=version_target,
CopySource=copy_source)
self.assertEqual(versions[2], copy_resp['CopySourceVersionId'])
resp = self.client.head_object(Bucket=self.bucket_name,
Key=version_target)
self.assertEqual(200, resp['ResponseMetadata']['HTTPStatusCode'])
self.assertEqual('"%s"' % etags[2], resp['ETag'])
Reference in New Issue
openstack/swift
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.

The note is not visible to the blocked user.