blob: 578bb480262a51f00e656d55be747d5944ac5145 [file] [log] [blame]
Mh Raies96594fc2014-03-26 16:34:18 +05301# Copyright 2014 NEC Corporation. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15from tempest.api.network import base
16from tempest.common.utils import data_utils
Adam Gandelman77876cb2014-04-06 15:08:28 -070017from tempest import config
Mh Raies96594fc2014-03-26 16:34:18 +053018from tempest import exceptions
19from tempest import test
20
Adam Gandelman77876cb2014-04-06 15:08:28 -070021CONF = config.CONF
22
Mh Raies96594fc2014-03-26 16:34:18 +053023
24class FWaaSExtensionTestJSON(base.BaseNetworkTest):
25 _interface = 'json'
26
27 """
28 Tests the following operations in the Neutron API using the REST client for
29 Neutron:
30
31 List firewall rules
32 Create firewall rule
33 Update firewall rule
34 Delete firewall rule
35 Show firewall rule
36 List firewall policies
37 Create firewall policy
38 Update firewall policy
39 Delete firewall policy
40 Show firewall policy
41 List firewall
42 Create firewall
43 Update firewall
44 Delete firewall
45 Show firewall
46 """
47
48 @classmethod
49 def setUpClass(cls):
50 super(FWaaSExtensionTestJSON, cls).setUpClass()
51 if not test.is_extension_enabled('fwaas', 'network'):
52 msg = "FWaaS Extension not enabled."
53 raise cls.skipException(msg)
54 cls.fw_rule = cls.create_firewall_rule("allow", "tcp")
55 cls.fw_policy = cls.create_firewall_policy()
56
57 def _try_delete_policy(self, policy_id):
58 # delete policy, if it exists
59 try:
60 self.client.delete_firewall_policy(policy_id)
61 # if policy is not found, this means it was deleted in the test
62 except exceptions.NotFound:
63 pass
64
65 def _try_delete_firewall(self, fw_id):
66 # delete firewall, if it exists
67 try:
68 self.client.delete_firewall(fw_id)
69 # if firewall is not found, this means it was deleted in the test
70 except exceptions.NotFound:
71 pass
72
Adam Gandelman77876cb2014-04-06 15:08:28 -070073 self.client.wait_for_resource_deletion('firewall', fw_id)
74
armando-migliaccioc9e9bf62014-08-22 13:57:23 -070075 def _wait_until_ready(self, fw_id):
76 target_states = ('ACTIVE', 'CREATED')
77
Adam Gandelman77876cb2014-04-06 15:08:28 -070078 def _wait():
79 resp, firewall = self.client.show_firewall(fw_id)
80 self.assertEqual('200', resp['status'])
81 firewall = firewall['firewall']
armando-migliaccioc9e9bf62014-08-22 13:57:23 -070082 return firewall['status'] in target_states
Adam Gandelman77876cb2014-04-06 15:08:28 -070083
84 if not test.call_until_true(_wait, CONF.network.build_timeout,
85 CONF.network.build_interval):
armando-migliaccioc9e9bf62014-08-22 13:57:23 -070086 m = ("Timed out waiting for firewall %s to reach %s state(s)" %
87 (fw_id, target_states))
Adam Gandelman77876cb2014-04-06 15:08:28 -070088 raise exceptions.TimeoutException(m)
89
Mh Raies96594fc2014-03-26 16:34:18 +053090 @test.attr(type='smoke')
91 def test_list_firewall_rules(self):
92 # List firewall rules
93 resp, fw_rules = self.client.list_firewall_rules()
94 self.assertEqual('200', resp['status'])
95 fw_rules = fw_rules['firewall_rules']
96 self.assertIn((self.fw_rule['id'],
97 self.fw_rule['name'],
98 self.fw_rule['action'],
99 self.fw_rule['protocol'],
100 self.fw_rule['ip_version'],
101 self.fw_rule['enabled']),
102 [(m['id'],
103 m['name'],
104 m['action'],
105 m['protocol'],
106 m['ip_version'],
107 m['enabled']) for m in fw_rules])
108
109 @test.attr(type='smoke')
110 def test_create_update_delete_firewall_rule(self):
111 # Create firewall rule
112 resp, body = self.client.create_firewall_rule(
113 name=data_utils.rand_name("fw-rule"),
114 action="allow",
115 protocol="tcp")
116 self.assertEqual('201', resp['status'])
117 fw_rule_id = body['firewall_rule']['id']
118
119 # Update firewall rule
120 resp, body = self.client.update_firewall_rule(fw_rule_id,
121 shared=True)
122 self.assertEqual('200', resp['status'])
123 self.assertTrue(body["firewall_rule"]['shared'])
124
125 # Delete firewall rule
126 resp, _ = self.client.delete_firewall_rule(fw_rule_id)
127 self.assertEqual('204', resp['status'])
128 # Confirm deletion
129 resp, fw_rules = self.client.list_firewall_rules()
130 self.assertNotIn(fw_rule_id,
131 [m['id'] for m in fw_rules['firewall_rules']])
132
133 @test.attr(type='smoke')
134 def test_show_firewall_rule(self):
135 # show a created firewall rule
136 resp, fw_rule = self.client.show_firewall_rule(self.fw_rule['id'])
137 self.assertEqual('200', resp['status'])
138 for key, value in fw_rule['firewall_rule'].iteritems():
139 self.assertEqual(self.fw_rule[key], value)
140
141 @test.attr(type='smoke')
142 def test_list_firewall_policies(self):
143 resp, fw_policies = self.client.list_firewall_policies()
144 self.assertEqual('200', resp['status'])
145 fw_policies = fw_policies['firewall_policies']
146 self.assertIn((self.fw_policy['id'],
147 self.fw_policy['name'],
148 self.fw_policy['firewall_rules']),
149 [(m['id'],
150 m['name'],
151 m['firewall_rules']) for m in fw_policies])
152
153 @test.attr(type='smoke')
154 def test_create_update_delete_firewall_policy(self):
155 # Create firewall policy
156 resp, body = self.client.create_firewall_policy(
157 name=data_utils.rand_name("fw-policy"))
158 self.assertEqual('201', resp['status'])
159 fw_policy_id = body['firewall_policy']['id']
160 self.addCleanup(self._try_delete_policy, fw_policy_id)
161
162 # Update firewall policy
163 resp, body = self.client.update_firewall_policy(fw_policy_id,
164 shared=True,
165 name="updated_policy")
166 self.assertEqual('200', resp['status'])
167 updated_fw_policy = body["firewall_policy"]
168 self.assertTrue(updated_fw_policy['shared'])
169 self.assertEqual("updated_policy", updated_fw_policy['name'])
170
171 # Delete firewall policy
172 resp, _ = self.client.delete_firewall_policy(fw_policy_id)
173 self.assertEqual('204', resp['status'])
174 # Confirm deletion
175 resp, fw_policies = self.client.list_firewall_policies()
176 fw_policies = fw_policies['firewall_policies']
177 self.assertNotIn(fw_policy_id, [m['id'] for m in fw_policies])
178
179 @test.attr(type='smoke')
180 def test_show_firewall_policy(self):
181 # show a created firewall policy
182 resp, fw_policy = self.client.show_firewall_policy(
183 self.fw_policy['id'])
184 self.assertEqual('200', resp['status'])
185 fw_policy = fw_policy['firewall_policy']
186 for key, value in fw_policy.iteritems():
187 self.assertEqual(self.fw_policy[key], value)
188
189 @test.attr(type='smoke')
190 def test_create_show_delete_firewall(self):
Adam Gandelman77876cb2014-04-06 15:08:28 -0700191 # Create tenant network resources required for an ACTIVE firewall
192 network = self.create_network()
193 subnet = self.create_subnet(network)
194 router = self.create_router(
195 data_utils.rand_name('router-'),
196 admin_state_up=True)
197 self.client.add_router_interface_with_subnet_id(
198 router['id'], subnet['id'])
199
Mh Raies96594fc2014-03-26 16:34:18 +0530200 # Create firewall
201 resp, body = self.client.create_firewall(
202 name=data_utils.rand_name("firewall"),
203 firewall_policy_id=self.fw_policy['id'])
204 self.assertEqual('201', resp['status'])
205 created_firewall = body['firewall']
206 firewall_id = created_firewall['id']
207 self.addCleanup(self._try_delete_firewall, firewall_id)
208
armando-migliaccioc9e9bf62014-08-22 13:57:23 -0700209 # Wait for the firewall resource to become ready
210 self._wait_until_ready(firewall_id)
Adam Gandelman77876cb2014-04-06 15:08:28 -0700211
Mh Raies96594fc2014-03-26 16:34:18 +0530212 # show a created firewall
213 resp, firewall = self.client.show_firewall(firewall_id)
214 self.assertEqual('200', resp['status'])
215 firewall = firewall['firewall']
Adam Gandelman77876cb2014-04-06 15:08:28 -0700216
Mh Raies96594fc2014-03-26 16:34:18 +0530217 for key, value in firewall.iteritems():
Adam Gandelman77876cb2014-04-06 15:08:28 -0700218 if key == 'status':
219 continue
Mh Raies96594fc2014-03-26 16:34:18 +0530220 self.assertEqual(created_firewall[key], value)
221
222 # list firewall
223 resp, firewalls = self.client.list_firewalls()
224 self.assertEqual('200', resp['status'])
225 firewalls = firewalls['firewalls']
226 self.assertIn((created_firewall['id'],
227 created_firewall['name'],
228 created_firewall['firewall_policy_id']),
229 [(m['id'],
230 m['name'],
231 m['firewall_policy_id']) for m in firewalls])
232
233 # Delete firewall
234 resp, _ = self.client.delete_firewall(firewall_id)
235 self.assertEqual('204', resp['status'])
Mh Raies96594fc2014-03-26 16:34:18 +0530236
237
238class FWaaSExtensionTestXML(FWaaSExtensionTestJSON):
239 _interface = 'xml'