blob: 7b0f48b8a7a6a753fa074899985bae2946a2fba2 [file] [log] [blame]
# Copyright 2016 NEC Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.compute import base
from tempest.common import utils
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
CONF = config.CONF
class VolumesAdminNegativeTest(base.BaseV2ComputeAdminTest):
create_default_network = True
@classmethod
def skip_checks(cls):
super(VolumesAdminNegativeTest, cls).skip_checks()
if not CONF.service_available.cinder:
skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
raise cls.skipException(skip_msg)
@classmethod
def resource_setup(cls):
super(VolumesAdminNegativeTest, cls).resource_setup()
cls.server = cls.create_test_server(wait_until='ACTIVE')
@decorators.attr(type=['negative'])
@decorators.idempotent_id('309b5ecd-0585-4a7e-a36f-d2b2bf55259d')
def test_update_attached_volume_with_nonexistent_volume_in_uri(self):
volume = self.create_volume()
nonexistent_volume = data_utils.rand_uuid()
self.assertRaises(lib_exc.NotFound,
self.admin_servers_client.update_attached_volume,
self.server['id'], nonexistent_volume,
volumeId=volume['id'])
@decorators.related_bug('1629110', status_code=400)
@decorators.attr(type=['negative'])
@decorators.idempotent_id('7dcac15a-b107-46d3-a5f6-cb863f4e454a')
def test_update_attached_volume_with_nonexistent_volume_in_body(self):
volume = self.create_volume()
self.attach_volume(self.server, volume)
nonexistent_volume = data_utils.rand_uuid()
self.assertRaises(lib_exc.BadRequest,
self.admin_servers_client.update_attached_volume,
self.server['id'], volume['id'],
volumeId=nonexistent_volume)
class UpdateMultiattachVolumeNegativeTest(base.BaseV2ComputeAdminTest):
min_microversion = '2.60'
volume_min_microversion = '3.27'
@classmethod
def skip_checks(cls):
super(UpdateMultiattachVolumeNegativeTest, cls).skip_checks()
if not CONF.compute_feature_enabled.volume_multiattach:
raise cls.skipException('Volume multi-attach is not available.')
@decorators.attr(type=['negative'])
@decorators.idempotent_id('7576d497-b7c6-44bd-9cc5-c5b4e50fec71')
@utils.services('volume')
def test_multiattach_rw_volume_update_failure(self):
# Create two multiattach capable volumes.
vol1 = self.create_volume(multiattach=True)
vol2 = self.create_volume(multiattach=True)
# Create two instances.
server1 = self.create_test_server(wait_until='ACTIVE')
server2 = self.create_test_server(wait_until='ACTIVE')
# Attach vol1 to both of these instances.
vol1_attachment1 = self.attach_volume(server1, vol1)
vol1_attachment2 = self.attach_volume(server2, vol1)
# Assert that we now have two attachments.
vol1 = self.volumes_client.show_volume(vol1['id'])['volume']
self.assertEqual(2, len(vol1['attachments']))
# By default both of these attachments should have an attach_mode of
# read-write, assert that here to ensure the following calls to update
# the volume will be rejected.
for volume_attachment in vol1['attachments']:
attachment_id = volume_attachment['attachment_id']
attachment = self.attachments_client.show_attachment(
attachment_id)['attachment']
self.assertEqual('rw', attachment['attach_mode'])
# Assert that a BadRequest is raised when we attempt to update volume1
# to volume2 on server1 or server2.
self.assertRaises(lib_exc.BadRequest,
self.admin_servers_client.update_attached_volume,
server1['id'], vol1['id'], volumeId=vol2['id'])
self.assertRaises(lib_exc.BadRequest,
self.admin_servers_client.update_attached_volume,
server2['id'], vol1['id'], volumeId=vol2['id'])
# Fetch the volume 1 to check the current attachments.
vol1 = self.volumes_client.show_volume(vol1['id'])['volume']
vol1_attachment_ids = [a['id'] for a in vol1['attachments']]
# Assert that volume 1 is still attached to both server 1 and 2.
self.assertIn(vol1_attachment1['id'], vol1_attachment_ids)
self.assertIn(vol1_attachment2['id'], vol1_attachment_ids)
# Assert that volume 2 has no attachments.
vol2 = self.volumes_client.show_volume(vol2['id'])['volume']
self.assertEqual([], vol2['attachments'])