blob: 2f975d22c64583dffc5dcceea23c5b32b3519eca [file] [log] [blame]
Matthew Treinish9e26ca82016-02-23 11:43:20 -05001# Copyright 2014 IBM Corp.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16import copy
17import datetime
Andrea Frittoli (andreaf)3e82af72016-05-05 22:53:38 +010018import testtools
Matthew Treinish9e26ca82016-02-23 11:43:20 -050019
20from oslotest import mockpatch
21
22from tempest.lib import auth
23from tempest.lib import exceptions
24from tempest.lib.services.identity.v2 import token_client as v2_client
25from tempest.lib.services.identity.v3 import token_client as v3_client
Matthew Treinishffad78a2016-04-16 14:39:52 -040026from tempest.tests import base
Matthew Treinish9e26ca82016-02-23 11:43:20 -050027from tempest.tests.lib import fake_credentials
Matthew Treinish9e26ca82016-02-23 11:43:20 -050028from tempest.tests.lib import fake_identity
29
30
def fake_get_credentials(fill_in=True, identity_version='v2', **kwargs):
    """Return a new ``FakeCredentials`` object, ignoring every argument.

    Installed in place of ``auth.get_credentials`` by the test fixtures in
    this module.
    """
    fake_creds = fake_credentials.FakeCredentials()
    return fake_creds
33
34
class BaseAuthTestsSetUp(base.TestCase):
    """Shared fixture for the auth provider test classes.

    ``setUp`` builds ``self.auth_provider`` from the class attributes
    ``_auth_provider_class`` and ``credentials``.
    """

    # None here; the concrete test classes assign the provider under test.
    _auth_provider_class = None
    # Class-level credentials object passed to the provider in setUp.
    credentials = fake_credentials.FakeCredentials()

    def _auth(self, credentials, auth_url, **params):
        """returns auth method according to keystone"""
        provider_cls = self._auth_provider_class
        return provider_cls(credentials, auth_url, **params)

    def setUp(self):
        super(BaseAuthTestsSetUp, self).setUp()
        # Replace auth.get_credentials with the local fake.
        self.patchobject(auth, 'get_credentials', fake_get_credentials)
        auth_url = fake_identity.FAKE_AUTH_URL
        self.auth_provider = self._auth(self.credentials, auth_url)
48
49
class TestBaseAuthProvider(BaseAuthTestsSetUp):
    """Tests for base AuthProvider

    Only behaviour implemented by auth.AuthProvider itself is covered;
    the methods it leaves to subclasses are stubbed out below.
    """

    class FakeAuthProviderImpl(auth.AuthProvider):
        """Concrete provider whose overridden methods all return None."""

        def _decorate_request(self):
            return None

        def _fill_credentials(self):
            return None

        def _get_auth(self):
            return None

        def base_url(self):
            return None

        def is_expired(self):
            return None

    _auth_provider_class = FakeAuthProviderImpl

    def _auth(self, credentials, auth_url, **params):
        """returns auth method according to keystone"""
        # auth_url is accepted but not forwarded to the provider here.
        provider_cls = self._auth_provider_class
        return provider_cls(credentials, **params)

    def test_check_credentials_bad_type(self):
        result = self.auth_provider.check_credentials([])
        self.assertFalse(result)

    def test_auth_data_property_when_cache_exists(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        # Force is_expired() to report False for this test.
        not_expired = mockpatch.PatchObject(provider, 'is_expired',
                                            return_value=False)
        self.useFixture(not_expired)
        self.assertEqual('foo', provider.auth_data)

    def test_delete_auth_data_property_through_deleter(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        del provider.auth_data
        self.assertIsNone(provider.cache)

    def test_delete_auth_data_property_through_clear_auth(self):
        provider = self.auth_provider
        provider.cache = 'foo'
        provider.clear_auth()
        self.assertIsNone(provider.cache)

    def test_set_and_reset_alt_auth_data(self):
        provider = self.auth_provider

        provider.set_alt_auth_data('foo', 'bar')
        self.assertEqual(provider.alt_part, 'foo')
        self.assertEqual(provider.alt_auth_data, 'bar')

        provider.reset_alt_auth_data()
        self.assertIsNone(provider.alt_part)
        self.assertIsNone(provider.alt_auth_data)

    def test_auth_class(self):
        # Calling the base class directly is expected to raise TypeError.
        # Note the credentials *class*, not an instance, is passed.
        creds_class = fake_credentials.FakeCredentials
        self.assertRaises(TypeError, auth.AuthProvider, creds_class)
113
114
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
    """Tests for auth.KeystoneV2AuthProvider using the fake v2 identity.

    Several helpers below are overridden by the v3 test class so that the
    same test methods can run against the v3 fake data.
    """

    _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
    _auth_provider_class = auth.KeystoneV2AuthProvider
    credentials = fake_credentials.FakeKeystoneV2Credentials()

    def setUp(self):
        super(TestKeystoneV2AuthProvider, self).setUp()
        # Token requests are answered by the canned v2 response.
        self.patchobject(v2_client.TokenClient, 'raw_request',
                         fake_identity._fake_v2_response)
        self.target_url = 'test_api'

    def _get_fake_identity(self):
        v2_response = fake_identity.IDENTITY_V2_RESPONSE
        return v2_response['access']

    def _get_fake_alt_identity(self):
        alt_response = fake_identity.ALT_IDENTITY_V2_RESPONSE
        return alt_response['access']

    def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
                                      replacement=None):
        """Return ep's URL, optionally with 'v2' swapped for replacement."""
        endpoint_url = ep[endpoint_type]
        if not replacement:
            return endpoint_url
        return endpoint_url.replace('v2', replacement)

    def _get_token_from_fake_identity(self):
        return fake_identity.TOKEN

    def _get_from_fake_identity(self, attr):
        """Look up 'user_id' or 'tenant_id' in the fake v2 response."""
        access = fake_identity.IDENTITY_V2_RESPONSE['access']
        if attr == 'user_id':
            return access['user']['id']
        if attr == 'tenant_id':
            return access['token']['tenant']['id']
        return None

    def _test_request_helper(self, filters, expected):
        """Run auth_request and compare url, token header and body."""
        result = self.auth_provider.auth_request('GET', self.target_url,
                                                 filters=filters)
        url, headers, body = result

        self.assertEqual(expected['url'], url)
        self.assertEqual(expected['token'], headers['X-Auth-Token'])
        self.assertEqual(expected['body'], body)

    def _auth_data_with_expiry(self, date_as_string):
        """Return the provider's auth data with its expiry overwritten."""
        token, access = self.auth_provider.auth_data
        access['token']['expires'] = date_as_string
        return token, access

    def test_request(self):
        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='FakeRegion')

        endpoint = self._endpoints[0]['endpoints'][1]
        base_url = self._get_result_url_from_endpoint(endpoint)

        expected = dict(body=None,
                        url=base_url + '/' + self.target_url,
                        token=self._get_token_from_fake_identity())
        self._test_request_helper(filters, expected)

    def test_request_with_alt_auth_cleans_alt(self):
        """Test alternate auth data for headers

        Once an auth_request has been made with alt data set for the
        headers, the alt part and alt data must both be cleared.
        """
        alt_auth_data = (fake_identity.ALT_TOKEN,
                         self._get_fake_alt_identity())
        self.auth_provider.set_alt_auth_data('headers', alt_auth_data)
        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='fakeRegion')
        self.auth_provider.auth_request('GET', self.target_url,
                                        filters=filters)

        # Assert alt auth data is clear after it
        self.assertIsNone(self.auth_provider.alt_part)
        self.assertIsNone(self.auth_provider.alt_auth_data)

    def _test_request_with_identical_alt_auth(self, part):
        """Test alternate but identical auth data for headers

        When the alt data supplied for ``part`` is the same as the regular
        auth data, auth_request must raise BadAltAuth.
        """
        same_auth_data = (fake_identity.TOKEN, self._get_fake_identity())
        self.auth_provider.set_alt_auth_data(part, same_auth_data)
        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='fakeRegion')

        self.assertRaises(exceptions.BadAltAuth,
                          self.auth_provider.auth_request,
                          'GET', self.target_url, filters=filters)

    def test_request_with_identical_alt_auth_headers(self):
        self._test_request_with_identical_alt_auth('headers')

    def test_request_with_identical_alt_auth_url(self):
        self._test_request_with_identical_alt_auth('url')

    def test_request_with_identical_alt_auth_body(self):
        self._test_request_with_identical_alt_auth('body')

    def test_request_with_alt_part_without_alt_data(self):
        """Test empty alternate auth data

        With alt_part set to 'headers' and no alt data, the headers of the
        resulting request are left as they were originally.
        """
        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='fakeRegion')
        self.auth_provider.set_alt_auth_data('headers', None)

        result = self.auth_provider.auth_request('GET', self.target_url,
                                                 filters=filters)
        url, headers, body = result
        # The original headers were empty
        self.assertNotEqual(url, self.target_url)
        self.assertIsNone(headers)
        self.assertIsNone(body)

    def _test_request_with_alt_part_without_alt_data_no_change(self, body):
        """Test empty alternate auth data with no effect

        When alt_part is set without alt data and the corresponding part
        of the request would not have been changed anyway, BadAltAuth is
        expected.
        """
        # NOTE(review): the ``body`` argument is not used; the alt part is
        # always 'body', so the three callers below run identical checks.
        # Confirm whether that is intended before wiring it through.
        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='fakeRegion')
        self.auth_provider.set_alt_auth_data('body', None)

        self.assertRaises(exceptions.BadAltAuth,
                          self.auth_provider.auth_request,
                          'GET', self.target_url, filters=filters)

    def test_request_with_alt_part_without_alt_data_no_change_headers(self):
        self._test_request_with_alt_part_without_alt_data_no_change('headers')

    def test_request_with_alt_part_without_alt_data_no_change_url(self):
        self._test_request_with_alt_part_without_alt_data_no_change('url')

    def test_request_with_alt_part_without_alt_data_no_change_body(self):
        self._test_request_with_alt_part_without_alt_data_no_change('body')

    def test_request_with_bad_service(self):
        filters = dict(service='BAD_SERVICE',
                       endpoint_type='publicURL',
                       region='fakeRegion')
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_request_without_service(self):
        filters = dict(service=None,
                       endpoint_type='publicURL',
                       region='fakeRegion')
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.auth_request, 'GET',
                          self.target_url, filters=filters)

    def test_check_credentials_missing_attribute(self):
        for attr in ('username', 'password'):
            # Work on a copy so the class-level credentials stay intact.
            cred = copy.copy(self.credentials)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred))

    def test_fill_credentials(self):
        self.auth_provider.fill_credentials()
        creds = self.auth_provider.credentials
        for attr in ('user_id', 'tenant_id'):
            expected_value = self._get_from_fake_identity(attr)
            self.assertEqual(expected_value, getattr(creds, attr))

    def _test_base_url_helper(self, expected_url, filters,
                              auth_data=None):
        """Assert that base_url(filters, auth_data) equals expected_url."""
        actual_url = self.auth_provider.base_url(filters, auth_data)
        self.assertEqual(actual_url, expected_url)

    def test_base_url(self):
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion')
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_to_get_admin_endpoint(self):
        self.filters = dict(service='compute',
                            endpoint_type='adminURL',
                            region='FakeRegion')
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(
            endpoint, endpoint_type='adminURL')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_unknown_region(self):
        """If the region is unknown, the first endpoint is returned."""
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='AintNoBodyKnowThisRegion')
        first_endpoint = self._endpoints[0]['endpoints'][0]
        expected = self._get_result_url_from_endpoint(first_endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_non_existent_service(self):
        self.filters = dict(service='BAD_SERVICE',
                            endpoint_type='publicURL',
                            region='FakeRegion')
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_without_service(self):
        # No 'service' key at all in the filters.
        self.filters = dict(endpoint_type='publicURL',
                            region='FakeRegion')
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_with_known_name(self):
        """If name and service is known, return the endpoint."""
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion',
                            name='nova')
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_known_name_and_unknown_servce(self):
        """Test with Known Name and Unknown service

        An unknown service must raise EndpointNotFound; the filters used
        here carry an unknown name as well.
        """
        self.filters = dict(service='AintNoBodyKnowThatService',
                            endpoint_type='publicURL',
                            region='FakeRegion',
                            name='AintNoBodyKnowThatName')
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_with_unknown_name_and_known_service(self):
        """Test with Unknown Name and Known Service

        An unknown name must raise EndpointNotFound even though the
        service itself exists.
        """
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion',
                            name='AintNoBodyKnowThatName')
        self.assertRaises(exceptions.EndpointNotFound,
                          self._test_base_url_helper, None, self.filters)

    def test_base_url_without_name(self):
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion')
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_api_version_filter(self):
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion',
                            api_version='v12')
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(
            endpoint, replacement='v12')
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_skip_path_filter(self):
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion',
                            skip_path=True)
        expected = 'http://fake_url/'
        self._test_base_url_helper(expected, self.filters)

    def test_base_url_with_unversioned_endpoint(self):
        endpoint = {'region': 'FakeRegion', 'publicURL': 'http://fake_url'}
        catalog_entry = {'type': 'identity', 'endpoints': [endpoint]}
        auth_data = {'serviceCatalog': [catalog_entry]}

        filters = dict(service='identity',
                       endpoint_type='publicURL',
                       region='FakeRegion',
                       api_version='v2.0')

        expected = 'http://fake_url/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    def test_base_url_with_extra_path_endpoint(self):
        endpoint = {'region': 'FakeRegion',
                    'publicURL': 'http://fake_url/some_path/v2.0'}
        catalog_entry = {'type': 'compute', 'endpoints': [endpoint]}
        auth_data = {'serviceCatalog': [catalog_entry]}

        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='FakeRegion',
                       api_version='v2.0')

        expected = 'http://fake_url/some_path/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    def test_base_url_with_unversioned_extra_path_endpoint(self):
        endpoint = {'region': 'FakeRegion',
                    'publicURL': 'http://fake_url/some_path'}
        catalog_entry = {'type': 'compute', 'endpoints': [endpoint]}
        auth_data = {'serviceCatalog': [catalog_entry]}

        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='FakeRegion',
                       api_version='v2.0')

        expected = 'http://fake_url/some_path/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    def test_token_not_expired(self):
        one_day = datetime.timedelta(days=1)
        expiry_data = datetime.datetime.utcnow() + one_day
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=False)

    def test_token_expired(self):
        one_hour = datetime.timedelta(hours=1)
        expiry_data = datetime.datetime.utcnow() - one_hour
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)

    def test_token_not_expired_to_be_renewed(self):
        # Expiry lies in the future, but only half the threshold away.
        half_threshold = self.auth_provider.token_expiry_threshold / 2
        expiry_data = datetime.datetime.utcnow() + half_threshold
        self._verify_expiry(expiry_data=expiry_data, should_be_expired=True)

    def _verify_expiry(self, expiry_data, should_be_expired):
        """Check is_expired for every supported expiry date format."""
        for expiry_format in self.auth_provider.EXPIRY_DATE_FORMATS:
            expiry_string = expiry_data.strftime(expiry_format)
            auth_data = self._auth_data_with_expiry(expiry_string)
            self.assertEqual(self.auth_provider.is_expired(auth_data),
                             should_be_expired)

    def test_set_scope_all_valid(self):
        provider = self.auth_provider
        for scope in provider.SCOPES:
            provider.scope = scope
            self.assertEqual(scope, provider.scope)

    def test_set_scope_invalid(self):
        expected_error = testtools.ExpectedException(
            exceptions.InvalidScope, '.* invalid_scope .*')
        with expected_error:
            self.auth_provider.scope = 'invalid_scope'
540
Matthew Treinish9e26ca82016-02-23 11:43:20 -0500541
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
    """Tests for auth.KeystoneV3AuthProvider.

    Inherits the v2 test methods; the helpers and a few tests are
    overridden to match the fake v3 identity data.
    """

    _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
    _auth_provider_class = auth.KeystoneV3AuthProvider
    credentials = fake_credentials.FakeKeystoneV3Credentials()

    def setUp(self):
        super(TestKeystoneV3AuthProvider, self).setUp()
        # Token requests are answered by the canned v3 response.
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response)

    def _get_fake_identity(self):
        v3_response = fake_identity.IDENTITY_V3_RESPONSE
        return v3_response['token']

    def _get_fake_alt_identity(self):
        alt_response = fake_identity.ALT_IDENTITY_V3
        return alt_response['token']

    def _get_result_url_from_endpoint(self, ep, replacement=None):
        """Return ep's URL, optionally with 'v3' swapped for replacement."""
        endpoint_url = ep['url']
        if not replacement:
            return endpoint_url
        return endpoint_url.replace('v3', replacement)

    def _auth_data_with_expiry(self, date_as_string):
        """Return the provider's auth data with its expiry overwritten."""
        token, access = self.auth_provider.auth_data
        access['expires_at'] = date_as_string
        return token, access

    def _get_from_fake_identity(self, attr):
        """Look up a user/project id or domain id in the fake v3 token."""
        token = fake_identity.IDENTITY_V3_RESPONSE['token']
        if attr == 'user_id':
            return token['user']['id']
        if attr == 'project_id':
            return token['project']['id']
        if attr == 'user_domain_id':
            return token['user']['domain']['id']
        if attr == 'project_domain_id':
            return token['project']['domain']['id']
        return None

    def _check_auth_params(self, all_creds, auth_params, omitted_prefixes):
        """Compare auth_params with the credentials' init attributes.

        Attributes starting with one of omitted_prefixes must be absent;
        all others must be present with the credentials' value. 'scope'
        must never be a key.
        """
        param_names = auth_params.keys()
        self.assertNotIn('scope', param_names)
        for attr in all_creds.get_init_attributes():
            if attr.startswith(omitted_prefixes):
                self.assertNotIn(attr, param_names)
            else:
                self.assertIn(attr, param_names)
                self.assertEqual(getattr(all_creds, attr), auth_params[attr])

    def test_check_credentials_missing_attribute(self):
        # reset credentials to fresh ones
        self.credentials.reset()
        required = ('username', 'password', 'user_domain_name',
                    'project_domain_name')
        for attr in required:
            cred = copy.copy(self.credentials)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred),
                             "Credentials should be invalid without %s" % attr)

    def test_check_domain_credentials_missing_attribute(self):
        # reset credentials to fresh ones
        self.credentials.reset()
        domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
        required = ('username', 'password', 'user_domain_name')
        for attr in required:
            cred = copy.copy(domain_creds)
            del cred[attr]
            self.assertFalse(self.auth_provider.check_credentials(cred),
                             "Credentials should be invalid without %s" % attr)

    def test_fill_credentials(self):
        self.auth_provider.fill_credentials()
        creds = self.auth_provider.credentials
        filled = ('user_id', 'project_id', 'user_domain_id',
                  'project_domain_id')
        for attr in filled:
            expected_value = self._get_from_fake_identity(attr)
            self.assertEqual(expected_value, getattr(creds, attr))

    # Overwrites v2 test
    def test_base_url_to_get_admin_endpoint(self):
        self.filters = dict(service='compute',
                            endpoint_type='admin',
                            region='MiddleEarthRegion')
        endpoint = self._endpoints[0]['endpoints'][2]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    # Overwrites v2 test
    def test_base_url_with_unversioned_endpoint(self):
        endpoint = {'region': 'FakeRegion',
                    'url': 'http://fake_url',
                    'interface': 'public'}
        catalog_entry = {'type': 'identity', 'endpoints': [endpoint]}
        auth_data = {'catalog': [catalog_entry]}

        filters = dict(service='identity',
                       endpoint_type='publicURL',
                       region='FakeRegion',
                       api_version='v3')

        expected = 'http://fake_url/v3'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    def test_base_url_with_extra_path_endpoint(self):
        endpoint = {'region': 'FakeRegion',
                    'url': 'http://fake_url/some_path/v2.0',
                    'interface': 'public'}
        catalog_entry = {'type': 'compute', 'endpoints': [endpoint]}
        auth_data = {'catalog': [catalog_entry]}

        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='FakeRegion',
                       api_version='v2.0')

        expected = 'http://fake_url/some_path/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    def test_base_url_with_unversioned_extra_path_endpoint(self):
        endpoint = {'region': 'FakeRegion',
                    'url': 'http://fake_url/some_path',
                    'interface': 'public'}
        catalog_entry = {'type': 'compute', 'endpoints': [endpoint]}
        auth_data = {'catalog': [catalog_entry]}

        filters = dict(service='compute',
                       endpoint_type='publicURL',
                       region='FakeRegion',
                       api_version='v2.0')

        expected = 'http://fake_url/some_path/v2.0'
        self._test_base_url_helper(expected, filters, ('token', auth_data))

    # Base URL test with scope only for V3
    def test_base_url_scope_project(self):
        self.auth_provider.scope = 'project'
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion')
        endpoint = self._endpoints[0]['endpoints'][1]
        expected = self._get_result_url_from_endpoint(endpoint)
        self._test_base_url_helper(expected, self.filters)

    # Base URL test with scope only for V3
    def test_base_url_unscoped_identity(self):
        self.auth_provider.scope = 'unscoped'
        # Switch the canned token response to the no-scope variant.
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response_no_scope)
        self.filters = dict(service='identity',
                            endpoint_type='publicURL',
                            region='FakeRegion')
        expected = fake_identity.FAKE_AUTH_URL
        self._test_base_url_helper(expected, self.filters)

    # Base URL test with scope only for V3
    def test_base_url_unscoped_other(self):
        self.auth_provider.scope = 'unscoped'
        # Switch the canned token response to the no-scope variant.
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response_no_scope)
        self.filters = dict(service='compute',
                            endpoint_type='publicURL',
                            region='FakeRegion')
        self.assertRaises(exceptions.EndpointNotFound,
                          self.auth_provider.base_url,
                          auth_data=self.auth_provider.auth_data,
                          filters=self.filters)

    def test_auth_parameters_with_scope_unset(self):
        # No scope defaults to 'project'
        all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
        self.auth_provider.credentials = all_creds
        auth_params = self.auth_provider._auth_params()
        self._check_auth_params(all_creds, auth_params, ('domain_',))

    def test_auth_parameters_with_project_scope(self):
        all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
        self.auth_provider.credentials = all_creds
        self.auth_provider.scope = 'project'
        auth_params = self.auth_provider._auth_params()
        self._check_auth_params(all_creds, auth_params, ('domain_',))

    def test_auth_parameters_with_domain_scope(self):
        all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
        self.auth_provider.credentials = all_creds
        self.auth_provider.scope = 'domain'
        auth_params = self.auth_provider._auth_params()
        self._check_auth_params(all_creds, auth_params, ('project_',))

    def test_auth_parameters_unscoped(self):
        all_creds = fake_credentials.FakeKeystoneV3AllCredentials()
        self.auth_provider.credentials = all_creds
        self.auth_provider.scope = 'unscoped'
        auth_params = self.auth_provider._auth_params()
        self._check_auth_params(all_creds, auth_params,
                                ('project_', 'domain_'))
788
John Warrenb10c6ca2016-02-26 15:32:37 -0500789
class TestKeystoneV3Credentials(base.TestCase):
    """Tests for attribute handling in auth.KeystoneV3Credentials."""

    def testSetAttrUserDomain(self):
        """user_domain_name keeps its value whatever the assignment order."""
        creds = auth.KeystoneV3Credentials()
        creds.user_domain_name = 'user_domain'
        creds.domain_name = 'domain'
        self.assertEqual('user_domain', creds.user_domain_name)
        creds = auth.KeystoneV3Credentials()
        creds.domain_name = 'domain'
        creds.user_domain_name = 'user_domain'
        self.assertEqual('user_domain', creds.user_domain_name)

    def testSetAttrProjectDomain(self):
        """project_domain_name keeps its value whatever the order."""
        creds = auth.KeystoneV3Credentials()
        creds.project_domain_name = 'project_domain'
        creds.domain_name = 'domain'
        # Assert on the attribute this test is about; the original checked
        # user_domain_name here, mirroring the sibling test by mistake.
        self.assertEqual('project_domain', creds.project_domain_name)
        creds = auth.KeystoneV3Credentials()
        creds.domain_name = 'domain'
        creds.project_domain_name = 'project_domain'
        self.assertEqual('project_domain', creds.project_domain_name)

    def testProjectTenantNoCollision(self):
        """A tenant_* value is readable as project_* and vice versa."""
        creds = auth.KeystoneV3Credentials(tenant_id='tenant')
        self.assertEqual('tenant', creds.project_id)
        creds = auth.KeystoneV3Credentials(project_id='project')
        self.assertEqual('project', creds.tenant_id)
        creds = auth.KeystoneV3Credentials(tenant_name='tenant')
        self.assertEqual('tenant', creds.project_name)
        creds = auth.KeystoneV3Credentials(project_name='project')
        self.assertEqual('project', creds.tenant_name)

    def testProjectTenantCollision(self):
        """Conflicting tenant_* and project_* values are rejected."""
        attrs = {'tenant_id': 'tenant', 'project_id': 'project'}
        self.assertRaises(
            exceptions.InvalidCredentials, auth.KeystoneV3Credentials, **attrs)
        attrs = {'tenant_name': 'tenant', 'project_name': 'project'}
        self.assertRaises(
            exceptions.InvalidCredentials, auth.KeystoneV3Credentials, **attrs)
Brant Knudsonf2d1f572016-04-11 15:02:01 -0500828
829
class TestReplaceVersion(base.TestCase):
    """Tests for auth.replace_version with a target version of 'v2.0'."""

    def _assert_replaced(self, expected, url):
        """Assert that replace_version(url, 'v2.0') equals expected."""
        self.assertEqual(expected, auth.replace_version(url, 'v2.0'))

    def test_version_no_trailing_path(self):
        self._assert_replaced('http://localhost:35357/v2.0',
                              'http://localhost:35357/v3')

    def test_version_no_trailing_path_solidus(self):
        self._assert_replaced('http://localhost:35357/v2.0/',
                              'http://localhost:35357/v3/')

    def test_version_trailing_path(self):
        self._assert_replaced('http://localhost:35357/v2.0/uuid',
                              'http://localhost:35357/v3/uuid')

    def test_version_trailing_path_solidus(self):
        self._assert_replaced('http://localhost:35357/v2.0/uuid/',
                              'http://localhost:35357/v3/uuid/')

    def test_no_version_base(self):
        self._assert_replaced('http://localhost:35357/v2.0',
                              'http://localhost:35357')

    def test_no_version_base_solidus(self):
        self._assert_replaced('http://localhost:35357/v2.0',
                              'http://localhost:35357/')

    def test_no_version_path(self):
        self._assert_replaced('http://localhost/identity/v2.0',
                              'http://localhost/identity')

    def test_no_version_path_solidus(self):
        self._assert_replaced('http://localhost/identity/v2.0',
                              'http://localhost/identity/')

    def test_path_version(self):
        self._assert_replaced('http://localhost/identity/v2.0',
                              'http://localhost/identity/v3')

    def test_path_version_solidus(self):
        self._assert_replaced('http://localhost/identity/v2.0/',
                              'http://localhost/identity/v3/')

    def test_path_version_trailing_path(self):
        self._assert_replaced('http://localhost/identity/v2.0/uuid',
                              'http://localhost/identity/v3/uuid')

    def test_path_version_trailing_path_solidus(self):
        self._assert_replaced('http://localhost/identity/v2.0/uuid/',
                              'http://localhost/identity/v3/uuid/')
Andrea Frittoli (andreaf)3e82af72016-05-05 22:53:38 +0100890
891
class TestKeystoneV3AuthProvider_DomainScope(BaseAuthTestsSetUp):
    """KeystoneV3AuthProvider tests using the domain-scoped fake response."""

    _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
    _auth_provider_class = auth.KeystoneV3AuthProvider
    credentials = fake_credentials.FakeKeystoneV3Credentials()

    def setUp(self):
        super(TestKeystoneV3AuthProvider_DomainScope, self).setUp()
        # Token requests are answered by the domain-scoped v3 response.
        self.patchobject(v3_client.V3TokenClient, 'raw_request',
                         fake_identity._fake_v3_response_domain_scope)

    def test_get_auth_with_domain_scope(self):
        provider = self.auth_provider
        provider.scope = 'domain'
        _token, auth_data = provider.get_auth()
        self.assertIn('domain', auth_data)
        self.assertNotIn('project', auth_data)
ghanshyamc0d500a2016-06-15 09:50:21 +0900907
908
class TestGetCredentials(base.TestCase):
    """Tests for the module-level auth.get_credentials function."""

    def test_invalid_identity_version(self):
        # 'v1' must be rejected, and the error message must mention it.
        expected_error = testtools.ExpectedException(
            exceptions.InvalidIdentityVersion, '.* v1 .*')
        with expected_error:
            auth.get_credentials('http://localhost/identity/v3',
                                 identity_version='v1')
915 identity_version='v1')