"""
Converting raw data into shards, and loading data from shards.
"""
import os
import numpy as np
from . import loader, manipulate
from .utils import pad1d
ROOT_DATA_DIR = loader.ROOT_DATA_DIR
def segment_and_shard_transect(
transect_pth,
dataset="mobile",
max_depth=None,
shard_len=128,
root_data_dir=ROOT_DATA_DIR,
):
"""
Creates a sharded copy of a transect, with the transect cut into segments
based on recording starts/stops. Each segment is split across multiple
files (shards) for efficient loading.
Parameters
----------
transect_pth : str
Relative path to transect, excluding `"_Sv_raw.csv"`.
dataset : str, optional
Name of dataset. Default is `"mobile"`.
max_depth : float or None, optional
The maximum depth to include in the saved shard. Data corresponding
to deeper locations is omitted to save on load time and memory when
the shard is loaded. If `None`, no cropping is applied.
Default is `None`.
shard_len : int, optional
Number of timestamp samples to include in each shard. Default is `128`.
    root_data_dir : str, optional
Path to root directory where data is located.
Notes
-----
    The segments will be written to the directories
    `<root_data_dir>_sharded/<dataset>/<transect_pth>/<segment>/`.
For the contents of each directory, see `write_transect_shards`.
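    Examples
    --------
    A minimal usage sketch; the transect path here is hypothetical and must
    exist under `<root_data_dir>/<dataset>`:
    >>> segment_and_shard_transect(
    ...     "surveyA/transect01",
    ...     dataset="mobile",
    ...     max_depth=100.0,
    ...     shard_len=128,
    ... )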
"""
# Define output destination
root_data_dir = loader.remove_trailing_slash(root_data_dir)
root_shard_dir = os.path.join(root_data_dir + "_sharded", dataset)
# Load the data, with mask decomposed into turbulence, bottom, passive,
# and removed regions.
transect = manipulate.load_decomposed_transect_mask(
os.path.join(root_data_dir, dataset, transect_pth)
)
segments = manipulate.split_transect(**transect)
for i_segment, segment in enumerate(segments):
dirname = os.path.join(root_shard_dir, transect_pth, str(i_segment))
write_transect_shards(
dirname, segment, max_depth=max_depth, shard_len=shard_len
)
n_segment = i_segment + 1
# Save segmentation metadata
with open(os.path.join(root_shard_dir, transect_pth, "n_segment.txt"), "w") as hf:
print(str(n_segment), file=hf)
def write_transect_shards(dirname, transect, max_depth=None, shard_len=128):
"""
Creates a sharded copy of a transect, with the transect cut by timestamp
and split across multiple files.
Parameters
----------
dirname : str
Path to output directory.
transect : dict
Observed values for the transect. Should already be segmented.
max_depth : float or None, optional
The maximum depth to include in the saved shard. Data corresponding
to deeper locations is omitted to save on load time and memory when
the shard is loaded. If `None`, no cropping is applied.
Default is `None`.
shard_len : int, optional
Number of timestamp samples to include in each shard. Default is `128`.
Notes
-----
The output will be written to the directory `dirname`, and will contain:
- a file named `"shard_size.txt"`, which contains the sharding metadata:
total number of samples, and shard size;
    - a compressed numpy file for each shard, named `0.npz`, `1.npz`, etc.
    Each shard file is a :meth:`numpy.savez_compressed` archive containing
    the arrays:
    - depths
    - timestamps
    - Sv
    - mask
    - turbulence
    - bottom
    - is_passive
    - is_removed
    - is_upward_facing
    cropped and split by timestamp as appropriate for the shard.
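    Examples
    --------
    A minimal usage sketch, assuming `segment` is one of the dicts yielded by
    `manipulate.split_transect`; the output directory is hypothetical:
    >>> write_transect_shards("/tmp/shards/0", segment, shard_len=128)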
"""
# Remove depths which are too deep for us to care about
if max_depth is not None:
depth_mask = transect["depths"] <= max_depth
transect["depths"] = transect["depths"][depth_mask]
transect["Sv"] = transect["Sv"][:, depth_mask]
transect["mask"] = transect["mask"][:, depth_mask]
# Reduce floating point precision for some variables
for key in ("Sv", "turbulence", "bottom"):
transect[key] = np.half(transect[key])
# Ensure is_upward_facing is an array
transect["is_upward_facing"] = np.array(transect["is_upward_facing"])
# Prep output directory
os.makedirs(dirname, exist_ok=True)
# Save sharding metadata (total number of datapoints, shard size) to
# make loading from the shards easier
with open(os.path.join(dirname, "shard_size.txt"), "w") as hf:
print("{},{}".format(transect["Sv"].shape[0], shard_len), file=hf)
# Work out where to split the arrays
indices = range(shard_len, transect["Sv"].shape[0], shard_len)
# Split the transect into shards
n_shards = len(indices) + 1
shards = [{} for _ in range(n_shards)]
for key in transect:
if key in ("depths", "is_upward_facing") or not hasattr(
transect[key], "__len__"
):
            for i_shard in range(n_shards):
                shards[i_shard][key] = transect[key]
else:
for i_split, split in enumerate(np.split(transect[key], indices)):
shards[i_split][key] = split
for shard in shards:
if shard.keys() != shards[0].keys():
raise ValueError("Inconsistent split lengths")
# Save the data for each of the shards
for i_shard, shard in enumerate(shards):
fname = os.path.join(dirname, "{}.npz".format(i_shard))
np.savez_compressed(fname, **shard)
def load_transect_from_shards_abs(
transect_abs_pth,
i1=0,
i2=None,
pad_mode="edge",
):
"""
Load transect data from shard files.
Parameters
----------
transect_abs_pth : str
Absolute path to transect shard directory.
i1 : int, optional
Index of first sample to retrieve. Default is `0`, the first sample.
i2 : int, optional
        Index of last sample to retrieve. As per Python convention, the range
        `i1` to `i2` is inclusive on the left and exclusive on the right, so
        datapoint `i2 - 1` is the right-most datapoint loaded. Default is
        `None`, which loads everything up to and including the last sample.
pad_mode : str, optional
        Padding method for out-of-bounds inputs. Must be supported by
        :meth:`numpy.pad`, such as `"constant"`, `"reflect"`, or `"edge"`. If
        the mode is `"constant"`, the array will be padded with zeros. Default
        is `"edge"`.
Returns
-------
dict
A dictionary with keys:
- "timestamps" : numpy.ndarray
Timestamps (in seconds since Unix epoch), for each recording
timepoint. The number of entries, `num_timestamps`, is equal
to `i2 - i1`.
- "depths" : numpy.ndarray
Depths from the surface (in metres), with each entry
corresponding to each column in the `signals` data.
- "Sv" : numpy.ndarray
Echogram Sv data, shaped (num_timestamps, num_depths).
- "mask" : numpy.ndarray
Logical array indicating which datapoints were kept (`True`)
and which removed (`False`) for the masked Sv output.
Shaped (num_timestamps, num_depths).
- "turbulence" : numpy.ndarray
For each timepoint, the depth of the shallowest datapoint which
should be included for the mask. Shaped (num_timestamps, ).
- "bottom" : numpy.ndarray
For each timepoint, the depth of the deepest datapoint which
should be included for the mask. Shaped (num_timestamps, ).
- "is_passive" : numpy.ndarray
Logical array showing whether a timepoint is of passive data.
Shaped (num_timestamps, ). All passive recording data should
be excluded by the mask.
- "is_removed" : numpy.ndarray
Logical array showing whether a timepoint is entirely removed
by the mask. Shaped (num_timestamps, ). Does not include
periods of passive recording.
- "is_upward_facing" : bool
Indicates whether the recording source is located at the
deepest depth (i.e. the seabed), facing upwards. Otherwise, the
recording source is at the shallowest depth (i.e. the surface),
facing downwards.
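    Examples
    --------
    A minimal usage sketch with a hypothetical shard directory. Requesting
    indices beyond the stored range pads the output according to `pad_mode`:
    >>> transect = load_transect_from_shards_abs(
    ...     "/data/dataset_sharded/mobile/surveyA/transect01/0",
    ...     i1=-10,
    ...     i2=500,
    ...     pad_mode="edge",
    ... )
    >>> transect["Sv"].shape[0] == 500 - (-10)
    True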
"""
# Load the sharding metadata
with open(os.path.join(transect_abs_pth, "shard_size.txt"), "r") as f:
n_timestamps, shard_len = f.readline().strip().split(",")
n_timestamps = int(n_timestamps)
shard_len = int(shard_len)
# Set the default value for i2
if i2 is None:
i2 = n_timestamps
# Sanity check
if i1 > n_timestamps:
raise ValueError(
"All requested datapoints out of range: {}, {} > {}".format(
i1, i2, n_timestamps
)
)
if i2 < 0:
raise ValueError(
"All requested datapoints out of range: {}, {} < {}".format(i1, i2, 0)
)
# Make indices safe
i1_ = max(0, i1)
i2_ = min(i2, n_timestamps)
    # Work out which shards we'll need to load to get this data
    j1 = max(0, int(i1 / shard_len))
    # The last datapoint needed is i2 - 1, bounded by the length of the data
    j2 = int((min(i2, n_timestamps) - 1) / shard_len)
transect = {}
shards = [
np.load(os.path.join(transect_abs_pth, str(j) + ".npz"), allow_pickle=True)
for j in range(j1, j2 + 1)
]
# Depths and is_upward_facing should all be the same. Only load one of
# each of them.
for key in shards[0].keys():
if key in ("depths", "is_upward_facing"):
transect[key] = shards[0][key]
else:
broad_data = np.concatenate([shard[key] for shard in shards])
# Have to trim data down, and pad if requested indices out of range
transect[key] = pad1d(
broad_data[(i1_ - j1 * shard_len) : (i2_ - j1 * shard_len)],
(i1_ - i1, i2 - i2_),
axis=0,
mode=pad_mode,
)
return transect
def load_transect_from_shards_rel(
transect_rel_pth,
i1=0,
i2=None,
dataset="mobile",
segment=0,
root_data_dir=ROOT_DATA_DIR,
**kwargs,
):
"""
Load transect data from shard files.
Parameters
----------
transect_rel_pth : str
Relative path to transect.
i1 : int, optional
Index of first sample to retrieve. Default is `0`, the first sample.
i2 : int, optional
        Index of last sample to retrieve. As per Python convention, the range
        `i1` to `i2` is inclusive on the left and exclusive on the right, so
        datapoint `i2 - 1` is the right-most datapoint loaded. Default is
        `None`, which loads everything up to and including the last sample.
dataset : str, optional
Name of dataset. Default is `"mobile"`.
segment : int, optional
Which segment to load. Default is `0`.
    root_data_dir : str, optional
Path to root directory where data is located.
**kwargs
As per :meth:`load_transect_from_shards_abs`.
Returns
-------
dict
See :meth:`load_transect_from_shards_abs`.
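    Examples
    --------
    A minimal usage sketch; the transect path is hypothetical:
    >>> transect = load_transect_from_shards_rel(
    ...     "surveyA/transect01",
    ...     i1=0,
    ...     i2=256,
    ...     dataset="mobile",
    ...     segment=0,
    ... )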
"""
root_data_dir = loader.remove_trailing_slash(root_data_dir)
root_shard_dir = os.path.join(root_data_dir + "_sharded", dataset)
dirname = os.path.join(root_shard_dir, transect_rel_pth, str(segment))
return load_transect_from_shards_abs(
dirname,
i1=i1,
i2=i2,
**kwargs,
)
def load_transect_segments_from_shards_abs(
transect_abs_pth,
segments=None,
):
"""
Load transect data from shard files.
Parameters
----------
transect_abs_pth : str
Absolute path to transect shard segments directory.
segments : iterable or None
Which segments to load. If `None` (default), all segments are loaded.
Returns
-------
dict
See :meth:`load_transect_from_shards_abs`.
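    Examples
    --------
    A minimal usage sketch; the path is hypothetical and should point at a
    directory written by :meth:`segment_and_shard_transect`:
    >>> transect = load_transect_segments_from_shards_abs(
    ...     "/data/dataset_sharded/mobile/surveyA/transect01"
    ... )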
"""
if segments is None:
# Load the segmentation metadata
with open(os.path.join(transect_abs_pth, "n_segment.txt"), "r") as f:
n_segment = int(f.readline().strip())
segments = range(n_segment)
# Load each segment
transects = []
for segment in segments:
dirname = os.path.join(transect_abs_pth, str(segment))
transects.append(load_transect_from_shards_abs(dirname))
# Join the segments together
return manipulate.join_transect(transects)
def load_transect_segments_from_shards_rel(
transect_rel_pth,
dataset="mobile",
segments=None,
root_data_dir=ROOT_DATA_DIR,
):
"""
Load transect data from shard files.
Parameters
----------
transect_rel_pth : str
Relative path to transect.
dataset : str, optional
Name of dataset. Default is `"mobile"`.
segments : iterable or None
Which segments to load. If `None` (default), all segments are loaded.
    root_data_dir : str, optional
Path to root directory where data is located.
Returns
-------
dict
See :meth:`load_transect_from_shards_abs`.
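    Examples
    --------
    A minimal usage sketch; the transect path is hypothetical:
    >>> transect = load_transect_segments_from_shards_rel(
    ...     "surveyA/transect01", dataset="mobile"
    ... )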
"""
root_data_dir = loader.remove_trailing_slash(root_data_dir)
root_shard_dir = os.path.join(root_data_dir + "_sharded", dataset)
dirname = os.path.join(root_shard_dir, transect_rel_pth)
return load_transect_segments_from_shards_abs(dirname, segments=segments)
# Backwards compatibility
shard_transect = segment_and_shard_transect
load_transect_from_shards = load_transect_from_shards_rel
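# A minimal round-trip sketch (with a hypothetical transect path): shard a
# transect, then reload all of its segments joined back together. This is
# illustrative only; it requires the raw data to exist under ROOT_DATA_DIR.
if __name__ == "__main__":
    shard_transect("surveyA/transect01", dataset="mobile", max_depth=100.0)
    transect = load_transect_segments_from_shards_rel(
        "surveyA/transect01", dataset="mobile"
    )
    print(transect["Sv"].shape)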