blob: a3a0616a9c9a8444cb6355f7289317f7a5e21e56 [file] [log] [blame]
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -04001#!/usr/bin/env python
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -04002
3# Copyright (c) 2013, Nebula, Inc.
4# Copyright 2010 United States Government as represented by the
5# Administrator of the National Aeronautics and Space Administration.
6# All Rights Reserved.
7#
8# Licensed under the Apache License, Version 2.0 (the "License"); you may
9# not use this file except in compliance with the License. You may obtain
10# a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17# License for the specific language governing permissions and limitations
18# under the License.
19#
20# Colorizer Code is borrowed from Twisted:
21# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
22#
23# Permission is hereby granted, free of charge, to any person obtaining
24# a copy of this software and associated documentation files (the
25# "Software"), to deal in the Software without restriction, including
26# without limitation the rights to use, copy, modify, merge, publish,
27# distribute, sublicense, and/or sell copies of the Software, and to
28# permit persons to whom the Software is furnished to do so, subject to
29# the following conditions:
30#
31# The above copyright notice and this permission notice shall be
32# included in all copies or substantial portions of the Software.
33#
34# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
36# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
38# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
39# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
40# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
41
42"""Display a subunit stream through a colorized unittest test runner."""
43
44import heapq
45import subunit
46import sys
47import unittest
48
49import testtools
50
51
class _AnsiColorizer(object):
    """
    A colorizer is an object that loosely wraps around a stream, allowing
    callers to write text to the stream in a particular color.

    Colorizer classes must implement C{supported()} and C{write(text, color)}.
    """
    # Maps a color label to the numeric code interpolated into the escape
    # sequence emitted by write().
    _colors = dict(black=30, red=31, green=32, yellow=33,
                   blue=34, magenta=35, cyan=36, white=37)

    def __init__(self, stream):
        """
        @param stream: The object whose C{write()} method receives the
            colorized text.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=None):
        """
        A class method that returns True if the current platform supports
        coloring terminal output using this method. Returns False otherwise.

        @param stream: The stream to check for being a TTY. When omitted
            (or None), C{sys.stdout} is looked up at call time, so that a
            caller who temporarily rebinds C{sys.stdout} before calling
            this method is honoured.
        """
        if stream is None:
            # A ``stream=sys.stdout`` default would be frozen when the class
            # body is executed; resolve it late instead.
            stream = sys.stdout
        isatty = getattr(stream, 'isatty', None)
        if isatty is None or not isatty():
            return False  # auto color only on TTYs
        try:
            import curses
        except ImportError:
            return False
        try:
            try:
                return curses.tigetnum("colors") > 2
            except curses.error:
                # The terminal was not initialised yet: set it up and retry.
                curses.setupterm()
                return curses.tigetnum("colors") > 2
        except Exception:
            # guess false in case of error
            return False

    def write(self, text, color):
        """
        Write the given text to the stream in the given color.

        @param text: Text to be written to the stream.

        @param color: A string label for a color. e.g. 'red', 'white'.

        @raise KeyError: If C{color} is not a key of C{_colors}.
        """
        code = self._colors[color]
        self.stream.write('\x1b[%s;1m%s\x1b[0m' % (code, text))
98
99
class _Win32Colorizer(object):
    """
    Colorizer that sets text attributes on the console handle obtained
    from C{win32console} around each write.

    See _AnsiColorizer docstring.
    """
    def __init__(self, stream):
        """
        @param stream: The object whose C{write()} method receives the text.
        """
        import win32console as console
        fg_red = console.FOREGROUND_RED
        fg_green = console.FOREGROUND_GREEN
        fg_blue = console.FOREGROUND_BLUE
        intense = console.FOREGROUND_INTENSITY
        self.stream = stream
        self.screenBuffer = console.GetStdHandle(console.STD_OUT_HANDLE)
        # 'normal' is the attribute restored after every colored write.
        self._colors = {
            'normal': fg_red | fg_green | fg_blue,
            'red': fg_red | intense,
            'green': fg_green | intense,
            'blue': fg_blue | intense,
            'yellow': fg_red | fg_green | intense,
            'magenta': fg_red | fg_blue | intense,
            'cyan': fg_green | fg_blue | intense,
            'white': fg_red | fg_green | fg_blue | intense,
        }

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Return True if C{win32console} can be imported and the standard
        output handle accepts a text attribute; False otherwise.

        @param stream: Not consulted by this implementation.
        """
        try:
            import win32console
            handle = win32console.GetStdHandle(win32console.STD_OUT_HANDLE)
        except ImportError:
            return False
        import pywintypes
        try:
            plain = (win32console.FOREGROUND_RED |
                     win32console.FOREGROUND_GREEN |
                     win32console.FOREGROUND_BLUE)
            handle.SetConsoleTextAttribute(plain)
        except pywintypes.error:
            return False
        return True

    def write(self, text, color):
        """
        Write C{text} to the stream with the console attribute for
        C{color} active, then restore the 'normal' attribute.

        @param text: Text to be written to the stream.

        @param color: A key of C{_colors}, e.g. 'red', 'white'.
        """
        attribute = self._colors[color]
        console = self.screenBuffer
        console.SetConsoleTextAttribute(attribute)
        self.stream.write(text)
        console.SetConsoleTextAttribute(self._colors['normal'])
146
147
class _NullColorizer(object):
    """
    Colorizer that writes text unchanged, ignoring the requested color.

    See _AnsiColorizer docstring.
    """
    def __init__(self, stream):
        """
        @param stream: The object whose C{write()} method receives the text.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Always return True; C{stream} is not consulted.
        """
        return True

    def write(self, text, color):
        """
        Write C{text} to the stream; C{color} is ignored.
        """
        out = self.stream
        out.write(text)
161
162
def get_elapsed_time_color(elapsed_time):
    """Return the color label used to display an elapsed time.

    More than 1.0 gives 'red', more than 0.25 gives 'yellow', and anything
    else gives 'green'.
    """
    # Checked from the largest threshold down; the first one exceeded wins.
    for threshold, label in ((1.0, 'red'), (0.25, 'yellow')):
        if elapsed_time > threshold:
            return label
    return 'green'
170
171
class NovaTestResult(testtools.TestResult):
    """Test result that prints colorized output grouped by test class.

    Results are buffered per test class in ``self.results`` and flushed by
    ``writeTests()``. The slowest tests are tracked in a bounded heap and
    listed by ``writeSlowTests()`` when the run stops.
    """

    def __init__(self, stream, descriptions, verbosity):
        """Set up buffering state and pick the first supported colorizer.

        :param stream: output stream; must provide ``write()``,
            ``writeln()`` and ``flush()``.
        :param descriptions: accepted but not used by this class.
        :param verbosity: values greater than 1 enable one line per test;
            otherwise a single character is written per test.
        """
        super(NovaTestResult, self).__init__()
        self.stream = stream
        self.showAll = verbosity > 1
        self.num_slow_tests = 10
        self.slow_tests = []  # this is a fixed-sized heap
        self.colorizer = None
        # NOTE(vish): reset stdout for the terminal check
        stdout = sys.stdout
        sys.stdout = sys.__stdout__
        try:
            for colorizer in [_Win32Colorizer, _AnsiColorizer,
                              _NullColorizer]:
                if colorizer.supported():
                    self.colorizer = colorizer(self.stream)
                    break
        finally:
            # Always put the caller's stdout back, even if a probe raises.
            sys.stdout = stdout
        self.start_time = None
        self.last_time = {}     # test class -> time of its latest result
        self.results = {}       # test class -> buffered result tuples
        self.last_written = None

    def _writeElapsedTime(self, elapsed):
        """Write ``elapsed`` (seconds) in the color chosen for its size."""
        color = get_elapsed_time_color(elapsed)
        self.colorizer.write("  %.2f" % elapsed, color)

    def _addResult(self, test, *args):
        """Buffer one outcome for ``test`` and flush what is ready.

        :param test: the test; its ``id()`` is split on the last dot into
            a class part and a test-name part.
        :param args: ``(long_result, color, short_result, success)`` as
            consumed by ``_writeResult()``.
        """
        try:
            name = test.id()
        except AttributeError:
            name = 'Unknown.unknown'
        if '.' in name:
            test_class, test_name = name.rsplit('.', 1)
        else:
            # An id without a dot has no class part; file it under the same
            # placeholder class used when the id is unavailable.
            test_class, test_name = 'Unknown', name

        now = self._now()
        if self.start_time is None:
            # A result reported without a preceding startTest().
            elapsed = 0.0
        else:
            elapsed = (now - self.start_time).total_seconds()
        item = (elapsed, test_class, test_name)
        # Keep only the num_slow_tests largest items: once full, each push
        # also pops the current smallest.
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
        else:
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = now
        self.writeTests()

    def _writeResult(self, test_name, elapsed, long_result, color,
                     short_result, success):
        """Write one buffered result, as a line or as a single character."""
        if self.showAll:
            self.stream.write('    %s' % str(test_name).ljust(66))
            self.colorizer.write(long_result, color)
            if success:
                self._writeElapsedTime(elapsed)
            self.stream.writeln()
        else:
            self.colorizer.write(short_result, color)

    def addSuccess(self, test):
        """Record a passing test."""
        super(NovaTestResult, self).addSuccess(test)
        self._addResult(test, 'OK', 'green', '.', True)

    def addFailure(self, test, err):
        """Record a failing test; the 'process-returncode' id is ignored."""
        if test.id() == 'process-returncode':
            return
        super(NovaTestResult, self).addFailure(test, err)
        self._addResult(test, 'FAIL', 'red', 'F', False)

    def addError(self, test, err):
        """Record a test error.

        The error is passed to the base class's ``addError`` so that it is
        kept apart from failures and printed under the 'ERROR' heading.
        """
        super(NovaTestResult, self).addError(test, err)
        self._addResult(test, 'ERROR', 'red', 'E', False)

    def addSkip(self, test, reason=None, details=None):
        """Record a skipped test."""
        super(NovaTestResult, self).addSkip(test, reason, details)
        self._addResult(test, 'SKIP', 'blue', 'S', True)

    def startTest(self, test):
        """Note the start time used to compute the test's elapsed time."""
        self.start_time = self._now()
        super(NovaTestResult, self).startTest(test)

    def writeTestCase(self, cls):
        """Write and discard all buffered results for test class ``cls``.

        Does nothing when ``cls`` has no buffered results. The class name
        is written as a heading unless it was also the last class written.
        """
        if not self.results.get(cls):
            return
        if cls != self.last_written:
            self.colorizer.write(cls, 'white')
            self.stream.writeln()
        for result in self.results[cls]:
            self._writeResult(*result)
        del self.results[cls]
        self.stream.flush()
        self.last_written = cls

    def writeTests(self):
        """Flush buffered results that are ready to be shown.

        If nothing has been written yet, or the last-written class has had
        no result for more than 2 seconds, buffered classes are written
        oldest-first until one whose latest result is at most 2 seconds old
        has been written (or the buffer is empty). Otherwise only the
        last-written class is flushed.
        """
        now = self._now()
        last = self.last_time.get(self.last_written, now)
        if not self.last_written or (now - last).total_seconds() > 2.0:
            diff = 3.0
            # Stop on an empty buffer: min() raises on an empty sequence.
            while diff > 2.0 and self.results:
                oldest = min(self.results, key=lambda x: self.last_time[x])
                diff = (self._now() - self.last_time[oldest]).total_seconds()
                self.writeTestCase(oldest)
        else:
            self.writeTestCase(self.last_written)

    def done(self):
        """Alias for ``stopTestRun()``."""
        self.stopTestRun()

    def stopTestRun(self):
        """Flush every buffered class, then list the slow tests."""
        # Snapshot the keys: writeTestCase() deletes entries as it goes.
        for cls in list(self.results):
            self.writeTestCase(cls)
        self.stream.writeln()
        self.writeSlowTests()

    def writeSlowTests(self):
        """List the tracked slow tests, grouped by class and sorted by name.

        Tests whose elapsed time would be shown in green are left out.
        """
        # Pare out 'fast' tests
        slow_tests = [item for item in self.slow_tests
                      if get_elapsed_time_color(item[0]) != 'green']
        if slow_tests:
            slow_total_time = sum(item[0] for item in slow_tests)
            slow = ("Slowest %i tests took %.2f secs:"
                    % (len(slow_tests), slow_total_time))
            self.colorizer.write(slow, 'yellow')
            self.stream.writeln()
            last_cls = None
            # sort by name
            for elapsed, cls, name in sorted(slow_tests,
                                             key=lambda x: x[1] + x[2]):
                if cls != last_cls:
                    self.colorizer.write(cls, 'white')
                    self.stream.writeln()
                last_cls = cls
                self.stream.write('    %s' % str(name).ljust(68))
                self._writeElapsedTime(elapsed)
                self.stream.writeln()

    def printErrors(self):
        """Print the details of all recorded errors and failures."""
        if self.showAll:
            self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    def printErrorList(self, flavor, errors):
        """Print each ``(test, err)`` pair under a red ``flavor`` heading."""
        for test, err in errors:
            self.colorizer.write("=" * 70, 'red')
            self.stream.writeln()
            self.colorizer.write(flavor, 'red')
            self.stream.writeln(": %s" % test.id())
            self.colorizer.write("-" * 70, 'red')
            self.stream.writeln()
            self.stream.writeln("%s" % err)
319
320
# Replay the subunit stream arriving on stdin as a test case.
test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)

# The colorized result class is only handed to the runner on interpreters
# newer than Python 2.6; on 2.6 and older the runner's default is used.
_runner_kwargs = {'verbosity': 2}
if sys.version_info[0:2] > (2, 6):
    _runner_kwargs['resultclass'] = NovaTestResult
runner = unittest.TextTestRunner(**_runner_kwargs)

# Exit with 0 when the whole run succeeded and 1 otherwise.
exit_code = 0 if runner.run(test).wasSuccessful() else 1
sys.exit(exit_code)