blob: 9274fce207bc38f1b46be0bd187126c95c881602 [file] [log] [blame]
wanghaoaa1f2f92013-10-10 11:30:37 +08001# Copyright 2013 Huawei Technologies Co.,LTD
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
Zhi Kun Liubb363a22013-11-28 18:47:39 +080016from tempest.api.volume import base
wanghaoaa1f2f92013-10-10 11:30:37 +080017from tempest.common.utils import data_utils as utils
18from tempest.test import attr
19
20
Zhi Kun Liubb363a22013-11-28 18:47:39 +080021class VolumesActionsTest(base.BaseVolumeV1AdminTest):
wanghaoaa1f2f92013-10-10 11:30:37 +080022 _interface = "json"
23
24 @classmethod
25 def setUpClass(cls):
26 super(VolumesActionsTest, cls).setUpClass()
27 cls.client = cls.volumes_client
28
29 # Create admin volume client
30 cls.admin_volume_client = cls.os_adm.volumes_client
31
32 # Create a test shared volume for tests
33 vol_name = utils.rand_name(cls.__name__ + '-Volume-')
34
35 resp, cls.volume = cls.client.create_volume(size=1,
36 display_name=vol_name)
37 cls.client.wait_for_volume_status(cls.volume['id'], 'available')
38
39 @classmethod
40 def tearDownClass(cls):
41 # Delete the test volume
42 cls.client.delete_volume(cls.volume['id'])
43 cls.client.wait_for_resource_deletion(cls.volume['id'])
44
45 super(VolumesActionsTest, cls).tearDownClass()
46
47 def _reset_volume_status(self, volume_id, status):
wanghao9d3d6cb2013-11-12 15:10:10 +080048 # Reset the volume status
wanghaoaa1f2f92013-10-10 11:30:37 +080049 resp, body = self.admin_volume_client.reset_volume_status(volume_id,
50 status)
51 return resp, body
52
53 def tearDown(self):
54 # Set volume's status to available after test
55 self._reset_volume_status(self.volume['id'], 'available')
56 super(VolumesActionsTest, self).tearDown()
57
wanghao9d3d6cb2013-11-12 15:10:10 +080058 def _create_temp_volume(self):
59 # Create a temp volume for force delete tests
60 vol_name = utils.rand_name('Volume')
61 resp, temp_volume = self.client.create_volume(size=1,
62 display_name=vol_name)
63 self.client.wait_for_volume_status(temp_volume['id'], 'available')
64
65 return temp_volume
66
67 def _create_reset_and_force_delete_temp_volume(self, status=None):
68 # Create volume, reset volume status, and force delete temp volume
69 temp_volume = self._create_temp_volume()
70 if status:
71 resp, body = self._reset_volume_status(temp_volume['id'], status)
72 self.assertEqual(202, resp.status)
73 resp_delete, volume_delete = self.admin_volume_client.\
74 force_delete_volume(temp_volume['id'])
75 self.assertEqual(202, resp_delete.status)
76 self.client.wait_for_resource_deletion(temp_volume['id'])
77
wanghaoaa1f2f92013-10-10 11:30:37 +080078 @attr(type='gate')
79 def test_volume_reset_status(self):
80 # test volume reset status : available->error->available
81 resp, body = self._reset_volume_status(self.volume['id'], 'error')
82 self.assertEqual(202, resp.status)
83 resp_get, volume_get = self.admin_volume_client.get_volume(
84 self.volume['id'])
85 self.assertEqual('error', volume_get['status'])
86
87 @attr(type='gate')
88 def test_volume_begin_detaching(self):
89 # test volume begin detaching : available -> detaching -> available
90 resp, body = self.client.volume_begin_detaching(self.volume['id'])
91 self.assertEqual(202, resp.status)
92 resp_get, volume_get = self.client.get_volume(self.volume['id'])
93 self.assertEqual('detaching', volume_get['status'])
94
95 @attr(type='gate')
96 def test_volume_roll_detaching(self):
97 # test volume roll detaching : detaching -> in-use -> available
98 resp, body = self.client.volume_begin_detaching(self.volume['id'])
99 self.assertEqual(202, resp.status)
100 resp, body = self.client.volume_roll_detaching(self.volume['id'])
101 self.assertEqual(202, resp.status)
102 resp_get, volume_get = self.client.get_volume(self.volume['id'])
103 self.assertEqual('in-use', volume_get['status'])
104
wanghao9d3d6cb2013-11-12 15:10:10 +0800105 def test_volume_force_delete_when_volume_is_creating(self):
106 # test force delete when status of volume is creating
107 self._create_reset_and_force_delete_temp_volume('creating')
108
109 def test_volume_force_delete_when_volume_is_attaching(self):
110 # test force delete when status of volume is attaching
111 self._create_reset_and_force_delete_temp_volume('attaching')
112
113 @attr(type='gate')
114 def test_volume_force_delete_when_volume_is_error(self):
115 # test force delete when status of volume is error
116 self._create_reset_and_force_delete_temp_volume('error')
117
wanghaoaa1f2f92013-10-10 11:30:37 +0800118
119class VolumesActionsTestXML(VolumesActionsTest):
120 _interface = "xml"