Merge "Remove XML related code from RestClient"
diff --git a/etc/tempest.conf.sample b/etc/tempest.conf.sample
index 3c6eb44..86dda80 100644
--- a/etc/tempest.conf.sample
+++ b/etc/tempest.conf.sample
@@ -517,9 +517,6 @@
# From tempest.config
#
-# Enable diagnostic commands (boolean value)
-#enable = true
-
# A regex to determine which requests should be traced. This is a
# regex to match the caller for rest client requests to be able to
# selectively trace calls out of specific classes and methods. It
diff --git a/setup.cfg b/setup.cfg
index 90ea944..1e7cc2b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = tempest
-version = 3
+version = 4
summary = OpenStack Integration Testing
description-file =
README.rst
diff --git a/tempest/common/commands.py b/tempest/common/commands.py
index 6583475..e68c20e 100644
--- a/tempest/common/commands.py
+++ b/tempest/common/commands.py
@@ -19,63 +19,6 @@
LOG = logging.getLogger(__name__)
-# NOTE(afazekas):
-# These commands assumes the tempest node is the same as
-# the only one service node. all-in-one installation.
-
-
-def sudo_cmd_call(cmd):
- args = shlex.split(cmd.encode('utf-8'))
- subprocess_args = {'stdout': subprocess.PIPE,
- 'stderr': subprocess.STDOUT}
- proc = subprocess.Popen(['/usr/bin/sudo', '-n'] + args,
- **subprocess_args)
- stdout = proc.communicate()[0]
- if proc.returncode != 0:
- LOG.error(("Command {0} returned with exit status {1},"
- "output {2}").format(cmd, proc.returncode, stdout))
- return stdout
-
-
-def ip_addr_raw():
- return sudo_cmd_call("ip a")
-
-
-def ip_route_raw():
- return sudo_cmd_call("ip r")
-
-
-def ip_ns_raw():
- return sudo_cmd_call("ip netns list")
-
-
-def iptables_raw(table):
- return sudo_cmd_call("iptables --line-numbers -L -nv -t " + table)
-
-
-def ip_ns_list():
- return ip_ns_raw().split()
-
-
-def ip_ns_exec(ns, cmd):
- return sudo_cmd_call(" ".join(("ip netns exec", ns, cmd)))
-
-
-def ip_ns_addr(ns):
- return ip_ns_exec(ns, "ip a")
-
-
-def ip_ns_route(ns):
- return ip_ns_exec(ns, "ip r")
-
-
-def iptables_ns(ns, table):
- return ip_ns_exec(ns, "iptables -v -S -t " + table)
-
-
-def ovs_db_dump():
- return sudo_cmd_call("ovsdb-client dump")
-
def copy_file_to_host(file_from, dest, host, username, pkey):
dest = "%s@%s:%s" % (username, host, dest)
diff --git a/tempest/common/debug.py b/tempest/common/debug.py
deleted file mode 100644
index 16e5ffe..0000000
--- a/tempest/common/debug.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest.common import commands
-from tempest import config
-from tempest.openstack.common import log as logging
-
-CONF = config.CONF
-LOG = logging.getLogger(__name__)
-
-TABLES = ['filter', 'nat', 'mangle']
-
-
-def log_ip_ns():
- if not CONF.debug.enable:
- return
- LOG.info("Host Addr:\n" + commands.ip_addr_raw())
- LOG.info("Host Route:\n" + commands.ip_route_raw())
- for table in TABLES:
- LOG.info('Host %s table:\n%s', table, commands.iptables_raw(table))
- ns_list = commands.ip_ns_list()
- LOG.info("Host ns list" + str(ns_list))
- for ns in ns_list:
- LOG.info("ns(%s) Addr:\n%s", ns, commands.ip_ns_addr(ns))
- LOG.info("ns(%s) Route:\n%s", ns, commands.ip_ns_route(ns))
- for table in TABLES:
- LOG.info('ns(%s) table(%s):\n%s', ns, table,
- commands.iptables_ns(ns, table))
-
-
-def log_ovs_db():
- if not CONF.debug.enable or not CONF.service_available.neutron:
- return
- db_dump = commands.ovs_db_dump()
- LOG.info("OVS DB:\n" + db_dump)
-
-
-def log_net_debug():
- log_ip_ns()
- log_ovs_db()
diff --git a/tempest/common/utils/linux/remote_client.py b/tempest/common/utils/linux/remote_client.py
index 6a238d0..d8bfef8 100644
--- a/tempest/common/utils/linux/remote_client.py
+++ b/tempest/common/utils/linux/remote_client.py
@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import netaddr
import re
import time
@@ -87,7 +88,9 @@
return self.exec_command(cmd)
def ping_host(self, host):
- cmd = 'ping -c1 -w1 %s' % host
+ addr = netaddr.IPAddress(host)
+ cmd = 'ping6' if addr.version == 6 else 'ping'
+ cmd += ' -c1 -w1 {0}'.format(host)
return self.exec_command(cmd)
def get_mac_address(self):
diff --git a/tempest/config.py b/tempest/config.py
index 03346c9..cc6d626 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -926,9 +926,6 @@
title="Debug System")
DebugGroup = [
- cfg.BoolOpt('enable',
- default=True,
- help="Enable diagnostic commands"),
cfg.StrOpt('trace_requests',
default='',
help="""A regex to determine which requests should be traced.
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index e46ec6d..410d90a 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -24,7 +24,6 @@
from tempest import auth
from tempest import clients
from tempest.common import credentials
-from tempest.common import debug
from tempest.common.utils import data_utils
from tempest.common.utils.linux import remote_client
from tempest import config
@@ -326,7 +325,6 @@
linux_client.validate_authentication()
except Exception:
LOG.exception('Initializing SSH connection to %s failed' % ip)
- debug.log_net_debug()
# If we don't explicitely set for which servers we want to
# log the console output then all the servers will be logged.
# See the definition of _log_console_output()
@@ -400,7 +398,6 @@
# network debug is called as part of ssh init
if not isinstance(exc, exceptions.SSHTimeout):
LOG.debug('Network information on a devstack host')
- debug.log_net_debug()
def create_server_snapshot(self, server, name=None):
# Glance client
@@ -510,15 +507,12 @@
username,
private_key,
should_connect=should_connect)
- except Exception as e:
+ except Exception:
ex_msg = 'Public network connectivity check failed'
if msg:
ex_msg += ": " + msg
LOG.exception(ex_msg)
self._log_console_output(servers)
- # network debug is called as part of ssh init
- if not isinstance(e, exceptions.SSHTimeout):
- debug.log_net_debug()
raise
def create_floating_ip(self, thing, pool_name=None):
@@ -612,22 +606,31 @@
cidr_in_use = self._list_subnets(tenant_id=tenant_id, cidr=cidr)
return len(cidr_in_use) != 0
- tenant_cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
+ ip_version = kwargs.pop('ip_version', 4)
+
+ if ip_version == 6:
+ tenant_cidr = netaddr.IPNetwork(
+ CONF.network.tenant_network_v6_cidr)
+ num_bits = CONF.network.tenant_network_v6_mask_bits
+ else:
+ tenant_cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
+ num_bits = CONF.network.tenant_network_mask_bits
+
result = None
+ str_cidr = None
# Repeatedly attempt subnet creation with sequential cidr
# blocks until an unallocated block is found.
- for subnet_cidr in tenant_cidr.subnet(
- CONF.network.tenant_network_mask_bits):
+ for subnet_cidr in tenant_cidr.subnet(num_bits):
str_cidr = str(subnet_cidr)
if cidr_in_use(str_cidr, tenant_id=network.tenant_id):
continue
subnet = dict(
name=data_utils.rand_name(namestart),
- ip_version=4,
network_id=network.id,
tenant_id=network.tenant_id,
cidr=str_cidr,
+ ip_version=ip_version,
**kwargs
)
try:
@@ -658,12 +661,17 @@
self.addCleanup(self.delete_wrapper, port.delete)
return port
- def _get_server_port_id(self, server, ip_addr=None):
+ def _get_server_port_id_and_ip4(self, server, ip_addr=None):
ports = self._list_ports(device_id=server['id'],
fixed_ip=ip_addr)
self.assertEqual(len(ports), 1,
"Unable to determine which port to target.")
- return ports[0]['id']
+ # it might happen here that this port has more than one ip address
+ # as in case of dual stack - when this port is created on 2 subnets
+ for ip46 in ports[0]['fixed_ips']:
+ ip = ip46['ip_address']
+ if netaddr.valid_ipv4(ip):
+ return ports[0]['id'], ip
def _get_network_by_name(self, network_name):
net = self._list_networks(name=network_name)
@@ -679,11 +687,14 @@
if not client:
client = self.network_client
if not port_id:
- port_id = self._get_server_port_id(thing)
+ port_id, ip4 = self._get_server_port_id_and_ip4(thing)
+ else:
+ ip4 = None
_, result = client.create_floatingip(
floating_network_id=external_network_id,
port_id=port_id,
- tenant_id=thing['tenant_id']
+ tenant_id=thing['tenant_id'],
+ fixed_ip_address=ip4
)
floating_ip = net_resources.DeletableFloatingIp(
client=client,
@@ -692,7 +703,7 @@
return floating_ip
def _associate_floating_ip(self, floating_ip, server):
- port_id = self._get_server_port_id(server)
+ port_id, _ = self._get_server_port_id_and_ip4(server)
floating_ip.update(port_id=port_id)
self.assertEqual(port_id, floating_ip.port_id)
return floating_ip
diff --git a/tempest/scenario/test_network_basic_ops.py b/tempest/scenario/test_network_basic_ops.py
index d24eb01..98e3fda 100644
--- a/tempest/scenario/test_network_basic_ops.py
+++ b/tempest/scenario/test_network_basic_ops.py
@@ -18,7 +18,6 @@
import testtools
-from tempest.common import debug
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
@@ -325,7 +324,6 @@
LOG.exception("Unable to access {dest} via ssh to "
"floating-ip {src}".format(dest=remote_ip,
src=floating_ip))
- debug.log_ip_ns()
raise
@test.attr(type='smoke')
diff --git a/tempest/scenario/test_network_v6.py b/tempest/scenario/test_network_v6.py
new file mode 100644
index 0000000..c9f1fe1
--- /dev/null
+++ b/tempest/scenario/test_network_v6.py
@@ -0,0 +1,146 @@
+# Copyright 2014 Cisco Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import netaddr
+from tempest import config
+from tempest.openstack.common import log as logging
+from tempest.scenario import manager
+from tempest import test
+
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class TestGettingAddress(manager.NetworkScenarioTest):
+ """Create network with 2 subnets: IPv4 and IPv6 in a given address mode
+ Boot 2 VMs on this network
+ Allocate and assign 2 IPv4 floating IPs (FIP4)
+ Check that vNIC of server matches port data from OpenStack DB
+ Ping the tenant IPv4 address of each VM from the other one
+ Do the same over IPv6 with ping6 when it is available in the VM
+ """
+
+ @classmethod
+ def resource_setup(cls):
+ # Create no network resources for these tests.
+ cls.set_network_resources()
+ super(TestGettingAddress, cls).resource_setup()
+
+ @classmethod
+ def check_preconditions(cls):
+ if not (CONF.network_feature_enabled.ipv6
+ and CONF.network_feature_enabled.ipv6_subnet_attributes):
+ cls.enabled = False
+ raise cls.skipException('IPv6 or its attributes not supported')
+ if not (CONF.network.tenant_networks_reachable
+ or CONF.network.public_network_id):
+ msg = ('Either tenant_networks_reachable must be "true", or '
+ 'public_network_id must be defined.')
+ cls.enabled = False
+ raise cls.skipException(msg)
+ super(TestGettingAddress, cls).check_preconditions()
+
+ def setUp(self):
+ super(TestGettingAddress, self).setUp()
+ self.keypair = self.create_keypair()
+ self.sec_grp = self._create_security_group(tenant_id=self.tenant_id)
+ self.srv_kwargs = {
+ 'key_name': self.keypair['name'],
+ 'security_groups': [self.sec_grp]}
+
+ def prepare_network(self, address6_mode):
+ """Creates network with
+ one IPv6 subnet in the given mode and
+ one IPv4 subnet
+ Creates router with ports on both subnets
+ """
+ net = self._create_network(tenant_id=self.tenant_id)
+ sub4 = self._create_subnet(network=net,
+ namestart='sub4',
+ ip_version=4,)
+ # since https://bugs.launchpad.net/neutron/+bug/1394112 we need
+ # to specify gateway_ip manually
+ net_range = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
+ gateway_ip = (netaddr.IPAddress(net_range) + 1).format()
+ sub6 = self._create_subnet(network=net,
+ namestart='sub6',
+ ip_version=6,
+ gateway_ip=gateway_ip,
+ ipv6_ra_mode=address6_mode,
+ ipv6_address_mode=address6_mode)
+
+ router = self._get_router(tenant_id=self.tenant_id)
+ sub4.add_to_router(router_id=router['id'])
+ sub6.add_to_router(router_id=router['id'])
+ self.addCleanup(sub4.delete)
+ self.addCleanup(sub6.delete)
+
+ @staticmethod
+ def define_server_ips(srv):
+ for net_name, nics in srv['addresses'].iteritems():
+ for nic in nics:
+ if nic['version'] == 6:
+ srv['accessIPv6'] = nic['addr']
+ else:
+ srv['accessIPv4'] = nic['addr']
+
+ def prepare_server(self):
+ username = CONF.compute.image_ssh_user
+
+ srv = self.create_server(create_kwargs=self.srv_kwargs)
+ fip = self.create_floating_ip(thing=srv)
+ self.define_server_ips(srv=srv)
+ ssh = self.get_remote_client(
+ server_or_ip=fip.floating_ip_address,
+ username=username)
+ return ssh, srv
+
+ def _prepare_and_test(self, address6_mode):
+ self.prepare_network(address6_mode=address6_mode)
+
+ ssh1, srv1 = self.prepare_server()
+ ssh2, srv2 = self.prepare_server()
+
+ result = ssh1.get_ip_list()
+ self.assertIn(srv1['accessIPv4'], result)
+ # v6 should be configured since the image supports it
+ self.assertIn(srv1['accessIPv6'], result)
+ result = ssh2.get_ip_list()
+ self.assertIn(srv2['accessIPv4'], result)
+ # v6 should be configured since the image supports it
+ self.assertIn(srv2['accessIPv6'], result)
+ result = ssh1.ping_host(srv2['accessIPv4'])
+ self.assertIn('0% packet loss', result)
+ result = ssh2.ping_host(srv1['accessIPv4'])
+ self.assertIn('0% packet loss', result)
+
+ # Some VMs (like cirros) may not have the ping6 utility
+ result = ssh1.exec_command('whereis ping6')
+ is_ping6 = False if result == 'ping6:\n' else True
+ if is_ping6:
+ result = ssh1.ping_host(srv2['accessIPv6'])
+ self.assertIn('0% packet loss', result)
+ result = ssh2.ping_host(srv1['accessIPv6'])
+ self.assertIn('0% packet loss', result)
+ else:
+ LOG.warning('Ping6 is not available, skipping')
+
+ @test.services('compute', 'network')
+ def test_slaac_from_os(self):
+ self._prepare_and_test(address6_mode='slaac')
+
+ @test.services('compute', 'network')
+ def test_dhcp6_stateless_from_os(self):
+ self._prepare_and_test(address6_mode='dhcpv6-stateless')
diff --git a/tempest/scenario/test_security_groups_basic_ops.py b/tempest/scenario/test_security_groups_basic_ops.py
index 7e6805c..d8f7a26 100644
--- a/tempest/scenario/test_security_groups_basic_ops.py
+++ b/tempest/scenario/test_security_groups_basic_ops.py
@@ -14,7 +14,6 @@
# under the License.
from tempest import clients
-from tempest.common import debug
from tempest.common.utils import data_utils
from tempest import config
from tempest.openstack.common import log as logging
@@ -334,15 +333,8 @@
msg = "Timed out waiting for %s to become reachable" % ip
else:
msg = "%s is reachable" % ip
- try:
- self.assertTrue(self._check_remote_connectivity(access_point, ip,
- should_succeed),
- msg)
- except test.exceptions.SSHTimeout:
- raise
- except Exception:
- debug.log_net_debug()
- raise
+ self.assertTrue(self._check_remote_connectivity(access_point, ip,
+ should_succeed), msg)
def _test_in_tenant_block(self, tenant):
access_point_ssh = self._connect_to_access_point(tenant)
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index 46475f0..809c98b 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -40,8 +40,13 @@
def deserialize_list(self, body):
res = json.loads(body)
# expecting response in form
- # {'resources': [ res1, res2] }
- return res[res.keys()[0]]
+ # {'resources': [ res1, res2] } => when pagination disabled
+ # {'resources': [..], 'resources_links': {}} => if pagination enabled
+ pagination_suffix = "_links"
+ for k in res.keys():
+ if k[-len(pagination_suffix):] == pagination_suffix:
+ continue
+ return res[k]
def serialize(self, data):
return json.dumps(data)
diff --git a/tempest/tests/common/test_debug.py b/tempest/tests/common/test_debug.py
deleted file mode 100644
index 8a880f2..0000000
--- a/tempest/tests/common/test_debug.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# Copyright 2014 NEC Corporation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-
-from tempest.common import debug
-from tempest import config
-from tempest.openstack.common.fixture import mockpatch
-from tempest import test
-from tempest.tests import base
-from tempest.tests import fake_config
-
-
-class TestDebug(base.TestCase):
-
- def setUp(self):
- super(TestDebug, self).setUp()
- self.useFixture(fake_config.ConfigFixture())
- self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
-
- common_pre = 'tempest.common.commands'
- self.ip_addr_raw_mock = self.patch(common_pre + '.ip_addr_raw')
- self.ip_route_raw_mock = self.patch(common_pre + '.ip_route_raw')
- self.iptables_raw_mock = self.patch(common_pre + '.iptables_raw')
- self.ip_ns_list_mock = self.patch(common_pre + '.ip_ns_list')
- self.ip_ns_addr_mock = self.patch(common_pre + '.ip_ns_addr')
- self.ip_ns_route_mock = self.patch(common_pre + '.ip_ns_route')
- self.iptables_ns_mock = self.patch(common_pre + '.iptables_ns')
- self.ovs_db_dump_mock = self.patch(common_pre + '.ovs_db_dump')
-
- self.log_mock = self.patch('tempest.common.debug.LOG')
-
- def test_log_ip_ns_debug_disabled(self):
- self.useFixture(mockpatch.PatchObject(test.CONF.debug,
- 'enable', False))
- debug.log_ip_ns()
- self.assertFalse(self.ip_addr_raw_mock.called)
- self.assertFalse(self.log_mock.info.called)
-
- def test_log_ip_ns_debug_enabled(self):
- self.useFixture(mockpatch.PatchObject(test.CONF.debug,
- 'enable', True))
-
- self.ip_ns_list_mock.return_value = [1, 2]
-
- debug.log_ip_ns()
- self.ip_addr_raw_mock.assert_called_with()
- self.assertTrue(self.log_mock.info.called)
- self.ip_route_raw_mock.assert_called_with()
- self.assertEqual(len(debug.TABLES), self.iptables_raw_mock.call_count)
- for table in debug.TABLES:
- self.assertIn(mock.call(table),
- self.iptables_raw_mock.call_args_list)
-
- self.ip_ns_list_mock.assert_called_with()
- self.assertEqual(len(self.ip_ns_list_mock.return_value),
- self.ip_ns_addr_mock.call_count)
- self.assertEqual(len(self.ip_ns_list_mock.return_value),
- self.ip_ns_route_mock.call_count)
- for ns in self.ip_ns_list_mock.return_value:
- self.assertIn(mock.call(ns),
- self.ip_ns_addr_mock.call_args_list)
- self.assertIn(mock.call(ns),
- self.ip_ns_route_mock.call_args_list)
-
- self.assertEqual(len(debug.TABLES) *
- len(self.ip_ns_list_mock.return_value),
- self.iptables_ns_mock.call_count)
- for ns in self.ip_ns_list_mock.return_value:
- for table in debug.TABLES:
- self.assertIn(mock.call(ns, table),
- self.iptables_ns_mock.call_args_list)
-
- def test_log_ovs_db_debug_disabled(self):
- self.useFixture(mockpatch.PatchObject(test.CONF.debug,
- 'enable', False))
- self.useFixture(mockpatch.PatchObject(test.CONF.service_available,
- 'neutron', False))
- debug.log_ovs_db()
- self.assertFalse(self.ovs_db_dump_mock.called)
-
- self.useFixture(mockpatch.PatchObject(test.CONF.debug,
- 'enable', True))
- self.useFixture(mockpatch.PatchObject(test.CONF.service_available,
- 'neutron', False))
- debug.log_ovs_db()
- self.assertFalse(self.ovs_db_dump_mock.called)
-
- self.useFixture(mockpatch.PatchObject(test.CONF.debug,
- 'enable', False))
- self.useFixture(mockpatch.PatchObject(test.CONF.service_available,
- 'neutron', True))
- debug.log_ovs_db()
- self.assertFalse(self.ovs_db_dump_mock.called)
-
- def test_log_ovs_db_debug_enabled(self):
- self.useFixture(mockpatch.PatchObject(test.CONF.debug,
- 'enable', True))
- self.useFixture(mockpatch.PatchObject(test.CONF.service_available,
- 'neutron', True))
- debug.log_ovs_db()
- self.ovs_db_dump_mock.assert_called_with()
-
- def test_log_net_debug(self):
- self.log_ip_ns_mock = self.patch('tempest.common.debug.log_ip_ns')
- self.log_ovs_db_mock = self.patch('tempest.common.debug.log_ovs_db')
-
- debug.log_net_debug()
- self.log_ip_ns_mock.assert_called_with()
- self.log_ovs_db_mock.assert_called_with()
diff --git a/tempest/tests/test_commands.py b/tempest/tests/test_commands.py
deleted file mode 100644
index 2379741..0000000
--- a/tempest/tests/test_commands.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2014 NEC Corporation. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import subprocess
-
-import mock
-
-from tempest.common import commands
-from tempest.tests import base
-
-
-class TestCommands(base.TestCase):
-
- def setUp(self):
- super(TestCommands, self).setUp()
- self.subprocess_args = {'stdout': subprocess.PIPE,
- 'stderr': subprocess.STDOUT}
-
- @mock.patch('subprocess.Popen')
- def test_ip_addr_raw(self, mock):
- expected = ['/usr/bin/sudo', '-n', 'ip', 'a']
- commands.ip_addr_raw()
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_ip_route_raw(self, mock):
- expected = ['/usr/bin/sudo', '-n', 'ip', 'r']
- commands.ip_route_raw()
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_ip_ns_raw(self, mock):
- expected = ['/usr/bin/sudo', '-n', 'ip', 'netns', 'list']
- commands.ip_ns_raw()
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_iptables_raw(self, mock):
- table = 'filter'
- expected = ['/usr/bin/sudo', '-n', 'iptables', '--line-numbers',
- '-L', '-nv', '-t',
- '%s' % table]
- commands.iptables_raw(table)
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_ip_ns_list(self, mock):
- expected = ['/usr/bin/sudo', '-n', 'ip', 'netns', 'list']
- commands.ip_ns_list()
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_ip_ns_addr(self, mock):
- ns_list = commands.ip_ns_list()
- for ns in ns_list:
- expected = ['/usr/bin/sudo', '-n', 'ip', 'netns', 'exec', ns,
- 'ip', 'a']
- commands.ip_ns_addr(ns)
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_ip_ns_route(self, mock):
- ns_list = commands.ip_ns_list()
- for ns in ns_list:
- expected = ['/usr/bin/sudo', '-n', 'ip', 'netns', 'exec', ns,
- 'ip', 'r']
- commands.ip_ns_route(ns)
- mock.assert_called_once_with(expected, **self.subprocess_args)
-
- @mock.patch('subprocess.Popen')
- def test_iptables_ns(self, mock):
- table = 'filter'
- ns_list = commands.ip_ns_list()
- for ns in ns_list:
- expected = ['/usr/bin/sudo', '-n', 'ip', 'netns', 'exec', ns,
- 'iptables', '-v', '-S', '-t', table]
- commands.iptables_ns(ns, table)
- mock.assert_called_once_with(expected, **self.subprocess_args)