Merge "Reduce complexity during import phase"
diff --git a/tempest/common/generator/base_generator.py b/tempest/common/generator/base_generator.py
index 0398af1..ae4e743 100644
--- a/tempest/common/generator/base_generator.py
+++ b/tempest/common/generator/base_generator.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import functools
import jsonschema
@@ -30,9 +31,11 @@
return expected_result
-def generator_type(*args):
+def generator_type(*args, **kwargs):
def wrapper(func):
func.types = args
+ for key in kwargs:
+ setattr(func, key, kwargs[key])
return func
return wrapper
@@ -106,37 +109,71 @@
jsonschema.Draft4Validator.check_schema(schema['json-schema'])
jsonschema.validate(schema, self.schema)
- def generate(self, schema):
+ def generate_scenarios(self, schema, path=None):
"""
- Generate an json dictionary based on a schema.
- Only one value is mis-generated for each dictionary created.
+ Generates the scenario (all possible test cases) out of the given
+ schema.
- Any generator must return a list of tuples or a single tuple.
- The values of this tuple are:
- result[0]: Name of the test
- result[1]: json schema for the test
- result[2]: expected result of the test (can be None)
+ :param schema: a dict style schema (see ``BasicGeneratorSet.schema``)
+ :param path: the schema path if the given schema is a subschema
"""
- LOG.debug("generate_invalid: %s" % schema)
- schema_type = schema["type"]
- if isinstance(schema_type, list):
+ schema_type = schema['type']
+ scenarios = []
+
+ if schema_type == 'object':
+ properties = schema["properties"]
+ for attribute, definition in properties.iteritems():
+ current_path = copy.copy(path)
+ if path is not None:
+ current_path.append(attribute)
+ else:
+ current_path = [attribute]
+ scenarios.extend(
+ self.generate_scenarios(definition, current_path))
+ elif isinstance(schema_type, list):
if "integer" in schema_type:
schema_type = "integer"
else:
raise Exception("non-integer list types not supported")
- result = []
- if schema_type not in self.types_dict:
- raise TypeError("generator (%s) doesn't support type: %s"
- % (self.__class__.__name__, schema_type))
for generator in self.types_dict[schema_type]:
- ret = generator(schema)
- if ret is not None:
- if isinstance(ret, list):
- result.extend(ret)
- elif isinstance(ret, tuple):
- result.append(ret)
- else:
- raise Exception("generator (%s) returns invalid result: %s"
- % (generator, ret))
- LOG.debug("result: %s" % result)
- return result
+ if hasattr(generator, "needed_property"):
+ prop = generator.needed_property
+ if (prop not in schema or
+ schema[prop] is None or
+ schema[prop] is False):
+ continue
+
+ name = generator.__name__
+ if path is not None:
+ name = "%s_%s" % ("_".join(path), name)
+ scenarios.append({
+ "_negtest_name": name,
+ "_negtest_generator": generator,
+ "_negtest_schema": schema,
+ "_negtest_path": path})
+ return scenarios
+
+ def generate_payload(self, test, schema):
+ """
+ Generates one jsonschema out of the given test. It's mandatory to use
+ generate_scenarios before to register all needed variables to the test.
+
+ :param test: A test object (scenario) with all _negtest variables on it
+ :param schema: schema for the test
+ """
+ generator = test._negtest_generator
+ ret = generator(test._negtest_schema)
+ path = copy.copy(test._negtest_path)
+ expected_result = None
+
+ if ret is not None:
+ generator_result = generator(test._negtest_schema)
+ invalid_snippet = generator_result[1]
+ expected_result = generator_result[2]
+ element = path.pop()
+ if len(path) > 0:
+ schema_snip = reduce(dict.get, path, schema)
+ schema_snip[element] = invalid_snippet
+ else:
+ schema[element] = invalid_snippet
+ return expected_result
diff --git a/tempest/common/generator/negative_generator.py b/tempest/common/generator/negative_generator.py
index 4f3d2cd..1d5ed43 100644
--- a/tempest/common/generator/negative_generator.py
+++ b/tempest/common/generator/negative_generator.py
@@ -47,65 +47,32 @@
if min_length > 0:
return "x" * (min_length - 1)
- @base.generator_type("string")
+ @base.generator_type("string", needed_property="maxLength")
@base.simple_generator
def gen_str_max_length(self, schema):
max_length = schema.get("maxLength", -1)
- if max_length > -1:
- return "x" * (max_length + 1)
+ return "x" * (max_length + 1)
- @base.generator_type("integer")
+ @base.generator_type("integer", needed_property="minimum")
@base.simple_generator
def gen_int_min(self, schema):
- if "minimum" in schema:
- minimum = schema["minimum"]
- if "exclusiveMinimum" not in schema:
- minimum -= 1
- return minimum
+ minimum = schema["minimum"]
+ if "exclusiveMinimum" not in schema:
+ minimum -= 1
+ return minimum
- @base.generator_type("integer")
+ @base.generator_type("integer", needed_property="maximum")
@base.simple_generator
def gen_int_max(self, schema):
- if "maximum" in schema:
- maximum = schema["maximum"]
- if "exclusiveMaximum" not in schema:
- maximum += 1
- return maximum
+ maximum = schema["maximum"]
+ if "exclusiveMaximum" not in schema:
+ maximum += 1
+ return maximum
- @base.generator_type("object")
- def gen_obj_remove_attr(self, schema):
- invalids = []
- valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- required = schema.get("required", [])
- for r in required:
- new_valid = copy.deepcopy(valid_schema)
- del new_valid[r]
- invalids.append(("gen_obj_remove_attr", new_valid, None))
- return invalids
-
- @base.generator_type("object")
+ @base.generator_type("object", needed_property="additionalProperties")
@base.simple_generator
def gen_obj_add_attr(self, schema):
valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- if not schema.get("additionalProperties", True):
- new_valid = copy.deepcopy(valid_schema)
- new_valid["$$$$$$$$$$"] = "xxx"
- return new_valid
-
- @base.generator_type("object")
- def gen_inv_prop_obj(self, schema):
- LOG.debug("generate_invalid_object: %s" % schema)
- valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- invalids = []
- properties = schema["properties"]
-
- for k, v in properties.iteritems():
- for invalid in self.generate(v):
- LOG.debug(v)
- new_valid = copy.deepcopy(valid_schema)
- new_valid[k] = invalid[1]
- name = "prop_%s_%s" % (k, invalid[0])
- invalids.append((name, new_valid, invalid[2]))
-
- LOG.debug("generate_invalid_object return: %s" % invalids)
- return invalids
+ new_valid = copy.deepcopy(valid_schema)
+ new_valid["$$$$$$$$$$"] = "xxx"
+ return new_valid
diff --git a/tempest/common/generator/valid_generator.py b/tempest/common/generator/valid_generator.py
index 0d7b398..7b80afc 100644
--- a/tempest/common/generator/valid_generator.py
+++ b/tempest/common/generator/valid_generator.py
@@ -54,5 +54,28 @@
obj[k] = self.generate_valid(v)
return obj
+ def generate(self, schema):
+ schema_type = schema["type"]
+ if isinstance(schema_type, list):
+ if "integer" in schema_type:
+ schema_type = "integer"
+ else:
+ raise Exception("non-integer list types not supported")
+ result = []
+ if schema_type not in self.types_dict:
+ raise TypeError("generator (%s) doesn't support type: %s"
+ % (self.__class__.__name__, schema_type))
+ for generator in self.types_dict[schema_type]:
+ ret = generator(schema)
+ if ret is not None:
+ if isinstance(ret, list):
+ result.extend(ret)
+ elif isinstance(ret, tuple):
+ result.append(ret)
+ else:
+ raise Exception("generator (%s) returns invalid result: %s"
+ % (generator, ret))
+ return result
+
def generate_valid(self, schema):
return self.generate(schema)[0][1]
diff --git a/tempest/test.py b/tempest/test.py
index d3cedf6..adc1d37 100644
--- a/tempest/test.py
+++ b/tempest/test.py
@@ -510,13 +510,9 @@
"expected_result": expected_result
}))
if schema is not None:
- for name, schema, expected_result in generator.generate(schema):
- if (expected_result is None and
- "default_result_code" in description):
- expected_result = description["default_result_code"]
- scenario_list.append((name,
- {"schema": schema,
- "expected_result": expected_result}))
+ for scenario in generator.generate_scenarios(schema):
+ scenario_list.append((scenario['_negtest_name'],
+ scenario))
LOG.debug(scenario_list)
return scenario_list
@@ -546,8 +542,14 @@
"""
LOG.info("Executing %s" % description["name"])
LOG.debug(description)
+ generator = importutils.import_class(
+ CONF.negative.test_generator)()
+ schema = description.get("json-schema", None)
method = description["http-method"]
url = description["url"]
+ expected_result = None
+ if "default_result_code" in description:
+ expected_result = description["default_result_code"]
resources = [self.get_resource(r) for
r in description.get("resources", [])]
@@ -557,13 +559,19 @@
# entry (see get_resource).
# We just send a valid json-schema with it
valid_schema = None
- schema = description.get("json-schema", None)
if schema:
valid_schema = \
valid.ValidTestGenerator().generate_valid(schema)
new_url, body = self._http_arguments(valid_schema, url, method)
- elif hasattr(self, "schema"):
- new_url, body = self._http_arguments(self.schema, url, method)
+ elif hasattr(self, "_negtest_name"):
+ schema_under_test = \
+ valid.ValidTestGenerator().generate_valid(schema)
+ local_expected_result = \
+ generator.generate_payload(self, schema_under_test)
+ if local_expected_result is not None:
+ expected_result = local_expected_result
+ new_url, body = \
+ self._http_arguments(schema_under_test, url, method)
else:
raise Exception("testscenarios are not active. Please make sure "
"that your test runner supports the load_tests "
@@ -575,7 +583,7 @@
client = self.client
resp, resp_body = client.send_request(method, new_url,
resources, body=body)
- self._check_negative_response(resp.status, resp_body)
+ self._check_negative_response(expected_result, resp.status, resp_body)
def _http_arguments(self, json_dict, url, method):
LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
@@ -586,8 +594,7 @@
else:
return url, json.dumps(json_dict)
- def _check_negative_response(self, result, body):
- expected_result = getattr(self, "expected_result", None)
+ def _check_negative_response(self, expected_result, result, body):
self.assertTrue(result >= 400 and result < 500 and result != 413,
"Expected client error, got %s:%s" %
(result, body))
diff --git a/tempest/tests/negative/test_negative_auto_test.py b/tempest/tests/negative/test_negative_auto_test.py
index dddd083..fb1da43 100644
--- a/tempest/tests/negative/test_negative_auto_test.py
+++ b/tempest/tests/negative/test_negative_auto_test.py
@@ -43,9 +43,9 @@
def _check_prop_entries(self, result, entry):
entries = [a for a in result if entry in a[0]]
self.assertIsNotNone(entries)
- self.assertIs(len(entries), 2)
+ self.assertGreater(len(entries), 1)
for entry in entries:
- self.assertIsNotNone(entry[1]['schema'])
+ self.assertIsNotNone(entry[1]['_negtest_name'])
def _check_resource_entries(self, result, entry):
entries = [a for a in result if entry in a[0]]
@@ -57,12 +57,11 @@
def test_generate_scenario(self):
scenarios = test.NegativeAutoTest.\
generate_scenario(self.fake_input_desc)
-
self.assertIsInstance(scenarios, list)
for scenario in scenarios:
self.assertIsInstance(scenario, tuple)
self.assertIsInstance(scenario[0], str)
self.assertIsInstance(scenario[1], dict)
- self._check_prop_entries(scenarios, "prop_minRam")
- self._check_prop_entries(scenarios, "prop_minDisk")
+ self._check_prop_entries(scenarios, "minRam")
+ self._check_prop_entries(scenarios, "minDisk")
self._check_resource_entries(scenarios, "inv_res")
diff --git a/tempest/tests/negative/test_negative_generators.py b/tempest/tests/negative/test_negative_generators.py
index a7af619..2fa6933 100644
--- a/tempest/tests/negative/test_negative_generators.py
+++ b/tempest/tests/negative/test_negative_generators.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+
import jsonschema
import mock
@@ -86,15 +88,6 @@
class BaseNegativeGenerator(object):
types = ['string', 'integer', 'object']
- fake_input_str = {"type": "string",
- "minLength": 2,
- "maxLength": 8,
- 'results': {'gen_int': 404}}
-
- fake_input_int = {"type": "integer",
- "maximum": 255,
- "minimum": 1}
-
fake_input_obj = {"type": "object",
"properties": {"minRam": {"type": "integer"},
"diskName": {"type": "string"},
@@ -106,31 +99,21 @@
"type": "not_defined"
}
- def _validate_result(self, data):
- self.assertTrue(isinstance(data, list))
- for t in data:
- self.assertIsInstance(t, tuple)
- self.assertEqual(3, len(t))
- self.assertIsInstance(t[0], str)
+ class fake_test_class(object):
+ def __init__(self, scenario):
+ for k, v in scenario.iteritems():
+ setattr(self, k, v)
- def test_generate_string(self):
- result = self.generator.generate(self.fake_input_str)
- self._validate_result(result)
-
- def test_generate_integer(self):
- result = self.generator.generate(self.fake_input_int)
- self._validate_result(result)
-
- def test_generate_obj(self):
- result = self.generator.generate(self.fake_input_obj)
- self._validate_result(result)
+ def _validate_result(self, valid_schema, invalid_schema):
+ for k, v in valid_schema.iteritems():
+ self.assertTrue(k in invalid_schema)
def test_generator_mandatory_functions(self):
for data_type in self.types:
self.assertIn(data_type, self.generator.types_dict)
def test_generate_with_unknown_type(self):
- self.assertRaises(TypeError, self.generator.generate,
+ self.assertRaises(TypeError, self.generator.generate_payload,
self.unknown_type_schema)
@@ -151,3 +134,16 @@
def setUp(self):
super(TestNegativeNegativeGenerator, self).setUp()
self.generator = negative_generator.NegativeTestGenerator()
+
+ def test_generate_obj(self):
+ schema = self.fake_input_obj
+ scenarios = self.generator.generate_scenarios(schema)
+ for scenario in scenarios:
+ test = self.fake_test_class(scenario)
+ valid_schema = \
+ valid_generator.ValidTestGenerator().generate_valid(schema)
+ schema_under_test = copy.copy(valid_schema)
+ expected_result = \
+ self.generator.generate_payload(test, schema_under_test)
+ self.assertEqual(expected_result, None)
+ self._validate_result(valid_schema, schema_under_test)