update
This commit is contained in:
@@ -1,16 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Distributed under the (new) BSD License. See LICENSE.txt for more info.
|
||||
|
||||
""" This subpackage provides the core functionality of imageio
|
||||
(everything but the plugins).
|
||||
"""
|
||||
|
||||
# flake8: noqa
|
||||
|
||||
from .util import Image, Array, Dict, asarray, image_as_uint, urlopen
|
||||
from .util import BaseProgressIndicator, StdoutProgressIndicator, IS_PYPY
|
||||
from .util import get_platform, appdata_dir, resource_dirs, has_module
|
||||
from .findlib import load_lib
|
||||
from .fetching import get_remote_file, InternetNotAllowedError, NeedDownloadError
|
||||
from .request import Request, read_n_bytes, RETURN_BYTES
|
||||
from .format import Format, FormatManager
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,247 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Based on code from the vispy project
|
||||
# Distributed under the (new) BSD License. See LICENSE.txt for more info.
|
||||
|
||||
"""Data downloading and reading functions
|
||||
"""
|
||||
|
||||
from math import log
|
||||
import os
|
||||
from os import path as op
|
||||
import sys
|
||||
import shutil
|
||||
import time
|
||||
|
||||
from . import appdata_dir, resource_dirs
|
||||
from . import StdoutProgressIndicator, urlopen
|
||||
|
||||
|
||||
class InternetNotAllowedError(IOError):
|
||||
"""Plugins that need resources can just use get_remote_file(), but
|
||||
should catch this error and silently ignore it.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class NeedDownloadError(IOError):
|
||||
"""Is raised when a remote file is requested that is not locally
|
||||
available, but which needs to be explicitly downloaded by the user.
|
||||
"""
|
||||
|
||||
|
||||
def get_remote_file(fname, directory=None, force_download=False, auto=True):
|
||||
"""Get a the filename for the local version of a file from the web
|
||||
|
||||
Parameters
|
||||
----------
|
||||
fname : str
|
||||
The relative filename on the remote data repository to download.
|
||||
These correspond to paths on
|
||||
``https://github.com/imageio/imageio-binaries/``.
|
||||
directory : str | None
|
||||
The directory where the file will be cached if a download was
|
||||
required to obtain the file. By default, the appdata directory
|
||||
is used. This is also the first directory that is checked for
|
||||
a local version of the file. If the directory does not exist,
|
||||
it will be created.
|
||||
force_download : bool | str
|
||||
If True, the file will be downloaded even if a local copy exists
|
||||
(and this copy will be overwritten). Can also be a YYYY-MM-DD date
|
||||
to ensure a file is up-to-date (modified date of a file on disk,
|
||||
if present, is checked).
|
||||
auto : bool
|
||||
Whether to auto-download the file if its not present locally. Default
|
||||
True. If False and a download is needed, raises NeedDownloadError.
|
||||
|
||||
Returns
|
||||
-------
|
||||
fname : str
|
||||
The path to the file on the local system.
|
||||
"""
|
||||
_url_root = "https://github.com/imageio/imageio-binaries/raw/master/"
|
||||
url = _url_root + fname
|
||||
nfname = op.normcase(fname) # convert to native
|
||||
# Get dirs to look for the resource
|
||||
given_directory = directory
|
||||
directory = given_directory or appdata_dir("imageio")
|
||||
dirs = resource_dirs()
|
||||
dirs.insert(0, directory) # Given dir has preference
|
||||
# Try to find the resource locally
|
||||
for dir in dirs:
|
||||
filename = op.join(dir, nfname)
|
||||
if op.isfile(filename):
|
||||
if not force_download: # we're done
|
||||
if given_directory and given_directory != dir:
|
||||
filename2 = os.path.join(given_directory, nfname)
|
||||
# Make sure the output directory exists
|
||||
if not op.isdir(op.dirname(filename2)):
|
||||
os.makedirs(op.abspath(op.dirname(filename2)))
|
||||
shutil.copy(filename, filename2)
|
||||
return filename2
|
||||
return filename
|
||||
if isinstance(force_download, str):
|
||||
ntime = time.strptime(force_download, "%Y-%m-%d")
|
||||
ftime = time.gmtime(op.getctime(filename))
|
||||
if ftime >= ntime:
|
||||
if given_directory and given_directory != dir:
|
||||
filename2 = os.path.join(given_directory, nfname)
|
||||
# Make sure the output directory exists
|
||||
if not op.isdir(op.dirname(filename2)):
|
||||
os.makedirs(op.abspath(op.dirname(filename2)))
|
||||
shutil.copy(filename, filename2)
|
||||
return filename2
|
||||
return filename
|
||||
else:
|
||||
print("File older than %s, updating..." % force_download)
|
||||
break
|
||||
|
||||
# If we get here, we're going to try to download the file
|
||||
if os.getenv("IMAGEIO_NO_INTERNET", "").lower() in ("1", "true", "yes"):
|
||||
raise InternetNotAllowedError(
|
||||
"Will not download resource from the "
|
||||
"internet because environment variable "
|
||||
"IMAGEIO_NO_INTERNET is set."
|
||||
)
|
||||
|
||||
# Can we proceed with auto-download?
|
||||
if not auto:
|
||||
raise NeedDownloadError()
|
||||
|
||||
# Get filename to store to and make sure the dir exists
|
||||
filename = op.join(directory, nfname)
|
||||
if not op.isdir(op.dirname(filename)):
|
||||
os.makedirs(op.abspath(op.dirname(filename)))
|
||||
# let's go get the file
|
||||
if os.getenv("CONTINUOUS_INTEGRATION", False): # pragma: no cover
|
||||
# On CI, we retry a few times ...
|
||||
for i in range(2):
|
||||
try:
|
||||
_fetch_file(url, filename)
|
||||
return filename
|
||||
except IOError:
|
||||
time.sleep(0.5)
|
||||
else:
|
||||
_fetch_file(url, filename)
|
||||
return filename
|
||||
else: # pragma: no cover
|
||||
_fetch_file(url, filename)
|
||||
return filename
|
||||
|
||||
|
||||
def _fetch_file(url, file_name, print_destination=True):
|
||||
"""Load requested file, downloading it if needed or requested
|
||||
|
||||
Parameters
|
||||
----------
|
||||
url: string
|
||||
The url of file to be downloaded.
|
||||
file_name: string
|
||||
Name, along with the path, of where downloaded file will be saved.
|
||||
print_destination: bool, optional
|
||||
If true, destination of where file was saved will be printed after
|
||||
download finishes.
|
||||
resume: bool, optional
|
||||
If true, try to resume partially downloaded files.
|
||||
"""
|
||||
# Adapted from NISL:
|
||||
# https://github.com/nisl/tutorial/blob/master/nisl/datasets.py
|
||||
|
||||
print(
|
||||
"Imageio: %r was not found on your computer; "
|
||||
"downloading it now." % os.path.basename(file_name)
|
||||
)
|
||||
|
||||
temp_file_name = file_name + ".part"
|
||||
local_file = None
|
||||
initial_size = 0
|
||||
errors = []
|
||||
for tries in range(4):
|
||||
try:
|
||||
# Checking file size and displaying it alongside the download url
|
||||
remote_file = urlopen(url, timeout=5.0)
|
||||
file_size = int(remote_file.headers["Content-Length"].strip())
|
||||
size_str = _sizeof_fmt(file_size)
|
||||
print("Try %i. Download from %s (%s)" % (tries + 1, url, size_str))
|
||||
# Downloading data (can be extended to resume if need be)
|
||||
local_file = open(temp_file_name, "wb")
|
||||
_chunk_read(remote_file, local_file, initial_size=initial_size)
|
||||
# temp file must be closed prior to the move
|
||||
if not local_file.closed:
|
||||
local_file.close()
|
||||
shutil.move(temp_file_name, file_name)
|
||||
if print_destination is True:
|
||||
sys.stdout.write("File saved as %s.\n" % file_name)
|
||||
break
|
||||
except Exception as e:
|
||||
errors.append(e)
|
||||
print("Error while fetching file: %s." % str(e))
|
||||
finally:
|
||||
if local_file is not None:
|
||||
if not local_file.closed:
|
||||
local_file.close()
|
||||
else:
|
||||
raise IOError(
|
||||
"Unable to download %r. Perhaps there is no internet "
|
||||
"connection? If there is, please report this problem."
|
||||
% os.path.basename(file_name)
|
||||
)
|
||||
|
||||
|
||||
def _chunk_read(response, local_file, chunk_size=8192, initial_size=0):
|
||||
"""Download a file chunk by chunk and show advancement
|
||||
|
||||
Can also be used when resuming downloads over http.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
response: urllib.response.addinfourl
|
||||
Response to the download request in order to get file size.
|
||||
local_file: file
|
||||
Hard disk file where data should be written.
|
||||
chunk_size: integer, optional
|
||||
Size of downloaded chunks. Default: 8192
|
||||
initial_size: int, optional
|
||||
If resuming, indicate the initial size of the file.
|
||||
"""
|
||||
# Adapted from NISL:
|
||||
# https://github.com/nisl/tutorial/blob/master/nisl/datasets.py
|
||||
|
||||
bytes_so_far = initial_size
|
||||
# Returns only amount left to download when resuming, not the size of the
|
||||
# entire file
|
||||
total_size = int(response.headers["Content-Length"].strip())
|
||||
total_size += initial_size
|
||||
|
||||
progress = StdoutProgressIndicator("Downloading")
|
||||
progress.start("", "bytes", total_size)
|
||||
|
||||
while True:
|
||||
chunk = response.read(chunk_size)
|
||||
bytes_so_far += len(chunk)
|
||||
if not chunk:
|
||||
break
|
||||
_chunk_write(chunk, local_file, progress)
|
||||
progress.finish("Done")
|
||||
|
||||
|
||||
def _chunk_write(chunk, local_file, progress):
|
||||
"""Write a chunk to file and update the progress bar"""
|
||||
local_file.write(chunk)
|
||||
progress.increase_progress(len(chunk))
|
||||
time.sleep(0) # Give other threads a chance, e.g. those that handle stdout pipes
|
||||
|
||||
|
||||
def _sizeof_fmt(num):
|
||||
"""Turn number of bytes into human-readable str"""
|
||||
units = ["bytes", "kB", "MB", "GB", "TB", "PB"]
|
||||
decimals = [0, 0, 1, 2, 2, 2]
|
||||
"""Human friendly file size"""
|
||||
if num > 1:
|
||||
exponent = min(int(log(num, 1024)), len(units) - 1)
|
||||
quotient = float(num) / 1024**exponent
|
||||
unit = units[exponent]
|
||||
num_decimals = decimals[exponent]
|
||||
format_string = "{0:.%sf} {1}" % num_decimals
|
||||
return format_string.format(quotient, unit)
|
||||
return "0 bytes" if num == 0 else "1 byte"
|
||||
@@ -1,161 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2015-1018, imageio contributors
|
||||
# Copyright (C) 2013, Zach Pincus, Almar Klein and others
|
||||
|
||||
""" This module contains generic code to find and load a dynamic library.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import ctypes
|
||||
|
||||
|
||||
LOCALDIR = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
# Flag that can be patched / set to True to disable loading non-system libs
|
||||
SYSTEM_LIBS_ONLY = False
|
||||
|
||||
|
||||
def looks_lib(fname):
|
||||
"""Returns True if the given filename looks like a dynamic library.
|
||||
Based on extension, but cross-platform and more flexible.
|
||||
"""
|
||||
fname = fname.lower()
|
||||
if sys.platform.startswith("win"):
|
||||
return fname.endswith(".dll")
|
||||
elif sys.platform.startswith("darwin"):
|
||||
return fname.endswith(".dylib")
|
||||
else:
|
||||
return fname.endswith(".so") or ".so." in fname
|
||||
|
||||
|
||||
def generate_candidate_libs(lib_names, lib_dirs=None):
|
||||
"""Generate a list of candidate filenames of what might be the dynamic
|
||||
library corresponding with the given list of names.
|
||||
Returns (lib_dirs, lib_paths)
|
||||
"""
|
||||
lib_dirs = lib_dirs or []
|
||||
|
||||
# Get system dirs to search
|
||||
sys_lib_dirs = [
|
||||
"/lib",
|
||||
"/usr/lib",
|
||||
"/usr/lib/x86_64-linux-gnu",
|
||||
"/usr/lib/aarch64-linux-gnu",
|
||||
"/usr/local/lib",
|
||||
"/opt/local/lib",
|
||||
]
|
||||
|
||||
# Get Python dirs to search (shared if for Pyzo)
|
||||
py_sub_dirs = ["bin", "lib", "DLLs", "Library/bin", "shared"]
|
||||
py_lib_dirs = [os.path.join(sys.prefix, d) for d in py_sub_dirs]
|
||||
if hasattr(sys, "base_prefix"):
|
||||
py_lib_dirs += [os.path.join(sys.base_prefix, d) for d in py_sub_dirs]
|
||||
|
||||
# Get user dirs to search (i.e. HOME)
|
||||
home_dir = os.path.expanduser("~")
|
||||
user_lib_dirs = [os.path.join(home_dir, d) for d in ["lib"]]
|
||||
|
||||
# Select only the dirs for which a directory exists, and remove duplicates
|
||||
potential_lib_dirs = lib_dirs + sys_lib_dirs + py_lib_dirs + user_lib_dirs
|
||||
lib_dirs = []
|
||||
for ld in potential_lib_dirs:
|
||||
if os.path.isdir(ld) and ld not in lib_dirs:
|
||||
lib_dirs.append(ld)
|
||||
|
||||
# Now attempt to find libraries of that name in the given directory
|
||||
# (case-insensitive)
|
||||
lib_paths = []
|
||||
for lib_dir in lib_dirs:
|
||||
# Get files, prefer short names, last version
|
||||
files = os.listdir(lib_dir)
|
||||
files = reversed(sorted(files))
|
||||
files = sorted(files, key=len)
|
||||
for lib_name in lib_names:
|
||||
# Test all filenames for name and ext
|
||||
for fname in files:
|
||||
if fname.lower().startswith(lib_name) and looks_lib(fname):
|
||||
lib_paths.append(os.path.join(lib_dir, fname))
|
||||
|
||||
# Return (only the items which are files)
|
||||
lib_paths = [lp for lp in lib_paths if os.path.isfile(lp)]
|
||||
return lib_dirs, lib_paths
|
||||
|
||||
|
||||
def load_lib(exact_lib_names, lib_names, lib_dirs=None):
|
||||
"""load_lib(exact_lib_names, lib_names, lib_dirs=None)
|
||||
|
||||
Load a dynamic library.
|
||||
|
||||
This function first tries to load the library from the given exact
|
||||
names. When that fails, it tries to find the library in common
|
||||
locations. It searches for files that start with one of the names
|
||||
given in lib_names (case insensitive). The search is performed in
|
||||
the given lib_dirs and a set of common library dirs.
|
||||
|
||||
Returns ``(ctypes_library, library_path)``
|
||||
"""
|
||||
|
||||
# Checks
|
||||
assert isinstance(exact_lib_names, list)
|
||||
assert isinstance(lib_names, list)
|
||||
if lib_dirs is not None:
|
||||
assert isinstance(lib_dirs, list)
|
||||
exact_lib_names = [n for n in exact_lib_names if n]
|
||||
lib_names = [n for n in lib_names if n]
|
||||
|
||||
# Get reference name (for better messages)
|
||||
if lib_names:
|
||||
the_lib_name = lib_names[0]
|
||||
elif exact_lib_names:
|
||||
the_lib_name = exact_lib_names[0]
|
||||
else:
|
||||
raise ValueError("No library name given.")
|
||||
|
||||
# Collect filenames of potential libraries
|
||||
# First try a few bare library names that ctypes might be able to find
|
||||
# in the default locations for each platform.
|
||||
if SYSTEM_LIBS_ONLY:
|
||||
lib_dirs, lib_paths = [], []
|
||||
else:
|
||||
lib_dirs, lib_paths = generate_candidate_libs(lib_names, lib_dirs)
|
||||
lib_paths = exact_lib_names + lib_paths
|
||||
|
||||
# Select loader
|
||||
if sys.platform.startswith("win"):
|
||||
loader = ctypes.windll
|
||||
else:
|
||||
loader = ctypes.cdll
|
||||
|
||||
# Try to load until success
|
||||
the_lib = None
|
||||
errors = []
|
||||
for fname in lib_paths:
|
||||
try:
|
||||
the_lib = loader.LoadLibrary(fname)
|
||||
break
|
||||
except Exception as err:
|
||||
# Don't record errors when it couldn't load the library from an
|
||||
# exact name -- this fails often, and doesn't provide any useful
|
||||
# debugging information anyway, beyond "couldn't find library..."
|
||||
if fname not in exact_lib_names:
|
||||
errors.append((fname, err))
|
||||
|
||||
# No success ...
|
||||
if the_lib is None:
|
||||
if errors:
|
||||
# No library loaded, and load-errors reported for some
|
||||
# candidate libs
|
||||
err_txt = ["%s:\n%s" % (lib, str(e)) for lib, e in errors]
|
||||
msg = (
|
||||
"One or more %s libraries were found, but "
|
||||
+ "could not be loaded due to the following errors:\n%s"
|
||||
)
|
||||
raise OSError(msg % (the_lib_name, "\n\n".join(err_txt)))
|
||||
else:
|
||||
# No errors, because no potential libraries found at all!
|
||||
msg = "Could not find a %s library in any of:\n%s"
|
||||
raise OSError(msg % (the_lib_name, "\n".join(lib_dirs)))
|
||||
|
||||
# Done
|
||||
return the_lib, fname
|
||||
@@ -1,865 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# imageio is distributed under the terms of the (new) BSD License.
|
||||
|
||||
"""
|
||||
|
||||
.. note::
|
||||
imageio is under construction, some details with regard to the
|
||||
Reader and Writer classes may change.
|
||||
|
||||
These are the main classes of imageio. They expose an interface for
|
||||
advanced users and plugin developers. A brief overview:
|
||||
|
||||
* imageio.FormatManager - for keeping track of registered formats.
|
||||
* imageio.Format - representation of a file format reader/writer
|
||||
* imageio.Format.Reader - object used during the reading of a file.
|
||||
* imageio.Format.Writer - object used during saving a file.
|
||||
* imageio.Request - used to store the filename and other info.
|
||||
|
||||
Plugins need to implement a Format class and register
|
||||
a format object using ``imageio.formats.add_format()``.
|
||||
|
||||
"""
|
||||
|
||||
# todo: do we even use the known extensions?
|
||||
|
||||
# Some notes:
|
||||
#
|
||||
# The classes in this module use the Request object to pass filename and
|
||||
# related info around. This request object is instantiated in
|
||||
# imageio.get_reader and imageio.get_writer.
|
||||
|
||||
import sys
|
||||
import warnings
|
||||
import contextlib
|
||||
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
|
||||
from . import Array, asarray
|
||||
from .request import ImageMode
|
||||
from ..config import known_plugins, known_extensions, PluginConfig, FileExtension
|
||||
from ..config.plugins import _original_order
|
||||
from .imopen import imopen
|
||||
|
||||
|
||||
# survived for backwards compatibility
|
||||
# I don't know if external plugin code depends on it existing
|
||||
# We no longer do
|
||||
MODENAMES = ImageMode
|
||||
|
||||
|
||||
def _get_config(plugin):
|
||||
"""Old Plugin resolution logic.
|
||||
|
||||
Remove once we remove the old format manager.
|
||||
"""
|
||||
|
||||
extension_name = None
|
||||
|
||||
if Path(plugin).suffix.lower() in known_extensions:
|
||||
extension_name = Path(plugin).suffix.lower()
|
||||
elif plugin in known_plugins:
|
||||
pass
|
||||
elif plugin.lower() in known_extensions:
|
||||
extension_name = plugin.lower()
|
||||
elif "." + plugin.lower() in known_extensions:
|
||||
extension_name = "." + plugin.lower()
|
||||
else:
|
||||
raise IndexError(f"No format known by name `{plugin}`.")
|
||||
|
||||
if extension_name is not None:
|
||||
for plugin_name in [
|
||||
x
|
||||
for file_extension in known_extensions[extension_name]
|
||||
for x in file_extension.priority
|
||||
]:
|
||||
if known_plugins[plugin_name].is_legacy:
|
||||
plugin = plugin_name
|
||||
break
|
||||
|
||||
return known_plugins[plugin]
|
||||
|
||||
|
||||
class Format(object):
|
||||
"""Represents an implementation to read/write a particular file format
|
||||
|
||||
A format instance is responsible for 1) providing information about
|
||||
a format; 2) determining whether a certain file can be read/written
|
||||
with this format; 3) providing a reader/writer class.
|
||||
|
||||
Generally, imageio will select the right format and use that to
|
||||
read/write an image. A format can also be explicitly chosen in all
|
||||
read/write functions. Use ``print(format)``, or ``help(format_name)``
|
||||
to see its documentation.
|
||||
|
||||
To implement a specific format, one should create a subclass of
|
||||
Format and the Format.Reader and Format.Writer classes. See
|
||||
:class:`imageio.plugins` for details.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name : str
|
||||
A short name of this format. Users can select a format using its name.
|
||||
description : str
|
||||
A one-line description of the format.
|
||||
extensions : str | list | None
|
||||
List of filename extensions that this format supports. If a
|
||||
string is passed it should be space or comma separated. The
|
||||
extensions are used in the documentation and to allow users to
|
||||
select a format by file extension. It is not used to determine
|
||||
what format to use for reading/saving a file.
|
||||
modes : str
|
||||
A string containing the modes that this format can handle ('iIvV'),
|
||||
“i” for an image, “I” for multiple images, “v” for a volume,
|
||||
“V” for multiple volumes.
|
||||
This attribute is used in the documentation and to select the
|
||||
formats when reading/saving a file.
|
||||
"""
|
||||
|
||||
def __init__(self, name, description, extensions=None, modes=None):
|
||||
"""Initialize the Plugin.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name : str
|
||||
A short name of this format. Users can select a format using its name.
|
||||
description : str
|
||||
A one-line description of the format.
|
||||
extensions : str | list | None
|
||||
List of filename extensions that this format supports. If a
|
||||
string is passed it should be space or comma separated. The
|
||||
extensions are used in the documentation and to allow users to
|
||||
select a format by file extension. It is not used to determine
|
||||
what format to use for reading/saving a file.
|
||||
modes : str
|
||||
A string containing the modes that this format can handle ('iIvV'),
|
||||
“i” for an image, “I” for multiple images, “v” for a volume,
|
||||
“V” for multiple volumes.
|
||||
This attribute is used in the documentation and to select the
|
||||
formats when reading/saving a file.
|
||||
"""
|
||||
|
||||
# Store name and description
|
||||
self._name = name.upper()
|
||||
self._description = description
|
||||
|
||||
# Store extensions, do some effort to normalize them.
|
||||
# They are stored as a list of lowercase strings without leading dots.
|
||||
if extensions is None:
|
||||
extensions = []
|
||||
elif isinstance(extensions, str):
|
||||
extensions = extensions.replace(",", " ").split(" ")
|
||||
#
|
||||
if isinstance(extensions, (tuple, list)):
|
||||
self._extensions = tuple(
|
||||
["." + e.strip(".").lower() for e in extensions if e]
|
||||
)
|
||||
else:
|
||||
raise ValueError("Invalid value for extensions given.")
|
||||
|
||||
# Store mode
|
||||
self._modes = modes or ""
|
||||
if not isinstance(self._modes, str):
|
||||
raise ValueError("Invalid value for modes given.")
|
||||
for m in self._modes:
|
||||
if m not in "iIvV?":
|
||||
raise ValueError("Invalid value for mode given.")
|
||||
|
||||
def __repr__(self):
|
||||
# Short description
|
||||
return "<Format %s - %s>" % (self.name, self.description)
|
||||
|
||||
def __str__(self):
|
||||
return self.doc
|
||||
|
||||
@property
|
||||
def doc(self):
|
||||
"""The documentation for this format (name + description + docstring)."""
|
||||
# Our docsring is assumed to be indented by four spaces. The
|
||||
# first line needs special attention.
|
||||
return "%s - %s\n\n %s\n" % (
|
||||
self.name,
|
||||
self.description,
|
||||
self.__doc__.strip(),
|
||||
)
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""The name of this format."""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
"""A short description of this format."""
|
||||
return self._description
|
||||
|
||||
@property
|
||||
def extensions(self):
|
||||
"""A list of file extensions supported by this plugin.
|
||||
These are all lowercase with a leading dot.
|
||||
"""
|
||||
return self._extensions
|
||||
|
||||
@property
|
||||
def modes(self):
|
||||
"""A string specifying the modes that this format can handle."""
|
||||
return self._modes
|
||||
|
||||
def get_reader(self, request):
|
||||
"""get_reader(request)
|
||||
|
||||
Return a reader object that can be used to read data and info
|
||||
from the given file. Users are encouraged to use
|
||||
imageio.get_reader() instead.
|
||||
"""
|
||||
select_mode = request.mode[1] if request.mode[1] in "iIvV" else ""
|
||||
if select_mode not in self.modes:
|
||||
raise RuntimeError(
|
||||
f"Format {self.name} cannot read in {request.mode.image_mode} mode"
|
||||
)
|
||||
return self.Reader(self, request)
|
||||
|
||||
def get_writer(self, request):
|
||||
"""get_writer(request)
|
||||
|
||||
Return a writer object that can be used to write data and info
|
||||
to the given file. Users are encouraged to use
|
||||
imageio.get_writer() instead.
|
||||
"""
|
||||
select_mode = request.mode[1] if request.mode[1] in "iIvV" else ""
|
||||
if select_mode not in self.modes:
|
||||
raise RuntimeError(
|
||||
f"Format {self.name} cannot write in {request.mode.image_mode} mode"
|
||||
)
|
||||
return self.Writer(self, request)
|
||||
|
||||
def can_read(self, request):
|
||||
"""can_read(request)
|
||||
|
||||
Get whether this format can read data from the specified uri.
|
||||
"""
|
||||
return self._can_read(request)
|
||||
|
||||
def can_write(self, request):
|
||||
"""can_write(request)
|
||||
|
||||
Get whether this format can write data to the speciefed uri.
|
||||
"""
|
||||
return self._can_write(request)
|
||||
|
||||
def _can_read(self, request): # pragma: no cover
|
||||
"""Check if Plugin can read from ImageResource.
|
||||
|
||||
This method is called when the format manager is searching for a format
|
||||
to read a certain image. Return True if this format can do it.
|
||||
|
||||
The format manager is aware of the extensions and the modes that each
|
||||
format can handle. It will first ask all formats that *seem* to be able
|
||||
to read it whether they can. If none can, it will ask the remaining
|
||||
formats if they can: the extension might be missing, and this allows
|
||||
formats to provide functionality for certain extensions, while giving
|
||||
preference to other plugins.
|
||||
|
||||
If a format says it can, it should live up to it. The format would
|
||||
ideally check the request.firstbytes and look for a header of some kind.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
request : Request
|
||||
A request that can be used to access the ImageResource and obtain
|
||||
metadata about it.
|
||||
|
||||
Returns
|
||||
-------
|
||||
can_read : bool
|
||||
True if the plugin can read from the ImageResource, False otherwise.
|
||||
|
||||
"""
|
||||
return None # Plugins must implement this
|
||||
|
||||
def _can_write(self, request): # pragma: no cover
|
||||
"""Check if Plugin can write to ImageResource.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
request : Request
|
||||
A request that can be used to access the ImageResource and obtain
|
||||
metadata about it.
|
||||
|
||||
Returns
|
||||
-------
|
||||
can_read : bool
|
||||
True if the plugin can write to the ImageResource, False otherwise.
|
||||
|
||||
"""
|
||||
return None # Plugins must implement this
|
||||
|
||||
# -----
|
||||
|
||||
class _BaseReaderWriter(object):
|
||||
"""Base class for the Reader and Writer class to implement common
|
||||
functionality. It implements a similar approach for opening/closing
|
||||
and context management as Python's file objects.
|
||||
"""
|
||||
|
||||
def __init__(self, format, request):
|
||||
self.__closed = False
|
||||
self._BaseReaderWriter_last_index = -1
|
||||
self._format = format
|
||||
self._request = request
|
||||
# Open the reader/writer
|
||||
self._open(**self.request.kwargs.copy())
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
"""The :class:`.Format` object corresponding to the current
|
||||
read/write operation.
|
||||
"""
|
||||
return self._format
|
||||
|
||||
@property
|
||||
def request(self):
|
||||
"""The :class:`.Request` object corresponding to the
|
||||
current read/write operation.
|
||||
"""
|
||||
return self._request
|
||||
|
||||
def __enter__(self):
|
||||
self._checkClosed()
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if value is None:
|
||||
# Otherwise error in close hide the real error.
|
||||
self.close()
|
||||
|
||||
def __del__(self):
|
||||
try:
|
||||
self.close()
|
||||
except Exception: # pragma: no cover
|
||||
pass # Supress noise when called during interpreter shutdown
|
||||
|
||||
def close(self):
|
||||
"""Flush and close the reader/writer.
|
||||
This method has no effect if it is already closed.
|
||||
"""
|
||||
if self.__closed:
|
||||
return
|
||||
self.__closed = True
|
||||
self._close()
|
||||
# Process results and clean request object
|
||||
self.request.finish()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
"""Whether the reader/writer is closed."""
|
||||
return self.__closed
|
||||
|
||||
def _checkClosed(self, msg=None):
|
||||
"""Internal: raise an ValueError if reader/writer is closed"""
|
||||
if self.closed:
|
||||
what = self.__class__.__name__
|
||||
msg = msg or ("I/O operation on closed %s." % what)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
# To implement
|
||||
|
||||
def _open(self, **kwargs):
|
||||
"""_open(**kwargs)
|
||||
|
||||
Plugins should probably implement this.
|
||||
|
||||
It is called when reader/writer is created. Here the
|
||||
plugin can do its initialization. The given keyword arguments
|
||||
are those that were given by the user at imageio.read() or
|
||||
imageio.write().
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _close(self):
|
||||
"""_close()
|
||||
|
||||
Plugins should probably implement this.
|
||||
|
||||
It is called when the reader/writer is closed. Here the plugin
|
||||
can do a cleanup, flush, etc.
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
# -----
|
||||
|
||||
class Reader(_BaseReaderWriter):
|
||||
"""
|
||||
The purpose of a reader object is to read data from an image
|
||||
resource, and should be obtained by calling :func:`.get_reader`.
|
||||
|
||||
A reader can be used as an iterator to read multiple images,
|
||||
and (if the format permits) only reads data from the file when
|
||||
new data is requested (i.e. streaming). A reader can also be
|
||||
used as a context manager so that it is automatically closed.
|
||||
|
||||
Plugins implement Reader's for different formats. Though rare,
|
||||
plugins may provide additional functionality (beyond what is
|
||||
provided by the base reader class).
|
||||
"""
|
||||
|
||||
def get_length(self):
|
||||
"""get_length()
|
||||
|
||||
Get the number of images in the file. (Note: you can also
|
||||
use ``len(reader_object)``.)
|
||||
|
||||
The result can be:
|
||||
* 0 for files that only have meta data
|
||||
* 1 for singleton images (e.g. in PNG, JPEG, etc.)
|
||||
* N for image series
|
||||
* inf for streams (series of unknown length)
|
||||
"""
|
||||
return self._get_length()
|
||||
|
||||
def get_data(self, index, **kwargs):
|
||||
"""get_data(index, **kwargs)
|
||||
|
||||
Read image data from the file, using the image index. The
|
||||
returned image has a 'meta' attribute with the meta data.
|
||||
Raises IndexError if the index is out of range.
|
||||
|
||||
Some formats may support additional keyword arguments. These are
|
||||
listed in the documentation of those formats.
|
||||
"""
|
||||
self._checkClosed()
|
||||
self._BaseReaderWriter_last_index = index
|
||||
try:
|
||||
im, meta = self._get_data(index, **kwargs)
|
||||
except StopIteration:
|
||||
raise IndexError(index)
|
||||
return Array(im, meta) # Array tests im and meta
|
||||
|
||||
def get_next_data(self, **kwargs):
|
||||
"""get_next_data(**kwargs)
|
||||
|
||||
Read the next image from the series.
|
||||
|
||||
Some formats may support additional keyword arguments. These are
|
||||
listed in the documentation of those formats.
|
||||
"""
|
||||
return self.get_data(self._BaseReaderWriter_last_index + 1, **kwargs)
|
||||
|
||||
def set_image_index(self, index, **kwargs):
|
||||
"""set_image_index(index)
|
||||
|
||||
Set the internal pointer such that the next call to
|
||||
get_next_data() returns the image specified by the index
|
||||
"""
|
||||
self._checkClosed()
|
||||
n = self.get_length()
|
||||
self._BaseReaderWriter_last_index = min(max(index - 1, -1), n)
|
||||
|
||||
def get_meta_data(self, index=None):
|
||||
"""get_meta_data(index=None)
|
||||
|
||||
Read meta data from the file. using the image index. If the
|
||||
index is omitted or None, return the file's (global) meta data.
|
||||
|
||||
Note that ``get_data`` also provides the meta data for the returned
|
||||
image as an atrribute of that image.
|
||||
|
||||
The meta data is a dict, which shape depends on the format.
|
||||
E.g. for JPEG, the dict maps group names to subdicts and each
|
||||
group is a dict with name-value pairs. The groups represent
|
||||
the different metadata formats (EXIF, XMP, etc.).
|
||||
"""
|
||||
self._checkClosed()
|
||||
meta = self._get_meta_data(index)
|
||||
if not isinstance(meta, dict):
|
||||
raise ValueError(
|
||||
"Meta data must be a dict, not %r" % meta.__class__.__name__
|
||||
)
|
||||
return meta
|
||||
|
||||
def iter_data(self):
|
||||
"""iter_data()
|
||||
|
||||
Iterate over all images in the series. (Note: you can also
|
||||
iterate over the reader object.)
|
||||
|
||||
"""
|
||||
self._checkClosed()
|
||||
n = self.get_length()
|
||||
i = 0
|
||||
while i < n:
|
||||
try:
|
||||
im, meta = self._get_data(i)
|
||||
except StopIteration:
|
||||
return
|
||||
except IndexError:
|
||||
if n == float("inf"):
|
||||
return
|
||||
raise
|
||||
yield Array(im, meta)
|
||||
i += 1
|
||||
|
||||
# Compatibility
|
||||
|
||||
def __iter__(self):
|
||||
return self.iter_data()
|
||||
|
||||
def __len__(self):
|
||||
n = self.get_length()
|
||||
if n == float("inf"):
|
||||
n = sys.maxsize
|
||||
return n
|
||||
|
||||
# To implement
|
||||
|
||||
def _get_length(self):
|
||||
"""_get_length()
|
||||
|
||||
Plugins must implement this.
|
||||
|
||||
The retured scalar specifies the number of images in the series.
|
||||
See Reader.get_length for more information.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _get_data(self, index):
|
||||
"""_get_data()
|
||||
|
||||
Plugins must implement this, but may raise an IndexError in
|
||||
case the plugin does not support random access.
|
||||
|
||||
It should return the image and meta data: (ndarray, dict).
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def _get_meta_data(self, index):
|
||||
"""_get_meta_data(index)
|
||||
|
||||
Plugins must implement this.
|
||||
|
||||
It should return the meta data as a dict, corresponding to the
|
||||
given index, or to the file's (global) meta data if index is
|
||||
None.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
# -----
|
||||
|
||||
class Writer(_BaseReaderWriter):
|
||||
"""
|
||||
The purpose of a writer object is to write data to an image
|
||||
resource, and should be obtained by calling :func:`.get_writer`.
|
||||
|
||||
A writer will (if the format permits) write data to the file
|
||||
as soon as new data is provided (i.e. streaming). A writer can
|
||||
also be used as a context manager so that it is automatically
|
||||
closed.
|
||||
|
||||
Plugins implement Writer's for different formats. Though rare,
|
||||
plugins may provide additional functionality (beyond what is
|
||||
provided by the base writer class).
|
||||
"""
|
||||
|
||||
def append_data(self, im, meta=None):
|
||||
"""append_data(im, meta={})
|
||||
|
||||
Append an image (and meta data) to the file. The final meta
|
||||
data that is used consists of the meta data on the given
|
||||
image (if applicable), updated with the given meta data.
|
||||
"""
|
||||
self._checkClosed()
|
||||
# Check image data
|
||||
if not isinstance(im, np.ndarray):
|
||||
raise ValueError("append_data requires ndarray as first arg")
|
||||
# Get total meta dict
|
||||
total_meta = {}
|
||||
if hasattr(im, "meta") and isinstance(im.meta, dict):
|
||||
total_meta.update(im.meta)
|
||||
if meta is None:
|
||||
pass
|
||||
elif not isinstance(meta, dict):
|
||||
raise ValueError("Meta must be a dict.")
|
||||
else:
|
||||
total_meta.update(meta)
|
||||
|
||||
# Decouple meta info
|
||||
im = asarray(im)
|
||||
# Call
|
||||
return self._append_data(im, total_meta)
|
||||
|
||||
def set_meta_data(self, meta):
|
||||
"""set_meta_data(meta)
|
||||
|
||||
Sets the file's (global) meta data. The meta data is a dict which
|
||||
shape depends on the format. E.g. for JPEG the dict maps
|
||||
group names to subdicts, and each group is a dict with
|
||||
name-value pairs. The groups represents the different
|
||||
metadata formats (EXIF, XMP, etc.).
|
||||
|
||||
Note that some meta formats may not be supported for
|
||||
writing, and individual fields may be ignored without
|
||||
warning if they are invalid.
|
||||
"""
|
||||
self._checkClosed()
|
||||
if not isinstance(meta, dict):
|
||||
raise ValueError("Meta must be a dict.")
|
||||
else:
|
||||
return self._set_meta_data(meta)
|
||||
|
||||
# To implement
|
||||
|
||||
def _append_data(self, im, meta):
|
||||
# Plugins must implement this
|
||||
raise NotImplementedError()
|
||||
|
||||
def _set_meta_data(self, meta):
|
||||
# Plugins must implement this
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class FormatManager(object):
|
||||
"""
|
||||
The FormatManager is a singleton plugin factory.
|
||||
|
||||
The format manager supports getting a format object using indexing (by
|
||||
format name or extension). When used as an iterator, this object
|
||||
yields all registered format objects.
|
||||
|
||||
See also :func:`.help`.
|
||||
"""
|
||||
|
||||
@property
|
||||
def _formats(self):
|
||||
available_formats = list()
|
||||
|
||||
for config in known_plugins.values():
|
||||
with contextlib.suppress(ImportError):
|
||||
# if an exception is raised, then format not installed
|
||||
if config.is_legacy and config.format is not None:
|
||||
available_formats.append(config)
|
||||
|
||||
return available_formats
|
||||
|
||||
def __repr__(self):
|
||||
return f"<imageio.FormatManager with {len(self._formats)} registered formats>"
|
||||
|
||||
def __iter__(self):
|
||||
return iter(x.format for x in self._formats)
|
||||
|
||||
def __len__(self):
|
||||
return len(self._formats)
|
||||
|
||||
def __str__(self):
|
||||
ss = []
|
||||
for config in self._formats:
|
||||
ext = config.legacy_args["extensions"]
|
||||
desc = config.legacy_args["description"]
|
||||
s = f"{config.name} - {desc} [{ext}]"
|
||||
ss.append(s)
|
||||
return "\n".join(ss)
|
||||
|
||||
def __getitem__(self, name):
|
||||
warnings.warn(
|
||||
"The usage of `FormatManager` is deprecated and it will be "
|
||||
"removed in Imageio v3. Use `iio.imopen` instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
if not isinstance(name, str):
|
||||
raise ValueError(
|
||||
"Looking up a format should be done by name or by extension."
|
||||
)
|
||||
|
||||
if name == "":
|
||||
raise ValueError("No format matches the empty string.")
|
||||
|
||||
# Test if name is existing file
|
||||
if Path(name).is_file():
|
||||
# legacy compatibility - why test reading here??
|
||||
try:
|
||||
return imopen(name, "r", legacy_mode=True)._format
|
||||
except ValueError:
|
||||
# no plugin can read the file
|
||||
pass
|
||||
|
||||
config = _get_config(name.upper())
|
||||
|
||||
try:
|
||||
return config.format
|
||||
except ImportError:
|
||||
raise ImportError(
|
||||
f"The `{config.name}` format is not installed. "
|
||||
f"Use `pip install imageio[{config.install_name}]` to install it."
|
||||
)
|
||||
|
||||
def sort(self, *names):
|
||||
"""sort(name1, name2, name3, ...)
|
||||
|
||||
Sort the formats based on zero or more given names; a format with
|
||||
a name that matches one of the given names will take precedence
|
||||
over other formats. A match means an equal name, or ending with
|
||||
that name (though the former counts higher). Case insensitive.
|
||||
|
||||
Format preference will match the order of the given names: using
|
||||
``sort('TIFF', '-FI', '-PIL')`` would prefer the FreeImage formats
|
||||
over the Pillow formats, but prefer TIFF even more. Each time
|
||||
this is called, the starting point is the default format order,
|
||||
and calling ``sort()`` with no arguments will reset the order.
|
||||
|
||||
Be aware that using the function can affect the behavior of
|
||||
other code that makes use of imageio.
|
||||
|
||||
Also see the ``IMAGEIO_FORMAT_ORDER`` environment variable.
|
||||
"""
|
||||
|
||||
warnings.warn(
|
||||
"`FormatManager` is deprecated and it will be removed in ImageIO v3."
|
||||
" Migrating `FormatManager.sort` depends on your use-case:\n"
|
||||
"\t- modify `iio.config.known_plugins` to specify the search order for "
|
||||
"unrecognized formats.\n"
|
||||
"\t- modify `iio.config.known_extensions[<extension>].priority`"
|
||||
" to control a specific extension.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
# Check and sanitize imput
|
||||
for name in names:
|
||||
if not isinstance(name, str):
|
||||
raise TypeError("formats.sort() accepts only string names.")
|
||||
if any(c in name for c in ".,"):
|
||||
raise ValueError(
|
||||
"Names given to formats.sort() should not "
|
||||
"contain dots `.` or commas `,`."
|
||||
)
|
||||
|
||||
should_reset = len(names) == 0
|
||||
if should_reset:
|
||||
names = _original_order
|
||||
|
||||
sane_names = [name.strip().upper() for name in names if name != ""]
|
||||
|
||||
# enforce order for every extension that uses it
|
||||
flat_extensions = [
|
||||
ext for ext_list in known_extensions.values() for ext in ext_list
|
||||
]
|
||||
for extension in flat_extensions:
|
||||
if should_reset:
|
||||
extension.reset()
|
||||
continue
|
||||
|
||||
for name in reversed(sane_names):
|
||||
for plugin in [x for x in extension.default_priority]:
|
||||
if plugin.endswith(name):
|
||||
extension.priority.remove(plugin)
|
||||
extension.priority.insert(0, plugin)
|
||||
|
||||
old_order = known_plugins.copy()
|
||||
known_plugins.clear()
|
||||
|
||||
for name in sane_names:
|
||||
plugin = old_order.pop(name, None)
|
||||
if plugin is not None:
|
||||
known_plugins[name] = plugin
|
||||
|
||||
known_plugins.update(old_order)
|
||||
|
||||
def add_format(self, iio_format, overwrite=False):
|
||||
"""add_format(format, overwrite=False)
|
||||
|
||||
Register a format, so that imageio can use it. If a format with the
|
||||
same name already exists, an error is raised, unless overwrite is True,
|
||||
in which case the current format is replaced.
|
||||
"""
|
||||
|
||||
warnings.warn(
|
||||
"`FormatManager` is deprecated and it will be removed in ImageIO v3."
|
||||
"To migrate `FormatManager.add_format` add the plugin directly to "
|
||||
"`iio.config.known_plugins`.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
if not isinstance(iio_format, Format):
|
||||
raise ValueError("add_format needs argument to be a Format object")
|
||||
elif not overwrite and iio_format.name in self.get_format_names():
|
||||
raise ValueError(
|
||||
f"A Format named {iio_format.name} is already registered, use"
|
||||
" `overwrite=True` to replace."
|
||||
)
|
||||
|
||||
config = PluginConfig(
|
||||
name=iio_format.name.upper(),
|
||||
class_name=iio_format.__class__.__name__,
|
||||
module_name=iio_format.__class__.__module__,
|
||||
is_legacy=True,
|
||||
install_name="unknown",
|
||||
legacy_args={
|
||||
"name": iio_format.name,
|
||||
"description": iio_format.description,
|
||||
"extensions": " ".join(iio_format.extensions),
|
||||
"modes": iio_format.modes,
|
||||
},
|
||||
)
|
||||
|
||||
known_plugins[config.name] = config
|
||||
|
||||
for extension in iio_format.extensions:
|
||||
# be conservative and always treat it as a unique file format
|
||||
ext = FileExtension(
|
||||
extension=extension,
|
||||
priority=[config.name],
|
||||
name="Unique Format",
|
||||
description="A format inserted at runtime."
|
||||
f" It is being read by the `{config.name}` plugin.",
|
||||
)
|
||||
known_extensions.setdefault(extension, list()).append(ext)
|
||||
|
||||
def search_read_format(self, request):
|
||||
"""search_read_format(request)
|
||||
|
||||
Search a format that can read a file according to the given request.
|
||||
Returns None if no appropriate format was found. (used internally)
|
||||
"""
|
||||
|
||||
try:
|
||||
# in legacy_mode imopen returns a LegacyPlugin
|
||||
return imopen(request, request.mode.io_mode, legacy_mode=True)._format
|
||||
except ValueError:
|
||||
# no plugin can read this request
|
||||
# but the legacy API doesn't raise
|
||||
return None
|
||||
|
||||
def search_write_format(self, request):
|
||||
"""search_write_format(request)
|
||||
|
||||
Search a format that can write a file according to the given request.
|
||||
Returns None if no appropriate format was found. (used internally)
|
||||
"""
|
||||
|
||||
try:
|
||||
# in legacy_mode imopen returns a LegacyPlugin
|
||||
return imopen(request, request.mode.io_mode, legacy_mode=True)._format
|
||||
except ValueError:
|
||||
# no plugin can write this request
|
||||
# but the legacy API doesn't raise
|
||||
return None
|
||||
|
||||
def get_format_names(self):
|
||||
"""Get the names of all registered formats."""
|
||||
|
||||
warnings.warn(
|
||||
"`FormatManager` is deprecated and it will be removed in ImageIO v3."
|
||||
"To migrate `FormatManager.get_format_names` use `iio.config.known_plugins.keys()` instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
return [f.name for f in self._formats]
|
||||
|
||||
def show(self):
|
||||
"""Show a nicely formatted list of available formats"""
|
||||
print(self)
|
||||
@@ -1,87 +0,0 @@
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ..typing import ArrayLike
|
||||
from . import Array
|
||||
from .request import Request
|
||||
from ..config import PluginConfig
|
||||
|
||||
def _get_config(plugin: str) -> PluginConfig: ...
|
||||
|
||||
class Format(object):
|
||||
@property
|
||||
def doc(self) -> str: ...
|
||||
@property
|
||||
def name(self) -> str: ...
|
||||
@property
|
||||
def description(self) -> str: ...
|
||||
@property
|
||||
def extensions(self) -> List[str]: ...
|
||||
@property
|
||||
def modes(self) -> str: ...
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
description: str,
|
||||
extensions: Union[str, list, tuple, None] = None,
|
||||
modes: str = None,
|
||||
) -> None: ...
|
||||
def __repr__(self) -> str: ...
|
||||
def __str__(self) -> str: ...
|
||||
def get_reader(self, request: Request) -> Reader: ...
|
||||
def get_writer(self, request: Request) -> Writer: ...
|
||||
def can_read(self, request: Request) -> bool: ...
|
||||
def can_write(self, request: Request) -> bool: ...
|
||||
def _can_read(self, request: Request) -> bool: ...
|
||||
def _can_write(self, request: Request) -> bool: ...
|
||||
|
||||
class _BaseReaderWriter(object):
|
||||
@property
|
||||
def format(self) -> Format: ...
|
||||
@property
|
||||
def request(self) -> Request: ...
|
||||
@property
|
||||
def closed(self) -> bool: ...
|
||||
def __init__(self, format: Format, request: Request) -> None: ...
|
||||
def __enter__(self) -> Format._BaseReaderWriter: ...
|
||||
def __exit__(self, type, value, traceback) -> None: ...
|
||||
def __del__(self) -> None: ...
|
||||
def close(self) -> None: ...
|
||||
def _checkClosed(self, msg=None) -> None: ...
|
||||
def _open(self, **kwargs) -> None: ...
|
||||
def _close(self) -> None: ...
|
||||
|
||||
class Reader(_BaseReaderWriter):
|
||||
def get_length(self) -> int: ...
|
||||
def get_data(self, index: int, **kwargs) -> Array: ...
|
||||
def get_next_data(self, **kwargs) -> Dict[str, Any]: ...
|
||||
def set_image_index(self, index: int, **kwargs) -> None: ...
|
||||
def get_meta_data(self, index: int = None) -> Dict[str, Any]: ...
|
||||
def iter_data(self) -> Array: ...
|
||||
def __iter__(self) -> Array: ...
|
||||
def __len__(self) -> int: ...
|
||||
def _get_length(self) -> int: ...
|
||||
def _get_data(self, index: int) -> Array: ...
|
||||
def _get_meta_data(self, index: int) -> Dict[str, Any]: ...
|
||||
|
||||
class Writer(_BaseReaderWriter):
|
||||
def append_data(self, im: ArrayLike, meta: Dict[str, Any] = None) -> None: ...
|
||||
def set_meta_data(self, meta: Dict[str, Any]) -> None: ...
|
||||
def _append_data(self, im: ArrayLike, meta: Dict[str, Any]) -> None: ...
|
||||
def _set_meta_data(self, meta: Dict[str, Any]) -> None: ...
|
||||
|
||||
class FormatManager(object):
|
||||
@property
|
||||
def _formats(self) -> List[Format]: ...
|
||||
def __repr__(self) -> str: ...
|
||||
def __iter__(self) -> Format: ...
|
||||
def __len__(self) -> int: ...
|
||||
def __str__(self) -> str: ...
|
||||
def __getitem__(self, name: str) -> Format: ...
|
||||
def sort(self, *names: str) -> None: ...
|
||||
def add_format(self, iio_format: Format, overwrite: bool = False) -> None: ...
|
||||
def search_read_format(self, request: Request) -> Optional[Format]: ...
|
||||
def search_write_format(self, request: Request) -> Optional[Format]: ...
|
||||
def get_format_names(self) -> List[str]: ...
|
||||
def show(self) -> None: ...
|
||||
@@ -1,298 +0,0 @@
|
||||
from pathlib import Path
|
||||
import warnings
|
||||
|
||||
from ..config import known_plugins
|
||||
from ..config.extensions import known_extensions
|
||||
from .request import (
|
||||
SPECIAL_READ_URIS,
|
||||
URI_FILENAME,
|
||||
InitializationError,
|
||||
IOMode,
|
||||
Request,
|
||||
)
|
||||
|
||||
|
||||
def imopen(
|
||||
uri,
|
||||
io_mode,
|
||||
*,
|
||||
plugin=None,
|
||||
extension=None,
|
||||
format_hint=None,
|
||||
legacy_mode=False,
|
||||
**kwargs,
|
||||
):
|
||||
"""Open an ImageResource.
|
||||
|
||||
.. warning::
|
||||
This warning is for pypy users. If you are not using a context manager,
|
||||
remember to deconstruct the returned plugin to avoid leaking the file
|
||||
handle to an unclosed file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uri : str or pathlib.Path or bytes or file or Request
|
||||
The :doc:`ImageResource <../../user_guide/requests>` to load the
|
||||
image from.
|
||||
io_mode : str
|
||||
The mode in which the file is opened. Possible values are::
|
||||
|
||||
``r`` - open the file for reading
|
||||
``w`` - open the file for writing
|
||||
|
||||
Depreciated since v2.9:
|
||||
A second character can be added to give the reader a hint on what
|
||||
the user expects. This will be ignored by new plugins and will
|
||||
only have an effect on legacy plugins. Possible values are::
|
||||
|
||||
``i`` for a single image,
|
||||
``I`` for multiple images,
|
||||
``v`` for a single volume,
|
||||
``V`` for multiple volumes,
|
||||
``?`` for don't care (default)
|
||||
|
||||
plugin : str, Plugin, or None
|
||||
The plugin to use. If set to None (default) imopen will perform a
|
||||
search for a matching plugin. If not None, this takes priority over
|
||||
the provided format hint.
|
||||
extension : str
|
||||
If not None, treat the provided ImageResource as if it had the given
|
||||
extension. This affects the order in which backends are considered, and
|
||||
when writing this may also influence the format used when encoding.
|
||||
format_hint : str
|
||||
Deprecated. Use `extension` instead.
|
||||
legacy_mode : bool
|
||||
If true (default) use the v2 behavior when searching for a suitable
|
||||
plugin. This will ignore v3 plugins and will check ``plugin``
|
||||
against known extensions if no plugin with the given name can be found.
|
||||
**kwargs : Any
|
||||
Additional keyword arguments will be passed to the plugin upon
|
||||
construction.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Registered plugins are controlled via the ``known_plugins`` dict in
|
||||
``imageio.config``.
|
||||
|
||||
Passing a ``Request`` as the uri is only supported if ``legacy_mode``
|
||||
is ``True``. In this case ``io_mode`` is ignored.
|
||||
|
||||
Using the kwarg ``format_hint`` does not enforce the given format. It merely
|
||||
provides a `hint` to the selection process and plugin. The selection
|
||||
processes uses this hint for optimization; however, a plugin's decision how
|
||||
to read a ImageResource will - typically - still be based on the content of
|
||||
the resource.
|
||||
|
||||
|
||||
Examples
|
||||
--------
|
||||
|
||||
>>> import imageio.v3 as iio
|
||||
>>> with iio.imopen("/path/to/image.png", "r") as file:
|
||||
>>> im = file.read()
|
||||
|
||||
>>> with iio.imopen("/path/to/output.jpg", "w") as file:
|
||||
>>> file.write(im)
|
||||
|
||||
"""
|
||||
|
||||
if isinstance(uri, Request) and legacy_mode:
|
||||
warnings.warn(
|
||||
"`iio.core.Request` is a low-level object and using it"
|
||||
" directly as input to `imopen` is discouraged. This will raise"
|
||||
" an exception in ImageIO v3.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
request = uri
|
||||
uri = request.raw_uri
|
||||
io_mode = request.mode.io_mode
|
||||
request.format_hint = format_hint
|
||||
else:
|
||||
request = Request(uri, io_mode, format_hint=format_hint, extension=extension)
|
||||
|
||||
source = "<bytes>" if isinstance(uri, bytes) else uri
|
||||
|
||||
# fast-path based on plugin
|
||||
# (except in legacy mode)
|
||||
if plugin is not None:
|
||||
if isinstance(plugin, str):
|
||||
try:
|
||||
config = known_plugins[plugin]
|
||||
except KeyError:
|
||||
request.finish()
|
||||
raise ValueError(
|
||||
f"`{plugin}` is not a registered plugin name."
|
||||
) from None
|
||||
|
||||
def loader(request, **kwargs):
|
||||
return config.plugin_class(request, **kwargs)
|
||||
|
||||
elif not legacy_mode:
|
||||
|
||||
def loader(request, **kwargs):
|
||||
return plugin(request, **kwargs)
|
||||
|
||||
else:
|
||||
request.finish()
|
||||
raise ValueError("The `plugin` argument must be a string.")
|
||||
|
||||
try:
|
||||
return loader(request, **kwargs)
|
||||
except InitializationError as class_specific:
|
||||
err_from = class_specific
|
||||
err_type = RuntimeError if legacy_mode else IOError
|
||||
err_msg = f"`{plugin}` can not handle the given uri."
|
||||
except ImportError:
|
||||
err_from = None
|
||||
err_type = ImportError
|
||||
err_msg = (
|
||||
f"The `{config.name}` plugin is not installed. "
|
||||
f"Use `pip install imageio[{config.install_name}]` to install it."
|
||||
)
|
||||
except Exception as generic_error:
|
||||
err_from = generic_error
|
||||
err_type = IOError
|
||||
err_msg = f"An unknown error occured while initializing plugin `{plugin}`."
|
||||
|
||||
request.finish()
|
||||
raise err_type(err_msg) from err_from
|
||||
|
||||
# fast-path based on format_hint
|
||||
if request.format_hint is not None:
|
||||
for candidate_format in known_extensions[format_hint]:
|
||||
for plugin_name in candidate_format.priority:
|
||||
config = known_plugins[plugin_name]
|
||||
|
||||
# v2 compatibility; delete in v3
|
||||
if legacy_mode and not config.is_legacy:
|
||||
continue
|
||||
|
||||
try:
|
||||
candidate_plugin = config.plugin_class
|
||||
except ImportError:
|
||||
# not installed
|
||||
continue
|
||||
|
||||
try:
|
||||
plugin_instance = candidate_plugin(request, **kwargs)
|
||||
except InitializationError:
|
||||
# file extension doesn't match file type
|
||||
continue
|
||||
|
||||
return plugin_instance
|
||||
else:
|
||||
resource = (
|
||||
"<bytes>" if isinstance(request.raw_uri, bytes) else request.raw_uri
|
||||
)
|
||||
warnings.warn(f"`{resource}` can not be opened as a `{format_hint}` file.")
|
||||
|
||||
# fast-path based on file extension
|
||||
if request.extension in known_extensions:
|
||||
for candidate_format in known_extensions[request.extension]:
|
||||
for plugin_name in candidate_format.priority:
|
||||
config = known_plugins[plugin_name]
|
||||
|
||||
# v2 compatibility; delete in v3
|
||||
if legacy_mode and not config.is_legacy:
|
||||
continue
|
||||
|
||||
try:
|
||||
candidate_plugin = config.plugin_class
|
||||
except ImportError:
|
||||
# not installed
|
||||
continue
|
||||
|
||||
try:
|
||||
plugin_instance = candidate_plugin(request, **kwargs)
|
||||
except InitializationError:
|
||||
# file extension doesn't match file type
|
||||
continue
|
||||
|
||||
return plugin_instance
|
||||
|
||||
# error out for read-only special targets
|
||||
# this is hacky; can we come up with a better solution for this?
|
||||
if request.mode.io_mode == IOMode.write:
|
||||
if isinstance(uri, str) and uri.startswith(SPECIAL_READ_URIS):
|
||||
request.finish()
|
||||
err_type = ValueError if legacy_mode else IOError
|
||||
err_msg = f"`{source}` is read-only."
|
||||
raise err_type(err_msg)
|
||||
|
||||
# error out for directories
|
||||
# this is a bit hacky and should be cleaned once we decide
|
||||
# how to gracefully handle DICOM
|
||||
if request._uri_type == URI_FILENAME and Path(request.raw_uri).is_dir():
|
||||
request.finish()
|
||||
err_type = ValueError if legacy_mode else IOError
|
||||
err_msg = (
|
||||
"ImageIO does not generally support reading folders. "
|
||||
"Limited support may be available via specific plugins. "
|
||||
"Specify the plugin explicitly using the `plugin` kwarg, e.g. `plugin='DICOM'`"
|
||||
)
|
||||
raise err_type(err_msg)
|
||||
|
||||
# close the current request here and use fresh/new ones while trying each
|
||||
# plugin This is slow (means potentially reopening a resource several
|
||||
# times), but should only happen rarely because this is the fallback if all
|
||||
# else fails.
|
||||
request.finish()
|
||||
|
||||
# fallback option: try all plugins
|
||||
for config in known_plugins.values():
|
||||
# Note: for v2 compatibility
|
||||
# this branch can be removed in ImageIO v3.0
|
||||
if legacy_mode and not config.is_legacy:
|
||||
continue
|
||||
|
||||
# each plugin gets its own request
|
||||
request = Request(uri, io_mode, format_hint=format_hint)
|
||||
|
||||
try:
|
||||
plugin_instance = config.plugin_class(request, **kwargs)
|
||||
except InitializationError:
|
||||
continue
|
||||
except ImportError:
|
||||
continue
|
||||
else:
|
||||
return plugin_instance
|
||||
|
||||
err_type = ValueError if legacy_mode else IOError
|
||||
err_msg = f"Could not find a backend to open `{source}`` with iomode `{io_mode}`."
|
||||
|
||||
# check if a missing plugin could help
|
||||
if request.extension in known_extensions:
|
||||
missing_plugins = list()
|
||||
|
||||
formats = known_extensions[request.extension]
|
||||
plugin_names = [
|
||||
plugin for file_format in formats for plugin in file_format.priority
|
||||
]
|
||||
for name in plugin_names:
|
||||
config = known_plugins[name]
|
||||
|
||||
try:
|
||||
config.plugin_class
|
||||
continue
|
||||
except ImportError:
|
||||
missing_plugins.append(config)
|
||||
|
||||
if len(missing_plugins) > 0:
|
||||
install_candidates = "\n".join(
|
||||
[
|
||||
(
|
||||
f" {config.name}: "
|
||||
f"pip install imageio[{config.install_name}]"
|
||||
)
|
||||
for config in missing_plugins
|
||||
]
|
||||
)
|
||||
err_msg += (
|
||||
"\nBased on the extension, the following plugins might add capable backends:\n"
|
||||
f"{install_candidates}"
|
||||
)
|
||||
|
||||
request.finish()
|
||||
raise err_type(err_msg)
|
||||
@@ -1,87 +0,0 @@
|
||||
from typing import Literal, Type, TypeVar, overload
|
||||
|
||||
from ..plugins.opencv import OpenCVPlugin
|
||||
from ..plugins.pillow import PillowPlugin
|
||||
from ..plugins.pyav import PyAVPlugin
|
||||
from ..plugins.tifffile_v3 import TifffilePlugin
|
||||
from ..typing import ImageResource
|
||||
from .legacy_plugin_wrapper import LegacyPlugin
|
||||
from .v3_plugin_api import PluginV3
|
||||
|
||||
CustomPlugin = TypeVar("CustomPlugin", bound=PluginV3)
|
||||
|
||||
@overload
|
||||
def imopen(
|
||||
uri: ImageResource,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
) -> PluginV3: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri: ImageResource,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
plugin: str = None,
|
||||
format_hint: str = None,
|
||||
extension: str = None,
|
||||
legacy_mode: Literal[True],
|
||||
**kwargs,
|
||||
) -> LegacyPlugin: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri: ImageResource,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
format_hint: str = None,
|
||||
extension: str = None,
|
||||
legacy_mode: Literal[False] = False,
|
||||
) -> PluginV3: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri: ImageResource,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
plugin: Literal["pillow"],
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
) -> PillowPlugin: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri: ImageResource,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
plugin: Literal["pyav"],
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
container: str = None,
|
||||
) -> PyAVPlugin: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
plugin: Literal["opencv"],
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
) -> OpenCVPlugin: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
plugin: Literal["tifffile"],
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
) -> TifffilePlugin: ...
|
||||
@overload
|
||||
def imopen(
|
||||
uri: ImageResource,
|
||||
io_mode: Literal["r", "w"],
|
||||
*,
|
||||
plugin: Type[CustomPlugin],
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
**kwargs,
|
||||
) -> CustomPlugin: ...
|
||||
@@ -1,363 +0,0 @@
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
|
||||
from ..config import known_extensions
|
||||
from .request import InitializationError, IOMode
|
||||
from .v3_plugin_api import ImageProperties, PluginV3
|
||||
|
||||
|
||||
def _legacy_default_index(format):
|
||||
if format._name == "FFMPEG":
|
||||
index = Ellipsis
|
||||
elif format._name == "GIF-PIL":
|
||||
index = Ellipsis
|
||||
else:
|
||||
index = 0
|
||||
|
||||
return index
|
||||
|
||||
|
||||
class LegacyPlugin(PluginV3):
|
||||
"""A plugin to make old (v2.9) plugins compatible with v3.0
|
||||
|
||||
.. depreciated:: 2.9
|
||||
`legacy_get_reader` will be removed in a future version of imageio.
|
||||
`legacy_get_writer` will be removed in a future version of imageio.
|
||||
|
||||
This plugin is a wrapper around the old FormatManager class and exposes
|
||||
all the old plugins via the new API. On top of this it has
|
||||
``legacy_get_reader`` and ``legacy_get_writer`` methods to allow using
|
||||
it with the v2.9 API.
|
||||
|
||||
Methods
|
||||
-------
|
||||
read(index=None, **kwargs)
|
||||
Read the image at position ``index``.
|
||||
write(image, **kwargs)
|
||||
Write image to the URI.
|
||||
iter(**kwargs)
|
||||
Iteratively yield images from the given URI.
|
||||
get_meta(index=None)
|
||||
Return the metadata for the image at position ``index``.
|
||||
legacy_get_reader(**kwargs)
|
||||
Returns the v2.9 image reader. (depreciated)
|
||||
legacy_get_writer(**kwargs)
|
||||
Returns the v2.9 image writer. (depreciated)
|
||||
|
||||
Examples
|
||||
--------
|
||||
|
||||
>>> import imageio.v3 as iio
|
||||
>>> with iio.imopen("/path/to/image.tiff", "r", legacy_mode=True) as file:
|
||||
>>> reader = file.legacy_get_reader() # depreciated
|
||||
>>> for im in file.iter():
|
||||
>>> print(im.shape)
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, request, legacy_plugin):
|
||||
"""Instantiate a new Legacy Plugin
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uri : {str, pathlib.Path, bytes, file}
|
||||
The resource to load the image from, e.g. a filename, pathlib.Path,
|
||||
http address or file object, see the docs for more info.
|
||||
legacy_plugin : Format
|
||||
The (legacy) format to use to interface with the URI.
|
||||
|
||||
"""
|
||||
self._request = request
|
||||
self._format = legacy_plugin
|
||||
|
||||
source = (
|
||||
"<bytes>"
|
||||
if isinstance(self._request.raw_uri, bytes)
|
||||
else self._request.raw_uri
|
||||
)
|
||||
if self._request.mode.io_mode == IOMode.read:
|
||||
if not self._format.can_read(request):
|
||||
raise InitializationError(
|
||||
f"`{self._format.name}`" f" can not read `{source}`."
|
||||
)
|
||||
else:
|
||||
if not self._format.can_write(request):
|
||||
raise InitializationError(
|
||||
f"`{self._format.name}`" f" can not write to `{source}`."
|
||||
)
|
||||
|
||||
def legacy_get_reader(self, **kwargs):
|
||||
"""legacy_get_reader(**kwargs)
|
||||
|
||||
a utility method to provide support vor the V2.9 API
|
||||
|
||||
Parameters
|
||||
----------
|
||||
kwargs : ...
|
||||
Further keyword arguments are passed to the reader. See :func:`.help`
|
||||
to see what arguments are available for a particular format.
|
||||
"""
|
||||
|
||||
# Note: this will break thread-safety
|
||||
self._request._kwargs = kwargs
|
||||
|
||||
# safeguard for DICOM plugin reading from folders
|
||||
try:
|
||||
assert Path(self._request.filename).is_dir()
|
||||
except OSError:
|
||||
pass # not a valid path on this OS
|
||||
except AssertionError:
|
||||
pass # not a folder
|
||||
else:
|
||||
return self._format.get_reader(self._request)
|
||||
|
||||
self._request.get_file().seek(0)
|
||||
return self._format.get_reader(self._request)
|
||||
|
||||
def read(self, *, index=None, **kwargs):
|
||||
"""
|
||||
Parses the given URI and creates a ndarray from it.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : {integer, None}
|
||||
If the URI contains a list of ndimages return the index-th
|
||||
image. If None, stack all images into an ndimage along the
|
||||
0-th dimension (equivalent to np.stack(imgs, axis=0)).
|
||||
kwargs : ...
|
||||
Further keyword arguments are passed to the reader. See
|
||||
:func:`.help` to see what arguments are available for a particular
|
||||
format.
|
||||
|
||||
Returns
|
||||
-------
|
||||
ndimage : np.ndarray
|
||||
A numpy array containing the decoded image data.
|
||||
|
||||
"""
|
||||
|
||||
if index is None:
|
||||
index = _legacy_default_index(self._format)
|
||||
|
||||
if index is Ellipsis:
|
||||
img = np.stack([im for im in self.iter(**kwargs)])
|
||||
return img
|
||||
|
||||
reader = self.legacy_get_reader(**kwargs)
|
||||
return reader.get_data(index)
|
||||
|
||||
def legacy_get_writer(self, **kwargs):
|
||||
"""legacy_get_writer(**kwargs)
|
||||
|
||||
Returns a :class:`.Writer` object which can be used to write data
|
||||
and meta data to the specified file.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
kwargs : ...
|
||||
Further keyword arguments are passed to the writer. See :func:`.help`
|
||||
to see what arguments are available for a particular format.
|
||||
"""
|
||||
|
||||
# Note: this will break thread-safety
|
||||
self._request._kwargs = kwargs
|
||||
return self._format.get_writer(self._request)
|
||||
|
||||
def write(self, ndimage, *, is_batch=None, metadata=None, **kwargs):
|
||||
"""
|
||||
Write an ndimage to the URI specified in path.
|
||||
|
||||
If the URI points to a file on the current host and the file does not
|
||||
yet exist it will be created. If the file exists already, it will be
|
||||
appended if possible; otherwise, it will be replaced.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ndimage : numpy.ndarray
|
||||
The ndimage or list of ndimages to write.
|
||||
is_batch : bool
|
||||
If True, treat the supplied ndimage as a batch of images. If False,
|
||||
treat the supplied ndimage as a single image. If None, try to
|
||||
determine ``is_batch`` from the ndimage's shape and ndim.
|
||||
metadata : dict
|
||||
The metadata passed to write alongside the image.
|
||||
kwargs : ...
|
||||
Further keyword arguments are passed to the writer. See
|
||||
:func:`.help` to see what arguments are available for a
|
||||
particular format.
|
||||
|
||||
|
||||
Returns
|
||||
-------
|
||||
buffer : bytes
|
||||
When writing to the special target "<bytes>", this function will
|
||||
return the encoded image data as a bytes string. Otherwise it
|
||||
returns None.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Automatically determining ``is_batch`` may fail for some images due to
|
||||
shape aliasing. For example, it may classify a channel-first color image
|
||||
as a batch of gray images. In most cases this automatic deduction works
|
||||
fine (it has for almost a decade), but if you do have one of those edge
|
||||
cases (or are worried that you might) consider explicitly setting
|
||||
``is_batch``.
|
||||
|
||||
"""
|
||||
|
||||
if is_batch or isinstance(ndimage, (list, tuple)):
|
||||
pass # ndimage is list of images
|
||||
elif is_batch is False:
|
||||
ndimage = [ndimage]
|
||||
else:
|
||||
# Write the largest possible block by guessing the meaning of each
|
||||
# dimension from the shape/ndim and then checking if any batch
|
||||
# dimensions are left.
|
||||
ndimage = np.asanyarray(ndimage)
|
||||
batch_dims = ndimage.ndim
|
||||
|
||||
# two spatial dimensions
|
||||
batch_dims = max(batch_dims - 2, 0)
|
||||
|
||||
# packed (channel-last) image
|
||||
if ndimage.ndim >= 3 and ndimage.shape[-1] < 5:
|
||||
batch_dims = max(batch_dims - 1, 0)
|
||||
|
||||
# format supports volumetric images
|
||||
ext_infos = known_extensions.get(self._request.extension, list())
|
||||
for ext_info in ext_infos:
|
||||
if self._format.name in ext_info.priority and ext_info.volume_support:
|
||||
batch_dims = max(batch_dims - 1, 0)
|
||||
break
|
||||
|
||||
if batch_dims == 0:
|
||||
ndimage = [ndimage]
|
||||
|
||||
with self.legacy_get_writer(**kwargs) as writer:
|
||||
for image in ndimage:
|
||||
image = np.asanyarray(image)
|
||||
|
||||
if image.ndim < 2:
|
||||
raise ValueError(
|
||||
"The image must have at least two spatial dimensions."
|
||||
)
|
||||
|
||||
if not np.issubdtype(image.dtype, np.number) and not np.issubdtype(
|
||||
image.dtype, bool
|
||||
):
|
||||
raise ValueError(
|
||||
f"All images have to be numeric, and not `{image.dtype}`."
|
||||
)
|
||||
|
||||
writer.append_data(image, metadata)
|
||||
|
||||
return writer.request.get_result()
|
||||
|
||||
def iter(self, **kwargs):
|
||||
"""Iterate over a list of ndimages given by the URI
|
||||
|
||||
Parameters
|
||||
----------
|
||||
kwargs : ...
|
||||
Further keyword arguments are passed to the reader. See
|
||||
:func:`.help` to see what arguments are available for a particular
|
||||
format.
|
||||
"""
|
||||
|
||||
reader = self.legacy_get_reader(**kwargs)
|
||||
for image in reader:
|
||||
yield image
|
||||
|
||||
def properties(self, index=None):
|
||||
"""Standardized ndimage metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : int
|
||||
The index of the ndimage for which to return properties. If the
|
||||
index is out of bounds a ``ValueError`` is raised. If ``None``,
|
||||
return the properties for the ndimage stack. If this is impossible,
|
||||
e.g., due to shape missmatch, an exception will be raised.
|
||||
|
||||
Returns
|
||||
-------
|
||||
properties : ImageProperties
|
||||
A dataclass filled with standardized image metadata.
|
||||
|
||||
"""
|
||||
|
||||
if index is None:
|
||||
index = _legacy_default_index(self._format)
|
||||
|
||||
# for backwards compatibility ... actually reads pixel data :(
|
||||
if index is Ellipsis:
|
||||
image = self.read(index=0)
|
||||
n_images = self.legacy_get_reader().get_length()
|
||||
return ImageProperties(
|
||||
shape=(n_images, *image.shape),
|
||||
dtype=image.dtype,
|
||||
n_images=n_images,
|
||||
is_batch=True,
|
||||
)
|
||||
|
||||
image = self.read(index=index)
|
||||
return ImageProperties(
|
||||
shape=image.shape,
|
||||
dtype=image.dtype,
|
||||
is_batch=False,
|
||||
)
|
||||
|
||||
def get_meta(self, *, index=None):
|
||||
"""Read ndimage metadata from the URI
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : {integer, None}
|
||||
If the URI contains a list of ndimages return the metadata
|
||||
corresponding to the index-th image. If None, behavior depends on
|
||||
the used api
|
||||
|
||||
Legacy-style API: return metadata of the first element (index=0)
|
||||
New-style API: Behavior depends on the used Plugin.
|
||||
|
||||
Returns
|
||||
-------
|
||||
metadata : dict
|
||||
A dictionary of metadata.
|
||||
|
||||
"""
|
||||
|
||||
return self.metadata(index=index, exclude_applied=False)
|
||||
|
||||
def metadata(self, index=None, exclude_applied: bool = True):
|
||||
"""Format-Specific ndimage metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : int
|
||||
The index of the ndimage to read. If the index is out of bounds a
|
||||
``ValueError`` is raised. If ``None``, global metadata is returned.
|
||||
exclude_applied : bool
|
||||
This parameter exists for compatibility and has no effect. Legacy
|
||||
plugins always report all metadata they find.
|
||||
|
||||
Returns
|
||||
-------
|
||||
metadata : dict
|
||||
A dictionary filled with format-specific metadata fields and their
|
||||
values.
|
||||
|
||||
"""
|
||||
|
||||
if index is None:
|
||||
index = _legacy_default_index(self._format)
|
||||
|
||||
return self.legacy_get_reader().get_meta_data(index=index)
|
||||
|
||||
def __del__(self) -> None:
|
||||
pass
|
||||
# turns out we can't close the file here for LegacyPlugin
|
||||
# because it would break backwards compatibility
|
||||
# with legacy_get_writer and legacy_get_reader
|
||||
# self._request.finish()
|
||||
@@ -1,27 +0,0 @@
|
||||
import numpy as np
|
||||
from typing import Optional, Dict, Any, Union, List, Iterator
|
||||
|
||||
from .request import Request
|
||||
from .v3_plugin_api import PluginV3, ImageProperties
|
||||
from .format import Format
|
||||
from ..typing import ArrayLike
|
||||
|
||||
class LegacyPlugin(PluginV3):
|
||||
def __init__(self, request: Request, legacy_plugin: Format) -> None: ...
|
||||
def legacy_get_reader(self, **kwargs) -> Format.Reader: ...
|
||||
def read(self, *, index: Optional[int] = 0, **kwargs) -> np.ndarray: ...
|
||||
def legacy_get_writer(self, **kwargs) -> Format.Writer: ...
|
||||
def write(
|
||||
self,
|
||||
ndimage: Union[ArrayLike, List[ArrayLike]],
|
||||
*,
|
||||
is_batch: bool = None,
|
||||
**kwargs
|
||||
) -> Optional[bytes]: ...
|
||||
def iter(self, **kwargs) -> Iterator[np.ndarray]: ...
|
||||
def properties(self, index: Optional[int] = 0) -> ImageProperties: ...
|
||||
def get_meta(self, *, index: Optional[int] = 0) -> Dict[str, Any]: ...
|
||||
def metadata(
|
||||
self, index: Optional[int] = 0, exclude_applied: bool = True
|
||||
) -> Dict[str, Any]: ...
|
||||
def __del__(self) -> None: ...
|
||||
@@ -1,750 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# imageio is distributed under the terms of the (new) BSD License.
|
||||
|
||||
"""
|
||||
Definition of the Request object, which acts as a kind of bridge between
|
||||
what the user wants and what the plugins can.
|
||||
"""
|
||||
|
||||
import os
|
||||
from io import BytesIO
|
||||
import zipfile
|
||||
import tempfile
|
||||
import shutil
|
||||
import enum
|
||||
import warnings
|
||||
|
||||
from ..core import urlopen, get_remote_file
|
||||
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
from typing import Optional
|
||||
|
||||
# URI types
|
||||
URI_BYTES = 1
|
||||
URI_FILE = 2
|
||||
URI_FILENAME = 3
|
||||
URI_ZIPPED = 4
|
||||
URI_HTTP = 5
|
||||
URI_FTP = 6
|
||||
|
||||
|
||||
class IOMode(str, enum.Enum):
|
||||
"""Available Image modes
|
||||
|
||||
This is a helper enum for ``Request.Mode`` which is a composite of a
|
||||
``Request.ImageMode`` and ``Request.IOMode``. The IOMode that tells the
|
||||
plugin if the resource should be read from or written to. Available values are
|
||||
|
||||
- read ("r"): Read from the specified resource
|
||||
- write ("w"): Write to the specified resource
|
||||
|
||||
"""
|
||||
|
||||
read = "r"
|
||||
write = "w"
|
||||
|
||||
|
||||
class ImageMode(str, enum.Enum):
|
||||
"""Available Image modes
|
||||
|
||||
This is a helper enum for ``Request.Mode`` which is a composite of a
|
||||
``Request.ImageMode`` and ``Request.IOMode``. The image mode that tells the
|
||||
plugin the desired (and expected) image shape. Available values are
|
||||
|
||||
- single_image ("i"): Return a single image extending in two spacial
|
||||
dimensions
|
||||
- multi_image ("I"): Return a list of images extending in two spacial
|
||||
dimensions
|
||||
- single_volume ("v"): Return an image extending into multiple dimensions.
|
||||
E.g. three spacial dimensions for image stacks, or two spatial and one
|
||||
time dimension for videos
|
||||
- multi_volume ("V"): Return a list of images extending into multiple
|
||||
dimensions.
|
||||
- any_mode ("?"): Return an image in any format (the plugin decides the
|
||||
appropriate action).
|
||||
|
||||
"""
|
||||
|
||||
single_image = "i"
|
||||
multi_image = "I"
|
||||
single_volume = "v"
|
||||
multi_volume = "V"
|
||||
any_mode = "?"
|
||||
|
||||
|
||||
@enum.unique
|
||||
class Mode(str, enum.Enum):
|
||||
"""The mode to use when interacting with the resource
|
||||
|
||||
``Request.Mode`` is a composite of ``Request.ImageMode`` and
|
||||
``Request.IOMode``. The image mode that tells the plugin the desired (and
|
||||
expected) image shape and the ``Request.IOMode`` tells the plugin the way
|
||||
the resource should be interacted with. For a detailed description of the
|
||||
available modes, see the documentation for ``Request.ImageMode`` and
|
||||
``Request.IOMode`` respectively.
|
||||
|
||||
Available modes are all combinations of ``Request.IOMode`` and ``Request.ImageMode``:
|
||||
|
||||
- read_single_image ("ri")
|
||||
- read_multi_image ("rI")
|
||||
- read_single_volume ("rv")
|
||||
- read_multi_volume ("rV")
|
||||
- read_any ("r?")
|
||||
- write_single_image ("wi")
|
||||
- write_multi_image ("wI")
|
||||
- write_single_volume ("wv")
|
||||
- write_multi_volume ("wV")
|
||||
- write_any ("w?")
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> Request.Mode("rI") # a list of simple images should be read from the resource
|
||||
>>> Request.Mode("wv") # a single volume should be written to the resource
|
||||
|
||||
"""
|
||||
|
||||
read_single_image = "ri"
|
||||
read_multi_image = "rI"
|
||||
read_single_volume = "rv"
|
||||
read_multi_volume = "rV"
|
||||
read_any = "r?"
|
||||
write_single_image = "wi"
|
||||
write_multi_image = "wI"
|
||||
write_single_volume = "wv"
|
||||
write_multi_volume = "wV"
|
||||
write_any = "w?"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value):
|
||||
"""Enable Mode("r") and Mode("w")
|
||||
|
||||
The sunder method ``_missing_`` is called whenever the constructor fails
|
||||
to directly look up the corresponding enum value from the given input.
|
||||
In our case, we use it to convert the modes "r" and "w" (from the v3
|
||||
API) into their legacy versions "r?" and "w?".
|
||||
|
||||
More info on _missing_:
|
||||
https://docs.python.org/3/library/enum.html#supported-sunder-names
|
||||
"""
|
||||
|
||||
if value == "r":
|
||||
return cls("r?")
|
||||
elif value == "w":
|
||||
return cls("w?")
|
||||
else:
|
||||
raise ValueError(f"{value} is no valid Mode.")
|
||||
|
||||
@property
|
||||
def io_mode(self) -> IOMode:
|
||||
return IOMode(self.value[0])
|
||||
|
||||
@property
|
||||
def image_mode(self) -> ImageMode:
|
||||
return ImageMode(self.value[1])
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""For backwards compatibility with the old non-enum modes"""
|
||||
if key == 0:
|
||||
return self.io_mode
|
||||
elif key == 1:
|
||||
return self.image_mode
|
||||
else:
|
||||
raise IndexError(f"Mode has no item {key}")
|
||||
|
||||
|
||||
SPECIAL_READ_URIS = "<video", "<screen>", "<clipboard>"
|
||||
|
||||
# The user can use this string in a write call to get the data back as bytes.
|
||||
RETURN_BYTES = "<bytes>"
|
||||
|
||||
# Example images that will be auto-downloaded
|
||||
EXAMPLE_IMAGES = {
|
||||
"astronaut.png": "Image of the astronaut Eileen Collins",
|
||||
"camera.png": "A grayscale image of a photographer",
|
||||
"checkerboard.png": "Black and white image of a chekerboard",
|
||||
"wood.jpg": "A (repeatable) texture of wooden planks",
|
||||
"bricks.jpg": "A (repeatable) texture of stone bricks",
|
||||
"clock.png": "Photo of a clock with motion blur (Stefan van der Walt)",
|
||||
"coffee.png": "Image of a cup of coffee (Rachel Michetti)",
|
||||
"chelsea.png": "Image of Stefan's cat",
|
||||
"wikkie.png": "Image of Almar's cat",
|
||||
"coins.png": "Image showing greek coins from Pompeii",
|
||||
"horse.png": "Image showing the silhouette of a horse (Andreas Preuss)",
|
||||
"hubble_deep_field.png": "Photograph taken by Hubble telescope (NASA)",
|
||||
"immunohistochemistry.png": "Immunohistochemical (IHC) staining",
|
||||
"moon.png": "Image showing a portion of the surface of the moon",
|
||||
"page.png": "A scanned page of text",
|
||||
"text.png": "A photograph of handdrawn text",
|
||||
"chelsea.zip": "The chelsea.png in a zipfile (for testing)",
|
||||
"chelsea.bsdf": "The chelsea.png in a BSDF file(for testing)",
|
||||
"newtonscradle.gif": "Animated GIF of a newton's cradle",
|
||||
"cockatoo.mp4": "Video file of a cockatoo",
|
||||
"stent.npz": "Volumetric image showing a stented abdominal aorta",
|
||||
"meadow_cube.jpg": "A cubemap image of a meadow, e.g. to render a skybox.",
|
||||
}
|
||||
|
||||
|
||||
class Request(object):
|
||||
"""ImageResource handling utility.
|
||||
|
||||
Represents a request for reading or saving an image resource. This
|
||||
object wraps information to that request and acts as an interface
|
||||
for the plugins to several resources; it allows the user to read
|
||||
from filenames, files, http, zipfiles, raw bytes, etc., but offer
|
||||
a simple interface to the plugins via ``get_file()`` and
|
||||
``get_local_filename()``.
|
||||
|
||||
For each read/write operation a single Request instance is used and passed
|
||||
to the can_read/can_write method of a format, and subsequently to
|
||||
the Reader/Writer class. This allows rudimentary passing of
|
||||
information between different formats and between a format and
|
||||
associated reader/writer.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
uri : {str, bytes, file}
|
||||
The resource to load the image from.
|
||||
mode : str
|
||||
The first character is "r" or "w", indicating a read or write
|
||||
request. The second character is used to indicate the kind of data:
|
||||
"i" for an image, "I" for multiple images, "v" for a volume,
|
||||
"V" for multiple volumes, "?" for don't care.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, uri, mode, *, extension=None, format_hint: str = None, **kwargs):
|
||||
# General
|
||||
self.raw_uri = uri
|
||||
self._uri_type = None
|
||||
self._filename = None
|
||||
self._extension = None
|
||||
self._format_hint = None
|
||||
self._kwargs = kwargs
|
||||
self._result = None # Some write actions may have a result
|
||||
|
||||
# To handle the user-side
|
||||
self._filename_zip = None # not None if a zipfile is used
|
||||
self._bytes = None # Incoming bytes
|
||||
self._zipfile = None # To store a zipfile instance (if used)
|
||||
|
||||
# To handle the plugin side
|
||||
self._file = None # To store the file instance
|
||||
self._file_is_local = False # whether the data needs to be copied at end
|
||||
self._filename_local = None # not None if using tempfile on this FS
|
||||
self._firstbytes = None # For easy header parsing
|
||||
|
||||
# To store formats that may be able to fulfil this request
|
||||
# self._potential_formats = []
|
||||
|
||||
# Check mode
|
||||
try:
|
||||
self._mode = Mode(mode)
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid Request.Mode: {mode}")
|
||||
|
||||
# Parse what was given
|
||||
self._parse_uri(uri)
|
||||
|
||||
# Set extension
|
||||
if extension is not None:
|
||||
if extension[0] != ".":
|
||||
raise ValueError(
|
||||
"`extension` should be a file extension starting with a `.`,"
|
||||
f" but is `{extension}`."
|
||||
)
|
||||
self._extension = extension
|
||||
elif self._filename is not None:
|
||||
if self._uri_type in (URI_FILENAME, URI_ZIPPED):
|
||||
path = self._filename
|
||||
else:
|
||||
path = urlparse(self._filename).path
|
||||
ext = Path(path).suffix.lower()
|
||||
self._extension = ext if ext != "" else None
|
||||
|
||||
if format_hint is not None:
|
||||
warnings.warn(
|
||||
"The usage of `format_hint` is deprecated and will be removed "
|
||||
"in ImageIO v3. Use `extension` instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
if format_hint is not None and format_hint[0] != ".":
|
||||
raise ValueError(
|
||||
"`format_hint` should be a file extension starting with a `.`,"
|
||||
f" but is `{format_hint}`."
|
||||
)
|
||||
|
||||
self.format_hint = format_hint
|
||||
|
||||
def _parse_uri(self, uri):
|
||||
"""Try to figure our what we were given"""
|
||||
is_read_request = self.mode.io_mode is IOMode.read
|
||||
is_write_request = self.mode.io_mode is IOMode.write
|
||||
|
||||
if isinstance(uri, str):
|
||||
# Explicit
|
||||
if uri.startswith("imageio:"):
|
||||
if is_write_request:
|
||||
raise RuntimeError("Cannot write to the standard images.")
|
||||
fn = uri.split(":", 1)[-1].lower()
|
||||
fn, _, zip_part = fn.partition(".zip/")
|
||||
if zip_part:
|
||||
fn += ".zip"
|
||||
if fn not in EXAMPLE_IMAGES:
|
||||
raise ValueError("Unknown standard image %r." % fn)
|
||||
self._uri_type = URI_FILENAME
|
||||
self._filename = get_remote_file("images/" + fn, auto=True)
|
||||
if zip_part:
|
||||
self._filename += "/" + zip_part
|
||||
elif uri.startswith("http://") or uri.startswith("https://"):
|
||||
self._uri_type = URI_HTTP
|
||||
self._filename = uri
|
||||
elif uri.startswith("ftp://") or uri.startswith("ftps://"):
|
||||
self._uri_type = URI_FTP
|
||||
self._filename = uri
|
||||
elif uri.startswith("file://"):
|
||||
self._uri_type = URI_FILENAME
|
||||
self._filename = uri[7:]
|
||||
elif uri.startswith(SPECIAL_READ_URIS) and is_read_request:
|
||||
self._uri_type = URI_BYTES
|
||||
self._filename = uri
|
||||
elif uri.startswith(RETURN_BYTES) and is_write_request:
|
||||
self._uri_type = URI_BYTES
|
||||
self._filename = uri
|
||||
else:
|
||||
self._uri_type = URI_FILENAME
|
||||
self._filename = uri
|
||||
|
||||
elif isinstance(uri, memoryview) and is_read_request:
|
||||
self._uri_type = URI_BYTES
|
||||
self._filename = "<bytes>"
|
||||
self._bytes = uri.tobytes()
|
||||
elif isinstance(uri, bytes) and is_read_request:
|
||||
self._uri_type = URI_BYTES
|
||||
self._filename = "<bytes>"
|
||||
self._bytes = uri
|
||||
elif isinstance(uri, Path):
|
||||
self._uri_type = URI_FILENAME
|
||||
self._filename = str(uri)
|
||||
# Files
|
||||
elif is_read_request:
|
||||
if hasattr(uri, "read") and hasattr(uri, "close"):
|
||||
self._uri_type = URI_FILE
|
||||
self._filename = "<file>"
|
||||
self._file = uri # Data must be read from here
|
||||
elif is_write_request:
|
||||
if hasattr(uri, "write") and hasattr(uri, "close"):
|
||||
self._uri_type = URI_FILE
|
||||
self._filename = "<file>"
|
||||
self._file = uri # Data must be written here
|
||||
|
||||
# Expand user dir
|
||||
if self._uri_type == URI_FILENAME and self._filename.startswith("~"):
|
||||
self._filename = os.path.expanduser(self._filename)
|
||||
|
||||
# Check if a zipfile
|
||||
if self._uri_type == URI_FILENAME:
|
||||
# Search for zip extension followed by a path separater
|
||||
for needle in [".zip/", ".zip\\"]:
|
||||
zip_i = self._filename.lower().find(needle)
|
||||
if zip_i > 0:
|
||||
zip_i += 4
|
||||
zip_path = self._filename[:zip_i]
|
||||
if os.path.isdir(zip_path):
|
||||
pass # is an existing dir (see #548)
|
||||
elif is_write_request or os.path.isfile(zip_path):
|
||||
self._uri_type = URI_ZIPPED
|
||||
self._filename_zip = (
|
||||
zip_path,
|
||||
self._filename[zip_i:].lstrip("/\\"),
|
||||
)
|
||||
break
|
||||
|
||||
# Check if we could read it
|
||||
if self._uri_type is None:
|
||||
uri_r = repr(uri)
|
||||
if len(uri_r) > 60:
|
||||
uri_r = uri_r[:57] + "..."
|
||||
raise IOError("Cannot understand given URI: %s." % uri_r)
|
||||
|
||||
# Check if this is supported
|
||||
noWriting = [URI_HTTP, URI_FTP]
|
||||
if is_write_request and self._uri_type in noWriting:
|
||||
raise IOError("imageio does not support writing to http/ftp.")
|
||||
|
||||
# Deprecated way to load standard images, give a sensible error message
|
||||
if is_read_request and self._uri_type in [URI_FILENAME, URI_ZIPPED]:
|
||||
fn = self._filename
|
||||
if self._filename_zip:
|
||||
fn = self._filename_zip[0]
|
||||
if (not os.path.exists(fn)) and (fn in EXAMPLE_IMAGES):
|
||||
raise IOError(
|
||||
"No such file: %r. This file looks like one of "
|
||||
"the standard images, but from imageio 2.1, "
|
||||
"standard images have to be specified using "
|
||||
'"imageio:%s".' % (fn, fn)
|
||||
)
|
||||
|
||||
# Make filename absolute
|
||||
if self._uri_type in [URI_FILENAME, URI_ZIPPED]:
|
||||
if self._filename_zip:
|
||||
self._filename_zip = (
|
||||
os.path.abspath(self._filename_zip[0]),
|
||||
self._filename_zip[1],
|
||||
)
|
||||
else:
|
||||
self._filename = os.path.abspath(self._filename)
|
||||
|
||||
# Check whether file name is valid
|
||||
if self._uri_type in [URI_FILENAME, URI_ZIPPED]:
|
||||
fn = self._filename
|
||||
if self._filename_zip:
|
||||
fn = self._filename_zip[0]
|
||||
if is_read_request:
|
||||
# Reading: check that the file exists (but is allowed a dir)
|
||||
if not os.path.exists(fn):
|
||||
raise FileNotFoundError("No such file: '%s'" % fn)
|
||||
else:
|
||||
# Writing: check that the directory to write to does exist
|
||||
dn = os.path.dirname(fn)
|
||||
if not os.path.exists(dn):
|
||||
raise FileNotFoundError("The directory %r does not exist" % dn)
|
||||
|
||||
@property
|
||||
def filename(self):
|
||||
"""Name of the ImageResource.
|
||||
|
||||
|
||||
The uri for which reading/saving was requested. This
|
||||
can be a filename, an http address, or other resource
|
||||
identifier. Do not rely on the filename to obtain the data,
|
||||
but use ``get_file()`` or ``get_local_filename()`` instead.
|
||||
"""
|
||||
return self._filename
|
||||
|
||||
@property
|
||||
def extension(self) -> str:
|
||||
"""The (lowercase) extension of the requested filename.
|
||||
Suffixes in url's are stripped. Can be None if the request is
|
||||
not based on a filename.
|
||||
"""
|
||||
return self._extension
|
||||
|
||||
@property
|
||||
def format_hint(self) -> Optional[str]:
|
||||
return self._format_hint
|
||||
|
||||
@format_hint.setter
|
||||
def format_hint(self, format: str) -> None:
|
||||
self._format_hint = format
|
||||
if self._extension is None:
|
||||
self._extension = format
|
||||
|
||||
@property
|
||||
def mode(self):
|
||||
"""The mode of the request. The first character is "r" or "w",
|
||||
indicating a read or write request. The second character is
|
||||
used to indicate the kind of data:
|
||||
"i" for an image, "I" for multiple images, "v" for a volume,
|
||||
"V" for multiple volumes, "?" for don't care.
|
||||
"""
|
||||
return self._mode
|
||||
|
||||
@property
|
||||
def kwargs(self):
|
||||
"""The dict of keyword arguments supplied by the user."""
|
||||
return self._kwargs
|
||||
|
||||
# For obtaining data
|
||||
|
||||
def get_file(self):
|
||||
"""get_file()
|
||||
Get a file object for the resource associated with this request.
|
||||
If this is a reading request, the file is in read mode,
|
||||
otherwise in write mode. This method is not thread safe. Plugins
|
||||
should not close the file when done.
|
||||
|
||||
This is the preferred way to read/write the data. But if a
|
||||
format cannot handle file-like objects, they should use
|
||||
``get_local_filename()``.
|
||||
"""
|
||||
want_to_write = self.mode.io_mode is IOMode.write
|
||||
|
||||
# Is there already a file?
|
||||
# Either _uri_type == URI_FILE, or we already opened the file,
|
||||
# e.g. by using firstbytes
|
||||
if self._file is not None:
|
||||
return self._file
|
||||
|
||||
if self._uri_type == URI_BYTES:
|
||||
if want_to_write:
|
||||
# Create new file object, we catch the bytes in finish()
|
||||
self._file = BytesIO()
|
||||
self._file_is_local = True
|
||||
else:
|
||||
self._file = BytesIO(self._bytes)
|
||||
|
||||
elif self._uri_type == URI_FILENAME:
|
||||
if want_to_write:
|
||||
self._file = open(self.filename, "wb")
|
||||
else:
|
||||
self._file = open(self.filename, "rb")
|
||||
|
||||
elif self._uri_type == URI_ZIPPED:
|
||||
# Get the correct filename
|
||||
filename, name = self._filename_zip
|
||||
if want_to_write:
|
||||
# Create new file object, we catch the bytes in finish()
|
||||
self._file = BytesIO()
|
||||
self._file_is_local = True
|
||||
else:
|
||||
# Open zipfile and open new file object for specific file
|
||||
self._zipfile = zipfile.ZipFile(filename, "r")
|
||||
self._file = self._zipfile.open(name, "r")
|
||||
self._file = SeekableFileObject(self._file)
|
||||
|
||||
elif self._uri_type in [URI_HTTP or URI_FTP]:
|
||||
assert not want_to_write # This should have been tested in init
|
||||
timeout = os.getenv("IMAGEIO_REQUEST_TIMEOUT")
|
||||
if timeout is None or not timeout.isdigit():
|
||||
timeout = 5
|
||||
self._file = urlopen(self.filename, timeout=float(timeout))
|
||||
self._file = SeekableFileObject(self._file)
|
||||
|
||||
return self._file
|
||||
|
||||
def get_local_filename(self):
|
||||
"""get_local_filename()
|
||||
If the filename is an existing file on this filesystem, return
|
||||
that. Otherwise a temporary file is created on the local file
|
||||
system which can be used by the format to read from or write to.
|
||||
"""
|
||||
|
||||
if self._uri_type == URI_FILENAME:
|
||||
return self._filename
|
||||
else:
|
||||
# Get filename
|
||||
if self.extension is not None:
|
||||
ext = self.extension
|
||||
else:
|
||||
ext = os.path.splitext(self._filename)[1]
|
||||
self._filename_local = tempfile.mktemp(ext, "imageio_")
|
||||
# Write stuff to it?
|
||||
if self.mode.io_mode == IOMode.read:
|
||||
with open(self._filename_local, "wb") as file:
|
||||
shutil.copyfileobj(self.get_file(), file)
|
||||
return self._filename_local
|
||||
|
||||
def finish(self) -> None:
|
||||
"""Wrap up this request.
|
||||
|
||||
Finishes any pending reads or writes, closes any open files and frees
|
||||
any resources allocated by this request.
|
||||
"""
|
||||
|
||||
if self.mode.io_mode == IOMode.write:
|
||||
# See if we "own" the data and must put it somewhere
|
||||
bytes = None
|
||||
if self._filename_local:
|
||||
bytes = Path(self._filename_local).read_bytes()
|
||||
elif self._file_is_local:
|
||||
self._file_is_local = False
|
||||
bytes = self._file.getvalue()
|
||||
|
||||
# Put the data in the right place
|
||||
if bytes is not None:
|
||||
if self._uri_type == URI_BYTES:
|
||||
self._result = bytes # Picked up by imread function
|
||||
elif self._uri_type == URI_FILE:
|
||||
self._file.write(bytes)
|
||||
elif self._uri_type == URI_ZIPPED:
|
||||
zf = zipfile.ZipFile(self._filename_zip[0], "a")
|
||||
zf.writestr(self._filename_zip[1], bytes)
|
||||
zf.close()
|
||||
# elif self._uri_type == URI_FILENAME: -> is always direct
|
||||
# elif self._uri_type == URI_FTP/HTTP: -> write not supported
|
||||
|
||||
# Close open files that we know of (and are responsible for)
|
||||
if self._file and self._uri_type != URI_FILE:
|
||||
self._file.close()
|
||||
self._file = None
|
||||
if self._zipfile:
|
||||
self._zipfile.close()
|
||||
self._zipfile = None
|
||||
|
||||
# Remove temp file
|
||||
if self._filename_local:
|
||||
try:
|
||||
os.remove(self._filename_local)
|
||||
except Exception: # pragma: no cover
|
||||
warnings.warn(
|
||||
"Failed to delete the temporary file at "
|
||||
f"`{self._filename_local}`. Please report this issue."
|
||||
)
|
||||
self._filename_local = None
|
||||
|
||||
# Detach so gc can clean even if a reference of self lingers
|
||||
self._bytes = None
|
||||
|
||||
def get_result(self):
|
||||
"""For internal use. In some situations a write action can have
|
||||
a result (bytes data). That is obtained with this function.
|
||||
"""
|
||||
# Is there a reason to disallow reading multiple times?
|
||||
self._result, res = None, self._result
|
||||
return res
|
||||
|
||||
@property
|
||||
def firstbytes(self):
|
||||
"""The first 256 bytes of the file. These can be used to
|
||||
parse the header to determine the file-format.
|
||||
"""
|
||||
if self._firstbytes is None:
|
||||
self._read_first_bytes()
|
||||
return self._firstbytes
|
||||
|
||||
def _read_first_bytes(self, N=256):
|
||||
if self._bytes is not None:
|
||||
self._firstbytes = self._bytes[:N]
|
||||
else:
|
||||
# Prepare
|
||||
try:
|
||||
f = self.get_file()
|
||||
except IOError:
|
||||
if os.path.isdir(self.filename): # A directory, e.g. for DICOM
|
||||
self._firstbytes = bytes()
|
||||
return
|
||||
raise
|
||||
try:
|
||||
i = f.tell()
|
||||
except Exception:
|
||||
i = None
|
||||
# Read
|
||||
self._firstbytes = read_n_bytes(f, N)
|
||||
# Set back
|
||||
try:
|
||||
if i is None:
|
||||
raise Exception("cannot seek with None")
|
||||
f.seek(i)
|
||||
except Exception:
|
||||
# Prevent get_file() from reusing the file
|
||||
self._file = None
|
||||
# If the given URI was a file object, we have a problem,
|
||||
if self._uri_type == URI_FILE:
|
||||
raise IOError("Cannot seek back after getting firstbytes!")
|
||||
|
||||
|
||||
def read_n_bytes(f, N):
|
||||
"""read_n_bytes(file, n)
|
||||
|
||||
Read n bytes from the given file, or less if the file has less
|
||||
bytes. Returns zero bytes if the file is closed.
|
||||
"""
|
||||
bb = bytes()
|
||||
while len(bb) < N:
|
||||
extra_bytes = f.read(N - len(bb))
|
||||
if not extra_bytes:
|
||||
break
|
||||
bb += extra_bytes
|
||||
return bb
|
||||
|
||||
|
||||
class SeekableFileObject:
|
||||
"""A readonly wrapper file object that add support for seeking, even if
|
||||
the wrapped file object does not. The allows us to stream from http and
|
||||
still use Pillow.
|
||||
"""
|
||||
|
||||
def __init__(self, f):
|
||||
self.f = f
|
||||
self._i = 0 # >=0 but can exceed buffer
|
||||
self._buffer = b""
|
||||
self._have_all = False
|
||||
self.closed = False
|
||||
|
||||
def read(self, n=None):
|
||||
# Fix up n
|
||||
if n is None:
|
||||
pass
|
||||
else:
|
||||
n = int(n)
|
||||
if n < 0:
|
||||
n = None
|
||||
|
||||
# Can and must we read more?
|
||||
if not self._have_all:
|
||||
more = b""
|
||||
if n is None:
|
||||
more = self.f.read()
|
||||
self._have_all = True
|
||||
else:
|
||||
want_i = self._i + n
|
||||
want_more = want_i - len(self._buffer)
|
||||
if want_more > 0:
|
||||
more = self.f.read(want_more)
|
||||
if len(more) < want_more:
|
||||
self._have_all = True
|
||||
self._buffer += more
|
||||
|
||||
# Read data from buffer and update pointer
|
||||
if n is None:
|
||||
res = self._buffer[self._i :]
|
||||
else:
|
||||
res = self._buffer[self._i : self._i + n]
|
||||
self._i += len(res)
|
||||
|
||||
return res
|
||||
|
||||
def tell(self):
|
||||
return self._i
|
||||
|
||||
def seek(self, i, mode=0):
|
||||
# Mimic BytesIO behavior
|
||||
|
||||
# Get the absolute new position
|
||||
i = int(i)
|
||||
if mode == 0:
|
||||
if i < 0:
|
||||
raise ValueError("negative seek value " + str(i))
|
||||
real_i = i
|
||||
elif mode == 1:
|
||||
real_i = max(0, self._i + i) # negative ok here
|
||||
elif mode == 2:
|
||||
if not self._have_all:
|
||||
self.read()
|
||||
real_i = max(0, len(self._buffer) + i)
|
||||
else:
|
||||
raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % i)
|
||||
|
||||
# Read some?
|
||||
if real_i <= len(self._buffer):
|
||||
pass # no need to read
|
||||
elif not self._have_all:
|
||||
assert real_i > self._i # if we don't have all, _i cannot be > _buffer
|
||||
self.read(real_i - self._i) # sets self._i
|
||||
|
||||
self._i = real_i
|
||||
return self._i
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
self.f.close()
|
||||
|
||||
def isatty(self):
|
||||
return False
|
||||
|
||||
def seekable(self):
|
||||
return True
|
||||
|
||||
|
||||
class InitializationError(Exception):
|
||||
"""The plugin could not initialize from the given request.
|
||||
|
||||
This is a _internal_ error that is raised by plugins that fail to handle
|
||||
a given request. We use this to differentiate incompatibility between
|
||||
a plugin and a request from an actual error/bug inside a plugin.
|
||||
|
||||
"""
|
||||
|
||||
pass
|
||||
@@ -1,90 +0,0 @@
|
||||
from typing import BinaryIO, Optional, Dict, Any, Sequence, overload, Literal
|
||||
from ..typing import ImageResource
|
||||
import enum
|
||||
|
||||
EXAMPLE_IMAGES: Dict[str, str]
|
||||
RETURN_BYTES = "<bytes>"
|
||||
URI_BYTES = 1
|
||||
URI_FILE = 2
|
||||
URI_FILENAME = 3
|
||||
URI_ZIPPED = 4
|
||||
URI_HTTP = 5
|
||||
URI_FTP = 6
|
||||
|
||||
class IOMode(str, enum.Enum):
|
||||
read = "r"
|
||||
write = "w"
|
||||
|
||||
class ImageMode(str, enum.Enum):
|
||||
single_image = "i"
|
||||
multi_image = "I"
|
||||
single_volume = "v"
|
||||
multi_volume = "V"
|
||||
any_mode = "?"
|
||||
|
||||
@enum.unique
|
||||
class Mode(str, enum.Enum):
|
||||
read_single_image = "ri"
|
||||
read_multi_image = "rI"
|
||||
read_single_volume = "rv"
|
||||
read_multi_volume = "rV"
|
||||
read_any = "r?"
|
||||
write_single_image = "wi"
|
||||
write_multi_image = "wI"
|
||||
write_single_volume = "wv"
|
||||
write_multi_volume = "wV"
|
||||
write_any = "w?"
|
||||
|
||||
@classmethod
|
||||
def _missing_(cls, value: Any) -> Mode: ...
|
||||
@property
|
||||
def io_mode(self) -> IOMode: ...
|
||||
@property
|
||||
def image_mode(self) -> ImageMode: ...
|
||||
|
||||
class InitializationError(Exception): ...
|
||||
|
||||
class Request(object):
|
||||
_uri_type: int
|
||||
raw_uri: ImageResource
|
||||
|
||||
@property
|
||||
def filename(self) -> str: ...
|
||||
@property
|
||||
def extension(self) -> str: ...
|
||||
@property
|
||||
def format_hint(self) -> Optional[str]: ...
|
||||
@format_hint.setter
|
||||
def format_hint(self, format: str) -> None: ...
|
||||
@property
|
||||
def mode(self) -> Mode: ...
|
||||
@property
|
||||
def kwargs(self) -> Dict[str, Any]: ...
|
||||
@property
|
||||
def firstbytes(self) -> bytes: ...
|
||||
def __init__(
|
||||
self,
|
||||
uri: ImageResource,
|
||||
mode: str,
|
||||
*,
|
||||
extension: str = None,
|
||||
format_hint: str = None,
|
||||
**kwargs
|
||||
) -> None: ...
|
||||
def _parse_uri(self, uri: ImageResource) -> None: ...
|
||||
def get_file(self) -> BinaryIO: ...
|
||||
def get_local_filename(self) -> str: ...
|
||||
def finish(self) -> None: ...
|
||||
def get_result(self) -> Optional[bytes]: ...
|
||||
def _read_first_bytes(self, N: int = 256) -> bytes: ...
|
||||
|
||||
def read_n_bytes(f: BinaryIO, N: int) -> bytes: ...
|
||||
|
||||
class SeekableFileObject:
|
||||
def __init__(self, f: BinaryIO) -> None: ...
|
||||
def read(self, n: int = None) -> bytes: ...
|
||||
def tell(self) -> int: ...
|
||||
def seek(self, i: int, mode: int = 0) -> int: ...
|
||||
def close(self) -> None: ...
|
||||
def isatty(self) -> bool: ...
|
||||
def seekable(self) -> bool: ...
|
||||
559
.CondaPkg/env/Lib/site-packages/imageio/core/util.py
vendored
559
.CondaPkg/env/Lib/site-packages/imageio/core/util.py
vendored
@@ -1,559 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# imageio is distributed under the terms of the (new) BSD License.
|
||||
|
||||
"""
|
||||
Various utilities for imageio
|
||||
"""
|
||||
|
||||
|
||||
from collections import OrderedDict
|
||||
import numpy as np
|
||||
import os
|
||||
import re
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger("imageio")
|
||||
|
||||
IS_PYPY = "__pypy__" in sys.builtin_module_names
|
||||
THIS_DIR = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
def urlopen(*args, **kwargs):
|
||||
"""Compatibility function for the urlopen function. Raises an
|
||||
RuntimeError if urlopen could not be imported (which can occur in
|
||||
frozen applications.
|
||||
"""
|
||||
try:
|
||||
from urllib.request import urlopen
|
||||
except ImportError:
|
||||
raise RuntimeError("Could not import urlopen.")
|
||||
return urlopen(*args, **kwargs)
|
||||
|
||||
|
||||
def _precision_warn(p1, p2, extra=""):
|
||||
t = (
|
||||
"Lossy conversion from {} to {}. {} Convert image to {} prior to "
|
||||
"saving to suppress this warning."
|
||||
)
|
||||
logger.warning(t.format(p1, p2, extra, p2))
|
||||
|
||||
|
||||
def image_as_uint(im, bitdepth=None):
|
||||
"""Convert the given image to uint (default: uint8)
|
||||
|
||||
If the dtype already matches the desired format, it is returned
|
||||
as-is. If the image is float, and all values are between 0 and 1,
|
||||
the values are multiplied by np.power(2.0, bitdepth). In all other
|
||||
situations, the values are scaled such that the minimum value
|
||||
becomes 0 and the maximum value becomes np.power(2.0, bitdepth)-1
|
||||
(255 for 8-bit and 65535 for 16-bit).
|
||||
"""
|
||||
if not bitdepth:
|
||||
bitdepth = 8
|
||||
if not isinstance(im, np.ndarray):
|
||||
raise ValueError("Image must be a numpy array")
|
||||
if bitdepth == 8:
|
||||
out_type = np.uint8
|
||||
elif bitdepth == 16:
|
||||
out_type = np.uint16
|
||||
else:
|
||||
raise ValueError("Bitdepth must be either 8 or 16")
|
||||
dtype_str1 = str(im.dtype)
|
||||
dtype_str2 = out_type.__name__
|
||||
if (im.dtype == np.uint8 and bitdepth == 8) or (
|
||||
im.dtype == np.uint16 and bitdepth == 16
|
||||
):
|
||||
# Already the correct format? Return as-is
|
||||
return im
|
||||
if dtype_str1.startswith("float") and np.nanmin(im) >= 0 and np.nanmax(im) <= 1:
|
||||
_precision_warn(dtype_str1, dtype_str2, "Range [0, 1].")
|
||||
im = im.astype(np.float64) * (np.power(2.0, bitdepth) - 1) + 0.499999999
|
||||
elif im.dtype == np.uint16 and bitdepth == 8:
|
||||
_precision_warn(dtype_str1, dtype_str2, "Losing 8 bits of resolution.")
|
||||
im = np.right_shift(im, 8)
|
||||
elif im.dtype == np.uint32:
|
||||
_precision_warn(
|
||||
dtype_str1,
|
||||
dtype_str2,
|
||||
"Losing {} bits of resolution.".format(32 - bitdepth),
|
||||
)
|
||||
im = np.right_shift(im, 32 - bitdepth)
|
||||
elif im.dtype == np.uint64:
|
||||
_precision_warn(
|
||||
dtype_str1,
|
||||
dtype_str2,
|
||||
"Losing {} bits of resolution.".format(64 - bitdepth),
|
||||
)
|
||||
im = np.right_shift(im, 64 - bitdepth)
|
||||
else:
|
||||
mi = np.nanmin(im)
|
||||
ma = np.nanmax(im)
|
||||
if not np.isfinite(mi):
|
||||
raise ValueError("Minimum image value is not finite")
|
||||
if not np.isfinite(ma):
|
||||
raise ValueError("Maximum image value is not finite")
|
||||
if ma == mi:
|
||||
return im.astype(out_type)
|
||||
_precision_warn(dtype_str1, dtype_str2, "Range [{}, {}].".format(mi, ma))
|
||||
# Now make float copy before we scale
|
||||
im = im.astype("float64")
|
||||
# Scale the values between 0 and 1 then multiply by the max value
|
||||
im = (im - mi) / (ma - mi) * (np.power(2.0, bitdepth) - 1) + 0.499999999
|
||||
assert np.nanmin(im) >= 0
|
||||
assert np.nanmax(im) < np.power(2.0, bitdepth)
|
||||
return im.astype(out_type)
|
||||
|
||||
|
||||
class Array(np.ndarray):
|
||||
"""Array(array, meta=None)
|
||||
|
||||
A subclass of np.ndarray that has a meta attribute. Get the dictionary
|
||||
that contains the meta data using ``im.meta``. Convert to a plain numpy
|
||||
array using ``np.asarray(im)``.
|
||||
|
||||
"""
|
||||
|
||||
def __new__(cls, array, meta=None):
|
||||
# Check
|
||||
if not isinstance(array, np.ndarray):
|
||||
raise ValueError("Array expects a numpy array.")
|
||||
if not (meta is None or isinstance(meta, dict)):
|
||||
raise ValueError("Array expects meta data to be a dict.")
|
||||
# Convert and return
|
||||
meta = meta if meta is not None else getattr(array, "meta", {})
|
||||
try:
|
||||
ob = array.view(cls)
|
||||
except AttributeError: # pragma: no cover
|
||||
# Just return the original; no metadata on the array in Pypy!
|
||||
return array
|
||||
ob._copy_meta(meta)
|
||||
return ob
|
||||
|
||||
def _copy_meta(self, meta):
|
||||
"""Make a 2-level deep copy of the meta dictionary."""
|
||||
self._meta = Dict()
|
||||
for key, val in meta.items():
|
||||
if isinstance(val, dict):
|
||||
val = Dict(val) # Copy this level
|
||||
self._meta[key] = val
|
||||
|
||||
@property
|
||||
def meta(self):
|
||||
"""The dict with the meta data of this image."""
|
||||
return self._meta
|
||||
|
||||
def __array_finalize__(self, ob):
|
||||
"""So the meta info is maintained when doing calculations with
|
||||
the array.
|
||||
"""
|
||||
if isinstance(ob, Array):
|
||||
self._copy_meta(ob.meta)
|
||||
else:
|
||||
self._copy_meta({})
|
||||
|
||||
def __array_wrap__(self, out, context=None):
|
||||
"""So that we return a native numpy array (or scalar) when a
|
||||
reducting ufunc is applied (such as sum(), std(), etc.)
|
||||
"""
|
||||
if not out.shape:
|
||||
return out.dtype.type(out) # Scalar
|
||||
elif out.shape != self.shape:
|
||||
return out.view(type=np.ndarray)
|
||||
else:
|
||||
return out # Type Array
|
||||
|
||||
|
||||
Image = Array # Alias for backwards compatibility
|
||||
|
||||
|
||||
def asarray(a):
|
||||
"""Pypy-safe version of np.asarray. Pypy's np.asarray consumes a
|
||||
*lot* of memory if the given array is an ndarray subclass. This
|
||||
function does not.
|
||||
"""
|
||||
if isinstance(a, np.ndarray):
|
||||
if IS_PYPY: # pragma: no cover
|
||||
a = a.copy() # pypy has issues with base views
|
||||
plain = a.view(type=np.ndarray)
|
||||
return plain
|
||||
return np.asarray(a)
|
||||
|
||||
|
||||
class Dict(OrderedDict):
|
||||
"""A dict in which the keys can be get and set as if they were
|
||||
attributes. Very convenient in combination with autocompletion.
|
||||
|
||||
This Dict still behaves as much as possible as a normal dict, and
|
||||
keys can be anything that are otherwise valid keys. However,
|
||||
keys that are not valid identifiers or that are names of the dict
|
||||
class (such as 'items' and 'copy') cannot be get/set as attributes.
|
||||
"""
|
||||
|
||||
__reserved_names__ = dir(OrderedDict()) # Also from OrderedDict
|
||||
__pure_names__ = dir(dict())
|
||||
|
||||
def __getattribute__(self, key):
|
||||
try:
|
||||
return object.__getattribute__(self, key)
|
||||
except AttributeError:
|
||||
if key in self:
|
||||
return self[key]
|
||||
else:
|
||||
raise
|
||||
|
||||
def __setattr__(self, key, val):
|
||||
if key in Dict.__reserved_names__:
|
||||
# Either let OrderedDict do its work, or disallow
|
||||
if key not in Dict.__pure_names__:
|
||||
return OrderedDict.__setattr__(self, key, val)
|
||||
else:
|
||||
raise AttributeError(
|
||||
"Reserved name, this key can only "
|
||||
+ "be set via ``d[%r] = X``" % key
|
||||
)
|
||||
else:
|
||||
# if isinstance(val, dict): val = Dict(val) -> no, makes a copy!
|
||||
self[key] = val
|
||||
|
||||
def __dir__(self):
|
||||
def isidentifier(x):
|
||||
return bool(re.match(r"[a-z_]\w*$", x, re.I))
|
||||
|
||||
names = [k for k in self.keys() if (isinstance(k, str) and isidentifier(k))]
|
||||
return Dict.__reserved_names__ + names
|
||||
|
||||
|
||||
class BaseProgressIndicator(object):
|
||||
"""BaseProgressIndicator(name)
|
||||
|
||||
A progress indicator helps display the progres of a task to the
|
||||
user. Progress can be pending, running, finished or failed.
|
||||
|
||||
Each task has:
|
||||
* a name - a short description of what needs to be done.
|
||||
* an action - the current action in performing the task (e.g. a subtask)
|
||||
* progress - how far the task is completed
|
||||
* max - max number of progress units. If 0, the progress is indefinite
|
||||
* unit - the units in which the progress is counted
|
||||
* status - 0: pending, 1: in progress, 2: finished, 3: failed
|
||||
|
||||
This class defines an abstract interface. Subclasses should implement
|
||||
_start, _stop, _update_progress(progressText), _write(message).
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
self._name = name
|
||||
self._action = ""
|
||||
self._unit = ""
|
||||
self._max = 0
|
||||
self._status = 0
|
||||
self._last_progress_update = 0
|
||||
|
||||
def start(self, action="", unit="", max=0):
|
||||
"""start(action='', unit='', max=0)
|
||||
|
||||
Start the progress. Optionally specify an action, a unit,
|
||||
and a maxium progress value.
|
||||
"""
|
||||
if self._status == 1:
|
||||
self.finish()
|
||||
self._action = action
|
||||
self._unit = unit
|
||||
self._max = max
|
||||
#
|
||||
self._progress = 0
|
||||
self._status = 1
|
||||
self._start()
|
||||
|
||||
def status(self):
|
||||
"""status()
|
||||
|
||||
Get the status of the progress - 0: pending, 1: in progress,
|
||||
2: finished, 3: failed
|
||||
"""
|
||||
return self._status
|
||||
|
||||
def set_progress(self, progress=0, force=False):
|
||||
"""set_progress(progress=0, force=False)
|
||||
|
||||
Set the current progress. To avoid unnecessary progress updates
|
||||
this will only have a visual effect if the time since the last
|
||||
update is > 0.1 seconds, or if force is True.
|
||||
"""
|
||||
self._progress = progress
|
||||
# Update or not?
|
||||
if not (force or (time.time() - self._last_progress_update > 0.1)):
|
||||
return
|
||||
self._last_progress_update = time.time()
|
||||
# Compose new string
|
||||
unit = self._unit or ""
|
||||
progressText = ""
|
||||
if unit == "%":
|
||||
progressText = "%2.1f%%" % progress
|
||||
elif self._max > 0:
|
||||
percent = 100 * float(progress) / self._max
|
||||
progressText = "%i/%i %s (%2.1f%%)" % (progress, self._max, unit, percent)
|
||||
elif progress > 0:
|
||||
if isinstance(progress, float):
|
||||
progressText = "%0.4g %s" % (progress, unit)
|
||||
else:
|
||||
progressText = "%i %s" % (progress, unit)
|
||||
# Update
|
||||
self._update_progress(progressText)
|
||||
|
||||
def increase_progress(self, extra_progress):
|
||||
"""increase_progress(extra_progress)
|
||||
|
||||
Increase the progress by a certain amount.
|
||||
"""
|
||||
self.set_progress(self._progress + extra_progress)
|
||||
|
||||
def finish(self, message=None):
|
||||
"""finish(message=None)
|
||||
|
||||
Finish the progress, optionally specifying a message. This will
|
||||
not set the progress to the maximum.
|
||||
"""
|
||||
self.set_progress(self._progress, True) # fore update
|
||||
self._status = 2
|
||||
self._stop()
|
||||
if message is not None:
|
||||
self._write(message)
|
||||
|
||||
def fail(self, message=None):
|
||||
"""fail(message=None)
|
||||
|
||||
Stop the progress with a failure, optionally specifying a message.
|
||||
"""
|
||||
self.set_progress(self._progress, True) # fore update
|
||||
self._status = 3
|
||||
self._stop()
|
||||
message = "FAIL " + (message or "")
|
||||
self._write(message)
|
||||
|
||||
def write(self, message):
|
||||
"""write(message)
|
||||
|
||||
Write a message during progress (such as a warning).
|
||||
"""
|
||||
if self.__class__ == BaseProgressIndicator:
|
||||
# When this class is used as a dummy, print explicit message
|
||||
print(message)
|
||||
else:
|
||||
return self._write(message)
|
||||
|
||||
# Implementing classes should implement these
|
||||
|
||||
def _start(self):
|
||||
pass
|
||||
|
||||
def _stop(self):
|
||||
pass
|
||||
|
||||
def _update_progress(self, progressText):
|
||||
pass
|
||||
|
||||
def _write(self, message):
|
||||
pass
|
||||
|
||||
|
||||
class StdoutProgressIndicator(BaseProgressIndicator):
|
||||
"""StdoutProgressIndicator(name)
|
||||
|
||||
A progress indicator that shows the progress in stdout. It
|
||||
assumes that the tty can appropriately deal with backspace
|
||||
characters.
|
||||
"""
|
||||
|
||||
def _start(self):
|
||||
self._chars_prefix, self._chars = "", ""
|
||||
# Write message
|
||||
if self._action:
|
||||
self._chars_prefix = "%s (%s): " % (self._name, self._action)
|
||||
else:
|
||||
self._chars_prefix = "%s: " % self._name
|
||||
sys.stdout.write(self._chars_prefix)
|
||||
sys.stdout.flush()
|
||||
|
||||
def _update_progress(self, progressText):
|
||||
# If progress is unknown, at least make something move
|
||||
if not progressText:
|
||||
i1, i2, i3, i4 = "-\\|/"
|
||||
M = {i1: i2, i2: i3, i3: i4, i4: i1}
|
||||
progressText = M.get(self._chars, i1)
|
||||
# Store new string and write
|
||||
delChars = "\b" * len(self._chars)
|
||||
self._chars = progressText
|
||||
sys.stdout.write(delChars + self._chars)
|
||||
sys.stdout.flush()
|
||||
|
||||
def _stop(self):
|
||||
self._chars = self._chars_prefix = ""
|
||||
sys.stdout.write("\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
def _write(self, message):
|
||||
# Write message
|
||||
delChars = "\b" * len(self._chars_prefix + self._chars)
|
||||
sys.stdout.write(delChars + " " + message + "\n")
|
||||
# Reprint progress text
|
||||
sys.stdout.write(self._chars_prefix + self._chars)
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
# From pyzolib/paths.py (https://bitbucket.org/pyzo/pyzolib/src/tip/paths.py)
|
||||
def appdata_dir(appname=None, roaming=False):
|
||||
"""appdata_dir(appname=None, roaming=False)
|
||||
|
||||
Get the path to the application directory, where applications are allowed
|
||||
to write user specific files (e.g. configurations). For non-user specific
|
||||
data, consider using common_appdata_dir().
|
||||
If appname is given, a subdir is appended (and created if necessary).
|
||||
If roaming is True, will prefer a roaming directory (Windows Vista/7).
|
||||
"""
|
||||
|
||||
# Define default user directory
|
||||
userDir = os.getenv("IMAGEIO_USERDIR", None)
|
||||
if userDir is None:
|
||||
userDir = os.path.expanduser("~")
|
||||
if not os.path.isdir(userDir): # pragma: no cover
|
||||
userDir = "/var/tmp" # issue #54
|
||||
|
||||
# Get system app data dir
|
||||
path = None
|
||||
if sys.platform.startswith("win"):
|
||||
path1, path2 = os.getenv("LOCALAPPDATA"), os.getenv("APPDATA")
|
||||
path = (path2 or path1) if roaming else (path1 or path2)
|
||||
elif sys.platform.startswith("darwin"):
|
||||
path = os.path.join(userDir, "Library", "Application Support")
|
||||
# On Linux and as fallback
|
||||
if not (path and os.path.isdir(path)):
|
||||
path = userDir
|
||||
|
||||
# Maybe we should store things local to the executable (in case of a
|
||||
# portable distro or a frozen application that wants to be portable)
|
||||
prefix = sys.prefix
|
||||
if getattr(sys, "frozen", None):
|
||||
prefix = os.path.abspath(os.path.dirname(sys.executable))
|
||||
for reldir in ("settings", "../settings"):
|
||||
localpath = os.path.abspath(os.path.join(prefix, reldir))
|
||||
if os.path.isdir(localpath): # pragma: no cover
|
||||
try:
|
||||
open(os.path.join(localpath, "test.write"), "wb").close()
|
||||
os.remove(os.path.join(localpath, "test.write"))
|
||||
except IOError:
|
||||
pass # We cannot write in this directory
|
||||
else:
|
||||
path = localpath
|
||||
break
|
||||
|
||||
# Get path specific for this app
|
||||
if appname:
|
||||
if path == userDir:
|
||||
appname = "." + appname.lstrip(".") # Make it a hidden directory
|
||||
path = os.path.join(path, appname)
|
||||
if not os.path.isdir(path): # pragma: no cover
|
||||
os.makedirs(path, exist_ok=True)
|
||||
|
||||
# Done
|
||||
return path
|
||||
|
||||
|
||||
def resource_dirs():
|
||||
"""resource_dirs()
|
||||
|
||||
Get a list of directories where imageio resources may be located.
|
||||
The first directory in this list is the "resources" directory in
|
||||
the package itself. The second directory is the appdata directory
|
||||
(~/.imageio on Linux). The list further contains the application
|
||||
directory (for frozen apps), and may include additional directories
|
||||
in the future.
|
||||
"""
|
||||
dirs = [resource_package_dir()]
|
||||
# Resource dir baked in the package.
|
||||
# Appdata directory
|
||||
try:
|
||||
dirs.append(appdata_dir("imageio"))
|
||||
except Exception: # pragma: no cover
|
||||
pass # The home dir may not be writable
|
||||
# Directory where the app is located (mainly for frozen apps)
|
||||
if getattr(sys, "frozen", None):
|
||||
dirs.append(os.path.abspath(os.path.dirname(sys.executable)))
|
||||
elif sys.path and sys.path[0]:
|
||||
dirs.append(os.path.abspath(sys.path[0]))
|
||||
return dirs
|
||||
|
||||
|
||||
def resource_package_dir():
|
||||
"""package_dir
|
||||
|
||||
Get the resources directory in the imageio package installation
|
||||
directory.
|
||||
|
||||
Notes
|
||||
-----
|
||||
This is a convenience method that is used by `resource_dirs` and
|
||||
imageio entry point scripts.
|
||||
"""
|
||||
# Make pkg_resources optional if setuptools is not available
|
||||
try:
|
||||
# Avoid importing pkg_resources in the top level due to how slow it is
|
||||
# https://github.com/pypa/setuptools/issues/510
|
||||
import pkg_resources
|
||||
except ImportError:
|
||||
pkg_resources = None
|
||||
|
||||
if pkg_resources:
|
||||
# The directory returned by `pkg_resources.resource_filename`
|
||||
# also works with eggs.
|
||||
pdir = pkg_resources.resource_filename("imageio", "resources")
|
||||
else:
|
||||
# If setuptools is not available, use fallback
|
||||
pdir = os.path.abspath(os.path.join(THIS_DIR, "..", "resources"))
|
||||
return pdir
|
||||
|
||||
|
||||
def get_platform():
|
||||
"""get_platform()
|
||||
|
||||
Get a string that specifies the platform more specific than
|
||||
sys.platform does. The result can be: linux32, linux64, win32,
|
||||
win64, osx32, osx64. Other platforms may be added in the future.
|
||||
"""
|
||||
# Get platform
|
||||
if sys.platform.startswith("linux"):
|
||||
plat = "linux%i"
|
||||
elif sys.platform.startswith("win"):
|
||||
plat = "win%i"
|
||||
elif sys.platform.startswith("darwin"):
|
||||
plat = "osx%i"
|
||||
elif sys.platform.startswith("freebsd"):
|
||||
plat = "freebsd%i"
|
||||
else: # pragma: no cover
|
||||
return None
|
||||
|
||||
return plat % (struct.calcsize("P") * 8) # 32 or 64 bits
|
||||
|
||||
|
||||
def has_module(module_name):
|
||||
"""Check to see if a python module is available."""
|
||||
if sys.version_info > (3, 4):
|
||||
import importlib
|
||||
|
||||
name_parts = module_name.split(".")
|
||||
for i in range(len(name_parts)):
|
||||
if importlib.util.find_spec(".".join(name_parts[: i + 1])) is None:
|
||||
return False
|
||||
return True
|
||||
else: # pragma: no cover
|
||||
import imp
|
||||
|
||||
try:
|
||||
imp.find_module(module_name)
|
||||
except ImportError:
|
||||
return False
|
||||
return True
|
||||
@@ -1,370 +0,0 @@
|
||||
from . import Request
|
||||
from ..typing import ArrayLike
|
||||
import numpy as np
|
||||
from typing import Optional, Dict, Any, Tuple, Union, List, Iterator
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageProperties:
|
||||
"""Standardized Metadata
|
||||
|
||||
ImageProperties represent a set of standardized metadata that is available
|
||||
under the same name for every supported format. If the ImageResource (or
|
||||
format) does not specify the value, a sensible default value is chosen
|
||||
instead.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
shape : Tuple[int, ...]
|
||||
The shape of the loaded ndimage.
|
||||
dtype : np.dtype
|
||||
The dtype of the loaded ndimage.
|
||||
n_images : int
|
||||
Number of images in the file if ``index=...``, `None` for single images.
|
||||
is_batch : bool
|
||||
If True, the first dimension of the ndimage represents a batch dimension
|
||||
along which several images are stacked.
|
||||
spacing : Tuple
|
||||
A tuple describing the spacing between pixels along each axis of the
|
||||
ndimage. If the spacing is uniform along an axis the value corresponding
|
||||
to that axis is a single float. If the spacing is non-uniform, the value
|
||||
corresponding to that axis is a tuple in which the i-th element
|
||||
indicates the spacing between the i-th and (i+1)-th pixel along that
|
||||
axis.
|
||||
|
||||
"""
|
||||
|
||||
shape: Tuple[int, ...]
|
||||
dtype: np.dtype
|
||||
n_images: Optional[int] = None
|
||||
is_batch: bool = False
|
||||
spacing: Optional[tuple] = None
|
||||
|
||||
|
||||
class PluginV3:
|
||||
"""A ImageIO Plugin.
|
||||
|
||||
This is an abstract plugin that documents the v3 plugin API interface. A
|
||||
plugin is an adapter/wrapper around a backend that converts a request from
|
||||
iio.core (e.g., read an image from file) into a sequence of instructions for
|
||||
the backend that fullfill the request.
|
||||
|
||||
Plugin authors may choose to subclass this class when implementing a new
|
||||
plugin, but aren't obliged to do so. As long as the plugin class implements
|
||||
the interface (methods) described below the ImageIO core will treat it just
|
||||
like any other plugin.
|
||||
|
||||
|
||||
Parameters
|
||||
----------
|
||||
request : iio.Request
|
||||
A request object that represents the users intent. It provides a
|
||||
standard interface to access the various ImageResources and serves them
|
||||
to the plugin as a file object (or file). Check the docs for details.
|
||||
**kwargs : Any
|
||||
Additional configuration arguments for the plugin or backend. Usually
|
||||
these match the configuration arguments available on the backend and
|
||||
are forwarded to it.
|
||||
|
||||
|
||||
Raises
|
||||
------
|
||||
InitializationError
|
||||
During ``__init__`` the plugin tests if it can fulfill the request. If
|
||||
it can't, e.g., because the request points to a file in the wrong
|
||||
format, then it should raise an ``InitializationError`` and provide a
|
||||
reason for failure. This reason may be reported to the user.
|
||||
ImportError
|
||||
Plugins will be imported dynamically when listed in
|
||||
``iio.config.known_plugins`` to fullfill requests. This way, users only
|
||||
have to load plugins/backends they actually use. If this plugin's backend
|
||||
is not installed, it should raise an ``ImportError`` either during
|
||||
module import or during class construction.
|
||||
|
||||
Notes
|
||||
-----
|
||||
Upon successful construction the plugin takes ownership of the provided
|
||||
request. This means that it is the plugin's responsibility to call
|
||||
request.finish() to close the resource when it is no longer needed.
|
||||
|
||||
Plugins _must_ implement a context manager that closes and cleans any
|
||||
resources held by the plugin upon exit.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, request: Request) -> None:
|
||||
"""Initialize a new Plugin Instance.
|
||||
|
||||
See Plugin's docstring for detailed documentation.
|
||||
|
||||
Notes
|
||||
-----
|
||||
The implementation here stores the request as a local variable that is
|
||||
exposed using a @property below. If you inherit from PluginV3, remember
|
||||
to call ``super().__init__(request)``.
|
||||
|
||||
"""
|
||||
|
||||
self._request = request
|
||||
|
||||
def read(self, *, index: int = 0) -> np.ndarray:
|
||||
"""Read a ndimage.
|
||||
|
||||
The ``read`` method loads a (single) ndimage, located at ``index`` from
|
||||
the requested ImageResource.
|
||||
|
||||
It is at the plugin's descretion to decide (and document) what
|
||||
constitutes a single ndimage. A sensible way to make this decision is to
|
||||
choose based on the ImageResource's format and on what users will expect
|
||||
from such a format. For example, a sensible choice for a TIFF file
|
||||
produced by an ImageJ hyperstack is to read it as a volumetric ndimage
|
||||
(1 color dimension followed by 3 spatial dimensions). On the other hand,
|
||||
a sensible choice for a MP4 file produced by Davinci Resolve is to treat
|
||||
each frame as a ndimage (2 spatial dimensions followed by 1 color
|
||||
dimension).
|
||||
|
||||
The value ``index=None`` is special. It requests the plugin to load all
|
||||
ndimages in the file and stack them along a new first axis. For example,
|
||||
if a MP4 file is read with ``index=None`` and the plugin identifies
|
||||
single frames as ndimages, then the plugin should read all frames and
|
||||
stack them into a new ndimage which now contains a time axis as its
|
||||
first axis. If a PNG file (single image format) is read with
|
||||
``index=None`` the plugin does a very similar thing: It loads all
|
||||
ndimages in the file (here it's just one) and stacks them along a new
|
||||
first axis, effectively prepending an axis with size 1 to the image. If
|
||||
a plugin does not wish to support ``index=None`` it should set a more
|
||||
sensible default and raise a ``ValueError`` when requested to read using
|
||||
``index=None``.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : int
|
||||
If the ImageResource contains multiple ndimages, and index is an
|
||||
integer, select the index-th ndimage from among them and return it.
|
||||
If index is an ellipsis (...), read all ndimages in the file and
|
||||
stack them along a new batch dimension. If index is None, let the
|
||||
plugin decide. If the index is out of bounds a ``ValueError`` is
|
||||
raised.
|
||||
**kwargs : Any
|
||||
The read method may accept any number of plugin-specific keyword
|
||||
arguments to further customize the read behavior. Usually these
|
||||
match the arguments available on the backend and are forwarded to
|
||||
it.
|
||||
|
||||
Returns
|
||||
-------
|
||||
ndimage : np.ndarray
|
||||
A ndimage containing decoded pixel data (sometimes called bitmap).
|
||||
|
||||
Notes
|
||||
-----
|
||||
The ImageResource from which the plugin should read is managed by the
|
||||
provided request object. Directly accessing the managed ImageResource is
|
||||
_not_ permitted. Instead, you can get FileLike access to the
|
||||
ImageResource via request.get_file().
|
||||
|
||||
If the backend doesn't support reading from FileLike objects, you can
|
||||
request a temporary file to pass to the backend via
|
||||
``request.get_local_filename()``. This is, however, not very performant
|
||||
(involves copying the Request's content into a temporary file), so you
|
||||
should avoid doing this whenever possible. Consider it a fallback method
|
||||
in case all else fails.
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def write(self, ndimage: Union[ArrayLike, List[ArrayLike]]) -> Optional[bytes]:
|
||||
"""Write a ndimage to a ImageResource.
|
||||
|
||||
The ``write`` method encodes the given ndimage into the format handled
|
||||
by the backend and writes it to the ImageResource. It overwrites
|
||||
any content that may have been previously stored in the file.
|
||||
|
||||
If the backend supports only a single format then it must check if
|
||||
the ImageResource matches that format and raise an exception if not.
|
||||
Typically, this should be done during initialization in the form of a
|
||||
``InitializationError``.
|
||||
|
||||
If the backend supports more than one format it must determine the
|
||||
requested/desired format. Usually this can be done by inspecting the
|
||||
ImageResource (e.g., by checking ``request.extension``), or by providing
|
||||
a mechanism to explicitly set the format (perhaps with a - sensible -
|
||||
default value). If the plugin can not determine the desired format, it
|
||||
**must not** write to the ImageResource, but raise an exception instead.
|
||||
|
||||
If the backend supports at least one format that can hold multiple
|
||||
ndimages it should be capable of handling ndimage batches and lists of
|
||||
ndimages. If the ``ndimage`` input is a list of ndimages, the plugin
|
||||
should not assume that the ndimages are not stackable, i.e., ndimages
|
||||
may have different shapes. Otherwise, the ``ndimage`` may be a batch of
|
||||
multiple ndimages stacked along the first axis of the array. The plugin
|
||||
must be able to discover this, either automatically or via additional
|
||||
`kwargs`. If there is ambiguity in the process, the plugin must clearly
|
||||
document what happens in such cases and, if possible, describe how to
|
||||
resolve this ambiguity.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ndimage : ArrayLike
|
||||
The ndimage to encode and write to the current ImageResource.
|
||||
**kwargs : Any
|
||||
The write method may accept any number of plugin-specific keyword
|
||||
arguments to customize the writing behavior. Usually these match the
|
||||
arguments available on the backend and are forwarded to it.
|
||||
|
||||
Returns
|
||||
-------
|
||||
encoded_image : bytes or None
|
||||
If the chosen ImageResource is the special target ``"<bytes>"`` then
|
||||
write should return a byte string containing the encoded image data.
|
||||
Otherwise, it returns None.
|
||||
|
||||
Notes
|
||||
-----
|
||||
The ImageResource to which the plugin should write to is managed by the
|
||||
provided request object. Directly accessing the managed ImageResource is
|
||||
_not_ permitted. Instead, you can get FileLike access to the
|
||||
ImageResource via request.get_file().
|
||||
|
||||
If the backend doesn't support writing to FileLike objects, you can
|
||||
request a temporary file to pass to the backend via
|
||||
``request.get_local_filename()``. This is, however, not very performant
|
||||
(involves copying the Request's content from a temporary file), so you
|
||||
should avoid doing this whenever possible. Consider it a fallback method
|
||||
in case all else fails.
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def iter(self) -> Iterator[np.ndarray]:
|
||||
"""Iterate the ImageResource.
|
||||
|
||||
This method returns a generator that yields ndimages in the order in which
|
||||
they appear in the file. This is roughly equivalent to::
|
||||
|
||||
idx = 0
|
||||
while True:
|
||||
try:
|
||||
yield self.read(index=idx)
|
||||
except ValueError:
|
||||
break
|
||||
|
||||
It works very similar to ``read``, and you can consult the documentation
|
||||
of that method for additional information on desired behavior.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
**kwargs : Any
|
||||
The iter method may accept any number of plugin-specific keyword
|
||||
arguments to further customize the reading/iteration behavior.
|
||||
Usually these match the arguments available on the backend and are
|
||||
forwarded to it.
|
||||
|
||||
Yields
|
||||
------
|
||||
ndimage : np.ndarray
|
||||
A ndimage containing decoded pixel data (sometimes called bitmap).
|
||||
|
||||
See Also
|
||||
--------
|
||||
PluginV3.read
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def properties(self, index: int = 0) -> ImageProperties:
|
||||
"""Standardized ndimage metadata.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : int
|
||||
If the ImageResource contains multiple ndimages, and index is an
|
||||
integer, select the index-th ndimage from among them and return its
|
||||
properties. If index is an ellipsis (...), read all ndimages in the file
|
||||
and stack them along a new batch dimension and return their properties.
|
||||
If index is None, the plugin decides the default.
|
||||
|
||||
Returns
|
||||
-------
|
||||
properties : ImageProperties
|
||||
A dataclass filled with standardized image metadata.
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def metadata(self, index: int = 0, exclude_applied: bool = True) -> Dict[str, Any]:
|
||||
"""Format-Specific ndimage metadata.
|
||||
|
||||
The method reads metadata stored in the ImageResource and returns it as
|
||||
a python dict. The plugin is free to choose which name to give a piece
|
||||
of metadata; however, if possible, it should match the name given by the
|
||||
format. There is no requirement regarding the fields a plugin must
|
||||
expose; however, if a plugin does expose any,``exclude_applied`` applies
|
||||
to these fields.
|
||||
|
||||
If the plugin does return metadata items, it must check the value of
|
||||
``exclude_applied`` before returning them. If ``exclude applied`` is
|
||||
True, then any metadata item that would be applied to an ndimage
|
||||
returned by ``read`` (or ``iter``) must not be returned. This is done to
|
||||
avoid confusion; for example, if an ImageResource defines the ExIF
|
||||
rotation tag, and the plugin applies the rotation to the data before
|
||||
returning it, then ``exclude_applied`` prevents confusion on whether the
|
||||
tag was already applied or not.
|
||||
|
||||
The `kwarg` ``index`` behaves similar to its counterpart in ``read``
|
||||
with one exception: If the ``index`` is None, then global metadata is
|
||||
returned instead of returning a combination of all metadata items. If
|
||||
there is no global metadata, the Plugin should return an empty dict or
|
||||
raise an exception.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
index : int
|
||||
If the ImageResource contains multiple ndimages, and index is an
|
||||
integer, select the index-th ndimage from among them and return its
|
||||
metadata. If index is an ellipsis (...), return global metadata. If
|
||||
index is None, the plugin decides the default.
|
||||
exclude_applied : bool
|
||||
If True (default), do not report metadata fields that the plugin
|
||||
would apply/consume while reading the image.
|
||||
|
||||
Returns
|
||||
-------
|
||||
metadata : dict
|
||||
A dictionary filled with format-specific metadata fields and their
|
||||
values.
|
||||
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the ImageResource.
|
||||
|
||||
This method allows a plugin to behave similar to the python build-in ``open``::
|
||||
|
||||
image_file = my_plugin(Request, "r")
|
||||
...
|
||||
image_file.close()
|
||||
|
||||
It is used by the context manager and deconstructor below to avoid leaking
|
||||
ImageResources. If the plugin has no other cleanup to do it doesn't have
|
||||
to overwrite this method itself and can rely on the implementation
|
||||
below.
|
||||
|
||||
"""
|
||||
|
||||
self.request.finish()
|
||||
|
||||
@property
|
||||
def request(self) -> Request:
|
||||
return self._request
|
||||
|
||||
def __enter__(self) -> "PluginV3":
|
||||
return self
|
||||
|
||||
def __exit__(self, type, value, traceback) -> None:
|
||||
self.close()
|
||||
|
||||
def __del__(self) -> None:
|
||||
self.close()
|
||||
Reference in New Issue
Block a user