root / yatest.py

Revision b4631902a780af63e938a41fbb6d0e0bc6aaacc5, 13.0 kB (checked in by njs@…, 10 months ago)

make yatest -S output more readable

Line 
1#!/usr/bin/env python
2
3# Yet Another Test framework
4#
5# Basically does test scanning like nose or py.test (but simpler), and then
6# the actual test running is way simplified, and -- critically -- it can fork
7# before the test to give them each a pristine environment, even if there are
8# obnoxious shared libraries that don't want to play along (*cough* GTK+
9# *cough*).
10#
11# It will scan a package, looking for all modules whose name contains "test"
12# or "Test" (anywhere in their full name), and then within each such module it
13# will look for all classes whose name contains "test" or "Test", and then for
14# each such class it will instantiate (with no arguments) one instance for
15# each method whose name contains "test" or "Test", and on that instance run
16# .setUp(), then the test method, then .tearDown().  If the test method throws
17# an exception then it failed, if it doesn't then it succeeded.  If .setUp()
18# or .tearDown() throw an exception, then the test is an error (tearDown()
19# will be called in any case).
20#
21# Except, if the module or class or method has __test__ = False set, then it
22# will be ignored.
23#
24# If the codespeak 'py' package or the 'nose' package are installed, then
25# yatest will take advantage of them to give more detailed information on
26# errors.  Having both installed gives the most detail.
27#
28# Desirable future enhancements:
29#   -- Timeout support (even more fun select stuff -- this may call for
30#      twisted...).
31#   -- Some way to have setup that happens in the parent for multiple children
32#      (e.g. spawning Xvfb, dbus-daemon, and trusting that they will reset
33#      when all their clients have been *killed off*, even if we do not trust
34#      anything less than that to clean things up fully).
35#   -- Parallelized testing?
36#   -- Check test item's __module__ attribute, so as to only run items where
37#      they are defined, not where they have been imported?
38
39import sys
40import os
41import os.path
42import traceback
43import signal
44import tempfile
45from cPickle import dump, load
46from types import ClassType
47from optparse import OptionParser
48
# nose is an optional dependency: when it cannot be imported, a stub with the
# same name is defined so that callers never have to care which one they got.
try:
    from nose.inspector import inspect_traceback
except ImportError:
    def inspect_traceback(*args):
        """Placeholder used when nose is missing; ignores its arguments."""
        return "(unknown; install 'nose' for more details)"

# Status tags placed first in each marshalled per-phase result tuple.
SKIPPED, SUCCESS, FAILURE = "SKIPPED", "SUCCESS", "FAILURE"
58   
def ispkg(path):
    """Tell whether *path* is a package directory.

    A package is a directory that directly contains an ``__init__.py``.
    Returns True or False.
    """
    if not os.path.isdir(path):
        return False
    init_file = os.path.join(path, "__init__.py")
    return os.path.exists(init_file)
62
class YaTest(object):
    """Command-line front end: parses options, prepares the process
    environment and hands each requested package to a Runner."""

    def main(self, argv=None):
        """Parse the command line and run every test found.

        argv -- optional list of arguments to parse; when None (the default)
                optparse falls back to sys.argv[1:].

        Invalid usage (no -p option, or a path that is not a package) is
        reported through OptionParser.error(), which prints the usage
        message to stderr and raises SystemExit with status 2.
        """
        parser = OptionParser(usage="%prog -p PATH-TO-PACKAGE [TEST-NAMES]")
        parser.add_option("-S", "--nocapture",
                          dest="capture_output",
                          action="store_false", default=True,
                          help="disable capture of stdout/stderr from tests")
        parser.add_option("-p", "--package",
                          dest="packages",
                          action="append",
                          help="package(s) to scan for tests")
        (opts, args) = parser.parse_args(argv)

        # Validate with parser.error() rather than assert: asserts vanish
        # under "python -O" and give the user a bare traceback otherwise.
        if not opts.packages:
            parser.error("at least one package must be given with -p")
        # abspath() also strips a trailing slash; without that,
        # os.path.split("pkg/") would yield an empty package name below.
        pkg_paths = [os.path.abspath(pkg) for pkg in opts.packages]
        for pkg in pkg_paths:
            if not ispkg(pkg):
                parser.error("%s is not a package directory "
                             "(no __init__.py found)" % (pkg,))

        test_names = args

        # Set up environment: make each package importable by name...
        for pkg in pkg_paths:
            pkg_dir, pkg_name = os.path.split(pkg)
            sys.path.insert(0, pkg_dir)

        # ...and drop the variables that would let tests reach the user's
        # real D-Bus session or X display.
        for var in ("DBUS_SESSION_BUS_ADDRESS", "DISPLAY"):
            if var in os.environ:
                del os.environ[var]

        # py.magic is optional; without it the invoke/revoke hooks are no-ops.
        try:
            import py
            magic_invoke = py.magic.invoke
            magic_revoke = py.magic.revoke
        except ImportError:
            sys.stderr.write("py package is not installed; "
                             + "giving unnecessarily boring tracebacks\n")
            magic_invoke = lambda **kwargs: None
            magic_revoke = lambda **kwargs: None

        try:
            magic_invoke(assertion=1)
            # Go.
            reporter = Reporter()
            for pkg in pkg_paths:
                pkg_dir, pkg_name = os.path.split(pkg)
                Runner(reporter, opts.capture_output).scan_pkg(pkg, pkg_name,
                                                               test_names)
            reporter.close()
        finally:
            magic_revoke(assertion=1)
114
class Runner(object):
    """Finds tests inside a package and runs each test method in its own
    forked child process, passing every outcome to a reporter.

    The result handed to the reporter is a 3-tuple
    (setup, test, teardown); each element is (SKIPPED,), (SUCCESS,) or
    (FAILURE, text).
    """

    def __init__(self, reporter, capture_output):
        """Remember where to send results and how to treat child output.

        reporter       -- object whose report(method_name, output_data,
                          result) method is called once per test method
        capture_output -- if true, each child's stdout/stderr is redirected
                          into a temporary file and passed to the reporter
        """
        self.reporter = reporter
        self.capture_output = capture_output

    def thing_looks_testy(self, name, obj):
        """Return a true value if *name* contains "test" or "Test" and *obj*
        does not carry a false ``__test__`` attribute."""
        return (("test" in name or "Test" in name)
                and getattr(obj, "__test__", True))

    def scan_pkg(self, pkg_path, pkg_name, test_names):
        """Scan the package at *pkg_path* (dotted name *pkg_name*), its
        modules and, recursively, its sub-packages for tests to run.

        test_names -- substrings used to select test methods; an empty
                      sequence selects everything
        """
        assert ispkg(pkg_path)
        # packages are themselves basically modules
        self.maybe_load_and_scan_module(pkg_name, test_names)
        # look for children
        for child_basename in os.listdir(pkg_path):
            child_path = os.path.join(pkg_path, child_basename)
            if ispkg(child_path):
                self.scan_pkg(child_path,
                              ".".join([pkg_name, child_basename]),
                              test_names)
            if (child_path.endswith(".py")
                and child_basename != "__init__.py"):
                child_modname = ".".join([pkg_name, child_basename[:-3]])
                self.maybe_load_and_scan_module(child_modname, test_names)

    def maybe_load_and_scan_module(self, module_name, test_names):
        """Import *module_name* if its name looks testy and run every testy
        class found in its namespace.

        Modules that fail to import with ImportError or SyntaxError are
        reported on stderr and skipped.
        """
        # Hack: Skip out early if the module cannot possibly be interesting.
        if not self.thing_looks_testy(module_name, None):
            return
        # __import__("foo.bar.baz") returns the foo module object:
        try:
            mod = __import__(module_name)
        except (ImportError, SyntaxError):
            # sys.exc_info() is used instead of binding the exception in the
            # except clause so the same line is valid in old and new syntax.
            e = sys.exc_info()[1]
            sys.stderr.write("Error loading module %s; skipping\n"
                             "(error was: %s)\n"
                             % (module_name, e))
            return
        # ...so walk down the attribute chain to reach the leaf module.
        for comp in module_name.split(".")[1:]:
            mod = getattr(mod, comp)
        # Second check, this time honouring the module's own __test__ flag.
        if not self.thing_looks_testy(module_name, mod):
            return

        for key, val in list(mod.__dict__.items()):
            if (self.thing_looks_testy(key, val)
                and isinstance(val, (type, ClassType))):
                self.run_test_class(".".join([module_name, key]), val,
                                    test_names)

    def method_matches_names(self, method_name, test_names):
        """Return True if *test_names* is empty or any of its entries is a
        substring of the fully qualified *method_name*; False otherwise."""
        if not test_names:
            return True
        for name in test_names:
            if name in method_name:
                return True
        return False

    def run_test_class(self, class_name, cls, test_names):
        """Run every selected test method defined directly on *cls*.

        Only entries of cls.__dict__ are considered, so inherited methods
        are not run.
        """
        for key, val in list(cls.__dict__.items()):
            if (self.thing_looks_testy(key, val)
                and callable(val)
                and self.method_matches_names(".".join([class_name, key]),
                                              test_names)):
                self.run_test_method(class_name, cls, key)

    def run_test_method(self, class_name, cls, name):
        """Fork, run test method *name* of *cls* in the child, and report
        the outcome from the parent.

        If *cls* has a preForkClassSetUp attribute it is called in the
        parent before forking (once per test method); if it raises, the
        error is written to stderr and the test is not run.
        """
        if hasattr(cls, "preForkClassSetUp"):
            try:
                cls.preForkClassSetUp()
            except Exception:
                e = sys.exc_info()[1]
                sys.stderr.write("Error in preForkClassSetUp: %s; skipping %s\n"
                                 % (e, cls))
                return

        # The pipe carries the pickled result from child to parent.
        (readable_fd, writeable_fd) = os.pipe()
        readable = os.fdopen(readable_fd, "rb")
        writeable = os.fdopen(writeable_fd, "wb")
        if self.capture_output:
            output = tempfile.TemporaryFile()
        else:
            output = None
            sys.stdout.write("----- Starting test %s\n"
                             % (".".join([class_name, name]),))
        # Anything still sitting in the stdio buffers would be inherited by
        # the child and written a second time when it exits, so flush first.
        sys.stdout.flush()
        sys.stderr.flush()
        pid = os.fork()
        if pid:
            writeable.close()
            self.run_test_method_in_parent(pid,
                                           class_name, cls, name, readable,
                                           output)
        else:
            readable.close()
            self.run_test_method_in_child(cls, name, writeable,
                                          output)
            # This should not return
            assert False

    def run_test_method_in_parent(self, child_pid,
                                  class_name, cls, name, readable, output):
        """Parent side of a test run: read the child's result from
        *readable*, terminate the child's process group, reap the child and
        pass the result (plus captured output, if any) to the reporter."""
        method_name = ".".join([class_name, name])
        try:
            try:
                # The data comes from our own child over a private pipe.
                result = load(readable)
            finally:
                # Kill off children, even on control-C etc.
                # (negative pid == the whole process group of the child)
                os.kill(-child_pid, signal.SIGTERM)
        except EOFError:
            # The pipe was closed without a result being written.
            one_result = (FAILURE, "?? (child blew up before reporting back)")
            result = (one_result, one_result, one_result)
        readable.close()
        os.waitpid(child_pid, 0)
        if output is not None:
            output.seek(0)
            output_data = output.read()
            output.close()
        else:
            output_data = None
        self.reporter.report(method_name, output_data, result)

    def string_for_traceback(self, exc_info):
        """Render a sys.exc_info() triple as text: the formatted traceback
        followed by whatever inspect_traceback() can say about it."""
        tb = "".join(traceback.format_exception(*exc_info))
        # nose's inspect_traceback blows up when run on exceptions thrown out
        # of Pyrex.  FIXME: file nose bug
        try:
            details = inspect_traceback(exc_info[2])
        except (SystemExit, KeyboardInterrupt):
            # Must be a tuple: "except SystemExit, KeyboardInterrupt" would
            # catch only SystemExit and bind it to the second name.
            raise
        except Exception:
            details = ("(failed to extract details;\n"
                       + "nose.inspect.inspect_traceback threw exception\n"
                       + "(maybe because the error was in pyrex code):\n"
                       + traceback.format_exc()
                       + ")")
        return "%s\nDetails of failing source code:\n%s" % (tb, details)

    def marshal_one_result(self, result):
        """Convert one phase result into a picklable tuple.

        None -> (SKIPPED,); True -> (SUCCESS,); anything else is treated as
        a sys.exc_info() triple and becomes (FAILURE, traceback_text).
        """
        if result is None:
            return (SKIPPED,)
        elif result is True:
            return (SUCCESS,)
        else:
            return (FAILURE, self.string_for_traceback(result))

    def run_test_method_in_child(self, cls, name, writeable, output):
        """Child side of a test run; never returns (ends with sys.exit()).

        Instantiates *cls*, calls setUp() if present, then the test method
        *name*, then tearDown() if present, and pickles the three marshalled
        results to *writeable*.  If *output* is not None, stdout and stderr
        are first redirected into it.
        """
        # Become our own process group so the parent can kill the child and
        # everything it spawned with a single signal.
        os.setpgid(0, 0)
        if output is not None:
            os.dup2(output.fileno(), 1)
            os.dup2(output.fileno(), 2)

        instance = None         # None or instance of cls
        setup_result = None     # True or exc_info
        test_result = None      # None or True or exc_info
        teardown_result = None  # True or exc_info
        # If at first you don't succeed...
        try:
            try:
                try:  # ...again.
                    instance = cls()
                    if hasattr(instance, "setUp"):
                        instance.setUp()
                except:
                    # Bare except on purpose: whatever the test code raises
                    # has to be recorded as a result.
                    setup_result = sys.exc_info()
                else:
                    setup_result = True
                if setup_result is True:
                    try:
                        getattr(instance, name)()
                    except:
                        test_result = sys.exc_info()
                    else:
                        test_result = True
            finally:
                # instance is still None if the constructor failed; hasattr
                # is then false and tearDown is simply not called.
                try:
                    if hasattr(instance, "tearDown"):
                        instance.tearDown()
                except:
                    teardown_result = sys.exc_info()
                else:
                    teardown_result = True

                # Send the results back to our parent.
                dump((self.marshal_one_result(setup_result),
                      self.marshal_one_result(test_result),
                      self.marshal_one_result(teardown_result)),
                     writeable, -1)
        except:
            # Best effort: still close the pipe and exit below, but leave a
            # trace on stderr saying why no result could be sent.
            traceback.print_exc()
        writeable.close()
        sys.exit()
301           
302
class Reporter(object):
    """Writes test progress and problem reports to stdout and keeps the
    run/pass counters used for the closing summary."""

    # Layout of the block printed for a test that did not pass cleanly.
    # NB the newline at the start of this string
    _PROBLEM_TEMPLATE = """
=========================================
Problem in: %s
=========================================
test output:
%s
-----------------------------------------
__init__ and setUp: %s
-----------------------------------------
test itself: %s
-----------------------------------------
tearDown: %s
-----------------------------------------
"""

    def __init__(self):
        """Reset the counters and print the progress-line prefix."""
        self.total_run = 0
        self.total_passed = 0
        sys.stdout.write("Testing: ")
        sys.stdout.flush()

    def report(self, method_name, output_data, marshalled_result):
        """Record the outcome of one test method.

        method_name       -- name shown in a problem report
        output_data       -- captured child output, or None when output
                             capturing is disabled
        marshalled_result -- (setup, test, teardown); each element is a
                             tuple of strings whose first item is the status

        Prints "." when all three phases succeeded, otherwise a detailed
        problem block.
        """
        self.total_run += 1
        (setup, test, teardown) = marshalled_result

        # Stop at the first phase that is not a success.
        all_passed = True
        for phase in (setup, test, teardown):
            if phase[0] != SUCCESS:
                all_passed = False
                break

        if all_passed:
            self.total_passed += 1
            sys.stdout.write(".")
            sys.stdout.flush()
            return

        if output_data is None:
            output_string = "<child output not captured>"
        else:
            # FIXME: sanitize output?
            output_string = output_data
        fields = (method_name,
                  output_string,
                  "\n".join(setup),
                  "\n".join(test),
                  "\n".join(teardown))
        sys.stdout.write(self._PROBLEM_TEMPLATE % fields)
        sys.stdout.flush()

    def close(self):
        """Print the end-of-run summary line."""
        failures = self.total_run - self.total_passed
        if not failures:
            failures = "no"
        sys.stdout.write("\nRun complete; %s tests, %s failures.\n"
                         % (self.total_run, failures))
351
# Script entry point: run the scanner only when executed directly, not when
# this file is imported as a module.
if __name__ == "__main__":
    app = YaTest()
    app.main()
Note: See TracBrowser for help on using the browser.