# blob: e12dedd788d19b9f8b869b7738a15098d1a69284
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest.api.network import base
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
from tempest.test import attr
# Module-level handle on the tempest configuration; the tests below read
# CONF.network.tenant_network_cidr and CONF.network.tenant_network_mask_bits.
CONF = config.CONF
class NetworksTestJSON(base.BaseNetworkTest):
    """
    Tests the following operations in the Neutron API using the REST client for
    Neutron:

        create a network for a tenant
        list tenant's networks
        show a tenant network details
        create a subnet for a tenant
        list tenant's subnets
        show a tenant subnet details
        port create
        port delete
        port list
        port show
        port update
        network update
        subnet update

    v2.0 of the Neutron API is assumed. It is also assumed that the following
    options are defined in the [network] section of etc/tempest.conf:

        tenant_network_cidr with a block of cidr's from which smaller blocks
        can be allocated for tenant networks

        tenant_network_mask_bits with the mask bits to be used to partition the
        block defined by tenant_network_cidr
    """

    _interface = 'json'

    @classmethod
    def setUpClass(cls):
        """Create the network, subnet and port shared by the show/list tests.

        The created resources (and the network name / subnet cidr) are kept
        on the class so that the individual tests can compare API responses
        against them.
        """
        super(NetworksTestJSON, cls).setUpClass()
        cls.network = cls.create_network()
        cls.name = cls.network['name']
        cls.subnet = cls.create_subnet(cls.network)
        cls.cidr = cls.subnet['cidr']
        cls.port = cls.create_port(cls.network)

    @attr(type='smoke')
    def test_create_update_delete_network_subnet(self):
        # Creates a network
        name = data_utils.rand_name('network-')
        resp, body = self.client.create_network(name=name)
        self.assertEqual('201', resp['status'])
        network = body['network']
        net_id = network['id']
        # Verification of network update
        new_name = "New_network"
        resp, body = self.client.update_network(net_id, name=new_name)
        self.assertEqual('200', resp['status'])
        updated_net = body['network']
        self.assertEqual(updated_net['name'], new_name)
        # Find a cidr that is not in use yet and create a subnet with it
        cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
        mask_bits = CONF.network.tenant_network_mask_bits
        for subnet_cidr in cidr.subnet(mask_bits):
            try:
                resp, body = self.client.create_subnet(
                    network_id=net_id,
                    cidr=str(subnet_cidr),
                    ip_version=4)
                break
            except exceptions.BadRequest as e:
                # An overlapping cidr just means "try the next one"; any
                # other bad request is a genuine failure.
                if 'overlaps with another subnet' not in str(e):
                    raise
        else:
            # Every candidate overlapped (or there were no candidates).
            # Without this, the assertion below would be evaluated against
            # the stale response of the update_network call above.
            self.fail("No unused /%s subnet cidr could be found in %s"
                      % (mask_bits, cidr))
        self.assertEqual('201', resp['status'])
        subnet = body['subnet']
        subnet_id = subnet['id']
        # Verification of subnet update
        new_subnet = "New_subnet"
        resp, body = self.client.update_subnet(subnet_id,
                                               name=new_subnet)
        self.assertEqual('200', resp['status'])
        updated_subnet = body['subnet']
        self.assertEqual(updated_subnet['name'], new_subnet)
        # Delete subnet and network
        resp, body = self.client.delete_subnet(subnet_id)
        self.assertEqual('204', resp['status'])
        resp, body = self.client.delete_network(net_id)
        self.assertEqual('204', resp['status'])

    @attr(type='smoke')
    def test_show_network(self):
        # Verifies the details of a network
        resp, body = self.client.show_network(self.network['id'])
        self.assertEqual('200', resp['status'])
        network = body['network']
        self.assertEqual(self.network['id'], network['id'])
        self.assertEqual(self.name, network['name'])

    @attr(type='smoke')
    def test_show_network_fields(self):
        # Verifies showing some fields of a network works
        field_list = [('fields', 'id'), ('fields', 'name'), ]
        resp, body = self.client.show_network(self.network['id'],
                                              field_list=field_list)
        self.assertEqual('200', resp['status'])
        network = body['network']
        # Only the two requested fields may be returned
        self.assertEqual(len(network), 2)
        self.assertEqual(self.network['id'], network['id'])
        self.assertEqual(self.name, network['name'])

    @attr(type='smoke')
    def test_list_networks(self):
        # Verify the network exists in the list of all networks
        resp, body = self.client.list_networks()
        self.assertEqual('200', resp['status'])
        network_ids = [n['id'] for n in body['networks']]
        msg = "Network list doesn't contain created network"
        self.assertIn(self.network['id'], network_ids, msg)

    @attr(type='smoke')
    def test_list_networks_fields(self):
        # Verify listing some fields of the networks
        resp, body = self.client.list_networks(fields='id')
        self.assertEqual('200', resp['status'])
        networks = body['networks']
        # Each entry must carry the requested 'id' field and nothing else
        for n in networks:
            self.assertEqual(len(n), 1)
            self.assertIn('id', n)
        self.assertIn(self.network['id'], [n['id'] for n in networks],
                      "Created network id not found in the list")

    @attr(type='smoke')
    def test_show_subnet(self):
        # Verifies the details of a subnet
        resp, body = self.client.show_subnet(self.subnet['id'])
        self.assertEqual('200', resp['status'])
        subnet = body['subnet']
        self.assertEqual(self.subnet['id'], subnet['id'])
        self.assertEqual(self.cidr, subnet['cidr'])

    @attr(type='smoke')
    def test_show_subnet_fields(self):
        # Verifies showing some fields of a subnet works
        field_list = [('fields', 'id'), ('fields', 'cidr'), ]
        resp, body = self.client.show_subnet(self.subnet['id'],
                                             field_list=field_list)
        self.assertEqual('200', resp['status'])
        subnet = body['subnet']
        # Only the two requested fields may be returned
        self.assertEqual(len(subnet), 2)
        self.assertEqual(self.subnet['id'], subnet['id'])
        self.assertEqual(self.cidr, subnet['cidr'])

    @attr(type='smoke')
    def test_list_subnets(self):
        # Verify the subnet exists in the list of all subnets
        resp, body = self.client.list_subnets()
        self.assertEqual('200', resp['status'])
        subnet_ids = [n['id'] for n in body['subnets']]
        msg = "Subnet list doesn't contain created subnet"
        self.assertIn(self.subnet['id'], subnet_ids, msg)

    @attr(type='smoke')
    def test_list_subnets_fields(self):
        # Verify listing some fields of the subnets
        resp, body = self.client.list_subnets(fields='id')
        self.assertEqual('200', resp['status'])
        subnets = body['subnets']
        # Each entry must carry the requested 'id' field and nothing else
        for n in subnets:
            self.assertEqual(len(n), 1)
            self.assertIn('id', n)
        self.assertIn(self.subnet['id'], [n['id'] for n in subnets],
                      "Created subnet id not found in the list")

    @attr(type='smoke')
    def test_create_update_delete_port(self):
        # Verify that successful port creation, update & deletion
        resp, body = self.client.create_port(
            network_id=self.network['id'])
        self.assertEqual('201', resp['status'])
        port = body['port']
        self.assertTrue(port['admin_state_up'])
        # Verification of port update
        new_port = "New_Port"
        resp, body = self.client.update_port(
            port['id'],
            name=new_port,
            admin_state_up=False)
        self.assertEqual('200', resp['status'])
        updated_port = body['port']
        self.assertEqual(updated_port['name'], new_port)
        self.assertFalse(updated_port['admin_state_up'])
        # Verification of port delete
        resp, body = self.client.delete_port(port['id'])
        self.assertEqual('204', resp['status'])

    @attr(type='smoke')
    def test_show_port(self):
        # Verify the details of port
        resp, body = self.client.show_port(self.port['id'])
        self.assertEqual('200', resp['status'])
        port = body['port']
        self.assertEqual(self.port['id'], port['id'])

    @attr(type='smoke')
    def test_show_port_fields(self):
        # Verifies showing fields of a port works
        field_list = [('fields', 'id'), ]
        resp, body = self.client.show_port(self.port['id'],
                                           field_list=field_list)
        self.assertEqual('200', resp['status'])
        port = body['port']
        # Only the single requested field may be returned
        self.assertEqual(len(port), 1)
        self.assertEqual(self.port['id'], port['id'])

    @attr(type='smoke')
    def test_list_ports(self):
        # Verify the port exists in the list of all ports
        resp, body = self.client.list_ports()
        self.assertEqual('200', resp['status'])
        port_ids = [n['id'] for n in body['ports']]
        self.assertIn(self.port['id'], port_ids,
                      "Port list doesn't contain created port")

    @attr(type='smoke')
    def test_list_ports_fields(self):
        # Verify listing some fields of the ports
        resp, body = self.client.list_ports(fields='id')
        self.assertEqual('200', resp['status'])
        ports_list = body['ports']
        # Each entry must carry the requested 'id' field and nothing else
        for n in ports_list:
            self.assertEqual(len(n), 1)
            self.assertIn('id', n)
        self.assertIn(self.port['id'], [n['id'] for n in ports_list],
                      "Created port id not found in the list")
class NetworksTestXML(NetworksTestJSON):
    """Re-run every NetworksTestJSON test with ``_interface`` set to 'xml'."""

    _interface = 'xml'
class BulkNetworkOpsTestJSON(base.BaseNetworkTest):
    """
    Tests the following operations in the Neutron API using the REST client for
    Neutron:

        bulk network creation
        bulk subnet creation
        bulk port creation
        list tenant's networks

    v2.0 of the Neutron API is assumed. It is also assumed that the following
    options are defined in the [network] section of etc/tempest.conf:

        tenant_network_cidr with a block of cidr's from which smaller blocks
        can be allocated for tenant networks

        tenant_network_mask_bits with the mask bits to be used to partition the
        block defined by tenant_network_cidr
    """

    _interface = 'json'

    @classmethod
    def setUpClass(cls):
        """Create the two networks that host the bulk subnets and ports."""
        super(BulkNetworkOpsTestJSON, cls).setUpClass()
        cls.network1 = cls.create_network()
        cls.network2 = cls.create_network()

    def _delete_networks(self, created_networks):
        """Delete each given network and verify none of them is listed."""
        # NOTE(review): the helpers compare the integer ``resp.status`` while
        # the tests compare the string ``resp['status']``; kept as in the
        # original - confirm the response object supports both forms.
        for n in created_networks:
            resp, body = self.client.delete_network(n['id'])
            self.assertEqual(204, resp.status)
        # Asserting that the networks are not found in the list after deletion
        resp, body = self.client.list_networks()
        networks_list = [network['id'] for network in body['networks']]
        for n in created_networks:
            self.assertNotIn(n['id'], networks_list)

    def _delete_subnets(self, created_subnets):
        """Delete each given subnet and verify none of them is listed."""
        for n in created_subnets:
            resp, body = self.client.delete_subnet(n['id'])
            self.assertEqual(204, resp.status)
        # Asserting that the subnets are not found in the list after deletion
        resp, body = self.client.list_subnets()
        subnets_list = [subnet['id'] for subnet in body['subnets']]
        for n in created_subnets:
            self.assertNotIn(n['id'], subnets_list)

    def _delete_ports(self, created_ports):
        """Delete each given port and verify none of them is listed."""
        for n in created_ports:
            resp, body = self.client.delete_port(n['id'])
            self.assertEqual(204, resp.status)
        # Asserting that the ports are not found in the list after deletion
        resp, body = self.client.list_ports()
        ports_list = [port['id'] for port in body['ports']]
        for n in created_ports:
            self.assertNotIn(n['id'], ports_list)

    @attr(type='smoke')
    def test_bulk_create_delete_network(self):
        # Creates 2 networks in one request
        network_names = [data_utils.rand_name('network-'),
                         data_utils.rand_name('network-')]
        resp, body = self.client.create_bulk_network(2, network_names)
        created_networks = body['networks']
        # Register the cleanup before asserting on the status so that the
        # networks are removed even when the assertion fails (same order as
        # the subnet and port tests below).
        self.addCleanup(self._delete_networks, created_networks)
        self.assertEqual('201', resp['status'])
        # Asserting that the networks are found in the list after creation
        resp, body = self.client.list_networks()
        networks_list = [network['id'] for network in body['networks']]
        for n in created_networks:
            self.assertIsNotNone(n['id'])
            self.assertIn(n['id'], networks_list)

    @attr(type='smoke')
    def test_bulk_create_delete_subnet(self):
        # Creates 2 subnets in one request
        cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
        mask_bits = CONF.network.tenant_network_mask_bits
        # Only one cidr per network is needed, so pull them lazily instead
        # of materialising every subnet of the tenant block.
        subnet_cidrs = cidr.subnet(mask_bits)
        networks = [self.network1['id'], self.network2['id']]
        names = [data_utils.rand_name('subnet-') for _ in networks]
        # TODO(raies): "for IPv6, version list [4, 6] will be used.
        # and cidr for IPv6 will be of IPv6"
        ip_version = [4, 4]
        subnet_list = []
        for net_id, name, version in zip(networks, names, ip_version):
            subnet_list.append({
                'network_id': net_id,
                'cidr': str(next(subnet_cidrs)),
                'name': name,
                'ip_version': version,
            })
        # The second subnet is deliberately submitted without a name
        del subnet_list[1]['name']
        resp, body = self.client.create_bulk_subnet(subnet_list)
        created_subnets = body['subnets']
        self.addCleanup(self._delete_subnets, created_subnets)
        self.assertEqual('201', resp['status'])
        # Asserting that the subnets are found in the list after creation
        resp, body = self.client.list_subnets()
        subnets_list = [subnet['id'] for subnet in body['subnets']]
        for n in created_subnets:
            self.assertIsNotNone(n['id'])
            self.assertIn(n['id'], subnets_list)

    @attr(type='smoke')
    def test_bulk_create_delete_port(self):
        # Creates 2 ports in one request
        networks = [self.network1['id'], self.network2['id']]
        names = [data_utils.rand_name('port-') for _ in networks]
        state = [True, False]
        port_list = []
        for net_id, name, admin_state_up in zip(networks, names, state):
            port_list.append({
                'network_id': net_id,
                'name': name,
                'admin_state_up': admin_state_up,
            })
        # The second port is deliberately submitted without a name
        del port_list[1]['name']
        resp, body = self.client.create_bulk_port(port_list)
        created_ports = body['ports']
        self.addCleanup(self._delete_ports, created_ports)
        self.assertEqual('201', resp['status'])
        # Asserting that the ports are found in the list after creation
        resp, body = self.client.list_ports()
        ports_list = [port['id'] for port in body['ports']]
        for n in created_ports:
            self.assertIsNotNone(n['id'])
            self.assertIn(n['id'], ports_list)
class BulkNetworkOpsTestXML(BulkNetworkOpsTestJSON):
    """Re-run every BulkNetworkOpsTestJSON test with ``_interface`` 'xml'."""

    _interface = 'xml'