Use allow_access helper method

This helper method creates an access rule, waiting for it
to be active and delete it at the end of the test.
By using this methos we can reduce lines of code.
Change-Id: If1712cd412d236901232f9a2537f88d775f7d9f1
This commit is contained in:
lkuchlan
2022年05月08日 09:23:23 +03:00
parent c0e182601b
commit ad511b6ac0

View File

@@ -95,14 +95,11 @@ class MigrationShareServerBase(base.BaseSharesAdminTest):
# protocols.
access_rules = self._get_access_rule_data_for_protocols()
for rule in access_rules:
self.shares_v2_client.create_access_rule(
self.allow_access(
share['id'], access_type=rule.get('access_type'),
access_to=rule.get('access_to'),
access_level=rule.get('access_level')
)
waiters.wait_for_resource_status(
self.shares_v2_client, share['id'], constants.RULE_STATE_ACTIVE,
status_attr='access_rules_status')
share = self.shares_v2_client.get_share(share['id'])['share']
@@ -124,8 +121,8 @@ class MigrationShareServerBase(base.BaseSharesAdminTest):
self.assertIn(snapshot['status'], statuses)
def _validate_share_server_migration_complete(
self, share, dest_host, dest_server_id, snapshot_id=None,
share_network_id=None, version=CONF.share.max_api_microversion):
self, share, dest_host, dest_server_id, snapshot_id=None,
share_network_id=None, version=CONF.share.max_api_microversion):
"""Validates the share server migration complete. """
# Check the export locations

View File

@@ -58,6 +58,8 @@ TAGS_PATTERN = re.compile(
r"(?=.*\[.*\b(%(p)s|%(n)s)\b.*\])(?=.*\[.*\b(%(a)s|%(b)s|%(ab)s)\b.*\])" %
TAGS_MAPPER)
LATEST_MICROVERSION = CONF.share.max_api_microversion
def verify_test_has_appropriate_tags(self):
if not TAGS_PATTERN.match(self.id()):
@@ -1046,7 +1048,8 @@ class BaseSharesTest(test.BaseTestCase):
return waiters.wait_for_message(self.shares_v2_client, share['id'])
def allow_access(self, share_id, client=None, access_type=None,
access_level='rw', access_to=None, status='active',
access_level='rw', access_to=None, metadata=None,
version=LATEST_MICROVERSION, status='active',
raise_rule_in_error_state=True, cleanup=True):
client = client or self.shares_v2_client
@@ -1054,15 +1057,23 @@ class BaseSharesTest(test.BaseTestCase):
access_type = access_type or a_type
access_to = access_to or a_to
rule = client.create_access_rule(share_id, access_type, access_to,
access_level)['access']
kwargs = {
'access_type': access_type,
'access_to': access_to,
'access_level': access_level
}
if client is self.shares_v2_client:
kwargs.update({'metadata': metadata, 'version': version})
rule = client.create_access_rule(share_id, **kwargs)['access']
waiters.wait_for_resource_status(
client, share_id, status, resource_name='access_rule',
rule_id=rule['id'],
rule_id=rule['id'], version=version,
raise_rule_in_error_state=raise_rule_in_error_state)
if cleanup:
self.addCleanup(client.wait_for_resource_deletion,
rule_id=rule['id'], share_id=share_id)
self.addCleanup(
client.wait_for_resource_deletion, rule_id=rule['id'],
share_id=share_id, version=version)
self.addCleanup(client.delete_access_rule, share_id, rule['id'])
return rule

View File

@@ -87,13 +87,10 @@ class AccessRulesMetadataTest(base.BaseSharesMixedTest):
def test_set_get_delete_access_metadata(self):
data = {"key1": "v" * 255, "k" * 255: "value2"}
# set metadata
access = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type,
self.access_to[self.access_type].pop(), 'rw',
metadata=data)['access']
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=access["id"])
access = self.allow_access(
self.share["id"], access_type=self.access_type,
access_to=self.access_to[self.access_type].pop(),
access_level='rw', metadata=data)
# read metadata
get_access = self.shares_v2_client.get_access_rule(
@@ -110,10 +107,6 @@ class AccessRulesMetadataTest(base.BaseSharesMixedTest):
access_without_md = self.shares_v2_client.get_access_rule(
access["id"])['access']
self.assertEqual({}, access_without_md['metadata'])
self.shares_v2_client.delete_access_rule(self.share["id"],
access["id"])
self.shares_v2_client.wait_for_resource_deletion(
rule_id=access["id"], share_id=self.share["id"])
@decorators.idempotent_id('8c294d7d-0702-49ce-b964-0945ec323370')
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@@ -136,13 +129,10 @@ class AccessRulesMetadataTest(base.BaseSharesMixedTest):
def test_list_access_filter_by_metadata(self):
data = {"key3": "v3", "key4": "value4"}
# set metadata
access = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type,
self.access_to[self.access_type].pop(), 'rw',
metadata=data)['access']
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=access["id"])
access = self.allow_access(
self.share["id"], access_type=self.access_type,
access_to=self.access_to[self.access_type].pop(),
access_level='rw', metadata=data)
# list metadata with metadata filter
list_access = self.shares_v2_client.list_access_rules(

View File

@@ -331,12 +331,9 @@ class ReplicationTest(base.BaseSharesMixedTest):
def test_add_access_rule_create_replica_delete_rule(self):
# Add access rule to the share
access_type, access_to = self._get_access_rule_data_from_config()
rule = self.shares_v2_client.create_access_rule(
self.shares[0]["id"], access_type, access_to, 'ro')['access']
waiters.wait_for_resource_status(
self.shares_v2_client, self.shares[0]["id"],
constants.RULE_STATE_ACTIVE, resource_name='access_rule',
rule_id=rule["id"])
self.allow_access(
self.shares[0]["id"], access_type=access_type, access_to=access_to,
access_level='ro')
# Create the replica
self._verify_create_replica()
@@ -346,12 +343,6 @@ class ReplicationTest(base.BaseSharesMixedTest):
self.shares_v2_client, self.shares[0]["id"],
constants.RULE_STATE_ACTIVE, status_attr='access_rules_status')
# Delete rule and wait for deletion
self.shares_v2_client.delete_access_rule(self.shares[0]["id"],
rule["id"])
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.shares[0]['id'])
@decorators.idempotent_id('3af3f19a-1195-464e-870b-1a3918914f1b')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_create_replica_add_access_rule_delete_replica(self):
@@ -360,12 +351,9 @@ class ReplicationTest(base.BaseSharesMixedTest):
share_replica = self._verify_create_replica()
# Add access rule
self.shares_v2_client.create_access_rule(
self.shares[0]["id"], access_type, access_to, 'ro')
waiters.wait_for_resource_status(
self.shares_v2_client, self.shares[0]["id"],
constants.RULE_STATE_ACTIVE, status_attr='access_rules_status')
self.allow_access(
self.shares[0]["id"], access_type=access_type, access_to=access_to,
access_level='ro')
# Delete the replica
self.delete_share_replica(share_replica["id"])
@@ -421,11 +409,9 @@ class ReplicationTest(base.BaseSharesMixedTest):
share = self.create_shares([self.creation_data])[0]
# Add access rule
access_type, access_to = self._get_access_rule_data_from_config()
rule = self.shares_v2_client.create_access_rule(
share["id"], access_type, access_to, 'ro')['access']
waiters.wait_for_resource_status(
self.shares_v2_client, share["id"], constants.RULE_STATE_ACTIVE,
resource_name='access_rule', rule_id=rule["id"])
self.allow_access(
share["id"], access_type=access_type, access_to=access_to,
access_level='ro')
original_replica = self.shares_v2_client.list_share_replicas(
share["id"])['share_replicas'][0]

View File

@@ -22,7 +22,6 @@ from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import waiters
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
@@ -37,13 +36,14 @@ def _create_delete_ro_access_rule(self, version):
:param self: instance of test class
"""
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], self.access_type, self.access_to, 'ro')['access']
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, self.access_to, 'ro',
version=version)['access']
client = self.shares_v2_client
rule = self.allow_access(
self.share["id"], client=client, access_type=self.access_type,
access_to=self.access_to, access_level='ro', version=version)
self.assertEqual('ro', rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -55,14 +55,6 @@ def _create_delete_ro_access_rule(self, version):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_le(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# If the 'access_rules_status' transitions to 'active',
# rule state must too
rules = self.shares_v2_client.list_access_rules(
@@ -70,16 +62,6 @@ def _create_delete_ro_access_rule(self, version):
rule = [r for r in rules if r['id'] == rule['id']][0]
self.assertEqual("active", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@ddt.ddt
class ShareIpRulesForNFSTest(base.BaseSharesMixedTest):
@@ -123,14 +105,16 @@ class ShareIpRulesForNFSTest(base.BaseSharesMixedTest):
access_to = utils.rand_ip()
else:
access_to = utils.rand_ipv6_ip()
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], self.access_type, access_to)['access']
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, access_to,
version=version)['access']
client = self.shares_v2_client
# create rule
rule = self.allow_access(
self.share["id"], client=client, access_type=self.access_type,
access_to=access_to, version=version)
self.assertEqual('rw', rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -142,30 +126,6 @@ class ShareIpRulesForNFSTest(base.BaseSharesMixedTest):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# delete rule and wait for deletion
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@decorators.idempotent_id('5d25168a-d646-443e-8cf1-3151eb7887f5')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data(*itertools.chain(
@@ -181,49 +141,19 @@ class ShareIpRulesForNFSTest(base.BaseSharesMixedTest):
access_to = utils.rand_ip(network=True)
else:
access_to = utils.rand_ipv6_ip(network=True)
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], self.access_type, access_to)['access']
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, access_to,
version=version)['access']
client = self.shares_v2_client
# create rule
rule = self.allow_access(
self.share["id"], client=client, access_type=self.access_type,
access_to=access_to, version=version)
for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys())
self.assertEqual('rw', rule['access_level'])
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
self.assertEqual("new", rule['state'])
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# delete rule and wait for deletion
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@decorators.idempotent_id('187a4fb0-ba1d-45b9-83c9-f0272e7e6f3e')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@testtools.skipIf(
@@ -283,15 +213,15 @@ class ShareUserRulesForNFSTest(base.BaseSharesMixedTest):
@ddt.data(*utils.deduplicate(['1.0', '2.9', '2.27', '2.28',
LATEST_MICROVERSION]))
def test_create_delete_user_rule(self, version):
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
client = self.shares_v2_client
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], self.access_type, self.access_to)['access']
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, self.access_to,
version=version)['access']
rule = self.allow_access(
self.share["id"], client=client, access_type=self.access_type,
access_to=self.access_to, version=version)
self.assertEqual('rw', rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -303,30 +233,6 @@ class ShareUserRulesForNFSTest(base.BaseSharesMixedTest):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# delete rule and wait for deletion
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@decorators.idempotent_id('ccb08342-b7ef-4dda-84ba-8de9879d8862')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@testtools.skipIf(
@@ -387,15 +293,15 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesMixedTest):
@ddt.data(*utils.deduplicate(['1.0', '2.9', '2.27', '2.28',
LATEST_MICROVERSION]))
def test_create_delete_cert_rule(self, version):
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
client = self.shares_v2_client
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], self.access_type, self.access_to)['access']
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, self.access_to,
version=version)['access']
rule = self.allow_access(
self.share["id"], client=client, access_type=self.access_type,
access_to=self.access_to, version=version)
self.assertEqual('rw', rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -407,30 +313,6 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesMixedTest):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# delete rule
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@decorators.idempotent_id('cdd93d8e-7255-4ed4-8ef0-929a62bb302c')
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@testtools.skipIf(
@@ -439,13 +321,13 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesMixedTest):
@ddt.data(*utils.deduplicate(['1.0', '2.9', '2.27', '2.28',
LATEST_MICROVERSION]))
def test_create_delete_cert_ro_access_rule(self, version):
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], 'cert', 'client2.com', 'ro')['access']
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], 'cert', 'client2.com', 'ro',
version=version)['access']
client = self.shares_v2_client
rule = self.allow_access(
self.share["id"], client=client, access_type='cert',
access_to='client2.com', access_level='ro', version=version)
self.assertEqual('ro', rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -457,29 +339,6 @@ class ShareCertRulesForGLUSTERFSTest(base.BaseSharesMixedTest):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@ddt.ddt
class ShareCephxRulesForCephFSTest(base.BaseSharesMixedTest):
@@ -518,31 +377,21 @@ class ShareCephxRulesForCephFSTest(base.BaseSharesMixedTest):
('rw', 'ro')))
@ddt.unpack
def test_create_delete_cephx_rule(self, version, access_to, access_level):
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, access_to, version=version,
access_level=access_level)['access']
rule = self.allow_access(
self.share["id"], access_type=self.access_type,
access_to=access_to, version=version, access_level=access_level)
self.assertEqual(access_level, rule['access_level'])
for key in ('deleted', 'deleted_at', 'instance_mappings'):
self.assertNotIn(key, rule.keys())
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
@decorators.idempotent_id('ad907303-a439-4fcb-8845-fe91ecab7dc2')
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
def test_different_users_in_same_tenant_can_use_same_cephx_id(self):
# Grant access to the share
access1 = self.shares_v2_client.create_access_rule(
self.share['id'], self.access_type, self.access_to, 'rw')['access']
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name='access_rule', rule_id=access1["id"])
self.allow_access(
self.share['id'], access_type=self.access_type,
access_to=self.access_to, access_level='rw')
# Create a new user in the current project
project = self.os_admin.projects_client.show_project(
@@ -556,11 +405,10 @@ class ShareCephxRulesForCephFSTest(base.BaseSharesMixedTest):
# Grant access to the second share using the same cephx ID that was
# used in access1
access2 = user_client.shares_v2_client.create_access_rule(
share2['id'], self.access_type, self.access_to, 'rw')['access']
waiters.wait_for_resource_status(
user_client.shares_v2_client, share2['id'], "active",
resource_name='access_rule', rule_id=access2['id'])
self.allow_access(
share2['id'], client=user_client.shares_v2_client,
access_type=self.access_type, access_to=self.access_to,
access_level='rw')
@ddt.ddt
@@ -612,14 +460,14 @@ class ShareRulesTest(base.BaseSharesMixedTest):
metadata = None
if utils.is_microversion_ge(version, '2.45'):
metadata = {'key1': 'v1', 'key2': 'v2'}
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], self.access_type, self.access_to)['access']
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], self.access_type, self.access_to,
metadata=metadata, version=version)['access']
client = self.shares_v2_client
# create rule
rule = self.allow_access(
self.share["id"], client=client, access_type=self.access_type,
access_to=self.access_to, metadata=metadata, version=version)
# verify added rule keys since 2.33 when create rule
if utils.is_microversion_ge(version, '2.33'):
@@ -635,19 +483,6 @@ class ShareRulesTest(base.BaseSharesMixedTest):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name="access_rule", rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name="access_rule", rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# list rules
if utils.is_microversion_eq(version, '1.0'):
rules = self.shares_client.list_access_rules(
@@ -684,16 +519,6 @@ class ShareRulesTest(base.BaseSharesMixedTest):
msg = "expected id lists %s times in rule list" % (len(gen))
self.assertEqual(1, len(gen), msg)
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"], rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'])
else:
self.shares_v2_client.delete_access_rule(
self.share["id"], rule["id"], version=version)
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share['id'], version=version)
@decorators.idempotent_id('b77bcbda-9754-48f0-9be6-79341ad1af64')
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@ddt.data(*utils.deduplicate(['1.0', '2.9', '2.27', '2.28',
@@ -704,18 +529,18 @@ class ShareRulesTest(base.BaseSharesMixedTest):
msg = ("API version %s does not support cephx access type, need "
"version >= 2.13." % version)
raise self.skipException(msg)
if utils.is_microversion_le(version, '2.9'):
client = self.shares_client
else:
client = self.shares_v2_client
# create share
share = self.create_share(share_type_id=self.share_type_id)
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
share["id"], self.access_type, self.access_to)['access']
else:
rule = self.shares_v2_client.create_access_rule(
share["id"], self.access_type, self.access_to,
version=version)['access']
rule = self.allow_access(
share["id"], client=client, access_type=self.access_type,
access_to=self.access_to, version=version, cleanup=False)
# rules must start out in 'new' until 2.28 & 'queued_to_apply' after
if utils.is_microversion_le(version, "2.27"):
@@ -723,19 +548,6 @@ class ShareRulesTest(base.BaseSharesMixedTest):
else:
self.assertEqual("queued_to_apply", rule['state'])
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name="access_rule", rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name="access_rule", rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, share["id"], "active",
status_attr='access_rules_status', version=version)
# delete share
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_share(share['id'])

View File

@@ -99,27 +99,15 @@ class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
access_type = "ip"
access_to = "1.2.3.4"
# create rule
if utils.is_microversion_eq(version, '1.0'):
rule = self.shares_client.create_access_rule(
self.share["id"], access_type, access_to)['access']
client = self.shares_client
else:
rule = self.shares_v2_client.create_access_rule(
self.share["id"], access_type, access_to,
version=version)['access']
client = self.shares_v2_client
if utils.is_microversion_eq(version, '1.0'):
waiters.wait_for_resource_status(
self.shares_client, self.share["id"], "active",
resource_name='access_rule', rule_id=rule["id"])
elif utils.is_microversion_eq(version, '2.9'):
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
resource_name="access_rule", rule_id=rule["id"])
else:
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status', version=version)
# create rule
self.allow_access(
self.share["id"], client=client, access_type=access_type,
access_to=access_to, version=version)
# try create duplicate of rule
if utils.is_microversion_eq(version, '1.0'):
@@ -132,18 +120,6 @@ class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
self.share["id"], access_type, access_to,
version=version)
# delete rule and wait for deletion
if utils.is_microversion_eq(version, '1.0'):
self.shares_client.delete_access_rule(self.share["id"],
rule["id"])
self.shares_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share["id"])
else:
self.shares_v2_client.delete_access_rule(self.share["id"],
rule["id"])
self.shares_v2_client.wait_for_resource_deletion(
rule_id=rule["id"], share_id=self.share["id"], version=version)
@decorators.idempotent_id('63932d1d-a60a-4af7-ba3b-7cf6c68aaee9')
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
@ddt.data("10.20.30.40", "fd8c:b029:bba6:ac54::1",
@@ -157,13 +133,8 @@ class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
"is %s" % CONF.share.max_api_microversion)
raise self.skipException(reason)
rule = self.shares_v2_client.create_access_rule(
self.share["id"], "ip", access_to)['access']
self.addCleanup(self.shares_v2_client.delete_access_rule,
self.share["id"], rule['id'])
waiters.wait_for_resource_status(
self.shares_v2_client, self.share["id"], "active",
status_attr='access_rules_status')
self.allow_access(
self.share["id"], access_type="ip", access_to=access_to)
self.assertRaises(lib_exc.BadRequest,
self.shares_v2_client.create_access_rule,
Reference in New Issue
openstack/manila-tempest-plugin
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.

The note is not visible to the blocked user.