comment here

This commit is contained in:
ton
2023-03-18 20:03:34 +07:00
commit 4553a0a589
14513 changed files with 2685043 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
from ._optical_flow import optical_flow_tvl1, optical_flow_ilk
from ._phase_cross_correlation import phase_cross_correlation
__all__ = [
'optical_flow_ilk',
'optical_flow_tvl1',
'phase_cross_correlation'
]

View File

@@ -0,0 +1,303 @@
"""
Implementation of the masked normalized cross-correlation.
Based on the following publication:
D. Padfield. Masked object registration in the Fourier domain.
IEEE Transactions on Image Processing (2012)
and the author's original MATLAB implementation, available on this website:
http://www.dirkpadfield.com/
"""
from functools import partial
import numpy as np
import scipy.fft as fftmodule
from scipy.fft import next_fast_len
from .._shared.utils import _supported_float_type
def _masked_phase_cross_correlation(reference_image, moving_image,
                                    reference_mask, moving_mask=None,
                                    overlap_ratio=0.3):
    """Register a translation with masked normalized cross-correlation.

    Parameters
    ----------
    reference_image : ndarray
        Reference image.
    moving_image : ndarray
        Image to register. Must have the same number of dimensions as
        ``reference_image``, but not necessarily the same size.
    reference_mask : ndarray
        Boolean mask for ``reference_image``; ``True`` (or 1) marks valid
        pixels. Must have the same shape as ``reference_image``.
    moving_mask : ndarray or None, optional
        Boolean mask for ``moving_image``; ``True`` (or 1) marks valid
        pixels. Must have the same shape as ``moving_image``. If ``None``,
        ``reference_mask`` is reused (images must then share a shape).
    overlap_ratio : float, optional
        Minimum allowed overlap ratio between images. Correlation values
        for translations with less overlap than this are ignored. Lower
        values allow larger translations; higher values are more robust
        against spurious matches caused by tiny overlaps.

    Returns
    -------
    shifts : ndarray
        Shift vector (in pixels) required to register ``moving_image``
        with ``reference_image``. Axis ordering is consistent with
        numpy (e.g. Z, Y, X).

    References
    ----------
    .. [1] Dirk Padfield. Masked Object Registration in the Fourier Domain.
           IEEE Transactions on Image Processing, vol. 21(5),
           pp. 2706-2718 (2012). :DOI:`10.1109/TIP.2011.2181402`
    .. [2] D. Padfield. "Masked FFT registration". In Proc. Computer Vision
           and Pattern Recognition, pp. 2918-2925 (2010).
           :DOI:`10.1109/CVPR.2010.5540032`
    """
    if moving_mask is None:
        if reference_image.shape != moving_image.shape:
            raise ValueError(
                "Input images have different shapes, moving_mask must "
                "be explicitly set.")
        moving_mask = reference_mask.astype(bool)

    # Each mask must line up pixel-for-pixel with its image.
    for img, msk in ((reference_image, reference_mask),
                     (moving_image, moving_mask)):
        if img.shape != msk.shape:
            raise ValueError(
                "Image sizes must match their respective mask sizes.")

    transform_axes = tuple(range(moving_image.ndim))
    xcorr = cross_correlate_masked(moving_image, reference_image,
                                   moving_mask, reference_mask,
                                   axes=transform_axes, mode='full',
                                   overlap_ratio=overlap_ratio)

    # Average the coordinates of all global maxima so that ties between
    # equal peaks do not bias the estimate toward one of them.
    peak_coords = np.stack(np.nonzero(xcorr == xcorr.max()), axis=1)
    peak_center = peak_coords.mean(axis=0)
    shifts = peak_center - np.array(reference_image.shape) + 1

    # A size mismatch between the two images offsets the center of the
    # 'full' cross-correlation; compensate for half of it.
    size_mismatch = (np.array(moving_image.shape)
                     - np.array(reference_image.shape))
    return -shifts + (size_mismatch / 2)
def cross_correlate_masked(arr1, arr2, m1, m2, mode='full', axes=(-2, -1),
                           overlap_ratio=0.3):
    """
    Masked normalized cross-correlation between arrays.

    Parameters
    ----------
    arr1 : ndarray
        First array.
    arr2 : ndarray
        Seconds array. The dimensions of `arr2` along axes that are not
        transformed should be equal to that of `arr1`.
    m1 : ndarray
        Mask of `arr1`. The mask should evaluate to `True`
        (or 1) on valid pixels. `m1` should have the same shape as `arr1`.
    m2 : ndarray
        Mask of `arr2`. The mask should evaluate to `True`
        (or 1) on valid pixels. `m2` should have the same shape as `arr2`.
    mode : {'full', 'same'}, optional
        'full':
            This returns the convolution at each point of overlap. At
            the end-points of the convolution, the signals do not overlap
            completely, and boundary effects may be seen.
        'same':
            The output is the same size as `arr1`, centered with respect
            to the `full` output. Boundary effects are less prominent.
    axes : tuple of ints, optional
        Axes along which to compute the cross-correlation.
    overlap_ratio : float, optional
        Minimum allowed overlap ratio between images. The correlation for
        translations corresponding with an overlap ratio lower than this
        threshold will be ignored. A lower `overlap_ratio` leads to smaller
        maximum translation, while a higher `overlap_ratio` leads to greater
        robustness against spurious matches due to small overlap between
        masked images.

    Returns
    -------
    out : ndarray
        Masked normalized cross-correlation.

    Raises
    ------
    ValueError : if correlation `mode` is not valid, or array dimensions along
        non-transformation axes are not equal.

    References
    ----------
    .. [1] Dirk Padfield. Masked Object Registration in the Fourier Domain.
           IEEE Transactions on Image Processing, vol. 21(5),
           pp. 2706-2718 (2012). :DOI:`10.1109/TIP.2011.2181402`
    .. [2] D. Padfield. "Masked FFT registration". In Proc. Computer Vision and
           Pattern Recognition, pp. 2918-2925 (2010).
           :DOI:`10.1109/CVPR.2010.5540032`
    """
    if mode not in {'full', 'same'}:
        raise ValueError(f"Correlation mode '{mode}' is not valid.")
    fixed_image = np.asarray(arr1)
    moving_image = np.asarray(arr2)
    # Promote both inputs to a common floating dtype; complex input is
    # rejected because the masked normalization below assumes real data.
    float_dtype = _supported_float_type(
        [fixed_image.dtype, moving_image.dtype]
    )
    if float_dtype.kind == 'c':
        raise ValueError("complex-valued arr1, arr2 are not supported")
    fixed_image = fixed_image.astype(float_dtype)
    fixed_mask = np.array(m1, dtype=bool)
    moving_image = moving_image.astype(float_dtype)
    moving_mask = np.array(m2, dtype=bool)
    # eps is used below to avoid division by zero in overlap counts.
    eps = np.finfo(float_dtype).eps
    # Array dimensions along non-transformation axes should be equal.
    all_axes = set(range(fixed_image.ndim))
    for axis in (all_axes - set(axes)):
        if fixed_image.shape[axis] != moving_image.shape[axis]:
            raise ValueError(
                f'Array shapes along non-transformation axes should be '
                f'equal, but dimensions along axis {axis} are not.')
    # Determine final size along transformation axes
    # Note that it might be faster to compute Fourier transform in a slightly
    # larger shape (`fast_shape`). Then, after all fourier transforms are done,
    # we slice back to`final_shape` using `final_slice`.
    final_shape = list(arr1.shape)
    for axis in axes:
        # 'full' linear correlation length: N1 + N2 - 1 along each axis.
        final_shape[axis] = fixed_image.shape[axis] + \
            moving_image.shape[axis] - 1
    final_shape = tuple(final_shape)
    final_slice = tuple([slice(0, int(sz)) for sz in final_shape])
    # Extent transform axes to the next fast length (i.e. multiple of 3, 5, or
    # 7)
    fast_shape = tuple([next_fast_len(final_shape[ax]) for ax in axes])
    # We use the new scipy.fft because they allow leaving the transform axes
    # unchanged which was not possible with scipy.fftpack's
    # fftn/ifftn in older versions of SciPy.
    # E.g. arr shape (2, 3, 7), transform along axes (0, 1) with shape (4, 4)
    # results in arr_fft shape (4, 4, 7)
    fft = partial(fftmodule.fftn, s=fast_shape, axes=axes)
    _ifft = partial(fftmodule.ifftn, s=fast_shape, axes=axes)

    # Inputs are real, so only the real part of the inverse transform is kept.
    def ifft(x):
        return _ifft(x).real
    # Zero out invalid pixels so they contribute nothing to the sums below.
    fixed_image[np.logical_not(fixed_mask)] = 0.0
    moving_image[np.logical_not(moving_mask)] = 0.0
    # N-dimensional analog to rotation by 180deg is flip over all relevant axes.
    # See [1] for discussion.
    rotated_moving_image = _flip(moving_image, axes=axes)
    rotated_moving_mask = _flip(moving_mask, axes=axes)
    fixed_fft = fft(fixed_image)
    rotated_moving_fft = fft(rotated_moving_image)
    fixed_mask_fft = fft(fixed_mask.astype(float_dtype))
    rotated_moving_mask_fft = fft(rotated_moving_mask.astype(float_dtype))
    # Calculate overlap of masks at every point in the convolution.
    # Locations with high overlap should not be taken into account.
    number_overlap_masked_px = ifft(rotated_moving_mask_fft * fixed_mask_fft)
    # Overlap counts are integers in exact arithmetic; round away FFT noise,
    # then clamp to eps so later divisions are safe.
    number_overlap_masked_px[:] = np.round(number_overlap_masked_px)
    number_overlap_masked_px[:] = np.fmax(number_overlap_masked_px, eps)
    masked_correlated_fixed_fft = ifft(rotated_moving_mask_fft * fixed_fft)
    masked_correlated_rotated_moving_fft = ifft(
        fixed_mask_fft * rotated_moving_fft)
    # Numerator: raw correlation minus the product of the per-overlap means
    # (masked covariance, computed per translation).
    numerator = ifft(rotated_moving_fft * fixed_fft)
    numerator -= masked_correlated_fixed_fft * \
        masked_correlated_rotated_moving_fft / number_overlap_masked_px
    # Denominator terms: per-translation masked variances of each image.
    fixed_squared_fft = fft(np.square(fixed_image))
    fixed_denom = ifft(rotated_moving_mask_fft * fixed_squared_fft)
    fixed_denom -= np.square(masked_correlated_fixed_fft) / \
        number_overlap_masked_px
    # Variances cannot be negative; clamp numerical noise.
    fixed_denom[:] = np.fmax(fixed_denom, 0.0)
    rotated_moving_squared_fft = fft(np.square(rotated_moving_image))
    moving_denom = ifft(fixed_mask_fft * rotated_moving_squared_fft)
    moving_denom -= np.square(masked_correlated_rotated_moving_fft) / \
        number_overlap_masked_px
    moving_denom[:] = np.fmax(moving_denom, 0.0)
    denom = np.sqrt(fixed_denom * moving_denom)
    # Slice back to expected convolution shape.
    numerator = numerator[final_slice]
    denom = denom[final_slice]
    number_overlap_masked_px = number_overlap_masked_px[final_slice]
    if mode == 'same':
        _centering = partial(_centered,
                             newshape=fixed_image.shape, axes=axes)
        denom = _centering(denom)
        numerator = _centering(numerator)
        number_overlap_masked_px = _centering(number_overlap_masked_px)
    # Pixels where `denom` is very small will introduce large
    # numbers after division. To get around this problem,
    # we zero-out problematic pixels.
    tol = 1e3 * eps * np.max(np.abs(denom), axis=axes, keepdims=True)
    nonzero_indices = denom > tol
    # explicitly set out dtype for compatibility with SciPy < 1.4, where
    # fftmodule will be numpy.fft which always uses float64 dtype.
    out = np.zeros_like(denom, dtype=float_dtype)
    out[nonzero_indices] = numerator[nonzero_indices] / denom[nonzero_indices]
    # A normalized correlation coefficient lies in [-1, 1].
    np.clip(out, a_min=-1, a_max=1, out=out)
    # Apply overlap ratio threshold
    number_px_threshold = overlap_ratio * np.max(number_overlap_masked_px,
                                                 axis=axes, keepdims=True)
    out[number_overlap_masked_px < number_px_threshold] = 0.0
    return out
def _centered(arr, newshape, axes):
""" Return the center `newshape` portion of `arr`, leaving axes not
in `axes` untouched. """
newshape = np.asarray(newshape)
currshape = np.array(arr.shape)
slices = [slice(None, None)] * arr.ndim
for ax in axes:
startind = (currshape[ax] - newshape[ax]) // 2
endind = startind + newshape[ax]
slices[ax] = slice(startind, endind)
return arr[tuple(slices)]
def _flip(arr, axes=None):
""" Reverse array over many axes. Generalization of arr[::-1] for many
dimensions. If `axes` is `None`, flip along all axes. """
if axes is None:
reverse = [slice(None, None, -1)] * arr.ndim
else:
reverse = [slice(None, None, None)] * arr.ndim
for axis in axes:
reverse[axis] = slice(None, None, -1)
return arr[tuple(reverse)]

View File

@@ -0,0 +1,384 @@
"""TV-L1 optical flow algorithm implementation.
"""
from functools import partial
from itertools import combinations_with_replacement
import numpy as np
from scipy import ndimage as ndi
from .._shared.filters import gaussian as gaussian_filter
from .._shared.utils import _supported_float_type
from ..transform import warp
from ._optical_flow_utils import coarse_to_fine, get_warp_points
def _tvl1(reference_image, moving_image, flow0, attachment, tightness,
          num_warp, num_iter, tol, prefilter):
    """TV-L1 solver for optical flow estimation.

    Parameters
    ----------
    reference_image : ndarray, shape (M, N[, P[, ...]])
        The first gray scale image of the sequence.
    moving_image : ndarray, shape (M, N[, P[, ...]])
        The second gray scale image of the sequence.
    flow0 : ndarray, shape (image0.ndim, M, N[, P[, ...]])
        Initialization for the vector field.
    attachment : float
        Attachment parameter. The smaller this parameter is,
        the smoother is the solutions.
    tightness : float
        Tightness parameter. It should have a small value in order to
        maintain attachment and regularization parts in
        correspondence.
    num_warp : int
        Number of times moving_image is warped.
    num_iter : int
        Number of fixed point iteration.
    tol : float
        Tolerance used as stopping criterion based on the L² distance
        between two consecutive values of (u, v).
    prefilter : bool
        Whether to prefilter the estimated optical flow before each
        image warp.

    Returns
    -------
    flow : ndarray, shape ((image0.ndim, M, N[, P[, ...]])
        The estimated optical flow components for each axis.
    """
    dtype = reference_image.dtype
    # Sparse coordinate grid used to build warp points for each flow estimate.
    grid = np.meshgrid(*[np.arange(n, dtype=dtype)
                         for n in reference_image.shape],
                       indexing='ij', sparse=True)
    # Dual-update step size, scaled by dimensionality.
    dt = 0.5 / reference_image.ndim
    reg_num_iter = 2
    f0 = attachment * tightness
    f1 = dt / tightness
    # Scale the tolerance by the number of pixels so the stopping test
    # below compares against a per-image (not per-pixel) threshold.
    tol *= reference_image.size
    flow_current = flow_previous = flow0
    # g: per-axis forward differences (gradient) of one flow component.
    g = np.zeros((reference_image.ndim,) + reference_image.shape, dtype=dtype)
    # proj: dual variable of the total-variation regularizer, one
    # (ndim x grid) field per flow component.
    proj = np.zeros((reference_image.ndim, reference_image.ndim,)
                    + reference_image.shape, dtype=dtype)
    # Reusable slice templates, mutated in place inside the loops below.
    s_g = [slice(None), ] * g.ndim
    s_p = [slice(None), ] * proj.ndim
    s_d = [slice(None), ] * (proj.ndim-2)
    for _ in range(num_warp):
        if prefilter:
            # Median filter each flow component (window 3 per spatial axis)
            # to suppress outliers before warping.
            flow_current = ndi.median_filter(flow_current,
                                             [1] + reference_image.ndim * [3])
        image1_warp = warp(moving_image, get_warp_points(grid, flow_current),
                           mode='edge')
        grad = np.array(np.gradient(image1_warp))
        # Squared gradient norm; zeros replaced by 1 to avoid division by 0.
        NI = (grad*grad).sum(0)
        NI[NI == 0] = 1
        # Linearized residual of the brightness-constancy data term at the
        # current warp point.
        rho_0 = image1_warp - reference_image - (grad * flow_current).sum(0)
        for _ in range(num_iter):
            # Data term
            rho = rho_0 + (grad*flow_current).sum(0)
            # Thresholding step: small residuals get a proximal update,
            # large residuals a fixed-size step along the gradient sign.
            idx = abs(rho) <= f0 * NI
            # NOTE: no copy here — flow_auxiliary aliases flow_current, so
            # the two in-place updates below also modify flow_current
            # before it is re-assigned from the copy further down.
            flow_auxiliary = flow_current
            flow_auxiliary[:, idx] -= rho[idx]*grad[:, idx]/NI[idx]
            idx = ~idx
            srho = f0 * np.sign(rho[idx])
            flow_auxiliary[:, idx] -= srho*grad[:, idx]
            # Regularization term
            flow_current = flow_auxiliary.copy()
            for idx in range(reference_image.ndim):
                s_p[0] = idx
                for _ in range(reg_num_iter):
                    # Forward differences of this flow component per axis.
                    for ax in range(reference_image.ndim):
                        s_g[0] = ax
                        s_g[ax+1] = slice(0, -1)
                        g[tuple(s_g)] = np.diff(flow_current[idx], axis=ax)
                        s_g[ax+1] = slice(None)
                    norm = np.sqrt((g ** 2).sum(0))[np.newaxis, ...]
                    norm *= f1
                    norm += 1.
                    # Projected gradient step on the dual variable.
                    proj[idx] -= dt * g
                    proj[idx] /= norm
                    # d will be the (negative) divergence of proj[idx]
                    d = -proj[idx].sum(0)
                    for ax in range(reference_image.ndim):
                        s_p[1] = ax
                        s_p[ax+2] = slice(0, -1)
                        s_d[ax] = slice(1, None)
                        d[tuple(s_d)] += proj[tuple(s_p)]
                        s_p[ax+2] = slice(None)
                        s_d[ax] = slice(None)
                    flow_current[idx] = flow_auxiliary[idx] + d
        flow_previous -= flow_current  # The difference as stopping criteria
        if (flow_previous*flow_previous).sum() < tol:
            break
        flow_previous = flow_current
    return flow_current
def optical_flow_tvl1(reference_image, moving_image,
                      *,
                      attachment=15, tightness=0.3, num_warp=5, num_iter=10,
                      tol=1e-4, prefilter=False, dtype=np.float32):
    r"""Coarse to fine optical flow estimator.

    The TV-L1 solver is applied at each level of the image pyramid.
    TV-L1 is a popular algorithm for optical flow estimation introduced
    by Zach et al. [1]_, improved in [2]_ and detailed in [3]_.

    Parameters
    ----------
    reference_image : ndarray, shape (M, N[, P[, ...]])
        The first gray scale image of the sequence.
    moving_image : ndarray, shape (M, N[, P[, ...]])
        The second gray scale image of the sequence.
    attachment : float, optional
        Attachment parameter (:math:`\lambda` in [1]_). The smaller
        this parameter is, the smoother the returned result will be.
    tightness : float, optional
        Tightness parameter (:math:`\tau` in [1]_). It should have
        a small value in order to maintain attachment and
        regularization parts in correspondence.
    num_warp : int, optional
        Number of times moving_image is warped.
    num_iter : int, optional
        Number of fixed point iteration.
    tol : float, optional
        Tolerance used as stopping criterion based on the L² distance
        between two consecutive values of (u, v).
    prefilter : bool, optional
        Whether to prefilter the estimated optical flow before each
        image warp. When True, a median filter with window size 3
        along each axis is applied. This helps to remove potential
        outliers.
    dtype : dtype, optional
        Output data type: must be floating point. Single precision
        provides good results and saves memory usage and computation
        time compared to double precision.

    Returns
    -------
    flow : ndarray, shape ((image0.ndim, M, N[, P[, ...]])
        The estimated optical flow components for each axis.

    Notes
    -----
    Color images are not supported.

    References
    ----------
    .. [1] Zach, C., Pock, T., & Bischof, H. (2007, September). A
       duality based approach for realtime TV-L 1 optical flow. In Joint
       pattern recognition symposium (pp. 214-223). Springer, Berlin,
       Heidelberg. :DOI:`10.1007/978-3-540-74936-3_22`
    .. [2] Wedel, A., Pock, T., Zach, C., Bischof, H., & Cremers,
       D. (2009). An improved algorithm for TV-L 1 optical flow. In
       Statistical and geometrical approaches to visual motion analysis
       (pp. 23-45). Springer, Berlin, Heidelberg.
       :DOI:`10.1007/978-3-642-03061-1_2`
    .. [3] Pérez, J. S., Meinhardt-Llopis, E., & Facciolo,
       G. (2013). TV-L1 optical flow estimation. Image Processing On
       Line, 2013, 137-150. :DOI:`10.5201/ipol.2013.26`

    Examples
    --------
    >>> from skimage.color import rgb2gray
    >>> from skimage.data import stereo_motorcycle
    >>> from skimage.registration import optical_flow_tvl1
    >>> image0, image1, disp = stereo_motorcycle()
    >>> # --- Convert the images to gray level: color is not supported.
    >>> image0 = rgb2gray(image0)
    >>> image1 = rgb2gray(image1)
    >>> flow = optical_flow_tvl1(image1, image0)
    """
    # Validate the requested output dtype before doing any work.
    if np.dtype(dtype) != _supported_float_type(dtype):
        msg = f"dtype={dtype} is not supported. Try 'float32' or 'float64.'"
        raise ValueError(msg)
    solver = partial(_tvl1, attachment=attachment, tightness=tightness,
                     num_warp=num_warp, num_iter=num_iter, tol=tol,
                     prefilter=prefilter)
    return coarse_to_fine(reference_image, moving_image, solver, dtype=dtype)
def _ilk(reference_image, moving_image, flow0, radius, num_warp, gaussian,
         prefilter):
    """Iterative Lucas-Kanade (iLK) solver for optical flow estimation.

    Parameters
    ----------
    reference_image : ndarray, shape (M, N[, P[, ...]])
        The first gray scale image of the sequence.
    moving_image : ndarray, shape (M, N[, P[, ...]])
        The second gray scale image of the sequence.
    flow0 : ndarray, shape (reference_image.ndim, M, N[, P[, ...]])
        Initialization for the vector field.
    radius : int
        Radius of the window considered around each pixel.
    num_warp : int
        Number of times moving_image is warped.
    gaussian : bool
        if True, a gaussian kernel is used for the local
        integration. Otherwise, a uniform kernel is used.
    prefilter : bool
        Whether to prefilter the estimated optical flow before each
        image warp. This helps to remove potential outliers.

    Returns
    -------
    flow : ndarray, shape ((reference_image.ndim, M, N[, P[, ...]])
        The estimated optical flow components for each axis.
    """
    dtype = reference_image.dtype
    ndim = reference_image.ndim
    # Full window width of the local integration neighborhood.
    size = 2 * radius + 1
    if gaussian:
        # Gaussian weighting of the window; sigma chosen as size/4 per axis.
        sigma = ndim * (size / 4, )
        filter_func = partial(gaussian_filter, sigma=sigma, mode='mirror')
    else:
        filter_func = partial(ndi.uniform_filter, size=ndim * (size, ),
                              mode='mirror')
    flow = flow0
    # For each pixel location (i, j), the optical flow X = flow[:, i, j]
    # is the solution of the ndim x ndim linear system
    # A[i, j] * X = b[i, j]
    A = np.zeros(reference_image.shape + (ndim, ndim), dtype=dtype)
    b = np.zeros(reference_image.shape + (ndim, ), dtype=dtype)
    # Sparse coordinate grid used to build warp points for each estimate.
    grid = np.meshgrid(*[np.arange(n, dtype=dtype)
                         for n in reference_image.shape],
                       indexing='ij', sparse=True)
    for _ in range(num_warp):
        if prefilter:
            # Median filter each flow component (window 3 per spatial axis).
            flow = ndi.median_filter(flow, (1, ) + ndim * (3, ))
        moving_image_warp = warp(moving_image, get_warp_points(grid, flow),
                                 mode='edge')
        grad = np.stack(np.gradient(moving_image_warp), axis=0)
        # Linearized brightness-constancy residual at the current warp.
        error_image = ((grad * flow).sum(axis=0)
                       + reference_image - moving_image_warp)
        # Local linear systems creation
        # A is symmetric, so only the upper triangle is computed and mirrored.
        for i, j in combinations_with_replacement(range(ndim), 2):
            A[..., i, j] = A[..., j, i] = filter_func(grad[i] * grad[j])
        for i in range(ndim):
            b[..., i] = filter_func(grad[i] * error_image)
        # Don't consider badly conditioned linear systems
        # (near-zero determinant): replace with identity and zero RHS so
        # the solve yields zero flow at those pixels.
        idx = abs(np.linalg.det(A)) < 1e-14
        A[idx] = np.eye(ndim, dtype=dtype)
        b[idx] = 0
        # Solve the local linear systems
        # and move the component axis back to the front.
        flow = np.moveaxis(np.linalg.solve(A, b), ndim, 0)
    return flow
def optical_flow_ilk(reference_image, moving_image, *,
                     radius=7, num_warp=10, gaussian=False,
                     prefilter=False, dtype=np.float32):
    """Coarse to fine optical flow estimator.

    The iterative Lucas-Kanade (iLK) solver is applied at each level
    of the image pyramid. iLK [1]_ is a fast and robust alternative to
    TVL1 algorithm although less accurate for rendering flat surfaces
    and object boundaries (see [2]_).

    Parameters
    ----------
    reference_image : ndarray, shape (M, N[, P[, ...]])
        The first gray scale image of the sequence.
    moving_image : ndarray, shape (M, N[, P[, ...]])
        The second gray scale image of the sequence.
    radius : int, optional
        Radius of the window considered around each pixel.
    num_warp : int, optional
        Number of times moving_image is warped.
    gaussian : bool, optional
        If True, a Gaussian kernel is used for the local
        integration. Otherwise, a uniform kernel is used.
    prefilter : bool, optional
        Whether to prefilter the estimated optical flow before each
        image warp. When True, a median filter with window size 3
        along each axis is applied. This helps to remove potential
        outliers.
    dtype : dtype, optional
        Output data type: must be floating point. Single precision
        provides good results and saves memory usage and computation
        time compared to double precision.

    Returns
    -------
    flow : ndarray, shape ((reference_image.ndim, M, N[, P[, ...]])
        The estimated optical flow components for each axis.

    Notes
    -----
    - The implemented algorithm is described in **Table2** of [1]_.
    - Color images are not supported.

    References
    ----------
    .. [1] Le Besnerais, G., & Champagnat, F. (2005, September). Dense
       optical flow by iterative local window registration. In IEEE
       International Conference on Image Processing 2005 (Vol. 1,
       pp. I-137). IEEE. :DOI:`10.1109/ICIP.2005.1529706`
    .. [2] Plyer, A., Le Besnerais, G., & Champagnat,
       F. (2016). Massively parallel Lucas Kanade optical flow for
       real-time video processing applications. Journal of Real-Time
       Image Processing, 11(4), 713-730. :DOI:`10.1007/s11554-014-0423-0`

    Examples
    --------
    >>> from skimage.color import rgb2gray
    >>> from skimage.data import stereo_motorcycle
    >>> from skimage.registration import optical_flow_ilk
    >>> reference_image, moving_image, disp = stereo_motorcycle()
    >>> # --- Convert the images to gray level: color is not supported.
    >>> reference_image = rgb2gray(reference_image)
    >>> moving_image = rgb2gray(moving_image)
    >>> flow = optical_flow_ilk(moving_image, reference_image)
    """
    # Validate the requested output dtype before doing any work.
    if np.dtype(dtype) != _supported_float_type(dtype):
        msg = f"dtype={dtype} is not supported. Try 'float32' or 'float64.'"
        raise ValueError(msg)
    solver = partial(_ilk, radius=radius, num_warp=num_warp,
                     gaussian=gaussian, prefilter=prefilter)
    return coarse_to_fine(reference_image, moving_image, solver, dtype=dtype)

View File

@@ -0,0 +1,150 @@
"""Common tools to optical flow algorithms.
"""
import numpy as np
from scipy import ndimage as ndi
from ..transform import pyramid_reduce
from ..util.dtype import _convert
def get_warp_points(grid, flow):
    """Return the coordinates of each grid point displaced by ``flow``.

    Parameters
    ----------
    grid : iterable
        The sparse grid to be warped (obtained using
        ``np.meshgrid(..., sparse=True)``).
    flow : ndarray
        The warping motion field; component ``flow[i]`` displaces axis ``i``.

    Returns
    -------
    out : ndarray
        The warp point coordinates (grid coordinates plus flow).
    """
    coords = flow.copy()
    # Add each sparse grid axis to its flow component; the sparse grid
    # arrays broadcast against the full-size flow components.
    for axis, axis_coords in enumerate(grid):
        coords[axis] += axis_coords
    return coords
def resize_flow(flow, shape):
    """Resize the vector field (u, v) to ``shape``, rescaling its values.

    The flow values are multiplied by the per-axis resampling factor so
    that displacements remain correct at the new resolution.

    Parameters
    ----------
    flow : ndarray
        The motion field to be processed; axis 0 indexes the components.
    shape : iterable
        Couple of integers representing the output spatial shape.

    Returns
    -------
    rflow : ndarray
        The resized and rescaled motion field.
    """
    zoom_factors = [new / old for new, old in zip(shape, flow.shape[1:])]
    # Nearest-neighbor resampling; the component axis (axis 0) is kept as-is.
    resized = ndi.zoom(flow, [1] + zoom_factors, order=0,
                       mode='nearest', prefilter=False)
    # Reshape the per-axis factors to broadcast over the spatial axes.
    scale = np.array(zoom_factors, dtype=flow.dtype)
    scale = scale.reshape((-1,) + (1,) * len(shape))
    return scale * resized
def get_pyramid(I, downscale=2.0, nlevel=10, min_size=16):
    """Construct an image pyramid, ordered coarse to fine.

    Parameters
    ----------
    I : ndarray
        The image to be preprocessed (Gray scale or RGB).
    downscale : float
        The pyramid downscale factor.
    nlevel : int
        The maximum number of pyramid levels.
    min_size : int
        The minimum size for any dimension of the pyramid levels.

    Returns
    -------
    pyramid : list[ndarray]
        The coarse to fine images pyramid.
    """
    levels = [I]
    # Keep reducing until we hit the level cap or the next level would
    # drop below the minimum allowed dimension.
    while len(levels) < nlevel:
        if min(levels[-1].shape) <= downscale * min_size:
            break
        levels.append(pyramid_reduce(levels[-1], downscale,
                                     channel_axis=None))
    # Finest image was built first; reverse so the coarsest comes first.
    return levels[::-1]
def coarse_to_fine(I0, I1, solver, downscale=2, nlevel=10, min_size=16,
                   dtype=np.float32):
    """Generic coarse to fine solver.

    Parameters
    ----------
    I0 : ndarray
        The first gray scale image of the sequence.
    I1 : ndarray
        The second gray scale image of the sequence.
    solver : callable
        The solver applied at each pyramid level.
    downscale : float
        The pyramid downscale factor.
    nlevel : int
        The maximum number of pyramid levels.
    min_size : int
        The minimum size for any dimension of the pyramid levels.
    dtype : dtype
        Output data type.

    Returns
    -------
    flow : ndarray
        The estimated optical flow components for each axis.
    """
    if I0.shape != I1.shape:
        raise ValueError("Input images should have the same shape")
    # 'efdg' are the numpy type chars of the floating point dtypes.
    if np.dtype(dtype).char not in 'efdg':
        raise ValueError("Only floating point data type are valid"
                         " for optical flow")
    pyramid0 = get_pyramid(_convert(I0, dtype), downscale, nlevel, min_size)
    pyramid1 = get_pyramid(_convert(I1, dtype), downscale, nlevel, min_size)
    # Initialization to 0 at the coarsest level.
    coarsest0, coarsest1 = pyramid0[0], pyramid1[0]
    flow = np.zeros((coarsest0.ndim,) + coarsest0.shape, dtype=dtype)
    flow = solver(coarsest0, coarsest1, flow)
    # Propagate each estimate to the next (finer) level as initialization.
    for level0, level1 in zip(pyramid0[1:], pyramid1[1:]):
        flow = solver(level0, level1, resize_flow(flow, level0.shape))
    return flow

View File

@@ -0,0 +1,418 @@
"""
Port of Manuel Guizar's code from:
http://www.mathworks.com/matlabcentral/fileexchange/18401-efficient-subpixel-image-registration-by-cross-correlation
"""
import itertools
import warnings
import numpy as np
from scipy.fft import fftn, ifftn, fftfreq
from scipy import ndimage as ndi
from ._masked_phase_cross_correlation import _masked_phase_cross_correlation
def _upsampled_dft(data, upsampled_region_size,
upsample_factor=1, axis_offsets=None):
"""
Upsampled DFT by matrix multiplication.
This code is intended to provide the same result as if the following
operations were performed:
- Embed the array "data" in an array that is ``upsample_factor`` times
larger in each dimension. ifftshift to bring the center of the
image to (1,1).
- Take the FFT of the larger array.
- Extract an ``[upsampled_region_size]`` region of the result, starting
with the ``[axis_offsets+1]`` element.
It achieves this result by computing the DFT in the output array without
the need to zeropad. Much faster and memory efficient than the zero-padded
FFT approach if ``upsampled_region_size`` is much smaller than
``data.size * upsample_factor``.
Parameters
----------
data : array
The input data array (DFT of original data) to upsample.
upsampled_region_size : integer or tuple of integers, optional
The size of the region to be sampled. If one integer is provided, it
is duplicated up to the dimensionality of ``data``.
upsample_factor : integer, optional
The upsampling factor. Defaults to 1.
axis_offsets : tuple of integers, optional
The offsets of the region to be sampled. Defaults to None (uses
image center)
Returns
-------
output : ndarray
The upsampled DFT of the specified region.
"""
# if people pass in an integer, expand it to a list of equal-sized sections
if not hasattr(upsampled_region_size, "__iter__"):
upsampled_region_size = [upsampled_region_size, ] * data.ndim
else:
if len(upsampled_region_size) != data.ndim:
raise ValueError("shape of upsampled region sizes must be equal "
"to input data's number of dimensions.")
if axis_offsets is None:
axis_offsets = [0, ] * data.ndim
else:
if len(axis_offsets) != data.ndim:
raise ValueError("number of axis offsets must be equal to input "
"data's number of dimensions.")
im2pi = 1j * 2 * np.pi
dim_properties = list(zip(data.shape, upsampled_region_size, axis_offsets))
for (n_items, ups_size, ax_offset) in dim_properties[::-1]:
kernel = ((np.arange(ups_size) - ax_offset)[:, None]
* fftfreq(n_items, upsample_factor))
kernel = np.exp(-im2pi * kernel)
# use kernel with same precision as the data
kernel = kernel.astype(data.dtype, copy=False)
# Equivalent to:
# data[i, j, k] = kernel[i, :] @ data[j, k].T
data = np.tensordot(kernel, data, axes=(1, -1))
return data
def _compute_phasediff(cross_correlation_max):
"""
Compute global phase difference between the two images (should be
zero if images are non-negative).
Parameters
----------
cross_correlation_max : complex
The complex value of the cross correlation at its maximum point.
"""
return np.arctan2(cross_correlation_max.imag, cross_correlation_max.real)
def _compute_error(cross_correlation_max, src_amp, target_amp):
"""
Compute RMS error metric between ``src_image`` and ``target_image``.
Parameters
----------
cross_correlation_max : complex
The complex value of the cross correlation at its maximum point.
src_amp : float
The normalized average image intensity of the source image
target_amp : float
The normalized average image intensity of the target image
"""
error = 1.0 - cross_correlation_max * cross_correlation_max.conj() /\
(src_amp * target_amp)
return np.sqrt(np.abs(error))
def _disambiguate_shift(reference_image, moving_image, shift):
"""Determine the correct real-space shift based on periodic shift.
When determining a translation shift from phase cross-correlation in
Fourier space, the shift is only correct to within a period of the image
size along each axis, resulting in $2^n$ possible shifts, where $n$ is the
number of dimensions of the image. This function checks the
cross-correlation in real space for each of those shifts, and returns the
one with the highest cross-correlation.
The strategy we use is to perform the shift on the moving image *using the
'grid-wrap' mode* in `scipy.ndimage`. The moving image's original borders
then define $2^n$ quadrants, which we cross-correlate with the reference
image in turn using slicing. The entire operation is thus $O(2^n + m)$,
where $m$ is the number of pixels in the image (and typically dominates).
Parameters
----------
reference_image : numpy array
The reference (non-moving) image.
moving_image : numpy array
The moving image: applying the shift to this image overlays it on the
reference image. Must be the same shape as the reference image.
shift : tuple of float
The shift to apply to each axis of the moving image, *modulo* image
size. The length of ``shift`` must be equal to ``moving_image.ndim``.
Returns
-------
real_shift : tuple of float
The shift disambiguated in real space.
"""
shape = reference_image.shape
positive_shift = [shift_i % s for shift_i, s in zip(shift, shape)]
negative_shift = [shift_i - s
for shift_i, s in zip(positive_shift, shape)]
subpixel = np.any(np.array(shift) % 1 != 0)
interp_order = 3 if subpixel else 0
shifted = ndi.shift(
moving_image, shift, mode='grid-wrap', order=interp_order
)
indices = np.round(positive_shift).astype(int)
splits_per_dim = [(slice(0, i), slice(i, None)) for i in indices]
max_corr = -1.0
max_slice = None
for test_slice in itertools.product(*splits_per_dim):
reference_tile = np.reshape(reference_image[test_slice], -1)
moving_tile = np.reshape(shifted[test_slice], -1)
corr = -1.0
if reference_tile.size > 2:
corr = np.corrcoef(reference_tile, moving_tile)[0, 1]
if corr > max_corr:
max_corr = corr
max_slice = test_slice
real_shift_acc = []
for sl, pos_shift, neg_shift in zip(
max_slice, positive_shift, negative_shift
):
real_shift_acc.append(pos_shift if sl.stop is None else neg_shift)
if not subpixel:
real_shift = tuple(map(int, real_shift_acc))
else:
real_shift = tuple(real_shift_acc)
return real_shift
def phase_cross_correlation(reference_image, moving_image, *,
                            upsample_factor=1, space="real",
                            disambiguate=False,
                            return_error=True, reference_mask=None,
                            moving_mask=None, overlap_ratio=0.3,
                            normalization="phase"):
    """Efficient subpixel image translation registration by cross-correlation.

    This code gives the same precision as the FFT upsampled cross-correlation
    in a fraction of the computation time and with reduced memory requirements.
    It obtains an initial estimate of the cross-correlation peak by an FFT and
    then refines the shift estimation by upsampling the DFT only in a small
    neighborhood of that estimate by means of a matrix-multiply DFT [1]_.

    Parameters
    ----------
    reference_image : array
        Reference image.
    moving_image : array
        Image to register. Must be same dimensionality as
        ``reference_image``.
    upsample_factor : int, optional
        Upsampling factor. Images will be registered to within
        ``1 / upsample_factor`` of a pixel. For example
        ``upsample_factor == 20`` means the images will be registered
        within 1/20th of a pixel. Default is 1 (no upsampling).
        Not used if any of ``reference_mask`` or ``moving_mask`` is not None.
    space : string, one of "real" or "fourier", optional
        Defines how the algorithm interprets input data. "real" means
        data will be FFT'd to compute the correlation, while "fourier"
        data will bypass FFT of input data. Case insensitive. Not
        used if any of ``reference_mask`` or ``moving_mask`` is not
        None.
    disambiguate : bool
        The shift returned by this function is only accurate *modulo* the
        image shape, due to the periodic nature of the Fourier transform. If
        this parameter is set to ``True``, the *real* space cross-correlation
        is computed for each possible shift, and the shift with the highest
        cross-correlation within the overlapping area is returned.
    return_error : bool, {"always"}, optional
        Returns error and phase difference if "always" is given. If False, or
        either ``reference_mask`` or ``moving_mask`` are given, only the shift
        is returned.
    reference_mask : ndarray
        Boolean mask for ``reference_image``. The mask should evaluate
        to ``True`` (or 1) on valid pixels. ``reference_mask`` should
        have the same shape as ``reference_image``.
    moving_mask : ndarray or None, optional
        Boolean mask for ``moving_image``. The mask should evaluate to ``True``
        (or 1) on valid pixels. ``moving_mask`` should have the same shape
        as ``moving_image``. If ``None``, ``reference_mask`` will be used.
    overlap_ratio : float, optional
        Minimum allowed overlap ratio between images. The correlation for
        translations corresponding with an overlap ratio lower than this
        threshold will be ignored. A lower `overlap_ratio` leads to smaller
        maximum translation, while a higher `overlap_ratio` leads to greater
        robustness against spurious matches due to small overlap between
        masked images. Used only if one of ``reference_mask`` or
        ``moving_mask`` is not None.
    normalization : {"phase", None}
        The type of normalization to apply to the cross-correlation. This
        parameter is unused when masks (`reference_mask` and `moving_mask`) are
        supplied.

    Returns
    -------
    shift : ndarray
        Shift vector (in pixels) required to register ``moving_image``
        with ``reference_image``. Axis ordering is consistent with
        the axis order of the input array.
    error : float
        Translation invariant normalized RMS error between
        ``reference_image`` and ``moving_image``. For masked cross-correlation
        this error is not available and NaN is returned if ``return_error``
        is "always".
    phasediff : float
        Global phase difference between the two images (should be
        zero if images are non-negative). For masked cross-correlation
        this phase difference is not available and NaN is returned if
        ``return_error`` is "always".

    Notes
    -----
    The use of cross-correlation to estimate image translation has a long
    history dating back to at least [2]_. The "phase correlation"
    method (selected by ``normalization="phase"``) was first proposed in [3]_.
    Publications [1]_ and [2]_ use an unnormalized cross-correlation
    (``normalization=None``). Which form of normalization is better is
    application-dependent. For example, the phase correlation method works
    well in registering images under different illumination, but is not very
    robust to noise. In a high noise scenario, the unnormalized method may be
    preferable.

    When masks are provided, a masked normalized cross-correlation algorithm is
    used [5]_, [6]_.

    References
    ----------
    .. [1] Manuel Guizar-Sicairos, Samuel T. Thurman, and James R. Fienup,
           "Efficient subpixel image registration algorithms,"
           Optics Letters 33, 156-158 (2008). :DOI:`10.1364/OL.33.000156`
    .. [2] P. Anuta, Spatial registration of multispectral and multitemporal
           digital imagery using fast Fourier transform techniques, IEEE Trans.
           Geosci. Electron., vol. 8, no. 4, pp. 353368, Oct. 1970.
           :DOI:`10.1109/TGE.1970.271435`.
    .. [3] C. D. Kuglin D. C. Hines. The phase correlation image alignment
           method, Proceeding of IEEE International Conference on Cybernetics
           and Society, pp. 163-165, New York, NY, USA, 1975, pp. 163165.
    .. [4] James R. Fienup, "Invariant error metrics for image reconstruction"
           Optics Letters 36, 8352-8357 (1997). :DOI:`10.1364/AO.36.008352`
    .. [5] Dirk Padfield. Masked Object Registration in the Fourier Domain.
           IEEE Transactions on Image Processing, vol. 21(5),
           pp. 2706-2718 (2012). :DOI:`10.1109/TIP.2011.2181402`
    .. [6] D. Padfield. "Masked FFT registration". In Proc. Computer Vision and
           Pattern Recognition, pp. 2918-2925 (2010).
           :DOI:`10.1109/CVPR.2010.5540032`
    """
    def warn_return_error():
        # Shared deprecation warning for all shift-only return paths.
        warnings.warn(
            "In scikit-image 0.21, phase_cross_correlation will start "
            "returning a tuple or 3 items (shift, error, phasediff) always. "
            "To enable the new return behavior and silence this warning, use "
            "return_error='always'.",
            category=FutureWarning,
            stacklevel=3,
        )

    # Masked registration delegates to the masked algorithm; error and
    # phasediff are not available in that case.
    if (reference_mask is not None) or (moving_mask is not None):
        shift = _masked_phase_cross_correlation(
            reference_image, moving_image,
            reference_mask, moving_mask,
            overlap_ratio
        )
        if return_error == "always":
            return shift, np.nan, np.nan
        else:
            warn_return_error()
            return shift

    # images must be the same shape
    if reference_image.shape != moving_image.shape:
        raise ValueError("images must be same shape")

    # assume complex data is already in Fourier space
    if space.lower() == 'fourier':
        src_freq = reference_image
        target_freq = moving_image
    # real data needs to be fft'd.
    elif space.lower() == 'real':
        src_freq = fftn(reference_image)
        target_freq = fftn(moving_image)
    else:
        # BUGFIX: message previously read '"real" of "fourier"'.
        raise ValueError('space argument must be "real" or "fourier"')

    # Whole-pixel shift - Compute cross-correlation by an IFFT
    shape = src_freq.shape
    image_product = src_freq * target_freq.conj()
    if normalization == "phase":
        # Normalize to unit magnitude; the eps floor avoids division by zero.
        eps = np.finfo(image_product.real.dtype).eps
        image_product /= np.maximum(np.abs(image_product), 100 * eps)
    elif normalization is not None:
        raise ValueError("normalization must be either phase or None")
    cross_correlation = ifftn(image_product)

    # Locate maximum
    maxima = np.unravel_index(np.argmax(np.abs(cross_correlation)),
                              cross_correlation.shape)
    midpoint = np.array([np.fix(axis_size / 2) for axis_size in shape])

    float_dtype = image_product.real.dtype

    # Map peak indices to signed shifts (peaks past the midpoint wrap
    # around to negative shifts).
    shift = np.stack(maxima).astype(float_dtype, copy=False)
    shift[shift > midpoint] -= np.array(shape)[shift > midpoint]

    if upsample_factor == 1:
        if return_error:
            src_amp = np.sum(np.real(src_freq * src_freq.conj()))
            src_amp /= src_freq.size
            target_amp = np.sum(np.real(target_freq * target_freq.conj()))
            target_amp /= target_freq.size
            CCmax = cross_correlation[maxima]
    # If upsampling > 1, then refine estimate with matrix multiply DFT
    else:
        # Initial shift estimate in upsampled grid
        upsample_factor = np.array(upsample_factor, dtype=float_dtype)
        shift = np.round(shift * upsample_factor) / upsample_factor
        upsampled_region_size = np.ceil(upsample_factor * 1.5)
        # Center of output array at dftshift + 1
        dftshift = np.fix(upsampled_region_size / 2.0)
        # Matrix multiply DFT around the current shift estimate
        sample_region_offset = dftshift - shift * upsample_factor
        cross_correlation = _upsampled_dft(image_product.conj(),
                                           upsampled_region_size,
                                           upsample_factor,
                                           sample_region_offset).conj()
        # Locate maximum and map back to original pixel grid
        maxima = np.unravel_index(np.argmax(np.abs(cross_correlation)),
                                  cross_correlation.shape)
        CCmax = cross_correlation[maxima]

        maxima = np.stack(maxima).astype(float_dtype, copy=False)
        maxima -= dftshift

        shift += maxima / upsample_factor

        if return_error:
            src_amp = np.sum(np.real(src_freq * src_freq.conj()))
            target_amp = np.sum(np.real(target_freq * target_freq.conj()))

    # If its only one row or column the shift along that dimension has no
    # effect. We set to zero.
    for dim in range(src_freq.ndim):
        if shape[dim] == 1:
            shift[dim] = 0

    if disambiguate:
        # Disambiguation works in real space; invert the FFT if needed.
        if space.lower() != 'real':
            reference_image = ifftn(reference_image)
            moving_image = ifftn(moving_image)
        shift = _disambiguate_shift(reference_image, moving_image, shift)

    if return_error:
        # Redirect user to masked_phase_cross_correlation if NaNs are observed
        if np.isnan(CCmax) or np.isnan(src_amp) or np.isnan(target_amp):
            raise ValueError(
                "NaN values found, please remove NaNs from your "
                "input data or use the `reference_mask`/`moving_mask` "
                "keywords, eg: "
                "phase_cross_correlation(reference_image, moving_image, "
                "reference_mask=~np.isnan(reference_image), "
                "moving_mask=~np.isnan(moving_image))")
        return shift, _compute_error(CCmax, src_amp, target_amp),\
            _compute_phasediff(CCmax)
    else:
        warn_return_error()
        return shift

View File

@@ -0,0 +1,98 @@
import numpy as np
import pytest
from skimage._shared.utils import _supported_float_type
from skimage.registration import optical_flow_ilk
from .test_tvl1 import _sin_flow_gen
@pytest.mark.parametrize('dtype', [np.float16, np.float32, np.float64])
@pytest.mark.parametrize('gaussian', [True, False])
@pytest.mark.parametrize('prefilter', [True, False])
def test_2d_motion(dtype, gaussian, prefilter):
    """ILK should recover a synthetic 2D sinusoidal flow for every dtype."""
    # Random reference frame plus a warped copy with known ground-truth flow.
    rng = np.random.default_rng(0)
    frame0 = rng.normal(size=(256, 256))
    expected_flow, frame1 = _sin_flow_gen(frame0)
    frame1 = frame1.astype(dtype, copy=False)
    float_dtype = _supported_float_type(dtype)
    estimated = optical_flow_ilk(frame0, frame1, gaussian=gaussian,
                                 prefilter=prefilter, dtype=float_dtype)
    assert estimated.dtype == float_dtype
    # Mean absolute error must stay below half a pixel.
    assert np.abs(estimated - expected_flow).mean() < 0.5
    # Requesting an unsupported working dtype must fail.
    if dtype != float_dtype:
        with pytest.raises(ValueError):
            optical_flow_ilk(frame0, frame1, gaussian=gaussian,
                             prefilter=prefilter, dtype=dtype)
@pytest.mark.parametrize('gaussian', [True, False])
@pytest.mark.parametrize('prefilter', [True, False])
def test_3d_motion(gaussian, prefilter):
    """ILK should recover a synthetic 3D sinusoidal flow."""
    rng = np.random.default_rng(123)
    frame0 = rng.normal(size=(50, 55, 60))
    expected_flow, frame1 = _sin_flow_gen(frame0, npics=3)
    estimated = optical_flow_ilk(frame0, frame1, radius=5,
                                 gaussian=gaussian, prefilter=prefilter)
    # Mean absolute error must stay below half a pixel.
    assert np.abs(estimated - expected_flow).mean() < 0.5
def test_no_motion_2d():
    """Identical 2D frames must yield an all-zero flow field."""
    img = np.random.default_rng(0).normal(size=(256, 256))
    assert not np.any(optical_flow_ilk(img, img))
def test_no_motion_3d():
    """Identical 3D volumes must yield an all-zero flow field."""
    vol = np.random.default_rng(0).normal(size=(64, 64, 64))
    assert not np.any(optical_flow_ilk(vol, vol))
def test_optical_flow_dtype():
    """float32 and float64 ILK estimates must agree closely."""
    rng = np.random.default_rng(0)
    frame0 = rng.normal(size=(256, 256))
    _, frame1 = _sin_flow_gen(frame0)
    # Double precision.
    flow_f64 = optical_flow_ilk(frame0, frame1, dtype='float64')
    assert flow_f64.dtype == 'float64'
    # Single precision.
    flow_f32 = optical_flow_ilk(frame0, frame1, dtype='float32')
    assert flow_f32.dtype == 'float32'
    # Precision must not change the estimated flow appreciably.
    assert np.abs(flow_f64 - flow_f32).mean() < 1e-3
def test_incompatible_shapes():
    """Images whose shapes differ must raise ValueError."""
    rng = np.random.default_rng(0)
    with pytest.raises(ValueError):
        optical_flow_ilk(rng.normal(size=(256, 256)),
                         rng.normal(size=(255, 256)))
def test_wrong_dtype():
    """A non-float working dtype must raise ValueError."""
    img = np.random.default_rng(0).normal(size=(256, 256))
    with pytest.raises(ValueError):
        optical_flow_ilk(img, img, dtype='int')

View File

@@ -0,0 +1,279 @@
import numpy as np
import pytest
from numpy.testing import (assert_almost_equal, assert_array_almost_equal,
assert_array_equal, assert_array_less, assert_equal)
from scipy.ndimage import fourier_shift, shift as real_shift
import scipy.fft as fft
from skimage._shared.testing import fetch
from skimage._shared.utils import _supported_float_type
from skimage.data import camera, brain
from skimage.io import imread
from skimage.registration._masked_phase_cross_correlation import (
_masked_phase_cross_correlation as masked_register_translation,
cross_correlate_masked)
from skimage.registration import phase_cross_correlation
def test_masked_registration_vs_phase_cross_correlation():
    """An all-valid mask must reproduce the unmasked registration result."""
    reference = camera()
    applied_shift = (-7, 12)
    moving = np.real(fft.ifft2(fourier_shift(fft.fft2(reference),
                                             applied_shift)))
    full_mask = np.ones_like(reference)
    unmasked_shift, *_ = phase_cross_correlation(reference, moving)
    masked_shift = masked_register_translation(
        reference, moving, reference_mask=full_mask, overlap_ratio=1 / 10)
    assert_equal(unmasked_shift, masked_shift)
def test_masked_registration_random_masks():
    """Random masks (75% valid pixels) must not break registration."""
    np.random.seed(23)  # reproducible masks
    reference = camera()
    applied_shift = (-7, 12)
    moving = np.real(fft.ifft2(fourier_shift(fft.fft2(reference),
                                             applied_shift)))
    ref_mask = np.random.choice(
        [True, False], reference.shape, p=[3 / 4, 1 / 4])
    mov_mask = np.random.choice(
        [True, False], moving.shape, p=[3 / 4, 1 / 4])
    measured = masked_register_translation(reference, moving,
                                           reference_mask=ref_mask,
                                           moving_mask=mov_mask)
    assert_equal(measured, -np.array(applied_shift))
def test_masked_registration_3d_contiguous_mask():
    """Contiguous (block) masks must work for volumetric registration."""
    volume = brain()[:, ::2, ::2]
    offset = (1, -5, 10)
    # Square region of valid voxels.
    mask = np.zeros_like(volume, dtype=bool)
    mask[:-2, 75:100, 75:100] = True
    moved = real_shift(volume, offset)
    measured = masked_register_translation(
        volume, moved, reference_mask=mask, moving_mask=mask)
    assert_equal(offset, -np.array(measured))
def test_masked_registration_random_masks_non_equal_sizes():
    """Images of different sizes must be registrable under masking."""
    np.random.seed(23)  # reproducible masks
    reference = camera()
    applied_shift = (-7, 12)
    moving = np.real(fft.ifft2(fourier_shift(fft.fft2(reference),
                                             applied_shift)))
    # Crop the moving image so the shapes differ.
    moving = moving[64:-64, 64:-64]
    # Random masks with 75% of pixels being valid (drawn to keep the
    # RNG stream identical; all-ones masks are what is actually passed).
    ref_mask = np.random.choice(
        [True, False], reference.shape, p=[3 / 4, 1 / 4])
    mov_mask = np.random.choice(
        [True, False], moving.shape, p=[3 / 4, 1 / 4])
    measured = masked_register_translation(
        reference,
        moving,
        reference_mask=np.ones_like(ref_mask),
        moving_mask=np.ones_like(mov_mask))
    assert_equal(measured, -np.array(applied_shift))
def test_masked_registration_padfield_data():
    """Masked translation registration should behave like in the original
    publication."""
    # Test translated from the MATLAB implementation
    # `MaskedFFTRegistrationTest`. You can find the source code here:
    # http://www.dirkpadfield.com/Home/MaskedFFTRegistrationCode.zip
    shifts = [(75, 75), (-130, 130), (130, 130)]
    for xi, yi in shifts:
        fixed_image = imread(
            fetch(f'registration/tests/data/OriginalX{xi}Y{yi}.png'))
        moving_image = imread(
            fetch(f'registration/tests/data/TransformedX{xi}Y{yi}.png'))
        # Valid pixels are 1 (zero-valued pixels are treated as masked out).
        fixed_mask = (fixed_image != 0)
        moving_mask = (moving_image != 0)
        # x/y shifts map onto cols/rows: axis 0 is rows (y), axis 1 is
        # cols (x), hence the unpacking order below.
        shift_y, shift_x = masked_register_translation(
            fixed_image, moving_image, reference_mask=fixed_mask,
            moving_mask=moving_mask, overlap_ratio=0.1)
        # Note: by looking at the test code from Padfield's
        # MaskedFFTRegistrationCode repository, the
        # shifts were not xi and yi, but xi and -yi
        assert_equal((shift_x, shift_y), (-xi, yi))
@pytest.mark.parametrize('dtype', [np.float16, np.float32, np.float64])
def test_cross_correlate_masked_output_shape(dtype):
    """'full' mode yields N + M - 1 per transform axis; 'same' keeps arr1's
    shape; the output dtype follows the supported float type."""
    shape1 = (15, 4, 5)
    shape2 = (6, 12, 7)
    expected_full = tuple(s1 + s2 - 1 for s1, s2 in zip(shape1, shape2))
    arr1 = np.zeros(shape1, dtype=dtype)
    arr2 = np.zeros(shape2, dtype=dtype)
    # Trivial (all-valid) masks.
    m1 = np.ones_like(arr1)
    m2 = np.ones_like(arr2)
    out_dtype = _supported_float_type(dtype)
    full_xcorr = cross_correlate_masked(
        arr1, arr2, m1, m2, axes=(0, 1, 2), mode='full')
    assert_equal(full_xcorr.shape, expected_full)
    assert full_xcorr.dtype == out_dtype
    same_xcorr = cross_correlate_masked(
        arr1, arr2, m1, m2, axes=(0, 1, 2), mode='same')
    assert_equal(same_xcorr.shape, shape1)
    assert same_xcorr.dtype == out_dtype
def test_cross_correlate_masked_test_against_mismatched_dimensions():
    """Mismatched sizes along non-transform axes must raise ValueError."""
    arr1 = np.zeros((23, 1, 1))
    arr2 = np.zeros((6, 2, 2))
    # Trivial (all-valid) masks.
    m1 = np.ones_like(arr1)
    m2 = np.ones_like(arr2)
    with pytest.raises(ValueError):
        cross_correlate_masked(arr1, arr2, m1, m2, axes=(1, 2))
def test_cross_correlate_masked_output_range():
    """Masked normalized cross-correlation values must lie in [-1, 1]."""
    np.random.seed(23)  # reproducible data and masks
    # Non-transform axis sizes (here axis 0) must agree.
    shape1 = (15, 4, 5)
    shape2 = (15, 12, 7)
    # Values drawn from [-5, 5).
    arr1 = 10 * np.random.random(shape1) - 5
    arr2 = 10 * np.random.random(shape2) - 5
    # Random masks.
    m1 = np.random.choice([True, False], arr1.shape)
    m2 = np.random.choice([True, False], arr2.shape)
    xcorr = cross_correlate_masked(arr1, arr2, m1, m2, axes=(1, 2))
    # assert_array_less is strict, so allow one eps of slack; checking
    # both xcorr and -xcorr covers the lower bound too.
    eps = np.finfo(float).eps
    assert_array_less(xcorr, 1 + eps)
    assert_array_less(-xcorr, 1 + eps)
def test_cross_correlate_masked_side_effects():
    """Inputs must not be modified by the computation."""
    arr1 = np.zeros((2, 2, 2))
    arr2 = np.zeros((2, 2, 2))
    # Trivial (all-valid) masks.
    m1 = np.ones_like(arr1)
    m2 = np.ones_like(arr2)
    # Any in-place write would now raise, so a clean run proves purity.
    for buf in (arr1, arr2, m1, m2):
        buf.setflags(write=False)
    cross_correlate_masked(arr1, arr2, m1, m2)
def test_cross_correlate_masked_over_axes():
    """Transforming over axes must equal looping over the other axes."""
    np.random.seed(23)  # reproducible data and masks
    arr1 = np.random.random((8, 8, 5))
    arr2 = np.random.random((8, 8, 5))
    m1 = np.random.choice([True, False], arr1.shape)
    m2 = np.random.choice([True, False], arr2.shape)
    # Reference result: transform each slice of the last axis separately.
    looped = np.empty_like(arr1, dtype=complex)
    for k in range(arr1.shape[-1]):
        looped[:, :, k] = cross_correlate_masked(
            arr1[:, :, k], arr2[:, :, k], m1[:, :, k], m2[:, :, k],
            axes=(0, 1), mode='same')
    vectorized = cross_correlate_masked(
        arr1, arr2, m1, m2, axes=(0, 1), mode='same')
    assert_array_almost_equal(looped, vectorized)
def test_cross_correlate_masked_autocorrelation_trivial_masks():
    """Correlating an image with itself must peak at 1 in the array center,
    even with random masks."""
    np.random.seed(23)  # reproducible masks
    image = camera()
    # Random masks with 75% of pixels being valid.
    m1 = np.random.choice([True, False], image.shape, p=[3 / 4, 1 / 4])
    m2 = np.random.choice([True, False], image.shape, p=[3 / 4, 1 / 4])
    xcorr = cross_correlate_masked(image, image, m1, m2, axes=(0, 1),
                                   mode='same', overlap_ratio=0).real
    peak = np.unravel_index(np.argmax(xcorr), xcorr.shape)
    # uint8 input is processed in float32, hence the looser tolerance.
    assert_almost_equal(xcorr.max(), 1, decimal=5)
    assert_array_equal(peak, np.array(image.shape) / 2)

View File

@@ -0,0 +1,268 @@
import itertools
import warnings
import re
import numpy as np
import pytest
from numpy.testing import assert_allclose
from scipy.ndimage import fourier_shift
import scipy.fft as fft
from skimage import img_as_float
from skimage._shared._warnings import expected_warnings
from skimage._shared.utils import _supported_float_type
from skimage.data import camera, binary_blobs, eagle
from skimage.registration._phase_cross_correlation import (
phase_cross_correlation, _upsampled_dft
)
@pytest.mark.parametrize('normalization', [None, 'phase'])
def test_correlation(normalization):
    """Whole-pixel shifts are recovered exactly from Fourier-space input."""
    ref_freq = fft.fftn(camera())
    applied_shift = (-7, 12)
    moved_freq = fourier_shift(ref_freq, applied_shift)
    measured, _, _ = phase_cross_correlation(
        ref_freq, moved_freq, space="fourier", normalization=normalization)
    assert_allclose(measured[:2], -np.array(applied_shift))
@pytest.mark.parametrize('normalization', ['nonexisting'])
def test_correlation_invalid_normalization(normalization):
    """An unknown normalization mode must raise ValueError."""
    ref_freq = fft.fftn(camera())
    moved_freq = fourier_shift(ref_freq, (-7, 12))
    with pytest.raises(ValueError):
        phase_cross_correlation(ref_freq, moved_freq, space="fourier",
                                normalization=normalization)
@pytest.mark.parametrize('normalization', [None, 'phase'])
def test_subpixel_precision(normalization):
    """Sub-pixel shifts are recovered when upsampling is enabled."""
    ref_freq = fft.fftn(camera())
    applied_shift = (-2.4, 1.32)
    moved_freq = fourier_shift(ref_freq, applied_shift)
    measured, _, _ = phase_cross_correlation(
        ref_freq, moved_freq, upsample_factor=100, space="fourier",
        normalization=normalization)
    assert_allclose(measured[:2], -np.array(applied_shift), atol=0.05)
@pytest.mark.parametrize('dtype', [np.float16, np.float32, np.float64])
def test_real_input(dtype):
    """Real-space input of any float dtype yields a matching-dtype shift."""
    reference = camera().astype(dtype, copy=False)
    applied_shift = (-2.4, 1.32)
    moved = fourier_shift(fft.fftn(reference), applied_shift)
    moved = fft.ifftn(moved).real.astype(dtype, copy=False)
    measured, error, diffphase = phase_cross_correlation(
        reference, moved, upsample_factor=100)
    assert measured.dtype == _supported_float_type(dtype)
    assert_allclose(measured[:2], -np.array(applied_shift), atol=0.05)
def test_size_one_dimension_input():
    """Shift along a size-1 axis must be reported as zero."""
    # A single column of the camera image, as a (N, 1) array.
    ref_freq = fft.fftn(camera()[:, 15]).reshape((-1, 1))
    moved_freq = fourier_shift(ref_freq, (-2.4, 4))
    measured, error, diffphase = phase_cross_correlation(
        ref_freq, moved_freq, upsample_factor=20, space="fourier")
    assert_allclose(measured[:2], -np.array((-2.4, 0)), atol=0.05)
def test_3d_input():
    """Both pixel and sub-pixel registration work on 3D volumes."""
    phantom = img_as_float(binary_blobs(length=32, n_dim=3))
    ref_freq = fft.fftn(phantom)
    # Whole-pixel shift.
    whole = (-2., 1., 5.)
    measured, error, diffphase = phase_cross_correlation(
        ref_freq, fourier_shift(ref_freq, whole), space="fourier")
    assert_allclose(measured, -np.array(whole), atol=0.05)
    # Sub-pixel precision is available for 3D data too.
    subpixel = (-2.3, 1.7, 5.4)
    measured, error, diffphase = phase_cross_correlation(
        ref_freq, fourier_shift(ref_freq, subpixel),
        upsample_factor=100, space="fourier")
    assert_allclose(measured, -np.array(subpixel), atol=0.05)
def test_unknown_space_input():
    """An unrecognized ``space`` argument must raise ValueError."""
    image = np.ones((5, 5))
    with pytest.raises(ValueError):
        phase_cross_correlation(image, image, space="frank")
def test_wrong_input():
    """Invalid input combinations must raise ValueError."""
    # Dimensionality mismatch
    image = np.ones((5, 5, 1))
    template = np.ones((5, 5))
    with pytest.raises(ValueError):
        phase_cross_correlation(template, image)

    # Size mismatch
    image = np.ones((5, 5))
    template = np.ones((4, 4))
    with pytest.raises(ValueError):
        phase_cross_correlation(template, image)

    # NaN values in data
    image = np.ones((5, 5))
    image[0][0] = np.nan
    template = np.ones((5, 5))
    # The regex accepts either divide-warning spelling or no warning at
    # all (the empty `\A\Z` alternative) — presumably to cover different
    # NumPy versions; the required outcome is the ValueError below.
    with expected_warnings(
        [
            r"invalid value encountered in true_divide"
            + r"|"
            + r"invalid value encountered in divide"
            + r"|\A\Z"
        ]
    ):
        with pytest.raises(ValueError):
            phase_cross_correlation(template, image, return_error=True)
def test_4d_input_pixel():
    """Whole-pixel registration works on 4D data."""
    phantom = img_as_float(binary_blobs(length=32, n_dim=4))
    ref_freq = fft.fftn(phantom)
    applied_shift = (-2., 1., 5., -3)
    moved_freq = fourier_shift(ref_freq, applied_shift)
    measured, error, diffphase = phase_cross_correlation(
        ref_freq, moved_freq, space="fourier")
    assert_allclose(measured, -np.array(applied_shift), atol=0.05)
def test_4d_input_subpixel():
    """Sub-pixel registration works on 4D data."""
    phantom = img_as_float(binary_blobs(length=32, n_dim=4))
    ref_freq = fft.fftn(phantom)
    applied_shift = (-2.3, 1.7, 5.4, -3.2)
    moved_freq = fourier_shift(ref_freq, applied_shift)
    measured, error, diffphase = phase_cross_correlation(
        ref_freq, moved_freq, upsample_factor=10, space="fourier")
    assert_allclose(measured, -np.array(applied_shift), atol=0.05)
@pytest.mark.parametrize("return_error", [True, False, "always"])
@pytest.mark.parametrize("reference_mask", [None, True])
def test_phase_cross_correlation_deprecation(return_error, reference_mask):
    # For now, assert that phase_cross_correlation raises a warning that
    # returning only shifts is deprecated. In skimage 0.21, this test should be
    # updated for the deprecation of the return_error parameter.
    # A warning is expected whenever only the shift is returned:
    # return_error=False, or masked registration without return_error="always".
    should_warn = (
        return_error is False
        or (return_error != "always" and reference_mask is True)
    )
    reference_image = np.ones((10, 10))
    moving_image = np.ones_like(reference_image)
    if reference_mask is True:
        # moving_mask defaults to reference_mask, passing moving_mask only is
        # not supported, so we don't need to test it
        reference_mask = np.ones_like(reference_image)
    if should_warn:
        # Must match the FutureWarning text verbatim (hence re.escape below).
        msg = (
            "In scikit-image 0.21, phase_cross_correlation will start "
            "returning a tuple or 3 items (shift, error, phasediff) always. "
            "To enable the new return behavior and silence this warning, use "
            "return_error='always'."
        )
        with pytest.warns(FutureWarning, match=re.escape(msg)):
            out = phase_cross_correlation(
                reference_image=reference_image,
                moving_image=moving_image,
                return_error=return_error,
                reference_mask=reference_mask,
            )
        assert not isinstance(out, tuple)
    else:
        # No warning expected: escalate any warning to an error.
        with warnings.catch_warnings():
            warnings.simplefilter("error")
            out = phase_cross_correlation(
                reference_image=reference_image,
                moving_image=moving_image,
                return_error=return_error,
                reference_mask=reference_mask,
            )
        assert isinstance(out, tuple)
        assert len(out) == 3
def test_mismatch_upsampled_region_size():
    """A region-size list longer than the data rank must raise ValueError."""
    with pytest.raises(ValueError):
        _upsampled_dft(np.ones((4, 4)), upsampled_region_size=[3, 2, 1, 4])
def test_mismatch_offsets_size():
    """An offsets list longer than the data rank must raise ValueError."""
    with pytest.raises(ValueError):
        _upsampled_dft(np.ones((4, 4)), 3, axis_offsets=[3, 2, 1, 4])
@pytest.mark.parametrize(
    ('shift0', 'shift1'),
    itertools.product((100, -100, 350, -350), (100, -100, 350, -350)),
)
def test_disambiguate_2d(shift0, shift1):
    # Shifts of up to 350 on a 450-pixel window exceed half the window
    # size, so only disambiguate=True can recover them exactly.
    image = eagle()[500:, 900:]  # use a highly textured part of image
    shift = (shift0, shift1)
    # Choose crop origins so that both 450x450 windows lie inside the
    # image: a positive shift anchors the reference window at 0, a
    # negative one offsets it by |shift|.
    origin0 = []
    for s in shift:
        if s > 0:
            origin0.append(0)
        else:
            origin0.append(-s)
    origin1 = np.array(origin0) + shift
    slice0 = tuple(slice(o, o+450) for o in origin0)
    slice1 = tuple(slice(o, o+450) for o in origin1)
    reference = image[slice0]
    moving = image[slice1]
    computed_shift, _, _ = phase_cross_correlation(
        reference, moving, disambiguate=True, return_error='always'
    )
    np.testing.assert_equal(shift, computed_shift)
def test_disambiguate_zero_shift():
    """Disambiguation must not fail on a zero shift.

    A zero shift makes some quadrants empty, which would otherwise prevent
    computing the per-quadrant cross-correlation; nothing bad should happen.
    """
    image = camera()
    computed_shift, _, _ = phase_cross_correlation(
        image, image, disambiguate=True, return_error='always')
    assert computed_shift == (0, 0)

View File

@@ -0,0 +1,117 @@
import numpy as np
import pytest
from skimage._shared.utils import _supported_float_type
from skimage.registration import optical_flow_tvl1
from skimage.transform import warp
def _sin_flow_gen(image0, max_motion=4.5, npics=5):
    """Generate a synthetic ground-truth flow and the matching warped image.

    The first flow component is a sinusoid of the first-axis coordinate;
    all other components are zero.

    Parameters
    ----------
    image0 : ndarray
        The base image to be warped.
    max_motion : float
        Maximum flow magnitude (amplitude of the sinusoid).
    npics : int
        Number of half-periods of the sinusoid along the first axis.

    Returns
    -------
    flow, image1 : ndarray
        The ground-truth flow field and the correspondingly warped image.
    """
    coords = np.stack(
        np.meshgrid(*[np.arange(n) for n in image0.shape], indexing='ij'))
    gt_flow = np.zeros_like(coords, dtype=float)
    gt_flow[0, ...] = max_motion * np.sin(
        coords[0] / coords[0].max() * npics * np.pi)
    image1 = warp(image0, coords - gt_flow, mode='edge')
    return gt_flow, image1
@pytest.mark.parametrize('dtype', [np.float16, np.float32, np.float64])
def test_2d_motion(dtype):
    """TV-L1 should recover a synthetic 2D sinusoidal flow for every dtype."""
    rng = np.random.default_rng(0)
    frame0 = rng.normal(size=(256, 256))
    expected_flow, frame1 = _sin_flow_gen(frame0)
    frame1 = frame1.astype(dtype, copy=False)
    float_dtype = _supported_float_type(dtype)
    estimated = optical_flow_tvl1(frame0, frame1, attachment=5,
                                  dtype=float_dtype)
    assert estimated.dtype == float_dtype
    # Mean absolute error must stay below half a pixel.
    assert np.abs(estimated - expected_flow).mean() < 0.5
    # Requesting an unsupported working dtype must fail.
    if dtype != float_dtype:
        with pytest.raises(ValueError):
            optical_flow_tvl1(frame0, frame1, attachment=5, dtype=dtype)
def test_3d_motion():
    """TV-L1 should recover a synthetic 3D sinusoidal flow."""
    rng = np.random.default_rng(0)
    frame0 = rng.normal(size=(100, 100, 100))
    expected_flow, frame1 = _sin_flow_gen(frame0)
    estimated = optical_flow_tvl1(frame0, frame1, attachment=10)
    # Mean absolute error must stay below half a pixel.
    assert np.abs(estimated - expected_flow).mean() < 0.5
def test_no_motion_2d():
    """Identical 2D frames must yield an all-zero flow field."""
    img = np.random.default_rng(0).normal(size=(256, 256))
    assert not np.any(optical_flow_tvl1(img, img))
def test_no_motion_3d():
    """Identical 3D volumes must yield an all-zero flow field."""
    vol = np.random.default_rng(0).normal(size=(64, 64, 64))
    assert not np.any(optical_flow_tvl1(vol, vol))
def test_optical_flow_dtype():
    """float32 and float64 TV-L1 estimates must agree closely."""
    rng = np.random.default_rng(0)
    frame0 = rng.normal(size=(256, 256))
    _, frame1 = _sin_flow_gen(frame0)
    # Double precision.
    flow_f64 = optical_flow_tvl1(frame0, frame1, attachment=5,
                                 dtype=np.float64)
    assert flow_f64.dtype == np.float64
    # Single precision.
    flow_f32 = optical_flow_tvl1(frame0, frame1, attachment=5,
                                 dtype=np.float32)
    assert flow_f32.dtype == np.float32
    # Precision must not change the estimated flow appreciably.
    assert np.abs(flow_f64 - flow_f32).mean() < 1e-3
def test_incompatible_shapes():
    """Images whose shapes differ must raise ValueError."""
    rng = np.random.default_rng(0)
    with pytest.raises(ValueError):
        optical_flow_tvl1(rng.normal(size=(256, 256)),
                          rng.normal(size=(128, 256)))
def test_wrong_dtype():
    """A non-float working dtype must raise ValueError."""
    img = np.random.default_rng(0).normal(size=(256, 256))
    with pytest.raises(ValueError):
        optical_flow_tvl1(img, img, dtype=np.int64)