Move pytest-specific code from testlib to pytest

Closes #1716053.

author: Rémi Cardona <remi.cardona@logilab.fr>
changeset: b2d560d76ee6
branch: default
phase: public
hidden: no
parent revision: #1a80614c2855 [tests] Fix resource leak
child revision: #dcdcb990b2c8 [py3k] dict.iteritems → dict.items
files modified by this revision
logilab/common/pytest.py
logilab/common/testlib.py
# HG changeset patch
# User Rémi Cardona <remi.cardona@logilab.fr>
# Date 1441970844 -7200
# Fri Sep 11 13:27:24 2015 +0200
# Node ID b2d560d76ee61ce150e7dc161136bd427f39e6d8
# Parent 1a80614c2855ba4fbe48fd00646aba467b92f6b9
Move pytest-specific code from testlib to pytest

Closes #1716053.

diff --git a/logilab/common/pytest.py b/logilab/common/pytest.py
@@ -116,20 +116,24 @@
1  import os, sys, re
2  import os.path as osp
3  from time import time, clock
4  import warnings
5  import types
6 +import inspect
7 +import traceback
8  from inspect import isgeneratorfunction, isclass
9  from contextlib import contextmanager
10  from random import shuffle
11 +from itertools import dropwhile
12 
13  from logilab.common.fileutils import abspath_listdir
14  from logilab.common import textutils
15  from logilab.common import testlib, STD_BLACKLIST
16  # use the same unittest module as testlib
17  from logilab.common.testlib import unittest, start_interactive_mode
18  from logilab.common.deprecation import deprecated
19 +from logilab.common.debugger import Debugger, colorize_source
20  import doctest
21 
22  import unittest as unittest_legacy
23  if not getattr(unittest_legacy, "__package__", None):
24      try:
@@ -936,11 +940,11 @@
25                      tags = tags | getattr(test.__self__.__class__, 'tags', testlib.Tags())
26                  return tags.match(tags_pattern)
27          return True # no pattern
28 
29      def _makeResult(self):
30 -        return testlib.SkipAwareTestResult(self.stream, self.descriptions,
31 +        return SkipAwareTestResult(self.stream, self.descriptions,
32                                     self.verbosity, self.exitfirst,
33                                     self.pdbmode, self.cvg, self.colorize)
34 
35      def run(self, test):
36          "Run the given test case or test suite."
@@ -981,10 +985,159 @@
37                  self.stream.write(', '.join(det_results))
38                  self.stream.write(")")
39              self.stream.writeln("")
40          return result
41 
42 +
43 +class SkipAwareTestResult(unittest._TextTestResult):
44 +
45 +    def __init__(self, stream, descriptions, verbosity,
46 +                 exitfirst=False, pdbmode=False, cvg=None, colorize=False):
47 +        super(SkipAwareTestResult, self).__init__(stream,
48 +                                                  descriptions, verbosity)
49 +        self.skipped = []
50 +        self.debuggers = []
51 +        self.fail_descrs = []
52 +        self.error_descrs = []
53 +        self.exitfirst = exitfirst
54 +        self.pdbmode = pdbmode
55 +        self.cvg = cvg
56 +        self.colorize = colorize
57 +        self.pdbclass = Debugger
58 +        self.verbose = verbosity > 1
59 +
60 +    def descrs_for(self, flavour):
61 +        return getattr(self, '%s_descrs' % flavour.lower())
62 +
63 +    def _create_pdb(self, test_descr, flavour):
64 +        self.descrs_for(flavour).append( (len(self.debuggers), test_descr) )
65 +        if self.pdbmode:
66 +            self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
67 +
68 +    def _iter_valid_frames(self, frames):
69 +        """only consider non-testlib frames when formatting  traceback"""
70 +        lgc_testlib = osp.abspath(__file__)
71 +        std_testlib = osp.abspath(unittest.__file__)
72 +        invalid = lambda fi: osp.abspath(fi[1]) in (lgc_testlib, std_testlib)
73 +        for frameinfo in dropwhile(invalid, frames):
74 +            yield frameinfo
75 +
76 +    def _exc_info_to_string(self, err, test):
77 +        """Converts a sys.exc_info()-style tuple of values into a string.
78 +
79 +        This method is overridden here because we want to colorize
80 +        lines if --color is passed, and display local variables if
81 +        --verbose is passed
82 +        """
83 +        exctype, exc, tb = err
84 +        output = ['Traceback (most recent call last)']
85 +        frames = inspect.getinnerframes(tb)
86 +        colorize = self.colorize
87 +        frames = enumerate(self._iter_valid_frames(frames))
88 +        for index, (frame, filename, lineno, funcname, ctx, ctxindex) in frames:
89 +            filename = osp.abspath(filename)
90 +            if ctx is None: # pyc files or C extensions for instance
91 +                source = '<no source available>'
92 +            else:
93 +                source = ''.join(ctx)
94 +            if colorize:
95 +                filename = textutils.colorize_ansi(filename, 'magenta')
96 +                source = colorize_source(source)
97 +            output.append('  File "%s", line %s, in %s' % (filename, lineno, funcname))
98 +            output.append('    %s' % source.strip())
99 +            if self.verbose:
100 +                output.append('%r == %r' % (dir(frame), test.__module__))
101 +                output.append('')
102 +                output.append('    ' + ' local variables '.center(66, '-'))
103 +                for varname, value in sorted(frame.f_locals.items()):
104 +                    output.append('    %s: %r' % (varname, value))
105 +                    if varname == 'self': # special handy processing for self
106 +                        for varname, value in sorted(vars(value).items()):
107 +                            output.append('      self.%s: %r' % (varname, value))
108 +                output.append('    ' + '-' * 66)
109 +                output.append('')
110 +        output.append(''.join(traceback.format_exception_only(exctype, exc)))
111 +        return '\n'.join(output)
112 +
113 +    def addError(self, test, err):
114 +        """err ->  (exc_type, exc, tcbk)"""
115 +        exc_type, exc, _ = err
116 +        if isinstance(exc, testlib.SkipTest):
117 +            assert exc_type == SkipTest
118 +            self.addSkip(test, exc)
119 +        else:
120 +            if self.exitfirst:
121 +                self.shouldStop = True
122 +            descr = self.getDescription(test)
123 +            super(SkipAwareTestResult, self).addError(test, err)
124 +            self._create_pdb(descr, 'error')
125 +
126 +    def addFailure(self, test, err):
127 +        if self.exitfirst:
128 +            self.shouldStop = True
129 +        descr = self.getDescription(test)
130 +        super(SkipAwareTestResult, self).addFailure(test, err)
131 +        self._create_pdb(descr, 'fail')
132 +
133 +    def addSkip(self, test, reason):
134 +        self.skipped.append((test, reason))
135 +        if self.showAll:
136 +            self.stream.writeln("SKIPPED")
137 +        elif self.dots:
138 +            self.stream.write('S')
139 +
140 +    def printErrors(self):
141 +        super(SkipAwareTestResult, self).printErrors()
142 +        self.printSkippedList()
143 +
144 +    def printSkippedList(self):
145 +        # format (test, err) compatible with unittest2
146 +        for test, err in self.skipped:
147 +            descr = self.getDescription(test)
148 +            self.stream.writeln(self.separator1)
149 +            self.stream.writeln("%s: %s" % ('SKIPPED', descr))
150 +            self.stream.writeln("\t%s" % err)
151 +
152 +    def printErrorList(self, flavour, errors):
153 +        for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors):
154 +            self.stream.writeln(self.separator1)
155 +            self.stream.writeln("%s: %s" % (flavour, descr))
156 +            self.stream.writeln(self.separator2)
157 +            self.stream.writeln(err)
158 +            self.stream.writeln('no stdout'.center(len(self.separator2)))
159 +            self.stream.writeln('no stderr'.center(len(self.separator2)))
160 +
161 +
162 +from .decorators import monkeypatch
163 +orig_call = testlib.TestCase.__call__
164 +@monkeypatch(testlib.TestCase, '__call__')
165 +def call(self, result=None, runcondition=None, options=None):
166 +    orig_call(self, result=result, runcondition=runcondition, options=options)
167 +    if hasattr(options, "exitfirst") and options.exitfirst:
168 +        # add this test to restart file
169 +        try:
170 +            restartfile = open(FILE_RESTART, 'a')
171 +            try:
172 +                descr = '.'.join((self.__class__.__module__,
173 +                                  self.__class__.__name__,
174 +                                  self._testMethodName))
175 +                restartfile.write(descr+os.linesep)
176 +            finally:
177 +                restartfile.close()
178 +        except Exception:
179 +            print("Error while saving succeeded test into",
180 +                  osp.join(os.getcwd(), FILE_RESTART),
181 +                  file=sys.__stderr__)
182 +            raise
183 +
184 +
185 +@monkeypatch(testlib.TestCase)
186 +def defaultTestResult(self):
187 +    """return a new instance of the defaultTestResult"""
188 +    return SkipAwareTestResult()
189 +
190 +
191  class NonStrictTestLoader(unittest.TestLoader):
192      """
193      Overrides default testloader to be able to omit classname when
194      specifying tests to run on command line.
195 
@@ -1184,11 +1337,11 @@
196          weaver.weave_module(arg, ContractAspect)
197      return True
198 
199 
200  # monkeypatch unittest and doctest (ouch !)
201 -unittest._TextTestResult = testlib.SkipAwareTestResult
202 +unittest._TextTestResult = SkipAwareTestResult
203  unittest.TextTestRunner = SkipAwareTextTestRunner
204  unittest.TestLoader = NonStrictTestLoader
205  unittest.TestProgram = SkipAwareTestProgram
206 
207  if sys.version_info >= (2, 4):
diff --git a/logilab/common/testlib.py b/logilab/common/testlib.py
@@ -45,19 +45,16 @@
208  # pylint: disable=C0103
209 
210  import sys
211  import os, os.path as osp
212  import re
213 -import traceback
214 -import inspect
215  import difflib
216  import tempfile
217  import math
218  import warnings
219  from shutil import rmtree
220  from operator import itemgetter
221 -from itertools import dropwhile
222  from inspect import isgeneratorfunction
223 
224  from six import string_types
225  from six.moves import builtins, range, configparser, input
226 
@@ -69,16 +66,16 @@
227          import unittest2 as unittest
228          from unittest2 import SkipTest
229      except ImportError:
230          raise ImportError("You have to install python-unittest2 to use %s" % __name__)
231  else:
232 -    import unittest
233 +    import unittest as unittest
234      from unittest import SkipTest
235 
236  from functools import wraps
237 
238 -from logilab.common.debugger import Debugger, colorize_source
239 +from logilab.common.debugger import Debugger
240  from logilab.common.decorators import cached, classproperty
241  from logilab.common import textutils
242 
243 
244  __all__ = ['unittest_main', 'find_tests']
@@ -202,127 +199,10 @@
245                  break
246 
247 
248  # test utils ##################################################################
249 
250 -class SkipAwareTestResult(unittest._TextTestResult):
251 -
252 -    def __init__(self, stream, descriptions, verbosity,
253 -                 exitfirst=False, pdbmode=False, cvg=None, colorize=False):
254 -        super(SkipAwareTestResult, self).__init__(stream,
255 -                                                  descriptions, verbosity)
256 -        self.skipped = []
257 -        self.debuggers = []
258 -        self.fail_descrs = []
259 -        self.error_descrs = []
260 -        self.exitfirst = exitfirst
261 -        self.pdbmode = pdbmode
262 -        self.cvg = cvg
263 -        self.colorize = colorize
264 -        self.pdbclass = Debugger
265 -        self.verbose = verbosity > 1
266 -
267 -    def descrs_for(self, flavour):
268 -        return getattr(self, '%s_descrs' % flavour.lower())
269 -
270 -    def _create_pdb(self, test_descr, flavour):
271 -        self.descrs_for(flavour).append( (len(self.debuggers), test_descr) )
272 -        if self.pdbmode:
273 -            self.debuggers.append(self.pdbclass(sys.exc_info()[2]))
274 -
275 -    def _iter_valid_frames(self, frames):
276 -        """only consider non-testlib frames when formatting  traceback"""
277 -        lgc_testlib = osp.abspath(__file__)
278 -        std_testlib = osp.abspath(unittest.__file__)
279 -        invalid = lambda fi: osp.abspath(fi[1]) in (lgc_testlib, std_testlib)
280 -        for frameinfo in dropwhile(invalid, frames):
281 -            yield frameinfo
282 -
283 -    def _exc_info_to_string(self, err, test):
284 -        """Converts a sys.exc_info()-style tuple of values into a string.
285 -
286 -        This method is overridden here because we want to colorize
287 -        lines if --color is passed, and display local variables if
288 -        --verbose is passed
289 -        """
290 -        exctype, exc, tb = err
291 -        output = ['Traceback (most recent call last)']
292 -        frames = inspect.getinnerframes(tb)
293 -        colorize = self.colorize
294 -        frames = enumerate(self._iter_valid_frames(frames))
295 -        for index, (frame, filename, lineno, funcname, ctx, ctxindex) in frames:
296 -            filename = osp.abspath(filename)
297 -            if ctx is None: # pyc files or C extensions for instance
298 -                source = '<no source available>'
299 -            else:
300 -                source = ''.join(ctx)
301 -            if colorize:
302 -                filename = textutils.colorize_ansi(filename, 'magenta')
303 -                source = colorize_source(source)
304 -            output.append('  File "%s", line %s, in %s' % (filename, lineno, funcname))
305 -            output.append('    %s' % source.strip())
306 -            if self.verbose:
307 -                output.append('%r == %r' % (dir(frame), test.__module__))
308 -                output.append('')
309 -                output.append('    ' + ' local variables '.center(66, '-'))
310 -                for varname, value in sorted(frame.f_locals.items()):
311 -                    output.append('    %s: %r' % (varname, value))
312 -                    if varname == 'self': # special handy processing for self
313 -                        for varname, value in sorted(vars(value).items()):
314 -                            output.append('      self.%s: %r' % (varname, value))
315 -                output.append('    ' + '-' * 66)
316 -                output.append('')
317 -        output.append(''.join(traceback.format_exception_only(exctype, exc)))
318 -        return '\n'.join(output)
319 -
320 -    def addError(self, test, err):
321 -        """err ->  (exc_type, exc, tcbk)"""
322 -        exc_type, exc, _ = err
323 -        if isinstance(exc, SkipTest):
324 -            assert exc_type == SkipTest
325 -            self.addSkip(test, exc)
326 -        else:
327 -            if self.exitfirst:
328 -                self.shouldStop = True
329 -            descr = self.getDescription(test)
330 -            super(SkipAwareTestResult, self).addError(test, err)
331 -            self._create_pdb(descr, 'error')
332 -
333 -    def addFailure(self, test, err):
334 -        if self.exitfirst:
335 -            self.shouldStop = True
336 -        descr = self.getDescription(test)
337 -        super(SkipAwareTestResult, self).addFailure(test, err)
338 -        self._create_pdb(descr, 'fail')
339 -
340 -    def addSkip(self, test, reason):
341 -        self.skipped.append((test, reason))
342 -        if self.showAll:
343 -            self.stream.writeln("SKIPPED")
344 -        elif self.dots:
345 -            self.stream.write('S')
346 -
347 -    def printErrors(self):
348 -        super(SkipAwareTestResult, self).printErrors()
349 -        self.printSkippedList()
350 -
351 -    def printSkippedList(self):
352 -        # format (test, err) compatible with unittest2
353 -        for test, err in self.skipped:
354 -            descr = self.getDescription(test)
355 -            self.stream.writeln(self.separator1)
356 -            self.stream.writeln("%s: %s" % ('SKIPPED', descr))
357 -            self.stream.writeln("\t%s" % err)
358 -
359 -    def printErrorList(self, flavour, errors):
360 -        for (_, descr), (test, err) in zip(self.descrs_for(flavour), errors):
361 -            self.stream.writeln(self.separator1)
362 -            self.stream.writeln("%s: %s" % (flavour, descr))
363 -            self.stream.writeln(self.separator2)
364 -            self.stream.writeln(err)
365 -            self.stream.writeln('no stdout'.center(len(self.separator2)))
366 -            self.stream.writeln('no stderr'.center(len(self.separator2)))
367 
368  # Add deprecation warnings about new api used by module level fixtures in unittest2
369  # http://www.voidspace.org.uk/python/articles/unittest2.shtml#setupmodule-and-teardownmodule
370  class _DebugResult(object): # simplify import statement among unittest flavors..
371      "Used by the TestSuite to hold previous class when running in debug."
@@ -485,14 +365,12 @@
372      def __call__(self, result=None, runcondition=None, options=None):
373          """rewrite TestCase.__call__ to support generative tests
374          This is mostly a copy/paste from unittest.py (i.e same
375          variable names, same logic, except for the generative tests part)
376          """
377 -        from logilab.common.pytest import FILE_RESTART
378          if result is None:
379              result = self.defaultTestResult()
380 -        result.pdbclass = self.pdbclass
381          self._options_ = options
382          # if result.cvg:
383          #     result.cvg.start()
384          testMethod = self._get_test_method()
385          if (getattr(self.__class__, "__unittest_skip__", False) or
@@ -520,26 +398,10 @@
386                  status = self._proceed(result, testMethod)
387                  success = (status == 0)
388              if not self.quiet_run(result, self.tearDown):
389                  return
390              if not generative and success:
391 -                if hasattr(options, "exitfirst") and options.exitfirst:
392 -                    # add this test to restart file
393 -                    try:
394 -                        restartfile = open(FILE_RESTART, 'a')
395 -                        try:
396 -                            descr = '.'.join((self.__class__.__module__,
397 -                                              self.__class__.__name__,
398 -                                              self._testMethodName))
399 -                            restartfile.write(descr+os.linesep)
400 -                        finally:
401 -                            restartfile.close()
402 -                    except Exception:
403 -                        print("Error while saving succeeded test into",
404 -                              osp.join(os.getcwd(), FILE_RESTART),
405 -                              file=sys.__stderr__)
406 -                        raise
407                  result.addSuccess(self)
408          finally:
409              # if result.cvg:
410              #     result.cvg.stop()
411              result.stopTest(self)
@@ -608,14 +470,10 @@
412          except:
413              result.addError(self, self.__exc_info())
414              return 2
415          return 0
416 
417 -    def defaultTestResult(self):
418 -        """return a new instance of the defaultTestResult"""
419 -        return SkipAwareTestResult()
420 -
421      def innerSkip(self, msg=None):
422          """mark a generative test as skipped for the <msg> reason"""
423          msg = msg or 'test was skipped'
424          raise InnerTestSkipped(msg)
425