This commit is contained in:
ton
2024-10-07 10:13:40 +07:00
parent aa1631742f
commit 3a7d696db6
9729 changed files with 1832837 additions and 161742 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,15 +1,26 @@
import os
import subprocess
from __future__ import annotations
import contextlib
import functools
import tempfile
import shutil
import operator
import os
import shutil
import subprocess
import sys
import tempfile
import urllib.request
import warnings
from typing import Iterator
if sys.version_info < (3, 12):
from backports import tarfile
else:
import tarfile
@contextlib.contextmanager
def pushd(dir):
def pushd(dir: str | os.PathLike) -> Iterator[str | os.PathLike]:
"""
>>> tmp_path = getfixture('tmp_path')
>>> with pushd(tmp_path):
@@ -26,33 +37,88 @@ def pushd(dir):
@contextlib.contextmanager
def tarball_context(url, target_dir=None, runner=None, pushd=pushd):
def tarball(
url, target_dir: str | os.PathLike | None = None
) -> Iterator[str | os.PathLike]:
"""
Get a tarball, extract it, change to that directory, yield, then
clean up.
`runner` is the function to invoke commands.
`pushd` is a context manager for changing the directory.
Get a tarball, extract it, yield, then clean up.
>>> import urllib.request
>>> url = getfixture('tarfile_served')
>>> target = getfixture('tmp_path') / 'out'
>>> tb = tarball(url, target_dir=target)
>>> import pathlib
>>> with tb as extracted:
... contents = pathlib.Path(extracted, 'contents.txt').read_text(encoding='utf-8')
>>> assert not os.path.exists(extracted)
"""
if target_dir is None:
target_dir = os.path.basename(url).replace('.tar.gz', '').replace('.tgz', '')
if runner is None:
runner = functools.partial(subprocess.check_call, shell=True)
else:
warnings.warn("runner parameter is deprecated", DeprecationWarning)
# In the tar command, use --strip-components=1 to strip the first path and
# then
# use -C to cause the files to be extracted to {target_dir}. This ensures
# that we always know where the files were extracted.
runner('mkdir {target_dir}'.format(**vars()))
os.mkdir(target_dir)
try:
getter = 'wget {url} -O -'
extract = 'tar x{compression} --strip-components=1 -C {target_dir}'
cmd = ' | '.join((getter, extract))
runner(cmd.format(compression=infer_compression(url), **vars()))
with pushd(target_dir):
yield target_dir
req = urllib.request.urlopen(url)
with tarfile.open(fileobj=req, mode='r|*') as tf:
tf.extractall(path=target_dir, filter=strip_first_component)
yield target_dir
finally:
runner('rm -Rf {target_dir}'.format(**vars()))
shutil.rmtree(target_dir)
def strip_first_component(
member: tarfile.TarInfo,
path,
) -> tarfile.TarInfo:
_, member.name = member.name.split('/', 1)
return member
def _compose(*cmgrs):
"""
Compose any number of dependent context managers into a single one.
The last, innermost context manager may take arbitrary arguments, but
each successive context manager should accept the result from the
previous as a single parameter.
Like :func:`jaraco.functools.compose`, behavior works from right to
left, so the context manager should be indicated from outermost to
innermost.
Example, to create a context manager to change to a temporary
directory:
>>> temp_dir_as_cwd = _compose(pushd, temp_dir)
>>> with temp_dir_as_cwd() as dir:
... assert os.path.samefile(os.getcwd(), dir)
"""
def compose_two(inner, outer):
def composed(*args, **kwargs):
with inner(*args, **kwargs) as saved, outer(saved) as res:
yield res
return contextlib.contextmanager(composed)
return functools.reduce(compose_two, reversed(cmgrs))
tarball_cwd = _compose(pushd, tarball)
@contextlib.contextmanager
def tarball_context(*args, **kwargs):
warnings.warn(
"tarball_context is deprecated. Use tarball or tarball_cwd instead.",
DeprecationWarning,
stacklevel=2,
)
pushd_ctx = kwargs.pop('pushd', pushd)
with tarball(*args, **kwargs) as tball, pushd_ctx(tball) as dir:
yield dir
def infer_compression(url):
@@ -68,6 +134,11 @@ def infer_compression(url):
>>> infer_compression('file.xz')
'J'
"""
warnings.warn(
"infer_compression is deprecated with no replacement",
DeprecationWarning,
stacklevel=2,
)
# cheat and just assume it's the last two characters
compression_indicator = url[-2:]
mapping = dict(gz='z', bz='j', xz='J')
@@ -84,7 +155,7 @@ def temp_dir(remover=shutil.rmtree):
>>> import pathlib
>>> with temp_dir() as the_dir:
... assert os.path.isdir(the_dir)
... _ = pathlib.Path(the_dir).joinpath('somefile').write_text('contents')
... _ = pathlib.Path(the_dir).joinpath('somefile').write_text('contents', encoding='utf-8')
>>> assert not os.path.exists(the_dir)
"""
temp_dir = tempfile.mkdtemp()
@@ -113,15 +184,23 @@ def repo_context(url, branch=None, quiet=True, dest_ctx=temp_dir):
yield repo_dir
@contextlib.contextmanager
def null():
"""
A null context suitable to stand in for a meaningful context.
>>> with null() as value:
... assert value is None
This context is most useful when dealing with two or more code
branches but only some need a context. Wrap the others in a null
context to provide symmetry across all options.
"""
yield
warnings.warn(
"null is deprecated. Use contextlib.nullcontext",
DeprecationWarning,
stacklevel=2,
)
return contextlib.nullcontext()
class ExceptionTrap:
@@ -267,13 +346,7 @@ class on_interrupt(contextlib.ContextDecorator):
... on_interrupt('ignore')(do_interrupt)()
"""
def __init__(
self,
action='error',
# py3.7 compat
# /,
code=1,
):
def __init__(self, action='error', /, code=1):
self.action = action
self.code = code

View File

@@ -1,17 +1,13 @@
import collections.abc
import functools
import time
import inspect
import collections
import types
import itertools
import operator
import time
import types
import warnings
import setuptools.extern.more_itertools
from typing import Callable, TypeVar
CallableT = TypeVar("CallableT", bound=Callable[..., object])
import more_itertools
def compose(*funcs):
@@ -38,24 +34,6 @@ def compose(*funcs):
return functools.reduce(compose_two, funcs)
def method_caller(method_name, *args, **kwargs):
"""
Return a function that will call a named method on the
target object with optional positional and keyword
arguments.
>>> lower = method_caller('lower')
>>> lower('MyString')
'mystring'
"""
def call_method(target):
func = getattr(target, method_name)
return func(*args, **kwargs)
return call_method
def once(func):
"""
Decorate func so it's only ever called the first time.
@@ -98,12 +76,7 @@ def once(func):
return wrapper
def method_cache(
method: CallableT,
cache_wrapper: Callable[
[CallableT], CallableT
] = functools.lru_cache(), # type: ignore[assignment]
) -> CallableT:
def method_cache(method, cache_wrapper=functools.lru_cache()):
"""
Wrap lru_cache to support storing the cache data in the object instances.
@@ -171,21 +144,17 @@ def method_cache(
for another implementation and additional justification.
"""
def wrapper(self: object, *args: object, **kwargs: object) -> object:
def wrapper(self, *args, **kwargs):
# it's the first call, replace the method with a cached, bound method
bound_method: CallableT = types.MethodType( # type: ignore[assignment]
method, self
)
bound_method = types.MethodType(method, self)
cached_method = cache_wrapper(bound_method)
setattr(self, method.__name__, cached_method)
return cached_method(*args, **kwargs)
# Support cache clear even before cache has been created.
wrapper.cache_clear = lambda: None # type: ignore[attr-defined]
wrapper.cache_clear = lambda: None
return ( # type: ignore[return-value]
_special_method_cache(method, cache_wrapper) or wrapper
)
return _special_method_cache(method, cache_wrapper) or wrapper
def _special_method_cache(method, cache_wrapper):
@@ -201,12 +170,13 @@ def _special_method_cache(method, cache_wrapper):
"""
name = method.__name__
special_names = '__getattr__', '__getitem__'
if name not in special_names:
return
return None
wrapper_name = '__cached' + name
def proxy(self, *args, **kwargs):
def proxy(self, /, *args, **kwargs):
if wrapper_name not in vars(self):
bound = types.MethodType(method, self)
cache = cache_wrapper(bound)
@@ -243,7 +213,7 @@ def result_invoke(action):
r"""
Decorate a function with an action function that is
invoked on the results returned from the decorated
function (for its side-effect), then return the original
function (for its side effect), then return the original
result.
>>> @result_invoke(print)
@@ -267,7 +237,7 @@ def result_invoke(action):
return wrap
def invoke(f, *args, **kwargs):
def invoke(f, /, *args, **kwargs):
"""
Call a function for its side effect after initialization.
@@ -302,25 +272,15 @@ def invoke(f, *args, **kwargs):
Use functools.partial to pass parameters to the initial call
>>> @functools.partial(invoke, name='bingo')
... def func(name): print("called with", name)
... def func(name): print('called with', name)
called with bingo
"""
f(*args, **kwargs)
return f
def call_aside(*args, **kwargs):
"""
Deprecated name for invoke.
"""
warnings.warn("call_aside is deprecated, use invoke", DeprecationWarning)
return invoke(*args, **kwargs)
class Throttler:
"""
Rate-limit a function (or other callable)
"""
"""Rate-limit a function (or other callable)."""
def __init__(self, func, max_rate=float('Inf')):
if isinstance(func, Throttler):
@@ -337,20 +297,20 @@ class Throttler:
return self.func(*args, **kwargs)
def _wait(self):
"ensure at least 1/max_rate seconds from last call"
"""Ensure at least 1/max_rate seconds from last call."""
elapsed = time.time() - self.last_called
must_wait = 1 / self.max_rate - elapsed
time.sleep(max(0, must_wait))
self.last_called = time.time()
def __get__(self, obj, type=None):
def __get__(self, obj, owner=None):
return first_invoke(self._wait, functools.partial(self.func, obj))
def first_invoke(func1, func2):
"""
Return a function that when invoked will invoke func1 without
any parameters (for its side-effect) and then invoke func2
any parameters (for its side effect) and then invoke func2
with whatever parameters were passed, returning its result.
"""
@@ -361,6 +321,17 @@ def first_invoke(func1, func2):
return wrapper
method_caller = first_invoke(
lambda: warnings.warn(
'`jaraco.functools.method_caller` is deprecated, '
'use `operator.methodcaller` instead',
DeprecationWarning,
stacklevel=3,
),
operator.methodcaller,
)
def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
"""
Given a callable func, trap the indicated exceptions
@@ -369,7 +340,7 @@ def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
to propagate.
"""
attempts = itertools.count() if retries == float('inf') else range(retries)
for attempt in attempts:
for _ in attempts:
try:
return func()
except trap:
@@ -406,7 +377,7 @@ def retry(*r_args, **r_kwargs):
def print_yielded(func):
"""
Convert a generator into a function that prints all yielded elements
Convert a generator into a function that prints all yielded elements.
>>> @print_yielded
... def x():
@@ -422,7 +393,7 @@ def print_yielded(func):
def pass_none(func):
"""
Wrap func so it's not called if its first param is None
Wrap func so it's not called if its first param is None.
>>> print_text = pass_none(print)
>>> print_text('text')
@@ -431,9 +402,10 @@ def pass_none(func):
"""
@functools.wraps(func)
def wrapper(param, *args, **kwargs):
def wrapper(param, /, *args, **kwargs):
if param is not None:
return func(param, *args, **kwargs)
return None
return wrapper
@@ -507,7 +479,7 @@ def save_method_args(method):
args_and_kwargs = collections.namedtuple('args_and_kwargs', 'args kwargs')
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
def wrapper(self, /, *args, **kwargs):
attr_name = '_saved_' + method.__name__
attr = args_and_kwargs(args, kwargs)
setattr(self, attr_name, attr)
@@ -554,3 +526,108 @@ def except_(*exceptions, replace=None, use=None):
return wrapper
return decorate
def identity(x):
"""
Return the argument.
>>> o = object()
>>> identity(o) is o
True
"""
return x
def bypass_when(check, *, _op=identity):
"""
Decorate a function to return its parameter when ``check``.
>>> bypassed = [] # False
>>> @bypass_when(bypassed)
... def double(x):
... return x * 2
>>> double(2)
4
>>> bypassed[:] = [object()] # True
>>> double(2)
2
"""
def decorate(func):
@functools.wraps(func)
def wrapper(param, /):
return param if _op(check) else func(param)
return wrapper
return decorate
def bypass_unless(check):
"""
Decorate a function to return its parameter unless ``check``.
>>> enabled = [object()] # True
>>> @bypass_unless(enabled)
... def double(x):
... return x * 2
>>> double(2)
4
>>> del enabled[:] # False
>>> double(2)
2
"""
return bypass_when(check, _op=operator.not_)
@functools.singledispatch
def _splat_inner(args, func):
"""Splat args to func."""
return func(*args)
@_splat_inner.register
def _(args: collections.abc.Mapping, func):
"""Splat kargs to func as kwargs."""
return func(**args)
def splat(func):
"""
Wrap func to expect its parameters to be passed positionally in a tuple.
Has a similar effect to that of ``itertools.starmap`` over
simple ``map``.
>>> pairs = [(-1, 1), (0, 2)]
>>> more_itertools.consume(itertools.starmap(print, pairs))
-1 1
0 2
>>> more_itertools.consume(map(splat(print), pairs))
-1 1
0 2
The approach generalizes to other iterators that don't have a "star"
equivalent, such as a "starfilter".
>>> list(filter(splat(operator.add), pairs))
[(0, 2)]
Splat also accepts a mapping argument.
>>> def is_nice(msg, code):
... return "smile" in msg or code == 0
>>> msgs = [
... dict(msg='smile!', code=20),
... dict(msg='error :(', code=1),
... dict(msg='unknown', code=0),
... ]
>>> for msg in filter(splat(is_nice), msgs):
... print(msg)
{'msg': 'smile!', 'code': 20}
{'msg': 'unknown', 'code': 0}
"""
return functools.wraps(func)(functools.partial(_splat_inner, func=func))

View File

@@ -0,0 +1,125 @@
from collections.abc import Callable, Hashable, Iterator
from functools import partial
from operator import methodcaller
import sys
from typing import (
Any,
Generic,
Protocol,
TypeVar,
overload,
)
if sys.version_info >= (3, 10):
from typing import Concatenate, ParamSpec
else:
from typing_extensions import Concatenate, ParamSpec
_P = ParamSpec('_P')
_R = TypeVar('_R')
_T = TypeVar('_T')
_R1 = TypeVar('_R1')
_R2 = TypeVar('_R2')
_V = TypeVar('_V')
_S = TypeVar('_S')
_R_co = TypeVar('_R_co', covariant=True)
class _OnceCallable(Protocol[_P, _R]):
saved_result: _R
reset: Callable[[], None]
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R: ...
class _ProxyMethodCacheWrapper(Protocol[_R_co]):
cache_clear: Callable[[], None]
def __call__(self, *args: Hashable, **kwargs: Hashable) -> _R_co: ...
class _MethodCacheWrapper(Protocol[_R_co]):
def cache_clear(self) -> None: ...
def __call__(self, *args: Hashable, **kwargs: Hashable) -> _R_co: ...
# `compose()` overloads below will cover most use cases.
@overload
def compose(
__func1: Callable[[_R], _T],
__func2: Callable[_P, _R],
/,
) -> Callable[_P, _T]: ...
@overload
def compose(
__func1: Callable[[_R], _T],
__func2: Callable[[_R1], _R],
__func3: Callable[_P, _R1],
/,
) -> Callable[_P, _T]: ...
@overload
def compose(
__func1: Callable[[_R], _T],
__func2: Callable[[_R2], _R],
__func3: Callable[[_R1], _R2],
__func4: Callable[_P, _R1],
/,
) -> Callable[_P, _T]: ...
def once(func: Callable[_P, _R]) -> _OnceCallable[_P, _R]: ...
def method_cache(
method: Callable[..., _R],
cache_wrapper: Callable[[Callable[..., _R]], _MethodCacheWrapper[_R]] = ...,
) -> _MethodCacheWrapper[_R] | _ProxyMethodCacheWrapper[_R]: ...
def apply(
transform: Callable[[_R], _T]
) -> Callable[[Callable[_P, _R]], Callable[_P, _T]]: ...
def result_invoke(
action: Callable[[_R], Any]
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]: ...
def invoke(
f: Callable[_P, _R], /, *args: _P.args, **kwargs: _P.kwargs
) -> Callable[_P, _R]: ...
class Throttler(Generic[_R]):
last_called: float
func: Callable[..., _R]
max_rate: float
def __init__(
self, func: Callable[..., _R] | Throttler[_R], max_rate: float = ...
) -> None: ...
def reset(self) -> None: ...
def __call__(self, *args: Any, **kwargs: Any) -> _R: ...
def __get__(self, obj: Any, owner: type[Any] | None = ...) -> Callable[..., _R]: ...
def first_invoke(
func1: Callable[..., Any], func2: Callable[_P, _R]
) -> Callable[_P, _R]: ...
method_caller: Callable[..., methodcaller]
def retry_call(
func: Callable[..., _R],
cleanup: Callable[..., None] = ...,
retries: int | float = ...,
trap: type[BaseException] | tuple[type[BaseException], ...] = ...,
) -> _R: ...
def retry(
cleanup: Callable[..., None] = ...,
retries: int | float = ...,
trap: type[BaseException] | tuple[type[BaseException], ...] = ...,
) -> Callable[[Callable[..., _R]], Callable[..., _R]]: ...
def print_yielded(func: Callable[_P, Iterator[Any]]) -> Callable[_P, None]: ...
def pass_none(
func: Callable[Concatenate[_T, _P], _R]
) -> Callable[Concatenate[_T, _P], _R]: ...
def assign_params(
func: Callable[..., _R], namespace: dict[str, Any]
) -> partial[_R]: ...
def save_method_args(
method: Callable[Concatenate[_S, _P], _R]
) -> Callable[Concatenate[_S, _P], _R]: ...
def except_(
*exceptions: type[BaseException], replace: Any = ..., use: Any = ...
) -> Callable[[Callable[_P, Any]], Callable[_P, Any]]: ...
def identity(x: _T) -> _T: ...
def bypass_when(
check: _V, *, _op: Callable[[_V], Any] = ...
) -> Callable[[Callable[[_T], _R]], Callable[[_T], _T | _R]]: ...
def bypass_unless(
check: Any,
) -> Callable[[Callable[[_T], _R]], Callable[[_T], _T | _R]]: ...

View File

@@ -0,0 +1,2 @@
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.
Curabitur pretium tincidunt lacus. Nulla gravida orci a odio. Nullam varius, turpis et commodo pharetra, est eros bibendum elit, nec luctus magna felis sollicitudin mauris. Integer in mauris eu nibh euismod gravida. Duis ac tellus et risus vulputate vehicula. Donec lobortis risus a elit. Etiam tempor. Ut ullamcorper, ligula eu tempor congue, eros est euismod turpis, id tincidunt sapien risus a quam. Maecenas fermentum consequat mi. Donec fermentum. Pellentesque malesuada nulla a mi. Duis sapien sem, aliquet nec, commodo eget, consequat quis, neque. Aliquam faucibus, elit ut dictum aliquet, felis nisl adipiscing sapien, sed malesuada diam lacus eget erat. Cras mollis scelerisque nunc. Nullam arcu. Aliquam consequat. Curabitur augue lorem, dapibus quis, laoreet et, pretium ac, nisi. Aenean magna nisl, mollis quis, molestie eu, feugiat in, orci. In hac habitasse platea dictumst.

View File

@@ -6,10 +6,10 @@ import functools
try:
from importlib.resources import files # type: ignore
except ImportError: # pragma: nocover
from setuptools.extern.importlib_resources import files # type: ignore
from importlib_resources import files # type: ignore
from setuptools.extern.jaraco.functools import compose, method_cache
from setuptools.extern.jaraco.context import ExceptionTrap
from jaraco.functools import compose, method_cache
from jaraco.context import ExceptionTrap
def substitution(old, new):
@@ -66,7 +66,7 @@ class FoldedCase(str):
>>> s in ["Hello World"]
True
You may test for set inclusion, but candidate and elements
Allows testing for set inclusion, but candidate and elements
must both be folded.
>>> FoldedCase("Hello World") in {s}
@@ -92,37 +92,40 @@ class FoldedCase(str):
>>> FoldedCase('hello') > FoldedCase('Hello')
False
>>> FoldedCase('ß') == FoldedCase('ss')
True
"""
def __lt__(self, other):
return self.lower() < other.lower()
return self.casefold() < other.casefold()
def __gt__(self, other):
return self.lower() > other.lower()
return self.casefold() > other.casefold()
def __eq__(self, other):
return self.lower() == other.lower()
return self.casefold() == other.casefold()
def __ne__(self, other):
return self.lower() != other.lower()
return self.casefold() != other.casefold()
def __hash__(self):
return hash(self.lower())
return hash(self.casefold())
def __contains__(self, other):
return super().lower().__contains__(other.lower())
return super().casefold().__contains__(other.casefold())
def in_(self, other):
"Does self appear in other?"
return self in FoldedCase(other)
# cache lower since it's likely to be called frequently.
# cache casefold since it's likely to be called frequently.
@method_cache
def lower(self):
return super().lower()
def casefold(self):
return super().casefold()
def index(self, sub):
return self.lower().index(sub.lower())
return self.casefold().index(sub.casefold())
def split(self, splitter=' ', maxsplit=0):
pattern = re.compile(re.escape(splitter), re.I)
@@ -224,9 +227,12 @@ def unwrap(s):
return '\n'.join(cleaned)
lorem_ipsum: str = (
files(__name__).joinpath('Lorem ipsum.txt').read_text(encoding='utf-8')
)
class Splitter(object):
class Splitter:
"""object that will split a string with the given arguments for each call
>>> s = Splitter(',')
@@ -276,7 +282,7 @@ class WordSet(tuple):
>>> WordSet.parse("myABCClass")
('my', 'ABC', 'Class')
The result is a WordSet, so you can get the form you need.
The result is a WordSet, providing access to other forms.
>>> WordSet.parse("myABCClass").underscore_separated()
'my_ABC_Class'
@@ -363,7 +369,7 @@ class WordSet(tuple):
return self.trim_left(item).trim_right(item)
def __getitem__(self, item):
result = super(WordSet, self).__getitem__(item)
result = super().__getitem__(item)
if isinstance(item, slice):
result = WordSet(result)
return result
@@ -578,7 +584,7 @@ def join_continuation(lines):
['foobarbaz']
Not sure why, but...
The character preceeding the backslash is also elided.
The character preceding the backslash is also elided.
>>> list(join_continuation(['goo\\', 'dly']))
['godly']
@@ -597,3 +603,22 @@ def join_continuation(lines):
except StopIteration:
return
yield item
def read_newlines(filename, limit=1024):
r"""
>>> tmp_path = getfixture('tmp_path')
>>> filename = tmp_path / 'out.txt'
>>> _ = filename.write_text('foo\n', newline='', encoding='utf-8')
>>> read_newlines(filename)
'\n'
>>> _ = filename.write_text('foo\r\n', newline='', encoding='utf-8')
>>> read_newlines(filename)
'\r\n'
>>> _ = filename.write_text('foo\r\nbar\nbing\r', newline='', encoding='utf-8')
>>> read_newlines(filename)
('\r', '\n', '\r\n')
"""
with open(filename, encoding='utf-8') as fp:
fp.read(limit)
return fp.newlines

View File

@@ -0,0 +1,25 @@
qwerty = "-=qwertyuiop[]asdfghjkl;'zxcvbnm,./_+QWERTYUIOP{}ASDFGHJKL:\"ZXCVBNM<>?"
dvorak = "[]',.pyfgcrl/=aoeuidhtns-;qjkxbmwvz{}\"<>PYFGCRL?+AOEUIDHTNS_:QJKXBMWVZ"
to_dvorak = str.maketrans(qwerty, dvorak)
to_qwerty = str.maketrans(dvorak, qwerty)
def translate(input, translation):
"""
>>> translate('dvorak', to_dvorak)
'ekrpat'
>>> translate('qwerty', to_qwerty)
'x,dokt'
"""
return input.translate(translation)
def _translate_stream(stream, translation):
"""
>>> import io
>>> _translate_stream(io.StringIO('foo'), to_dvorak)
urr
"""
print(translate(stream.read(), translation))

View File

@@ -0,0 +1,33 @@
import autocommand
import inflect
from more_itertools import always_iterable
import jaraco.text
def report_newlines(filename):
r"""
Report the newlines in the indicated file.
>>> tmp_path = getfixture('tmp_path')
>>> filename = tmp_path / 'out.txt'
>>> _ = filename.write_text('foo\nbar\n', newline='', encoding='utf-8')
>>> report_newlines(filename)
newline is '\n'
>>> filename = tmp_path / 'out.txt'
>>> _ = filename.write_text('foo\nbar\r\n', newline='', encoding='utf-8')
>>> report_newlines(filename)
newlines are ('\n', '\r\n')
"""
newlines = jaraco.text.read_newlines(filename)
count = len(tuple(always_iterable(newlines)))
engine = inflect.engine()
print(
engine.plural_noun("newline", count),
engine.plural_verb("is", count),
repr(newlines),
)
autocommand.autocommand(__name__)(report_newlines)

View File

@@ -0,0 +1,21 @@
import sys
import autocommand
from jaraco.text import Stripper
def strip_prefix():
r"""
Strip any common prefix from stdin.
>>> import io, pytest
>>> getfixture('monkeypatch').setattr('sys.stdin', io.StringIO('abcdef\nabc123'))
>>> strip_prefix()
def
123
"""
sys.stdout.writelines(Stripper.strip_prefix(sys.stdin).lines)
autocommand.autocommand(__name__)(strip_prefix)

View File

@@ -0,0 +1,6 @@
import sys
from . import layouts
__name__ == '__main__' and layouts._translate_stream(sys.stdin, layouts.to_dvorak)

View File

@@ -0,0 +1,6 @@
import sys
from . import layouts
__name__ == '__main__' and layouts._translate_stream(sys.stdin, layouts.to_qwerty)