This commit is contained in:
ton
2023-10-05 03:11:24 +07:00
parent d76e4b2916
commit aa1631742f
5451 changed files with 919206 additions and 1 deletions

2541
.CondaPkg/env/Lib/test/support/__init__.py vendored Normal file

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,111 @@
from enum import Enum
import functools
import unittest
__all__ = [
"given",
"example",
"assume",
"reject",
"register_random",
"strategies",
"HealthCheck",
"settings",
"Verbosity",
]
from . import strategies
def given(*_args, **_kwargs):
def decorator(f):
if examples := getattr(f, "_examples", []):
@functools.wraps(f)
def test_function(self):
for example_args, example_kwargs in examples:
with self.subTest(*example_args, **example_kwargs):
f(self, *example_args, **example_kwargs)
else:
# If we have found no examples, we must skip the test. If @example
# is applied after @given, it will re-wrap the test to remove the
# skip decorator.
test_function = unittest.skip(
"Hypothesis required for property test with no " +
"specified examples"
)(f)
test_function._given = True
return test_function
return decorator
def example(*args, **kwargs):
if bool(args) == bool(kwargs):
raise ValueError("Must specify exactly one of *args or **kwargs")
def decorator(f):
base_func = getattr(f, "__wrapped__", f)
if not hasattr(base_func, "_examples"):
base_func._examples = []
base_func._examples.append((args, kwargs))
if getattr(f, "_given", False):
# If the given decorator is below all the example decorators,
# it would be erroneously skipped, so we need to re-wrap the new
# base function.
f = given()(base_func)
return f
return decorator
def assume(condition):
if not condition:
raise unittest.SkipTest("Unsatisfied assumption")
return True
def reject():
assume(False)
def register_random(*args, **kwargs):
pass # pragma: no cover
def settings(*args, **kwargs):
return lambda f: f # pragma: nocover
class HealthCheck(Enum):
data_too_large = 1
filter_too_much = 2
too_slow = 3
return_value = 5
large_base_example = 7
not_a_test_method = 8
@classmethod
def all(cls):
return list(cls)
class Verbosity(Enum):
quiet = 0
normal = 1
verbose = 2
debug = 3
class Phase(Enum):
explicit = 0
reuse = 1
generate = 2
target = 3
shrink = 4
explain = 5

View File

@@ -0,0 +1,43 @@
# Stub out only the subset of the interface that we actually use in our tests.
class StubClass:
def __init__(self, *args, **kwargs):
self.__stub_args = args
self.__stub_kwargs = kwargs
self.__repr = None
def _with_repr(self, new_repr):
new_obj = self.__class__(*self.__stub_args, **self.__stub_kwargs)
new_obj.__repr = new_repr
return new_obj
def __repr__(self):
if self.__repr is not None:
return self.__repr
argstr = ", ".join(self.__stub_args)
kwargstr = ", ".join(f"{kw}={val}" for kw, val in self.__stub_kwargs.items())
in_parens = argstr
if kwargstr:
in_parens += ", " + kwargstr
return f"{self.__class__.__qualname__}({in_parens})"
def stub_factory(klass, name, *, with_repr=None, _seen={}):
if (klass, name) not in _seen:
class Stub(klass):
def __init__(self, *args, **kwargs):
super().__init__()
self.__stub_args = args
self.__stub_kwargs = kwargs
Stub.__name__ = name
Stub.__qualname__ = name
if with_repr is not None:
Stub._repr = None
_seen.setdefault((klass, name, with_repr), Stub)
return _seen[(klass, name, with_repr)]

View File

@@ -0,0 +1,91 @@
import functools
from ._helpers import StubClass, stub_factory
class StubStrategy(StubClass):
def __make_trailing_repr(self, transformation_name, func):
func_name = func.__name__ or repr(func)
return f"{self!r}.{transformation_name}({func_name})"
def map(self, pack):
return self._with_repr(self.__make_trailing_repr("map", pack))
def flatmap(self, expand):
return self._with_repr(self.__make_trailing_repr("flatmap", expand))
def filter(self, condition):
return self._with_repr(self.__make_trailing_repr("filter", condition))
def __or__(self, other):
new_repr = f"one_of({self!r}, {other!r})"
return self._with_repr(new_repr)
_STRATEGIES = {
"binary",
"booleans",
"builds",
"characters",
"complex_numbers",
"composite",
"data",
"dates",
"datetimes",
"decimals",
"deferred",
"dictionaries",
"emails",
"fixed_dictionaries",
"floats",
"fractions",
"from_regex",
"from_type",
"frozensets",
"functions",
"integers",
"iterables",
"just",
"lists",
"none",
"nothing",
"one_of",
"permutations",
"random_module",
"randoms",
"recursive",
"register_type_strategy",
"runner",
"sampled_from",
"sets",
"shared",
"slices",
"timedeltas",
"times",
"text",
"tuples",
"uuids",
}
__all__ = sorted(_STRATEGIES)
def composite(f):
strategy = stub_factory(StubStrategy, f.__name__)
@functools.wraps(f)
def inner(*args, **kwargs):
return strategy(*args, **kwargs)
return inner
def __getattr__(name):
if name not in _STRATEGIES:
raise AttributeError(f"Unknown attribute {name}")
return stub_factory(StubStrategy, f"hypothesis.strategies.{name}")
def __dir__():
return __all__

View File

@@ -0,0 +1,43 @@
import ast
class ASTTestMixin:
"""Test mixing to have basic assertions for AST nodes."""
def assertASTEqual(self, ast1, ast2):
# Ensure the comparisons start at an AST node
self.assertIsInstance(ast1, ast.AST)
self.assertIsInstance(ast2, ast.AST)
# An AST comparison routine modeled after ast.dump(), but
# instead of string building, it traverses the two trees
# in lock-step.
def traverse_compare(a, b, missing=object()):
if type(a) is not type(b):
self.fail(f"{type(a)!r} is not {type(b)!r}")
if isinstance(a, ast.AST):
for field in a._fields:
value1 = getattr(a, field, missing)
value2 = getattr(b, field, missing)
# Singletons are equal by definition, so further
# testing can be skipped.
if value1 is not value2:
traverse_compare(value1, value2)
elif isinstance(a, list):
try:
for node1, node2 in zip(a, b, strict=True):
traverse_compare(node1, node2)
except ValueError:
# Attempt a "pretty" error ala assertSequenceEqual()
len1 = len(a)
len2 = len(b)
if len1 > len2:
what = "First"
diff = len1 - len2
else:
what = "Second"
diff = len2 - len1
msg = f"{what} list contains {diff} additional elements."
raise self.failureException(msg) from None
elif a != b:
self.fail(f"{a!r} != {b!r}")
traverse_compare(ast1, ast2)

View File

@@ -0,0 +1,314 @@
# TODO: This module was deprecated and removed from CPython 3.12
# Now it is a test-only helper. Any attempts to rewrite exising tests that
# are using this module and remove it completely are appreciated!
# See: https://github.com/python/cpython/issues/72719
# -*- Mode: Python; tab-width: 4 -*-
# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
r"""A class supporting chat-style (command/response) protocols.
This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).
The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.
for example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
from collections import deque
from test.support import asyncore
class async_chat(asyncore.dispatcher):
"""This is an abstract class. You must derive from this class, and add
the two methods collect_incoming_data() and found_terminator()"""
# these are overridable defaults
ac_in_buffer_size = 65536
ac_out_buffer_size = 65536
# we don't want to enable the use of encoding by default, because that is a
# sign of an application bug that we don't want to pass silently
use_encoding = 0
encoding = 'latin-1'
def __init__(self, sock=None, map=None):
# for string terminator matching
self.ac_in_buffer = b''
# we use a list here rather than io.BytesIO for a few reasons...
# del lst[:] is faster than bio.truncate(0)
# lst = [] is faster than bio.truncate(0)
self.incoming = []
# we toss the use of the "simple producer" and replace it with
# a pure deque, which the original fifo was a wrapping of
self.producer_fifo = deque()
asyncore.dispatcher.__init__(self, sock, map)
def collect_incoming_data(self, data):
raise NotImplementedError("must be implemented in subclass")
def _collect_incoming_data(self, data):
self.incoming.append(data)
def _get_data(self):
d = b''.join(self.incoming)
del self.incoming[:]
return d
def found_terminator(self):
raise NotImplementedError("must be implemented in subclass")
def set_terminator(self, term):
"""Set the input delimiter.
Can be a fixed string of any length, an integer, or None.
"""
if isinstance(term, str) and self.use_encoding:
term = bytes(term, self.encoding)
elif isinstance(term, int) and term < 0:
raise ValueError('the number of received bytes must be positive')
self.terminator = term
def get_terminator(self):
return self.terminator
# grab some more data from the socket,
# throw it to the collector method,
# check for the terminator,
# if found, transition to the next state.
def handle_read(self):
try:
data = self.recv(self.ac_in_buffer_size)
except BlockingIOError:
return
except OSError:
self.handle_error()
return
if isinstance(data, str) and self.use_encoding:
data = bytes(str, self.encoding)
self.ac_in_buffer = self.ac_in_buffer + data
# Continue to search for self.terminator in self.ac_in_buffer,
# while calling self.collect_incoming_data. The while loop
# is necessary because we might read several data+terminator
# combos with a single recv(4096).
while self.ac_in_buffer:
lb = len(self.ac_in_buffer)
terminator = self.get_terminator()
if not terminator:
# no terminator, collect it all
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = b''
elif isinstance(terminator, int):
# numeric terminator
n = terminator
if lb < n:
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = b''
self.terminator = self.terminator - lb
else:
self.collect_incoming_data(self.ac_in_buffer[:n])
self.ac_in_buffer = self.ac_in_buffer[n:]
self.terminator = 0
self.found_terminator()
else:
# 3 cases:
# 1) end of buffer matches terminator exactly:
# collect data, transition
# 2) end of buffer matches some prefix:
# collect data to the prefix
# 3) end of buffer does not match any prefix:
# collect data
terminator_len = len(terminator)
index = self.ac_in_buffer.find(terminator)
if index != -1:
# we found the terminator
if index > 0:
# don't bother reporting the empty string
# (source of subtle bugs)
self.collect_incoming_data(self.ac_in_buffer[:index])
self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
# This does the Right Thing if the terminator
# is changed here.
self.found_terminator()
else:
# check for a prefix of the terminator
index = find_prefix_at_end(self.ac_in_buffer, terminator)
if index:
if index != lb:
# we found a prefix, collect up to the prefix
self.collect_incoming_data(self.ac_in_buffer[:-index])
self.ac_in_buffer = self.ac_in_buffer[-index:]
break
else:
# no prefix, collect it all
self.collect_incoming_data(self.ac_in_buffer)
self.ac_in_buffer = b''
def handle_write(self):
self.initiate_send()
def handle_close(self):
self.close()
def push(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)',
type(data))
sabs = self.ac_out_buffer_size
if len(data) > sabs:
for i in range(0, len(data), sabs):
self.producer_fifo.append(data[i:i+sabs])
else:
self.producer_fifo.append(data)
self.initiate_send()
def push_with_producer(self, producer):
self.producer_fifo.append(producer)
self.initiate_send()
def readable(self):
"predicate for inclusion in the readable for select()"
# cannot use the old predicate, it violates the claim of the
# set_terminator method.
# return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
return 1
def writable(self):
"predicate for inclusion in the writable for select()"
return self.producer_fifo or (not self.connected)
def close_when_done(self):
"automatically close this channel once the outgoing queue is empty"
self.producer_fifo.append(None)
def initiate_send(self):
while self.producer_fifo and self.connected:
first = self.producer_fifo[0]
# handle empty string/buffer or None entry
if not first:
del self.producer_fifo[0]
if first is None:
self.handle_close()
return
# handle classic producer behavior
obs = self.ac_out_buffer_size
try:
data = first[:obs]
except TypeError:
data = first.more()
if data:
self.producer_fifo.appendleft(data)
else:
del self.producer_fifo[0]
continue
if isinstance(data, str) and self.use_encoding:
data = bytes(data, self.encoding)
# send the data
try:
num_sent = self.send(data)
except OSError:
self.handle_error()
return
if num_sent:
if num_sent < len(data) or obs < len(first):
self.producer_fifo[0] = first[num_sent:]
else:
del self.producer_fifo[0]
# we tried to send some actual data
return
def discard_buffers(self):
# Emergencies only!
self.ac_in_buffer = b''
del self.incoming[:]
self.producer_fifo.clear()
class simple_producer:
def __init__(self, data, buffer_size=512):
self.data = data
self.buffer_size = buffer_size
def more(self):
if len(self.data) > self.buffer_size:
result = self.data[:self.buffer_size]
self.data = self.data[self.buffer_size:]
return result
else:
result = self.data
self.data = b''
return result
# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked. Return the number of
# characters matched.
# for example:
# f_p_a_e("qwerty\r", "\r\n") => 1
# f_p_a_e("qwertydkjf", "\r\n") => 0
# f_p_a_e("qwerty\r\n", "\r\n") => <undefined>
# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python: 28961/s
# old python: 18307/s
# re: 12820/s
# regex: 14035/s
def find_prefix_at_end(haystack, needle):
l = len(needle) - 1
while l and not haystack.endswith(needle[:l]):
l -= 1
return l

View File

@@ -0,0 +1,649 @@
# TODO: This module was deprecated and removed from CPython 3.12
# Now it is a test-only helper. Any attempts to rewrite exising tests that
# are using this module and remove it completely are appreciated!
# See: https://github.com/python/cpython/issues/72719
# -*- Mode: Python -*-
# Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>
# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================
"""Basic infrastructure for asynchronous socket service clients and servers.
There are only two ways to have a program on a single processor do "more
than one thing at a time". Multi-threaded programming is the simplest and
most popular way to do it, but there is another very different technique,
that lets you have nearly all the advantages of multi-threading, without
actually using multiple threads. it's really only practical if your program
is largely I/O bound. If your program is CPU bound, then pre-emptive
scheduled threads are probably what you really need. Network servers are
rarely CPU-bound, however.
If your operating system supports the select() system call in its I/O
library (and nearly all do), then you can use it to juggle multiple
communication channels at once; doing other work while your I/O is taking
place in the "background." Although this strategy can seem strange and
complex, especially at first, it is in many ways easier to understand and
control than multi-threaded programming. The module documented here solves
many of the difficult problems for you, making the task of building
sophisticated high-performance network servers and clients a snap.
"""
import select
import socket
import sys
import time
import warnings
import os
from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, EINVAL, \
ENOTCONN, ESHUTDOWN, EISCONN, EBADF, ECONNABORTED, EPIPE, EAGAIN, \
errorcode
_DISCONNECTED = frozenset({ECONNRESET, ENOTCONN, ESHUTDOWN, ECONNABORTED, EPIPE,
EBADF})
try:
socket_map
except NameError:
socket_map = {}
def _strerror(err):
try:
return os.strerror(err)
except (ValueError, OverflowError, NameError):
if err in errorcode:
return errorcode[err]
return "Unknown error %s" %err
class ExitNow(Exception):
pass
_reraised_exceptions = (ExitNow, KeyboardInterrupt, SystemExit)
def read(obj):
try:
obj.handle_read_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def write(obj):
try:
obj.handle_write_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def _exception(obj):
try:
obj.handle_expt_event()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def readwrite(obj, flags):
try:
if flags & select.POLLIN:
obj.handle_read_event()
if flags & select.POLLOUT:
obj.handle_write_event()
if flags & select.POLLPRI:
obj.handle_expt_event()
if flags & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
obj.handle_close()
except OSError as e:
if e.errno not in _DISCONNECTED:
obj.handle_error()
else:
obj.handle_close()
except _reraised_exceptions:
raise
except:
obj.handle_error()
def poll(timeout=0.0, map=None):
if map is None:
map = socket_map
if map:
r = []; w = []; e = []
for fd, obj in list(map.items()):
is_r = obj.readable()
is_w = obj.writable()
if is_r:
r.append(fd)
# accepting sockets should not be writable
if is_w and not obj.accepting:
w.append(fd)
if is_r or is_w:
e.append(fd)
if [] == r == w == e:
time.sleep(timeout)
return
r, w, e = select.select(r, w, e, timeout)
for fd in r:
obj = map.get(fd)
if obj is None:
continue
read(obj)
for fd in w:
obj = map.get(fd)
if obj is None:
continue
write(obj)
for fd in e:
obj = map.get(fd)
if obj is None:
continue
_exception(obj)
def poll2(timeout=0.0, map=None):
# Use the poll() support added to the select module in Python 2.0
if map is None:
map = socket_map
if timeout is not None:
# timeout is in milliseconds
timeout = int(timeout*1000)
pollster = select.poll()
if map:
for fd, obj in list(map.items()):
flags = 0
if obj.readable():
flags |= select.POLLIN | select.POLLPRI
# accepting sockets should not be writable
if obj.writable() and not obj.accepting:
flags |= select.POLLOUT
if flags:
pollster.register(fd, flags)
r = pollster.poll(timeout)
for fd, flags in r:
obj = map.get(fd)
if obj is None:
continue
readwrite(obj, flags)
poll3 = poll2 # Alias for backward compatibility
def loop(timeout=30.0, use_poll=False, map=None, count=None):
if map is None:
map = socket_map
if use_poll and hasattr(select, 'poll'):
poll_fun = poll2
else:
poll_fun = poll
if count is None:
while map:
poll_fun(timeout, map)
else:
while map and count > 0:
poll_fun(timeout, map)
count = count - 1
class dispatcher:
debug = False
connected = False
accepting = False
connecting = False
closing = False
addr = None
ignore_log_types = frozenset({'warning'})
def __init__(self, sock=None, map=None):
if map is None:
self._map = socket_map
else:
self._map = map
self._fileno = None
if sock:
# Set to nonblocking just to make sure for cases where we
# get a socket from a blocking source.
sock.setblocking(False)
self.set_socket(sock, map)
self.connected = True
# The constructor no longer requires that the socket
# passed be connected.
try:
self.addr = sock.getpeername()
except OSError as err:
if err.errno in (ENOTCONN, EINVAL):
# To handle the case where we got an unconnected
# socket.
self.connected = False
else:
# The socket is broken in some unknown way, alert
# the user and remove it from the map (to prevent
# polling of broken sockets).
self.del_channel(map)
raise
else:
self.socket = None
def __repr__(self):
status = [self.__class__.__module__+"."+self.__class__.__qualname__]
if self.accepting and self.addr:
status.append('listening')
elif self.connected:
status.append('connected')
if self.addr is not None:
try:
status.append('%s:%d' % self.addr)
except TypeError:
status.append(repr(self.addr))
return '<%s at %#x>' % (' '.join(status), id(self))
def add_channel(self, map=None):
#self.log_info('adding channel %s' % self)
if map is None:
map = self._map
map[self._fileno] = self
def del_channel(self, map=None):
fd = self._fileno
if map is None:
map = self._map
if fd in map:
#self.log_info('closing channel %d:%s' % (fd, self))
del map[fd]
self._fileno = None
def create_socket(self, family=socket.AF_INET, type=socket.SOCK_STREAM):
self.family_and_type = family, type
sock = socket.socket(family, type)
sock.setblocking(False)
self.set_socket(sock)
def set_socket(self, sock, map=None):
self.socket = sock
self._fileno = sock.fileno()
self.add_channel(map)
def set_reuse_addr(self):
# try to re-use a server port if possible
try:
self.socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR,
self.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR) | 1
)
except OSError:
pass
# ==================================================
# predicates for select()
# these are used as filters for the lists of sockets
# to pass to select().
# ==================================================
def readable(self):
return True
def writable(self):
return True
# ==================================================
# socket object methods.
# ==================================================
def listen(self, num):
self.accepting = True
if os.name == 'nt' and num > 5:
num = 5
return self.socket.listen(num)
def bind(self, addr):
self.addr = addr
return self.socket.bind(addr)
def connect(self, address):
self.connected = False
self.connecting = True
err = self.socket.connect_ex(address)
if err in (EINPROGRESS, EALREADY, EWOULDBLOCK) \
or err == EINVAL and os.name == 'nt':
self.addr = address
return
if err in (0, EISCONN):
self.addr = address
self.handle_connect_event()
else:
raise OSError(err, errorcode[err])
def accept(self):
# XXX can return either an address pair or None
try:
conn, addr = self.socket.accept()
except TypeError:
return None
except OSError as why:
if why.errno in (EWOULDBLOCK, ECONNABORTED, EAGAIN):
return None
else:
raise
else:
return conn, addr
def send(self, data):
try:
result = self.socket.send(data)
return result
except OSError as why:
if why.errno == EWOULDBLOCK:
return 0
elif why.errno in _DISCONNECTED:
self.handle_close()
return 0
else:
raise
def recv(self, buffer_size):
try:
data = self.socket.recv(buffer_size)
if not data:
# a closed connection is indicated by signaling
# a read condition, and having recv() return 0.
self.handle_close()
return b''
else:
return data
except OSError as why:
# winsock sometimes raises ENOTCONN
if why.errno in _DISCONNECTED:
self.handle_close()
return b''
else:
raise
def close(self):
self.connected = False
self.accepting = False
self.connecting = False
self.del_channel()
if self.socket is not None:
try:
self.socket.close()
except OSError as why:
if why.errno not in (ENOTCONN, EBADF):
raise
# log and log_info may be overridden to provide more sophisticated
# logging and warning methods. In general, log is for 'hit' logging
# and 'log_info' is for informational, warning and error logging.
def log(self, message):
sys.stderr.write('log: %s\n' % str(message))
def log_info(self, message, type='info'):
if type not in self.ignore_log_types:
print('%s: %s' % (type, message))
def handle_read_event(self):
if self.accepting:
# accepting sockets are never connected, they "spawn" new
# sockets that are connected
self.handle_accept()
elif not self.connected:
if self.connecting:
self.handle_connect_event()
self.handle_read()
else:
self.handle_read()
def handle_connect_event(self):
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
raise OSError(err, _strerror(err))
self.handle_connect()
self.connected = True
self.connecting = False
def handle_write_event(self):
if self.accepting:
# Accepting sockets shouldn't get a write event.
# We will pretend it didn't happen.
return
if not self.connected:
if self.connecting:
self.handle_connect_event()
self.handle_write()
def handle_expt_event(self):
# handle_expt_event() is called if there might be an error on the
# socket, or if there is OOB data
# check for the error condition first
err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if err != 0:
# we can get here when select.select() says that there is an
# exceptional condition on the socket
# since there is an error, we'll go ahead and close the socket
# like we would in a subclassed handle_read() that received no
# data
self.handle_close()
else:
self.handle_expt()
def handle_error(self):
nil, t, v, tbinfo = compact_traceback()
# sometimes a user repr method will crash.
try:
self_repr = repr(self)
except:
self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
self.log_info(
'uncaptured python exception, closing channel %s (%s:%s %s)' % (
self_repr,
t,
v,
tbinfo
),
'error'
)
self.handle_close()
def handle_expt(self):
self.log_info('unhandled incoming priority event', 'warning')
def handle_read(self):
self.log_info('unhandled read event', 'warning')
def handle_write(self):
self.log_info('unhandled write event', 'warning')
def handle_connect(self):
self.log_info('unhandled connect event', 'warning')
def handle_accept(self):
pair = self.accept()
if pair is not None:
self.handle_accepted(*pair)
def handle_accepted(self, sock, addr):
sock.close()
self.log_info('unhandled accepted event', 'warning')
def handle_close(self):
self.log_info('unhandled close event', 'warning')
self.close()
# ---------------------------------------------------------------------------
# adds simple buffered output capability, useful for simple clients.
# [for more sophisticated usage use asynchat.async_chat]
# ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
def __init__(self, sock=None, map=None):
dispatcher.__init__(self, sock, map)
self.out_buffer = b''
def initiate_send(self):
num_sent = 0
num_sent = dispatcher.send(self, self.out_buffer[:65536])
self.out_buffer = self.out_buffer[num_sent:]
def handle_write(self):
self.initiate_send()
def writable(self):
return (not self.connected) or len(self.out_buffer)
def send(self, data):
if self.debug:
self.log_info('sending %s' % repr(data))
self.out_buffer = self.out_buffer + data
self.initiate_send()
# ---------------------------------------------------------------------------
# used for debugging.
# ---------------------------------------------------------------------------
def compact_traceback():
exc = sys.exception()
tb = exc.__traceback__
if not tb: # Must have a traceback
raise AssertionError("traceback does not exist")
tbinfo = []
while tb:
tbinfo.append((
tb.tb_frame.f_code.co_filename,
tb.tb_frame.f_code.co_name,
str(tb.tb_lineno)
))
tb = tb.tb_next
# just to be safe
del tb
file, function, line = tbinfo[-1]
info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
return (file, function, line), type(exc), exc, info
def close_all(map=None, ignore_all=False):
if map is None:
map = socket_map
for x in list(map.values()):
try:
x.close()
except OSError as x:
if x.errno == EBADF:
pass
elif not ignore_all:
raise
except _reraised_exceptions:
raise
except:
if not ignore_all:
raise
map.clear()
# Asynchronous File I/O:
#
# After a little research (reading man pages on various unixen, and
# digging through the linux kernel), I've determined that select()
# isn't meant for doing asynchronous file i/o.
# Heartening, though - reading linux/mm/filemap.c shows that linux
# supports asynchronous read-ahead. So _MOST_ of the time, the data
# will be sitting in memory for us already when we go to read it.
#
# What other OS's (besides NT) support async file i/o? [VMS?]
#
# Regardless, this is useful for pipes, and stdin/stdout...
if os.name == 'posix':
class file_wrapper:
# Here we override just enough to make a file
# look like a socket for the purposes of asyncore.
# The passed fd is automatically os.dup()'d
def __init__(self, fd):
self.fd = os.dup(fd)
def __del__(self):
if self.fd >= 0:
warnings.warn("unclosed file %r" % self, ResourceWarning,
source=self)
self.close()
def recv(self, *args):
return os.read(self.fd, *args)
def send(self, *args):
return os.write(self.fd, *args)
def getsockopt(self, level, optname, buflen=None):
if (level == socket.SOL_SOCKET and
optname == socket.SO_ERROR and
not buflen):
return 0
raise NotImplementedError("Only asyncore specific behaviour "
"implemented.")
read = recv
write = send
def close(self):
if self.fd < 0:
return
fd = self.fd
self.fd = -1
os.close(fd)
def fileno(self):
return self.fd
class file_dispatcher(dispatcher):
def __init__(self, fd, map=None):
dispatcher.__init__(self, None, map)
self.connected = True
try:
fd = fd.fileno()
except AttributeError:
pass
self.set_file(fd)
# set it to non-blocking mode
os.set_blocking(fd, False)
def set_file(self, fd):
self.socket = file_wrapper(fd)
self._fileno = self.socket.fileno()
self.add_channel()

View File

@@ -0,0 +1,143 @@
"""bytecode_helper - support tools for testing correct bytecode generation"""
import unittest
import dis
import io
from _testinternalcapi import compiler_codegen, optimize_cfg, assemble_code_object
_UNSPECIFIED = object()
class BytecodeTestCase(unittest.TestCase):
"""Custom assertion methods for inspecting bytecode."""
def get_disassembly_as_string(self, co):
s = io.StringIO()
dis.dis(co, file=s)
return s.getvalue()
def assertInBytecode(self, x, opname, argval=_UNSPECIFIED):
"""Returns instr if opname is found, otherwise throws AssertionError"""
self.assertIn(opname, dis.opmap)
for instr in dis.get_instructions(x):
if instr.opname == opname:
if argval is _UNSPECIFIED or instr.argval == argval:
return instr
disassembly = self.get_disassembly_as_string(x)
if argval is _UNSPECIFIED:
msg = '%s not found in bytecode:\n%s' % (opname, disassembly)
else:
msg = '(%s,%r) not found in bytecode:\n%s'
msg = msg % (opname, argval, disassembly)
self.fail(msg)
def assertNotInBytecode(self, x, opname, argval=_UNSPECIFIED):
"""Throws AssertionError if opname is found"""
self.assertIn(opname, dis.opmap)
for instr in dis.get_instructions(x):
if instr.opname == opname:
disassembly = self.get_disassembly_as_string(x)
if argval is _UNSPECIFIED:
msg = '%s occurs in bytecode:\n%s' % (opname, disassembly)
self.fail(msg)
elif instr.argval == argval:
msg = '(%s,%r) occurs in bytecode:\n%s'
msg = msg % (opname, argval, disassembly)
self.fail(msg)
class CompilationStepTestCase(unittest.TestCase):
HAS_ARG = set(dis.hasarg)
HAS_TARGET = set(dis.hasjrel + dis.hasjabs + dis.hasexc)
HAS_ARG_OR_TARGET = HAS_ARG.union(HAS_TARGET)
class Label:
pass
def assertInstructionsMatch(self, actual_, expected_):
# get two lists where each entry is a label or
# an instruction tuple. Normalize the labels to the
# instruction count of the target, and compare the lists.
self.assertIsInstance(actual_, list)
self.assertIsInstance(expected_, list)
actual = self.normalize_insts(actual_)
expected = self.normalize_insts(expected_)
self.assertEqual(len(actual), len(expected))
# compare instructions
for act, exp in zip(actual, expected):
if isinstance(act, int):
self.assertEqual(exp, act)
continue
self.assertIsInstance(exp, tuple)
self.assertIsInstance(act, tuple)
# crop comparison to the provided expected values
if len(act) > len(exp):
act = act[:len(exp)]
self.assertEqual(exp, act)
def resolveAndRemoveLabels(self, insts):
idx = 0
res = []
for item in insts:
assert isinstance(item, (self.Label, tuple))
if isinstance(item, self.Label):
item.value = idx
else:
idx += 1
res.append(item)
return res
def normalize_insts(self, insts):
""" Map labels to instruction index.
Map opcodes to opnames.
"""
insts = self.resolveAndRemoveLabels(insts)
res = []
for item in insts:
assert isinstance(item, tuple)
opcode, oparg, *loc = item
opcode = dis.opmap.get(opcode, opcode)
if isinstance(oparg, self.Label):
arg = oparg.value
else:
arg = oparg if opcode in self.HAS_ARG else None
opcode = dis.opname[opcode]
res.append((opcode, arg, *loc))
return res
def complete_insts_info(self, insts):
# fill in omitted fields in location, and oparg 0 for ops with no arg.
res = []
for item in insts:
assert isinstance(item, tuple)
inst = list(item)
opcode = dis.opmap[inst[0]]
oparg = inst[1]
loc = inst[2:] + [-1] * (6 - len(inst))
res.append((opcode, oparg, *loc))
return res
class CodegenTestCase(CompilationStepTestCase):
def generate_code(self, ast):
insts, _ = compiler_codegen(ast, "my_file.py", 0)
return insts
class CfgOptimizationTestCase(CompilationStepTestCase):
def get_optimized(self, insts, consts, nlocals=0):
insts = self.normalize_insts(insts)
insts = self.complete_insts_info(insts)
insts = optimize_cfg(insts, consts, nlocals)
return insts, consts
class AssemblerTestCase(CompilationStepTestCase):
def get_code_object(self, filename, insts, metadata):
co = assemble_code_object(filename, insts, metadata)
return co

View File

@@ -0,0 +1,51 @@
import functools
import hashlib
import unittest
try:
import _hashlib
except ImportError:
_hashlib = None
def requires_hashdigest(digestname, openssl=None, usedforsecurity=True):
"""Decorator raising SkipTest if a hashing algorithm is not available
The hashing algorithm could be missing or blocked by a strict crypto
policy.
If 'openssl' is True, then the decorator checks that OpenSSL provides
the algorithm. Otherwise the check falls back to built-in
implementations. The usedforsecurity flag is passed to the constructor.
ValueError: [digital envelope routines: EVP_DigestInit_ex] disabled for FIPS
ValueError: unsupported hash type md4
"""
def decorator(func_or_class):
if isinstance(func_or_class, type):
setUpClass = func_or_class.__dict__.get('setUpClass')
if setUpClass is None:
def setUpClass(cls):
super(func_or_class, cls).setUpClass()
setUpClass.__qualname__ = func_or_class.__qualname__ + '.setUpClass'
setUpClass.__module__ = func_or_class.__module__
else:
setUpClass = setUpClass.__func__
setUpClass = classmethod(decorator(setUpClass))
func_or_class.setUpClass = setUpClass
return func_or_class
@functools.wraps(func_or_class)
def wrapper(*args, **kwargs):
try:
if openssl and _hashlib is not None:
_hashlib.new(digestname, usedforsecurity=usedforsecurity)
else:
hashlib.new(digestname, usedforsecurity=usedforsecurity)
except ValueError:
raise unittest.SkipTest(
f"hash digest '{digestname}' is not available."
)
return func_or_class(*args, **kwargs)
return wrapper
return decorator

View File

@@ -0,0 +1,38 @@
import os
try:
import hypothesis
except ImportError:
from . import _hypothesis_stubs as hypothesis
else:
# When using the real Hypothesis, we'll configure it to ignore occasional
# slow tests (avoiding flakiness from random VM slowness in CI).
hypothesis.settings.register_profile(
"slow-is-ok",
deadline=None,
suppress_health_check=[
hypothesis.HealthCheck.too_slow,
hypothesis.HealthCheck.differing_executors,
],
)
hypothesis.settings.load_profile("slow-is-ok")
# For local development, we'll write to the default on-local-disk database
# of failing examples, and also use a pull-through cache to automatically
# replay any failing examples discovered in CI. For details on how this
# works, see https://hypothesis.readthedocs.io/en/latest/database.html
if "CI" not in os.environ:
from hypothesis.database import (
GitHubArtifactDatabase,
MultiplexedDatabase,
ReadOnlyDatabase,
)
hypothesis.settings.register_profile(
"cpython-local-dev",
database=MultiplexedDatabase(
hypothesis.settings.default.database,
ReadOnlyDatabase(GitHubArtifactDatabase("python", "cpython")),
),
)
hypothesis.settings.load_profile("cpython-local-dev")

View File

@@ -0,0 +1,276 @@
import contextlib
import _imp
import importlib
import importlib.util
import os
import shutil
import sys
import unittest
import warnings
from .os_helper import unlink
@contextlib.contextmanager
def _ignore_deprecated_imports(ignore=True):
"""Context manager to suppress package and module deprecation
warnings when importing them.
If ignore is False, this context manager has no effect.
"""
if ignore:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", ".+ (module|package)",
DeprecationWarning)
yield
else:
yield
def unload(name):
try:
del sys.modules[name]
except KeyError:
pass
def forget(modname):
"""'Forget' a module was ever imported.
This removes the module from sys.modules and deletes any PEP 3147/488 or
legacy .pyc files.
"""
unload(modname)
for dirname in sys.path:
source = os.path.join(dirname, modname + '.py')
# It doesn't matter if they exist or not, unlink all possible
# combinations of PEP 3147/488 and legacy pyc files.
unlink(source + 'c')
for opt in ('', 1, 2):
unlink(importlib.util.cache_from_source(source, optimization=opt))
def make_legacy_pyc(source):
"""Move a PEP 3147/488 pyc file to its legacy pyc location.
:param source: The file system path to the source file. The source file
does not need to exist, however the PEP 3147/488 pyc file must exist.
:return: The file system path to the legacy pyc file.
"""
pyc_file = importlib.util.cache_from_source(source)
up_one = os.path.dirname(os.path.abspath(source))
legacy_pyc = os.path.join(up_one, source + 'c')
shutil.move(pyc_file, legacy_pyc)
return legacy_pyc
def import_module(name, deprecated=False, *, required_on=()):
"""Import and return the module to be tested, raising SkipTest if
it is not available.
If deprecated is True, any module or package deprecation messages
will be suppressed. If a module is required on a platform but optional for
others, set required_on to an iterable of platform prefixes which will be
compared against sys.platform.
"""
with _ignore_deprecated_imports(deprecated):
try:
return importlib.import_module(name)
except ImportError as msg:
if sys.platform.startswith(tuple(required_on)):
raise
raise unittest.SkipTest(str(msg))
def _save_and_remove_modules(names):
orig_modules = {}
prefixes = tuple(name + '.' for name in names)
for modname in list(sys.modules):
if modname in names or modname.startswith(prefixes):
orig_modules[modname] = sys.modules.pop(modname)
return orig_modules
@contextlib.contextmanager
def frozen_modules(enabled=True):
"""Force frozen modules to be used (or not).
This only applies to modules that haven't been imported yet.
Also, some essential modules will always be imported frozen.
"""
_imp._override_frozen_modules_for_tests(1 if enabled else -1)
try:
yield
finally:
_imp._override_frozen_modules_for_tests(0)
@contextlib.contextmanager
def multi_interp_extensions_check(enabled=True):
"""Force legacy modules to be allowed in subinterpreters (or not).
("legacy" == single-phase init)
This only applies to modules that haven't been imported yet.
It overrides the PyInterpreterConfig.check_multi_interp_extensions
setting (see support.run_in_subinterp_with_config() and
_xxsubinterpreters.create()).
Also see importlib.utils.allowing_all_extensions().
"""
old = _imp._override_multi_interp_extensions_check(1 if enabled else -1)
try:
yield
finally:
_imp._override_multi_interp_extensions_check(old)
def import_fresh_module(name, fresh=(), blocked=(), *,
deprecated=False,
usefrozen=False,
):
"""Import and return a module, deliberately bypassing sys.modules.
This function imports and returns a fresh copy of the named Python module
by removing the named module from sys.modules before doing the import.
Note that unlike reload, the original module is not affected by
this operation.
*fresh* is an iterable of additional module names that are also removed
from the sys.modules cache before doing the import. If one of these
modules can't be imported, None is returned.
*blocked* is an iterable of module names that are replaced with None
in the module cache during the import to ensure that attempts to import
them raise ImportError.
The named module and any modules named in the *fresh* and *blocked*
parameters are saved before starting the import and then reinserted into
sys.modules when the fresh import is complete.
Module and package deprecation messages are suppressed during this import
if *deprecated* is True.
This function will raise ImportError if the named module cannot be
imported.
If "usefrozen" is False (the default) then the frozen importer is
disabled (except for essential modules like importlib._bootstrap).
"""
# NOTE: test_heapq, test_json and test_warnings include extra sanity checks
# to make sure that this utility function is working as expected
with _ignore_deprecated_imports(deprecated):
# Keep track of modules saved for later restoration as well
# as those which just need a blocking entry removed
fresh = list(fresh)
blocked = list(blocked)
names = {name, *fresh, *blocked}
orig_modules = _save_and_remove_modules(names)
for modname in blocked:
sys.modules[modname] = None
try:
with frozen_modules(usefrozen):
# Return None when one of the "fresh" modules can not be imported.
try:
for modname in fresh:
__import__(modname)
except ImportError:
return None
return importlib.import_module(name)
finally:
_save_and_remove_modules(names)
sys.modules.update(orig_modules)
class CleanImport(object):
"""Context manager to force import to return a new module reference.
This is useful for testing module-level behaviours, such as
the emission of a DeprecationWarning on import.
Use like this:
with CleanImport("foo"):
importlib.import_module("foo") # new reference
If "usefrozen" is False (the default) then the frozen importer is
disabled (except for essential modules like importlib._bootstrap).
"""
def __init__(self, *module_names, usefrozen=False):
self.original_modules = sys.modules.copy()
for module_name in module_names:
if module_name in sys.modules:
module = sys.modules[module_name]
# It is possible that module_name is just an alias for
# another module (e.g. stub for modules renamed in 3.x).
# In that case, we also need delete the real module to clear
# the import cache.
if module.__name__ != module_name:
del sys.modules[module.__name__]
del sys.modules[module_name]
self._frozen_modules = frozen_modules(usefrozen)
def __enter__(self):
self._frozen_modules.__enter__()
return self
def __exit__(self, *ignore_exc):
sys.modules.update(self.original_modules)
self._frozen_modules.__exit__(*ignore_exc)
class DirsOnSysPath(object):
"""Context manager to temporarily add directories to sys.path.
This makes a copy of sys.path, appends any directories given
as positional arguments, then reverts sys.path to the copied
settings when the context ends.
Note that *all* sys.path modifications in the body of the
context manager, including replacement of the object,
will be reverted at the end of the block.
"""
def __init__(self, *paths):
self.original_value = sys.path[:]
self.original_object = sys.path
sys.path.extend(paths)
def __enter__(self):
return self
def __exit__(self, *ignore_exc):
sys.path = self.original_object
sys.path[:] = self.original_value
def modules_setup():
return sys.modules.copy(),
def modules_cleanup(oldmodules):
# Encoders/decoders are registered permanently within the internal
# codec cache. If we destroy the corresponding modules their
# globals will be set to None which will trip up the cached functions.
encodings = [(k, v) for k, v in sys.modules.items()
if k.startswith('encodings.')]
sys.modules.clear()
sys.modules.update(encodings)
# XXX: This kind of problem can affect more than just encodings.
# In particular extension modules (such as _ssl) don't cope
# with reloading properly. Really, test modules should be cleaning
# out the test specific modules they know they added (ala test_runpy)
# rather than relying on this function (as test_importhooks and test_pkg
# do currently). Implicitly imported *real* modules should be left alone
# (see issue 10556).
sys.modules.update(oldmodules)
def mock_register_at_fork(func):
# bpo-30599: Mock os.register_at_fork() when importing the random module,
# since this function doesn't allow to unregister callbacks and would leak
# memory.
from unittest import mock
return mock.patch('os.register_at_fork', create=True)(func)

View File

@@ -0,0 +1,198 @@
"""Subinterpreters High Level Module."""
import time
import _xxsubinterpreters as _interpreters
import _xxinterpchannels as _channels
# aliases:
from _xxsubinterpreters import is_shareable, RunFailedError
from _xxinterpchannels import (
ChannelError, ChannelNotFoundError, ChannelEmptyError,
)
__all__ = [
'Interpreter', 'get_current', 'get_main', 'create', 'list_all',
'SendChannel', 'RecvChannel',
'create_channel', 'list_all_channels', 'is_shareable',
'ChannelError', 'ChannelNotFoundError',
'ChannelEmptyError',
]
def create(*, isolated=True):
"""Return a new (idle) Python interpreter."""
id = _interpreters.create(isolated=isolated)
return Interpreter(id, isolated=isolated)
def list_all():
"""Return all existing interpreters."""
return [Interpreter(id) for id in _interpreters.list_all()]
def get_current():
"""Return the currently running interpreter."""
id = _interpreters.get_current()
return Interpreter(id)
def get_main():
"""Return the main interpreter."""
id = _interpreters.get_main()
return Interpreter(id)
class Interpreter:
"""A single Python interpreter."""
def __init__(self, id, *, isolated=None):
if not isinstance(id, (int, _interpreters.InterpreterID)):
raise TypeError(f'id must be an int, got {id!r}')
self._id = id
self._isolated = isolated
def __repr__(self):
data = dict(id=int(self._id), isolated=self._isolated)
kwargs = (f'{k}={v!r}' for k, v in data.items())
return f'{type(self).__name__}({", ".join(kwargs)})'
def __hash__(self):
return hash(self._id)
def __eq__(self, other):
if not isinstance(other, Interpreter):
return NotImplemented
else:
return other._id == self._id
@property
def id(self):
return self._id
@property
def isolated(self):
if self._isolated is None:
# XXX The low-level function has not been added yet.
# See bpo-....
self._isolated = _interpreters.is_isolated(self._id)
return self._isolated
def is_running(self):
"""Return whether or not the identified interpreter is running."""
return _interpreters.is_running(self._id)
def close(self):
"""Finalize and destroy the interpreter.
Attempting to destroy the current interpreter results
in a RuntimeError.
"""
return _interpreters.destroy(self._id)
def run(self, src_str, /, *, channels=None):
"""Run the given source code in the interpreter.
This blocks the current Python thread until done.
"""
_interpreters.run_string(self._id, src_str, channels)
def create_channel():
"""Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters.
"""
cid = _channels.create()
recv, send = RecvChannel(cid), SendChannel(cid)
return recv, send
def list_all_channels():
"""Return a list of (recv, send) for all open channels."""
return [(RecvChannel(cid), SendChannel(cid))
for cid in _channels.list_all()]
class _ChannelEnd:
"""The base class for RecvChannel and SendChannel."""
def __init__(self, id):
if not isinstance(id, (int, _channels.ChannelID)):
raise TypeError(f'id must be an int, got {id!r}')
self._id = id
def __repr__(self):
return f'{type(self).__name__}(id={int(self._id)})'
def __hash__(self):
return hash(self._id)
def __eq__(self, other):
if isinstance(self, RecvChannel):
if not isinstance(other, RecvChannel):
return NotImplemented
elif not isinstance(other, SendChannel):
return NotImplemented
return other._id == self._id
@property
def id(self):
return self._id
_NOT_SET = object()
class RecvChannel(_ChannelEnd):
"""The receiving end of a cross-interpreter channel."""
def recv(self, *, _sentinel=object(), _delay=10 / 1000): # 10 milliseconds
"""Return the next object from the channel.
This blocks until an object has been sent, if none have been
sent already.
"""
obj = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
obj = _channels.recv(self._id, _sentinel)
return obj
def recv_nowait(self, default=_NOT_SET):
"""Return the next object from the channel.
If none have been sent then return the default if one
is provided or fail with ChannelEmptyError. Otherwise this
is the same as recv().
"""
if default is _NOT_SET:
return _channels.recv(self._id)
else:
return _channels.recv(self._id, default)
class SendChannel(_ChannelEnd):
"""The sending end of a cross-interpreter channel."""
def send(self, obj):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
_channels.send(self._id, obj)
# XXX We are missing a low-level channel_send_wait().
# See bpo-32604 and gh-19829.
# Until that shows up we fake it:
time.sleep(2)
def send_nowait(self, obj):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj)

View File

@@ -0,0 +1,29 @@
import logging.handlers
class TestHandler(logging.handlers.BufferingHandler):
def __init__(self, matcher):
# BufferingHandler takes a "capacity" argument
# so as to know when to flush. As we're overriding
# shouldFlush anyway, we can set a capacity of zero.
# You can call flush() manually to clear out the
# buffer.
logging.handlers.BufferingHandler.__init__(self, 0)
self.matcher = matcher
def shouldFlush(self):
return False
def emit(self, record):
self.format(record)
self.buffer.append(record.__dict__)
def matches(self, **kwargs):
"""
Look for a saved dict whose keys/values match the supplied arguments.
"""
result = False
for d in self.buffer:
if self.matcher.matches(d, **kwargs):
result = True
break
return result

View File

@@ -0,0 +1,753 @@
import collections.abc
import contextlib
import errno
import os
import re
import stat
import string
import sys
import time
import unittest
import warnings
# Filename used for testing
TESTFN_ASCII = '@test'
# Disambiguate TESTFN for parallel testing, while letting it remain a valid
# module name.
TESTFN_ASCII = "{}_{}_tmp".format(TESTFN_ASCII, os.getpid())
# TESTFN_UNICODE is a non-ascii filename
TESTFN_UNICODE = TESTFN_ASCII + "-\xe0\xf2\u0258\u0141\u011f"
if sys.platform == 'darwin':
# In Mac OS X's VFS API file names are, by definition, canonically
# decomposed Unicode, encoded using UTF-8. See QA1173:
# http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
import unicodedata
TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
# TESTFN_UNENCODABLE is a filename (str type) that should *not* be able to be
# encoded by the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
# skip win32s (0) or Windows 9x/ME (1)
if sys.getwindowsversion().platform >= 2:
# Different kinds of characters from various languages to minimize the
# probability that the whole name is encodable to MBCS (issue #9819)
TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
try:
TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
except UnicodeEncodeError:
pass
else:
print('WARNING: The filename %r CAN be encoded by the filesystem '
'encoding (%s). Unicode filename tests may not be effective'
% (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
TESTFN_UNENCODABLE = None
# macOS and Emscripten deny unencodable filenames (invalid utf-8)
elif sys.platform not in {'darwin', 'emscripten', 'wasi'}:
try:
# ascii and utf-8 cannot encode the byte 0xff
b'\xff'.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
# 0xff will be encoded using the surrogate character u+DCFF
TESTFN_UNENCODABLE = TESTFN_ASCII \
+ b'-\xff'.decode(sys.getfilesystemencoding(), 'surrogateescape')
else:
# File system encoding (eg. ISO-8859-* encodings) can encode
# the byte 0xff. Skip some unicode filename tests.
pass
# FS_NONASCII: non-ASCII character encodable by os.fsencode(),
# or an empty string if there is no such character.
FS_NONASCII = ''
for character in (
# First try printable and common characters to have a readable filename.
# For each character, the encoding list are just example of encodings able
# to encode the character (the list is not exhaustive).
# U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
'\u00E6',
# U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
'\u0130',
# U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
'\u0141',
# U+03C6 (Greek Small Letter Phi): cp1253
'\u03C6',
# U+041A (Cyrillic Capital Letter Ka): cp1251
'\u041A',
# U+05D0 (Hebrew Letter Alef): Encodable to cp424
'\u05D0',
# U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
'\u060C',
# U+062A (Arabic Letter Teh): cp720
'\u062A',
# U+0E01 (Thai Character Ko Kai): cp874
'\u0E01',
# Then try more "special" characters. "special" because they may be
# interpreted or displayed differently depending on the exact locale
# encoding and the font.
# U+00A0 (No-Break Space)
'\u00A0',
# U+20AC (Euro Sign)
'\u20AC',
):
try:
# If Python is set up to use the legacy 'mbcs' in Windows,
# 'replace' error mode is used, and encode() returns b'?'
# for characters missing in the ANSI codepage
if os.fsdecode(os.fsencode(character)) != character:
raise UnicodeError
except UnicodeError:
pass
else:
FS_NONASCII = character
break
# Save the initial cwd
SAVEDCWD = os.getcwd()
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
for name in (
# b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
# accepts it to create a file or a directory, or don't accept to enter to
# such directory (when the bytes name is used). So test b'\xe7' first:
# it is not decodable from cp932.
b'\xe7w\xf0',
# undecodable from ASCII, UTF-8
b'\xff',
# undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
# and cp857
b'\xae\xd5'
# undecodable from UTF-8 (UNIX and Mac OS X)
b'\xed\xb2\x80', b'\xed\xb4\x80',
# undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
# cp1253, cp1254, cp1255, cp1257, cp1258
b'\x81\x98',
):
try:
name.decode(sys.getfilesystemencoding())
except UnicodeDecodeError:
try:
name.decode(sys.getfilesystemencoding(),
sys.getfilesystemencodeerrors())
except UnicodeDecodeError:
continue
TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
break
if FS_NONASCII:
TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII
else:
TESTFN_NONASCII = None
TESTFN = TESTFN_NONASCII or TESTFN_ASCII
def make_bad_fd():
"""
Create an invalid file descriptor by opening and closing a file and return
its fd.
"""
file = open(TESTFN, "wb")
try:
return file.fileno()
finally:
file.close()
unlink(TESTFN)
_can_symlink = None
def can_symlink():
global _can_symlink
if _can_symlink is not None:
return _can_symlink
# WASI / wasmtime prevents symlinks with absolute paths, see man
# openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
# paths. Skip symlink tests on WASI for now.
src = os.path.abspath(TESTFN)
symlink_path = src + "can_symlink"
try:
os.symlink(src, symlink_path)
can = True
except (OSError, NotImplementedError, AttributeError):
can = False
else:
os.remove(symlink_path)
_can_symlink = can
return can
def skip_unless_symlink(test):
"""Skip decorator for tests that require functional symlink"""
ok = can_symlink()
msg = "Requires functional symlink implementation"
return test if ok else unittest.skip(msg)(test)
_can_xattr = None
def can_xattr():
import tempfile
global _can_xattr
if _can_xattr is not None:
return _can_xattr
if not hasattr(os, "setxattr"):
can = False
else:
import platform
tmp_dir = tempfile.mkdtemp()
tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
try:
with open(TESTFN, "wb") as fp:
try:
# TESTFN & tempfile may use different file systems with
# different capabilities
os.setxattr(tmp_fp, b"user.test", b"")
os.setxattr(tmp_name, b"trusted.foo", b"42")
os.setxattr(fp.fileno(), b"user.test", b"")
# Kernels < 2.6.39 don't respect setxattr flags.
kernel_version = platform.release()
m = re.match(r"2.6.(\d{1,2})", kernel_version)
can = m is None or int(m.group(1)) >= 39
except OSError:
can = False
finally:
unlink(TESTFN)
unlink(tmp_name)
rmdir(tmp_dir)
_can_xattr = can
return can
def skip_unless_xattr(test):
"""Skip decorator for tests that require functional extended attributes"""
ok = can_xattr()
msg = "no non-broken extended attribute support"
return test if ok else unittest.skip(msg)(test)
_can_chmod = None
def can_chmod():
global _can_chmod
if _can_chmod is not None:
return _can_chmod
if not hasattr(os, "chown"):
_can_chmod = False
return _can_chmod
try:
with open(TESTFN, "wb") as f:
try:
os.chmod(TESTFN, 0o777)
mode1 = os.stat(TESTFN).st_mode
os.chmod(TESTFN, 0o666)
mode2 = os.stat(TESTFN).st_mode
except OSError as e:
can = False
else:
can = stat.S_IMODE(mode1) != stat.S_IMODE(mode2)
finally:
unlink(TESTFN)
_can_chmod = can
return can
def skip_unless_working_chmod(test):
"""Skip tests that require working os.chmod()
WASI SDK 15.0 cannot change file mode bits.
"""
ok = can_chmod()
msg = "requires working os.chmod()"
return test if ok else unittest.skip(msg)(test)
# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
_can_dac_override = None
def can_dac_override():
global _can_dac_override
if not can_chmod():
_can_dac_override = False
if _can_dac_override is not None:
return _can_dac_override
try:
with open(TESTFN, "wb") as f:
os.chmod(TESTFN, 0o400)
try:
with open(TESTFN, "wb"):
pass
except OSError:
_can_dac_override = False
else:
_can_dac_override = True
finally:
unlink(TESTFN)
return _can_dac_override
def skip_if_dac_override(test):
ok = not can_dac_override()
msg = "incompatible with CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)
def skip_unless_dac_override(test):
ok = can_dac_override()
msg = "requires CAP_DAC_OVERRIDE"
return test if ok else unittest.skip(msg)(test)
def unlink(filename):
try:
_unlink(filename)
except (FileNotFoundError, NotADirectoryError):
pass
if sys.platform.startswith("win"):
def _waitfor(func, pathname, waitall=False):
# Perform the operation
func(pathname)
# Now setup the wait loop
if waitall:
dirname = pathname
else:
dirname, name = os.path.split(pathname)
dirname = dirname or '.'
# Check for `pathname` to be removed from the filesystem.
# The exponential backoff of the timeout amounts to a total
# of ~1 second after which the deletion is probably an error
# anyway.
# Testing on an i7@4.3GHz shows that usually only 1 iteration is
# required when contention occurs.
timeout = 0.001
while timeout < 1.0:
# Note we are only testing for the existence of the file(s) in
# the contents of the directory regardless of any security or
# access rights. If we have made it this far, we have sufficient
# permissions to do that much using Python's equivalent of the
# Windows API FindFirstFile.
# Other Windows APIs can fail or give incorrect results when
# dealing with files that are pending deletion.
L = os.listdir(dirname)
if not (L if waitall else name in L):
return
# Increase the timeout and try again
time.sleep(timeout)
timeout *= 2
warnings.warn('tests may fail, delete still pending for ' + pathname,
RuntimeWarning, stacklevel=4)
def _unlink(filename):
_waitfor(os.unlink, filename)
def _rmdir(dirname):
_waitfor(os.rmdir, dirname)
def _rmtree(path):
from test.support import _force_run
def _rmtree_inner(path):
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError as exc:
print("support.rmtree(): os.lstat(%r) failed with %s"
% (fullname, exc),
file=sys.__stderr__)
mode = 0
if stat.S_ISDIR(mode):
_waitfor(_rmtree_inner, fullname, waitall=True)
_force_run(fullname, os.rmdir, fullname)
else:
_force_run(fullname, os.unlink, fullname)
_waitfor(_rmtree_inner, path, waitall=True)
_waitfor(lambda p: _force_run(p, os.rmdir, p), path)
def _longpath(path):
try:
import ctypes
except ImportError:
# No ctypes means we can't expands paths.
pass
else:
buffer = ctypes.create_unicode_buffer(len(path) * 2)
length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
len(buffer))
if length:
return buffer[:length]
return path
else:
_unlink = os.unlink
_rmdir = os.rmdir
def _rmtree(path):
import shutil
try:
shutil.rmtree(path)
return
except OSError:
pass
def _rmtree_inner(path):
from test.support import _force_run
for name in _force_run(path, os.listdir, path):
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except OSError:
mode = 0
if stat.S_ISDIR(mode):
_rmtree_inner(fullname)
_force_run(path, os.rmdir, fullname)
else:
_force_run(path, os.unlink, fullname)
_rmtree_inner(path)
os.rmdir(path)
def _longpath(path):
return path
def rmdir(dirname):
try:
_rmdir(dirname)
except FileNotFoundError:
pass
def rmtree(path):
try:
_rmtree(path)
except FileNotFoundError:
pass
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
"""Return a context manager that creates a temporary directory.
Arguments:
path: the directory to create temporarily. If omitted or None,
defaults to creating a temporary directory using tempfile.mkdtemp.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, if the path is specified and cannot be
created, only a warning is issued.
"""
import tempfile
dir_created = False
if path is None:
path = tempfile.mkdtemp()
dir_created = True
path = os.path.realpath(path)
else:
try:
os.mkdir(path)
dir_created = True
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to create '
f'temporary directory {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
if dir_created:
pid = os.getpid()
try:
yield path
finally:
# In case the process forks, let only the parent remove the
# directory. The child has a different process id. (bpo-30028)
if dir_created and pid == os.getpid():
rmtree(path)
@contextlib.contextmanager
def change_cwd(path, quiet=False):
"""Return a context manager that changes the current working directory.
Arguments:
path: the directory to use as the temporary current working directory.
quiet: if False (the default), the context manager raises an exception
on error. Otherwise, it issues only a warning and keeps the current
working directory the same.
"""
saved_dir = os.getcwd()
try:
os.chdir(os.path.realpath(path))
except OSError as exc:
if not quiet:
raise
warnings.warn(f'tests may fail, unable to change the current working '
f'directory to {path!r}: {exc}',
RuntimeWarning, stacklevel=3)
try:
yield os.getcwd()
finally:
os.chdir(saved_dir)
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
"""
Context manager that temporarily creates and changes the CWD.
The function temporarily changes the current working directory
after creating a temporary directory in the current directory with
name *name*. If *name* is None, the temporary directory is
created using tempfile.mkdtemp.
If *quiet* is False (default) and it is not possible to
create or change the CWD, an error is raised. If *quiet* is True,
only a warning is raised and the original CWD is used.
"""
with temp_dir(path=name, quiet=quiet) as temp_path:
with change_cwd(temp_path, quiet=quiet) as cwd_dir:
yield cwd_dir
def create_empty_file(filename):
"""Create an empty file. If the file already exists, truncate it."""
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
os.close(fd)
@contextlib.contextmanager
def open_dir_fd(path):
"""Open a file descriptor to a directory."""
assert os.path.isdir(path)
flags = os.O_RDONLY
if hasattr(os, "O_DIRECTORY"):
flags |= os.O_DIRECTORY
dir_fd = os.open(path, flags)
try:
yield dir_fd
finally:
os.close(dir_fd)
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory
is case-insensitive."""
import tempfile
with tempfile.NamedTemporaryFile(dir=directory) as base:
base_path = base.name
case_path = base_path.upper()
if case_path == base_path:
case_path = base_path.lower()
try:
return os.path.samefile(base_path, case_path)
except FileNotFoundError:
return False
class FakePath:
"""Simple implementation of the path protocol.
"""
def __init__(self, path):
self.path = path
def __repr__(self):
return f'<FakePath {self.path!r}>'
def __fspath__(self):
if (isinstance(self.path, BaseException) or
isinstance(self.path, type) and
issubclass(self.path, BaseException)):
raise self.path
else:
return self.path
def fd_count():
"""Count the number of open file descriptors.
"""
if sys.platform.startswith(('linux', 'freebsd', 'emscripten')):
try:
names = os.listdir("/proc/self/fd")
# Subtract one because listdir() internally opens a file
# descriptor to list the content of the /proc/self/fd/ directory.
return len(names) - 1
except FileNotFoundError:
pass
MAXFD = 256
if hasattr(os, 'sysconf'):
try:
MAXFD = os.sysconf("SC_OPEN_MAX")
except OSError:
pass
old_modes = None
if sys.platform == 'win32':
# bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
# on invalid file descriptor if Python is compiled in debug mode
try:
import msvcrt
msvcrt.CrtSetReportMode
except (AttributeError, ImportError):
# no msvcrt or a release build
pass
else:
old_modes = {}
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
old_modes[report_type] = msvcrt.CrtSetReportMode(report_type,
0)
try:
count = 0
for fd in range(MAXFD):
try:
# Prefer dup() over fstat(). fstat() can require input/output
# whereas dup() doesn't.
fd2 = os.dup(fd)
except OSError as e:
if e.errno != errno.EBADF:
raise
else:
os.close(fd2)
count += 1
finally:
if old_modes is not None:
for report_type in (msvcrt.CRT_WARN,
msvcrt.CRT_ERROR,
msvcrt.CRT_ASSERT):
msvcrt.CrtSetReportMode(report_type, old_modes[report_type])
return count
if hasattr(os, "umask"):
@contextlib.contextmanager
def temp_umask(umask):
"""Context manager that temporarily sets the process umask."""
oldmask = os.umask(umask)
try:
yield
finally:
os.umask(oldmask)
else:
@contextlib.contextmanager
def temp_umask(umask):
"""no-op on platforms without umask()"""
yield
class EnvironmentVarGuard(collections.abc.MutableMapping):
"""Class to help protect the environment variable properly. Can be used as
a context manager."""
def __init__(self):
self._environ = os.environ
self._changed = {}
def __getitem__(self, envvar):
return self._environ[envvar]
def __setitem__(self, envvar, value):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
self._environ[envvar] = value
def __delitem__(self, envvar):
# Remember the initial value on the first access
if envvar not in self._changed:
self._changed[envvar] = self._environ.get(envvar)
if envvar in self._environ:
del self._environ[envvar]
def keys(self):
return self._environ.keys()
def __iter__(self):
return iter(self._environ)
def __len__(self):
return len(self._environ)
def set(self, envvar, value):
self[envvar] = value
def unset(self, envvar):
del self[envvar]
def copy(self):
# We do what os.environ.copy() does.
return dict(self)
def __enter__(self):
return self
def __exit__(self, *ignore_exc):
for (k, v) in self._changed.items():
if v is None:
if k in self._environ:
del self._environ[k]
else:
self._environ[k] = v
os.environ = self._environ
try:
import ctypes
kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)
ERROR_FILE_NOT_FOUND = 2
DDD_REMOVE_DEFINITION = 2
DDD_EXACT_MATCH_ON_REMOVE = 4
DDD_NO_BROADCAST_SYSTEM = 8
except (ImportError, AttributeError):
def subst_drive(path):
raise unittest.SkipTest('ctypes or kernel32 is not available')
else:
@contextlib.contextmanager
def subst_drive(path):
"""Temporarily yield a substitute drive for a given path."""
for c in reversed(string.ascii_uppercase):
drive = f'{c}:'
if (not kernel32.QueryDosDeviceW(drive, None, 0) and
ctypes.get_last_error() == ERROR_FILE_NOT_FOUND):
break
else:
raise unittest.SkipTest('no available logical drive')
if not kernel32.DefineDosDeviceW(
DDD_NO_BROADCAST_SYSTEM, drive, path):
raise ctypes.WinError(ctypes.get_last_error())
try:
yield drive
finally:
if not kernel32.DefineDosDeviceW(
DDD_REMOVE_DEFINITION | DDD_EXACT_MATCH_ON_REMOVE,
drive, path):
raise ctypes.WinError(ctypes.get_last_error())

View File

@@ -0,0 +1,302 @@
# Common utility functions used by various script execution tests
# e.g. test_cmd_line, test_cmd_line_script and test_runpy
import collections
import importlib
import sys
import os
import os.path
import subprocess
import py_compile
import zipfile
from importlib.util import source_from_cache
from test import support
from test.support.import_helper import make_legacy_pyc
# Cached result of the expensive test performed in the function below.
__cached_interp_requires_environment = None
def interpreter_requires_environment():
"""
Returns True if our sys.executable interpreter requires environment
variables in order to be able to run at all.
This is designed to be used with @unittest.skipIf() to annotate tests
that need to use an assert_python*() function to launch an isolated
mode (-I) or no environment mode (-E) sub-interpreter process.
A normal build & test does not run into this situation but it can happen
when trying to run the standard library test suite from an interpreter that
doesn't have an obvious home with Python's current home finding logic.
Setting PYTHONHOME is one way to get most of the testsuite to run in that
situation. PYTHONPATH or PYTHONUSERSITE are other common environment
variables that might impact whether or not the interpreter can start.
"""
global __cached_interp_requires_environment
if __cached_interp_requires_environment is None:
# If PYTHONHOME is set, assume that we need it
if 'PYTHONHOME' in os.environ:
__cached_interp_requires_environment = True
return True
# cannot run subprocess, assume we don't need it
if not support.has_subprocess_support:
__cached_interp_requires_environment = False
return False
# Try running an interpreter with -E to see if it works or not.
try:
subprocess.check_call([sys.executable, '-E',
'-c', 'import sys; sys.exit(0)'])
except subprocess.CalledProcessError:
__cached_interp_requires_environment = True
else:
__cached_interp_requires_environment = False
return __cached_interp_requires_environment
class _PythonRunResult(collections.namedtuple("_PythonRunResult",
("rc", "out", "err"))):
"""Helper for reporting Python subprocess run results"""
def fail(self, cmd_line):
"""Provide helpful details about failed subcommand runs"""
# Limit to 80 lines to ASCII characters
maxlen = 80 * 100
out, err = self.out, self.err
if len(out) > maxlen:
out = b'(... truncated stdout ...)' + out[-maxlen:]
if len(err) > maxlen:
err = b'(... truncated stderr ...)' + err[-maxlen:]
out = out.decode('ascii', 'replace').rstrip()
err = err.decode('ascii', 'replace').rstrip()
raise AssertionError("Process return code is %d\n"
"command line: %r\n"
"\n"
"stdout:\n"
"---\n"
"%s\n"
"---\n"
"\n"
"stderr:\n"
"---\n"
"%s\n"
"---"
% (self.rc, cmd_line,
out,
err))
# Executing the interpreter in a subprocess
@support.requires_subprocess()
def run_python_until_end(*args, **env_vars):
env_required = interpreter_requires_environment()
cwd = env_vars.pop('__cwd', None)
if '__isolated' in env_vars:
isolated = env_vars.pop('__isolated')
else:
isolated = not env_vars and not env_required
cmd_line = [sys.executable, '-X', 'faulthandler']
if isolated:
# isolated mode: ignore Python environment variables, ignore user
# site-packages, and don't add the current directory to sys.path
cmd_line.append('-I')
elif not env_vars and not env_required:
# ignore Python environment variables
cmd_line.append('-E')
# But a special flag that can be set to override -- in this case, the
# caller is responsible to pass the full environment.
if env_vars.pop('__cleanenv', None):
env = {}
if sys.platform == 'win32':
# Windows requires at least the SYSTEMROOT environment variable to
# start Python.
env['SYSTEMROOT'] = os.environ['SYSTEMROOT']
# Other interesting environment variables, not copied currently:
# COMSPEC, HOME, PATH, TEMP, TMPDIR, TMP.
else:
# Need to preserve the original environment, for in-place testing of
# shared library builds.
env = os.environ.copy()
# set TERM='' unless the TERM environment variable is passed explicitly
# see issues #11390 and #18300
if 'TERM' not in env_vars:
env['TERM'] = ''
env.update(env_vars)
cmd_line.extend(args)
proc = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env, cwd=cwd)
with proc:
try:
out, err = proc.communicate()
finally:
proc.kill()
subprocess._cleanup()
rc = proc.returncode
return _PythonRunResult(rc, out, err), cmd_line
@support.requires_subprocess()
def _assert_python(expected_success, /, *args, **env_vars):
res, cmd_line = run_python_until_end(*args, **env_vars)
if (res.rc and expected_success) or (not res.rc and not expected_success):
res.fail(cmd_line)
return res
def assert_python_ok(*args, **env_vars):
"""
Assert that running the interpreter with `args` and optional environment
variables `env_vars` succeeds (rc == 0) and return a (return code, stdout,
stderr) tuple.
If the __cleanenv keyword is set, env_vars is used as a fresh environment.
Python is started in isolated mode (command line option -I),
except if the __isolated keyword is set to False.
"""
return _assert_python(True, *args, **env_vars)
def assert_python_failure(*args, **env_vars):
"""
Assert that running the interpreter with `args` and optional environment
variables `env_vars` fails (rc != 0) and return a (return code, stdout,
stderr) tuple.
See assert_python_ok() for more options.
"""
return _assert_python(False, *args, **env_vars)
@support.requires_subprocess()
def spawn_python(*args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, **kw):
"""Run a Python subprocess with the given arguments.
kw is extra keyword args to pass to subprocess.Popen. Returns a Popen
object.
"""
cmd_line = [sys.executable]
if not interpreter_requires_environment():
cmd_line.append('-E')
cmd_line.extend(args)
# Under Fedora (?), GNU readline can output junk on stderr when initialized,
# depending on the TERM setting. Setting TERM=vt100 is supposed to disable
# that. References:
# - http://reinout.vanrees.org/weblog/2009/08/14/readline-invisible-character-hack.html
# - http://stackoverflow.com/questions/15760712/python-readline-module-prints-escape-character-during-import
# - http://lists.gnu.org/archive/html/bug-readline/2007-08/msg00004.html
env = kw.setdefault('env', dict(os.environ))
env['TERM'] = 'vt100'
return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=stdout, stderr=stderr,
**kw)
def kill_python(p):
"""Run the given Popen process until completion and return stdout."""
p.stdin.close()
data = p.stdout.read()
p.stdout.close()
# try to cleanup the child so we don't appear to leak when running
# with regrtest -R.
p.wait()
subprocess._cleanup()
return data
def make_script(script_dir, script_basename, source, omit_suffix=False):
script_filename = script_basename
if not omit_suffix:
script_filename += os.extsep + 'py'
script_name = os.path.join(script_dir, script_filename)
# The script should be encoded to UTF-8, the default string encoding
with open(script_name, 'w', encoding='utf-8') as script_file:
script_file.write(source)
importlib.invalidate_caches()
return script_name
def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
with zipfile.ZipFile(zip_name, 'w') as zip_file:
if name_in_zip is None:
parts = script_name.split(os.sep)
if len(parts) >= 2 and parts[-2] == '__pycache__':
legacy_pyc = make_legacy_pyc(source_from_cache(script_name))
name_in_zip = os.path.basename(legacy_pyc)
script_name = legacy_pyc
else:
name_in_zip = os.path.basename(script_name)
zip_file.write(script_name, name_in_zip)
#if test.support.verbose:
# with zipfile.ZipFile(zip_name, 'r') as zip_file:
# print 'Contents of %r:' % zip_name
# zip_file.printdir()
return zip_name, os.path.join(zip_name, name_in_zip)
def make_pkg(pkg_dir, init_source=''):
os.mkdir(pkg_dir)
make_script(pkg_dir, '__init__', init_source)
def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
source, depth=1, compiled=False):
unlink = []
init_name = make_script(zip_dir, '__init__', '')
unlink.append(init_name)
init_basename = os.path.basename(init_name)
script_name = make_script(zip_dir, script_basename, source)
unlink.append(script_name)
if compiled:
init_name = py_compile.compile(init_name, doraise=True)
script_name = py_compile.compile(script_name, doraise=True)
unlink.extend((init_name, script_name))
pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
zip_filename = zip_basename+os.extsep+'zip'
zip_name = os.path.join(zip_dir, zip_filename)
with zipfile.ZipFile(zip_name, 'w') as zip_file:
for name in pkg_names:
init_name_in_zip = os.path.join(name, init_basename)
zip_file.write(init_name, init_name_in_zip)
zip_file.write(script_name, script_name_in_zip)
for name in unlink:
os.unlink(name)
#if test.support.verbose:
# with zipfile.ZipFile(zip_name, 'r') as zip_file:
# print 'Contents of %r:' % zip_name
# zip_file.printdir()
return zip_name, os.path.join(zip_name, script_name_in_zip)
@support.requires_subprocess()
def run_test_script(script):
# use -u to try to get the full output if the test hangs or crash
if support.verbose:
def title(text):
return f"===== {text} ======"
name = f"script {os.path.basename(script)}"
print()
print(title(name), flush=True)
# In verbose mode, the child process inherit stdout and stdout,
# to see output in realtime and reduce the risk of losing output.
args = [sys.executable, "-E", "-X", "faulthandler", "-u", script, "-v"]
proc = subprocess.run(args)
print(title(f"{name} completed: exit code {proc.returncode}"),
flush=True)
if proc.returncode:
raise AssertionError(f"{name} failed")
else:
assert_python_ok("-u", script, "-v")

View File

@@ -0,0 +1,345 @@
import contextlib
import errno
import os.path
import socket
import sys
import subprocess
import tempfile
import unittest
from .. import support
from . import warnings_helper
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
# WASI SDK 15.0 does not provide gethostname, stub raises OSError ENOTSUP.
has_gethostname = not support.is_wasi
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
"""Returns an unused port that should be suitable for binding. This is
achieved by creating a temporary socket with the same family and type as
the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
the specified host address (defaults to 0.0.0.0) with the port set to 0,
eliciting an unused ephemeral port from the OS. The temporary socket is
then closed and deleted, and the ephemeral port is returned.
Either this method or bind_port() should be used for any tests where a
server socket needs to be bound to a particular port for the duration of
the test. Which one to use depends on whether the calling code is creating
a python socket, or if an unused port needs to be provided in a constructor
or passed to an external program (i.e. the -accept argument to openssl's
s_server mode). Always prefer bind_port() over find_unused_port() where
possible. Hard coded ports should *NEVER* be used. As soon as a server
socket is bound to a hard coded port, the ability to run multiple instances
of the test simultaneously on the same host is compromised, which makes the
test a ticking time bomb in a buildbot environment. On Unix buildbots, this
may simply manifest as a failed test, which can be recovered from without
intervention in most cases, but on Windows, the entire python process can
completely and utterly wedge, requiring someone to log in to the buildbot
and manually kill the affected process.
(This is easy to reproduce on Windows, unfortunately, and can be traced to
the SO_REUSEADDR socket option having different semantics on Windows versus
Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
listen and then accept connections on identical host/ports. An EADDRINUSE
OSError will be raised at some point (depending on the platform and
the order bind and listen were called on each socket).
However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
will ever be raised when attempting to bind two identical host/ports. When
accept() is called on each socket, the second caller's process will steal
the port from the first caller, leaving them both in an awkwardly wedged
state where they'll no longer respond to any signals or graceful kills, and
must be forcibly killed via OpenProcess()/TerminateProcess().
The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
instead of SO_REUSEADDR, which effectively affords the same semantics as
SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
Source world compared to Windows ones, this is a common mistake. A quick
look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
openssl.exe is called with the 's_server' option, for example. See
http://bugs.python.org/issue2550 for more info. The following site also
has a very thorough description about the implications of both REUSEADDR
and EXCLUSIVEADDRUSE on Windows:
https://learn.microsoft.com/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse
XXX: although this approach is a vast improvement on previous attempts to
elicit unused ports, it rests heavily on the assumption that the ephemeral
port returned to us by the OS won't immediately be dished back out to some
other process when we close and delete our temporary socket but before our
calling code has a chance to bind the returned port. We can deal with this
issue if/when we come across it.
"""
with socket.socket(family, socktype) as tempsock:
port = bind_port(tempsock)
del tempsock
return port
def bind_port(sock, host=HOST):
"""Bind the socket to a free port and return the port number. Relies on
ephemeral ports in order to ensure we are using an unbound port. This is
important as many tests may be running simultaneously, especially in a
buildbot environment. This method raises an exception if the sock.family
is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
or SO_REUSEPORT set on it. Tests should *never* set these socket options
for TCP/IP sockets. The only case for setting these options is testing
multicasting via multiple UDP sockets.
Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
on Windows), it will be set on the socket. This will prevent anyone else
from bind()'ing to our host/port for the duration of the test.
"""
if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
if hasattr(socket, 'SO_REUSEADDR'):
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
raise support.TestFailed("tests should never set the "
"SO_REUSEADDR socket option on "
"TCP/IP sockets!")
if hasattr(socket, 'SO_REUSEPORT'):
try:
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
raise support.TestFailed("tests should never set the "
"SO_REUSEPORT socket option on "
"TCP/IP sockets!")
except OSError:
# Python's socket module was compiled using modern headers
# thus defining SO_REUSEPORT but this process is running
# under an older kernel that does not support SO_REUSEPORT.
pass
if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
sock.bind((host, 0))
port = sock.getsockname()[1]
return port
def bind_unix_socket(sock, addr):
"""Bind a unix socket, raising SkipTest if PermissionError is raised."""
assert sock.family == socket.AF_UNIX
try:
sock.bind(addr)
except PermissionError:
sock.close()
raise unittest.SkipTest('cannot bind AF_UNIX sockets')
def _is_ipv6_enabled():
"""Check whether IPv6 is enabled on this host."""
if socket.has_ipv6:
sock = None
try:
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.bind((HOSTv6, 0))
return True
except OSError:
pass
finally:
if sock:
sock.close()
return False
IPV6_ENABLED = _is_ipv6_enabled()
_bind_nix_socket_error = None
def skip_unless_bind_unix_socket(test):
"""Decorator for tests requiring a functional bind() for unix sockets."""
if not hasattr(socket, 'AF_UNIX'):
return unittest.skip('No UNIX Sockets')(test)
global _bind_nix_socket_error
if _bind_nix_socket_error is None:
from .os_helper import TESTFN, unlink
path = TESTFN + "can_bind_unix_socket"
with socket.socket(socket.AF_UNIX) as sock:
try:
sock.bind(path)
_bind_nix_socket_error = False
except OSError as e:
_bind_nix_socket_error = e
finally:
unlink(path)
if _bind_nix_socket_error:
msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
return unittest.skip(msg)(test)
else:
return test
def get_socket_conn_refused_errs():
"""
Get the different socket error numbers ('errno') which can be received
when a connection is refused.
"""
errors = [errno.ECONNREFUSED]
if hasattr(errno, 'ENETUNREACH'):
# On Solaris, ENETUNREACH is returned sometimes instead of ECONNREFUSED
errors.append(errno.ENETUNREACH)
if hasattr(errno, 'EADDRNOTAVAIL'):
# bpo-31910: socket.create_connection() fails randomly
# with EADDRNOTAVAIL on Travis CI
errors.append(errno.EADDRNOTAVAIL)
if hasattr(errno, 'EHOSTUNREACH'):
# bpo-37583: The destination host cannot be reached
errors.append(errno.EHOSTUNREACH)
if not IPV6_ENABLED:
errors.append(errno.EAFNOSUPPORT)
return errors
_NOT_SET = object()
@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
"""Return a context manager that raises ResourceDenied when various issues
with the internet connection manifest themselves as exceptions."""
nntplib = warnings_helper.import_deprecated("nntplib")
import urllib.error
if timeout is _NOT_SET:
timeout = support.INTERNET_TIMEOUT
default_errnos = [
('ECONNREFUSED', 111),
('ECONNRESET', 104),
('EHOSTUNREACH', 113),
('ENETUNREACH', 101),
('ETIMEDOUT', 110),
# socket.create_connection() fails randomly with
# EADDRNOTAVAIL on Travis CI.
('EADDRNOTAVAIL', 99),
]
default_gai_errnos = [
('EAI_AGAIN', -3),
('EAI_FAIL', -4),
('EAI_NONAME', -2),
('EAI_NODATA', -5),
# Encountered when trying to resolve IPv6-only hostnames
('WSANO_DATA', 11004),
]
denied = support.ResourceDenied("Resource %r is not available" % resource_name)
captured_errnos = errnos
gai_errnos = []
if not captured_errnos:
captured_errnos = [getattr(errno, name, num)
for (name, num) in default_errnos]
gai_errnos = [getattr(socket, name, num)
for (name, num) in default_gai_errnos]
def filter_error(err):
n = getattr(err, 'errno', None)
if (isinstance(err, TimeoutError) or
(isinstance(err, socket.gaierror) and n in gai_errnos) or
(isinstance(err, urllib.error.HTTPError) and
500 <= err.code <= 599) or
(isinstance(err, urllib.error.URLError) and
(("ConnectionRefusedError" in err.reason) or
("TimeoutError" in err.reason) or
("EOFError" in err.reason))) or
n in captured_errnos):
if not support.verbose:
sys.stderr.write(denied.args[0] + "\n")
raise denied from err
old_timeout = socket.getdefaulttimeout()
try:
if timeout is not None:
socket.setdefaulttimeout(timeout)
yield
except nntplib.NNTPTemporaryError as err:
if support.verbose:
sys.stderr.write(denied.args[0] + "\n")
raise denied from err
except OSError as err:
# urllib can wrap original socket errors multiple times (!), we must
# unwrap to get at the original error.
while True:
a = err.args
if len(a) >= 1 and isinstance(a[0], OSError):
err = a[0]
# The error can also be wrapped as args[1]:
# except socket.error as msg:
# raise OSError('socket error', msg) from msg
elif len(a) >= 2 and isinstance(a[1], OSError):
err = a[1]
else:
break
filter_error(err)
raise
# XXX should we catch generic exceptions and look for their
# __cause__ or __context__?
finally:
socket.setdefaulttimeout(old_timeout)
def create_unix_domain_name():
"""
Create a UNIX domain name: socket.bind() argument of a AF_UNIX socket.
Return a path relative to the current directory to get a short path
(around 27 ASCII characters).
"""
return tempfile.mktemp(prefix="test_python_", suffix='.sock',
dir=os.path.curdir)
# consider that sysctl values should not change while tests are running
_sysctl_cache = {}
def _get_sysctl(name):
"""Get a sysctl value as an integer."""
try:
return _sysctl_cache[name]
except KeyError:
pass
# At least Linux and FreeBSD support the "-n" option
cmd = ['sysctl', '-n', name]
proc = subprocess.run(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True)
if proc.returncode:
support.print_warning(f'{' '.join(cmd)!r} command failed with '
f'exit code {proc.returncode}')
# cache the error to only log the warning once
_sysctl_cache[name] = None
return None
output = proc.stdout
# Parse '0\n' to get '0'
try:
value = int(output.strip())
except Exception as exc:
support.print_warning(f'Failed to parse {' '.join(cmd)!r} '
f'command output {output!r}: {exc!r}')
# cache the error to only log the warning once
_sysctl_cache[name] = None
return None
_sysctl_cache[name] = value
return value
def tcp_blackhole():
if not sys.platform.startswith('freebsd'):
return False
# gh-109015: test if FreeBSD TCP blackhole is enabled
value = _get_sysctl('net.inet.tcp.blackhole')
if value is None:
# don't skip if we fail to get the sysctl value
return False
return (value != 0)
def skip_if_tcp_blackhole(test):
"""Decorator skipping test if TCP blackhole is enabled."""
skip_if = unittest.skipIf(
tcp_blackhole(),
"TCP blackhole is enabled (sysctl net.inet.tcp.blackhole)"
)
return skip_if(test)

View File

@@ -0,0 +1,25 @@
class ExceptionIsLikeMixin:
def assertExceptionIsLike(self, exc, template):
"""
Passes when the provided `exc` matches the structure of `template`.
Individual exceptions don't have to be the same objects or even pass
an equality test: they only need to be the same type and contain equal
`exc_obj.args`.
"""
if exc is None and template is None:
return
if template is None:
self.fail(f"unexpected exception: {exc}")
if exc is None:
self.fail(f"expected an exception like {template!r}, got None")
if not isinstance(exc, ExceptionGroup):
self.assertEqual(exc.__class__, template.__class__)
self.assertEqual(exc.args[0], template.args[0])
else:
self.assertEqual(exc.message, template.message)
self.assertEqual(len(exc.exceptions), len(template.exceptions))
for e, t in zip(exc.exceptions, template.exceptions):
self.assertExceptionIsLike(e, t)

View File

@@ -0,0 +1,191 @@
'''Test runner and result class for the regression test suite.
'''
import functools
import io
import sys
import time
import traceback
import unittest
from test import support
class RegressionTestResult(unittest.TextTestResult):
USE_XML = False
def __init__(self, stream, descriptions, verbosity):
super().__init__(stream=stream, descriptions=descriptions,
verbosity=2 if verbosity else 0)
self.buffer = True
if self.USE_XML:
from xml.etree import ElementTree as ET
from datetime import datetime, UTC
self.__ET = ET
self.__suite = ET.Element('testsuite')
self.__suite.set('start',
datetime.now(UTC)
.replace(tzinfo=None)
.isoformat(' '))
self.__e = None
self.__start_time = None
@classmethod
def __getId(cls, test):
try:
test_id = test.id
except AttributeError:
return str(test)
try:
return test_id()
except TypeError:
return str(test_id)
return repr(test)
def startTest(self, test):
super().startTest(test)
if self.USE_XML:
self.__e = e = self.__ET.SubElement(self.__suite, 'testcase')
self.__start_time = time.perf_counter()
def _add_result(self, test, capture=False, **args):
if not self.USE_XML:
return
e = self.__e
self.__e = None
if e is None:
return
ET = self.__ET
e.set('name', args.pop('name', self.__getId(test)))
e.set('status', args.pop('status', 'run'))
e.set('result', args.pop('result', 'completed'))
if self.__start_time:
e.set('time', f'{time.perf_counter() - self.__start_time:0.6f}')
if capture:
if self._stdout_buffer is not None:
stdout = self._stdout_buffer.getvalue().rstrip()
ET.SubElement(e, 'system-out').text = stdout
if self._stderr_buffer is not None:
stderr = self._stderr_buffer.getvalue().rstrip()
ET.SubElement(e, 'system-err').text = stderr
for k, v in args.items():
if not k or not v:
continue
e2 = ET.SubElement(e, k)
if hasattr(v, 'items'):
for k2, v2 in v.items():
if k2:
e2.set(k2, str(v2))
else:
e2.text = str(v2)
else:
e2.text = str(v)
@classmethod
def __makeErrorDict(cls, err_type, err_value, err_tb):
if isinstance(err_type, type):
if err_type.__module__ == 'builtins':
typename = err_type.__name__
else:
typename = f'{err_type.__module__}.{err_type.__name__}'
else:
typename = repr(err_type)
msg = traceback.format_exception(err_type, err_value, None)
tb = traceback.format_exception(err_type, err_value, err_tb)
return {
'type': typename,
'message': ''.join(msg),
'': ''.join(tb),
}
def addError(self, test, err):
self._add_result(test, True, error=self.__makeErrorDict(*err))
super().addError(test, err)
def addExpectedFailure(self, test, err):
self._add_result(test, True, output=self.__makeErrorDict(*err))
super().addExpectedFailure(test, err)
def addFailure(self, test, err):
self._add_result(test, True, failure=self.__makeErrorDict(*err))
super().addFailure(test, err)
if support.failfast:
self.stop()
def addSkip(self, test, reason):
self._add_result(test, skipped=reason)
super().addSkip(test, reason)
def addSuccess(self, test):
self._add_result(test)
super().addSuccess(test)
def addUnexpectedSuccess(self, test):
self._add_result(test, outcome='UNEXPECTED_SUCCESS')
super().addUnexpectedSuccess(test)
def get_xml_element(self):
if not self.USE_XML:
raise ValueError("USE_XML is false")
e = self.__suite
e.set('tests', str(self.testsRun))
e.set('errors', str(len(self.errors)))
e.set('failures', str(len(self.failures)))
return e
class QuietRegressionTestRunner:
def __init__(self, stream, buffer=False):
self.result = RegressionTestResult(stream, None, 0)
self.result.buffer = buffer
def run(self, test):
test(self.result)
return self.result
def get_test_runner_class(verbosity, buffer=False):
if verbosity:
return functools.partial(unittest.TextTestRunner,
resultclass=RegressionTestResult,
buffer=buffer,
verbosity=verbosity)
return functools.partial(QuietRegressionTestRunner, buffer=buffer)
def get_test_runner(stream, verbosity, capture_output=False):
return get_test_runner_class(verbosity, capture_output)(stream)
if __name__ == '__main__':
import xml.etree.ElementTree as ET
RegressionTestResult.USE_XML = True
class TestTests(unittest.TestCase):
def test_pass(self):
pass
def test_pass_slow(self):
time.sleep(1.0)
def test_fail(self):
print('stdout', file=sys.stdout)
print('stderr', file=sys.stderr)
self.fail('failure message')
def test_error(self):
print('stdout', file=sys.stdout)
print('stderr', file=sys.stderr)
raise RuntimeError('error message')
suite = unittest.TestSuite()
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(TestTests))
stream = io.StringIO()
runner_cls = get_test_runner_class(sum(a == '-v' for a in sys.argv))
runner = runner_cls(sys.stdout)
result = runner.run(suite)
print('Output:', stream.getvalue())
print('XML: ', end='')
for s in ET.tostringlist(result.get_xml_element()):
print(s.decode(), end='')
print()

View File

@@ -0,0 +1,247 @@
import _thread
import contextlib
import functools
import sys
import threading
import time
import unittest
from test import support
#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R
# NOTE: we use thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its __bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# thread._count(), on the other hand, only gets decremented *after* the
# __bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
def threading_setup():
return _thread._count(), threading._dangling.copy()
def threading_cleanup(*original_values):
_MAX_COUNT = 100
for count in range(_MAX_COUNT):
values = _thread._count(), threading._dangling
if values == original_values:
break
if not count:
# Display a warning at the first iteration
support.environment_altered = True
dangling_threads = values[1]
support.print_warning(f"threading_cleanup() failed to cleanup "
f"{values[0] - original_values[0]} threads "
f"(count: {values[0]}, "
f"dangling: {len(dangling_threads)})")
for thread in dangling_threads:
support.print_warning(f"Dangling thread: {thread!r}")
# Don't hold references to threads
dangling_threads = None
values = None
time.sleep(0.01)
support.gc_collect()
def reap_threads(func):
"""Use this function when threads are being used. This will
ensure that the threads are cleaned up even when the test fails.
"""
@functools.wraps(func)
def decorator(*args):
key = threading_setup()
try:
return func(*args)
finally:
threading_cleanup(*key)
return decorator
@contextlib.contextmanager
def wait_threads_exit(timeout=None):
"""
bpo-31234: Context manager to wait until all threads created in the with
statement exit.
Use _thread.count() to check if threads exited. Indirectly, wait until
threads exit the internal t_bootstrap() C function of the _thread module.
threading_setup() and threading_cleanup() are designed to emit a warning
if a test leaves running threads in the background. This context manager
is designed to cleanup threads started by the _thread.start_new_thread()
which doesn't allow to wait for thread exit, whereas thread.Thread has a
join() method.
"""
if timeout is None:
timeout = support.SHORT_TIMEOUT
old_count = _thread._count()
try:
yield
finally:
start_time = time.monotonic()
for _ in support.sleeping_retry(timeout, error=False):
support.gc_collect()
count = _thread._count()
if count <= old_count:
break
else:
dt = time.monotonic() - start_time
msg = (f"wait_threads() failed to cleanup {count - old_count} "
f"threads after {dt:.1f} seconds "
f"(count: {count}, old count: {old_count})")
raise AssertionError(msg)
def join_thread(thread, timeout=None):
"""Join a thread. Raise an AssertionError if the thread is still alive
after timeout seconds.
"""
if timeout is None:
timeout = support.SHORT_TIMEOUT
thread.join(timeout)
if thread.is_alive():
msg = f"failed to join the thread in {timeout:.1f} seconds"
raise AssertionError(msg)
@contextlib.contextmanager
def start_threads(threads, unlock=None):
try:
import faulthandler
except ImportError:
# It isn't supported on subinterpreters yet.
faulthandler = None
threads = list(threads)
started = []
try:
try:
for t in threads:
t.start()
started.append(t)
except:
if support.verbose:
print("Can't start %d threads, only %d threads started" %
(len(threads), len(started)))
raise
yield
finally:
try:
if unlock:
unlock()
endtime = time.monotonic()
for timeout in range(1, 16):
endtime += 60
for t in started:
t.join(max(endtime - time.monotonic(), 0.01))
started = [t for t in started if t.is_alive()]
if not started:
break
if support.verbose:
print('Unable to join %d threads during a period of '
'%d minutes' % (len(started), timeout))
finally:
started = [t for t in started if t.is_alive()]
if started:
if faulthandler is not None:
faulthandler.dump_traceback(sys.stdout)
raise AssertionError('Unable to join %d threads' % len(started))
class catch_threading_exception:
"""
Context manager catching threading.Thread exception using
threading.excepthook.
Attributes set when an exception is caught:
* exc_type
* exc_value
* exc_traceback
* thread
See threading.excepthook() documentation for these attributes.
These attributes are deleted at the context manager exit.
Usage:
with threading_helper.catch_threading_exception() as cm:
# code spawning a thread which raises an exception
...
# check the thread exception, use cm attributes:
# exc_type, exc_value, exc_traceback, thread
...
# exc_type, exc_value, exc_traceback, thread attributes of cm no longer
# exists at this point
# (to avoid reference cycles)
"""
def __init__(self):
self.exc_type = None
self.exc_value = None
self.exc_traceback = None
self.thread = None
self._old_hook = None
def _hook(self, args):
self.exc_type = args.exc_type
self.exc_value = args.exc_value
self.exc_traceback = args.exc_traceback
self.thread = args.thread
def __enter__(self):
self._old_hook = threading.excepthook
threading.excepthook = self._hook
return self
def __exit__(self, *exc_info):
threading.excepthook = self._old_hook
del self.exc_type
del self.exc_value
del self.exc_traceback
del self.thread
def _can_start_thread() -> bool:
"""Detect whether Python can start new threads.
Some WebAssembly platforms do not provide a working pthread
implementation. Thread support is stubbed and any attempt
to create a new thread fails.
- wasm32-wasi does not have threading.
- wasm32-emscripten can be compiled with or without pthread
support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
"""
if sys.platform == "emscripten":
return sys._emscripten_info.pthreads
elif sys.platform == "wasi":
return False
else:
# assume all other platforms have working thread support.
return True
can_start_thread = _can_start_thread()
def requires_working_threading(*, module=False):
"""Skip tests or modules that require working threading.
Can be used as a function/class decorator or to skip an entire module.
"""
msg = "requires threading support"
if module:
if not can_start_thread:
raise unittest.SkipTest(msg)
else:
return unittest.skipUnless(can_start_thread, msg)

View File

@@ -0,0 +1,207 @@
import contextlib
import functools
import importlib
import re
import sys
import warnings
def import_deprecated(name):
"""Import *name* while suppressing DeprecationWarning."""
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=DeprecationWarning)
return importlib.import_module(name)
def check_syntax_warning(testcase, statement, errtext='',
*, lineno=1, offset=None):
# Test also that a warning is emitted only once.
from test.support import check_syntax_error
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter('always', SyntaxWarning)
compile(statement, '<testcase>', 'exec')
testcase.assertEqual(len(warns), 1, warns)
warn, = warns
testcase.assertTrue(issubclass(warn.category, SyntaxWarning),
warn.category)
if errtext:
testcase.assertRegex(str(warn.message), errtext)
testcase.assertEqual(warn.filename, '<testcase>')
testcase.assertIsNotNone(warn.lineno)
if lineno is not None:
testcase.assertEqual(warn.lineno, lineno)
# SyntaxWarning should be converted to SyntaxError when raised,
# since the latter contains more information and provides better
# error report.
with warnings.catch_warnings(record=True) as warns:
warnings.simplefilter('error', SyntaxWarning)
check_syntax_error(testcase, statement, errtext,
lineno=lineno, offset=offset)
# No warnings are leaked when a SyntaxError is raised.
testcase.assertEqual(warns, [])
def ignore_warnings(*, category):
"""Decorator to suppress warnings.
Use of context managers to hide warnings make diffs
more noisy and tools like 'git blame' less useful.
"""
def decorator(test):
@functools.wraps(test)
def wrapper(self, *args, **kwargs):
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=category)
return test(self, *args, **kwargs)
return wrapper
return decorator
class WarningsRecorder(object):
"""Convenience wrapper for the warnings list returned on
entry to the warnings.catch_warnings() context manager.
"""
def __init__(self, warnings_list):
self._warnings = warnings_list
self._last = 0
def __getattr__(self, attr):
if len(self._warnings) > self._last:
return getattr(self._warnings[-1], attr)
elif attr in warnings.WarningMessage._WARNING_DETAILS:
return None
raise AttributeError("%r has no attribute %r" % (self, attr))
@property
def warnings(self):
return self._warnings[self._last:]
def reset(self):
self._last = len(self._warnings)
@contextlib.contextmanager
def check_warnings(*filters, **kwargs):
"""Context manager to silence warnings.
Accept 2-tuples as positional arguments:
("message regexp", WarningCategory)
Optional argument:
- if 'quiet' is True, it does not fail if a filter catches nothing
(default True without argument,
default False if some filters are defined)
Without argument, it defaults to:
check_warnings(("", Warning), quiet=True)
"""
quiet = kwargs.get('quiet')
if not filters:
filters = (("", Warning),)
# Preserve backward compatibility
if quiet is None:
quiet = True
return _filterwarnings(filters, quiet)
@contextlib.contextmanager
def check_no_warnings(testcase, message='', category=Warning, force_gc=False):
"""Context manager to check that no warnings are emitted.
This context manager enables a given warning within its scope
and checks that no warnings are emitted even with that warning
enabled.
If force_gc is True, a garbage collection is attempted before checking
for warnings. This may help to catch warnings emitted when objects
are deleted, such as ResourceWarning.
Other keyword arguments are passed to warnings.filterwarnings().
"""
from test.support import gc_collect
with warnings.catch_warnings(record=True) as warns:
warnings.filterwarnings('always',
message=message,
category=category)
yield
if force_gc:
gc_collect()
testcase.assertEqual(warns, [])
@contextlib.contextmanager
def check_no_resource_warning(testcase):
"""Context manager to check that no ResourceWarning is emitted.
Usage:
with check_no_resource_warning(self):
f = open(...)
...
del f
You must remove the object which may emit ResourceWarning before
the end of the context manager.
"""
with check_no_warnings(testcase, category=ResourceWarning, force_gc=True):
yield
def _filterwarnings(filters, quiet=False):
"""Catch the warnings, then check if all the expected
warnings have been raised and re-raise unexpected warnings.
If 'quiet' is True, only re-raise the unexpected warnings.
"""
# Clear the warning registry of the calling module
# in order to re-raise the warnings.
frame = sys._getframe(2)
registry = frame.f_globals.get('__warningregistry__')
if registry:
registry.clear()
with warnings.catch_warnings(record=True) as w:
# Set filter "always" to record all warnings. Because
# test_warnings swap the module, we need to look up in
# the sys.modules dictionary.
sys.modules['warnings'].simplefilter("always")
yield WarningsRecorder(w)
# Filter the recorded warnings
reraise = list(w)
missing = []
for msg, cat in filters:
seen = False
for w in reraise[:]:
warning = w.message
# Filter out the matching messages
if (re.match(msg, str(warning), re.I) and
issubclass(warning.__class__, cat)):
seen = True
reraise.remove(w)
if not seen and not quiet:
# This filter caught nothing
missing.append((msg, cat.__name__))
if reraise:
raise AssertionError("unhandled warning %s" % reraise[0])
if missing:
raise AssertionError("filter (%r, %s) did not catch any warning" %
missing[0])
@contextlib.contextmanager
def save_restore_warnings_filters():
old_filters = warnings.filters[:]
try:
yield
finally:
warnings.filters[:] = old_filters
def _warn_about_deprecation():
warnings.warn(
"This is used in test_support test to ensure"
" support.ignore_deprecations_from() works as expected."
" You should not be seeing this.",
DeprecationWarning,
stacklevel=0,
)