"""
Convert echograms into Pytorch dataset.
Tools for converting a dataset of echograms (transects) into a Pytorch dataset
and sampling from it.
"""
# This file is part of Echofilter.
#
# Copyright (C) 2020-2022 Scott C. Lowe and Offshore Energy Research Association (OERA)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import os
import random
import warnings

import numpy as np
import torch.utils.data

from ..raw import shardloader
from . import utils


class TransectDataset(torch.utils.data.Dataset):
"""
    Load a collection of transects as a PyTorch dataset.

    Parameters
----------
transect_paths : list
Absolute paths to transects.
window_len : int
Width (number of timestamps) to load. Default is ``128``.
p_scale_window : float, optional
Probability of rescaling window. Default is ``0``, which results in no
randomization of the window widths.
window_sf : float, optional
Maximum window scale factor. Scale factors will be log-uniformly
sampled in the range ``1/window_sf`` to ``window_sf``. Default is ``2``.
num_windows_per_transect : int
Number of windows to extract for each transect. Start indices for
the windows will be equally spaced across the total width of the
transect. If this is ``0``, the number of windows will be inferred
automatically based on ``window_len`` and the total width of the
transect, resulting in a different number of windows for each
transect. Default is ``0``.
use_dynamic_offsets : bool
Whether starting indices for each window should be randomly offset.
Set to ``True`` for training and ``False`` for testing. Default is
``True``.
    crop_depth : float, optional
        Maximum depth to include, in metres. Deeper data will be cropped
        away. Default is ``None``.
    transform : callable, optional
        Operations to perform on the dictionary containing a single sample.
        These are performed before generating the turbulence/bottom/overall
        mask. Default is ``None``.
remove_nearfield : bool, optional
        Whether to remove turbulence and bottom lines affected by nearfield
        removal. If ``True`` (default), targets for the line nearest to the
        sounder (bottom if upward facing, turbulence otherwise) which are
        at or closer than ``nearfield_distance`` are reduced to
        ``nearfield_visible_dist``.
nearfield_distance : float, optional
Nearfield distance in metres. Regions closer than the nearfield
may have been masked out from the dataset, but their effect will
be removed from the targets if ``remove_nearfield=True``.
Default is ``1.7``.
nearfield_visible_dist : float, optional
        The distance at which the effect of being too close to the sounder
        is obvious to the naked eye, and hence the distance to which the
        nearfield will be mapped if ``remove_nearfield=True``. Default is ``0.0``.
    remove_offset_turbulence : float, optional
        Line offset built into the turbulence line. If given, this offset
        will be removed from the samples within the dataset. Default is ``0``.
    remove_offset_bottom : float, optional
        Line offset built into the bottom line. If given, this offset
        will be removed from the samples within the dataset. Default is ``0``.
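
    Examples
    --------
    A minimal usage sketch; the path below is a hypothetical placeholder
    for a transect which was already sharded to disk with
    ``echofilter.raw.shardloader``::

        >>> dataset = TransectDataset(["/data/transects/t1"], window_len=128)
        >>> signals, sample = dataset[0]  # network input and target dict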
"""
def __init__(
self,
transect_paths,
window_len=128,
p_scale_window=0,
window_sf=2,
num_windows_per_transect=0,
use_dynamic_offsets=True,
crop_depth=None,
transform=None,
remove_nearfield=True,
nearfield_distance=1.7,
nearfield_visible_dist=0.0,
remove_offset_turbulence=0,
remove_offset_bottom=0,
):
        super().__init__()
self.transect_paths = transect_paths
self.window_len = window_len
self.p_scale_window = p_scale_window
self.max_window_sf = window_sf if p_scale_window else 1
self.num_windows = num_windows_per_transect
self.use_dynamic_offsets = use_dynamic_offsets
self.crop_depth = crop_depth
self.transform = transform
self.remove_nearfield = remove_nearfield
self.nearfield_distance = nearfield_distance
self.nearfield_visible_dist = nearfield_visible_dist
self.remove_offset_turbulence = remove_offset_turbulence
self.remove_offset_bottom = remove_offset_bottom
        self.initialise_datapoints()

    def initialise_datapoints(self):
"""
        Parse ``transect_paths`` to generate sampling windows for each transect.

Manually calling this method will resample the transect offsets and
widths if they were randomly generated.
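
        A typical pattern (an assumed training-loop convention, not one
        mandated by this class) is to call this at the start of each epoch
        so that random offsets and widths are redrawn::

            >>> for epoch in range(n_epochs):  # hypothetical loop
            ...     dataset.initialise_datapoints()
            ...     train_one_epoch(dataset)  # hypothetical helper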
"""
self.datapoints = []
for transect_path in self.transect_paths:
# Check how many segments the transect was divided into
segments_meta_fname = os.path.join(transect_path, "n_segment.txt")
if not os.path.isfile(segments_meta_fname):
# Silently skip missing transects
continue
with open(segments_meta_fname, "r") as f:
n_segment = int(f.readline().strip())
# For each segment, specify some samples over its duration
for i_segment in range(n_segment):
seg_path = os.path.join(transect_path, str(i_segment))
# Lookup the number of rows in the transect
# Load the sharding metadata
with open(os.path.join(seg_path, "shard_size.txt"), "r") as f:
n_timestamps, shard_len = f.readline().strip().split(",")
n_timestamps = int(n_timestamps)
# Generate an array for window centers within the transect
# - if this is for training, we want to randomise the offsets
# - if this is for validation, we want stable windows
num_windows = self.num_windows
if self.num_windows is None or self.num_windows == 0:
# Load enough windows to include all datapoints
num_windows = int(np.ceil(n_timestamps / self.window_len))
centers = np.linspace(0, n_timestamps, num_windows + 1)[:num_windows]
if len(centers) > 1:
max_dy_offset = centers[1] - centers[0]
else:
max_dy_offset = n_timestamps
if self.use_dynamic_offsets:
centers += random.random() * max_dy_offset
else:
centers += max_dy_offset / 2
centers = np.round(centers)
# Generate a width for each window, and add each
# (transect, center, width) to the list for this epoch
for center_idx in centers:
cur_win_len = self.window_len
if self.p_scale_window and random.random() < self.p_scale_window:
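                        # Draw a scale factor log-uniformly from
                        # [1/max_window_sf, max_window_sf]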
sf = np.exp(np.log(self.max_window_sf) * random.uniform(-1, 1))
cur_win_len *= sf
cur_win_len = int(np.round(cur_win_len))
self.datapoints.append((seg_path, int(center_idx), cur_win_len))

    def __getitem__(self, index):
transect_pth, center_idx, win_len = self.datapoints[index]
# Load data from shards
sample = shardloader.load_transect_from_shards_abs(
transect_pth,
center_idx - int(win_len / 2),
center_idx - int(win_len / 2) + win_len,
pad_mode="reflect",
)
# Fixup sample
sample = fixup_dataset_sample(
sample,
remove_nearfield=self.remove_nearfield,
nearfield_distance=self.nearfield_distance,
nearfield_visible_dist=self.nearfield_visible_dist,
remove_offset_turbulence=self.remove_offset_turbulence,
remove_offset_bottom=self.remove_offset_bottom,
crop_depth=self.crop_depth,
transform=self.transform,
)
# Rename keys for depth of lines
sample["d_turbulence"] = sample.pop("turbulence")
sample["d_bottom"] = sample.pop("bottom")
sample["d_surface"] = sample.pop("surface")
sample["d_turbulence-original"] = sample.pop("turbulence-original")
sample["d_bottom-original"] = sample.pop("bottom-original")
sample["signals"] = sample.pop("Sv")
if sample["depths"][-1] < sample["depths"][0]:
# Found some upward-facing data that needs to be reflected
for k in ["depths", "signals", "mask"]:
sample[k] = np.flip(sample[k], -1).copy()
# Convert mask patches into floating point arrays
for suffix in ("", "-original", "-ntob"):
sample["mask_patches" + suffix] = sample["mask_patches" + suffix].astype(
np.float32
)
sample["mask"] = sample["mask"].astype(np.float32)
# Add relative depth for lines
depth_range = abs(sample["depths"][-1] - sample["depths"][0])
for key in [
"d_turbulence",
"d_bottom",
"d_surface",
"d_turbulence-original",
"d_bottom-original",
]:
sample["r" + key[1:]] = sample[key] / depth_range
# Ensure everything is float32 datatype
for key in sample:
sample[key] = sample[key].astype(np.float32)
# Ensure no entries are invalid numbers
nan_count = np.sum(np.isnan(sample[key]))
if nan_count > 0:
print("WARNING: NaN present in {}".format(key))
sample[key][np.isnan(sample[key])] = 0
inf_count = np.sum(np.isinf(sample[key]))
if inf_count > 0:
print("WARNING: inf present in {}".format(key))
sample[key][np.isinf(sample[key])] = 0
inf_count = np.sum(~np.isfinite(sample[key]))
if inf_count > 0:
print("WARNING: non-finite numbers present in {}".format(key))
sample[key][~np.isfinite(sample[key])] = 0
        signals = np.expand_dims(sample["signals"], 0)
        return signals, sample

    def __len__(self):
return len(self.datapoints)


class ConcatDataset(torch.utils.data.ConcatDataset):
"""
    Dataset as a concatenation of multiple TransectDatasets.

    This class is useful for assembling different existing datasets.

    Parameters
    ----------
    datasets : sequence
        List of datasets to be concatenated.

    Notes
    -----
    A subclass of :class:`torch.utils.data.ConcatDataset` which supports the
    ``initialise_datapoints`` method.
    """

    def initialise_datapoints(self):
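        """Re-initialise the datapoints of every constituent dataset."""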
for dataset in self.datasets:
dataset.initialise_datapoints()


class StratifiedRandomSampler(torch.utils.data.Sampler):
"""
    Sample elements randomly without repetition, stratified across datasets.

Parameters
----------
data_source : torch.utils.data.ConcatDataset
Dataset to sample from. Must possess a ``cumulative_sizes`` attribute.
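
    Examples
    --------
    A minimal sketch of pairing this sampler with a ``DataLoader``; the
    constituent datasets are assumed to already exist::

        >>> dataset = ConcatDataset([dataset_a, dataset_b])
        >>> sampler = StratifiedRandomSampler(dataset)
        >>> loader = torch.utils.data.DataLoader(
        ...     dataset, batch_size=32, sampler=sampler
        ... )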
"""

    def __init__(self, data_source):
self.data_source = data_source

    @property
def num_samples(self):
# dataset size might change at runtime
return len(self.data_source)

    def __iter__(self):
n_sample = len(self.data_source)
n_dataset = len(self.data_source.cumulative_sizes)
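        # Build an independently shuffled pool of indices for each
        # constituent dataset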
perms = []
lower = 0
for upper in self.data_source.cumulative_sizes:
p = list(range(lower, upper))
random.shuffle(p)
perms.append(p)
lower = upper
dataset_sizes = np.array([len(p) for p in perms])
target_fraction = dataset_sizes / n_sample
sub_tallies = np.zeros(n_dataset, dtype=int)
cur_fraction = np.zeros(n_dataset)
indices = []
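        # Greedily interleave the pools: at each step, draw from the dataset
        # which is currently most under-represented relative to its target
        # fraction of the total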
while len(indices) < n_sample:
i = np.argmax(target_fraction - cur_fraction)
indices.append(perms[i].pop())
sub_tallies[i] += 1
cur_fraction = sub_tallies / len(indices)
return iter(indices)

    def __len__(self):
return self.num_samples


def fixup_dataset_sample(
sample,
remove_nearfield=True,
nearfield_distance=1.7,
nearfield_visible_dist=0.0,
remove_offset_turbulence=0.0,
remove_offset_bottom=0.0,
crop_depth=None,
transform=None,
):
"""
    Fix up a transect sample loaded from the dataset.

Parameters
----------
sample : dict
Transect dictionary.
remove_nearfield : bool, default=True
        Whether to remove turbulence and bottom lines affected by nearfield
        removal. If ``True`` (default), targets for the line nearest to the
        sounder (bottom if upward facing, turbulence otherwise) which are
        at or closer than ``nearfield_distance`` are reduced to
        ``nearfield_visible_dist``.
nearfield_distance : float, default=1.7
Nearfield distance in metres. Regions closer than the nearfield
may have been masked out from the dataset, but their effect will
be removed from the targets if ``remove_nearfield=True``.
nearfield_visible_dist : float, default=0
        The distance at which the effect of being too close to the sounder
        is obvious to the naked eye, and hence the distance to which the
        nearfield will be mapped if ``remove_nearfield=True``.
    remove_offset_turbulence : float, default=0
        Line offset built into the turbulence line. If given, this offset
        will be removed from the samples within the dataset.
    remove_offset_bottom : float, default=0
        Line offset built into the bottom line. If given, this offset
        will be removed from the samples within the dataset.
    crop_depth : float, optional
        Maximum depth to include, in metres. Deeper data will be cropped
        away. Default is ``None``.
    transform : callable, optional
        Operations to perform on the dictionary containing a single sample.
        These are performed before generating the turbulence/bottom/overall
        mask.

    Returns
-------
dict
Like ``sample``, but contents fixed.
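
    Examples
    --------
    A sketch of the expected call pattern, mirroring
    ``TransectDataset.__getitem__`` (the shard path and crop depth shown
    are hypothetical)::

        >>> sample = shardloader.load_transect_from_shards_abs(
        ...     "/data/transects/t1/0", 0, 128, pad_mode="reflect"
        ... )
        >>> sample = fixup_dataset_sample(sample, crop_depth=70)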
"""
# Rename Sv to signals if necessary
if "signals" not in sample and "Sv" in sample:
sample["signals"] = sample.pop("Sv")
if sample["depths"][-1] < sample["depths"][0]:
# Found some upward-facing data that needs to be reflected
for k in ["depths", "signals", "mask"]:
sample[k] = np.flip(sample[k], -1).copy()
sample["depths"] = sample["depths"].astype(np.float32)
# Fix any broken surface lines (these were generated automatically
# by Echoview and not adjusted by human annotator, so are not
# guaranteed to be sane).
# Note any locations where the labels are not sane. These can be
# removed from the training loss.
sample["is_bad_labels"] = sample["surface"] >= sample["bottom"]
    # The surface line cannot be below the turbulence line
sample["surface"] = np.minimum(sample["surface"], sample["turbulence"])
# Ensure the bottom line is always below the surface line as well
sample["bottom"] = np.maximum(sample["bottom"], sample["surface"] + 0.02)
if sample["is_upward_facing"]:
min_top_depth = np.min(sample["depths"])
max_bot_depth = np.max(sample["depths"]) - nearfield_visible_dist
else:
min_top_depth = 0.966
max_bot_depth = np.max(sample["depths"])
if remove_nearfield:
depth_intv = np.abs(sample["depths"][-1] - sample["depths"][-2])
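        # The nearfield is the region adjacent to the sounder: the deepest
        # depths when upward facing, the shallowest when downward facing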
if sample["is_upward_facing"]:
nearfield_threshold = (
np.max(sample["depths"]) - depth_intv * 2.5 - nearfield_distance
)
was_in_nearfield = sample["bottom"] >= nearfield_threshold
sample["bottom"][was_in_nearfield] = max_bot_depth
was_in_nearfield_og = sample["bottom-original"] >= nearfield_threshold
sample["bottom-original"][was_in_nearfield_og] = max_bot_depth
# Extend/contract mask_patches where necessary
idx_search = utils.last_nonzero(sample["depths"] < nearfield_threshold)
idx_fillto = utils.first_nonzero(sample["depths"] > max_bot_depth)
is_close_patch = np.any(
sample["mask_patches"][:, idx_search:idx_fillto], -1
)
sample["mask_patches"][is_close_patch, idx_search:idx_fillto] = 1
is_close_patch_og = sample["mask_patches-original"][:, idx_search] > 0
is_close_patch_og = np.expand_dims(is_close_patch_og, -1)
sample["mask_patches-original"][:, idx_search:] = is_close_patch_og
sample["mask_patches-ntob"][:, idx_search:] = is_close_patch_og
else:
was_in_nearfield = sample["turbulence"] < nearfield_distance + depth_intv
sample["turbulence"][was_in_nearfield] = min_top_depth
was_in_nearfield_og = np.zeros_like(sample["is_removed"], dtype="bool")
# Extend/contract mask_patches where necessary
idx_search = utils.first_nonzero(
sample["depths"] > nearfield_distance, invalid_val=0
)
idx_fillfr = utils.last_nonzero(
sample["depths"] < min_top_depth, invalid_val=-1
)
is_close_patch = np.any(
sample["mask_patches"][:, idx_fillfr + 1 : idx_search + 1], -1
)
sample["mask_patches"][is_close_patch, idx_fillfr + 1 : idx_search] = 1
is_close_patch = np.any(
sample["mask_patches-ntob"][:, idx_fillfr + 1 : idx_search + 1], -1
)
sample["mask_patches-ntob"][is_close_patch, idx_fillfr + 1 : idx_search] = 1
is_close_patch_og = sample["mask_patches-original"][:, idx_search] > 0
is_close_patch_og = np.expand_dims(is_close_patch_og, -1)
sample["mask_patches-original"][:, :idx_search] = is_close_patch_og
else:
was_in_nearfield = np.zeros_like(sample["is_removed"], dtype="bool")
was_in_nearfield_og = np.zeros_like(sample["is_removed"], dtype="bool")
if remove_offset_turbulence:
# Check the mask beforehand
_ddepths = np.broadcast_to(sample["depths"], sample["signals"].shape)
_in_mask = _ddepths < np.expand_dims(sample["turbulence"], -1)
_in_mask_og = _ddepths < np.expand_dims(sample["turbulence-original"], -1)
# Shift lines up higher (less deep)
sample["turbulence"][
(~was_in_nearfield) | sample["is_upward_facing"]
] -= remove_offset_turbulence
sample["turbulence-original"][
(~was_in_nearfield_og) | sample["is_upward_facing"]
] -= remove_offset_turbulence
# Extend mask_patches where necessary
_fx_mask = _ddepths < np.expand_dims(sample["turbulence"], -1)
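        # Identify cells inside the old turbulence mask but outside the
        # shifted one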
_df_mask = _in_mask * (_in_mask ^ _fx_mask)
is_close_patch = np.any(_df_mask[:, :-1] * sample["mask_patches"][:, 1:], -1)
sample["mask_patches"][is_close_patch, :] += _df_mask[is_close_patch, :]
# ... and extend ntob mask patches too
sample["mask_patches-ntob"][is_close_patch, :] += _df_mask[is_close_patch, :]
# ... and extend og mask patches too
_fx_mask_og = _ddepths < np.expand_dims(sample["turbulence-original"], -1)
_df_mask = _in_mask_og * (_in_mask_og ^ _fx_mask_og)
is_close_patch = np.any(
_df_mask[:, :-1] * sample["mask_patches-original"][:, 1:], -1
)
sample["mask_patches-original"][is_close_patch, :] += _df_mask[
is_close_patch, :
]
if remove_offset_bottom:
# Check the mask beforehand
_ddepths = np.broadcast_to(sample["depths"], sample["signals"].shape)
_in_mask = _ddepths > np.expand_dims(sample["bottom"], -1)
_in_mask_og = _ddepths > np.expand_dims(sample["bottom-original"], -1)
        # Shift lines down lower (deeper)
sample["bottom"][
(~was_in_nearfield) | ~sample["is_upward_facing"]
] += remove_offset_bottom
sample["bottom-original"][
(~was_in_nearfield_og) | sample["is_upward_facing"]
] += remove_offset_bottom
# Extend mask_patches where necessary
_fx_mask = _ddepths > np.expand_dims(sample["bottom"], -1)
_df_mask = _in_mask * (_in_mask ^ _fx_mask)
is_close_patch = np.any(_df_mask[:, 1:] * sample["mask_patches"][:, :-1], -1)
sample["mask_patches"][is_close_patch, :] += _df_mask[is_close_patch, :]
# ... and extend og mask patches too
_fx_mask_og = _ddepths > np.expand_dims(sample["bottom-original"], -1)
_df_mask = _in_mask_og * (_in_mask_og ^ _fx_mask_og)
is_close_patch = np.any(
_df_mask[:, 1:] * sample["mask_patches-original"][:, :-1], -1
)
sample["mask_patches-original"][is_close_patch, :] += _df_mask[
is_close_patch, :
]
# ... and extend ntob mask patches too
sample["mask_patches-ntob"][is_close_patch, :] += _df_mask[is_close_patch, :]
if remove_offset_turbulence or remove_offset_bottom:
        # Re-binarize the masks (overlapping extensions may have summed to 2)
for suffix in ("", "-original", "-ntob"):
sample["mask_patches" + suffix] = sample["mask_patches" + suffix] > 0.5
# Ensure all line values are finite
for key in [
"surface",
"turbulence",
"turbulence-original",
"bottom",
"bottom-original",
]:
        where_invalid = ~np.isfinite(sample[key])
        n_invalid = np.sum(where_invalid)
        if 0 < n_invalid < len(sample[key]):
sample[key][where_invalid] = np.interp(
sample["timestamps"][where_invalid],
sample["timestamps"][~where_invalid],
sample[key][~where_invalid],
)
# Apply depth crop
if crop_depth is not None:
depth_crop_mask = sample["depths"] <= crop_depth
sample["depths"] = sample["depths"][depth_crop_mask]
sample["signals"] = sample["signals"][:, depth_crop_mask]
if transform is not None:
sample = transform(sample)
# Convert lines to masks
ddepths = np.broadcast_to(sample["depths"], sample["signals"].shape)
for suffix in ["", "-original"]:
sample["mask_turbulence" + suffix] = ddepths < np.expand_dims(
sample["turbulence" + suffix], -1
)
sample["mask_bottom" + suffix] = ddepths > np.expand_dims(
sample["bottom" + suffix], -1
)
sample["mask_surface"] = ddepths < np.expand_dims(sample["surface"], -1)
# Ensure is_surrogate_surface excludes passive data
sample["is_surrogate_surface"][sample["is_passive"] > 0.5] = 0
# Create mask corresponding to the aggregate of all elements we need
# masked in/out
sample["mask"] = np.ones_like(sample["signals"], dtype="bool")
sample["mask"][sample["is_passive"] > 0.5] = 0
sample["mask"][sample["is_removed"] > 0.5] = 0
sample["mask"][sample["mask_turbulence"] > 0.5] = 0
sample["mask"][sample["mask_bottom"] > 0.5] = 0
sample["mask"][sample["mask_patches"] > 0.5] = 0
# Determine the boundary index for depths
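    # np.searchsorted returns the insertion index which keeps depths sorted;
    # e.g. depths=[0., 1., 2.] with a line at 1.0 gives index 1 with
    # side="left" and index 2 with side="right"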
for sfx in {"turbulence", "turbulence-original", "surface"}:
# Ties are broken to the smaller index
sample["index_" + sfx] = np.searchsorted(
sample["depths"], sample[sfx], side="left"
)
        # It is possible for the turbulence line to be above the field of
        # view, but impossible for it to be below
sample["index_" + sfx] = np.maximum(
0, np.minimum(len(sample["depths"]) - 1, sample["index_" + sfx])
)
for sfx in {"bottom", "bottom-original"}:
# Ties are broken to the larger index
sample["index_" + sfx] = np.searchsorted(
sample["depths"], sample[sfx], side="right"
)
        # It is possible for the bottom line to be below the field of view,
        # but impossible for it to be above
sample["index_" + sfx] -= 1
sample["index_" + sfx] = np.maximum(
0, np.minimum(len(sample["depths"]) - 1, sample["index_" + sfx])
)
return sample