blob: df04d6562bf12488eb265be0438222b3a56e331d [file] [log] [blame]
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -05001# Copyright 2014 IBM Corp.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import copy
17
18from tempest import auth
19from tempest.common import http
20from tempest import config
21from tempest import exceptions
22from tempest.openstack.common.fixture import mockpatch
23from tempest.tests import base
24from tempest.tests import fake_config
25from tempest.tests import fake_http
26from tempest.tests import fake_identity
27
28
29class BaseAuthTestsSetUp(base.TestCase):
30 _auth_provider_class = None
31 credentials = {
32 'username': 'fake_user',
33 'password': 'fake_pwd',
34 'tenant_name': 'fake_tenant'
35 }
36
37 def _auth(self, credentials, **params):
38 """
39 returns auth method according to keystone
40 """
41 return self._auth_provider_class(credentials, **params)
42
43 def setUp(self):
44 super(BaseAuthTestsSetUp, self).setUp()
45 self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakeConfig)
46 self.fake_http = fake_http.fake_httplib2(return_type=200)
47 self.stubs.Set(http.ClosingHttp, 'request', self.fake_http.request)
48 self.auth_provider = self._auth(self.credentials)
49
50
51class TestBaseAuthProvider(BaseAuthTestsSetUp):
52 """
53 This tests auth.AuthProvider class which is base for the other so we
54 obviously don't test not implemented method or the ones which strongly
55 depends on them.
56 """
57 _auth_provider_class = auth.AuthProvider
58
59 def test_check_credentials_is_dict(self):
60 self.assertTrue(self.auth_provider.check_credentials({}))
61
62 def test_check_credentials_bad_type(self):
63 self.assertFalse(self.auth_provider.check_credentials([]))
64
65 def test_instantiate_with_bad_credentials_type(self):
66 """
67 Assure that credentials with bad type fail with TypeError
68 """
69 self.assertRaises(TypeError, self._auth, [])
70
71 def test_auth_data_property(self):
72 self.assertRaises(NotImplementedError, getattr, self.auth_provider,
73 'auth_data')
74
75 def test_auth_data_property_when_cache_exists(self):
76 self.auth_provider.cache = 'foo'
77 self.useFixture(mockpatch.PatchObject(self.auth_provider,
78 'is_expired',
79 return_value=False))
80 self.assertEqual('foo', getattr(self.auth_provider, 'auth_data'))
81
82 def test_delete_auth_data_property_through_deleter(self):
83 self.auth_provider.cache = 'foo'
84 del self.auth_provider.auth_data
85 self.assertIsNone(self.auth_provider.cache)
86
87 def test_delete_auth_data_property_through_clear_auth(self):
88 self.auth_provider.cache = 'foo'
89 self.auth_provider.clear_auth()
90 self.assertIsNone(self.auth_provider.cache)
91
92 def test_set_and_reset_alt_auth_data(self):
93 self.auth_provider.set_alt_auth_data('foo', 'bar')
94 self.assertEqual(self.auth_provider.alt_part, 'foo')
95 self.assertEqual(self.auth_provider.alt_auth_data, 'bar')
96
97 self.auth_provider.reset_alt_auth_data()
98 self.assertIsNone(self.auth_provider.alt_part)
99 self.assertIsNone(self.auth_provider.alt_auth_data)
100
101
102class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500103 _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500104 _auth_provider_class = auth.KeystoneV2AuthProvider
105
106 def setUp(self):
107 super(TestKeystoneV2AuthProvider, self).setUp()
108 self.stubs.Set(http.ClosingHttp, 'request',
109 fake_identity._fake_v2_response)
110 self.target_url = 'test_api'
111
112 def _get_fake_alt_identity(self):
113 return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']
114
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500115 def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
116 replacement=None):
117 if replacement:
118 return ep[endpoint_type].replace('v2', replacement)
119 return ep[endpoint_type]
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500120
121 def _get_token_from_fake_identity(self):
122 return fake_identity.TOKEN
123
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500124 def _test_request_helper(self, filters, expected):
125 url, headers, body = self.auth_provider.auth_request('GET',
126 self.target_url,
127 filters=filters)
128
129 self.assertEqual(expected['url'], url)
130 self.assertEqual(expected['token'], headers['X-Auth-Token'])
131 self.assertEqual(expected['body'], body)
132
133 def test_request(self):
134 filters = {
135 'service': 'compute',
136 'endpoint_type': 'publicURL',
137 'region': 'FakeRegion'
138 }
139
140 url = self._get_result_url_from_endpoint(
141 self._endpoints[0]['endpoints'][1]) + '/' + self.target_url
142
143 expected = {
144 'body': None,
145 'url': url,
146 'token': self._get_token_from_fake_identity(),
147 }
148 self._test_request_helper(filters, expected)
149
150 def test_request_with_alt_auth_cleans_alt(self):
151 self.auth_provider.set_alt_auth_data(
152 'body',
153 (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
154 self.test_request()
155 # Assert alt auth data is clear after it
156 self.assertIsNone(self.auth_provider.alt_part)
157 self.assertIsNone(self.auth_provider.alt_auth_data)
158
159 def test_request_with_alt_part_without_alt_data(self):
160 """
161 Assert that when alt_part is defined, the corresponding original
162 request element is kept the same.
163 """
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500164 filters = {
165 'service': 'compute',
166 'endpoint_type': 'publicURL',
167 'region': 'fakeRegion'
168 }
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500169 self.auth_provider.set_alt_auth_data('url', None)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500170
171 url, headers, body = self.auth_provider.auth_request('GET',
172 self.target_url,
173 filters=filters)
174
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500175 self.assertEqual(url, self.target_url)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500176 self.assertEqual(self._get_token_from_fake_identity(),
177 headers['X-Auth-Token'])
178 self.assertEqual(body, None)
179
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500180 def test_request_with_bad_service(self):
181 filters = {
182 'service': 'BAD_SERVICE',
183 'endpoint_type': 'publicURL',
184 'region': 'fakeRegion'
185 }
186 self.assertRaises(exceptions.EndpointNotFound,
187 self.auth_provider.auth_request, 'GET',
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500188 self.target_url, filters=filters)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500189
190 def test_request_without_service(self):
191 filters = {
192 'service': None,
193 'endpoint_type': 'publicURL',
194 'region': 'fakeRegion'
195 }
196 self.assertRaises(exceptions.EndpointNotFound,
197 self.auth_provider.auth_request, 'GET',
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500198 self.target_url, filters=filters)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500199
200 def test_check_credentials_missing_attribute(self):
201 for attr in ['username', 'password']:
202 cred = copy.copy(self.credentials)
203 del cred[attr]
204 self.assertFalse(self.auth_provider.check_credentials(cred))
205
206 def test_check_credentials_not_scoped_missing_tenant_name(self):
207 cred = copy.copy(self.credentials)
208 del cred['tenant_name']
209 self.assertTrue(self.auth_provider.check_credentials(cred,
210 scoped=False))
211
212 def test_check_credentials_missing_tenant_name(self):
213 cred = copy.copy(self.credentials)
214 del cred['tenant_name']
215 self.assertFalse(self.auth_provider.check_credentials(cred))
216
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500217 def _test_base_url_helper(self, expected_url, filters,
218 auth_data=None):
219
220 url = self.auth_provider.base_url(filters, auth_data)
221 self.assertEqual(url, expected_url)
222
223 def test_base_url(self):
224 self.filters = {
225 'service': 'compute',
226 'endpoint_type': 'publicURL',
227 'region': 'FakeRegion'
228 }
229 expected = self._get_result_url_from_endpoint(
230 self._endpoints[0]['endpoints'][1])
231 self._test_base_url_helper(expected, self.filters)
232
233 def test_base_url_to_get_admin_endpoint(self):
234 self.filters = {
235 'service': 'compute',
236 'endpoint_type': 'adminURL',
237 'region': 'FakeRegion'
238 }
239 expected = self._get_result_url_from_endpoint(
240 self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
241 self._test_base_url_helper(expected, self.filters)
242
243 def test_base_url_unknown_region(self):
244 """
245 Assure that if the region is unknow the first endpoint is returned.
246 """
247 self.filters = {
248 'service': 'compute',
249 'endpoint_type': 'publicURL',
250 'region': 'AintNoBodyKnowThisRegion'
251 }
252 expected = self._get_result_url_from_endpoint(
253 self._endpoints[0]['endpoints'][0])
254 self._test_base_url_helper(expected, self.filters)
255
256 def test_base_url_with_non_existent_service(self):
257 self.filters = {
258 'service': 'BAD_SERVICE',
259 'endpoint_type': 'publicURL',
260 'region': 'FakeRegion'
261 }
262 self.assertRaises(exceptions.EndpointNotFound,
263 self._test_base_url_helper, None, self.filters)
264
265 def test_base_url_without_service(self):
266 self.filters = {
267 'endpoint_type': 'publicURL',
268 'region': 'FakeRegion'
269 }
270 self.assertRaises(exceptions.EndpointNotFound,
271 self._test_base_url_helper, None, self.filters)
272
273 def test_base_url_with_api_version_filter(self):
274 self.filters = {
275 'service': 'compute',
276 'endpoint_type': 'publicURL',
277 'region': 'FakeRegion',
278 'api_version': 'v12'
279 }
280 expected = self._get_result_url_from_endpoint(
281 self._endpoints[0]['endpoints'][1], replacement='v12')
282 self._test_base_url_helper(expected, self.filters)
283
284 def test_base_url_with_skip_path_filter(self):
285 self.filters = {
286 'service': 'compute',
287 'endpoint_type': 'publicURL',
288 'region': 'FakeRegion',
289 'skip_path': True
290 }
291 expected = 'http://fake_url/'
292 self._test_base_url_helper(expected, self.filters)
293
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500294
295class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500296 _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500297 _auth_provider_class = auth.KeystoneV3AuthProvider
298 credentials = {
299 'username': 'fake_user',
300 'password': 'fake_pwd',
301 'tenant_name': 'fake_tenant',
302 'domain_name': 'fake_domain_name',
303 }
304
305 def setUp(self):
306 super(TestKeystoneV3AuthProvider, self).setUp()
307 self.stubs.Set(http.ClosingHttp, 'request',
308 fake_identity._fake_v3_response)
309
310 def _get_fake_alt_identity(self):
311 return fake_identity.ALT_IDENTITY_V3['token']
312
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500313 def _get_result_url_from_endpoint(self, ep, replacement=None):
314 if replacement:
315 return ep['url'].replace('v3', replacement)
316 return ep['url']
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500317
318 def test_check_credentials_missing_tenant_name(self):
319 cred = copy.copy(self.credentials)
320 del cred['domain_name']
321 self.assertFalse(self.auth_provider.check_credentials(cred))
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500322
323 # Overwrites v2 test
324 def test_base_url_to_get_admin_endpoint(self):
325 self.filters = {
326 'service': 'compute',
327 'endpoint_type': 'admin',
328 'region': 'MiddleEarthRegion'
329 }
330 expected = self._get_result_url_from_endpoint(
331 self._endpoints[0]['endpoints'][2])
332 self._test_base_url_helper(expected, self.filters)