fix
This commit is contained in:
85
.CondaPkg/env/Lib/unittest/__init__.py
vendored
85
.CondaPkg/env/Lib/unittest/__init__.py
vendored
@@ -1,85 +0,0 @@
|
||||
"""
|
||||
Python unit testing framework, based on Erich Gamma's JUnit and Kent Beck's
|
||||
Smalltalk testing framework (used with permission).
|
||||
|
||||
This module contains the core framework classes that form the basis of
|
||||
specific test cases and suites (TestCase, TestSuite etc.), and also a
|
||||
text-based utility class for running the tests and reporting the results
|
||||
(TextTestRunner).
|
||||
|
||||
Simple usage:
|
||||
|
||||
import unittest
|
||||
|
||||
class IntegerArithmeticTestCase(unittest.TestCase):
|
||||
def testAdd(self): # test method names begin with 'test'
|
||||
self.assertEqual((1 + 2), 3)
|
||||
self.assertEqual(0 + 1, 1)
|
||||
def testMultiply(self):
|
||||
self.assertEqual((0 * 10), 0)
|
||||
self.assertEqual((5 * 8), 40)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Further information is available in the bundled documentation, and from
|
||||
|
||||
http://docs.python.org/library/unittest.html
|
||||
|
||||
Copyright (c) 1999-2003 Steve Purcell
|
||||
Copyright (c) 2003-2010 Python Software Foundation
|
||||
This module is free software, and you may redistribute it and/or modify
|
||||
it under the same terms as Python itself, so long as this copyright message
|
||||
and disclaimer are retained in their original form.
|
||||
|
||||
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
|
||||
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
|
||||
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
|
||||
DAMAGE.
|
||||
|
||||
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
|
||||
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
||||
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
|
||||
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
|
||||
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
|
||||
"""
|
||||
|
||||
__all__ = ['TestResult', 'TestCase', 'IsolatedAsyncioTestCase', 'TestSuite',
|
||||
'TextTestRunner', 'TestLoader', 'FunctionTestCase', 'main',
|
||||
'defaultTestLoader', 'SkipTest', 'skip', 'skipIf', 'skipUnless',
|
||||
'expectedFailure', 'TextTestResult', 'installHandler',
|
||||
'registerResult', 'removeResult', 'removeHandler',
|
||||
'addModuleCleanup', 'doModuleCleanups', 'enterModuleContext']
|
||||
|
||||
# Expose obsolete functions for backwards compatibility
|
||||
# bpo-5846: Deprecated in Python 3.11, scheduled for removal in Python 3.13.
|
||||
__all__.extend(['getTestCaseNames', 'makeSuite', 'findTestCases'])
|
||||
|
||||
__unittest = True
|
||||
|
||||
from .result import TestResult
|
||||
from .case import (addModuleCleanup, TestCase, FunctionTestCase, SkipTest, skip,
|
||||
skipIf, skipUnless, expectedFailure, doModuleCleanups,
|
||||
enterModuleContext)
|
||||
from .suite import BaseTestSuite, TestSuite
|
||||
from .loader import TestLoader, defaultTestLoader
|
||||
from .main import TestProgram, main
|
||||
from .runner import TextTestRunner, TextTestResult
|
||||
from .signals import installHandler, registerResult, removeResult, removeHandler
|
||||
# IsolatedAsyncioTestCase will be imported lazily.
|
||||
from .loader import makeSuite, getTestCaseNames, findTestCases
|
||||
|
||||
|
||||
# Lazy import of IsolatedAsyncioTestCase from .async_case
|
||||
# It imports asyncio, which is relatively heavy, but most tests
|
||||
# do not need it.
|
||||
|
||||
def __dir__():
|
||||
return globals().keys() | {'IsolatedAsyncioTestCase'}
|
||||
|
||||
def __getattr__(name):
|
||||
if name == 'IsolatedAsyncioTestCase':
|
||||
global IsolatedAsyncioTestCase
|
||||
from .async_case import IsolatedAsyncioTestCase
|
||||
return IsolatedAsyncioTestCase
|
||||
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
|
||||
18
.CondaPkg/env/Lib/unittest/__main__.py
vendored
18
.CondaPkg/env/Lib/unittest/__main__.py
vendored
@@ -1,18 +0,0 @@
|
||||
"""Main entry point"""
|
||||
|
||||
import sys
|
||||
if sys.argv[0].endswith("__main__.py"):
|
||||
import os.path
|
||||
# We change sys.argv[0] to make help message more useful
|
||||
# use executable without path, unquoted
|
||||
# (it's just a hint anyway)
|
||||
# (if you have spaces in your executable you get what you deserve!)
|
||||
executable = os.path.basename(sys.executable)
|
||||
sys.argv[0] = executable + " -m unittest"
|
||||
del os
|
||||
|
||||
__unittest = True
|
||||
|
||||
from .main import main
|
||||
|
||||
main(module=None)
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
86
.CondaPkg/env/Lib/unittest/_log.py
vendored
86
.CondaPkg/env/Lib/unittest/_log.py
vendored
@@ -1,86 +0,0 @@
|
||||
import logging
|
||||
import collections
|
||||
|
||||
from .case import _BaseTestCaseContext
|
||||
|
||||
|
||||
_LoggingWatcher = collections.namedtuple("_LoggingWatcher",
|
||||
["records", "output"])
|
||||
|
||||
class _CapturingHandler(logging.Handler):
|
||||
"""
|
||||
A logging handler capturing all (raw and formatted) logging output.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
logging.Handler.__init__(self)
|
||||
self.watcher = _LoggingWatcher([], [])
|
||||
|
||||
def flush(self):
|
||||
pass
|
||||
|
||||
def emit(self, record):
|
||||
self.watcher.records.append(record)
|
||||
msg = self.format(record)
|
||||
self.watcher.output.append(msg)
|
||||
|
||||
|
||||
class _AssertLogsContext(_BaseTestCaseContext):
|
||||
"""A context manager for assertLogs() and assertNoLogs() """
|
||||
|
||||
LOGGING_FORMAT = "%(levelname)s:%(name)s:%(message)s"
|
||||
|
||||
def __init__(self, test_case, logger_name, level, no_logs):
|
||||
_BaseTestCaseContext.__init__(self, test_case)
|
||||
self.logger_name = logger_name
|
||||
if level:
|
||||
self.level = logging._nameToLevel.get(level, level)
|
||||
else:
|
||||
self.level = logging.INFO
|
||||
self.msg = None
|
||||
self.no_logs = no_logs
|
||||
|
||||
def __enter__(self):
|
||||
if isinstance(self.logger_name, logging.Logger):
|
||||
logger = self.logger = self.logger_name
|
||||
else:
|
||||
logger = self.logger = logging.getLogger(self.logger_name)
|
||||
formatter = logging.Formatter(self.LOGGING_FORMAT)
|
||||
handler = _CapturingHandler()
|
||||
handler.setLevel(self.level)
|
||||
handler.setFormatter(formatter)
|
||||
self.watcher = handler.watcher
|
||||
self.old_handlers = logger.handlers[:]
|
||||
self.old_level = logger.level
|
||||
self.old_propagate = logger.propagate
|
||||
logger.handlers = [handler]
|
||||
logger.setLevel(self.level)
|
||||
logger.propagate = False
|
||||
if self.no_logs:
|
||||
return
|
||||
return handler.watcher
|
||||
|
||||
def __exit__(self, exc_type, exc_value, tb):
|
||||
self.logger.handlers = self.old_handlers
|
||||
self.logger.propagate = self.old_propagate
|
||||
self.logger.setLevel(self.old_level)
|
||||
|
||||
if exc_type is not None:
|
||||
# let unexpected exceptions pass through
|
||||
return False
|
||||
|
||||
if self.no_logs:
|
||||
# assertNoLogs
|
||||
if len(self.watcher.records) > 0:
|
||||
self._raiseFailure(
|
||||
"Unexpected logs found: {!r}".format(
|
||||
self.watcher.output
|
||||
)
|
||||
)
|
||||
|
||||
else:
|
||||
# assertLogs
|
||||
if len(self.watcher.records) == 0:
|
||||
self._raiseFailure(
|
||||
"no logs of level {} or higher triggered on {}"
|
||||
.format(logging.getLevelName(self.level), self.logger.name))
|
||||
142
.CondaPkg/env/Lib/unittest/async_case.py
vendored
142
.CondaPkg/env/Lib/unittest/async_case.py
vendored
@@ -1,142 +0,0 @@
|
||||
import asyncio
|
||||
import contextvars
|
||||
import inspect
|
||||
import warnings
|
||||
|
||||
from .case import TestCase
|
||||
|
||||
|
||||
class IsolatedAsyncioTestCase(TestCase):
|
||||
# Names intentionally have a long prefix
|
||||
# to reduce a chance of clashing with user-defined attributes
|
||||
# from inherited test case
|
||||
#
|
||||
# The class doesn't call loop.run_until_complete(self.setUp()) and family
|
||||
# but uses a different approach:
|
||||
# 1. create a long-running task that reads self.setUp()
|
||||
# awaitable from queue along with a future
|
||||
# 2. await the awaitable object passing in and set the result
|
||||
# into the future object
|
||||
# 3. Outer code puts the awaitable and the future object into a queue
|
||||
# with waiting for the future
|
||||
# The trick is necessary because every run_until_complete() call
|
||||
# creates a new task with embedded ContextVar context.
|
||||
# To share contextvars between setUp(), test and tearDown() we need to execute
|
||||
# them inside the same task.
|
||||
|
||||
# Note: the test case modifies event loop policy if the policy was not instantiated
|
||||
# yet.
|
||||
# asyncio.get_event_loop_policy() creates a default policy on demand but never
|
||||
# returns None
|
||||
# I believe this is not an issue in user level tests but python itself for testing
|
||||
# should reset a policy in every test module
|
||||
# by calling asyncio.set_event_loop_policy(None) in tearDownModule()
|
||||
|
||||
def __init__(self, methodName='runTest'):
|
||||
super().__init__(methodName)
|
||||
self._asyncioRunner = None
|
||||
self._asyncioTestContext = contextvars.copy_context()
|
||||
|
||||
async def asyncSetUp(self):
|
||||
pass
|
||||
|
||||
async def asyncTearDown(self):
|
||||
pass
|
||||
|
||||
def addAsyncCleanup(self, func, /, *args, **kwargs):
|
||||
# A trivial trampoline to addCleanup()
|
||||
# the function exists because it has a different semantics
|
||||
# and signature:
|
||||
# addCleanup() accepts regular functions
|
||||
# but addAsyncCleanup() accepts coroutines
|
||||
#
|
||||
# We intentionally don't add inspect.iscoroutinefunction() check
|
||||
# for func argument because there is no way
|
||||
# to check for async function reliably:
|
||||
# 1. It can be "async def func()" itself
|
||||
# 2. Class can implement "async def __call__()" method
|
||||
# 3. Regular "def func()" that returns awaitable object
|
||||
self.addCleanup(*(func, *args), **kwargs)
|
||||
|
||||
async def enterAsyncContext(self, cm):
|
||||
"""Enters the supplied asynchronous context manager.
|
||||
|
||||
If successful, also adds its __aexit__ method as a cleanup
|
||||
function and returns the result of the __aenter__ method.
|
||||
"""
|
||||
# We look up the special methods on the type to match the with
|
||||
# statement.
|
||||
cls = type(cm)
|
||||
try:
|
||||
enter = cls.__aenter__
|
||||
exit = cls.__aexit__
|
||||
except AttributeError:
|
||||
raise TypeError(f"'{cls.__module__}.{cls.__qualname__}' object does "
|
||||
f"not support the asynchronous context manager protocol"
|
||||
) from None
|
||||
result = await enter(cm)
|
||||
self.addAsyncCleanup(exit, cm, None, None, None)
|
||||
return result
|
||||
|
||||
def _callSetUp(self):
|
||||
# Force loop to be initialized and set as the current loop
|
||||
# so that setUp functions can use get_event_loop() and get the
|
||||
# correct loop instance.
|
||||
self._asyncioRunner.get_loop()
|
||||
self._asyncioTestContext.run(self.setUp)
|
||||
self._callAsync(self.asyncSetUp)
|
||||
|
||||
def _callTestMethod(self, method):
|
||||
if self._callMaybeAsync(method) is not None:
|
||||
warnings.warn(f'It is deprecated to return a value that is not None from a '
|
||||
f'test case ({method})', DeprecationWarning, stacklevel=4)
|
||||
|
||||
def _callTearDown(self):
|
||||
self._callAsync(self.asyncTearDown)
|
||||
self._asyncioTestContext.run(self.tearDown)
|
||||
|
||||
def _callCleanup(self, function, *args, **kwargs):
|
||||
self._callMaybeAsync(function, *args, **kwargs)
|
||||
|
||||
def _callAsync(self, func, /, *args, **kwargs):
|
||||
assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
|
||||
assert inspect.iscoroutinefunction(func), f'{func!r} is not an async function'
|
||||
return self._asyncioRunner.run(
|
||||
func(*args, **kwargs),
|
||||
context=self._asyncioTestContext
|
||||
)
|
||||
|
||||
def _callMaybeAsync(self, func, /, *args, **kwargs):
|
||||
assert self._asyncioRunner is not None, 'asyncio runner is not initialized'
|
||||
if inspect.iscoroutinefunction(func):
|
||||
return self._asyncioRunner.run(
|
||||
func(*args, **kwargs),
|
||||
context=self._asyncioTestContext,
|
||||
)
|
||||
else:
|
||||
return self._asyncioTestContext.run(func, *args, **kwargs)
|
||||
|
||||
def _setupAsyncioRunner(self):
|
||||
assert self._asyncioRunner is None, 'asyncio runner is already initialized'
|
||||
runner = asyncio.Runner(debug=True)
|
||||
self._asyncioRunner = runner
|
||||
|
||||
def _tearDownAsyncioRunner(self):
|
||||
runner = self._asyncioRunner
|
||||
runner.close()
|
||||
|
||||
def run(self, result=None):
|
||||
self._setupAsyncioRunner()
|
||||
try:
|
||||
return super().run(result)
|
||||
finally:
|
||||
self._tearDownAsyncioRunner()
|
||||
|
||||
def debug(self):
|
||||
self._setupAsyncioRunner()
|
||||
super().debug()
|
||||
self._tearDownAsyncioRunner()
|
||||
|
||||
def __del__(self):
|
||||
if self._asyncioRunner is not None:
|
||||
self._tearDownAsyncioRunner()
|
||||
1456
.CondaPkg/env/Lib/unittest/case.py
vendored
1456
.CondaPkg/env/Lib/unittest/case.py
vendored
File diff suppressed because it is too large
Load Diff
483
.CondaPkg/env/Lib/unittest/loader.py
vendored
483
.CondaPkg/env/Lib/unittest/loader.py
vendored
@@ -1,483 +0,0 @@
|
||||
"""Loading unittests."""
|
||||
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
import types
|
||||
import functools
|
||||
|
||||
from fnmatch import fnmatch, fnmatchcase
|
||||
|
||||
from . import case, suite, util
|
||||
|
||||
__unittest = True
|
||||
|
||||
# what about .pyc (etc)
|
||||
# we would need to avoid loading the same tests multiple times
|
||||
# from '.py', *and* '.pyc'
|
||||
VALID_MODULE_NAME = re.compile(r'[_a-z]\w*\.py$', re.IGNORECASE)
|
||||
|
||||
|
||||
class _FailedTest(case.TestCase):
|
||||
_testMethodName = None
|
||||
|
||||
def __init__(self, method_name, exception):
|
||||
self._exception = exception
|
||||
super(_FailedTest, self).__init__(method_name)
|
||||
|
||||
def __getattr__(self, name):
|
||||
if name != self._testMethodName:
|
||||
return super(_FailedTest, self).__getattr__(name)
|
||||
def testFailure():
|
||||
raise self._exception
|
||||
return testFailure
|
||||
|
||||
|
||||
def _make_failed_import_test(name, suiteClass):
|
||||
message = 'Failed to import test module: %s\n%s' % (
|
||||
name, traceback.format_exc())
|
||||
return _make_failed_test(name, ImportError(message), suiteClass, message)
|
||||
|
||||
def _make_failed_load_tests(name, exception, suiteClass):
|
||||
message = 'Failed to call load_tests:\n%s' % (traceback.format_exc(),)
|
||||
return _make_failed_test(
|
||||
name, exception, suiteClass, message)
|
||||
|
||||
def _make_failed_test(methodname, exception, suiteClass, message):
|
||||
test = _FailedTest(methodname, exception)
|
||||
return suiteClass((test,)), message
|
||||
|
||||
def _make_skipped_test(methodname, exception, suiteClass):
|
||||
@case.skip(str(exception))
|
||||
def testSkipped(self):
|
||||
pass
|
||||
attrs = {methodname: testSkipped}
|
||||
TestClass = type("ModuleSkipped", (case.TestCase,), attrs)
|
||||
return suiteClass((TestClass(methodname),))
|
||||
|
||||
def _splitext(path):
|
||||
return os.path.splitext(path)[0]
|
||||
|
||||
|
||||
class TestLoader(object):
|
||||
"""
|
||||
This class is responsible for loading tests according to various criteria
|
||||
and returning them wrapped in a TestSuite
|
||||
"""
|
||||
testMethodPrefix = 'test'
|
||||
sortTestMethodsUsing = staticmethod(util.three_way_cmp)
|
||||
testNamePatterns = None
|
||||
suiteClass = suite.TestSuite
|
||||
_top_level_dir = None
|
||||
|
||||
def __init__(self):
|
||||
super(TestLoader, self).__init__()
|
||||
self.errors = []
|
||||
# Tracks packages which we have called into via load_tests, to
|
||||
# avoid infinite re-entrancy.
|
||||
self._loading_packages = set()
|
||||
|
||||
def loadTestsFromTestCase(self, testCaseClass):
|
||||
"""Return a suite of all test cases contained in testCaseClass"""
|
||||
if issubclass(testCaseClass, suite.TestSuite):
|
||||
raise TypeError("Test cases should not be derived from "
|
||||
"TestSuite. Maybe you meant to derive from "
|
||||
"TestCase?")
|
||||
testCaseNames = self.getTestCaseNames(testCaseClass)
|
||||
if not testCaseNames and hasattr(testCaseClass, 'runTest'):
|
||||
testCaseNames = ['runTest']
|
||||
loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))
|
||||
return loaded_suite
|
||||
|
||||
def loadTestsFromModule(self, module, *, pattern=None):
|
||||
"""Return a suite of all test cases contained in the given module"""
|
||||
tests = []
|
||||
for name in dir(module):
|
||||
obj = getattr(module, name)
|
||||
if isinstance(obj, type) and issubclass(obj, case.TestCase):
|
||||
tests.append(self.loadTestsFromTestCase(obj))
|
||||
|
||||
load_tests = getattr(module, 'load_tests', None)
|
||||
tests = self.suiteClass(tests)
|
||||
if load_tests is not None:
|
||||
try:
|
||||
return load_tests(self, tests, pattern)
|
||||
except Exception as e:
|
||||
error_case, error_message = _make_failed_load_tests(
|
||||
module.__name__, e, self.suiteClass)
|
||||
self.errors.append(error_message)
|
||||
return error_case
|
||||
return tests
|
||||
|
||||
def loadTestsFromName(self, name, module=None):
|
||||
"""Return a suite of all test cases given a string specifier.
|
||||
|
||||
The name may resolve either to a module, a test case class, a
|
||||
test method within a test case class, or a callable object which
|
||||
returns a TestCase or TestSuite instance.
|
||||
|
||||
The method optionally resolves the names relative to a given module.
|
||||
"""
|
||||
parts = name.split('.')
|
||||
error_case, error_message = None, None
|
||||
if module is None:
|
||||
parts_copy = parts[:]
|
||||
while parts_copy:
|
||||
try:
|
||||
module_name = '.'.join(parts_copy)
|
||||
module = __import__(module_name)
|
||||
break
|
||||
except ImportError:
|
||||
next_attribute = parts_copy.pop()
|
||||
# Last error so we can give it to the user if needed.
|
||||
error_case, error_message = _make_failed_import_test(
|
||||
next_attribute, self.suiteClass)
|
||||
if not parts_copy:
|
||||
# Even the top level import failed: report that error.
|
||||
self.errors.append(error_message)
|
||||
return error_case
|
||||
parts = parts[1:]
|
||||
obj = module
|
||||
for part in parts:
|
||||
try:
|
||||
parent, obj = obj, getattr(obj, part)
|
||||
except AttributeError as e:
|
||||
# We can't traverse some part of the name.
|
||||
if (getattr(obj, '__path__', None) is not None
|
||||
and error_case is not None):
|
||||
# This is a package (no __path__ per importlib docs), and we
|
||||
# encountered an error importing something. We cannot tell
|
||||
# the difference between package.WrongNameTestClass and
|
||||
# package.wrong_module_name so we just report the
|
||||
# ImportError - it is more informative.
|
||||
self.errors.append(error_message)
|
||||
return error_case
|
||||
else:
|
||||
# Otherwise, we signal that an AttributeError has occurred.
|
||||
error_case, error_message = _make_failed_test(
|
||||
part, e, self.suiteClass,
|
||||
'Failed to access attribute:\n%s' % (
|
||||
traceback.format_exc(),))
|
||||
self.errors.append(error_message)
|
||||
return error_case
|
||||
|
||||
if isinstance(obj, types.ModuleType):
|
||||
return self.loadTestsFromModule(obj)
|
||||
elif isinstance(obj, type) and issubclass(obj, case.TestCase):
|
||||
return self.loadTestsFromTestCase(obj)
|
||||
elif (isinstance(obj, types.FunctionType) and
|
||||
isinstance(parent, type) and
|
||||
issubclass(parent, case.TestCase)):
|
||||
name = parts[-1]
|
||||
inst = parent(name)
|
||||
# static methods follow a different path
|
||||
if not isinstance(getattr(inst, name), types.FunctionType):
|
||||
return self.suiteClass([inst])
|
||||
elif isinstance(obj, suite.TestSuite):
|
||||
return obj
|
||||
if callable(obj):
|
||||
test = obj()
|
||||
if isinstance(test, suite.TestSuite):
|
||||
return test
|
||||
elif isinstance(test, case.TestCase):
|
||||
return self.suiteClass([test])
|
||||
else:
|
||||
raise TypeError("calling %s returned %s, not a test" %
|
||||
(obj, test))
|
||||
else:
|
||||
raise TypeError("don't know how to make test from: %s" % obj)
|
||||
|
||||
def loadTestsFromNames(self, names, module=None):
|
||||
"""Return a suite of all test cases found using the given sequence
|
||||
of string specifiers. See 'loadTestsFromName()'.
|
||||
"""
|
||||
suites = [self.loadTestsFromName(name, module) for name in names]
|
||||
return self.suiteClass(suites)
|
||||
|
||||
def getTestCaseNames(self, testCaseClass):
|
||||
"""Return a sorted sequence of method names found within testCaseClass
|
||||
"""
|
||||
def shouldIncludeMethod(attrname):
|
||||
if not attrname.startswith(self.testMethodPrefix):
|
||||
return False
|
||||
testFunc = getattr(testCaseClass, attrname)
|
||||
if not callable(testFunc):
|
||||
return False
|
||||
fullName = f'%s.%s.%s' % (
|
||||
testCaseClass.__module__, testCaseClass.__qualname__, attrname
|
||||
)
|
||||
return self.testNamePatterns is None or \
|
||||
any(fnmatchcase(fullName, pattern) for pattern in self.testNamePatterns)
|
||||
testFnNames = list(filter(shouldIncludeMethod, dir(testCaseClass)))
|
||||
if self.sortTestMethodsUsing:
|
||||
testFnNames.sort(key=functools.cmp_to_key(self.sortTestMethodsUsing))
|
||||
return testFnNames
|
||||
|
||||
def discover(self, start_dir, pattern='test*.py', top_level_dir=None):
|
||||
"""Find and return all test modules from the specified start
|
||||
directory, recursing into subdirectories to find them and return all
|
||||
tests found within them. Only test files that match the pattern will
|
||||
be loaded. (Using shell style pattern matching.)
|
||||
|
||||
All test modules must be importable from the top level of the project.
|
||||
If the start directory is not the top level directory then the top
|
||||
level directory must be specified separately.
|
||||
|
||||
If a test package name (directory with '__init__.py') matches the
|
||||
pattern then the package will be checked for a 'load_tests' function. If
|
||||
this exists then it will be called with (loader, tests, pattern) unless
|
||||
the package has already had load_tests called from the same discovery
|
||||
invocation, in which case the package module object is not scanned for
|
||||
tests - this ensures that when a package uses discover to further
|
||||
discover child tests that infinite recursion does not happen.
|
||||
|
||||
If load_tests exists then discovery does *not* recurse into the package,
|
||||
load_tests is responsible for loading all tests in the package.
|
||||
|
||||
The pattern is deliberately not stored as a loader attribute so that
|
||||
packages can continue discovery themselves. top_level_dir is stored so
|
||||
load_tests does not need to pass this argument in to loader.discover().
|
||||
|
||||
Paths are sorted before being imported to ensure reproducible execution
|
||||
order even on filesystems with non-alphabetical ordering like ext3/4.
|
||||
"""
|
||||
set_implicit_top = False
|
||||
if top_level_dir is None and self._top_level_dir is not None:
|
||||
# make top_level_dir optional if called from load_tests in a package
|
||||
top_level_dir = self._top_level_dir
|
||||
elif top_level_dir is None:
|
||||
set_implicit_top = True
|
||||
top_level_dir = start_dir
|
||||
|
||||
top_level_dir = os.path.abspath(top_level_dir)
|
||||
|
||||
if not top_level_dir in sys.path:
|
||||
# all test modules must be importable from the top level directory
|
||||
# should we *unconditionally* put the start directory in first
|
||||
# in sys.path to minimise likelihood of conflicts between installed
|
||||
# modules and development versions?
|
||||
sys.path.insert(0, top_level_dir)
|
||||
self._top_level_dir = top_level_dir
|
||||
|
||||
is_not_importable = False
|
||||
if os.path.isdir(os.path.abspath(start_dir)):
|
||||
start_dir = os.path.abspath(start_dir)
|
||||
if start_dir != top_level_dir:
|
||||
is_not_importable = not os.path.isfile(os.path.join(start_dir, '__init__.py'))
|
||||
else:
|
||||
# support for discovery from dotted module names
|
||||
try:
|
||||
__import__(start_dir)
|
||||
except ImportError:
|
||||
is_not_importable = True
|
||||
else:
|
||||
the_module = sys.modules[start_dir]
|
||||
top_part = start_dir.split('.')[0]
|
||||
try:
|
||||
start_dir = os.path.abspath(
|
||||
os.path.dirname((the_module.__file__)))
|
||||
except AttributeError:
|
||||
if the_module.__name__ in sys.builtin_module_names:
|
||||
# builtin module
|
||||
raise TypeError('Can not use builtin modules '
|
||||
'as dotted module names') from None
|
||||
else:
|
||||
raise TypeError(
|
||||
f"don't know how to discover from {the_module!r}"
|
||||
) from None
|
||||
|
||||
if set_implicit_top:
|
||||
self._top_level_dir = self._get_directory_containing_module(top_part)
|
||||
sys.path.remove(top_level_dir)
|
||||
|
||||
if is_not_importable:
|
||||
raise ImportError('Start directory is not importable: %r' % start_dir)
|
||||
|
||||
tests = list(self._find_tests(start_dir, pattern))
|
||||
return self.suiteClass(tests)
|
||||
|
||||
def _get_directory_containing_module(self, module_name):
|
||||
module = sys.modules[module_name]
|
||||
full_path = os.path.abspath(module.__file__)
|
||||
|
||||
if os.path.basename(full_path).lower().startswith('__init__.py'):
|
||||
return os.path.dirname(os.path.dirname(full_path))
|
||||
else:
|
||||
# here we have been given a module rather than a package - so
|
||||
# all we can do is search the *same* directory the module is in
|
||||
# should an exception be raised instead
|
||||
return os.path.dirname(full_path)
|
||||
|
||||
def _get_name_from_path(self, path):
|
||||
if path == self._top_level_dir:
|
||||
return '.'
|
||||
path = _splitext(os.path.normpath(path))
|
||||
|
||||
_relpath = os.path.relpath(path, self._top_level_dir)
|
||||
assert not os.path.isabs(_relpath), "Path must be within the project"
|
||||
assert not _relpath.startswith('..'), "Path must be within the project"
|
||||
|
||||
name = _relpath.replace(os.path.sep, '.')
|
||||
return name
|
||||
|
||||
def _get_module_from_name(self, name):
|
||||
__import__(name)
|
||||
return sys.modules[name]
|
||||
|
||||
def _match_path(self, path, full_path, pattern):
|
||||
# override this method to use alternative matching strategy
|
||||
return fnmatch(path, pattern)
|
||||
|
||||
def _find_tests(self, start_dir, pattern):
|
||||
"""Used by discovery. Yields test suites it loads."""
|
||||
# Handle the __init__ in this package
|
||||
name = self._get_name_from_path(start_dir)
|
||||
# name is '.' when start_dir == top_level_dir (and top_level_dir is by
|
||||
# definition not a package).
|
||||
if name != '.' and name not in self._loading_packages:
|
||||
# name is in self._loading_packages while we have called into
|
||||
# loadTestsFromModule with name.
|
||||
tests, should_recurse = self._find_test_path(start_dir, pattern)
|
||||
if tests is not None:
|
||||
yield tests
|
||||
if not should_recurse:
|
||||
# Either an error occurred, or load_tests was used by the
|
||||
# package.
|
||||
return
|
||||
# Handle the contents.
|
||||
paths = sorted(os.listdir(start_dir))
|
||||
for path in paths:
|
||||
full_path = os.path.join(start_dir, path)
|
||||
tests, should_recurse = self._find_test_path(full_path, pattern)
|
||||
if tests is not None:
|
||||
yield tests
|
||||
if should_recurse:
|
||||
# we found a package that didn't use load_tests.
|
||||
name = self._get_name_from_path(full_path)
|
||||
self._loading_packages.add(name)
|
||||
try:
|
||||
yield from self._find_tests(full_path, pattern)
|
||||
finally:
|
||||
self._loading_packages.discard(name)
|
||||
|
||||
def _find_test_path(self, full_path, pattern):
|
||||
"""Used by discovery.
|
||||
|
||||
Loads tests from a single file, or a directories' __init__.py when
|
||||
passed the directory.
|
||||
|
||||
Returns a tuple (None_or_tests_from_file, should_recurse).
|
||||
"""
|
||||
basename = os.path.basename(full_path)
|
||||
if os.path.isfile(full_path):
|
||||
if not VALID_MODULE_NAME.match(basename):
|
||||
# valid Python identifiers only
|
||||
return None, False
|
||||
if not self._match_path(basename, full_path, pattern):
|
||||
return None, False
|
||||
# if the test file matches, load it
|
||||
name = self._get_name_from_path(full_path)
|
||||
try:
|
||||
module = self._get_module_from_name(name)
|
||||
except case.SkipTest as e:
|
||||
return _make_skipped_test(name, e, self.suiteClass), False
|
||||
except:
|
||||
error_case, error_message = \
|
||||
_make_failed_import_test(name, self.suiteClass)
|
||||
self.errors.append(error_message)
|
||||
return error_case, False
|
||||
else:
|
||||
mod_file = os.path.abspath(
|
||||
getattr(module, '__file__', full_path))
|
||||
realpath = _splitext(
|
||||
os.path.realpath(mod_file))
|
||||
fullpath_noext = _splitext(
|
||||
os.path.realpath(full_path))
|
||||
if realpath.lower() != fullpath_noext.lower():
|
||||
module_dir = os.path.dirname(realpath)
|
||||
mod_name = _splitext(
|
||||
os.path.basename(full_path))
|
||||
expected_dir = os.path.dirname(full_path)
|
||||
msg = ("%r module incorrectly imported from %r. Expected "
|
||||
"%r. Is this module globally installed?")
|
||||
raise ImportError(
|
||||
msg % (mod_name, module_dir, expected_dir))
|
||||
return self.loadTestsFromModule(module, pattern=pattern), False
|
||||
elif os.path.isdir(full_path):
|
||||
if not os.path.isfile(os.path.join(full_path, '__init__.py')):
|
||||
return None, False
|
||||
|
||||
load_tests = None
|
||||
tests = None
|
||||
name = self._get_name_from_path(full_path)
|
||||
try:
|
||||
package = self._get_module_from_name(name)
|
||||
except case.SkipTest as e:
|
||||
return _make_skipped_test(name, e, self.suiteClass), False
|
||||
except:
|
||||
error_case, error_message = \
|
||||
_make_failed_import_test(name, self.suiteClass)
|
||||
self.errors.append(error_message)
|
||||
return error_case, False
|
||||
else:
|
||||
load_tests = getattr(package, 'load_tests', None)
|
||||
# Mark this package as being in load_tests (possibly ;))
|
||||
self._loading_packages.add(name)
|
||||
try:
|
||||
tests = self.loadTestsFromModule(package, pattern=pattern)
|
||||
if load_tests is not None:
|
||||
# loadTestsFromModule(package) has loaded tests for us.
|
||||
return tests, False
|
||||
return tests, True
|
||||
finally:
|
||||
self._loading_packages.discard(name)
|
||||
else:
|
||||
return None, False
|
||||
|
||||
|
||||
defaultTestLoader = TestLoader()
|
||||
|
||||
|
||||
# These functions are considered obsolete for long time.
|
||||
# They will be removed in Python 3.13.
|
||||
|
||||
def _makeLoader(prefix, sortUsing, suiteClass=None, testNamePatterns=None):
|
||||
loader = TestLoader()
|
||||
loader.sortTestMethodsUsing = sortUsing
|
||||
loader.testMethodPrefix = prefix
|
||||
loader.testNamePatterns = testNamePatterns
|
||||
if suiteClass:
|
||||
loader.suiteClass = suiteClass
|
||||
return loader
|
||||
|
||||
def getTestCaseNames(testCaseClass, prefix, sortUsing=util.three_way_cmp, testNamePatterns=None):
|
||||
import warnings
|
||||
warnings.warn(
|
||||
"unittest.getTestCaseNames() is deprecated and will be removed in Python 3.13. "
|
||||
"Please use unittest.TestLoader.getTestCaseNames() instead.",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
return _makeLoader(prefix, sortUsing, testNamePatterns=testNamePatterns).getTestCaseNames(testCaseClass)
|
||||
|
||||
def makeSuite(testCaseClass, prefix='test', sortUsing=util.three_way_cmp,
|
||||
suiteClass=suite.TestSuite):
|
||||
import warnings
|
||||
warnings.warn(
|
||||
"unittest.makeSuite() is deprecated and will be removed in Python 3.13. "
|
||||
"Please use unittest.TestLoader.loadTestsFromTestCase() instead.",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromTestCase(
|
||||
testCaseClass)
|
||||
|
||||
def findTestCases(module, prefix='test', sortUsing=util.three_way_cmp,
|
||||
suiteClass=suite.TestSuite):
|
||||
import warnings
|
||||
warnings.warn(
|
||||
"unittest.findTestCases() is deprecated and will be removed in Python 3.13. "
|
||||
"Please use unittest.TestLoader.loadTestsFromModule() instead.",
|
||||
DeprecationWarning, stacklevel=2
|
||||
)
|
||||
return _makeLoader(prefix, sortUsing, suiteClass).loadTestsFromModule(\
|
||||
module)
|
||||
291
.CondaPkg/env/Lib/unittest/main.py
vendored
291
.CondaPkg/env/Lib/unittest/main.py
vendored
@@ -1,291 +0,0 @@
|
||||
"""Unittest main program"""
|
||||
|
||||
import sys
|
||||
import argparse
|
||||
import os
|
||||
import warnings
|
||||
|
||||
from . import loader, runner
|
||||
from .signals import installHandler
|
||||
|
||||
__unittest = True
|
||||
_NO_TESTS_EXITCODE = 5
|
||||
|
||||
MAIN_EXAMPLES = """\
|
||||
Examples:
|
||||
%(prog)s test_module - run tests from test_module
|
||||
%(prog)s module.TestClass - run tests from module.TestClass
|
||||
%(prog)s module.Class.test_method - run specified test method
|
||||
%(prog)s path/to/test_file.py - run tests from test_file.py
|
||||
"""
|
||||
|
||||
MODULE_EXAMPLES = """\
|
||||
Examples:
|
||||
%(prog)s - run default set of tests
|
||||
%(prog)s MyTestSuite - run suite 'MyTestSuite'
|
||||
%(prog)s MyTestCase.testSomething - run MyTestCase.testSomething
|
||||
%(prog)s MyTestCase - run all 'test*' test methods
|
||||
in MyTestCase
|
||||
"""
|
||||
|
||||
def _convert_name(name):
|
||||
# on Linux / Mac OS X 'foo.PY' is not importable, but on
|
||||
# Windows it is. Simpler to do a case insensitive match
|
||||
# a better check would be to check that the name is a
|
||||
# valid Python module name.
|
||||
if os.path.isfile(name) and name.lower().endswith('.py'):
|
||||
if os.path.isabs(name):
|
||||
rel_path = os.path.relpath(name, os.getcwd())
|
||||
if os.path.isabs(rel_path) or rel_path.startswith(os.pardir):
|
||||
return name
|
||||
name = rel_path
|
||||
# on Windows both '\' and '/' are used as path
|
||||
# separators. Better to replace both than rely on os.path.sep
|
||||
return os.path.normpath(name)[:-3].replace('\\', '.').replace('/', '.')
|
||||
return name
|
||||
|
||||
def _convert_names(names):
|
||||
return [_convert_name(name) for name in names]
|
||||
|
||||
|
||||
def _convert_select_pattern(pattern):
|
||||
if not '*' in pattern:
|
||||
pattern = '*%s*' % pattern
|
||||
return pattern
|
||||
|
||||
|
||||
class TestProgram(object):
|
||||
"""A command-line program that runs a set of tests; this is primarily
|
||||
for making test modules conveniently executable.
|
||||
"""
|
||||
# defaults for testing
|
||||
module=None
|
||||
verbosity = 1
|
||||
failfast = catchbreak = buffer = progName = warnings = testNamePatterns = None
|
||||
_discovery_parser = None
|
||||
|
||||
def __init__(self, module='__main__', defaultTest=None, argv=None,
|
||||
testRunner=None, testLoader=loader.defaultTestLoader,
|
||||
exit=True, verbosity=1, failfast=None, catchbreak=None,
|
||||
buffer=None, warnings=None, *, tb_locals=False,
|
||||
durations=None):
|
||||
if isinstance(module, str):
|
||||
self.module = __import__(module)
|
||||
for part in module.split('.')[1:]:
|
||||
self.module = getattr(self.module, part)
|
||||
else:
|
||||
self.module = module
|
||||
if argv is None:
|
||||
argv = sys.argv
|
||||
|
||||
self.exit = exit
|
||||
self.failfast = failfast
|
||||
self.catchbreak = catchbreak
|
||||
self.verbosity = verbosity
|
||||
self.buffer = buffer
|
||||
self.tb_locals = tb_locals
|
||||
self.durations = durations
|
||||
if warnings is None and not sys.warnoptions:
|
||||
# even if DeprecationWarnings are ignored by default
|
||||
# print them anyway unless other warnings settings are
|
||||
# specified by the warnings arg or the -W python flag
|
||||
self.warnings = 'default'
|
||||
else:
|
||||
# here self.warnings is set either to the value passed
|
||||
# to the warnings args or to None.
|
||||
# If the user didn't pass a value self.warnings will
|
||||
# be None. This means that the behavior is unchanged
|
||||
# and depends on the values passed to -W.
|
||||
self.warnings = warnings
|
||||
self.defaultTest = defaultTest
|
||||
self.testRunner = testRunner
|
||||
self.testLoader = testLoader
|
||||
self.progName = os.path.basename(argv[0])
|
||||
self.parseArgs(argv)
|
||||
self.runTests()
|
||||
|
||||
def usageExit(self, msg=None):
|
||||
warnings.warn("TestProgram.usageExit() is deprecated and will be"
|
||||
" removed in Python 3.13", DeprecationWarning)
|
||||
if msg:
|
||||
print(msg)
|
||||
if self._discovery_parser is None:
|
||||
self._initArgParsers()
|
||||
self._print_help()
|
||||
sys.exit(2)
|
||||
|
||||
def _print_help(self, *args, **kwargs):
|
||||
if self.module is None:
|
||||
print(self._main_parser.format_help())
|
||||
print(MAIN_EXAMPLES % {'prog': self.progName})
|
||||
self._discovery_parser.print_help()
|
||||
else:
|
||||
print(self._main_parser.format_help())
|
||||
print(MODULE_EXAMPLES % {'prog': self.progName})
|
||||
|
||||
def parseArgs(self, argv):
|
||||
self._initArgParsers()
|
||||
if self.module is None:
|
||||
if len(argv) > 1 and argv[1].lower() == 'discover':
|
||||
self._do_discovery(argv[2:])
|
||||
return
|
||||
self._main_parser.parse_args(argv[1:], self)
|
||||
if not self.tests:
|
||||
# this allows "python -m unittest -v" to still work for
|
||||
# test discovery.
|
||||
self._do_discovery([])
|
||||
return
|
||||
else:
|
||||
self._main_parser.parse_args(argv[1:], self)
|
||||
|
||||
if self.tests:
|
||||
self.testNames = _convert_names(self.tests)
|
||||
if __name__ == '__main__':
|
||||
# to support python -m unittest ...
|
||||
self.module = None
|
||||
elif self.defaultTest is None:
|
||||
# createTests will load tests from self.module
|
||||
self.testNames = None
|
||||
elif isinstance(self.defaultTest, str):
|
||||
self.testNames = (self.defaultTest,)
|
||||
else:
|
||||
self.testNames = list(self.defaultTest)
|
||||
self.createTests()
|
||||
|
||||
def createTests(self, from_discovery=False, Loader=None):
|
||||
if self.testNamePatterns:
|
||||
self.testLoader.testNamePatterns = self.testNamePatterns
|
||||
if from_discovery:
|
||||
loader = self.testLoader if Loader is None else Loader()
|
||||
self.test = loader.discover(self.start, self.pattern, self.top)
|
||||
elif self.testNames is None:
|
||||
self.test = self.testLoader.loadTestsFromModule(self.module)
|
||||
else:
|
||||
self.test = self.testLoader.loadTestsFromNames(self.testNames,
|
||||
self.module)
|
||||
|
||||
def _initArgParsers(self):
|
||||
parent_parser = self._getParentArgParser()
|
||||
self._main_parser = self._getMainArgParser(parent_parser)
|
||||
self._discovery_parser = self._getDiscoveryArgParser(parent_parser)
|
||||
|
||||
def _getParentArgParser(self):
|
||||
parser = argparse.ArgumentParser(add_help=False)
|
||||
|
||||
parser.add_argument('-v', '--verbose', dest='verbosity',
|
||||
action='store_const', const=2,
|
||||
help='Verbose output')
|
||||
parser.add_argument('-q', '--quiet', dest='verbosity',
|
||||
action='store_const', const=0,
|
||||
help='Quiet output')
|
||||
parser.add_argument('--locals', dest='tb_locals',
|
||||
action='store_true',
|
||||
help='Show local variables in tracebacks')
|
||||
parser.add_argument('--durations', dest='durations', type=int,
|
||||
default=None, metavar="N",
|
||||
help='Show the N slowest test cases (N=0 for all)')
|
||||
if self.failfast is None:
|
||||
parser.add_argument('-f', '--failfast', dest='failfast',
|
||||
action='store_true',
|
||||
help='Stop on first fail or error')
|
||||
self.failfast = False
|
||||
if self.catchbreak is None:
|
||||
parser.add_argument('-c', '--catch', dest='catchbreak',
|
||||
action='store_true',
|
||||
help='Catch Ctrl-C and display results so far')
|
||||
self.catchbreak = False
|
||||
if self.buffer is None:
|
||||
parser.add_argument('-b', '--buffer', dest='buffer',
|
||||
action='store_true',
|
||||
help='Buffer stdout and stderr during tests')
|
||||
self.buffer = False
|
||||
if self.testNamePatterns is None:
|
||||
parser.add_argument('-k', dest='testNamePatterns',
|
||||
action='append', type=_convert_select_pattern,
|
||||
help='Only run tests which match the given substring')
|
||||
self.testNamePatterns = []
|
||||
|
||||
return parser
|
||||
|
||||
def _getMainArgParser(self, parent):
|
||||
parser = argparse.ArgumentParser(parents=[parent])
|
||||
parser.prog = self.progName
|
||||
parser.print_help = self._print_help
|
||||
|
||||
parser.add_argument('tests', nargs='*',
|
||||
help='a list of any number of test modules, '
|
||||
'classes and test methods.')
|
||||
|
||||
return parser
|
||||
|
||||
def _getDiscoveryArgParser(self, parent):
|
||||
parser = argparse.ArgumentParser(parents=[parent])
|
||||
parser.prog = '%s discover' % self.progName
|
||||
parser.epilog = ('For test discovery all test modules must be '
|
||||
'importable from the top level directory of the '
|
||||
'project.')
|
||||
|
||||
parser.add_argument('-s', '--start-directory', dest='start',
|
||||
help="Directory to start discovery ('.' default)")
|
||||
parser.add_argument('-p', '--pattern', dest='pattern',
|
||||
help="Pattern to match tests ('test*.py' default)")
|
||||
parser.add_argument('-t', '--top-level-directory', dest='top',
|
||||
help='Top level directory of project (defaults to '
|
||||
'start directory)')
|
||||
for arg in ('start', 'pattern', 'top'):
|
||||
parser.add_argument(arg, nargs='?',
|
||||
default=argparse.SUPPRESS,
|
||||
help=argparse.SUPPRESS)
|
||||
|
||||
return parser
|
||||
|
||||
def _do_discovery(self, argv, Loader=None):
|
||||
self.start = '.'
|
||||
self.pattern = 'test*.py'
|
||||
self.top = None
|
||||
if argv is not None:
|
||||
# handle command line args for test discovery
|
||||
if self._discovery_parser is None:
|
||||
# for testing
|
||||
self._initArgParsers()
|
||||
self._discovery_parser.parse_args(argv, self)
|
||||
|
||||
self.createTests(from_discovery=True, Loader=Loader)
|
||||
|
||||
def runTests(self):
|
||||
if self.catchbreak:
|
||||
installHandler()
|
||||
if self.testRunner is None:
|
||||
self.testRunner = runner.TextTestRunner
|
||||
if isinstance(self.testRunner, type):
|
||||
try:
|
||||
try:
|
||||
testRunner = self.testRunner(verbosity=self.verbosity,
|
||||
failfast=self.failfast,
|
||||
buffer=self.buffer,
|
||||
warnings=self.warnings,
|
||||
tb_locals=self.tb_locals,
|
||||
durations=self.durations)
|
||||
except TypeError:
|
||||
# didn't accept the tb_locals or durations argument
|
||||
testRunner = self.testRunner(verbosity=self.verbosity,
|
||||
failfast=self.failfast,
|
||||
buffer=self.buffer,
|
||||
warnings=self.warnings)
|
||||
except TypeError:
|
||||
# didn't accept the verbosity, buffer or failfast arguments
|
||||
testRunner = self.testRunner()
|
||||
else:
|
||||
# it is assumed to be a TestRunner instance
|
||||
testRunner = self.testRunner
|
||||
self.result = testRunner.run(self.test)
|
||||
if self.exit:
|
||||
if self.result.testsRun == 0:
|
||||
sys.exit(_NO_TESTS_EXITCODE)
|
||||
elif self.result.wasSuccessful():
|
||||
sys.exit(0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
main = TestProgram
|
||||
3007
.CondaPkg/env/Lib/unittest/mock.py
vendored
3007
.CondaPkg/env/Lib/unittest/mock.py
vendored
File diff suppressed because it is too large
Load Diff
256
.CondaPkg/env/Lib/unittest/result.py
vendored
256
.CondaPkg/env/Lib/unittest/result.py
vendored
@@ -1,256 +0,0 @@
|
||||
"""Test result object"""
|
||||
|
||||
import io
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
from . import util
|
||||
from functools import wraps
|
||||
|
||||
__unittest = True
|
||||
|
||||
def failfast(method):
|
||||
@wraps(method)
|
||||
def inner(self, *args, **kw):
|
||||
if getattr(self, 'failfast', False):
|
||||
self.stop()
|
||||
return method(self, *args, **kw)
|
||||
return inner
|
||||
|
||||
STDOUT_LINE = '\nStdout:\n%s'
|
||||
STDERR_LINE = '\nStderr:\n%s'
|
||||
|
||||
|
||||
class TestResult(object):
|
||||
"""Holder for test result information.
|
||||
|
||||
Test results are automatically managed by the TestCase and TestSuite
|
||||
classes, and do not need to be explicitly manipulated by writers of tests.
|
||||
|
||||
Each instance holds the total number of tests run, and collections of
|
||||
failures and errors that occurred among those test runs. The collections
|
||||
contain tuples of (testcase, exceptioninfo), where exceptioninfo is the
|
||||
formatted traceback of the error that occurred.
|
||||
"""
|
||||
_previousTestClass = None
|
||||
_testRunEntered = False
|
||||
_moduleSetUpFailed = False
|
||||
def __init__(self, stream=None, descriptions=None, verbosity=None):
|
||||
self.failfast = False
|
||||
self.failures = []
|
||||
self.errors = []
|
||||
self.testsRun = 0
|
||||
self.skipped = []
|
||||
self.expectedFailures = []
|
||||
self.unexpectedSuccesses = []
|
||||
self.collectedDurations = []
|
||||
self.shouldStop = False
|
||||
self.buffer = False
|
||||
self.tb_locals = False
|
||||
self._stdout_buffer = None
|
||||
self._stderr_buffer = None
|
||||
self._original_stdout = sys.stdout
|
||||
self._original_stderr = sys.stderr
|
||||
self._mirrorOutput = False
|
||||
|
||||
def printErrors(self):
|
||||
"Called by TestRunner after test run"
|
||||
|
||||
def startTest(self, test):
|
||||
"Called when the given test is about to be run"
|
||||
self.testsRun += 1
|
||||
self._mirrorOutput = False
|
||||
self._setupStdout()
|
||||
|
||||
def _setupStdout(self):
|
||||
if self.buffer:
|
||||
if self._stderr_buffer is None:
|
||||
self._stderr_buffer = io.StringIO()
|
||||
self._stdout_buffer = io.StringIO()
|
||||
sys.stdout = self._stdout_buffer
|
||||
sys.stderr = self._stderr_buffer
|
||||
|
||||
def startTestRun(self):
|
||||
"""Called once before any tests are executed.
|
||||
|
||||
See startTest for a method called before each test.
|
||||
"""
|
||||
|
||||
def stopTest(self, test):
|
||||
"""Called when the given test has been run"""
|
||||
self._restoreStdout()
|
||||
self._mirrorOutput = False
|
||||
|
||||
def _restoreStdout(self):
|
||||
if self.buffer:
|
||||
if self._mirrorOutput:
|
||||
output = sys.stdout.getvalue()
|
||||
error = sys.stderr.getvalue()
|
||||
if output:
|
||||
if not output.endswith('\n'):
|
||||
output += '\n'
|
||||
self._original_stdout.write(STDOUT_LINE % output)
|
||||
if error:
|
||||
if not error.endswith('\n'):
|
||||
error += '\n'
|
||||
self._original_stderr.write(STDERR_LINE % error)
|
||||
|
||||
sys.stdout = self._original_stdout
|
||||
sys.stderr = self._original_stderr
|
||||
self._stdout_buffer.seek(0)
|
||||
self._stdout_buffer.truncate()
|
||||
self._stderr_buffer.seek(0)
|
||||
self._stderr_buffer.truncate()
|
||||
|
||||
def stopTestRun(self):
|
||||
"""Called once after all tests are executed.
|
||||
|
||||
See stopTest for a method called after each test.
|
||||
"""
|
||||
|
||||
@failfast
|
||||
def addError(self, test, err):
|
||||
"""Called when an error has occurred. 'err' is a tuple of values as
|
||||
returned by sys.exc_info().
|
||||
"""
|
||||
self.errors.append((test, self._exc_info_to_string(err, test)))
|
||||
self._mirrorOutput = True
|
||||
|
||||
@failfast
|
||||
def addFailure(self, test, err):
|
||||
"""Called when an error has occurred. 'err' is a tuple of values as
|
||||
returned by sys.exc_info()."""
|
||||
self.failures.append((test, self._exc_info_to_string(err, test)))
|
||||
self._mirrorOutput = True
|
||||
|
||||
def addSubTest(self, test, subtest, err):
|
||||
"""Called at the end of a subtest.
|
||||
'err' is None if the subtest ended successfully, otherwise it's a
|
||||
tuple of values as returned by sys.exc_info().
|
||||
"""
|
||||
# By default, we don't do anything with successful subtests, but
|
||||
# more sophisticated test results might want to record them.
|
||||
if err is not None:
|
||||
if getattr(self, 'failfast', False):
|
||||
self.stop()
|
||||
if issubclass(err[0], test.failureException):
|
||||
errors = self.failures
|
||||
else:
|
||||
errors = self.errors
|
||||
errors.append((subtest, self._exc_info_to_string(err, test)))
|
||||
self._mirrorOutput = True
|
||||
|
||||
def addSuccess(self, test):
|
||||
"Called when a test has completed successfully"
|
||||
pass
|
||||
|
||||
def addSkip(self, test, reason):
|
||||
"""Called when a test is skipped."""
|
||||
self.skipped.append((test, reason))
|
||||
|
||||
def addExpectedFailure(self, test, err):
|
||||
"""Called when an expected failure/error occurred."""
|
||||
self.expectedFailures.append(
|
||||
(test, self._exc_info_to_string(err, test)))
|
||||
|
||||
@failfast
|
||||
def addUnexpectedSuccess(self, test):
|
||||
"""Called when a test was expected to fail, but succeed."""
|
||||
self.unexpectedSuccesses.append(test)
|
||||
|
||||
def addDuration(self, test, elapsed):
|
||||
"""Called when a test finished to run, regardless of its outcome.
|
||||
*test* is the test case corresponding to the test method.
|
||||
*elapsed* is the time represented in seconds, and it includes the
|
||||
execution of cleanup functions.
|
||||
"""
|
||||
# support for a TextTestRunner using an old TestResult class
|
||||
if hasattr(self, "collectedDurations"):
|
||||
# Pass test repr and not the test object itself to avoid resources leak
|
||||
self.collectedDurations.append((str(test), elapsed))
|
||||
|
||||
def wasSuccessful(self):
|
||||
"""Tells whether or not this result was a success."""
|
||||
# The hasattr check is for test_result's OldResult test. That
|
||||
# way this method works on objects that lack the attribute.
|
||||
# (where would such result instances come from? old stored pickles?)
|
||||
return ((len(self.failures) == len(self.errors) == 0) and
|
||||
(not hasattr(self, 'unexpectedSuccesses') or
|
||||
len(self.unexpectedSuccesses) == 0))
|
||||
|
||||
def stop(self):
|
||||
"""Indicates that the tests should be aborted."""
|
||||
self.shouldStop = True
|
||||
|
||||
def _exc_info_to_string(self, err, test):
|
||||
"""Converts a sys.exc_info()-style tuple of values into a string."""
|
||||
exctype, value, tb = err
|
||||
tb = self._clean_tracebacks(exctype, value, tb, test)
|
||||
tb_e = traceback.TracebackException(
|
||||
exctype, value, tb,
|
||||
capture_locals=self.tb_locals, compact=True)
|
||||
msgLines = list(tb_e.format())
|
||||
|
||||
if self.buffer:
|
||||
output = sys.stdout.getvalue()
|
||||
error = sys.stderr.getvalue()
|
||||
if output:
|
||||
if not output.endswith('\n'):
|
||||
output += '\n'
|
||||
msgLines.append(STDOUT_LINE % output)
|
||||
if error:
|
||||
if not error.endswith('\n'):
|
||||
error += '\n'
|
||||
msgLines.append(STDERR_LINE % error)
|
||||
return ''.join(msgLines)
|
||||
|
||||
def _clean_tracebacks(self, exctype, value, tb, test):
|
||||
ret = None
|
||||
first = True
|
||||
excs = [(exctype, value, tb)]
|
||||
seen = {id(value)} # Detect loops in chained exceptions.
|
||||
while excs:
|
||||
(exctype, value, tb) = excs.pop()
|
||||
# Skip test runner traceback levels
|
||||
while tb and self._is_relevant_tb_level(tb):
|
||||
tb = tb.tb_next
|
||||
|
||||
# Skip assert*() traceback levels
|
||||
if exctype is test.failureException:
|
||||
self._remove_unittest_tb_frames(tb)
|
||||
|
||||
if first:
|
||||
ret = tb
|
||||
first = False
|
||||
else:
|
||||
value.__traceback__ = tb
|
||||
|
||||
if value is not None:
|
||||
for c in (value.__cause__, value.__context__):
|
||||
if c is not None and id(c) not in seen:
|
||||
excs.append((type(c), c, c.__traceback__))
|
||||
seen.add(id(c))
|
||||
return ret
|
||||
|
||||
def _is_relevant_tb_level(self, tb):
|
||||
return '__unittest' in tb.tb_frame.f_globals
|
||||
|
||||
def _remove_unittest_tb_frames(self, tb):
|
||||
'''Truncates usercode tb at the first unittest frame.
|
||||
|
||||
If the first frame of the traceback is in user code,
|
||||
the prefix up to the first unittest frame is returned.
|
||||
If the first frame is already in the unittest module,
|
||||
the traceback is not modified.
|
||||
'''
|
||||
prev = None
|
||||
while tb and not self._is_relevant_tb_level(tb):
|
||||
prev = tb
|
||||
tb = tb.tb_next
|
||||
if prev is not None:
|
||||
prev.tb_next = None
|
||||
|
||||
def __repr__(self):
|
||||
return ("<%s run=%i errors=%i failures=%i>" %
|
||||
(util.strclass(self.__class__), self.testsRun, len(self.errors),
|
||||
len(self.failures)))
|
||||
292
.CondaPkg/env/Lib/unittest/runner.py
vendored
292
.CondaPkg/env/Lib/unittest/runner.py
vendored
@@ -1,292 +0,0 @@
|
||||
"""Running tests"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
import warnings
|
||||
|
||||
from . import result
|
||||
from .case import _SubTest
|
||||
from .signals import registerResult
|
||||
|
||||
__unittest = True
|
||||
|
||||
|
||||
class _WritelnDecorator(object):
|
||||
"""Used to decorate file-like objects with a handy 'writeln' method"""
|
||||
def __init__(self,stream):
|
||||
self.stream = stream
|
||||
|
||||
def __getattr__(self, attr):
|
||||
if attr in ('stream', '__getstate__'):
|
||||
raise AttributeError(attr)
|
||||
return getattr(self.stream,attr)
|
||||
|
||||
def writeln(self, arg=None):
|
||||
if arg:
|
||||
self.write(arg)
|
||||
self.write('\n') # text-mode streams translate to \r\n if needed
|
||||
|
||||
|
||||
class TextTestResult(result.TestResult):
|
||||
"""A test result class that can print formatted text results to a stream.
|
||||
|
||||
Used by TextTestRunner.
|
||||
"""
|
||||
separator1 = '=' * 70
|
||||
separator2 = '-' * 70
|
||||
|
||||
def __init__(self, stream, descriptions, verbosity, *, durations=None):
|
||||
"""Construct a TextTestResult. Subclasses should accept **kwargs
|
||||
to ensure compatibility as the interface changes."""
|
||||
super(TextTestResult, self).__init__(stream, descriptions, verbosity)
|
||||
self.stream = stream
|
||||
self.showAll = verbosity > 1
|
||||
self.dots = verbosity == 1
|
||||
self.descriptions = descriptions
|
||||
self._newline = True
|
||||
self.durations = durations
|
||||
|
||||
def getDescription(self, test):
|
||||
doc_first_line = test.shortDescription()
|
||||
if self.descriptions and doc_first_line:
|
||||
return '\n'.join((str(test), doc_first_line))
|
||||
else:
|
||||
return str(test)
|
||||
|
||||
def startTest(self, test):
|
||||
super(TextTestResult, self).startTest(test)
|
||||
if self.showAll:
|
||||
self.stream.write(self.getDescription(test))
|
||||
self.stream.write(" ... ")
|
||||
self.stream.flush()
|
||||
self._newline = False
|
||||
|
||||
def _write_status(self, test, status):
|
||||
is_subtest = isinstance(test, _SubTest)
|
||||
if is_subtest or self._newline:
|
||||
if not self._newline:
|
||||
self.stream.writeln()
|
||||
if is_subtest:
|
||||
self.stream.write(" ")
|
||||
self.stream.write(self.getDescription(test))
|
||||
self.stream.write(" ... ")
|
||||
self.stream.writeln(status)
|
||||
self.stream.flush()
|
||||
self._newline = True
|
||||
|
||||
def addSubTest(self, test, subtest, err):
|
||||
if err is not None:
|
||||
if self.showAll:
|
||||
if issubclass(err[0], subtest.failureException):
|
||||
self._write_status(subtest, "FAIL")
|
||||
else:
|
||||
self._write_status(subtest, "ERROR")
|
||||
elif self.dots:
|
||||
if issubclass(err[0], subtest.failureException):
|
||||
self.stream.write('F')
|
||||
else:
|
||||
self.stream.write('E')
|
||||
self.stream.flush()
|
||||
super(TextTestResult, self).addSubTest(test, subtest, err)
|
||||
|
||||
def addSuccess(self, test):
|
||||
super(TextTestResult, self).addSuccess(test)
|
||||
if self.showAll:
|
||||
self._write_status(test, "ok")
|
||||
elif self.dots:
|
||||
self.stream.write('.')
|
||||
self.stream.flush()
|
||||
|
||||
def addError(self, test, err):
|
||||
super(TextTestResult, self).addError(test, err)
|
||||
if self.showAll:
|
||||
self._write_status(test, "ERROR")
|
||||
elif self.dots:
|
||||
self.stream.write('E')
|
||||
self.stream.flush()
|
||||
|
||||
def addFailure(self, test, err):
|
||||
super(TextTestResult, self).addFailure(test, err)
|
||||
if self.showAll:
|
||||
self._write_status(test, "FAIL")
|
||||
elif self.dots:
|
||||
self.stream.write('F')
|
||||
self.stream.flush()
|
||||
|
||||
def addSkip(self, test, reason):
|
||||
super(TextTestResult, self).addSkip(test, reason)
|
||||
if self.showAll:
|
||||
self._write_status(test, "skipped {0!r}".format(reason))
|
||||
elif self.dots:
|
||||
self.stream.write("s")
|
||||
self.stream.flush()
|
||||
|
||||
def addExpectedFailure(self, test, err):
|
||||
super(TextTestResult, self).addExpectedFailure(test, err)
|
||||
if self.showAll:
|
||||
self.stream.writeln("expected failure")
|
||||
self.stream.flush()
|
||||
elif self.dots:
|
||||
self.stream.write("x")
|
||||
self.stream.flush()
|
||||
|
||||
def addUnexpectedSuccess(self, test):
|
||||
super(TextTestResult, self).addUnexpectedSuccess(test)
|
||||
if self.showAll:
|
||||
self.stream.writeln("unexpected success")
|
||||
self.stream.flush()
|
||||
elif self.dots:
|
||||
self.stream.write("u")
|
||||
self.stream.flush()
|
||||
|
||||
def printErrors(self):
|
||||
if self.dots or self.showAll:
|
||||
self.stream.writeln()
|
||||
self.stream.flush()
|
||||
self.printErrorList('ERROR', self.errors)
|
||||
self.printErrorList('FAIL', self.failures)
|
||||
unexpectedSuccesses = getattr(self, 'unexpectedSuccesses', ())
|
||||
if unexpectedSuccesses:
|
||||
self.stream.writeln(self.separator1)
|
||||
for test in unexpectedSuccesses:
|
||||
self.stream.writeln(f"UNEXPECTED SUCCESS: {self.getDescription(test)}")
|
||||
self.stream.flush()
|
||||
|
||||
def printErrorList(self, flavour, errors):
|
||||
for test, err in errors:
|
||||
self.stream.writeln(self.separator1)
|
||||
self.stream.writeln("%s: %s" % (flavour,self.getDescription(test)))
|
||||
self.stream.writeln(self.separator2)
|
||||
self.stream.writeln("%s" % err)
|
||||
self.stream.flush()
|
||||
|
||||
|
||||
class TextTestRunner(object):
|
||||
"""A test runner class that displays results in textual form.
|
||||
|
||||
It prints out the names of tests as they are run, errors as they
|
||||
occur, and a summary of the results at the end of the test run.
|
||||
"""
|
||||
resultclass = TextTestResult
|
||||
|
||||
def __init__(self, stream=None, descriptions=True, verbosity=1,
|
||||
failfast=False, buffer=False, resultclass=None, warnings=None,
|
||||
*, tb_locals=False, durations=None):
|
||||
"""Construct a TextTestRunner.
|
||||
|
||||
Subclasses should accept **kwargs to ensure compatibility as the
|
||||
interface changes.
|
||||
"""
|
||||
if stream is None:
|
||||
stream = sys.stderr
|
||||
self.stream = _WritelnDecorator(stream)
|
||||
self.descriptions = descriptions
|
||||
self.verbosity = verbosity
|
||||
self.failfast = failfast
|
||||
self.buffer = buffer
|
||||
self.tb_locals = tb_locals
|
||||
self.durations = durations
|
||||
self.warnings = warnings
|
||||
if resultclass is not None:
|
||||
self.resultclass = resultclass
|
||||
|
||||
def _makeResult(self):
|
||||
try:
|
||||
return self.resultclass(self.stream, self.descriptions,
|
||||
self.verbosity, durations=self.durations)
|
||||
except TypeError:
|
||||
# didn't accept the durations argument
|
||||
return self.resultclass(self.stream, self.descriptions,
|
||||
self.verbosity)
|
||||
|
||||
def _printDurations(self, result):
|
||||
if not result.collectedDurations:
|
||||
return
|
||||
ls = sorted(result.collectedDurations, key=lambda x: x[1],
|
||||
reverse=True)
|
||||
if self.durations > 0:
|
||||
ls = ls[:self.durations]
|
||||
self.stream.writeln("Slowest test durations")
|
||||
if hasattr(result, 'separator2'):
|
||||
self.stream.writeln(result.separator2)
|
||||
hidden = False
|
||||
for test, elapsed in ls:
|
||||
if self.verbosity < 2 and elapsed < 0.001:
|
||||
hidden = True
|
||||
continue
|
||||
self.stream.writeln("%-10s %s" % ("%.3fs" % elapsed, test))
|
||||
if hidden:
|
||||
self.stream.writeln("\n(durations < 0.001s were hidden; "
|
||||
"use -v to show these durations)")
|
||||
else:
|
||||
self.stream.writeln("")
|
||||
|
||||
def run(self, test):
|
||||
"Run the given test case or test suite."
|
||||
result = self._makeResult()
|
||||
registerResult(result)
|
||||
result.failfast = self.failfast
|
||||
result.buffer = self.buffer
|
||||
result.tb_locals = self.tb_locals
|
||||
with warnings.catch_warnings():
|
||||
if self.warnings:
|
||||
# if self.warnings is set, use it to filter all the warnings
|
||||
warnings.simplefilter(self.warnings)
|
||||
startTime = time.perf_counter()
|
||||
startTestRun = getattr(result, 'startTestRun', None)
|
||||
if startTestRun is not None:
|
||||
startTestRun()
|
||||
try:
|
||||
test(result)
|
||||
finally:
|
||||
stopTestRun = getattr(result, 'stopTestRun', None)
|
||||
if stopTestRun is not None:
|
||||
stopTestRun()
|
||||
stopTime = time.perf_counter()
|
||||
timeTaken = stopTime - startTime
|
||||
result.printErrors()
|
||||
if self.durations is not None:
|
||||
self._printDurations(result)
|
||||
|
||||
if hasattr(result, 'separator2'):
|
||||
self.stream.writeln(result.separator2)
|
||||
|
||||
run = result.testsRun
|
||||
self.stream.writeln("Ran %d test%s in %.3fs" %
|
||||
(run, run != 1 and "s" or "", timeTaken))
|
||||
self.stream.writeln()
|
||||
|
||||
expectedFails = unexpectedSuccesses = skipped = 0
|
||||
try:
|
||||
results = map(len, (result.expectedFailures,
|
||||
result.unexpectedSuccesses,
|
||||
result.skipped))
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
expectedFails, unexpectedSuccesses, skipped = results
|
||||
|
||||
infos = []
|
||||
if not result.wasSuccessful():
|
||||
self.stream.write("FAILED")
|
||||
failed, errored = len(result.failures), len(result.errors)
|
||||
if failed:
|
||||
infos.append("failures=%d" % failed)
|
||||
if errored:
|
||||
infos.append("errors=%d" % errored)
|
||||
elif run == 0:
|
||||
self.stream.write("NO TESTS RAN")
|
||||
else:
|
||||
self.stream.write("OK")
|
||||
if skipped:
|
||||
infos.append("skipped=%d" % skipped)
|
||||
if expectedFails:
|
||||
infos.append("expected failures=%d" % expectedFails)
|
||||
if unexpectedSuccesses:
|
||||
infos.append("unexpected successes=%d" % unexpectedSuccesses)
|
||||
if infos:
|
||||
self.stream.writeln(" (%s)" % (", ".join(infos),))
|
||||
else:
|
||||
self.stream.write("\n")
|
||||
self.stream.flush()
|
||||
return result
|
||||
71
.CondaPkg/env/Lib/unittest/signals.py
vendored
71
.CondaPkg/env/Lib/unittest/signals.py
vendored
@@ -1,71 +0,0 @@
|
||||
import signal
|
||||
import weakref
|
||||
|
||||
from functools import wraps
|
||||
|
||||
__unittest = True
|
||||
|
||||
|
||||
class _InterruptHandler(object):
|
||||
def __init__(self, default_handler):
|
||||
self.called = False
|
||||
self.original_handler = default_handler
|
||||
if isinstance(default_handler, int):
|
||||
if default_handler == signal.SIG_DFL:
|
||||
# Pretend it's signal.default_int_handler instead.
|
||||
default_handler = signal.default_int_handler
|
||||
elif default_handler == signal.SIG_IGN:
|
||||
# Not quite the same thing as SIG_IGN, but the closest we
|
||||
# can make it: do nothing.
|
||||
def default_handler(unused_signum, unused_frame):
|
||||
pass
|
||||
else:
|
||||
raise TypeError("expected SIGINT signal handler to be "
|
||||
"signal.SIG_IGN, signal.SIG_DFL, or a "
|
||||
"callable object")
|
||||
self.default_handler = default_handler
|
||||
|
||||
def __call__(self, signum, frame):
|
||||
installed_handler = signal.getsignal(signal.SIGINT)
|
||||
if installed_handler is not self:
|
||||
# if we aren't the installed handler, then delegate immediately
|
||||
# to the default handler
|
||||
self.default_handler(signum, frame)
|
||||
|
||||
if self.called:
|
||||
self.default_handler(signum, frame)
|
||||
self.called = True
|
||||
for result in _results.keys():
|
||||
result.stop()
|
||||
|
||||
_results = weakref.WeakKeyDictionary()
|
||||
def registerResult(result):
|
||||
_results[result] = 1
|
||||
|
||||
def removeResult(result):
|
||||
return bool(_results.pop(result, None))
|
||||
|
||||
_interrupt_handler = None
|
||||
def installHandler():
|
||||
global _interrupt_handler
|
||||
if _interrupt_handler is None:
|
||||
default_handler = signal.getsignal(signal.SIGINT)
|
||||
_interrupt_handler = _InterruptHandler(default_handler)
|
||||
signal.signal(signal.SIGINT, _interrupt_handler)
|
||||
|
||||
|
||||
def removeHandler(method=None):
|
||||
if method is not None:
|
||||
@wraps(method)
|
||||
def inner(*args, **kwargs):
|
||||
initial = signal.getsignal(signal.SIGINT)
|
||||
removeHandler()
|
||||
try:
|
||||
return method(*args, **kwargs)
|
||||
finally:
|
||||
signal.signal(signal.SIGINT, initial)
|
||||
return inner
|
||||
|
||||
global _interrupt_handler
|
||||
if _interrupt_handler is not None:
|
||||
signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
|
||||
379
.CondaPkg/env/Lib/unittest/suite.py
vendored
379
.CondaPkg/env/Lib/unittest/suite.py
vendored
@@ -1,379 +0,0 @@
|
||||
"""TestSuite"""
|
||||
|
||||
import sys
|
||||
|
||||
from . import case
|
||||
from . import util
|
||||
|
||||
__unittest = True
|
||||
|
||||
|
||||
def _call_if_exists(parent, attr):
|
||||
func = getattr(parent, attr, lambda: None)
|
||||
func()
|
||||
|
||||
|
||||
class BaseTestSuite(object):
|
||||
"""A simple test suite that doesn't provide class or module shared fixtures.
|
||||
"""
|
||||
_cleanup = True
|
||||
|
||||
def __init__(self, tests=()):
|
||||
self._tests = []
|
||||
self._removed_tests = 0
|
||||
self.addTests(tests)
|
||||
|
||||
def __repr__(self):
|
||||
return "<%s tests=%s>" % (util.strclass(self.__class__), list(self))
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, self.__class__):
|
||||
return NotImplemented
|
||||
return list(self) == list(other)
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self._tests)
|
||||
|
||||
def countTestCases(self):
|
||||
cases = self._removed_tests
|
||||
for test in self:
|
||||
if test:
|
||||
cases += test.countTestCases()
|
||||
return cases
|
||||
|
||||
def addTest(self, test):
|
||||
# sanity checks
|
||||
if not callable(test):
|
||||
raise TypeError("{} is not callable".format(repr(test)))
|
||||
if isinstance(test, type) and issubclass(test,
|
||||
(case.TestCase, TestSuite)):
|
||||
raise TypeError("TestCases and TestSuites must be instantiated "
|
||||
"before passing them to addTest()")
|
||||
self._tests.append(test)
|
||||
|
||||
def addTests(self, tests):
|
||||
if isinstance(tests, str):
|
||||
raise TypeError("tests must be an iterable of tests, not a string")
|
||||
for test in tests:
|
||||
self.addTest(test)
|
||||
|
||||
def run(self, result):
|
||||
for index, test in enumerate(self):
|
||||
if result.shouldStop:
|
||||
break
|
||||
test(result)
|
||||
if self._cleanup:
|
||||
self._removeTestAtIndex(index)
|
||||
return result
|
||||
|
||||
def _removeTestAtIndex(self, index):
|
||||
"""Stop holding a reference to the TestCase at index."""
|
||||
try:
|
||||
test = self._tests[index]
|
||||
except TypeError:
|
||||
# support for suite implementations that have overridden self._tests
|
||||
pass
|
||||
else:
|
||||
# Some unittest tests add non TestCase/TestSuite objects to
|
||||
# the suite.
|
||||
if hasattr(test, 'countTestCases'):
|
||||
self._removed_tests += test.countTestCases()
|
||||
self._tests[index] = None
|
||||
|
||||
def __call__(self, *args, **kwds):
|
||||
return self.run(*args, **kwds)
|
||||
|
||||
def debug(self):
|
||||
"""Run the tests without collecting errors in a TestResult"""
|
||||
for test in self:
|
||||
test.debug()
|
||||
|
||||
|
||||
class TestSuite(BaseTestSuite):
|
||||
"""A test suite is a composite test consisting of a number of TestCases.
|
||||
|
||||
For use, create an instance of TestSuite, then add test case instances.
|
||||
When all tests have been added, the suite can be passed to a test
|
||||
runner, such as TextTestRunner. It will run the individual test cases
|
||||
in the order in which they were added, aggregating the results. When
|
||||
subclassing, do not forget to call the base class constructor.
|
||||
"""
|
||||
|
||||
def run(self, result, debug=False):
|
||||
topLevel = False
|
||||
if getattr(result, '_testRunEntered', False) is False:
|
||||
result._testRunEntered = topLevel = True
|
||||
|
||||
for index, test in enumerate(self):
|
||||
if result.shouldStop:
|
||||
break
|
||||
|
||||
if _isnotsuite(test):
|
||||
self._tearDownPreviousClass(test, result)
|
||||
self._handleModuleFixture(test, result)
|
||||
self._handleClassSetUp(test, result)
|
||||
result._previousTestClass = test.__class__
|
||||
|
||||
if (getattr(test.__class__, '_classSetupFailed', False) or
|
||||
getattr(result, '_moduleSetUpFailed', False)):
|
||||
continue
|
||||
|
||||
if not debug:
|
||||
test(result)
|
||||
else:
|
||||
test.debug()
|
||||
|
||||
if self._cleanup:
|
||||
self._removeTestAtIndex(index)
|
||||
|
||||
if topLevel:
|
||||
self._tearDownPreviousClass(None, result)
|
||||
self._handleModuleTearDown(result)
|
||||
result._testRunEntered = False
|
||||
return result
|
||||
|
||||
def debug(self):
|
||||
"""Run the tests without collecting errors in a TestResult"""
|
||||
debug = _DebugResult()
|
||||
self.run(debug, True)
|
||||
|
||||
################################
|
||||
|
||||
def _handleClassSetUp(self, test, result):
|
||||
previousClass = getattr(result, '_previousTestClass', None)
|
||||
currentClass = test.__class__
|
||||
if currentClass == previousClass:
|
||||
return
|
||||
if result._moduleSetUpFailed:
|
||||
return
|
||||
if getattr(currentClass, "__unittest_skip__", False):
|
||||
return
|
||||
|
||||
failed = False
|
||||
try:
|
||||
currentClass._classSetupFailed = False
|
||||
except TypeError:
|
||||
# test may actually be a function
|
||||
# so its class will be a builtin-type
|
||||
pass
|
||||
|
||||
setUpClass = getattr(currentClass, 'setUpClass', None)
|
||||
doClassCleanups = getattr(currentClass, 'doClassCleanups', None)
|
||||
if setUpClass is not None:
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
try:
|
||||
setUpClass()
|
||||
except Exception as e:
|
||||
if isinstance(result, _DebugResult):
|
||||
raise
|
||||
failed = True
|
||||
try:
|
||||
currentClass._classSetupFailed = True
|
||||
except TypeError:
|
||||
pass
|
||||
className = util.strclass(currentClass)
|
||||
self._createClassOrModuleLevelException(result, e,
|
||||
'setUpClass',
|
||||
className)
|
||||
if failed and doClassCleanups is not None:
|
||||
doClassCleanups()
|
||||
for exc_info in currentClass.tearDown_exceptions:
|
||||
self._createClassOrModuleLevelException(
|
||||
result, exc_info[1], 'setUpClass', className,
|
||||
info=exc_info)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
def _get_previous_module(self, result):
|
||||
previousModule = None
|
||||
previousClass = getattr(result, '_previousTestClass', None)
|
||||
if previousClass is not None:
|
||||
previousModule = previousClass.__module__
|
||||
return previousModule
|
||||
|
||||
|
||||
def _handleModuleFixture(self, test, result):
|
||||
previousModule = self._get_previous_module(result)
|
||||
currentModule = test.__class__.__module__
|
||||
if currentModule == previousModule:
|
||||
return
|
||||
|
||||
self._handleModuleTearDown(result)
|
||||
|
||||
|
||||
result._moduleSetUpFailed = False
|
||||
try:
|
||||
module = sys.modules[currentModule]
|
||||
except KeyError:
|
||||
return
|
||||
setUpModule = getattr(module, 'setUpModule', None)
|
||||
if setUpModule is not None:
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
try:
|
||||
setUpModule()
|
||||
except Exception as e:
|
||||
if isinstance(result, _DebugResult):
|
||||
raise
|
||||
result._moduleSetUpFailed = True
|
||||
self._createClassOrModuleLevelException(result, e,
|
||||
'setUpModule',
|
||||
currentModule)
|
||||
if result._moduleSetUpFailed:
|
||||
try:
|
||||
case.doModuleCleanups()
|
||||
except Exception as e:
|
||||
self._createClassOrModuleLevelException(result, e,
|
||||
'setUpModule',
|
||||
currentModule)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
def _createClassOrModuleLevelException(self, result, exc, method_name,
|
||||
parent, info=None):
|
||||
errorName = f'{method_name} ({parent})'
|
||||
self._addClassOrModuleLevelException(result, exc, errorName, info)
|
||||
|
||||
def _addClassOrModuleLevelException(self, result, exception, errorName,
|
||||
info=None):
|
||||
error = _ErrorHolder(errorName)
|
||||
addSkip = getattr(result, 'addSkip', None)
|
||||
if addSkip is not None and isinstance(exception, case.SkipTest):
|
||||
addSkip(error, str(exception))
|
||||
else:
|
||||
if not info:
|
||||
result.addError(error, sys.exc_info())
|
||||
else:
|
||||
result.addError(error, info)
|
||||
|
||||
def _handleModuleTearDown(self, result):
|
||||
previousModule = self._get_previous_module(result)
|
||||
if previousModule is None:
|
||||
return
|
||||
if result._moduleSetUpFailed:
|
||||
return
|
||||
|
||||
try:
|
||||
module = sys.modules[previousModule]
|
||||
except KeyError:
|
||||
return
|
||||
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
tearDownModule = getattr(module, 'tearDownModule', None)
|
||||
if tearDownModule is not None:
|
||||
try:
|
||||
tearDownModule()
|
||||
except Exception as e:
|
||||
if isinstance(result, _DebugResult):
|
||||
raise
|
||||
self._createClassOrModuleLevelException(result, e,
|
||||
'tearDownModule',
|
||||
previousModule)
|
||||
try:
|
||||
case.doModuleCleanups()
|
||||
except Exception as e:
|
||||
if isinstance(result, _DebugResult):
|
||||
raise
|
||||
self._createClassOrModuleLevelException(result, e,
|
||||
'tearDownModule',
|
||||
previousModule)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
def _tearDownPreviousClass(self, test, result):
|
||||
previousClass = getattr(result, '_previousTestClass', None)
|
||||
currentClass = test.__class__
|
||||
if currentClass == previousClass or previousClass is None:
|
||||
return
|
||||
if getattr(previousClass, '_classSetupFailed', False):
|
||||
return
|
||||
if getattr(result, '_moduleSetUpFailed', False):
|
||||
return
|
||||
if getattr(previousClass, "__unittest_skip__", False):
|
||||
return
|
||||
|
||||
tearDownClass = getattr(previousClass, 'tearDownClass', None)
|
||||
doClassCleanups = getattr(previousClass, 'doClassCleanups', None)
|
||||
if tearDownClass is None and doClassCleanups is None:
|
||||
return
|
||||
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
if tearDownClass is not None:
|
||||
try:
|
||||
tearDownClass()
|
||||
except Exception as e:
|
||||
if isinstance(result, _DebugResult):
|
||||
raise
|
||||
className = util.strclass(previousClass)
|
||||
self._createClassOrModuleLevelException(result, e,
|
||||
'tearDownClass',
|
||||
className)
|
||||
if doClassCleanups is not None:
|
||||
doClassCleanups()
|
||||
for exc_info in previousClass.tearDown_exceptions:
|
||||
if isinstance(result, _DebugResult):
|
||||
raise exc_info[1]
|
||||
className = util.strclass(previousClass)
|
||||
self._createClassOrModuleLevelException(result, exc_info[1],
|
||||
'tearDownClass',
|
||||
className,
|
||||
info=exc_info)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
|
||||
class _ErrorHolder(object):
|
||||
"""
|
||||
Placeholder for a TestCase inside a result. As far as a TestResult
|
||||
is concerned, this looks exactly like a unit test. Used to insert
|
||||
arbitrary errors into a test suite run.
|
||||
"""
|
||||
# Inspired by the ErrorHolder from Twisted:
|
||||
# http://twistedmatrix.com/trac/browser/trunk/twisted/trial/runner.py
|
||||
|
||||
# attribute used by TestResult._exc_info_to_string
|
||||
failureException = None
|
||||
|
||||
def __init__(self, description):
|
||||
self.description = description
|
||||
|
||||
def id(self):
|
||||
return self.description
|
||||
|
||||
def shortDescription(self):
|
||||
return None
|
||||
|
||||
def __repr__(self):
|
||||
return "<ErrorHolder description=%r>" % (self.description,)
|
||||
|
||||
def __str__(self):
|
||||
return self.id()
|
||||
|
||||
def run(self, result):
|
||||
# could call result.addError(...) - but this test-like object
|
||||
# shouldn't be run anyway
|
||||
pass
|
||||
|
||||
def __call__(self, result):
|
||||
return self.run(result)
|
||||
|
||||
def countTestCases(self):
|
||||
return 0
|
||||
|
||||
def _isnotsuite(test):
|
||||
"A crude way to tell apart testcases and suites with duck-typing"
|
||||
try:
|
||||
iter(test)
|
||||
except TypeError:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class _DebugResult(object):
|
||||
"Used by the TestSuite to hold previous class when running in debug."
|
||||
_previousTestClass = None
|
||||
_moduleSetUpFailed = False
|
||||
shouldStop = False
|
||||
170
.CondaPkg/env/Lib/unittest/util.py
vendored
170
.CondaPkg/env/Lib/unittest/util.py
vendored
@@ -1,170 +0,0 @@
|
||||
"""Various utility functions."""
|
||||
|
||||
from collections import namedtuple, Counter
|
||||
from os.path import commonprefix
|
||||
|
||||
__unittest = True
|
||||
|
||||
_MAX_LENGTH = 80
|
||||
_PLACEHOLDER_LEN = 12
|
||||
_MIN_BEGIN_LEN = 5
|
||||
_MIN_END_LEN = 5
|
||||
_MIN_COMMON_LEN = 5
|
||||
_MIN_DIFF_LEN = _MAX_LENGTH - \
|
||||
(_MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN +
|
||||
_PLACEHOLDER_LEN + _MIN_END_LEN)
|
||||
assert _MIN_DIFF_LEN >= 0
|
||||
|
||||
def _shorten(s, prefixlen, suffixlen):
|
||||
skip = len(s) - prefixlen - suffixlen
|
||||
if skip > _PLACEHOLDER_LEN:
|
||||
s = '%s[%d chars]%s' % (s[:prefixlen], skip, s[len(s) - suffixlen:])
|
||||
return s
|
||||
|
||||
def _common_shorten_repr(*args):
|
||||
args = tuple(map(safe_repr, args))
|
||||
maxlen = max(map(len, args))
|
||||
if maxlen <= _MAX_LENGTH:
|
||||
return args
|
||||
|
||||
prefix = commonprefix(args)
|
||||
prefixlen = len(prefix)
|
||||
|
||||
common_len = _MAX_LENGTH - \
|
||||
(maxlen - prefixlen + _MIN_BEGIN_LEN + _PLACEHOLDER_LEN)
|
||||
if common_len > _MIN_COMMON_LEN:
|
||||
assert _MIN_BEGIN_LEN + _PLACEHOLDER_LEN + _MIN_COMMON_LEN + \
|
||||
(maxlen - prefixlen) < _MAX_LENGTH
|
||||
prefix = _shorten(prefix, _MIN_BEGIN_LEN, common_len)
|
||||
return tuple(prefix + s[prefixlen:] for s in args)
|
||||
|
||||
prefix = _shorten(prefix, _MIN_BEGIN_LEN, _MIN_COMMON_LEN)
|
||||
return tuple(prefix + _shorten(s[prefixlen:], _MIN_DIFF_LEN, _MIN_END_LEN)
|
||||
for s in args)
|
||||
|
||||
def safe_repr(obj, short=False):
|
||||
try:
|
||||
result = repr(obj)
|
||||
except Exception:
|
||||
result = object.__repr__(obj)
|
||||
if not short or len(result) < _MAX_LENGTH:
|
||||
return result
|
||||
return result[:_MAX_LENGTH] + ' [truncated]...'
|
||||
|
||||
def strclass(cls):
|
||||
return "%s.%s" % (cls.__module__, cls.__qualname__)
|
||||
|
||||
def sorted_list_difference(expected, actual):
|
||||
"""Finds elements in only one or the other of two, sorted input lists.
|
||||
|
||||
Returns a two-element tuple of lists. The first list contains those
|
||||
elements in the "expected" list but not in the "actual" list, and the
|
||||
second contains those elements in the "actual" list but not in the
|
||||
"expected" list. Duplicate elements in either input list are ignored.
|
||||
"""
|
||||
i = j = 0
|
||||
missing = []
|
||||
unexpected = []
|
||||
while True:
|
||||
try:
|
||||
e = expected[i]
|
||||
a = actual[j]
|
||||
if e < a:
|
||||
missing.append(e)
|
||||
i += 1
|
||||
while expected[i] == e:
|
||||
i += 1
|
||||
elif e > a:
|
||||
unexpected.append(a)
|
||||
j += 1
|
||||
while actual[j] == a:
|
||||
j += 1
|
||||
else:
|
||||
i += 1
|
||||
try:
|
||||
while expected[i] == e:
|
||||
i += 1
|
||||
finally:
|
||||
j += 1
|
||||
while actual[j] == a:
|
||||
j += 1
|
||||
except IndexError:
|
||||
missing.extend(expected[i:])
|
||||
unexpected.extend(actual[j:])
|
||||
break
|
||||
return missing, unexpected
|
||||
|
||||
|
||||
def unorderable_list_difference(expected, actual):
|
||||
"""Same behavior as sorted_list_difference but
|
||||
for lists of unorderable items (like dicts).
|
||||
|
||||
As it does a linear search per item (remove) it
|
||||
has O(n*n) performance."""
|
||||
missing = []
|
||||
while expected:
|
||||
item = expected.pop()
|
||||
try:
|
||||
actual.remove(item)
|
||||
except ValueError:
|
||||
missing.append(item)
|
||||
|
||||
# anything left in actual is unexpected
|
||||
return missing, actual
|
||||
|
||||
def three_way_cmp(x, y):
|
||||
"""Return -1 if x < y, 0 if x == y and 1 if x > y"""
|
||||
return (x > y) - (x < y)
|
||||
|
||||
_Mismatch = namedtuple('Mismatch', 'actual expected value')
|
||||
|
||||
def _count_diff_all_purpose(actual, expected):
|
||||
'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
|
||||
# elements need not be hashable
|
||||
s, t = list(actual), list(expected)
|
||||
m, n = len(s), len(t)
|
||||
NULL = object()
|
||||
result = []
|
||||
for i, elem in enumerate(s):
|
||||
if elem is NULL:
|
||||
continue
|
||||
cnt_s = cnt_t = 0
|
||||
for j in range(i, m):
|
||||
if s[j] == elem:
|
||||
cnt_s += 1
|
||||
s[j] = NULL
|
||||
for j, other_elem in enumerate(t):
|
||||
if other_elem == elem:
|
||||
cnt_t += 1
|
||||
t[j] = NULL
|
||||
if cnt_s != cnt_t:
|
||||
diff = _Mismatch(cnt_s, cnt_t, elem)
|
||||
result.append(diff)
|
||||
|
||||
for i, elem in enumerate(t):
|
||||
if elem is NULL:
|
||||
continue
|
||||
cnt_t = 0
|
||||
for j in range(i, n):
|
||||
if t[j] == elem:
|
||||
cnt_t += 1
|
||||
t[j] = NULL
|
||||
diff = _Mismatch(0, cnt_t, elem)
|
||||
result.append(diff)
|
||||
return result
|
||||
|
||||
def _count_diff_hashable(actual, expected):
|
||||
'Returns list of (cnt_act, cnt_exp, elem) triples where the counts differ'
|
||||
# elements must be hashable
|
||||
s, t = Counter(actual), Counter(expected)
|
||||
result = []
|
||||
for elem, cnt_s in s.items():
|
||||
cnt_t = t.get(elem, 0)
|
||||
if cnt_s != cnt_t:
|
||||
diff = _Mismatch(cnt_s, cnt_t, elem)
|
||||
result.append(diff)
|
||||
for elem, cnt_t in t.items():
|
||||
if elem not in s:
|
||||
diff = _Mismatch(0, cnt_t, elem)
|
||||
result.append(diff)
|
||||
return result
|
||||
Reference in New Issue
Block a user