Matthew Treinish | 0e2d8aa | 2013-07-18 15:16:37 -0400 | [diff] [blame^] | 1 | #!/usr/bin/env python |
| 2 | # vim: tabstop=4 shiftwidth=4 softtabstop=4 |
| 3 | |
| 4 | # Copyright (c) 2013, Nebula, Inc. |
| 5 | # Copyright 2010 United States Government as represented by the |
| 6 | # Administrator of the National Aeronautics and Space Administration. |
| 7 | # All Rights Reserved. |
| 8 | # |
| 9 | # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| 10 | # not use this file except in compliance with the License. You may obtain |
| 11 | # a copy of the License at |
| 12 | # |
| 13 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 14 | # |
| 15 | # Unless required by applicable law or agreed to in writing, software |
| 16 | # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| 17 | # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| 18 | # License for the specific language governing permissions and limitations |
| 19 | # under the License. |
| 20 | # |
| 21 | # Colorizer Code is borrowed from Twisted: |
| 22 | # Copyright (c) 2001-2010 Twisted Matrix Laboratories. |
| 23 | # |
| 24 | # Permission is hereby granted, free of charge, to any person obtaining |
| 25 | # a copy of this software and associated documentation files (the |
| 26 | # "Software"), to deal in the Software without restriction, including |
| 27 | # without limitation the rights to use, copy, modify, merge, publish, |
| 28 | # distribute, sublicense, and/or sell copies of the Software, and to |
| 29 | # permit persons to whom the Software is furnished to do so, subject to |
| 30 | # the following conditions: |
| 31 | # |
| 32 | # The above copyright notice and this permission notice shall be |
| 33 | # included in all copies or substantial portions of the Software. |
| 34 | # |
| 35 | # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| 36 | # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
| 37 | # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| 38 | # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE |
| 39 | # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION |
| 40 | # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
| 41 | # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
| 42 | |
| 43 | """Display a subunit stream through a colorized unittest test runner.""" |
| 44 | |
| 45 | import heapq |
| 46 | import subunit |
| 47 | import sys |
| 48 | import unittest |
| 49 | |
| 50 | import testtools |
| 51 | |
| 52 | |
| 53 | class _AnsiColorizer(object): |
| 54 | """ |
| 55 | A colorizer is an object that loosely wraps around a stream, allowing |
| 56 | callers to write text to the stream in a particular color. |
| 57 | |
| 58 | Colorizer classes must implement C{supported()} and C{write(text, color)}. |
| 59 | """ |
| 60 | _colors = dict(black=30, red=31, green=32, yellow=33, |
| 61 | blue=34, magenta=35, cyan=36, white=37) |
| 62 | |
| 63 | def __init__(self, stream): |
| 64 | self.stream = stream |
| 65 | |
| 66 | def supported(cls, stream=sys.stdout): |
| 67 | """ |
| 68 | A class method that returns True if the current platform supports |
| 69 | coloring terminal output using this method. Returns False otherwise. |
| 70 | """ |
| 71 | if not stream.isatty(): |
| 72 | return False # auto color only on TTYs |
| 73 | try: |
| 74 | import curses |
| 75 | except ImportError: |
| 76 | return False |
| 77 | else: |
| 78 | try: |
| 79 | try: |
| 80 | return curses.tigetnum("colors") > 2 |
| 81 | except curses.error: |
| 82 | curses.setupterm() |
| 83 | return curses.tigetnum("colors") > 2 |
| 84 | except Exception: |
| 85 | # guess false in case of error |
| 86 | return False |
| 87 | supported = classmethod(supported) |
| 88 | |
| 89 | def write(self, text, color): |
| 90 | """ |
| 91 | Write the given text to the stream in the given color. |
| 92 | |
| 93 | @param text: Text to be written to the stream. |
| 94 | |
| 95 | @param color: A string label for a color. e.g. 'red', 'white'. |
| 96 | """ |
| 97 | color = self._colors[color] |
| 98 | self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text)) |
| 99 | |
| 100 | |
| 101 | class _Win32Colorizer(object): |
| 102 | """ |
| 103 | See _AnsiColorizer docstring. |
| 104 | """ |
| 105 | def __init__(self, stream): |
| 106 | import win32console |
| 107 | red, green, blue, bold = (win32console.FOREGROUND_RED, |
| 108 | win32console.FOREGROUND_GREEN, |
| 109 | win32console.FOREGROUND_BLUE, |
| 110 | win32console.FOREGROUND_INTENSITY) |
| 111 | self.stream = stream |
| 112 | self.screenBuffer = win32console.GetStdHandle( |
| 113 | win32console.STD_OUT_HANDLE) |
| 114 | self._colors = {'normal': red | green | blue, |
| 115 | 'red': red | bold, |
| 116 | 'green': green | bold, |
| 117 | 'blue': blue | bold, |
| 118 | 'yellow': red | green | bold, |
| 119 | 'magenta': red | blue | bold, |
| 120 | 'cyan': green | blue | bold, |
| 121 | 'white': red | green | blue | bold} |
| 122 | |
| 123 | def supported(cls, stream=sys.stdout): |
| 124 | try: |
| 125 | import win32console |
| 126 | screenBuffer = win32console.GetStdHandle( |
| 127 | win32console.STD_OUT_HANDLE) |
| 128 | except ImportError: |
| 129 | return False |
| 130 | import pywintypes |
| 131 | try: |
| 132 | screenBuffer.SetConsoleTextAttribute( |
| 133 | win32console.FOREGROUND_RED | |
| 134 | win32console.FOREGROUND_GREEN | |
| 135 | win32console.FOREGROUND_BLUE) |
| 136 | except pywintypes.error: |
| 137 | return False |
| 138 | else: |
| 139 | return True |
| 140 | supported = classmethod(supported) |
| 141 | |
| 142 | def write(self, text, color): |
| 143 | color = self._colors[color] |
| 144 | self.screenBuffer.SetConsoleTextAttribute(color) |
| 145 | self.stream.write(text) |
| 146 | self.screenBuffer.SetConsoleTextAttribute(self._colors['normal']) |
| 147 | |
| 148 | |
| 149 | class _NullColorizer(object): |
| 150 | """ |
| 151 | See _AnsiColorizer docstring. |
| 152 | """ |
| 153 | def __init__(self, stream): |
| 154 | self.stream = stream |
| 155 | |
| 156 | def supported(cls, stream=sys.stdout): |
| 157 | return True |
| 158 | supported = classmethod(supported) |
| 159 | |
| 160 | def write(self, text, color): |
| 161 | self.stream.write(text) |
| 162 | |
| 163 | |
| 164 | def get_elapsed_time_color(elapsed_time): |
| 165 | if elapsed_time > 1.0: |
| 166 | return 'red' |
| 167 | elif elapsed_time > 0.25: |
| 168 | return 'yellow' |
| 169 | else: |
| 170 | return 'green' |
| 171 | |
| 172 | |
| 173 | class NovaTestResult(testtools.TestResult): |
| 174 | def __init__(self, stream, descriptions, verbosity): |
| 175 | super(NovaTestResult, self).__init__() |
| 176 | self.stream = stream |
| 177 | self.showAll = verbosity > 1 |
| 178 | self.num_slow_tests = 10 |
| 179 | self.slow_tests = [] # this is a fixed-sized heap |
| 180 | self.colorizer = None |
| 181 | # NOTE(vish): reset stdout for the terminal check |
| 182 | stdout = sys.stdout |
| 183 | sys.stdout = sys.__stdout__ |
| 184 | for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]: |
| 185 | if colorizer.supported(): |
| 186 | self.colorizer = colorizer(self.stream) |
| 187 | break |
| 188 | sys.stdout = stdout |
| 189 | self.start_time = None |
| 190 | self.last_time = {} |
| 191 | self.results = {} |
| 192 | self.last_written = None |
| 193 | |
| 194 | def _writeElapsedTime(self, elapsed): |
| 195 | color = get_elapsed_time_color(elapsed) |
| 196 | self.colorizer.write(" %.2f" % elapsed, color) |
| 197 | |
| 198 | def _addResult(self, test, *args): |
| 199 | try: |
| 200 | name = test.id() |
| 201 | except AttributeError: |
| 202 | name = 'Unknown.unknown' |
| 203 | test_class, test_name = name.rsplit('.', 1) |
| 204 | |
| 205 | elapsed = (self._now() - self.start_time).total_seconds() |
| 206 | item = (elapsed, test_class, test_name) |
| 207 | if len(self.slow_tests) >= self.num_slow_tests: |
| 208 | heapq.heappushpop(self.slow_tests, item) |
| 209 | else: |
| 210 | heapq.heappush(self.slow_tests, item) |
| 211 | |
| 212 | self.results.setdefault(test_class, []) |
| 213 | self.results[test_class].append((test_name, elapsed) + args) |
| 214 | self.last_time[test_class] = self._now() |
| 215 | self.writeTests() |
| 216 | |
| 217 | def _writeResult(self, test_name, elapsed, long_result, color, |
| 218 | short_result, success): |
| 219 | if self.showAll: |
| 220 | self.stream.write(' %s' % str(test_name).ljust(66)) |
| 221 | self.colorizer.write(long_result, color) |
| 222 | if success: |
| 223 | self._writeElapsedTime(elapsed) |
| 224 | self.stream.writeln() |
| 225 | else: |
| 226 | self.colorizer.write(short_result, color) |
| 227 | |
| 228 | def addSuccess(self, test): |
| 229 | super(NovaTestResult, self).addSuccess(test) |
| 230 | self._addResult(test, 'OK', 'green', '.', True) |
| 231 | |
| 232 | def addFailure(self, test, err): |
| 233 | if test.id() == 'process-returncode': |
| 234 | return |
| 235 | super(NovaTestResult, self).addFailure(test, err) |
| 236 | self._addResult(test, 'FAIL', 'red', 'F', False) |
| 237 | |
| 238 | def addError(self, test, err): |
| 239 | super(NovaTestResult, self).addFailure(test, err) |
| 240 | self._addResult(test, 'ERROR', 'red', 'E', False) |
| 241 | |
| 242 | def addSkip(self, test, reason=None, details=None): |
| 243 | super(NovaTestResult, self).addSkip(test, reason, details) |
| 244 | self._addResult(test, 'SKIP', 'blue', 'S', True) |
| 245 | |
| 246 | def startTest(self, test): |
| 247 | self.start_time = self._now() |
| 248 | super(NovaTestResult, self).startTest(test) |
| 249 | |
| 250 | def writeTestCase(self, cls): |
| 251 | if not self.results.get(cls): |
| 252 | return |
| 253 | if cls != self.last_written: |
| 254 | self.colorizer.write(cls, 'white') |
| 255 | self.stream.writeln() |
| 256 | for result in self.results[cls]: |
| 257 | self._writeResult(*result) |
| 258 | del self.results[cls] |
| 259 | self.stream.flush() |
| 260 | self.last_written = cls |
| 261 | |
| 262 | def writeTests(self): |
| 263 | time = self.last_time.get(self.last_written, self._now()) |
| 264 | if not self.last_written or (self._now() - time).total_seconds() > 2.0: |
| 265 | diff = 3.0 |
| 266 | while diff > 2.0: |
| 267 | classes = self.results.keys() |
| 268 | oldest = min(classes, key=lambda x: self.last_time[x]) |
| 269 | diff = (self._now() - self.last_time[oldest]).total_seconds() |
| 270 | self.writeTestCase(oldest) |
| 271 | else: |
| 272 | self.writeTestCase(self.last_written) |
| 273 | |
| 274 | def done(self): |
| 275 | self.stopTestRun() |
| 276 | |
| 277 | def stopTestRun(self): |
| 278 | for cls in list(self.results.iterkeys()): |
| 279 | self.writeTestCase(cls) |
| 280 | self.stream.writeln() |
| 281 | self.writeSlowTests() |
| 282 | |
| 283 | def writeSlowTests(self): |
| 284 | # Pare out 'fast' tests |
| 285 | slow_tests = [item for item in self.slow_tests |
| 286 | if get_elapsed_time_color(item[0]) != 'green'] |
| 287 | if slow_tests: |
| 288 | slow_total_time = sum(item[0] for item in slow_tests) |
| 289 | slow = ("Slowest %i tests took %.2f secs:" |
| 290 | % (len(slow_tests), slow_total_time)) |
| 291 | self.colorizer.write(slow, 'yellow') |
| 292 | self.stream.writeln() |
| 293 | last_cls = None |
| 294 | # sort by name |
| 295 | for elapsed, cls, name in sorted(slow_tests, |
| 296 | key=lambda x: x[1] + x[2]): |
| 297 | if cls != last_cls: |
| 298 | self.colorizer.write(cls, 'white') |
| 299 | self.stream.writeln() |
| 300 | last_cls = cls |
| 301 | self.stream.write(' %s' % str(name).ljust(68)) |
| 302 | self._writeElapsedTime(elapsed) |
| 303 | self.stream.writeln() |
| 304 | |
| 305 | def printErrors(self): |
| 306 | if self.showAll: |
| 307 | self.stream.writeln() |
| 308 | self.printErrorList('ERROR', self.errors) |
| 309 | self.printErrorList('FAIL', self.failures) |
| 310 | |
| 311 | def printErrorList(self, flavor, errors): |
| 312 | for test, err in errors: |
| 313 | self.colorizer.write("=" * 70, 'red') |
| 314 | self.stream.writeln() |
| 315 | self.colorizer.write(flavor, 'red') |
| 316 | self.stream.writeln(": %s" % test.id()) |
| 317 | self.colorizer.write("-" * 70, 'red') |
| 318 | self.stream.writeln() |
| 319 | self.stream.writeln("%s" % err) |
| 320 | |
| 321 | |
| 322 | test = subunit.ProtocolTestCase(sys.stdin, passthrough=None) |
| 323 | |
| 324 | if sys.version_info[0:2] <= (2, 6): |
| 325 | runner = unittest.TextTestRunner(verbosity=2) |
| 326 | else: |
| 327 | runner = unittest.TextTestRunner(verbosity=2, resultclass=NovaTestResult) |
| 328 | |
| 329 | if runner.run(test).wasSuccessful(): |
| 330 | exit_code = 0 |
| 331 | else: |
| 332 | exit_code = 1 |
| 333 | sys.exit(exit_code) |