Merge "Remove nova-network from scenario tests"
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index 27d30f2..5188191 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -96,10 +96,6 @@
"""Compute and Compute security groups client"""
cls.compute_images_client = cls.os_primary.compute_images_client
cls.keypairs_client = cls.os_primary.keypairs_client
- cls.compute_security_groups_client = (
- cls.os_primary.compute_security_groups_client)
- cls.compute_security_group_rules_client = (
- cls.os_primary.compute_security_group_rules_client)
cls.servers_client = cls.os_primary.servers_client
cls.interface_client = cls.os_primary.interfaces_client
@@ -546,69 +542,163 @@
self.addCleanup(self.cleanup_volume_type, volume_type)
return volume_type
- def create_loginable_secgroup_rule(self, secgroup_id=None, rulesets=None):
- """Create loginable security group rule by compute clients.
-
- This function will create by default the following rules:
- 1. tcp port 22 allow rule in order to allow ssh access for ipv4
- 2. ipv4 icmp allow rule in order to allow icmpv4
- """
-
- _client = self.compute_security_groups_client
- _client_rules = self.compute_security_group_rules_client
- if secgroup_id is None:
- sgs = _client.list_security_groups()['security_groups']
- for sg in sgs:
- if sg['name'] == 'default':
- secgroup_id = sg['id']
-
- # These rules are intended to permit inbound ssh and icmp
- # traffic from all sources, so no group_id is provided.
- # Setting a group_id would only permit traffic from ports
- # belonging to the same security group.
- if not rulesets:
- rulesets = [
- {
- # ssh
- 'ip_protocol': 'tcp',
- 'from_port': 22,
- 'to_port': 22,
- 'cidr': '0.0.0.0/0',
- },
- {
- # ping
- 'ip_protocol': 'icmp',
- 'from_port': -1,
- 'to_port': -1,
- 'cidr': '0.0.0.0/0',
- }
- ]
- rules = list()
- for ruleset in rulesets:
- sg_rule = _client_rules.create_security_group_rule(
- parent_group_id=secgroup_id, **ruleset)['security_group_rule']
- rules.append(sg_rule)
- return rules
-
- def create_security_group(self, **kwargs):
- """Create security group and add rules to security group"""
- if not kwargs.get('name'):
- kwargs['name'] = data_utils.rand_name(self.__class__.__name__)
- if not kwargs.get('description'):
- kwargs['description'] = kwargs['name'] + " description"
- secgroup = self.compute_security_groups_client.create_security_group(
- **kwargs)['security_group']
- self.assertEqual(secgroup['name'], kwargs['name'])
- self.assertEqual(secgroup['description'], kwargs['description'])
- self.addCleanup(
- test_utils.call_and_ignore_notfound_exc,
- self.compute_security_groups_client.delete_security_group,
- secgroup['id'])
+ def create_security_group(self, security_group_rules_client=None,
+ project_id=None,
+ namestart='secgroup-smoke',
+ security_groups_client=None):
+ if security_group_rules_client is None:
+ security_group_rules_client = self.security_group_rules_client
+ if security_groups_client is None:
+ security_groups_client = self.security_groups_client
+ if project_id is None:
+ project_id = security_groups_client.project_id
+ secgroup = self.create_empty_security_group(
+ namestart=namestart, client=security_groups_client,
+ project_id=project_id)
# Add rules to the security group
- self.create_loginable_secgroup_rule(secgroup['id'])
+ rules = self.create_loginable_secgroup_rule(
+ security_group_rules_client=security_group_rules_client,
+ secgroup=secgroup,
+ security_groups_client=security_groups_client)
+ for rule in rules:
+ self.assertEqual(project_id, rule['project_id'])
+ self.assertEqual(secgroup['id'], rule['security_group_id'])
return secgroup
+ def create_empty_security_group(self, client=None, project_id=None,
+ namestart='secgroup-smoke'):
+ """Create a security group without rules.
+
+ Default rules will be created:
+ - IPv4 egress to any
+ - IPv6 egress to any
+ :param project_id: secgroup will be created in this project
+ :returns: the created security group
+ """
+
+ if client is None:
+ client = self.security_groups_client
+ if not project_id:
+ project_id = client.project_id
+ sg_name = data_utils.rand_name(namestart)
+ sg_desc = sg_name + " description"
+ sg_dict = dict(name=sg_name,
+ description=sg_desc)
+ sg_dict['project_id'] = project_id
+ result = client.create_security_group(**sg_dict)
+
+ secgroup = result['security_group']
+ self.assertEqual(secgroup['name'], sg_name)
+ self.assertEqual(project_id, secgroup['project_id'])
+ self.assertEqual(secgroup['description'], sg_desc)
+
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ client.delete_security_group, secgroup['id'])
+ return secgroup
+
+ def create_security_group_rule(self, secgroup=None,
+ sec_group_rules_client=None,
+ project_id=None,
+ security_groups_client=None, **kwargs):
+ """Create a rule from a dictionary of rule parameters.
+
+ Create a rule in a secgroup. if secgroup not defined will search for
+ default secgroup in project_id.
+ :param secgroup: the security group.
+ :param project_id: if secgroup not passed -- the tenant in which to
+ search for default secgroup
+ :param kwargs: a dictionary containing rule parameters:
+ for example, to allow incoming ssh:
+ rule = {
+ direction: 'ingress'
+ protocol:'tcp',
+ port_range_min: 22,
+ port_range_max: 22
+ }
+ """
+
+ if sec_group_rules_client is None:
+ sec_group_rules_client = self.security_group_rules_client
+ if security_groups_client is None:
+ security_groups_client = self.security_groups_client
+ if not project_id:
+ project_id = security_groups_client.project_id
+ if secgroup is None:
+ # Get default secgroup for project_id
+ default_secgroups = security_groups_client.list_security_groups(
+ name='default', project_id=project_id)['security_groups']
+ msg = "No default security group for project %s." % (project_id)
+ self.assertNotEmpty(default_secgroups, msg)
+ secgroup = default_secgroups[0]
+
+ ruleset = dict(security_group_id=secgroup['id'],
+ project_id=secgroup['project_id'])
+ ruleset.update(kwargs)
+
+ sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
+ sg_rule = sg_rule['security_group_rule']
+
+ self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
+ self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
+
+ return sg_rule
+
+ def create_loginable_secgroup_rule(self, security_group_rules_client=None,
+ secgroup=None,
+ security_groups_client=None):
+ """Create loginable security group rule by neutron clients by default.
+
+ This function will create:
+ 1. egress and ingress tcp port 22 allow rule in order to allow ssh
+ access for ipv4.
+ 2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
+ 3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
+ """
+
+ if security_group_rules_client is None:
+ security_group_rules_client = self.security_group_rules_client
+ if security_groups_client is None:
+ security_groups_client = self.security_groups_client
+ rules = []
+ rulesets = [
+ dict(
+ # ssh
+ protocol='tcp',
+ port_range_min=22,
+ port_range_max=22,
+ ),
+ dict(
+ # ping
+ protocol='icmp',
+ ),
+ dict(
+ # ipv6-icmp for ping6
+ protocol='icmp',
+ ethertype='IPv6',
+ )
+ ]
+ sec_group_rules_client = security_group_rules_client
+ for ruleset in rulesets:
+ for r_direction in ['ingress', 'egress']:
+ ruleset['direction'] = r_direction
+ try:
+ sg_rule = self.create_security_group_rule(
+ sec_group_rules_client=sec_group_rules_client,
+ secgroup=secgroup,
+ security_groups_client=security_groups_client,
+ **ruleset)
+ except lib_exc.Conflict as ex:
+ # if rule already exist - skip rule and continue
+ msg = 'Security group rule already exists'
+ if msg not in ex._error_string:
+ raise ex
+ else:
+ self.assertEqual(r_direction, sg_rule['direction'])
+ rules.append(sg_rule)
+
+ return rules
+
def get_remote_client(self, ip_address, username=None, private_key=None,
server=None):
"""Get a SSH client to a remote server
@@ -1381,165 +1471,6 @@
self.log_console_output()
self.fail(msg)
- def create_security_group(self, security_group_rules_client=None,
- project_id=None,
- namestart='secgroup-smoke',
- security_groups_client=None):
- if security_group_rules_client is None:
- security_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- if project_id is None:
- project_id = security_groups_client.project_id
- secgroup = self.create_empty_security_group(
- namestart=namestart, client=security_groups_client,
- project_id=project_id)
-
- # Add rules to the security group
- rules = self.create_loginable_secgroup_rule(
- security_group_rules_client=security_group_rules_client,
- secgroup=secgroup,
- security_groups_client=security_groups_client)
- for rule in rules:
- self.assertEqual(project_id, rule['project_id'])
- self.assertEqual(secgroup['id'], rule['security_group_id'])
- return secgroup
-
- def create_empty_security_group(self, client=None, project_id=None,
- namestart='secgroup-smoke'):
- """Create a security group without rules.
-
- Default rules will be created:
- - IPv4 egress to any
- - IPv6 egress to any
-
- :param project_id: secgroup will be created in this project
- :returns: the created security group
- """
-
- if client is None:
- client = self.security_groups_client
- if not project_id:
- project_id = client.project_id
- sg_name = data_utils.rand_name(namestart)
- sg_desc = sg_name + " description"
- sg_dict = dict(name=sg_name,
- description=sg_desc)
- sg_dict['project_id'] = project_id
- result = client.create_security_group(**sg_dict)
-
- secgroup = result['security_group']
- self.assertEqual(secgroup['name'], sg_name)
- self.assertEqual(project_id, secgroup['project_id'])
- self.assertEqual(secgroup['description'], sg_desc)
-
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_security_group, secgroup['id'])
- return secgroup
-
- def create_security_group_rule(self, secgroup=None,
- sec_group_rules_client=None,
- project_id=None,
- security_groups_client=None, **kwargs):
- """Create a rule from a dictionary of rule parameters.
-
- Create a rule in a secgroup. if secgroup not defined will search for
- default secgroup in project_id.
-
- :param secgroup: the security group.
- :param project_id: if secgroup not passed -- the tenant in which to
- search for default secgroup
- :param kwargs: a dictionary containing rule parameters:
- for example, to allow incoming ssh:
- rule = {
- direction: 'ingress'
- protocol:'tcp',
- port_range_min: 22,
- port_range_max: 22
- }
- """
-
- if sec_group_rules_client is None:
- sec_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- if not project_id:
- project_id = security_groups_client.project_id
- if secgroup is None:
- # Get default secgroup for project_id
- default_secgroups = security_groups_client.list_security_groups(
- name='default', project_id=project_id)['security_groups']
- msg = "No default security group for project %s." % (project_id)
- self.assertNotEmpty(default_secgroups, msg)
- secgroup = default_secgroups[0]
-
- ruleset = dict(security_group_id=secgroup['id'],
- project_id=secgroup['project_id'])
- ruleset.update(kwargs)
-
- sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
- sg_rule = sg_rule['security_group_rule']
-
- self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
- self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
-
- return sg_rule
-
- def create_loginable_secgroup_rule(self, security_group_rules_client=None,
- secgroup=None,
- security_groups_client=None):
- """Create loginable security group rule by neutron clients by default.
-
- This function will create:
- 1. egress and ingress tcp port 22 allow rule in order to allow ssh
- access for ipv4.
- 2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
- 3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
- """
-
- if security_group_rules_client is None:
- security_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- rules = []
- rulesets = [
- dict(
- # ssh
- protocol='tcp',
- port_range_min=22,
- port_range_max=22,
- ),
- dict(
- # ping
- protocol='icmp',
- ),
- dict(
- # ipv6-icmp for ping6
- protocol='icmp',
- ethertype='IPv6',
- )
- ]
- sec_group_rules_client = security_group_rules_client
- for ruleset in rulesets:
- for r_direction in ['ingress', 'egress']:
- ruleset['direction'] = r_direction
- try:
- sg_rule = self.create_security_group_rule(
- sec_group_rules_client=sec_group_rules_client,
- secgroup=secgroup,
- security_groups_client=security_groups_client,
- **ruleset)
- except lib_exc.Conflict as ex:
- # if rule already exist - skip rule and continue
- msg = 'Security group rule already exists'
- if msg not in ex._error_string:
- raise ex
- else:
- self.assertEqual(r_direction, sg_rule['direction'])
- rules.append(sg_rule)
-
- return rules
-
def get_router(self, client=None, project_id=None, **kwargs):
"""Retrieve a router for the given tenant id.