Merge "Add missing tests for compute_node_* methods"
This commit is contained in:
1 changed files with 148 additions and 141 deletions
@@ -1516,147 +1516,6 @@ class SqlAlchemyDbApiTestCase(DbTestCase):
self.assertEqual(types.UnicodeType, type(result[0]))
class CapacityTestCase(test.TestCase):
def setUp(self):
super(CapacityTestCase, self).setUp()
self.ctxt = context.get_admin_context()
service_dict = dict(host='host1', binary='binary1',
topic='compute', report_count=1,
disabled=False)
self.service = db.service_create(self.ctxt, service_dict)
self.compute_node_dict = dict(vcpus=2, memory_mb=1024, local_gb=2048,
vcpus_used=0, memory_mb_used=0,
local_gb_used=0, free_ram_mb=1024,
free_disk_gb=2048, hypervisor_type="xen",
hypervisor_version=1, cpu_info="",
running_vms=0, current_workload=0,
service_id=self.service['id'])
# add some random stats
stats = dict(num_instances=3, num_proj_12345=2,
num_proj_23456=2, num_vm_building=3)
self.compute_node_dict['stats'] = stats
self.flags(reserved_host_memory_mb=0)
self.flags(reserved_host_disk_mb=0)
def _create_helper(self, host):
self.compute_node_dict['host'] = host
return db.compute_node_create(self.ctxt, self.compute_node_dict)
def _stats_as_dict(self, stats):
d = {}
for s in stats:
key = s['key']
d[key] = s['value']
return d
def test_compute_node_create(self):
item = self._create_helper('host1')
self.assertEquals(item['free_ram_mb'], 1024)
self.assertEquals(item['free_disk_gb'], 2048)
self.assertEquals(item['running_vms'], 0)
self.assertEquals(item['current_workload'], 0)
stats = self._stats_as_dict(item['stats'])
self.assertEqual(3, stats['num_instances'])
self.assertEqual(2, stats['num_proj_12345'])
self.assertEqual(3, stats['num_vm_building'])
def test_compute_node_get_all(self):
item = self._create_helper('host1')
nodes = db.compute_node_get_all(self.ctxt)
self.assertEqual(1, len(nodes))
node = nodes[0]
self.assertEqual(2, node['vcpus'])
stats = self._stats_as_dict(node['stats'])
self.assertEqual(3, int(stats['num_instances']))
self.assertEqual(2, int(stats['num_proj_12345']))
self.assertEqual(3, int(stats['num_vm_building']))
def test_compute_node_update(self):
item = self._create_helper('host1')
compute_node_id = item['id']
stats = self._stats_as_dict(item['stats'])
# change some values:
stats['num_instances'] = 8
stats['num_tribbles'] = 1
values = {
'vcpus': 4,
'stats': stats,
}
item = db.compute_node_update(self.ctxt, compute_node_id, values)
stats = self._stats_as_dict(item['stats'])
self.assertEqual(4, item['vcpus'])
self.assertEqual(8, int(stats['num_instances']))
self.assertEqual(2, int(stats['num_proj_12345']))
self.assertEqual(1, int(stats['num_tribbles']))
def test_compute_node_update_always_updates_updated_at(self):
item = self._create_helper('host1')
item_updated = db.compute_node_update(self.ctxt,
item['id'], {})
self.assertNotEqual(item['updated_at'], item_updated['updated_at'])
def test_compute_node_stat_unchanged(self):
# don't update unchanged stat values:
item = self._create_helper('host1')
compute_node_id = item['id']
stats = self._stats_as_dict(item['stats'])
self.assertEqual(4, len(stats.keys()))
orig_update_stats = sqlalchemy_api._update_stats
def update(context, new_stats, compute_id, session, prune_stats=False):
# wrap the session object to see which stats get updated
orig_add = session.add
added = []
def add(instance):
added.append(instance)
orig_add(instance)
self.stubs.Set(session, 'add', add)
orig_update_stats(context, new_stats, compute_id, session,
prune_stats=False)
# no stats should have been added to the session:
self.assertEqual(0, len(added))
self.stubs.Set(sqlalchemy_api, '_update_stats', update)
# save with same (unchanged) stats again:
values = {'stats': stats}
db.compute_node_update(self.ctxt, compute_node_id, values)
def test_compute_node_stat_prune(self):
item = self._create_helper('host1')
for stat in item['stats']:
if stat['key'] == 'num_instances':
num_instance_stat = stat
break
values = {
'stats': dict(num_instances=1)
}
db.compute_node_update(self.ctxt, item['id'], values, prune_stats=True)
item = db.compute_node_get_all(self.ctxt)[0]
self.assertEqual(1, len(item['stats']))
stat = item['stats'][0]
self.assertEqual(num_instance_stat['id'], stat['id'])
self.assertEqual(num_instance_stat['key'], stat['key'])
self.assertEqual(1, int(stat['value']))
class MigrationTestCase(test.TestCase):
def setUp(self):
@@ -4760,6 +4619,154 @@ class S3ImageTestCase(test.TestCase):
self.ctxt, uuidutils.generate_uuid())
class ComputeNodeTestCase(test.TestCase, ModelsObjectComparatorMixin):
_ignored_keys = ['id', 'deleted', 'deleted_at', 'created_at', 'updated_at']
def setUp(self):
super(ComputeNodeTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.service_dict = dict(host='host1', binary='binary1',
topic='compute', report_count=1,
disabled=False)
self.service = db.service_create(self.ctxt, self.service_dict)
self.compute_node_dict = dict(vcpus=2, memory_mb=1024, local_gb=2048,
vcpus_used=0, memory_mb_used=0,
local_gb_used=0, free_ram_mb=1024,
free_disk_gb=2048, hypervisor_type="xen",
hypervisor_version=1, cpu_info="",
running_vms=0, current_workload=0,
service_id=self.service['id'],
disk_available_least=100,
hypervisor_hostname='abracadabra104')
# add some random stats
self.stats = dict(num_instances=3, num_proj_12345=2,
num_proj_23456=2, num_vm_building=3)
self.compute_node_dict['stats'] = self.stats
self.flags(reserved_host_memory_mb=0)
self.flags(reserved_host_disk_mb=0)
self.item = db.compute_node_create(self.ctxt, self.compute_node_dict)
def _stats_as_dict(self, stats):
d = {}
for s in stats:
key = s['key']
d[key] = s['value']
return d
def _stats_equal(self, stats, new_stats):
for k, v in stats.iteritems():
self.assertEqual(v, int(new_stats[k]))
def test_compute_node_create(self):
self._assertEqualObjects(self.compute_node_dict, self.item,
ignored_keys=self._ignored_keys + ['stats'])
new_stats = self._stats_as_dict(self.item['stats'])
self._stats_equal(self.stats, new_stats)
def test_compute_node_get_all(self):
nodes = db.compute_node_get_all(self.ctxt)
self.assertEqual(1, len(nodes))
node = nodes[0]
self._assertEqualObjects(self.compute_node_dict, node,
ignored_keys=self._ignored_keys + ['stats', 'service'])
new_stats = self._stats_as_dict(node['stats'])
self._stats_equal(self.stats, new_stats)
def test_compute_node_get(self):
compute_node_id = self.item['id']
node = db.compute_node_get(self.ctxt, compute_node_id)
self._assertEqualObjects(self.compute_node_dict, node,
ignored_keys=self._ignored_keys + ['stats', 'service'])
new_stats = self._stats_as_dict(node['stats'])
self._stats_equal(self.stats, new_stats)
def test_compute_node_update(self):
compute_node_id = self.item['id']
stats = self._stats_as_dict(self.item['stats'])
# change some values:
stats['num_instances'] = 8
stats['num_tribbles'] = 1
values = {
'vcpus': 4,
'stats': stats,
}
item_updated = db.compute_node_update(self.ctxt, compute_node_id,
values)
self.assertEqual(4, item_updated['vcpus'])
new_stats = self._stats_as_dict(item_updated['stats'])
self._stats_equal(stats, new_stats)
def test_compute_node_delete(self):
compute_node_id = self.item['id']
db.compute_node_delete(self.ctxt, compute_node_id)
nodes = db.compute_node_get_all(self.ctxt)
self.assertEqual(len(nodes), 0)
def test_compute_node_search_by_hypervisor(self):
nodes_created = []
for i in xrange(3):
service = db.service_create(self.ctxt, self.service_dict)
self.compute_node_dict['service_id'] = service['id']
self.compute_node_dict['hypervisor_hostname'] = 'testhost' + str(i)
self.compute_node_dict['stats'] = self.stats
node = db.compute_node_create(self.ctxt, self.compute_node_dict)
nodes_created.append(node)
nodes = db.compute_node_search_by_hypervisor(self.ctxt, 'host')
self.assertEqual(3, len(nodes))
self._assertEqualListsOfObjects(nodes_created, nodes,
ignored_keys=self._ignored_keys + ['stats', 'service'])
def test_compute_node_statistics(self):
stats = db.compute_node_statistics(self.ctxt)
self.assertEqual(stats.pop('count'), 1)
for k, v in stats.iteritems():
self.assertEqual(v, self.item[k])
def test_compute_node_not_found(self):
self.assertRaises(exception.ComputeHostNotFound, db.compute_node_get,
self.ctxt, 100500)
def test_compute_node_update_always_updates_updated_at(self):
item_updated = db.compute_node_update(self.ctxt,
self.item['id'], {})
self.assertNotEqual(self.item['updated_at'],
item_updated['updated_at'])
def test_compute_node_stat_unchanged(self):
# don't update unchanged stat values:
stats = self.item['stats']
stats_updated_at = dict([(stat['key'], stat['updated_at'])
for stat in stats])
stats_values = self._stats_as_dict(stats)
new_values = {'stats': stats_values}
compute_node_id = self.item['id']
db.compute_node_update(self.ctxt, compute_node_id, new_values)
updated_node = db.compute_node_get(self.ctxt, compute_node_id)
updated_stats = updated_node['stats']
for stat in updated_stats:
self.assertEqual(stat['updated_at'], stats_updated_at[stat['key']])
def test_compute_node_stat_prune(self):
for stat in self.item['stats']:
if stat['key'] == 'num_instances':
num_instance_stat = stat
break
values = {
'stats': dict(num_instances=1)
}
db.compute_node_update(self.ctxt, self.item['id'], values,
prune_stats=True)
item_updated = db.compute_node_get_all(self.ctxt)[0]
self.assertEqual(1, len(item_updated['stats']))
stat = item_updated['stats'][0]
self.assertEqual(num_instance_stat['id'], stat['id'])
self.assertEqual(num_instance_stat['key'], stat['key'])
self.assertEqual(1, int(stat['value']))
class ArchiveTestCase(test.TestCase):
def setUp(self):
Reference in New Issue
Block a user
Blocking a user prevents them from interacting with repositories, such as opening or commenting on pull requests or issues. Learn more about blocking a user.