#!/usr/bin/env python
# coding: utf-8
"""
Convert dataset of CSV exports from Echoview into shards.
"""
# This file is part of Echofilter.
#
# Copyright (C) 2020-2022 Scott C. Lowe and Offshore Energy Research Association (OERA)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import functools
import multiprocessing
import os
import sys
import traceback
from tqdm.autonotebook import tqdm
import echofilter.raw
import echofilter.ui
ROOT_DATA_DIR = echofilter.raw.loader.ROOT_DATA_DIR
[docs]def generate_shard(
transect_pth,
verbose=False,
fail_gracefully=True,
**kwargs,
):
"""
Shard a single transect.
Wrapper around echofilter.raw.shardloader.segment_and_shard_transect which
adds verboseness and graceful failure options.
Parameters
----------
transect_pth : str
Relative path to transect.
verbose : bool, optional
Whether to print which transect is being processed. Default is ``False``.
fail_gracefully : bool, optional
If ``True``, any transect which triggers an errors during processing
will be printed out, but processing the rest of the transects will
continue. If ``False``, the process will halt with an error as soon as
any single transect hits an error. Default is ``True``.
**kwargs
See :meth:`echofilter.raw.shardloader.segment_and_shard_transect`.
"""
if verbose:
print("Sharding {}".format(transect_pth))
try:
echofilter.raw.shardloader.segment_and_shard_transect(
transect_pth,
**kwargs,
)
except Exception as ex:
with echofilter.ui.style.error_message():
if not fail_gracefully:
raise ex
print("Error sharding {}".format(transect_pth))
print("".join(traceback.TracebackException.from_exception(ex).format()))
[docs]def generate_shards(
partition,
dataset,
partitioning_version="firstpass",
progress_bar=False,
ncores=None,
verbose=False,
fail_gracefully=True,
root_data_dir=ROOT_DATA_DIR,
**kwargs,
):
"""
Shard all transections in one partition of a dataset.
Wrapper around echofilter.raw.shardloader.segment_and_shard_transect which
adds verboseness and graceful failure options.
Parameters
----------
partition : str
Name of the partition to process (``'train'``, ``'validate'``, ``'test'``,
etc).
dataset : str
Name of the dataset to process (``'mobile'``, ``'MinasPassage'``, etc).
partitioning_version : str, optional
Name of the partition version to use process. Default is ``'firstpass'``.
progress_bar : bool, optional
Whether to output a progress bar using ``tqdm``. Default is ``False``.
ncores : int, optional
Number of cores to use for multiprocessing. To disable multiprocessing,
set to ``1``. Set to ``None`` to use all available cores.
Default is ``None``.
verbose : bool, optional
Whether to print which transect is being processed. Default is ``False``.
fail_gracefully : bool, optional
If ``True``, any transect which triggers an errors during processing
will be printed out, but processing the rest of the transects will
continue. If ``False``, the process will halt with an error as soon as
any single transect hits an error. Default is ``True``.
**kwargs
See :func:`echofilter.raw.shardloader.segment_and_shard_transect`.
"""
if verbose:
print('Getting partition list "{}" for "{}"'.format(partition, dataset))
transect_pths = echofilter.raw.loader.get_partition_list(
partition,
dataset=dataset,
full_path=False,
partitioning_version=partitioning_version,
root_data_dir=root_data_dir,
)
if verbose:
print("Will process {} transects".format(len(transect_pths)))
print()
disable_tqdm = not progress_bar
fn = functools.partial(
generate_shard,
dataset=dataset,
verbose=verbose,
fail_gracefully=fail_gracefully,
root_data_dir=root_data_dir,
**kwargs,
)
if ncores == 1:
for transect_pth in tqdm(
transect_pths, total=len(transect_pths), disable=disable_tqdm
):
fn(transect_pth)
else:
with multiprocessing.Pool(ncores) as pool:
for _ in tqdm(
pool.imap_unordered(fn, transect_pths),
total=len(transect_pths),
disable=disable_tqdm,
):
pass
[docs]def get_parser():
"""
Build parser for command line interface for generating shards.
Returns
-------
parser : argparse.ArgumentParser
CLI argument parser for generating shards.
"""
import argparse
# Create parser
prog = os.path.split(sys.argv[0])[1]
if prog == "__main__.py" or prog == "__main__":
prog = os.path.split(__file__)[1]
parser = argparse.ArgumentParser(
prog=prog,
description="Generate dataset shards",
)
parser.add_argument(
"--version",
"-V",
action="version",
version="%(prog)s {version}".format(version=echofilter.__version__),
)
parser.add_argument(
"partition",
type=str,
help="partition to shard",
)
parser.add_argument(
"dataset",
type=str,
help="dataset to shard",
)
parser.add_argument(
"--root",
dest="root_data_dir",
type=str,
default=ROOT_DATA_DIR,
help="root data directory",
)
parser.add_argument(
"--partitioning-version",
type=str,
default="firstpass",
help="partitioning version",
)
parser.add_argument(
"--max-depth",
type=float,
default=None,
help="maximum depth to include in sharded data",
)
parser.add_argument(
"--shard-len",
type=int,
default=128,
help="number of samples in each shard",
)
parser.add_argument(
"--ncores",
type=int,
default=None,
help="number of cores to use (default: all). Set to 1 to disable"
" multiprocessing.",
)
parser.add_argument(
"--verbose",
"-v",
action="count",
default=0,
help="increase verbosity",
)
return parser
def _get_parser_sphinx():
"""
Pre-format parser help for sphinx-argparse processing.
"""
return echofilter.ui.formatters.format_parser_for_sphinx(get_parser())
[docs]def main(args=None):
"""
Command line interface for generating dataset shards from CSV files.
"""
parser = get_parser()
# Parse command line arguments
args = parser.parse_args(args)
# Check the input directory exists
print("Sharding {} partition of {}".format(args.partition, args.dataset))
# Run command with these arguments
generate_shards(**vars(args))
if __name__ == "__main__":
main()