"""
Loader utility functions.
"""
# This file is part of Echofilter.
#
# Copyright (C) 2020-2022 Scott C. Lowe and Offshore Energy Research Association (OERA)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import numpy as np
import scipy.interpolate
from ..utils import get_indicator_onoffsets, mode
[docs]def interp1d_preserve_nan(
x, y, x_samples, nan_threshold=0.0, bounds_error=False, **kwargs
):
"""
Interpolate a 1-D function, preserving NaNs.
Inputs ``x`` and ``y`` are arrays of values used to approximate some function f:
``y = f(x)``. We exclude NaNs for the interpolation and then mask out
entries which are adjacent (or close to) a NaN in the input.
Parameters
----------
x : (N,) array_like
A 1-D array of real values. Must not contain NaNs.
y : (...,N,...) array_like
A N-D array of real values. The length of ``y`` along the interpolation
axis must be equal to the length of ``x``. May contain NaNs.
x_samples : array_like
A 1-D array of real values at which the interpolation function will
be sampled.
nan_threshold : float, optional
Minimum amount of influence a NaN must have on an output sample for it
to become a NaN. Default is ``0.`` i.e. any influence.
bounds_error : bool, optional
If ``True``, a ValueError is raised any time interpolation is attempted
on a value outside of the range of ``x`` (where extrapolation is
necessary). If ``False`` (default), out of bounds values are assigned
value ``fill_value`` (whose default is NaN).
**kwargs
Additional keyword arguments are as per :meth:`scipy.interpolate.interp1d`.
Returns
-------
y_samples : (...,N,...) np.ndarray
The result of interpolating, with sample points close to NaNs in the
input returned as NaN.
"""
# First, run with NaNs masked out.
is_nan = np.isnan(y)
if np.sum(~is_nan) < 2 and np.ndim(y) == 1:
y_samples = np.empty(x_samples.shape, dtype=y.dtype)
y_samples[:] = np.nan
return y_samples
y_samples = scipy.interpolate.interp1d(
x[~is_nan], y[~is_nan], bounds_error=bounds_error, **kwargs
)(x_samples)
if np.sum(is_nan) == 0:
# Shortcut if there are no NaNs
return y_samples
# Then find the points close to NaNs
influence = scipy.interpolate.interp1d(
x, is_nan, bounds_error=bounds_error, **kwargs
)(x_samples)
# and remove the points too close to a NaN in the input
y_samples[influence > nan_threshold] = np.nan
return y_samples
[docs]def pad1d(array, pad_width, axis=0, **kwargs):
"""
Pad an array along a single axis only.
Parameters
----------
array : numpy.ndarary
Array to be padded.
pad_width : int or tuple
The amount to pad, either a length two tuple of values for each edge,
or an int if the padding should be the same for each side.
axis : int, optional
The axis to pad. Default is ``0``.
**kwargs
As per :meth:`numpy.pad`.
Returns
-------
numpy.ndarary
Padded array.
See Also
--------
numpy.pad
"""
pads = [(0, 0) for _ in range(array.ndim)]
if hasattr(pad_width, "__len__"):
pads[axis] = pad_width
else:
pads[axis] = (pad_width, pad_width)
return np.pad(array, pads, **kwargs)
[docs]def medfilt1d(signal, kernel_size, axis=-1, pad_mode="reflect"):
"""
Median filter in 1d, with support for selecting padding mode.
Parameters
----------
signal : array_like
The signal to filter.
kernel_size
Size of the median kernel to use.
axis : int, optional
Which axis to operate along. Default is ``-1``.
pad_mode : str, optional
Method with which to pad the vector at the edges.
Must be supported by :meth:`numpy.pad`. Default is ``"reflect"``.
Returns
-------
filtered : array_like
The filtered signal.
See Also
--------
scipy.signal.medfilt
pad1d
"""
offset = kernel_size // 2
signal = pad1d(signal, offset, axis=axis, mode=pad_mode)
filtered = scipy.signal.medfilt(signal, kernel_size)[offset:-offset]
return filtered
[docs]def squash_gaps(mask, max_gap_squash, axis=-1, inplace=False):
"""
Merge small gaps between zero values in a boolean array.
Parameters
----------
mask : boolean array
The input mask, with small gaps between zero values which will be
squashed with zeros.
max_gap_squash : int
Maximum length of gap to squash.
axis : int, optional
Axis on which to operate. Default is ``-1``.
inplace : bool, optional
Whether to operate on the original array. If ``False``, a copy is
created and returned.
Returns
-------
merged_mask : boolean array
Mask as per the input, but with small gaps squashed.
"""
if not inplace:
mask = mask.copy()
L = mask.shape[axis]
for i in range(min(max_gap_squash, L - 1), 1, -1):
check = np.stack(
[
pad1d(
mask.take(range(i // 2, L), axis=axis),
(0, i // 2),
axis=axis,
mode="constant",
),
pad1d(
mask.take(range(0, L - ((i + 1) // 2)), axis=axis),
((i + 1) // 2, 0),
axis=axis,
mode="constant",
),
]
)
li = ~np.any(check, axis=0)
mask[li] = 0
return mask
[docs]def integrate_area_of_contour(x, y, closed=None, preserve_sign=False):
"""
Compute the area within a contour, using Green's algorithm.
Parameters
----------
x : array_like vector
x co-ordinates of nodes along the contour.
y : array_like vector
y co-ordinates of nodes along the contour.
closed : bool or None, optional
Whether the contour is already closed. If ``False``, it will be closed
before deterimining the area. If ``None`` (default), it is automatically
determined as to whether the contour is already closed, and is closed
if necessary.
preserve_sign : bool, optional
Whether to preserve the sign of the area. If ``True``, the area is
positive if the contour is anti-clockwise and negative if it is
clockwise oriented. Default is ``False``, which always returns a positive
area.
Returns
-------
area : float
The integral of the area witihn the contour.
Notes
-----
https://en.wikipedia.org/wiki/Green%27s_theorem#Area_calculation
"""
if closed is None:
closed = x[0] == x[-1] and y[0] == y[-1]
if not closed:
x = np.concatenate([x, x[[0]]])
y = np.concatenate([y, y[[0]]])
# Integrate to find the area
A = 0.5 * np.sum(y[:-1] * np.diff(x) - x[:-1] * np.diff(y))
# Take the abs in case the curve was clockwise instead of anti-clockwise
A = np.abs(A)
return A
[docs]def fillholes2d(arr, nan_thr=2, interp_method="linear", inplace=False):
"""
Interpolate to replace NaN values in 2d gridded array data.
Parameters
----------
arr : 2d numpy.ndarray
Array in 2d which, may contain NaNs.
nan_thr : int, default=2
Minimum number of NaN values needed in a row/column for it
to be included in the (rectangular) area where NaNs are fixed.
interp_method : str, default="linear"
Interpolation method.
inplace : bool, default=False
Whether to update arr instead of a copy.
Returns
-------
arr : 2d numpy.ndarray
Like input ``arr``, but with NaN values replaced with
interpolated values.
"""
# Find where the nans are
isna = np.isnan(arr[1:-1])
x_extent = np.nonzero(isna.sum(1) >= nan_thr)[0]
y_extent = np.nonzero(isna.sum(0) >= nan_thr)[0]
if len(x_extent) < 1 and len(y_extent) < 1:
# Nothing to do
return arr
if len(x_extent) < 1:
# There is y-axis things to do, so be more lenient with x
x_extent = np.nonzero(isna.any(1))[0]
if len(y_extent) < 1:
# There is x-axis things to do, so be more lenient with y
y_extent = np.nonzero(isna.any(0))[0]
if len(x_extent) < 1 or len(y_extent) < 1:
# Nothing to do
return arr
x_extent = x_extent[[0, -1]] + 1
y_extent = y_extent[[0, -1]] + 1
# We will extract an area bigger than the space covering all NaNs
x_extent += [-4, 4]
y_extent += [-4, 4]
x_extent = np.minimum(arr.shape[0] - 1, np.maximum(0, x_extent))
y_extent = np.minimum(arr.shape[1] - 1, np.maximum(0, y_extent))
# Hanndle indexing the array
z_source = arr[x_extent[0] : x_extent[1] + 1, y_extent[0] : y_extent[1] + 1]
x_source = np.arange(x_extent[0], x_extent[1] + 1)
y_source = np.arange(y_extent[0], y_extent[1] + 1)
xx, yy = np.meshgrid(x_source, y_source)
xx = xx.T
yy = yy.T
# Find all the NaNs. This tells us where to update and the inverse tells us
# where to take training values for the interpolation
is_missing = np.isnan(z_source)
in_coords = np.stack([xx[~is_missing], yy[~is_missing]], -1)
out_coords = np.stack([xx[is_missing], yy[is_missing]], -1)
# Perform 2d interpolation
z_out = scipy.interpolate.griddata(
in_coords,
z_source[~is_missing],
out_coords,
method=interp_method,
)
# Convert 2d coordinates into linear indices for put
out_indices = np.ravel_multi_index((out_coords[:, 0], out_coords[:, 1]), arr.shape)
# Insert the interpolated values at their locations
if not inplace:
arr = arr.copy()
np.put(arr, out_indices, z_out)
return arr