blob: fbb25a83a5576c29695a415a549758afb0657837 [file] [log] [blame]
Elena Ezhova1ec6e182013-12-24 17:45:59 +04001# Copyright 2014 OpenStack Foundation
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import socket
17
18from tempest.api.network import base
19from tempest.common.utils import data_utils
20from tempest import config
21from tempest import test
22
23CONF = config.CONF
24
25
26class PortsTestJSON(base.BaseNetworkTest):
27 _interface = 'json'
28
29 @classmethod
30 def setUpClass(cls):
31 super(PortsTestJSON, cls).setUpClass()
32 cls.network = cls.create_network()
33 cls.port = cls.create_port(cls.network)
34
35 def _delete_port(self, port_id):
36 resp, body = self.client.delete_port(port_id)
37 self.assertEqual('204', resp['status'])
38 resp, body = self.client.list_ports()
39 self.assertEqual('200', resp['status'])
40 ports_list = body['ports']
41 self.assertFalse(port_id in [n['id'] for n in ports_list])
42
43 @test.attr(type='smoke')
44 def test_create_update_delete_port(self):
45 # Verify port creation
46 resp, body = self.client.create_port(network_id=self.network['id'])
47 self.assertEqual('201', resp['status'])
48 port = body['port']
49 self.assertTrue(port['admin_state_up'])
50 # Verify port update
51 new_name = "New_Port"
52 resp, body = self.client.update_port(
53 port['id'],
54 name=new_name,
55 admin_state_up=False)
56 self.assertEqual('200', resp['status'])
57 updated_port = body['port']
58 self.assertEqual(updated_port['name'], new_name)
59 self.assertFalse(updated_port['admin_state_up'])
60 # Verify port deletion
61 resp, body = self.client.delete_port(port['id'])
62 self.assertEqual('204', resp['status'])
63
64 @test.attr(type='smoke')
65 def test_show_port(self):
66 # Verify the details of port
67 resp, body = self.client.show_port(self.port['id'])
68 self.assertEqual('200', resp['status'])
69 port = body['port']
70 self.assertIn('id', port)
71 self.assertEqual(port['id'], self.port['id'])
72 self.assertEqual(self.port['admin_state_up'], port['admin_state_up'])
73 self.assertEqual(self.port['device_id'], port['device_id'])
74 self.assertEqual(self.port['device_owner'], port['device_owner'])
75 self.assertEqual(self.port['mac_address'], port['mac_address'])
76 self.assertEqual(self.port['name'], port['name'])
77 self.assertEqual(self.port['security_groups'],
78 port['security_groups'])
79 self.assertEqual(self.port['network_id'], port['network_id'])
80 self.assertEqual(self.port['security_groups'],
81 port['security_groups'])
82
83 @test.attr(type='smoke')
84 def test_show_port_fields(self):
85 # Verify specific fields of a port
86 field_list = [('fields', 'id'), ]
87 resp, body = self.client.show_port(self.port['id'],
88 field_list=field_list)
89 self.assertEqual('200', resp['status'])
90 port = body['port']
91 self.assertEqual(len(port), len(field_list))
92 for label, field_name in field_list:
93 self.assertEqual(port[field_name], self.port[field_name])
94
95 @test.attr(type='smoke')
96 def test_list_ports(self):
97 # Verify the port exists in the list of all ports
98 resp, body = self.client.list_ports()
99 self.assertEqual('200', resp['status'])
100 ports = [port['id'] for port in body['ports']
101 if port['id'] == self.port['id']]
102 self.assertNotEmpty(ports, "Created port not found in the list")
103
104 @test.attr(type='smoke')
105 def test_port_list_filter_by_router_id(self):
106 # Create a router
107 network = self.create_network()
108 self.create_subnet(network)
109 router = self.create_router(data_utils.rand_name('router-'))
110 resp, port = self.client.create_port(network_id=network['id'])
111 # Add router interface to port created above
112 resp, interface = self.client.add_router_interface_with_port_id(
113 router['id'], port['port']['id'])
114 self.addCleanup(self.client.remove_router_interface_with_port_id,
115 router['id'], port['port']['id'])
116 # List ports filtered by router_id
117 resp, port_list = self.client.list_ports(
118 device_id=router['id'])
119 self.assertEqual('200', resp['status'])
120 ports = port_list['ports']
121 self.assertEqual(len(ports), 1)
122 self.assertEqual(ports[0]['id'], port['port']['id'])
123 self.assertEqual(ports[0]['device_id'], router['id'])
124
125 @test.attr(type='smoke')
126 def test_list_ports_fields(self):
127 # Verify specific fields of ports
128 resp, body = self.client.list_ports(fields='id')
129 self.assertEqual('200', resp['status'])
130 ports = body['ports']
131 self.assertNotEmpty(ports, "Port list returned is empty")
132 # Asserting the fields returned are correct
133 for port in ports:
134 self.assertEqual(len(port), 1)
135 self.assertIn('id', port)
136
137
138class PortsTestXML(PortsTestJSON):
139 _interface = 'xml'
140
141
142class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest):
143 _interface = 'json'
144
145 @classmethod
146 def setUpClass(cls):
147 super(PortsAdminExtendedAttrsTestJSON, cls).setUpClass()
148 cls.identity_client = cls._get_identity_admin_client()
149 cls.tenant = cls.identity_client.get_tenant_by_name(
150 CONF.identity.tenant_name)
151 cls.network = cls.create_network()
152 cls.host_id = socket.gethostname()
153
154 @test.attr(type='smoke')
155 def test_create_port_binding_ext_attr(self):
156 post_body = {"network_id": self.network['id'],
157 "binding:host_id": self.host_id}
158 resp, body = self.admin_client.create_port(**post_body)
159 self.assertEqual('201', resp['status'])
160 port = body['port']
161 self.addCleanup(self.admin_client.delete_port, port['id'])
162 host_id = port['binding:host_id']
163 self.assertIsNotNone(host_id)
164 self.assertEqual(self.host_id, host_id)
165
166 @test.attr(type='smoke')
167 def test_update_port_binding_ext_attr(self):
168 post_body = {"network_id": self.network['id']}
169 resp, body = self.admin_client.create_port(**post_body)
170 self.assertEqual('201', resp['status'])
171 port = body['port']
172 self.addCleanup(self.admin_client.delete_port, port['id'])
173 update_body = {"binding:host_id": self.host_id}
174 resp, body = self.admin_client.update_port(port['id'], **update_body)
175 self.assertEqual('200', resp['status'])
176 updated_port = body['port']
177 host_id = updated_port['binding:host_id']
178 self.assertIsNotNone(host_id)
179 self.assertEqual(self.host_id, host_id)
180
181 @test.attr(type='smoke')
182 def test_list_ports_binding_ext_attr(self):
183 resp, body = self.admin_client.list_ports(
184 **{'tenant_id': self.tenant['id']})
185 self.assertEqual('200', resp['status'])
186 ports_list = body['ports']
187 for port in ports_list:
188 vif_type = port['binding:vif_type']
189 self.assertIsNotNone(vif_type)
190 vif_details = port['binding:vif_details']['port_filter']
191 self.assertIsNotNone(vif_details)
192
193 @test.attr(type='smoke')
194 def test_show_port_binding_ext_attr(self):
195 resp, body = self.admin_client.create_port(
196 network_id=self.network['id'])
197 self.assertEqual('201', resp['status'])
198 port = body['port']
199 self.addCleanup(self.admin_client.delete_port, port['id'])
200 resp, body = self.admin_client.show_port(port['id'])
201 self.assertEqual('200', resp['status'])
202 show_port = body['port']
203 self.assertEqual(port['binding:host_id'],
204 show_port['binding:host_id'])
205 self.assertEqual(port['binding:vif_type'],
206 show_port['binding:vif_type'])
207 self.assertEqual(port['binding:vif_details'],
208 show_port['binding:vif_details'])
209
210
211class PortsAdminExtendedAttrsTestXML(PortsAdminExtendedAttrsTestJSON):
212 _interface = 'xml'
213
214
215class PortsIpV6TestJSON(PortsTestJSON):
216 _ip_version = 6
217 _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
218 _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
219
220 @classmethod
221 def setUpClass(cls):
222 super(PortsIpV6TestJSON, cls).setUpClass()
223 if not CONF.network_feature_enabled.ipv6:
224 cls.tearDownClass()
225 skip_msg = "IPv6 Tests are disabled."
226 raise cls.skipException(skip_msg)
227
228
229class PortsIpV6TestXML(PortsIpV6TestJSON):
230 _interface = 'xml'
231
232
233class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON):
234 _ip_version = 6
235 _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
236 _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
237
238 @classmethod
239 def setUpClass(cls):
240 super(PortsAdminExtendedAttrsIpV6TestJSON, cls).setUpClass()
241 if not CONF.network_feature_enabled.ipv6:
242 cls.tearDownClass()
243 skip_msg = "IPv6 Tests are disabled."
244 raise cls.skipException(skip_msg)
245
246
247class PortsAdminExtendedAttrsIpV6TestXML(
248 PortsAdminExtendedAttrsIpV6TestJSON):
249 _interface = 'xml'