blob: 3f68a51017a5d698819f796efde9ede008d0f6e2 [file] [log] [blame]
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -04001#!/usr/bin/env python
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -04002
3# Copyright (c) 2013, Nebula, Inc.
4# Copyright 2010 United States Government as represented by the
5# Administrator of the National Aeronautics and Space Administration.
6# All Rights Reserved.
7#
8# Licensed under the Apache License, Version 2.0 (the "License"); you may
9# not use this file except in compliance with the License. You may obtain
10# a copy of the License at
11#
12# http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
16# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
17# License for the specific language governing permissions and limitations
18# under the License.
19#
20# Colorizer Code is borrowed from Twisted:
21# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
22#
23# Permission is hereby granted, free of charge, to any person obtaining
24# a copy of this software and associated documentation files (the
25# "Software"), to deal in the Software without restriction, including
26# without limitation the rights to use, copy, modify, merge, publish,
27# distribute, sublicense, and/or sell copies of the Software, and to
28# permit persons to whom the Software is furnished to do so, subject to
29# the following conditions:
30#
31# The above copyright notice and this permission notice shall be
32# included in all copies or substantial portions of the Software.
33#
34# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
35# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
36# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
37# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
38# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
39# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
40# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
41
42"""Display a subunit stream through a colorized unittest test runner."""
43
44import heapq
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -040045import sys
46import unittest
47
Matthew Treinish96e9e882014-06-09 18:37:19 -040048import subunit
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -040049import testtools
50
51
52class _AnsiColorizer(object):
Ken'ichi Ohmichi7616d192015-11-17 13:12:55 +000053 """A colorizer is an object that loosely wraps around a stream
54
55 allowing callers to write text to the stream in a particular color.
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -040056
57 Colorizer classes must implement C{supported()} and C{write(text, color)}.
58 """
59 _colors = dict(black=30, red=31, green=32, yellow=33,
60 blue=34, magenta=35, cyan=36, white=37)
61
62 def __init__(self, stream):
63 self.stream = stream
64
65 def supported(cls, stream=sys.stdout):
Ken'ichi Ohmichi7616d192015-11-17 13:12:55 +000066 """Check the current platform supports coloring terminal output
67
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -040068 A class method that returns True if the current platform supports
69 coloring terminal output using this method. Returns False otherwise.
70 """
71 if not stream.isatty():
72 return False # auto color only on TTYs
73 try:
74 import curses
75 except ImportError:
76 return False
77 else:
78 try:
79 try:
80 return curses.tigetnum("colors") > 2
81 except curses.error:
82 curses.setupterm()
83 return curses.tigetnum("colors") > 2
84 except Exception:
85 # guess false in case of error
86 return False
87 supported = classmethod(supported)
88
89 def write(self, text, color):
Ken'ichi Ohmichi7616d192015-11-17 13:12:55 +000090 """Write the given text to the stream in the given color.
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -040091
92 @param text: Text to be written to the stream.
93
94 @param color: A string label for a color. e.g. 'red', 'white'.
95 """
96 color = self._colors[color]
97 self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
98
99
100class _Win32Colorizer(object):
Ken'ichi Ohmichi7616d192015-11-17 13:12:55 +0000101 """See _AnsiColorizer docstring."""
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -0400102 def __init__(self, stream):
103 import win32console
104 red, green, blue, bold = (win32console.FOREGROUND_RED,
105 win32console.FOREGROUND_GREEN,
106 win32console.FOREGROUND_BLUE,
107 win32console.FOREGROUND_INTENSITY)
108 self.stream = stream
109 self.screenBuffer = win32console.GetStdHandle(
110 win32console.STD_OUT_HANDLE)
111 self._colors = {'normal': red | green | blue,
112 'red': red | bold,
113 'green': green | bold,
114 'blue': blue | bold,
115 'yellow': red | green | bold,
116 'magenta': red | blue | bold,
117 'cyan': green | blue | bold,
118 'white': red | green | blue | bold}
119
120 def supported(cls, stream=sys.stdout):
121 try:
122 import win32console
123 screenBuffer = win32console.GetStdHandle(
124 win32console.STD_OUT_HANDLE)
125 except ImportError:
126 return False
127 import pywintypes
128 try:
129 screenBuffer.SetConsoleTextAttribute(
130 win32console.FOREGROUND_RED |
131 win32console.FOREGROUND_GREEN |
132 win32console.FOREGROUND_BLUE)
133 except pywintypes.error:
134 return False
135 else:
136 return True
137 supported = classmethod(supported)
138
139 def write(self, text, color):
140 color = self._colors[color]
141 self.screenBuffer.SetConsoleTextAttribute(color)
142 self.stream.write(text)
143 self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
144
145
146class _NullColorizer(object):
Ken'ichi Ohmichi7616d192015-11-17 13:12:55 +0000147 """See _AnsiColorizer docstring."""
Matthew Treinish0e2d8aa2013-07-18 15:16:37 -0400148 def __init__(self, stream):
149 self.stream = stream
150
151 def supported(cls, stream=sys.stdout):
152 return True
153 supported = classmethod(supported)
154
155 def write(self, text, color):
156 self.stream.write(text)
157
158
def get_elapsed_time_color(elapsed_time):
    """Pick the color label used to display an elapsed time.

    @param elapsed_time: Number compared against the 1.0 and 0.25 thresholds.

    @return: 'red' when above 1.0, 'yellow' when above 0.25, else 'green'.
    """
    # Thresholds are checked from slowest to fastest; first match wins.
    for threshold, label in ((1.0, 'red'), (0.25, 'yellow')):
        if elapsed_time > threshold:
            return label
    return 'green'
166
167
class NovaTestResult(testtools.TestResult):
    """Test result that prints colorized output grouped by test class.

    Results are buffered per test class in C{self.results} and flushed to
    the stream by C{writeTests()}. The slowest tests are kept in a bounded
    heap and reported by C{writeSlowTests()} when the run stops.
    """

    def __init__(self, stream, descriptions, verbosity):
        """Set up the result collector and pick a colorizer.

        @param stream: Output stream; C{write}, C{writeln} and C{flush} are
            called on it.

        @param descriptions: Accepted for signature compatibility; unused.

        @param verbosity: A value above 1 prints one line per test instead
            of a single character per test.
        """
        super(NovaTestResult, self).__init__()
        self.stream = stream
        self.showAll = verbosity > 1
        self.num_slow_tests = 10
        self.slow_tests = []  # this is a fixed-sized heap
        self.colorizer = None
        # NOTE(vish): reset stdout for the terminal check
        stdout = sys.stdout
        sys.stdout = sys.__stdout__
        try:
            candidates = (_Win32Colorizer, _AnsiColorizer, _NullColorizer)
            for colorizer in candidates:
                if colorizer.supported():
                    self.colorizer = colorizer(self.stream)
                    break
        finally:
            # Always put the caller's stdout back, even if a probe raises.
            sys.stdout = stdout
        self.start_time = None
        self.last_time = {}     # test class -> time of its latest result
        self.results = {}       # test class -> buffered result tuples
        self.last_written = None

    def _writeElapsedTime(self, elapsed):
        """Write C{elapsed}, formatted to two decimals, in its color."""
        color = get_elapsed_time_color(elapsed)
        self.colorizer.write("  %.2f" % elapsed, color)

    def _addResult(self, test, *args):
        """Buffer one test outcome and trigger output.

        @param test: The test; its C{id()} is split on the last dot into
            class and test name.

        @param args: The trailing C{_writeResult} arguments
            (long_result, color, short_result, success).
        """
        try:
            name = test.id()
        except AttributeError:
            name = 'Unknown.unknown'
        if '.' in name:
            test_class, test_name = name.rsplit('.', 1)
        else:
            # An id with no dot (such as the 'process-returncode' id that
            # addFailure special-cases) has no class part; file it under
            # 'Unknown' instead of failing to unpack.
            test_class, test_name = 'Unknown', name

        now = self._now()
        # A result reported without a preceding startTest() has no start
        # time; treat it as having taken no time.
        started = self.start_time if self.start_time is not None else now
        elapsed = (now - started).total_seconds()
        item = (elapsed, test_class, test_name)
        # Keep only the num_slow_tests largest elapsed times in the heap.
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
        else:
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = self._now()
        self.writeTests()

    def _writeResult(self, test_name, elapsed, long_result, color,
                     short_result, success):
        """Write a single buffered result.

        In verbose mode this is a full line (name, long result and, for
        successful outcomes, the elapsed time); otherwise only the short
        result marker is written.
        """
        if self.showAll:
            self.stream.write('    %s' % str(test_name).ljust(66))
            self.colorizer.write(long_result, color)
            if success:
                self._writeElapsedTime(elapsed)
            self.stream.writeln()
        else:
            self.colorizer.write(short_result, color)

    def addSuccess(self, test):
        """Record a passing test."""
        super(NovaTestResult, self).addSuccess(test)
        self._addResult(test, 'OK', 'green', '.', True)

    def addFailure(self, test, err):
        """Record a failing test; the 'process-returncode' id is ignored."""
        if test.id() == 'process-returncode':
            return
        super(NovaTestResult, self).addFailure(test, err)
        self._addResult(test, 'FAIL', 'red', 'F', False)

    def addError(self, test, err):
        """Record a test that raised an unexpected error."""
        # Register with the base class as an error (not a failure) so it
        # lands in self.errors, which printErrors() reports as 'ERROR'.
        super(NovaTestResult, self).addError(test, err)
        self._addResult(test, 'ERROR', 'red', 'E', False)

    def addSkip(self, test, reason=None, details=None):
        """Record a skipped test."""
        super(NovaTestResult, self).addSkip(test, reason, details)
        self._addResult(test, 'SKIP', 'blue', 'S', True)

    def startTest(self, test):
        """Note the start time used to compute the test's elapsed time."""
        self.start_time = self._now()
        super(NovaTestResult, self).startTest(test)

    def writeTestCase(self, cls):
        """Write and discard every buffered result of test class C{cls}.

        The class name is written as a header unless C{cls} was also the
        most recently written class. Does nothing if nothing is buffered.
        """
        if not self.results.get(cls):
            return
        if cls != self.last_written:
            self.colorizer.write(cls, 'white')
            self.stream.writeln()
        for result in self.results[cls]:
            self._writeResult(*result)
        del self.results[cls]
        self.stream.flush()
        self.last_written = cls

    def writeTests(self):
        """Flush buffered results, keeping output grouped by class.

        While the last-written class has had a result within the past 2.0
        seconds, only that class is written. Otherwise buffered classes are
        written oldest-first until one whose latest result is at most 2.0
        seconds old has been written.
        """
        last = self.last_time.get(self.last_written, self._now())
        stale = (self._now() - last).total_seconds() > 2.0
        if not self.last_written or stale:
            diff = 3.0
            # Stop when the buffer is empty: min() over no classes would
            # raise ValueError.
            while diff > 2.0 and self.results:
                oldest = min(self.results, key=lambda x: self.last_time[x])
                diff = (self._now() - self.last_time[oldest]).total_seconds()
                self.writeTestCase(oldest)
        else:
            self.writeTestCase(self.last_written)

    def done(self):
        """Alias for C{stopTestRun()}."""
        self.stopTestRun()

    def stopTestRun(self):
        """Flush everything still buffered, then report the slow tests."""
        # Iterate over a copy: writeTestCase() deletes entries from
        # self.results. list(dict) works on both Python 2 and 3, unlike
        # dict.iterkeys().
        for cls in list(self.results):
            self.writeTestCase(cls)
        self.stream.writeln()
        self.writeSlowTests()

    def writeSlowTests(self):
        """Print the tracked slow tests whose time is not colored green."""
        # Pare out 'fast' tests
        slow_tests = [item for item in self.slow_tests
                      if get_elapsed_time_color(item[0]) != 'green']
        if slow_tests:
            slow_total_time = sum(item[0] for item in slow_tests)
            slow = ("Slowest %i tests took %.2f secs:"
                    % (len(slow_tests), slow_total_time))
            self.colorizer.write(slow, 'yellow')
            self.stream.writeln()
            last_cls = None
            # sort by name
            for elapsed, cls, name in sorted(slow_tests,
                                             key=lambda x: x[1] + x[2]):
                if cls != last_cls:
                    self.colorizer.write(cls, 'white')
                    self.stream.writeln()
                last_cls = cls
                self.stream.write('    %s' % str(name).ljust(68))
                self._writeElapsedTime(elapsed)
                self.stream.writeln()

    def printErrors(self):
        """Print the details of every recorded error and failure."""
        if self.showAll:
            self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    def printErrorList(self, flavor, errors):
        """Print one block per C{(test, err)} pair under C{flavor}.

        @param flavor: Heading label, e.g. 'ERROR' or 'FAIL'.

        @param errors: Iterable of C{(test, err)} pairs.
        """
        for test, err in errors:
            self.colorizer.write("=" * 70, 'red')
            self.stream.writeln()
            self.colorizer.write(flavor, 'red')
            self.stream.writeln(": %s" % test.id())
            self.colorizer.write("-" * 70, 'red')
            self.stream.writeln()
            self.stream.writeln("%s" % err)
315
316
# Read the subunit stream from stdin and replay it as a test case.
test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)

# The colorized result class is only passed to the runner on interpreters
# newer than 2.6; otherwise the runner's default result class is used.
runner_kwargs = {'verbosity': 2}
if not sys.version_info[0:2] <= (2, 6):
    runner_kwargs['resultclass'] = NovaTestResult
runner = unittest.TextTestRunner(**runner_kwargs)

# Exit status: 0 when the replayed run succeeded, 1 otherwise.
exit_code = 0 if runner.run(test).wasSuccessful() else 1
sys.exit(exit_code)