Remove neutron OO wrappers
There are object-oriented wrappers around neutron objects that add
needless complexity to the code and make it more difficult to
read, understand and debug. This patch will remove those wrappers
(which are all contained in tempest/scenario/network_resources.py).
The tests that previously used those wrappers have been updated to
use client results directly.
Co-Authored-by: Joshua White <joshua.l.white@intel.com>
Change-Id: I3879ae24e47590d12fb09a4bb4697f139f57caf7
diff --git a/tempest/scenario/test_network_basic_ops.py b/tempest/scenario/test_network_basic_ops.py
index f5134e5..402a70c 100644
--- a/tempest/scenario/test_network_basic_ops.py
+++ b/tempest/scenario/test_network_basic_ops.py
@@ -25,7 +25,6 @@
from tempest import exceptions
from tempest.lib.common.utils import test_utils
from tempest.scenario import manager
-from tempest.scenario import network_resources
from tempest import test
CONF = config.CONF
@@ -114,7 +113,7 @@
self.port_id = None
if boot_with_port:
# create a port on the network and boot with that
- self.port_id = self._create_port(self.network['id']).id
+ self.port_id = self._create_port(self.network['id'])['id']
self.ports.append({'port': self.port_id})
name = data_utils.rand_name('server-smoke')
@@ -133,30 +132,30 @@
seen_nets = self._list_networks()
seen_names = [n['name'] for n in seen_nets]
seen_ids = [n['id'] for n in seen_nets]
- self.assertIn(self.network.name, seen_names)
- self.assertIn(self.network.id, seen_ids)
+ self.assertIn(self.network['name'], seen_names)
+ self.assertIn(self.network['id'], seen_ids)
if self.subnet:
seen_subnets = self._list_subnets()
seen_net_ids = [n['network_id'] for n in seen_subnets]
seen_subnet_ids = [n['id'] for n in seen_subnets]
- self.assertIn(self.network.id, seen_net_ids)
- self.assertIn(self.subnet.id, seen_subnet_ids)
+ self.assertIn(self.network['id'], seen_net_ids)
+ self.assertIn(self.subnet['id'], seen_subnet_ids)
if self.router:
seen_routers = self._list_routers()
seen_router_ids = [n['id'] for n in seen_routers]
seen_router_names = [n['name'] for n in seen_routers]
- self.assertIn(self.router.name,
+ self.assertIn(self.router['name'],
seen_router_names)
- self.assertIn(self.router.id,
+ self.assertIn(self.router['id'],
seen_router_ids)
def _create_server(self, name, network, port_id=None):
keypair = self.create_keypair()
self.keypairs[keypair['name']] = keypair
security_groups = [{'name': self.security_group['name']}]
- network = {'uuid': network.id}
+ network = {'uuid': network['id']}
if port_id is not None:
network['port'] = port_id
@@ -198,7 +197,7 @@
"""
ssh_login = CONF.validation.image_ssh_user
floating_ip, server = self.floating_ip_tuple
- ip_address = floating_ip.floating_ip_address
+ ip_address = floating_ip['floating_ip_address']
private_key = None
floatingip_status = 'DOWN'
if should_connect:
@@ -239,7 +238,7 @@
def _hotplug_server(self):
old_floating_ip, server = self.floating_ip_tuple
- ip_address = old_floating_ip.floating_ip_address
+ ip_address = old_floating_ip['floating_ip_address']
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(
ip_address, private_key=private_key)
@@ -250,7 +249,7 @@
old_port = port_list[0]
interface = self.interface_client.create_interface(
server_id=server['id'],
- net_id=self.new_net.id)['interfaceAttachment']
+ net_id=self.new_net['id'])['interfaceAttachment']
self.addCleanup(self.ports_client.wait_for_resource_deletion,
interface['port_id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
@@ -270,9 +269,7 @@
"Old port: %s. Number of new ports: %d" % (
CONF.network.build_timeout, old_port,
len(self.new_port_list)))
- new_port = network_resources.DeletablePort(
- ports_client=self.ports_client,
- **self.new_port_list[0])
+ new_port = self.new_port_list[0]
def check_new_nic():
new_nic_list = self._get_server_nics(ssh_client)
@@ -287,7 +284,8 @@
num, new_nic = self.diff_list[0]
ssh_client.assign_static_ip(nic=new_nic,
- addr=new_port.fixed_ips[0]['ip_address'])
+ addr=new_port['fixed_ips'][0][
+ 'ip_address'])
ssh_client.set_nic_state(nic=new_nic)
def _get_server_nics(self, ssh_client):
@@ -307,7 +305,7 @@
# get all network ports in the new network
internal_ips = (p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
- network_id=network.id)
+ network_id=network['id'])
if p['device_owner'].startswith('network'))
self._check_server_connectivity(floating_ip,
@@ -335,7 +333,7 @@
def _check_server_connectivity(self, floating_ip, address_list,
should_connect=True):
- ip_address = floating_ip.floating_ip_address
+ ip_address = floating_ip['floating_ip_address']
private_key = self._get_server_key(self.floating_ip_tuple.server)
ssh_source = self.get_remote_client(
ip_address, private_key=private_key)
@@ -451,7 +449,13 @@
self._create_server(name, self.new_net)
self._check_network_internal_connectivity(network=self.new_net,
should_connect=False)
- self.new_subnet.add_to_router(self.router.id)
+ router_id = self.router['id']
+ self.routers_client.add_router_interface(
+ router_id, subnet_id=self.new_subnet['id'])
+
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.routers_client.remove_router_interface,
+ router_id, subnet_id=self.new_subnet['id'])
self._check_network_internal_connectivity(network=self.new_net,
should_connect=True)
@@ -553,7 +557,7 @@
self.check_public_network_connectivity(should_connect=True)
floating_ip, server = self.floating_ip_tuple
- ip_address = floating_ip.floating_ip_address
+ ip_address = floating_ip['floating_ip_address']
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(
ip_address, private_key=private_key)
@@ -568,9 +572,11 @@
act_serv=servers,
trgt_serv=dns_servers))
- self.subnet.update(dns_nameservers=[alt_dns_server])
+ self.subnet = self.subnets_client.update_subnet(
+ self.subnet['id'], dns_nameservers=[alt_dns_server])['subnet']
+
# asserts that Neutron DB has updated the nameservers
- self.assertEqual([alt_dns_server], self.subnet.dns_nameservers,
+ self.assertEqual([alt_dns_server], self.subnet['dns_nameservers'],
"Failed to update subnet's nameservers")
def check_new_dns_server():
@@ -695,7 +701,8 @@
# NOTE(kevinbenton): we have to use the admin credentials to check
# for the distributed flag because self.router only has a project view.
- admin = self.admin_manager.routers_client.show_router(self.router.id)
+ admin = self.admin_manager.routers_client.show_router(
+ self.router['id'])
if admin['router'].get('distributed', False):
msg = "Rescheduling test does not apply to distributed routers."
raise self.skipException(msg)
@@ -704,16 +711,16 @@
# remove resource from agents
hosting_agents = set(a["id"] for a in
- list_hosts(self.router.id)['agents'])
+ list_hosts(self.router['id'])['agents'])
no_migration = agent_list_alive == hosting_agents
LOG.info("Router will be assigned to {mig} hosting agent".
format(mig="the same" if no_migration else "a new"))
for hosting_agent in hosting_agents:
- unschedule_router(hosting_agent, self.router.id)
+ unschedule_router(hosting_agent, self.router['id'])
self.assertNotIn(hosting_agent,
[a["id"] for a in
- list_hosts(self.router.id)['agents']],
+ list_hosts(self.router['id'])['agents']],
'unscheduling router failed')
# verify resource is un-functional
@@ -730,7 +737,7 @@
router_id=self.router['id'])
self.assertEqual(
target_agent,
- list_hosts(self.router.id)['agents'][0]['id'],
+ list_hosts(self.router['id'])['agents'][0]['id'],
"Router failed to reschedule. Hosting agent doesn't match "
"target agent")
@@ -776,12 +783,12 @@
network_id=self.new_net["id"])
spoof_port = new_ports[0]
private_key = self._get_server_key(server)
- ssh_client = self.get_remote_client(fip.floating_ip_address,
+ ssh_client = self.get_remote_client(fip['floating_ip_address'],
private_key=private_key)
spoof_nic = ssh_client.get_nic_name_by_mac(spoof_port["mac_address"])
name = data_utils.rand_name('peer')
peer = self._create_server(name, self.new_net)
- peer_address = peer['addresses'][self.new_net.name][0]['addr']
+ peer_address = peer['addresses'][self.new_net['name']][0]['addr']
self._check_remote_connectivity(ssh_client, dest=peer_address,
nic=spoof_nic, should_succeed=True)
ssh_client.set_mac_address(spoof_nic, spoof_mac)