Merge "Make scenario tests requiring admin skip if no admin creds"
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index 81e771c..f8cc17c 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -14,7 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import os
import subprocess
import netaddr
@@ -24,7 +23,6 @@
from tempest_lib import exceptions as lib_exc
from tempest import clients
-from tempest.common import cred_provider
from tempest.common import credentials
from tempest.common.utils.linux import remote_client
from tempest import config
@@ -50,7 +48,6 @@
cls.manager = clients.Manager(
credentials=cls.credentials()
)
- cls.admin_manager = clients.Manager(cls.admin_credentials())
@classmethod
def setup_clients(cls):
@@ -63,7 +60,6 @@
# Compute image client
cls.images_client = cls.manager.images_client
cls.keypairs_client = cls.manager.keypairs_client
- cls.networks_client = cls.admin_manager.networks_client
# Nova security groups client
cls.security_groups_client = cls.manager.security_groups_client
cls.servers_client = cls.manager.servers_client
@@ -542,6 +538,14 @@
super(NetworkScenarioTest, cls).skip_checks()
if not CONF.service_available.neutron:
raise cls.skipException('Neutron not available')
+ if not credentials.is_admin_available():
+ msg = ("Missing Identity Admin API credentials in configuration.")
+ raise cls.skipException(msg)
+
+ @classmethod
+ def setup_credentials(cls):
+ super(NetworkScenarioTest, cls).setup_credentials()
+ cls.admin_manager = clients.Manager(cls.admin_credentials())
@classmethod
def resource_setup(cls):
@@ -1283,9 +1287,17 @@
"""
@classmethod
+ def skip_checks(cls):
+ super(EncryptionScenarioTest, cls).skip_checks()
+ if not credentials.is_admin_available():
+ msg = ("Missing Identity Admin API credentials in configuration.")
+ raise cls.skipException(msg)
+
+ @classmethod
def setup_clients(cls):
super(EncryptionScenarioTest, cls).setup_clients()
- cls.admin_volume_types_client = cls.admin_manager.volume_types_client
+ admin_manager = clients.Manager(cls.admin_credentials())
+ cls.admin_volume_types_client = admin_manager.volume_types_client
def _wait_for_volume_status(self, status):
self.status_timeout(
@@ -1324,49 +1336,6 @@
control_location=control_location)
-class OrchestrationScenarioTest(ScenarioTest):
- """
- Base class for orchestration scenario tests
- """
-
- @classmethod
- def skip_checks(cls):
- super(OrchestrationScenarioTest, cls).skip_checks()
- if not CONF.service_available.heat:
- raise cls.skipException("Heat support is required")
-
- @classmethod
- def credentials(cls):
- admin_creds = cred_provider.get_configured_credentials(
- 'identity_admin')
- creds = cred_provider.get_configured_credentials('user')
- admin_creds.tenant_name = creds.tenant_name
- return admin_creds
-
- def _load_template(self, base_file, file_name):
- filepath = os.path.join(os.path.dirname(os.path.realpath(base_file)),
- file_name)
- with open(filepath) as f:
- return f.read()
-
- @classmethod
- def _stack_rand_name(cls):
- return data_utils.rand_name(cls.__name__ + '-')
-
- @classmethod
- def _get_default_network(cls):
- networks = cls.networks_client.list_networks()
- for net in networks:
- if net['label'] == CONF.compute.fixed_network_name:
- return net
-
- @staticmethod
- def _stack_output(stack, output_key):
- """Return a stack output value for a given key."""
- return next((o['output_value'] for o in stack['outputs']
- if o['output_key'] == output_key), None)
-
-
class SwiftScenarioTest(ScenarioTest):
"""
Provide harness to do Swift scenario tests.