blob: 7b5b4d65f41f5048d9302dff60d717fc04e00f50 [file] [log] [blame]
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -05001# Copyright 2014 IBM Corp.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import copy
Andrea Frittolidbd02512014-03-21 10:06:19 +000017import datetime
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050018
19from tempest import auth
20from tempest.common import http
21from tempest import config
22from tempest import exceptions
23from tempest.openstack.common.fixture import mockpatch
24from tempest.tests import base
Andrea Frittoli7d707a52014-04-06 11:46:32 +010025from tempest.tests import fake_auth_provider
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050026from tempest.tests import fake_config
Andrea Frittoli7d707a52014-04-06 11:46:32 +010027from tempest.tests import fake_credentials
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050028from tempest.tests import fake_http
29from tempest.tests import fake_identity
30
31
32class BaseAuthTestsSetUp(base.TestCase):
33 _auth_provider_class = None
Andrea Frittoli7d707a52014-04-06 11:46:32 +010034 credentials = fake_credentials.FakeCredentials()
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050035
36 def _auth(self, credentials, **params):
37 """
38 returns auth method according to keystone
39 """
40 return self._auth_provider_class(credentials, **params)
41
42 def setUp(self):
43 super(BaseAuthTestsSetUp, self).setUp()
Matthew Treinishff598482014-02-28 16:13:58 -050044 self.useFixture(fake_config.ConfigFixture())
45 self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050046 self.fake_http = fake_http.fake_httplib2(return_type=200)
47 self.stubs.Set(http.ClosingHttp, 'request', self.fake_http.request)
Andrea Frittoli7d707a52014-04-06 11:46:32 +010048 self.stubs.Set(auth, 'get_credentials',
49 fake_auth_provider.get_credentials)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050050 self.auth_provider = self._auth(self.credentials)
51
52
53class TestBaseAuthProvider(BaseAuthTestsSetUp):
54 """
55 This tests auth.AuthProvider class which is base for the other so we
56 obviously don't test not implemented method or the ones which strongly
57 depends on them.
58 """
59 _auth_provider_class = auth.AuthProvider
60
Andrea Frittoli7d707a52014-04-06 11:46:32 +010061 def test_check_credentials_class(self):
62 self.assertRaises(NotImplementedError,
63 self.auth_provider.check_credentials,
64 auth.Credentials())
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050065
66 def test_check_credentials_bad_type(self):
67 self.assertFalse(self.auth_provider.check_credentials([]))
68
Andrea Frittoli7d707a52014-04-06 11:46:32 +010069 def test_instantiate_with_dict(self):
70 # Dict credentials are only supported for backward compatibility
71 auth_provider = self._auth(credentials={})
72 self.assertIsInstance(auth_provider.credentials, auth.Credentials)
73
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -050074 def test_instantiate_with_bad_credentials_type(self):
75 """
76 Assure that credentials with bad type fail with TypeError
77 """
78 self.assertRaises(TypeError, self._auth, [])
79
80 def test_auth_data_property(self):
81 self.assertRaises(NotImplementedError, getattr, self.auth_provider,
82 'auth_data')
83
84 def test_auth_data_property_when_cache_exists(self):
85 self.auth_provider.cache = 'foo'
86 self.useFixture(mockpatch.PatchObject(self.auth_provider,
87 'is_expired',
88 return_value=False))
89 self.assertEqual('foo', getattr(self.auth_provider, 'auth_data'))
90
91 def test_delete_auth_data_property_through_deleter(self):
92 self.auth_provider.cache = 'foo'
93 del self.auth_provider.auth_data
94 self.assertIsNone(self.auth_provider.cache)
95
96 def test_delete_auth_data_property_through_clear_auth(self):
97 self.auth_provider.cache = 'foo'
98 self.auth_provider.clear_auth()
99 self.assertIsNone(self.auth_provider.cache)
100
101 def test_set_and_reset_alt_auth_data(self):
102 self.auth_provider.set_alt_auth_data('foo', 'bar')
103 self.assertEqual(self.auth_provider.alt_part, 'foo')
104 self.assertEqual(self.auth_provider.alt_auth_data, 'bar')
105
106 self.auth_provider.reset_alt_auth_data()
107 self.assertIsNone(self.auth_provider.alt_part)
108 self.assertIsNone(self.auth_provider.alt_auth_data)
109
110
111class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500112 _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500113 _auth_provider_class = auth.KeystoneV2AuthProvider
Andrea Frittoli7d707a52014-04-06 11:46:32 +0100114 credentials = fake_credentials.FakeKeystoneV2Credentials()
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500115
116 def setUp(self):
117 super(TestKeystoneV2AuthProvider, self).setUp()
118 self.stubs.Set(http.ClosingHttp, 'request',
119 fake_identity._fake_v2_response)
120 self.target_url = 'test_api'
121
122 def _get_fake_alt_identity(self):
123 return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']
124
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500125 def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
126 replacement=None):
127 if replacement:
128 return ep[endpoint_type].replace('v2', replacement)
129 return ep[endpoint_type]
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500130
131 def _get_token_from_fake_identity(self):
132 return fake_identity.TOKEN
133
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500134 def _test_request_helper(self, filters, expected):
135 url, headers, body = self.auth_provider.auth_request('GET',
136 self.target_url,
137 filters=filters)
138
139 self.assertEqual(expected['url'], url)
140 self.assertEqual(expected['token'], headers['X-Auth-Token'])
141 self.assertEqual(expected['body'], body)
142
Andrea Frittolidbd02512014-03-21 10:06:19 +0000143 def _auth_data_with_expiry(self, date_as_string):
144 token, access = self.auth_provider.auth_data
145 access['token']['expires'] = date_as_string
146 return token, access
147
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500148 def test_request(self):
149 filters = {
150 'service': 'compute',
151 'endpoint_type': 'publicURL',
152 'region': 'FakeRegion'
153 }
154
155 url = self._get_result_url_from_endpoint(
156 self._endpoints[0]['endpoints'][1]) + '/' + self.target_url
157
158 expected = {
159 'body': None,
160 'url': url,
161 'token': self._get_token_from_fake_identity(),
162 }
163 self._test_request_helper(filters, expected)
164
165 def test_request_with_alt_auth_cleans_alt(self):
166 self.auth_provider.set_alt_auth_data(
167 'body',
168 (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
169 self.test_request()
170 # Assert alt auth data is clear after it
171 self.assertIsNone(self.auth_provider.alt_part)
172 self.assertIsNone(self.auth_provider.alt_auth_data)
173
174 def test_request_with_alt_part_without_alt_data(self):
175 """
176 Assert that when alt_part is defined, the corresponding original
177 request element is kept the same.
178 """
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500179 filters = {
180 'service': 'compute',
181 'endpoint_type': 'publicURL',
182 'region': 'fakeRegion'
183 }
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500184 self.auth_provider.set_alt_auth_data('url', None)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500185
186 url, headers, body = self.auth_provider.auth_request('GET',
187 self.target_url,
188 filters=filters)
189
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500190 self.assertEqual(url, self.target_url)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500191 self.assertEqual(self._get_token_from_fake_identity(),
192 headers['X-Auth-Token'])
193 self.assertEqual(body, None)
194
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500195 def test_request_with_bad_service(self):
196 filters = {
197 'service': 'BAD_SERVICE',
198 'endpoint_type': 'publicURL',
199 'region': 'fakeRegion'
200 }
201 self.assertRaises(exceptions.EndpointNotFound,
202 self.auth_provider.auth_request, 'GET',
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500203 self.target_url, filters=filters)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500204
205 def test_request_without_service(self):
206 filters = {
207 'service': None,
208 'endpoint_type': 'publicURL',
209 'region': 'fakeRegion'
210 }
211 self.assertRaises(exceptions.EndpointNotFound,
212 self.auth_provider.auth_request, 'GET',
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500213 self.target_url, filters=filters)
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500214
215 def test_check_credentials_missing_attribute(self):
216 for attr in ['username', 'password']:
217 cred = copy.copy(self.credentials)
218 del cred[attr]
219 self.assertFalse(self.auth_provider.check_credentials(cred))
220
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500221 def _test_base_url_helper(self, expected_url, filters,
222 auth_data=None):
223
224 url = self.auth_provider.base_url(filters, auth_data)
225 self.assertEqual(url, expected_url)
226
227 def test_base_url(self):
228 self.filters = {
229 'service': 'compute',
230 'endpoint_type': 'publicURL',
231 'region': 'FakeRegion'
232 }
233 expected = self._get_result_url_from_endpoint(
234 self._endpoints[0]['endpoints'][1])
235 self._test_base_url_helper(expected, self.filters)
236
237 def test_base_url_to_get_admin_endpoint(self):
238 self.filters = {
239 'service': 'compute',
240 'endpoint_type': 'adminURL',
241 'region': 'FakeRegion'
242 }
243 expected = self._get_result_url_from_endpoint(
244 self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
245 self._test_base_url_helper(expected, self.filters)
246
247 def test_base_url_unknown_region(self):
248 """
249 Assure that if the region is unknow the first endpoint is returned.
250 """
251 self.filters = {
252 'service': 'compute',
253 'endpoint_type': 'publicURL',
254 'region': 'AintNoBodyKnowThisRegion'
255 }
256 expected = self._get_result_url_from_endpoint(
257 self._endpoints[0]['endpoints'][0])
258 self._test_base_url_helper(expected, self.filters)
259
260 def test_base_url_with_non_existent_service(self):
261 self.filters = {
262 'service': 'BAD_SERVICE',
263 'endpoint_type': 'publicURL',
264 'region': 'FakeRegion'
265 }
266 self.assertRaises(exceptions.EndpointNotFound,
267 self._test_base_url_helper, None, self.filters)
268
269 def test_base_url_without_service(self):
270 self.filters = {
271 'endpoint_type': 'publicURL',
272 'region': 'FakeRegion'
273 }
274 self.assertRaises(exceptions.EndpointNotFound,
275 self._test_base_url_helper, None, self.filters)
276
277 def test_base_url_with_api_version_filter(self):
278 self.filters = {
279 'service': 'compute',
280 'endpoint_type': 'publicURL',
281 'region': 'FakeRegion',
282 'api_version': 'v12'
283 }
284 expected = self._get_result_url_from_endpoint(
285 self._endpoints[0]['endpoints'][1], replacement='v12')
286 self._test_base_url_helper(expected, self.filters)
287
288 def test_base_url_with_skip_path_filter(self):
289 self.filters = {
290 'service': 'compute',
291 'endpoint_type': 'publicURL',
292 'region': 'FakeRegion',
293 'skip_path': True
294 }
295 expected = 'http://fake_url/'
296 self._test_base_url_helper(expected, self.filters)
297
Andrea Frittolidbd02512014-03-21 10:06:19 +0000298 def test_token_not_expired(self):
299 expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1)
300 auth_data = self._auth_data_with_expiry(
301 expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
302 self.assertFalse(self.auth_provider.is_expired(auth_data))
303
304 def test_token_expired(self):
305 expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
306 auth_data = self._auth_data_with_expiry(
307 expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
308 self.assertTrue(self.auth_provider.is_expired(auth_data))
309
310 def test_token_not_expired_to_be_renewed(self):
311 expiry_data = datetime.datetime.utcnow() + \
312 self.auth_provider.token_expiry_threshold / 2
313 auth_data = self._auth_data_with_expiry(
314 expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
315 self.assertTrue(self.auth_provider.is_expired(auth_data))
316
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500317
318class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500319 _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500320 _auth_provider_class = auth.KeystoneV3AuthProvider
321 credentials = {
322 'username': 'fake_user',
323 'password': 'fake_pwd',
324 'tenant_name': 'fake_tenant',
325 'domain_name': 'fake_domain_name',
326 }
327
328 def setUp(self):
329 super(TestKeystoneV3AuthProvider, self).setUp()
330 self.stubs.Set(http.ClosingHttp, 'request',
331 fake_identity._fake_v3_response)
332
333 def _get_fake_alt_identity(self):
334 return fake_identity.ALT_IDENTITY_V3['token']
335
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500336 def _get_result_url_from_endpoint(self, ep, replacement=None):
337 if replacement:
338 return ep['url'].replace('v3', replacement)
339 return ep['url']
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500340
Andrea Frittolidbd02512014-03-21 10:06:19 +0000341 def _auth_data_with_expiry(self, date_as_string):
342 token, access = self.auth_provider.auth_data
343 access['expires_at'] = date_as_string
344 return token, access
345
Mauro S. M. Rodriguesc3e573c2014-02-19 07:59:29 -0500346 def test_check_credentials_missing_tenant_name(self):
347 cred = copy.copy(self.credentials)
348 del cred['domain_name']
349 self.assertFalse(self.auth_provider.check_credentials(cred))
Mauro S. M. Rodrigues4e23c452014-02-24 15:21:58 -0500350
351 # Overwrites v2 test
352 def test_base_url_to_get_admin_endpoint(self):
353 self.filters = {
354 'service': 'compute',
355 'endpoint_type': 'admin',
356 'region': 'MiddleEarthRegion'
357 }
358 expected = self._get_result_url_from_endpoint(
359 self._endpoints[0]['endpoints'][2])
360 self._test_base_url_helper(expected, self.filters)