blob: 6da7e414a37099e9ad7ffa9b34aa769c22cbb117 [file] [log] [blame]
# Copyright 2014 IBM Corp.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
15
16import copy
17import datetime
Andrea Frittoli (andreaf)3e82af72016-05-05 22:53:38 +010018import testtools
Matthew Treinish9e26ca82016-02-23 11:43:20 -050019
20from oslotest import mockpatch
21
22from tempest.lib import auth
23from tempest.lib import exceptions
24from tempest.lib.services.identity.v2 import token_client as v2_client
25from tempest.lib.services.identity.v3 import token_client as v3_client
Matthew Treinishffad78a2016-04-16 14:39:52 -040026from tempest.tests import base
Matthew Treinish9e26ca82016-02-23 11:43:20 -050027from tempest.tests.lib import fake_credentials
Matthew Treinish9e26ca82016-02-23 11:43:20 -050028from tempest.tests.lib import fake_identity
29
30
def fake_get_credentials(fill_in=True, identity_version='v2', **kwargs):
    """Stand-in for ``auth.get_credentials``.

    All arguments are accepted and ignored; each call returns a newly
    constructed ``FakeCredentials`` object.
    """
    fake_creds = fake_credentials.FakeCredentials()
    return fake_creds
33
34
class BaseAuthTestsSetUp(base.TestCase):
    """Common fixture for the auth provider test cases.

    Subclasses set ``_auth_provider_class`` (and usually ``credentials``);
    ``setUp`` then builds ``self.auth_provider`` from them.
    """

    _auth_provider_class = None
    credentials = fake_credentials.FakeCredentials()

    def _auth(self, credentials, auth_url, **params):
        """Instantiate the provider class under test."""
        provider_cls = self._auth_provider_class
        return provider_cls(credentials, auth_url, **params)

    def setUp(self):
        super(BaseAuthTestsSetUp, self).setUp()
        # Swap auth.get_credentials for the module-level fake.
        self.patchobject(auth, 'get_credentials', fake_get_credentials)
        auth_url = fake_identity.FAKE_AUTH_URL
        self.auth_provider = self._auth(self.credentials, auth_url)
48
49
class TestBaseAuthProvider(BaseAuthTestsSetUp):
    """Tests for the base AuthProvider class.

    ``auth.AuthProvider`` is the base for the other providers, so only the
    behaviour it implements itself is covered here; the methods it leaves
    unimplemented (and anything that leans heavily on them) are stubbed out.
    """

    class FakeAuthProviderImpl(auth.AuthProvider):
        """Minimal concrete provider whose overridden methods do nothing."""

        def _decorate_request(self):
            return None

        def _fill_credentials(self):
            return None

        def _get_auth(self):
            return None

        def base_url(self):
            return None

        def is_expired(self):
            return None

    _auth_provider_class = FakeAuthProviderImpl

    def _auth(self, credentials, auth_url, **params):
        """Build the fake provider; ``auth_url`` is not forwarded to it."""
        provider_cls = self._auth_provider_class
        return provider_cls(credentials, **params)

    def test_check_credentials_bad_type(self):
        is_valid = self.auth_provider.check_credentials([])
        self.assertFalse(is_valid)

    def test_auth_data_property_when_cache_exists(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        # Force is_expired() to report False before reading auth_data.
        not_expired = mockpatch.PatchObject(provider, 'is_expired',
                                            return_value=False)
        self.useFixture(not_expired)
        self.assertEqual('foo', provider.auth_data)

    def test_delete_auth_data_property_through_deleter(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        del provider.auth_data
        self.assertIsNone(provider.cache)

    def test_delete_auth_data_property_through_clear_auth(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        provider.clear_auth()
        self.assertIsNone(provider.cache)

    def test_set_and_reset_alt_auth_data(self):
        provider = self.auth_provider

        provider.set_alt_auth_data('foo', 'bar')
        self.assertEqual(provider.alt_part, 'foo')
        self.assertEqual(provider.alt_auth_data, 'bar')

        provider.reset_alt_auth_data()
        self.assertIsNone(provider.alt_part)
        self.assertIsNone(provider.alt_auth_data)

    def test_auth_class(self):
        # Calling the base class directly is expected to raise TypeError.
        creds_cls = fake_credentials.FakeCredentials
        self.assertRaises(TypeError, auth.AuthProvider, creds_cls)
113
114
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
    """Exercise ``auth.KeystoneV2AuthProvider`` with faked token responses."""

    _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
    _auth_provider_class = auth.KeystoneV2AuthProvider
    credentials = fake_credentials.FakeKeystoneV2Credentials()

    def setUp(self):
        super(TestKeystoneV2AuthProvider, self).setUp()
        # Replace the v2 token client's raw_request with the fake response.
        self.patchobject(v2_client.TokenClient, 'raw_request',
                         fake_identity._fake_v2_response)
        self.target_url = 'test_api'

    # -- fixture accessors ------------------------------------------------

    def _get_fake_identity(self):
        """Return the 'access' section of the fake v2 response."""
        response = fake_identity.IDENTITY_V2_RESPONSE
        return response['access']

    def _get_fake_alt_identity(self):
        """Return the 'access' section of the alternate fake v2 response."""
        response = fake_identity.ALT_IDENTITY_V2_RESPONSE
        return response['access']

    def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
                                      replacement=None):
        """Return the endpoint URL, optionally swapping 'v2' for another."""
        endpoint_url = ep[endpoint_type]
        if not replacement:
            return endpoint_url
        return endpoint_url.replace('v2', replacement)

    def _get_token_from_fake_identity(self):
        return fake_identity.TOKEN

    def _get_from_fake_identity(self, attr):
        """Look up 'user_id' or 'tenant_id' in the fake v2 response.

        Any other attribute name yields None.
        """
        access = fake_identity.IDENTITY_V2_RESPONSE['access']
        if attr == 'user_id':
            return access['user']['id']
        if attr == 'tenant_id':
            return access['token']['tenant']['id']
        return None

    def _test_request_helper(self, filters, expected):
        """Issue an authenticated GET and compare url, token and body."""
        request = self.auth_provider.auth_request('GET', self.target_url,
                                                  filters=filters)
        url, headers, body = request

        self.assertEqual(expected['url'], url)
        self.assertEqual(expected['token'], headers['X-Auth-Token'])
        self.assertEqual(expected['body'], body)

    def _auth_data_with_expiry(self, date_as_string):
        """Return the provider's auth data with the token expiry replaced."""
        token, access = self.auth_provider.auth_data
        access['token']['expires'] = date_as_string
        return token, access

    # -- auth_request -----------------------------------------------------

    def test_request(self):
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }

        endpoint = self._endpoints[0]['endpoints'][1]
        base = self._get_result_url_from_endpoint(endpoint)
        url = base + '/' + self.target_url

        expected = {
            'body': None,
            'url': url,
            'token': self._get_token_from_fake_identity(),
        }
        self._test_request_helper(filters, expected)

    def test_request_with_alt_auth_cleans_alt(self):
        """Test alternate auth data for headers

        Assert that when the alt data is provided for headers, after an
        auth_request the alt data is cleaned up.
        """
        alt_auth_data = (fake_identity.ALT_TOKEN,
                         self._get_fake_alt_identity())
        self.auth_provider.set_alt_auth_data('headers', alt_auth_data)
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.auth_provider.auth_request('GET', self.target_url,
                                        filters=filters)

        # The alt auth data must be gone once the request has been built
        self.assertIsNone(self.auth_provider.alt_part)
        self.assertIsNone(self.auth_provider.alt_auth_data)

    def _test_request_with_identical_alt_auth(self, part):
        """Test alternate but identical auth data for the given part

        Assert that when the alt data is provided, but it's actually
        identical, an exception is raised.
        """
        same_auth_data = (fake_identity.TOKEN, self._get_fake_identity())
        self.auth_provider.set_alt_auth_data(part, same_auth_data)
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }

        self.assertRaises(exceptions.BadAltAuth,
                          self.auth_provider.auth_request,
                          'GET', self.target_url, filters=filters)

    def test_request_with_identical_alt_auth_headers(self):
        self._test_request_with_identical_alt_auth('headers')

    def test_request_with_identical_alt_auth_url(self):
        self._test_request_with_identical_alt_auth('url')

    def test_request_with_identical_alt_auth_body(self):
        self._test_request_with_identical_alt_auth('body')

    def test_request_with_alt_part_without_alt_data(self):
        """Test empty alternate auth data

        Assert that when alt_part is defined, the corresponding original
        request element is kept the same.
        """
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.auth_provider.set_alt_auth_data('headers', None)

        request = self.auth_provider.auth_request('GET', self.target_url,
                                                  filters=filters)
        url, headers, body = request
        # The original headers were empty
        self.assertNotEqual(url, self.target_url)
        self.assertIsNone(headers)
        self.assertIsNone(body)

    def _test_request_with_alt_part_without_alt_data_no_change(self, body):
        """Test empty alternate auth data with no effect

        Sets the alternate part to 'body' with no alternate auth data and
        asserts that building the request raises BadAltAuth.

        NOTE(review): the ``body`` argument (callers pass a part name) is
        never read; the part is hard-coded to 'body', so the *_headers and
        *_url variants below run exactly the same check as *_body.
        test_request_with_alt_part_without_alt_data shows that
        ('headers', None) does not raise, so simply wiring the argument
        through would break those variants. Confirm the intended coverage.
        """
        filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.auth_provider.set_alt_auth_data('body', None)

        self.assertRaises(exceptions.BadAltAuth,
                          self.auth_provider.auth_request,
                          'GET', self.target_url, filters=filters)

    def test_request_with_alt_part_without_alt_data_no_change_headers(self):
        self._test_request_with_alt_part_without_alt_data_no_change('headers')

    def test_request_with_alt_part_without_alt_data_no_change_url(self):
        self._test_request_with_alt_part_without_alt_data_no_change('url')

    def test_request_with_alt_part_without_alt_data_no_change_body(self):
        self._test_request_with_alt_part_without_alt_data_no_change('body')

    def test_request_with_bad_service(self):
        filters = {
            'service': 'BAD_SERVICE',
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_request_without_service(self):
        filters = {
            'service': None,
            'endpoint_type': 'publicURL',
            'region': 'fakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    # -- credentials ------------------------------------------------------

    def test_check_credentials_missing_attribute(self):
        for missing in ['username', 'password']:
            # Work on a copy so the class-level credentials stay intact.
            cred = copy.copy(self.credentials)
            del cred[missing]
            is_valid = self.auth_provider.check_credentials(cred)
            self.assertFalse(is_valid)

    def test_fill_credentials(self):
        self.auth_provider.fill_credentials()
        creds = self.auth_provider.credentials
        for attr in ['user_id', 'tenant_id']:
            expected = self._get_from_fake_identity(attr)
            self.assertEqual(expected, getattr(creds, attr))

    # -- base_url ---------------------------------------------------------

    def _test_base_url_helper(self, expected_url, filters,
                              auth_data=None):
        """Assert that base_url(filters, auth_data) equals expected_url."""
        url = self.auth_provider.base_url(filters, auth_data)
        self.assertEqual(url, expected_url)

    def test_base_url(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_to_get_admin_endpoint(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'adminURL',
            'region': 'FakeRegion'
        }
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(
            endpoint, endpoint_type='adminURL')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_unknown_region(self):
        """If the region is unknown, the first endpoint is returned."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'AintNoBodyKnowThisRegion'
        }
        first_endpoint = self._endpoints[0]['endpoints'][0]
        expected = self._get_result_url_from_endpoint(first_endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_non_existent_service(self):
        self.filters = {
            'service': 'BAD_SERVICE',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_without_service(self):
        self.filters = {
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_with_known_name(self):
        """If name and service is known, return the endpoint."""
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'name': 'nova'
        }
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_known_name_and_unknown_servce(self):
        """Test with a name filter and an unknown service

        If the service is unknown, an exception is raised even though a
        name filter is supplied.
        """
        self.filters = {
            'service': 'AintNoBodyKnowThatService',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'name': 'AintNoBodyKnowThatName'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_with_unknown_name_and_known_service(self):
        """Test with Unknown Name and Known Service

        If the name is unknown, raise an exception. Note that filtering by
        name is only successful if the service exists.
        """
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'name': 'AintNoBodyKnowThatName'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_without_name(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
        }
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_api_version_filter(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v12'
        }
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint,
                                                      replacement='v12')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_skip_path_filter(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'skip_path': True
        }
        expected = 'http://fake_url/'
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_unversioned_endpoint(self):
        # Catalog whose only endpoint URL carries no version segment.
        auth_data = {
            'serviceCatalog': [
                {
                    'type': 'identity',
                    'endpoints': [
                        {
                            'region': 'FakeRegion',
                            'publicURL': 'http://fake_url'
                        }
                    ]
                }
            ]
        }

        filters = {
            'service': 'identity',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v2.0'
        }

        expected = 'http://fake_url/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    # -- token expiry -----------------------------------------------------

    def test_token_not_expired(self):
        one_day = datetime.timedelta(days=1)
        expiry_data = datetime.datetime.utcnow() + one_day
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=False)

    def test_token_expired(self):
        one_hour = datetime.timedelta(hours=1)
        expiry_data = datetime.datetime.utcnow() - one_hour
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)

    def test_token_not_expired_to_be_renewed(self):
        # Expiry falls inside the provider's renewal threshold.
        half_threshold = self.auth_provider.token_expiry_threshold / 2
        expiry_data = datetime.datetime.utcnow() + half_threshold
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)

    def _verify_expiry(self, expiry_data, should_be_expired):
        """Check is_expired() for every expiry date format of the provider."""
        for expiry_format in self.auth_provider.EXPIRY_DATE_FORMATS:
            formatted = expiry_data.strftime(expiry_format)
            auth_data = self._auth_data_with_expiry(formatted)
            self.assertEqual(self.auth_provider.is_expired(auth_data),
                             should_be_expired)

    # -- scope ------------------------------------------------------------

    def test_set_scope_all_valid(self):
        for scope in self.auth_provider.SCOPES:
            self.auth_provider.scope = scope
            self.assertEqual(scope, self.auth_provider.scope)

    def test_set_scope_invalid(self):
        expect_invalid = testtools.ExpectedException(
            exceptions.InvalidScope, '.* invalid_scope .*')
        with expect_invalid:
            self.auth_provider.scope = 'invalid_scope'
490
Matthew Treinish9e26ca82016-02-23 11:43:20 -0500491
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
    """Re-run the v2 provider tests against ``auth.KeystoneV3AuthProvider``.

    The fixture accessors are overridden to read the fake v3 response, a few
    inherited tests are replaced with v3-specific variants, and scope-related
    tests are added.
    """

    _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
    _auth_provider_class = auth.KeystoneV3AuthProvider
    credentials = fake_credentials.FakeKeystoneV3Credentials()

    def setUp(self):
        super(TestKeystoneV3AuthProvider, self).setUp()
        # Replace the v3 token client's raw_request with the fake response.
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response)

    def _get_fake_identity(self):
        """Return the 'token' section of the fake v3 response."""
        return fake_identity.IDENTITY_V3_RESPONSE['token']

    def _get_fake_alt_identity(self):
        """Return the 'token' section of the alternate fake v3 identity."""
        return fake_identity.ALT_IDENTITY_V3['token']

    def _get_result_url_from_endpoint(self, ep, replacement=None):
        """Return the endpoint URL, optionally swapping 'v3' for another."""
        if replacement:
            return ep['url'].replace('v3', replacement)
        return ep['url']

    def _auth_data_with_expiry(self, date_as_string):
        """Return the provider's auth data with 'expires_at' replaced."""
        token, access = self.auth_provider.auth_data
        access['expires_at'] = date_as_string
        return token, access

    def _get_from_fake_identity(self, attr):
        """Look up a user/project (domain) id in the fake v3 response.

        Attribute names other than the four handled here yield None.
        """
        token = fake_identity.IDENTITY_V3_RESPONSE['token']
        if attr == 'user_id':
            return token['user']['id']
        elif attr == 'project_id':
            return token['project']['id']
        elif attr == 'user_domain_id':
            return token['user']['domain']['id']
        elif attr == 'project_domain_id':
            return token['project']['domain']['id']

    def _check_auth_params(self, omitted_prefixes, scope=None):
        """Verify which credential attributes end up in the auth parameters.

        :param omitted_prefixes: tuple of attribute-name prefixes that must
            NOT appear in the auth parameters; every other init attribute
            must appear with the value held by the credentials.
        :param scope: scope to set on the provider before building the
            parameters; None leaves the provider's scope untouched.
        """
        all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
        self.auth_provider.credentials = all_creds
        if scope is not None:
            self.auth_provider.scope = scope
        auth_params = self.auth_provider._auth_params()
        self.assertNotIn('scope', auth_params.keys())
        for attr in all_creds.get_init_attributes():
            if attr.startswith(omitted_prefixes):
                self.assertNotIn(attr, auth_params.keys())
            else:
                self.assertIn(attr, auth_params.keys())
                self.assertEqual(getattr(all_creds, attr), auth_params[attr])

    def test_check_credentials_missing_attribute(self):
        # reset credentials to fresh ones
        self.credentials.reset()
        for attr in ['username', 'password', 'user_domain_name',
                     'project_domain_name']:
            cred = copy.copy(self.credentials)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred),
                             "Credentials should be invalid without %s" % attr)

    def test_check_domain_credentials_missing_attribute(self):
        # reset credentials to fresh ones
        self.credentials.reset()
        domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
        for attr in ['username', 'password', 'user_domain_name']:
            cred = copy.copy(domain_creds)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred),
                             "Credentials should be invalid without %s" % attr)

    def test_fill_credentials(self):
        self.auth_provider.fill_credentials()
        creds = self.auth_provider.credentials
        for attr in ['user_id', 'project_id', 'user_domain_id',
                     'project_domain_id']:
            self.assertEqual(self._get_from_fake_identity(attr),
                             getattr(creds, attr))

    # Overrides the v2 test
    def test_base_url_to_get_admin_endpoint(self):
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'admin',
            'region': 'MiddleEarthRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][2])
        self._test_base_url_helper(expected, self.filters)

    # Overrides the v2 test
    def test_base_url_with_unversioned_endpoint(self):
        # Catalog whose only endpoint URL carries no version segment.
        auth_data = {
            'catalog': [
                {
                    'type': 'identity',
                    'endpoints': [
                        {
                            'region': 'FakeRegion',
                            'url': 'http://fake_url',
                            'interface': 'public'
                        }
                    ]
                }
            ]
        }

        filters = {
            'service': 'identity',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion',
            'api_version': 'v3'
        }

        expected = 'http://fake_url/v3'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    # Base URL test with scope only for V3
    def test_base_url_scope_project(self):
        self.auth_provider.scope = 'project'
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        expected = self._get_result_url_from_endpoint(
            self._endpoints[0]['endpoints'][1])
        self._test_base_url_helper(expected, self.filters)

    # Base URL test with scope only for V3
    def test_base_url_unscoped_identity(self):
        self.auth_provider.scope = 'unscoped'
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response_no_scope)
        self.filters = {
            'service': 'identity',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        expected = fake_identity.FAKE_AUTH_URL
        self._test_base_url_helper(expected, self.filters)

    # Base URL test with scope only for V3
    def test_base_url_unscoped_other(self):
        self.auth_provider.scope = 'unscoped'
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response_no_scope)
        self.filters = {
            'service': 'compute',
            'endpoint_type': 'publicURL',
            'region': 'FakeRegion'
        }
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.base_url,
                          auth_data=self.auth_provider.auth_data,
                          filters=self.filters)

    def test_auth_parameters_with_scope_unset(self):
        # No scope defaults to 'project'
        self._check_auth_params(omitted_prefixes=('domain_',))

    def test_auth_parameters_with_project_scope(self):
        self._check_auth_params(omitted_prefixes=('domain_',),
                                scope='project')

    def test_auth_parameters_with_domain_scope(self):
        self._check_auth_params(omitted_prefixes=('project_',),
                                scope='domain')

    def test_auth_parameters_unscoped(self):
        self._check_auth_params(omitted_prefixes=('project_', 'domain_'),
                                scope='unscoped')
686
John Warrenb10c6ca2016-02-26 15:32:37 -0500687
class TestKeystoneV3Credentials(base.TestCase):
    """Attribute-handling tests for ``auth.KeystoneV3Credentials``."""

    def testSetAttrUserDomain(self):
        """An explicit user domain wins over domain_name in either order."""
        creds = auth.KeystoneV3Credentials()
        creds.user_domain_name = 'user_domain'
        creds.domain_name = 'domain'
        self.assertEqual('user_domain', creds.user_domain_name)
        creds = auth.KeystoneV3Credentials()
        creds.domain_name = 'domain'
        creds.user_domain_name = 'user_domain'
        self.assertEqual('user_domain', creds.user_domain_name)

    def testSetAttrProjectDomain(self):
        """An explicit project domain wins over domain_name in either order."""
        creds = auth.KeystoneV3Credentials()
        creds.project_domain_name = 'project_domain'
        creds.domain_name = 'domain'
        # The attribute this test is named after must survive the later
        # domain_name assignment; the original only checked the user domain.
        self.assertEqual('project_domain', creds.project_domain_name)
        # Kept from the original test. NOTE(review): presumably an unset
        # user domain is seeded from the project domain -- confirm.
        self.assertEqual('project_domain', creds.user_domain_name)
        creds = auth.KeystoneV3Credentials()
        creds.domain_name = 'domain'
        creds.project_domain_name = 'project_domain'
        self.assertEqual('project_domain', creds.project_domain_name)

    def testProjectTenantNoCollision(self):
        """tenant_* and project_* are readable under either name."""
        creds = auth.KeystoneV3Credentials(tenant_id='tenant')
        self.assertEqual('tenant', creds.project_id)
        creds = auth.KeystoneV3Credentials(project_id='project')
        self.assertEqual('project', creds.tenant_id)
        creds = auth.KeystoneV3Credentials(tenant_name='tenant')
        self.assertEqual('tenant', creds.project_name)
        creds = auth.KeystoneV3Credentials(project_name='project')
        self.assertEqual('project', creds.tenant_name)

    def testProjectTenantCollision(self):
        """Conflicting tenant_*/project_* values raise InvalidCredentials."""
        attrs = {'tenant_id': 'tenant', 'project_id': 'project'}
        self.assertRaises(
            exceptions.InvalidCredentials, auth.KeystoneV3Credentials, **attrs)
        attrs = {'tenant_name': 'tenant', 'project_name': 'project'}
        self.assertRaises(
            exceptions.InvalidCredentials, auth.KeystoneV3Credentials, **attrs)
Brant Knudsonf2d1f572016-04-11 15:02:01 -0500726
727
class TestReplaceVersion(base.TestCase):
    """Tests for ``auth.replace_version`` over a range of URL shapes."""

    def test_version_no_trailing_path(self):
        replaced = auth.replace_version('http://localhost:35357/v3', 'v2.0')
        self.assertEqual('http://localhost:35357/v2.0', replaced)

    def test_version_no_trailing_path_solidus(self):
        replaced = auth.replace_version('http://localhost:35357/v3/', 'v2.0')
        self.assertEqual('http://localhost:35357/v2.0/', replaced)

    def test_version_trailing_path(self):
        replaced = auth.replace_version('http://localhost:35357/v3/uuid',
                                        'v2.0')
        self.assertEqual('http://localhost:35357/v2.0/uuid', replaced)

    def test_version_trailing_path_solidus(self):
        replaced = auth.replace_version('http://localhost:35357/v3/uuid/',
                                        'v2.0')
        self.assertEqual('http://localhost:35357/v2.0/uuid/', replaced)

    def test_no_version_base(self):
        replaced = auth.replace_version('http://localhost:35357', 'v2.0')
        self.assertEqual('http://localhost:35357/v2.0', replaced)

    def test_no_version_base_solidus(self):
        replaced = auth.replace_version('http://localhost:35357/', 'v2.0')
        self.assertEqual('http://localhost:35357/v2.0', replaced)

    def test_no_version_path(self):
        replaced = auth.replace_version('http://localhost/identity', 'v2.0')
        self.assertEqual('http://localhost/identity/v2.0', replaced)

    def test_no_version_path_solidus(self):
        replaced = auth.replace_version('http://localhost/identity/', 'v2.0')
        self.assertEqual('http://localhost/identity/v2.0', replaced)

    def test_path_version(self):
        replaced = auth.replace_version('http://localhost/identity/v3',
                                        'v2.0')
        self.assertEqual('http://localhost/identity/v2.0', replaced)

    def test_path_version_solidus(self):
        replaced = auth.replace_version('http://localhost/identity/v3/',
                                        'v2.0')
        self.assertEqual('http://localhost/identity/v2.0/', replaced)

    def test_path_version_trailing_path(self):
        replaced = auth.replace_version('http://localhost/identity/v3/uuid',
                                        'v2.0')
        self.assertEqual('http://localhost/identity/v2.0/uuid', replaced)

    def test_path_version_trailing_path_solidus(self):
        replaced = auth.replace_version('http://localhost/identity/v3/uuid/',
                                        'v2.0')
        self.assertEqual('http://localhost/identity/v2.0/uuid/', replaced)
Andrea Frittoli (andreaf)3e82af72016-05-05 22:53:38 +0100788
789
class TestKeystoneV3AuthProvider_DomainScope(BaseAuthTestsSetUp):
    """Check the v3 provider against a faked domain-scoped token response."""

    _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
    _auth_provider_class = auth.KeystoneV3AuthProvider
    credentials = fake_credentials.FakeKeystoneV3Credentials()

    def setUp(self):
        super(TestKeystoneV3AuthProvider_DomainScope, self).setUp()
        # Replace the v3 token client's raw_request with the domain-scoped
        # fake response.
        fake_response = fake_identity._fake_v3_response_domain_scope
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_response)

    def test_get_auth_with_domain_scope(self):
        provider = self.auth_provider
        provider.scope = 'domain'
        # get_auth() returns a pair; only the second element is inspected.
        auth_data = provider.get_auth()[1]
        self.assertIn('domain', auth_data)
        self.assertNotIn('project', auth_data)
ghanshyamc0d500a2016-06-15 09:50:21 +0900805
806
class TestGetCredentials(base.TestCase):
    """Tests for ``auth.get_credentials``."""

    def test_invalid_identity_version(self):
        # 'v1' is expected to be rejected, with the version in the message.
        expect_invalid = testtools.ExpectedException(
            exceptions.InvalidIdentityVersion, '.* v1 .*')
        with expect_invalid:
            auth.get_credentials('http://localhost/identity/v3',
                                 identity_version='v1')
813 identity_version='v1')