#!/usr/bin/env python

"""
Inference routine.
"""

# This file is part of Echofilter.
#
# Copyright (C) 2020-2022  Scott C. Lowe and Offshore Energy Research Association (OERA)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import datetime
import os
import pprint
import sys
import tempfile
import textwrap
import time

import numpy as np
import torch
import torch.nn
import torch.utils.data
import torchvision.transforms
from matplotlib import colors as mcolors
from tqdm.auto import tqdm

import echofilter.data.transforms
import echofilter.nn
import echofilter.path
import echofilter.raw
import echofilter.ui
import echofilter.ui.checkpoints
import echofilter.utils
import echofilter.win
from echofilter.nn.unet import UNet
from echofilter.nn.utils import count_parameters
from echofilter.nn.wrapper import Echofilter
from echofilter.raw.manipulate import join_transect, pad_transect, split_transect
from echofilter.raw.utils import fillholes2d
from echofilter.ui.inference_cli import DEFAULT_VARNAME, cli, main

EV_UNDEFINED_DEPTH = -10000.99


def run_inference(
    paths,
    source_dir=".",
    recursive_dir_search=True,
    extensions="csv",
    skip_existing=False,
    skip_incompatible=False,
    output_dir="",
    dry_run=False,
    continue_on_error=False,
    overwrite_existing=False,
    overwrite_ev_lines=False,
    import_into_evfile=True,
    generate_turbulence_line=True,
    generate_bottom_line=True,
    generate_surface_line=True,
    add_nearfield_line=True,
    suffix_file="",
    suffix_var=None,
    color_turbulence="orangered",
    color_turbulence_offset=None,
    color_bottom="orangered",
    color_bottom_offset=None,
    color_surface="green",
    color_surface_offset=None,
    color_nearfield="mediumseagreen",
    thickness_turbulence=2,
    thickness_turbulence_offset=None,
    thickness_bottom=2,
    thickness_bottom_offset=None,
    thickness_surface=1,
    thickness_surface_offset=None,
    thickness_nearfield=1,
    cache_dir=None,
    cache_csv=None,
    suffix_csv="",
    keep_ext=False,
    line_status=3,
    offset_turbulence=1.0,
    offset_bottom=1.0,
    offset_surface=1.0,
    nearfield=1.7,
    cutoff_at_nearfield=None,
    lines_during_passive="interpolate-time",
    collate_passive_length=10,
    collate_removed_length=10,
    minimum_passive_length=10,
    minimum_removed_length=-1,
    minimum_patch_area=-1,
    patch_mode=None,
    variable_name=DEFAULT_VARNAME,
    export_raw_csv=True,
    row_len_selector="mode",
    facing="auto",
    use_training_standardization=False,
    prenorm_nan_value=None,
    postnorm_nan_value=None,
    crop_min_depth=None,
    crop_max_depth=None,
    autocrop_threshold=0.35,
    image_height=None,
    checkpoint=None,
    force_unconditioned=False,
    logit_smoothing_sigma=0,
    device=None,
    hide_echoview="new",
    minimize_echoview=False,
    verbose=2,
):
    """
    Perform inference on input files, and generate output files.

    Outputs are written as lines in EVL and regions in EVR file formats.

    Parameters
    ----------
    paths : iterable or str
        Files and folders to be processed. These may be full paths or paths
        relative to ``source_dir``. For each folder specified, any files with
        extension ``"csv"`` within the folder and all its tree of
        subdirectories will be processed.
    source_dir : str, default="."
        Path to directory where files are found.
    recursive_dir_search : bool, default=True
        How to handle directory inputs in ``paths``. If ``False``, only files
        (with the correct extension) in the directory will be included. If
        ``True``, subdirectories will also be walked through to find input
        files.
    extensions : iterable or str, default="csv"
        File extensions to detect when running on a directory.
    skip_existing : bool, default=False
        Skip processing files which already have all outputs present.
    skip_incompatible : bool, default=False
        Skip processing CSV files which do not seem to contain an exported
        Echoview transect. If ``False``, an error is raised.
    output_dir : str, default=""
        Directory where output files will be written. If this is an empty
        string, outputs are written to the same directory as each input file.
        Otherwise, they are written to ``output_dir``, preserving their path
        relative to ``source_dir`` if relative paths were used.
    dry_run : bool, default=False
        If ``True``, perform a trial run with no changes made.
    continue_on_error : bool, default=False
        Continue running on remaining files if one file hits an error.
    overwrite_existing : bool, default=False
        Overwrite existing outputs without producing a warning message. If
        ``False``, an error is generated if files would be overwritten.
    overwrite_ev_lines : bool, default=False
        Overwrite existing lines within the Echoview file without warning.
        If ``False`` (default), the current datetime will be appended to
        line variable names in the event of a collision.
    import_into_evfile : bool, default=True
        Whether to import the output lines and regions into the EV file,
        whenever the file being processed is an EV file.
    generate_turbulence_line : bool, default=True
        Whether to output an evl file for the turbulence line. If this is
        ``False``, the turbulence line is also never imported into Echoview.
    generate_bottom_line : bool, default=True
        Whether to output an evl file for the bottom line. If this is
        ``False``, the bottom line is also never imported into Echoview.
    generate_surface_line : bool, default=True
        Whether to output an evl file for the surface line. If this is
        ``False``, the surface line is also never imported into Echoview.
    add_nearfield_line : bool, default=True
        Whether to add a nearfield line to the EV file in Echoview.
    suffix_file : str, default=""
        Suffix to append to output artifacts (evl and evr files), between
        the name of the file and the extension. If ``suffix_file`` begins
        with an alphanumeric character, ``"-"`` is prepended.
    suffix_var : str, optional
        Suffix to append to line and region names when imported back into
        the EV file. If ``suffix_var`` begins with an alphanumeric
        character, ``"-"`` is prepended. By default, ``suffix_var`` will
        match ``suffix_file`` if it is set, and will be ``"_echofilter"``
        otherwise.
    color_turbulence : str, default="orangered"
        Color to use for the turbulence line when it is imported into
        Echoview. This can either be the name of a supported color from
        matplotlib.colors, or a hexadecimal color, or a string
        representation of an RGB color to supply directly to Echoview
        (such as "(0,255,0)").
    color_turbulence_offset : str, optional
        Color to use for the offset turbulence line when it is imported
        into Echoview. By default, ``color_turbulence`` is used.
    color_bottom : str, default="orangered"
        Color to use for the bottom line when it is imported into Echoview.
        This can either be the name of a supported color from
        matplotlib.colors, or a hexadecimal color, or a string
        representation of an RGB color to supply directly to Echoview
        (such as "(0,255,0)").
    color_bottom_offset : str, optional
        Color to use for the offset bottom line when it is imported into
        Echoview. By default, ``color_bottom`` is used.
    color_surface : str, default="green"
        Color to use for the surface line when it is imported into
        Echoview. This can either be the name of a supported color from
        matplotlib.colors, or a hexadecimal color, or a string
        representation of an RGB color to supply directly to Echoview
        (such as "(0,255,0)").
    color_surface_offset : str, optional
        Color to use for the offset surface line when it is imported into
        Echoview. By default, ``color_surface`` is used.
    color_nearfield : str, default="mediumseagreen"
        Color to use for the nearfield line when it is created in Echoview.
        This can either be the name of a supported color from
        matplotlib.colors, or a hexadecimal color, or a string
        representation of an RGB color to supply directly to Echoview
        (such as "(0,255,0)").
    thickness_turbulence : int, default=2
        Thickness with which the turbulence line will be displayed in
        Echoview.
    thickness_turbulence_offset : int, optional
        Thickness with which the offset turbulence line will be displayed
        in Echoview. By default, ``thickness_turbulence`` is used.
    thickness_bottom : int, default=2
        Thickness with which the bottom line will be displayed in Echoview.
    thickness_bottom_offset : int, optional
        Thickness with which the offset bottom line will be displayed in
        Echoview. By default, ``thickness_bottom`` is used.
    thickness_surface : int, default=1
        Thickness with which the surface line will be displayed in Echoview.
    thickness_surface_offset : int, optional
        Thickness with which the offset surface line will be displayed in
        Echoview. By default, ``thickness_surface`` is used.
    thickness_nearfield : int, default=1
        Thickness with which the nearfield line will be displayed in
        Echoview.
    cache_dir : str, optional
        Path to directory where downloaded checkpoint files should be
        cached. By default, an OS-appropriate application-specific default
        cache directory is used.
    cache_csv : str, optional
        Path to directory where CSV files generated from EV inputs should
        be cached. By default, EV files which are exported to CSV files are
        temporary files, deleted after this program has completed. If
        ``cache_csv=""``, the CSV files are cached in the same directory as
        the input EV files.
    suffix_csv : str, default=""
        Suffix used for cached CSV files which are exported from EV files.
        If ``suffix_csv`` begins with an alphanumeric character, a delimiter
        is prepended. The delimiter is ``"."`` if ``keep_ext=True`` or
        ``"-"`` if ``keep_ext=False``.
    keep_ext : bool, default=False
        Whether to preserve the file extension in the input file name when
        generating the output file name. Default is ``False``, removing the
        extension.
    line_status : int, default=3
        Status to use for the lines.
        Must be one of:

        - ``0`` : none
        - ``1`` : unverified
        - ``2`` : bad
        - ``3`` : good

    offset_turbulence : float, default=1.0
        Offset for turbulence line, which moves the turbulence line deeper.
    offset_bottom : float, default=1.0
        Offset for bottom line, which moves the line to become more shallow.
    offset_surface : float, default=1.0
        Offset for surface line, which moves the surface line deeper.
    nearfield : float, default=1.7
        Nearfield approach distance, in metres. If the echogram is downward
        facing, the nearfield cutoff depth will be at a depth equal to the
        nearfield distance. If the echogram is upward facing, the nearfield
        cutoff will be ``nearfield`` metres above the deepest depth recorded
        in the input data. When processing an EV file, by default a
        nearfield line will be added at the nearfield cutoff depth. To
        prevent this behaviour, use the ``--no-nearfield-line`` argument.
    cutoff_at_nearfield : bool, optional
        Whether to cut off the turbulence line (for downfacing data) or
        bottom line (for upfacing data) when it is closer to the
        echosounder than the ``nearfield`` distance. By default, the bottom
        line is clipped (for upfacing data), but the turbulence line is not
        clipped (even with downfacing data).
    lines_during_passive : str, default="interpolate-time"
        Method used to handle line depths during collection periods
        determined to be passive recording instead of active recording.
        Options are:

        ``"interpolate-time"``
            depths are linearly interpolated from active recording periods,
            using the time at which recordings were made.
        ``"interpolate-index"``
            depths are linearly interpolated from active recording periods,
            using the index of the recording.
        ``"predict"``
            the model's prediction for the lines during passive data
            collection will be kept; the nature of the prediction depends
            on how the model was trained.
        ``"redact"``
            no depths are provided during periods determined to be passive
            data collection.
        ``"undefined"``
            depths are replaced with the placeholder value used by Echoview
            to denote undefined values, which is ``-10000.99``.
    collate_passive_length : int, default=10
        Maximum interval, in ping indices, between detected passive regions
        which will be removed to merge consecutive passive regions together
        into a single, collated, region.
    collate_removed_length : int, default=10
        Maximum interval, in ping indices, between detected blocks
        (vertical rectangles) marked for removal which will also be removed
        to merge consecutive removed blocks together into a single,
        collated, region.
    minimum_passive_length : int, default=10
        Minimum length, in ping indices, which a detected passive region
        must have to be included in the output. Set to ``-1`` to omit all
        detected passive regions from the output.
    minimum_removed_length : int, default=-1
        Minimum length, in ping indices, which a detected removal block
        (vertical rectangle) must have to be included in the output. Set to
        ``-1`` to omit all detected removal blocks from the output
        (default). Recommended minimum length is 10.
    minimum_patch_area : int, default=-1
        Minimum area, in pixels, which a detected removal patch
        (contour/polygon) region must have to be included in the output.
        Set to ``-1`` to omit all detected patches from the output
        (default). Recommended minimum area is 25.
    patch_mode : str, optional
        Type of mask patches to use. Must be supported by the model
        checkpoint used. Should be one of:

        ``"merged"``
            Target patches for training were determined after merging as
            much as possible into the turbulence and bottom lines.
        ``"original"``
            Target patches for training were determined using original
            lines, before expanding the turbulence and bottom lines.
        ``"ntob"``
            Target patches for training were determined using the original
            bottom line and the merged turbulence line.

        By default, ``"merged"`` is used if downfacing and ``"ntob"`` is
        used if upfacing.
    variable_name : str, default="Fileset1: Sv pings T1"
        Name of the Echoview acoustic variable to load from EV files.
    export_raw_csv : bool, default=True
        If ``True`` (default), exclusion and threshold settings in the EV
        file are temporarily disabled before exporting the CSV, in order to
        ensure all raw data is exported. If ``False``, thresholds and
        exclusions are used as per the EV file.
    row_len_selector : str, default="mode"
        Method used to handle input csv files with different number of Sv
        values across time (i.e. a non-rectangular input). See
        :meth:`echofilter.raw.loader.transect_loader` for options.
    facing : {"downward", "upward", "auto"}, default="auto"
        Orientation in which the echosounder is facing. Default is
        ``"auto"``, in which case the orientation is determined from the
        ordering of the depth values in the data (increasing =
        ``"upward"``, decreasing = ``"downward"``).
    use_training_standardization : bool, default=False
        Whether to use the exact normalization center and deviation values
        as used during training. If ``False`` (default), the center and
        deviation are determined per sample, using the same methodology as
        used to determine the center and deviation values for training.
    prenorm_nan_value : float, optional
        If this is set, replace NaN values with a given Sv value before the
        data normalisation (Gaussian standardisation) step. By default,
        NaNs are left as they are until after standardising the data.
    postnorm_nan_value : float, optional
        Placeholder value to replace NaNs with. Does nothing if
        ``prenorm_nan_value`` is set. By default this is set to the value
        used to train the model.
    crop_min_depth : float, optional
        Minimum depth to include in input. By default, there is no minimum
        depth.
    crop_max_depth : float, optional
        Maximum depth to include in input. By default, there is no maximum
        depth.
    autocrop_threshold : float, default=0.35
        Minimum fraction of input height which must be found to be
        removable for the model to be re-run with an automatically cropped
        input.
    image_height : int, optional
        Height in pixels of input to model. The data loaded from the csv
        will be resized to this height (the width of the image is
        unchanged). By default, the height matches that used when the model
        was trained.
    checkpoint : str, optional
        A path to a checkpoint file, or name of a checkpoint known to this
        package (listed in ``echofilter/checkpoints.yaml``). By default,
        the first checkpoint in ``checkpoints.yaml`` is used.
    force_unconditioned : bool, default=False
        Whether to always use unconditioned logit outputs. If ``False``
        (default), conditional logits will be used if the checkpoint loaded
        is for a conditional model.
    logit_smoothing_sigma : float, optional
        Standard deviation over which logits will be smoothed before being
        converted into output. Disabled by default.
    device : str or torch.device, optional
        Name of device on which the model will be run. By default, the
        first available CUDA GPU is used if any are found, and otherwise
        the CPU is used. Set to ``"cpu"`` to use the CPU even if a CUDA GPU
        is available.
    hide_echoview : {"never", "new", "always"}, default="new"
        Whether to hide the Echoview window entirely while the code runs.
        If ``hide_echoview="new"``, the application is only hidden if it
        was created by this function, and not if it was already running.
        If ``hide_echoview="always"``, the application is hidden even if it
        was already running. In the latter case, the window will be
        revealed again when this function is completed.
    minimize_echoview : bool, default=False
        If ``True``, the Echoview window being used will be minimized while
        this function is running.
    verbose : int, default=2
        Verbosity level. Set to ``0`` to disable print statements, or
        elevate to a higher number to increase verbosity.
    """
    t_start_prog = time.time()

    if isinstance(paths, str):
        paths = [paths]

    progress_fmt = (
        echofilter.ui.style.dryrun_fmt if dry_run else echofilter.ui.style.progress_fmt
    )

    existing_file_msg = (
        "Run with overwrite_existing=True (with the command line"
        " interface, use the --force flag) to overwrite existing"
        " outputs."
    )

    if verbose >= 1:
        print(
            progress_fmt(
                "{}Starting inference {}.{} {}".format(
                    echofilter.ui.style.HighlightStyle.start,
                    "dry-run" if dry_run else "routine",
                    echofilter.ui.style.HighlightStyle.reset,
                    datetime.datetime.now().strftime("%A, %B %d, %Y at %H:%M:%S"),
                )
            )
        )

    if device is None:
        if verbose >= 2:
            print("Checking for available GPU device with CUDA capability...")
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if verbose >= 2:
            print(
                "Will run on {}{}{}".format(
                    echofilter.ui.style.HighlightStyle.start,
                    device,
                    echofilter.ui.style.HighlightStyle.reset,
                )
            )
    device = torch.device(device)

    if facing is None:
        facing = "auto"
    if facing.startswith("down"):
        facing = "downward"
    if facing.startswith("up"):
        facing = "upward"

    if suffix_file and suffix_file[0].isalpha():
        suffix_file = "-" + suffix_file

    if suffix_var and suffix_var[0].isalpha():
        suffix_var = "-" + suffix_var
    elif suffix_var is not None:
        pass
    elif suffix_file:
        suffix_var = suffix_file
    else:
        suffix_var = "_echofilter"

    if suffix_csv and suffix_csv[0].isalpha():
        if keep_ext:
            suffix_csv = "." + suffix_csv
        else:
            suffix_csv = "-" + suffix_csv

    line_colors = dict(
        turbulence=color_turbulence,
        turbulence_offset=color_turbulence_offset,
        bottom=color_bottom,
        bottom_offset=color_bottom_offset,
        surface=color_surface,
        surface_offset=color_surface_offset,
        nearfield=color_nearfield,
    )
    line_thicknesses = dict(
        turbulence=thickness_turbulence,
        turbulence_offset=thickness_turbulence_offset,
        bottom=thickness_bottom,
        bottom_offset=thickness_bottom_offset,
        surface=thickness_surface,
        surface_offset=thickness_surface_offset,
        nearfield=thickness_nearfield,
    )
    # Carry over default line colours and thicknesses
    for lname in ["turbulence", "bottom", "surface"]:
        key_source = lname
        key_dest = lname + "_offset"
        if line_colors[key_dest] is None:
            line_colors[key_dest] = line_colors[key_source]
        if line_thicknesses[key_dest] is None:
            line_thicknesses[key_dest] = line_thicknesses[key_source]

    if verbose >= 2:
        print("Fetching checkpoint...")

    # Load checkpoint
    checkpoint, ckpt_name = echofilter.ui.checkpoints.load_checkpoint(
        checkpoint,
        cache_dir=cache_dir,
        device=device,
        return_name=True,
        verbose=verbose,
    )

    if verbose >= 5:
        print("Loaded checkpoint {}:".format(ckpt_name))
        pprint.pprint(
            {
                k: checkpoint[k]
                for k in (
                    "sample_shape",
                    "epoch",
                    "best_loss",
                    "data_center",
                    "data_deviation",
                    "center_method",
                    "deviation_method",
                    "nan_value",
                    "training_routine",
                )
            }
        )

    if image_height is None:
        image_height = checkpoint.get("sample_shape", (128, 512))[1]

    if use_training_standardization:
        center_param = checkpoint.get("data_center", -80.0)
        deviation_param = checkpoint.get("data_deviation", 20.0)
    else:
        center_param = checkpoint.get("center_method", "mean")
        deviation_param = checkpoint.get("deviation_method", "stdev")
    if postnorm_nan_value is None:
        postnorm_nan_value = checkpoint.get("nan_value", -3)

    if verbose >= 4:
        print("Constructing U-Net model, with arguments:")
        pprint.pprint(checkpoint["model_parameters"])
    elif verbose >= 2:
        print("Constructing U-Net model...")
    unet = UNet(**checkpoint["model_parameters"])
    if hasattr(logit_smoothing_sigma, "__len__"):
        max_logit_smoothing_sigma = max(logit_smoothing_sigma)
    else:
        max_logit_smoothing_sigma = logit_smoothing_sigma
    if max_logit_smoothing_sigma > 0:
        ks = max(11, int(np.round(max_logit_smoothing_sigma * 6)))
        ks += (ks + 1) % 2  # Increment to odd number if even
        model = torch.nn.Sequential(
            unet,
            echofilter.nn.modules.GaussianSmoothing(
                channels=checkpoint["model_parameters"]["out_channels"],
                kernel_size=ks,
                sigma=logit_smoothing_sigma,
            ),
        )
    else:
        model = unet
    model = Echofilter(
        model,
        mapping=checkpoint.get("wrapper_mapping", None),
        **checkpoint.get("wrapper_params", {}),
    )
    is_conditional_model = model.params.get("conditional", False)
    if verbose >= 3:
        print(
            "Built {}model with {} trainable parameters".format(
                "conditional " if is_conditional_model else "",
                count_parameters(model, only_trainable=True),
            )
        )
    try:
        unet.load_state_dict(checkpoint["state_dict"])
        if verbose >= 3:
            print("Loaded U-Net state from the checkpoint")
    except RuntimeError as err:
        if verbose >= 5:
            s = (
                "Warning: Checkpoint doesn't seem to be for the UNet."
                " Trying to load it as the whole model instead."
            )
            s = echofilter.ui.style.warning_fmt(s)
            print(s)
        try:
            model.load_state_dict(checkpoint["state_dict"])
            if verbose >= 3:
                print("Loaded model state from the checkpoint")
        except RuntimeError:
            msg = (
                "Could not load the checkpoint state as either the whole"
                " model or the unet component."
            )
            with echofilter.ui.style.error_message(msg) as msg:
                print(msg)
                raise err

    # Ensure model is on correct device
    model.to(device)
    # Put model in evaluation mode
    model.eval()

    files_input = paths
    files = list(
        echofilter.path.parse_files_in_folders(
            paths, source_dir, extensions, recursive=recursive_dir_search
        )
    )
    if verbose >= 1:
        print(
            progress_fmt(
                "Processing {}{} file{}{}...".format(
                    echofilter.ui.style.HighlightStyle.start,
                    len(files),
                    "" if len(files) == 1 else "s",
                    echofilter.ui.style.HighlightStyle.reset,
                )
            )
        )

    if len(extensions) == 1 and "ev" in extensions:
        do_open = True
    else:
        do_open = False
        for file in files:
            if os.path.splitext(file)[1].lower() == ".ev":
                do_open = True
                break

    if dry_run:
        if verbose >= 3:
            print(
                "Echoview application would{} be opened {}.".format(
                    "" if do_open else " not",
                    "to convert EV files to CSV"
                    if do_open
                    else "(no EV files to process)",
                )
            )
        do_open = False

    common_notes = textwrap.dedent(
        """
        Classified by echofilter {ver} at {dt}
        Model checkpoint: {ckpt_name}
        """.format(
            ver=echofilter.__version__,
            dt=datetime.datetime.now().astimezone().isoformat(timespec="seconds"),
            ckpt_name=os.path.split(ckpt_name)[1],
        )
    )

    disable_tqdm = len(files) == 1 or verbose <= 0
    skip_count = 0
    incompatible_count = 0
    error_msgs = []

    # Open Echoview connection
    with echofilter.win.maybe_open_echoview(
        do_open=do_open,
        minimize=minimize_echoview,
        hide=hide_echoview,
    ) as ev_app:
        for fname in tqdm(files, desc="Files", ascii=True, disable=disable_tqdm):
            if verbose >= 2:
                print(
                    "\n"
                    + progress_fmt(
                        "Processing {}".format(
                            echofilter.ui.style.highlight_fmt(fname),
                        )
                    )
                )

            # Check what the full path should be
            fname_full = echofilter.path.determine_file_path(fname, source_dir)

            # Determine where destination should be placed
            destination = echofilter.path.determine_destination(
                fname, fname_full, source_dir, output_dir
            )
            if not keep_ext:
                destination = os.path.splitext(destination)[0]

            # Make a list of all the outputs we will produce
            dest_files = {}
            for name in ("turbulence", "bottom", "surface"):
                dest_files[name] = "{}.{}{}.evl".format(destination, name, suffix_file)
            if not generate_turbulence_line:
                dest_files.pop("turbulence")
            if not generate_bottom_line:
                dest_files.pop("bottom")
            if not generate_surface_line:
                dest_files.pop("surface")
            dest_files["regions"] = "{}.{}{}.evr".format(
                destination, "regions", suffix_file
            )

            # Check if any of them exists and if there is any missing
            any_missing = False
            clobbers = []
            for _, dest_file in dest_files.items():
                if os.path.isfile(dest_file):
                    clobbers.append(dest_file)
                else:
                    any_missing = True

            # Check whether to skip processing this file
            if skip_existing and not any_missing:
                if verbose >= 2:
                    print(echofilter.ui.style.skip_fmt("  Skipping {}".format(fname)))
                skip_count += 1
                continue

            # Check whether we would clobber a file we can't overwrite
            if clobbers and not overwrite_existing:
                msg = "Output " + clobbers[0] + "\n"
                if len(clobbers) == 2:
                    msg += "and 1 other "
                elif len(clobbers) > 1:
                    msg += "and {} others ".format(len(clobbers) - 1)
                msg += "already exist{} for file {}".format(
                    "s" if len(clobbers) == 1 else "",
                    fname,
                )
                with echofilter.ui.style.error_message(msg) as msg:
                    if dry_run:
                        error_msgs.append("Error: " + msg + "\n  " + existing_file_msg)
                        print(error_msgs[-1])
                        continue
                    raise EnvironmentError(msg + "\n  " + existing_file_msg)

            # Determine whether we need to run ev2csv on this file
            ext = os.path.splitext(fname)[1]
            if len(ext) > 0:
                ext = ext[1:].lower()
            if ext == "csv":
                process_as_ev = False
            elif ext == "ev":
                process_as_ev = True
            elif len(extensions) == 1 and "csv" in extensions:
"csv" in extensions: process_as_ev = False elif len(extensions) == 1 and "ev" in extensions: process_as_ev = True else: msg = "Unsure how to process file {} with unrecognised extension {}".format( fname, ext ) if not skip_incompatible: with echofilter.ui.style.error_message(msg) as msg: raise EnvironmentError(msg) if verbose >= 2: print( echofilter.ui.style.skip_fmt( " Skipping incompatible file {}".format(fname) ) ) incompatible_count += 1 continue # If we are processing an ev file, we need to export it as a raw # csv file. Unless it has already been exported (which we will # check for below). export_to_csv = process_as_ev # Make a temporary directory in case we are not caching generated csvs # Directory and all its contents are deleted when we leave this context with tempfile.TemporaryDirectory() as tmpdirname: # Convert ev file to csv, if necessary ev2csv_dir = cache_csv if ev2csv_dir is None: ev2csv_dir = tmpdirname if not export_to_csv: csv_fname = fname_full else: # Determine where exported CSV file should be placed csv_fname = echofilter.path.determine_destination( fname, fname_full, source_dir, ev2csv_dir ) if not keep_ext: csv_fname = os.path.splitext(csv_fname)[0] csv_fname += suffix_csv + ".csv" if os.path.isfile(csv_fname): # If CSV file is already cached, no need to re-export it export_to_csv = False if not export_to_csv: pass elif dry_run: if verbose >= 1: print( " Would export {} as CSV file {}".format( fname_full, csv_fname ) ) else: # Import ev2csv now. We delay this import so Linux users # without pywin32 can run on CSV files. from echofilter.ev2csv import ev2csv # Export the CSV file fname_full = os.path.abspath(fname_full) csv_fname = os.path.abspath(csv_fname) ev2csv( fname_full, csv_fname, variable_name=variable_name, export_raw=export_raw_csv, ev_app=ev_app, verbose=verbose - 1, ) if (dry_run and verbose >= 2) or verbose >= 3: ww = "Would" if dry_run else "Will" print(" {} write files:".format(ww)) for key, fname in dest_files.items(): if os.path.isfile(fname) and overwrite_existing: over_txt = " " + echofilter.ui.style.overwrite_fmt( "(overwriting existing file)" ) else: over_txt = "" tp = "line" if os.path.splitext(fname)[1] == ".evl" else "file" print( " {} export {} {} to: {}{}".format( ww, key, tp, fname, over_txt ) ) if dry_run: continue # Load the data try: timestamps, depths, signals = echofilter.raw.loader.transect_loader( csv_fname, warn_row_overflow=0, row_len_selector=row_len_selector, ) except KeyError: if skip_incompatible and fname not in files_input: if verbose >= 2: print( echofilter.ui.style.skip_fmt( " Skipping incompatible file {}".format(fname) ) ) incompatible_count += 1 continue msg = "CSV file {} could not be loaded.".format(fname) with echofilter.ui.style.error_message(msg) as msg: print(msg) raise try: output = inference_transect( model, timestamps, depths, signals, device, image_height, facing=facing, crop_min_depth=crop_min_depth, crop_max_depth=crop_max_depth, autocrop_threshold=autocrop_threshold, force_unconditioned=force_unconditioned, data_center=center_param, data_deviation=deviation_param, prenorm_nan_value=prenorm_nan_value, postnorm_nan_value=postnorm_nan_value, verbose=verbose - 1, ) except BaseException as err: if not continue_on_error: raise print(echofilter.ui.style.error_fmt("An error has occurred:")) print(echofilter.ui.style.error_fmt(str(err))) error_msgs.append(err) continue if verbose >= 5: s = "\n ".join([""] + [str(k) for k in output.keys()]) print(" Generated model output with fields:" + s) if is_conditional_model and 
                    if output["is_upward_facing"]:
                        cs = "|upfacing"
                    else:
                        cs = "|downfacing"
                    if verbose >= 4:
                        print(
                            echofilter.ui.style.aside_fmt(
                                "  Using conditional probability outputs from model:"
                                " p(state{})".format(cs)
                            )
                        )
                else:
                    cs = ""
                    if is_conditional_model and verbose >= 4:
                        print(
                            echofilter.ui.style.aside_fmt(
                                "Using unconditioned output from conditional model"
                            )
                        )

                # Convert output into lines
                surface_depths = output["depths"][
                    echofilter.utils.last_nonzero(
                        output["p_is_above_surface" + cs] > 0.5, -1
                    )
                ]
                turbulence_depths = output["depths"][
                    echofilter.utils.last_nonzero(
                        output["p_is_above_turbulence" + cs] > 0.5, -1
                    )
                ]
                bottom_depths = output["depths"][
                    echofilter.utils.first_nonzero(
                        output["p_is_below_bottom" + cs] > 0.5, -1
                    )
                ]
                line_timestamps = output["timestamps"].copy()
                is_passive = output["p_is_passive" + cs] > 0.5
                if lines_during_passive == "predict":
                    pass
                elif lines_during_passive == "redact":
                    surface_depths = surface_depths[~is_passive]
                    turbulence_depths = turbulence_depths[~is_passive]
                    bottom_depths = bottom_depths[~is_passive]
                    line_timestamps = line_timestamps[~is_passive]
                elif lines_during_passive == "undefined":
                    surface_depths[is_passive] = EV_UNDEFINED_DEPTH
                    turbulence_depths[is_passive] = EV_UNDEFINED_DEPTH
                    bottom_depths[is_passive] = EV_UNDEFINED_DEPTH
                elif lines_during_passive.startswith("interp"):
                    if lines_during_passive == "interpolate-time":
                        x = line_timestamps
                    elif lines_during_passive == "interpolate-index":
                        x = np.arange(len(line_timestamps))
                    else:
                        msg = "Unsupported passive line interpolation method: {}".format(
                            lines_during_passive
                        )
                        with echofilter.ui.style.error_message(msg) as msg:
                            raise ValueError(msg)
                    if len(x[~is_passive]) == 0:
                        if verbose >= 0:
                            s = (
                                "Could not interpolate depths for passive data for"
                                " {}, as all data appears to be from passive"
                                " collection. The original model predictions will"
                                " be kept instead.".format(fname)
                            )
                            s = echofilter.ui.style.warning_fmt(s)
                            print(s)
                    else:
                        surface_depths[is_passive] = np.interp(
                            x[is_passive], x[~is_passive], surface_depths[~is_passive]
                        )
                        turbulence_depths[is_passive] = np.interp(
                            x[is_passive],
                            x[~is_passive],
                            turbulence_depths[~is_passive],
                        )
                        bottom_depths[is_passive] = np.interp(
                            x[is_passive], x[~is_passive], bottom_depths[~is_passive]
                        )
                else:
                    msg = "Unsupported passive line method: {}".format(
                        lines_during_passive
                    )
                    with echofilter.ui.style.error_message(msg) as msg:
                        raise ValueError(msg)

                if output["is_upward_facing"]:
                    nearfield_depth = np.max(depths) - nearfield
                else:
                    nearfield_depth = np.min(depths) + nearfield

                # Export evl files
                destination_dir = os.path.dirname(destination)
                if destination_dir != "":
                    os.makedirs(destination_dir, exist_ok=True)
                for line_name, line_depths in (
                    ("turbulence", turbulence_depths),
                    ("bottom", bottom_depths),
                    ("surface", surface_depths),
                ):
                    if line_name not in dest_files:
                        continue
                    dest_file = dest_files[line_name]
                    if verbose >= 3:
                        s = "  Writing output"
                        if not os.path.exists(dest_file):
                            pass
                        elif not overwrite_existing:
                            s = echofilter.ui.style.error_fmt(s)
                        else:
                            s = echofilter.ui.style.overwrite_fmt(s)
                        s += " {}".format(dest_file)
                        print(s)
                    if os.path.exists(dest_file) and not overwrite_existing:
                        msg = "Output {} already exists.".format(dest_file)
                        with echofilter.ui.style.error_message(msg) as msg:
                            raise EnvironmentError(msg + "\n  " + existing_file_msg)
                    echofilter.raw.loader.evl_writer(
                        dest_file,
                        line_timestamps,
                        line_depths,
                        pad=True,
                        status=line_status,
                    )
                # Export evr file
                dest_file = dest_files["regions"]
                if verbose >= 3:
                    s = "  Writing output"
                    if not os.path.exists(dest_file):
                        pass
                    elif not overwrite_existing:
                        s = echofilter.ui.style.error_fmt(s)
                    else:
                        s = echofilter.ui.style.overwrite_fmt(s)
                    s += " {}".format(dest_file)
                    print(s)
                if os.path.exists(dest_file) and not overwrite_existing:
                    msg = "Output {} already exists.".format(dest_file)
                    with echofilter.ui.style.error_message(msg) as msg:
                        raise EnvironmentError(msg + "\n  " + existing_file_msg)
                patches_key = "p_is_patch"
                if patch_mode is None:
                    if output["is_upward_facing"]:
                        patches_key += "-ntob"
                elif patch_mode != "merged":
                    patches_key += "-" + patch_mode
                echofilter.raw.loader.write_transect_regions(
                    dest_file,
                    output,
                    depth_range=depths,
                    passive_key="p_is_passive" + cs,
                    removed_key="p_is_removed" + cs,
                    patches_key=patches_key + cs,
                    collate_passive_length=collate_passive_length,
                    collate_removed_length=collate_removed_length,
                    minimum_passive_length=minimum_passive_length,
                    minimum_removed_length=minimum_removed_length,
                    minimum_patch_area=minimum_patch_area,
                    name_suffix=suffix_var,
                    common_notes=common_notes,
                    verbose=verbose - 2,
                    verbose_indent=2,
                )

                if not process_as_ev or not import_into_evfile:
                    # Done with processing this file
                    continue

                target_names = {key: key + suffix_var for key in dest_files}
                target_names["nearfield"] = "nearfield" + suffix_var
                offsets = dict(
                    turbulence=offset_turbulence,
                    bottom=offset_bottom,
                    surface=offset_surface,
                )
                lines_cutoff_at_nearfield = []
                if output["is_upward_facing"]:
                    if cutoff_at_nearfield or cutoff_at_nearfield is None:
                        lines_cutoff_at_nearfield = ["bottom"]
                elif cutoff_at_nearfield:
                    lines_cutoff_at_nearfield = ["turbulence"]

                import_lines_regions_to_ev(
                    fname_full,
                    dest_files,
                    target_names=target_names,
                    nearfield_depth=nearfield_depth,
                    add_nearfield_line=add_nearfield_line,
                    lines_cutoff_at_nearfield=lines_cutoff_at_nearfield,
                    offsets=offsets,
                    line_colors=line_colors,
                    line_thicknesses=line_thicknesses,
                    ev_app=ev_app,
                    overwrite=overwrite_ev_lines,
                    common_notes=common_notes,
                    verbose=verbose,
                )

    if verbose >= 1:
        print(
            progress_fmt(
                "{}Finished {}processing {} file{}{}.".format(
                    echofilter.ui.style.HighlightStyle.start,
                    "simulating " if dry_run else "",
                    len(files),
                    "" if len(files) == 1 else "s",
                    echofilter.ui.style.HighlightStyle.reset,
                )
            )
        )
        skip_total = skip_count + incompatible_count
        if skip_total > 0:
            s = ""
            s += "Of these, {}{}{} file{} skipped{}: {} already processed".format(
                "all " if skip_total == len(files) else "",
                echofilter.ui.style.HighlightStyle.start,
                skip_total,
                " was" if skip_total == 1 else "s were",
                echofilter.ui.style.HighlightStyle.reset,
                skip_count,
            )
            if not dry_run:
                s += ", {} incompatible".format(incompatible_count)
            s += "."
            s = echofilter.ui.style.skip_fmt(s)
            print(s)
        if error_msgs:
            print(
                echofilter.ui.style.error_fmt(
                    "There {} {} error{}:".format(
                        "was" if len(error_msgs) == 1 else "were",
                        len(error_msgs),
                        "" if len(error_msgs) == 1 else "s",
                    )
                )
            )
            for error_msg in error_msgs:
                print(echofilter.ui.style.error_fmt(str(error_msg)))
        print(
            "Total runtime: {}".format(
                datetime.timedelta(seconds=time.time() - t_start_prog)
            )
        )
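
# Example usage of ``run_inference`` (an illustrative sketch; the survey
# directory and CSV file names below are hypothetical, not files shipped
# with echofilter):
#
# >>> run_inference(
# ...     ["transect1.csv", "transect2.csv"],
# ...     source_dir="surveys",
# ...     output_dir="outputs",
# ...     overwrite_existing=True,
# ...     verbose=1,
# ... )
#
# This writes turbulence, bottom, and surface EVL files plus a regions EVR
# file for each input CSV into the "outputs" directory.
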
def inference_transect(
    model,
    timestamps,
    depths,
    signals,
    device,
    image_height,
    facing="auto",
    crop_min_depth=None,
    crop_max_depth=None,
    autocrop_threshold=0.35,
    force_unconditioned=False,
    data_center="mean",
    data_deviation="stdev",
    prenorm_nan_value=None,
    postnorm_nan_value=-3,
    dtype=torch.float,
    verbose=0,
):
    """
    Run inference on a single transect.

    Parameters
    ----------
    model : echofilter.nn.wrapper.Echofilter
        A pytorch Module wrapped in an Echofilter UI layer.
    timestamps : array_like
        Sample recording timestamps (in seconds since the Unix epoch). Must
        be a vector.
    depths : array_like
        Recording depths from the surface (in metres). Must be a vector.
    signals : array_like
        Echogram Sv data. Must be a matrix shaped
        ``(len(timestamps), len(depths))``.
    device : str or torch.device
        Device on which the model will be run.
    image_height : int
        Height to resize echogram before passing through model.
    facing : {"downward", "upward", "auto"}, default="auto"
        Orientation in which the echosounder is facing. Default is
        ``"auto"``, in which case the orientation is determined from the
        ordering of the depth values in the data (increasing =
        ``"upward"``, decreasing = ``"downward"``).
    crop_min_depth : float, optional
        Minimum depth to include in input. By default, there is no minimum
        depth.
    crop_max_depth : float, optional
        Maximum depth to include in input. By default, there is no maximum
        depth.
    autocrop_threshold : float, default=0.35
        Minimum fraction of input height which must be found to be
        removable for the model to be re-run with an automatically cropped
        input.
    force_unconditioned : bool, optional
        Whether to always use unconditioned logit outputs when determining
        the new depth range for automatic cropping.
    data_center : float or str, default="mean"
        Center point to use, which will be subtracted from the Sv signals
        (i.e. the overall sample mean). If ``data_center`` is a string, it
        specifies the method to use to determine the center value from the
        distribution of intensities seen in this sample transect.
    data_deviation : float or str, default="stdev"
        Deviation to use to normalise the Sv signals in a divisive manner
        (i.e. the overall sample standard deviation). If ``data_deviation``
        is a string, it specifies the method to use to determine the
        deviation value from the distribution of intensities seen in this
        sample transect.
    prenorm_nan_value : float, optional
        If this is set, replace NaN values with a given Sv value before the
        data normalisation (Gaussian standardisation) step. By default,
        NaNs are left as they are until after standardising the data.
    postnorm_nan_value : float, default=-3
        Placeholder value to replace NaNs with. Does nothing if
        ``prenorm_nan_value`` is set.
    dtype : torch.dtype, default=torch.float
        Datatype to use for model input.
    verbose : int, default=0
        Level of verbosity.

    Returns
    -------
    dict
        Dictionary with fields as output by
        :class:`echofilter.nn.wrapper.Echofilter`, plus ``timestamps`` and
        ``depths``.
""" facing = facing.lower() timestamps = np.asarray(timestamps) depths = np.asarray(depths) signals = np.asarray(signals) transect = { "timestamps": timestamps, "depths": depths, "signals": signals, } if crop_min_depth is not None: # Apply minimum depth crop depth_crop_mask = transect["depths"] >= crop_min_depth transect["depths"] = transect["depths"][depth_crop_mask] transect["signals"] = transect["signals"][:, depth_crop_mask] if crop_max_depth is not None: # Apply maximum depth crop depth_crop_mask = transect["depths"] <= crop_max_depth transect["depths"] = transect["depths"][depth_crop_mask] transect["signals"] = transect["signals"][:, depth_crop_mask] if prenorm_nan_value is not None: # Replace NaNs before transforming the data transect = echofilter.data.transforms.ReplaceNan(prenorm_nan_value)(transect) # Standardize data distribution transect = echofilter.data.transforms.Normalize(data_center, data_deviation)( transect ) # Configure data to match what the model expects to see # Determine whether depths are ascending or descending is_upward_facing = transect["depths"][-1] < transect["depths"][0] # Ensure depth is always increasing (which corresponds to descending from # the air down the water column) if facing[:2] == "up" or (facing == "auto" and is_upward_facing): transect["depths"] = transect["depths"][::-1].copy() transect["signals"] = transect["signals"][:, ::-1].copy() if facing == "auto" and verbose >= 2: print( echofilter.ui.style.aside_fmt( " Echogram was autodetected as upward facing, and was flipped" " vertically before being input into the model." ) ) if not is_upward_facing: s = ( ' Warning: facing = "{}" was provided, but data appears to be' " downward facing".format(facing) ) s = echofilter.ui.style.warning_fmt(s) print(s) is_upward_facing = True elif facing[:4] != "down" and facing != "auto": msg = 'facing should be one of "downward", "upward", and "auto"' with echofilter.ui.style.error_message(msg) as msg: raise ValueError(msg) elif facing[:4] == "down" and is_upward_facing: s = ( ' Warning: facing = "{}" was provided, but data appears to be' " upward facing".format(facing) ) s = echofilter.ui.style.warning_fmt(s) print(s) is_upward_facing = False elif facing == "auto" and verbose >= 2: print( echofilter.ui.style.aside_fmt( " Echogram was autodetected as downward facing." ) ) # To reduce memory consumption, split into segments whenever the recording # interval is longer than normal segments = split_transect(max_length=1280, **transect) disable_tqdm = verbose < 1 outputs = [] for segment in tqdm( list(segments), desc=" Segments", position=0, ascii=True, disable=disable_tqdm ): # Try to remove any NaNs in the raw input with using 2d interpolation n_nans = np.isnan(segment["signals"]).sum() if n_nans > 0: if verbose >= 1: print(" Interpolating to fill in {} missing Sv values".format(n_nans)) segment["signals"] = fillholes2d(segment["signals"]) # Preprocessing transform transform = torchvision.transforms.Compose( [ echofilter.data.transforms.ReplaceNan(postnorm_nan_value), echofilter.data.transforms.Rescale( (segment["signals"].shape[0], image_height), order=1, ), ] ) segment = transform(segment) # Pad the edges of the segment with extra data. Any padding we add # now will be stripped away from the output by join_transect later. 
        segment = pad_transect(segment)
        input = torch.tensor(segment["signals"]).unsqueeze(0).unsqueeze(0)
        input = input.to(device, dtype).contiguous()
        # Put data through model
        with torch.no_grad():
            output = model(input, output_device="cpu")
            output = {k: v.squeeze(0).cpu().numpy() for k, v in output.items()}
        output["timestamps"] = segment["timestamps"]
        output["depths"] = segment["depths"]
        output["_pad_start"] = segment["_pad_start"]
        output["_pad_end"] = segment["_pad_end"]
        outputs.append(output)

    output = join_transect(outputs)
    output["is_upward_facing"] = is_upward_facing

    if autocrop_threshold >= 1:
        # If we are not doing auto-crop, return the current output
        return output

    # See how much of the depth we could crop away
    cs = ""
    if model.params["conditional"] and not force_unconditioned:
        if output["is_upward_facing"]:
            cs = "|upfacing"
        else:
            cs = "|downfacing"
    is_passive = output["p_is_passive" + cs] > 0.5
    depth_intv = abs(transect["depths"][1] - transect["depths"][0])
    if is_upward_facing:
        # Restrict the top based on the observed surface depths
        surface_depths = output["depths"][
            echofilter.utils.last_nonzero(output["p_is_above_surface" + cs] > 0.5, -1)
        ]
        # Redact passive regions
        if len(surface_depths[~is_passive]) > 0:
            surface_depths = surface_depths[~is_passive]
        # Find a good estimator of the minimum. Go for a robust estimate of
        # four sigma from the middle.
        pct = np.percentile(surface_depths, [2.275, 97.725])
        dev = (pct[1] - pct[0]) / 4
        new_crop_min = pct[0] - 2 * dev
        # Don't go higher than the minimum of the predicted surface depths
        new_crop_min = max(new_crop_min, np.min(surface_depths))
        # Offset minimum by at least 2m, to be sure we are capturing enough
        # surrounding content.
        new_crop_min -= max(2, 10 * depth_intv)
        new_crop_max = np.max(transect["depths"])
    else:
        new_crop_min = np.min(transect["depths"])
        # Restrict the bottom based on the observed bottom depths
        bottom_depths = output["depths"][
            echofilter.utils.first_nonzero(output["p_is_below_bottom" + cs] > 0.5, -1)
        ]
        # Redact passive regions
        if len(bottom_depths[~is_passive]) > 0:
            bottom_depths = bottom_depths[~is_passive]
        # Find a good estimator of the maximum. Go for a robust estimate of
        # four sigma from the middle.
        pct = np.percentile(bottom_depths, [2.275, 97.725])
        dev = (pct[1] - pct[0]) / 4
        new_crop_max = pct[1] + 2 * dev
        # Don't go deeper than the maximum of the predicted bottom depths
        new_crop_max = min(new_crop_max, np.max(bottom_depths))
        # Offset maximum by at least 2m, to be sure we are capturing enough
        # surrounding content.
        new_crop_max += max(2, 10 * depth_intv)

    if crop_min_depth is not None:
        new_crop_min = max(new_crop_min, crop_min_depth)
    if crop_max_depth is not None:
        new_crop_max = min(new_crop_max, crop_max_depth)

    current_height = abs(transect["depths"][-1] - transect["depths"][0])
    new_height = abs(new_crop_max - new_crop_min)
    if (current_height - new_height) / current_height <= autocrop_threshold:
        # No need to crop
        return output

    if verbose >= 1:
        print(
            "  Automatically zooming in on the {:.2f}m to {:.2f}m depth range"
            " and re-doing model inference.".format(new_crop_min, new_crop_max)
        )

    # Crop and run again; and don't autocrop a second time!
    return inference_transect(
        model,
        timestamps,
        depths,
        signals,
        device,
        image_height,
        facing=facing,
        crop_min_depth=new_crop_min,
        crop_max_depth=new_crop_max,
        autocrop_threshold=1,  # Don't crop again
        force_unconditioned=force_unconditioned,
        data_center=data_center,
        data_deviation=data_deviation,
        prenorm_nan_value=prenorm_nan_value,
        postnorm_nan_value=postnorm_nan_value,
        dtype=dtype,
        verbose=verbose,
    )
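
# Example usage of ``inference_transect`` (an illustrative sketch; the random
# echogram below stands in for real Sv data, and ``model`` is assumed to be
# an already-loaded Echofilter wrapper, e.g. as constructed in
# ``run_inference`` above):
#
# >>> timestamps = 1_600_000_000 + np.arange(600.0)  # one ping per second
# >>> depths = np.linspace(0.5, 50, 100)  # increasing depths = downfacing
# >>> signals = -80 + 20 * np.random.randn(len(timestamps), len(depths))
# >>> output = inference_transect(
# ...     model, timestamps, depths, signals, torch.device("cpu"), 512
# ... )
# >>> # Extract the turbulence line from the probability mask. Note that for
# >>> # a conditional checkpoint the output keys gain a "|upfacing" or
# >>> # "|downfacing" suffix.
# >>> turbulence_depths = output["depths"][
# ...     echofilter.utils.last_nonzero(output["p_is_above_turbulence"] > 0.5, -1)
# ... ]
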
def import_lines_regions_to_ev(
    ev_fname,
    files,
    target_names=None,
    nearfield_depth=None,
    add_nearfield_line=True,
    lines_cutoff_at_nearfield=None,
    offsets=None,
    line_colors=None,
    line_thicknesses=None,
    ev_app=None,
    overwrite=False,
    common_notes="",
    verbose=1,
):
    """
    Write lines and regions to an EV file.

    Parameters
    ----------
    ev_fname : str
        Path to Echoview file to import variables into.
    files : dict
        Mapping from output keys to filenames.
    target_names : dict, optional
        Mapping from output keys to output variable names.
    nearfield_depth : float, optional
        Depth at which nearfield line will be placed. By default, no
        nearfield line will be added, irrespective of
        ``add_nearfield_line``.
    add_nearfield_line : bool, default=True
        Whether to add a nearfield line.
    lines_cutoff_at_nearfield : list of str, optional
        Which lines (if any) should be clipped at the nearfield depth. By
        default, no lines will be clipped.
    offsets : dict, optional
        Amount of offset for each line.
    line_colors : dict, optional
        Mapping from output keys to line colours.
    line_thicknesses : dict, optional
        Mapping from output keys to line thicknesses.
    ev_app : win32com.client.Dispatch object, optional
        An object which can be used to interface with the Echoview
        application, as returned by :class:`win32com.client.Dispatch`. By
        default, a new instance of the application is opened (and closed on
        completion).
    overwrite : bool, default=False
        Whether existing lines with target names should be replaced. If a
        line with the target name already exists and ``overwrite=False``,
        the line is named with the current datetime to prevent collisions.
    common_notes : str, default=""
        Notes to include for every region.
    verbose : int, default=1
        Verbosity level.
    """
    if target_names is None:
        target_names = {}
    if lines_cutoff_at_nearfield is None:
        lines_cutoff_at_nearfield = {}
    if offsets is None:
        offsets = {}
    if line_colors is None:
        line_colors = {}
    if line_thicknesses is None:
        line_thicknesses = {}

    if verbose >= 2:
        print(f"Importing {len(files)} lines/regions into EV file {ev_fname}")

    # Assemble the color palette
    colors = get_color_palette(sort_colors=False)
    dtstr = datetime.datetime.now().isoformat(timespec="seconds")

    with echofilter.win.open_ev_file(ev_fname, ev_app) as ev_file:

        def change_line_color_thickness(line_name, color, thickness, ev_app=ev_app):
            if color is not None or thickness is not None:
                ev_app.Exec(f"{line_name} | UseDefaultLineDisplaySettings =| false")
            if color is not None:
                if color in colors:
                    color = colors[color]
                elif not isinstance(color, str):
                    pass
                elif "xkcd:" + color in colors:
                    color = colors["xkcd:" + color]
                color = hexcolor2rgb8(color)
                color = repr(color).replace(" ", "")
                if verbose >= 4:
                    print(
                        f"    Setting color of EV line {line_name} to {color}"
                        " (color active when line has Good status)"
                    )
                ev_app.Exec(f"{line_name} | CustomGoodLineColor =| {color}")
            if thickness is not None:
                if verbose >= 4:
                    print(
                        f"    Setting thickness of EV line {line_name} to {thickness}"
                    )
                ev_app.Exec(
                    f"{line_name} | CustomLineDisplayThickness =| {thickness}"
                )

        for key, fname in files.items():
            # Check the file exists
            fname_full = os.path.abspath(fname)
            if not os.path.isfile(fname_full):
                s = f"  Warning: File '{fname_full}' could not be found"
                s = echofilter.ui.style.warning_fmt(s)
                print(s)
                continue
            if os.path.splitext(fname)[1].lower() != ".evl":
                if verbose >= 3:
                    print(f"  Adding {key} to {ev_fname} from {fname}")
                # Import regions from the EVR file
                is_imported = ev_file.Import(fname_full)
                if not is_imported:
                    s = (
                        f"  Warning: Unable to import file '{fname_full}'"
                        "\n  Please consult Echoview for the Import error message."
                    )
                    s = echofilter.ui.style.warning_fmt(s)
                    print(s)
                continue

            if verbose >= 3:
                print(f"  Adding {key} line to {ev_fname}")

            # Import the line into Python now. We might need it now for
            # clipping, or later on for offsetting.
            ts, depths, statuses = echofilter.raw.loader.evl_loader(
                fname_full, return_status=True
            )
            line_status = echofilter.utils.mode(statuses)

            if nearfield_depth is None or key not in lines_cutoff_at_nearfield:
                # Import the original line straight into Echoview
                fname_loaded = fname_full
                is_imported = ev_file.Import(fname_loaded)
            else:
                if verbose >= 3:
                    print(
                        f"    Clipping {key} line at nearfield depth"
                        f" {nearfield_depth}m"
                    )
                # Edit the line, clipping as necessary
                if key == "bottom":
                    depths_clipped = np.minimum(depths, nearfield_depth)
                else:
                    depths_clipped = np.maximum(depths, nearfield_depth)
                # Export the edited line to a temporary file
                with tempfile.TemporaryDirectory() as tmpdirname:
                    temp_fname = os.path.join(tmpdirname, os.path.split(fname)[1])
                    echofilter.raw.loader.evl_writer(
                        temp_fname, ts, depths_clipped, status=line_status
                    )
                    # Import the edited line into the EV file
                    fname_loaded = temp_fname
                    is_imported = ev_file.Import(fname_loaded)

            if not is_imported:
                s = (
                    f"  Warning: Unable to import file '{fname_loaded}'"
                    "\n  Please consult Echoview for the Import error message."
                )
                s = echofilter.ui.style.warning_fmt(s)
                print(s)
                continue

            # Identify the line just imported, which is the last variable
            variable = ev_file.Variables[ev_file.Variables.Count - 1]
            lines = ev_file.Lines
            line = lines.FindByName(variable.Name)
            if not line:
                s = (
                    "  Warning: Could not find line which was just imported with"
                    f" name '{variable.Name}'"
                    "\n  Ignoring and continuing processing."
                )
                s = echofilter.ui.style.warning_fmt(s)
                print(s)
                continue

            # Check whether we need to change the name of the line
            target_name = target_names.get(key, None)
            # Check if a line with this name already exists
            old_line = None if target_name is None else lines.FindByName(target_name)

            # Maybe try to overwrite existing line with new line
            successful_overwrite = False
            if old_line and overwrite:
                # Overwrite the old line
                if verbose >= 2:
                    print(
                        echofilter.ui.style.overwrite_fmt(
                            f"Overwriting existing line '{target_name}' with new"
                            f" {key} line output"
                        )
                    )
                try:
                    old_line_edit = old_line.AsLineEditable()
                    if old_line_edit:
                        # Overwrite the old line with the new line
                        old_line_edit.OverwriteWith(line)
                        # Delete the line we imported
                        lines.Delete(line)
                        # Update our reference to the line
                        line = old_line
                        successful_overwrite = True
                except Exception as err:
                    if verbose >= 3:
                        # Line is not editable
                        s = (
                            f"An error occurred while trying to overwrite"
                            f" existing line '{target_name}':"
                            f"\n{old_line_edit}"
                            f"\n{err}"
                        )
                        s = echofilter.ui.style.warning_fmt(s)
                        print(s)
                if not successful_overwrite:
                    try:
                        # Just delete the old line (a simpler solution)
                        lines.Delete(old_line)
                        successful_overwrite = True
                    except Exception as err:
                        if verbose >= 3:
                            # Line could not be deleted
                            s = (
                                f"An error occurred while trying to delete"
                                f" existing line '{target_name}':"
                                f"\n{old_line}"
                                f"\n{err}"
                            )
                            s = echofilter.ui.style.warning_fmt(s)
                            print(s)
                if not successful_overwrite and verbose >= 0:
                    # Line could not be overwritten
                    s = f"Existing line '{target_name}' cannot be overwritten."
                    s = echofilter.ui.style.warning_fmt(s)
                    print(s)

            if old_line and not successful_overwrite:
                # Change the name so there is no collision
                target_name += "_{}".format(dtstr)
                if verbose >= 1:
                    print(
                        f"Target line name '{target_names[key]}' already exists."
                        f" Will save new {key} line with name '{target_name}'"
                        " instead."
                    )

            if target_name and not successful_overwrite:
                # Rename the line
                variable.ShortName = target_name
                line.Name = target_name

            if verbose >= 2:
                print(f"  Added {key} line '{line.Name}'")

            # Change the color and thickness of the line
            change_line_color_thickness(
                line.Name, line_colors.get(key), line_thicknesses.get(key)
            )

            # Add notes to the line
            notes = key.title() + " line"
            if len(common_notes) > 0:
                notes += "\n" + common_notes
            ev_app.Exec(
                "{} | Notes =| {}".format(line.Name, notes.replace("\n", "\r\n"))
            )

            # Handle offset line ------------------------------------------
            if key not in offsets:
                continue

            # Remember references to the original line
            original_line = line
            original_target_name = target_name

            # Generate an offset line
            offset = offsets[key]
            if verbose >= 3:
                print(f"    Adding {offset}m offset {key} line to {ev_fname}")
            if key == "bottom":
                # Offset is upward for bottom line, downward otherwise
                offset = -offset
            depths_offset = depths + offset
            if nearfield_depth is None or key not in lines_cutoff_at_nearfield:
                pass
            elif key == "bottom":
                depths_offset = np.minimum(depths_offset, nearfield_depth)
            else:
                depths_offset = np.maximum(depths_offset, nearfield_depth)

            # Export the edited line to a temporary file
            with tempfile.TemporaryDirectory() as tmpdirname:
                fname_noext, ext = os.path.splitext(fname)
                temp_fname = os.path.join(
                    tmpdirname,
                    os.path.split(fname_noext)[1] + "_offset" + ext,
                )
                echofilter.raw.loader.evl_writer(
                    temp_fname,
                    ts,
                    depths_offset,
                    status=line_status,
                )
                # Import the edited line into the EV file
                is_imported = ev_file.Import(temp_fname)

            if not is_imported:
                s = (
                    f"  Warning: Unable to import file '{temp_fname}'"
                    "\n  Please consult Echoview for the Import error message."
                )
                s = echofilter.ui.style.warning_fmt(s)
                print(s)
                continue

            # Identify the line just imported, which is the last variable
            variable = ev_file.Variables[ev_file.Variables.Count - 1]
            # Reference to the new, offset, line
            line = lines.FindByName(variable.Name)
            if not line:
                s = (
                    "  Warning: Could not find line which was just imported with"
                    f" name '{variable.Name}'"
                    "\n  Ignoring and continuing processing."
                )
                s = echofilter.ui.style.warning_fmt(s)
                print(s)
                continue

            # Work out what we should call our new line
            target_name = target_names.get(key + "_offset")
            if target_name:
                pass
            elif not original_target_name:
                pass
            elif key in original_line.Name:
                target_name = original_line.Name.replace(key, key + "-offset", 1)
            elif key in target_names.get(key):
                target_name = target_names[key].replace(key, key + "-offset", 1)
            else:
                target_name = original_line.Name + "-offset"

            # Check if a line with this name already exists
            old_line = None if target_name is None else lines.FindByName(target_name)

            # Maybe try to overwrite existing line with new line
            successful_overwrite = False
            if old_line and overwrite:
                # Overwrite the old line
                if verbose >= 2:
                    print(
                        echofilter.ui.style.overwrite_fmt(
                            f"Overwriting existing line '{target_name}' with new"
                            f" {key} line output"
                        )
                    )
                try:
                    old_line_edit = old_line.AsLineEditable()
                    if old_line_edit:
                        # Overwrite the old line with the new line
                        old_line_edit.OverwriteWith(line)
                        # Delete the line we imported
                        lines.Delete(line)
                        # Update our reference to the line
                        line = old_line
                        successful_overwrite = True
                except Exception as err:
                    if verbose >= 3:
                        # Line is not editable
                        s = (
                            f"An error occurred while trying to overwrite"
                            f" existing line '{target_name}':"
                            f"\n{old_line_edit}"
                            f"\n{err}"
                        )
                        s = echofilter.ui.style.warning_fmt(s)
                        print(s)
                if not successful_overwrite:
                    try:
                        # Just delete the old line (a simpler solution)
                        lines.Delete(old_line)
                        successful_overwrite = True
                    except Exception as err:
                        if verbose >= 3:
                            # Line could not be deleted
                            s = (
                                f"An error occurred while trying to delete"
                                f" existing line '{target_name}':"
                                f"\n{old_line}"
                                f"\n{err}"
                            )
                            s = echofilter.ui.style.warning_fmt(s)
                            print(s)
                if not successful_overwrite and verbose >= 0:
                    # Line could not be overwritten
                    s = f"Existing line '{target_name}' cannot be overwritten."
                    s = echofilter.ui.style.warning_fmt(s)
                    print(s)

            if old_line and not successful_overwrite:
                # Change the name so there is no collision
                target_name += "_{}".format(dtstr)
                if verbose >= 1:
                    print(
                        f"Target line name '{target_names[key]}' already exists."
                        f" Will save new {key} line with name '{target_name}'"
                        " instead."
                    )
            if target_name and not successful_overwrite:
                # Rename the line
                variable.ShortName = target_name
                line.Name = target_name

            if verbose >= 2:
                print(f"  Added offset {key} line '{line.Name}'")

            # Change the color and thickness of the line
            change_line_color_thickness(
                line.Name,
                line_colors.get(key + "_offset", line_colors.get(key)),
                line_thicknesses.get(key + "_offset", line_thicknesses.get(key)),
            )

            # Add notes to the line
            notes = "{} line\nOffset: {:+g}m".format(key.title(), offset)
            if len(common_notes) > 0:
                notes += "\n" + common_notes
            ev_app.Exec(
                "{} | Notes =| {}".format(line.Name, notes.replace("\n", "\r\n"))
            )

        # Add nearfield line
        if nearfield_depth is not None and add_nearfield_line:
            key = "nearfield"
            if verbose >= 3:
                print(f"  Adding nearfield line at fixed depth {nearfield_depth}m")
            lines = ev_file.Lines
            line = lines.CreateFixedDepth(nearfield_depth)

            # Check whether we need to change the name of the line
            target_name = target_names.get(key, None)
            # Check if a line with this name already exists
            old_line = None if target_name is None else lines.FindByName(target_name)

            # Maybe try to delete existing line
            successful_overwrite = False
            if old_line and overwrite:
                # Overwrite the old line
                if verbose >= 2:
                    print(
                        echofilter.ui.style.overwrite_fmt(
                            f"Deleting existing line '{target_name}'"
                        )
                    )
                successful_overwrite = lines.Delete(old_line)
                if not successful_overwrite and verbose >= 0:
                    # Line could not be deleted
                    print(
                        echofilter.ui.style.warning_fmt(
                            f"Existing line '{target_name}' could not be deleted"
                        )
                    )

            if old_line and not successful_overwrite:
                # Change the output name so there is no collision
                target_name += "_{}".format(dtstr)
                if verbose >= 1:
                    print(
                        f"Target line name '{target_names[key]}' already exists."
                        f" Will save new {key} line with name '{target_name}'"
                        " instead."
                    )

            if target_name:
                # Rename the line
                line.Name = target_name

            if verbose >= 2:
                print(
                    f"  Added nearfield line '{line.Name}' at fixed depth"
                    f" {nearfield_depth}m"
                )

            # Change the color and thickness of the line
            change_line_color_thickness(
                line.Name, line_colors.get(key), line_thicknesses.get(key)
            )

            # Add notes to the line
            notes = key.title() + " line"
            if len(common_notes) > 0:
                notes += "\n" + common_notes
            ev_app.Exec(
                "{} | Notes =| {}".format(line.Name, notes.replace("\n", "\r\n"))
            )

        # Overwrite the EV file now the outputs have been imported
        ev_file.Save()
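
# Example usage of ``import_lines_regions_to_ev`` (an illustrative sketch;
# this requires Echoview and pywin32, and the EV/EVL file names below are
# hypothetical):
#
# >>> import_lines_regions_to_ev(
# ...     "survey.ev",
# ...     {
# ...         "turbulence": "survey.turbulence.evl",
# ...         "bottom": "survey.bottom.evl",
# ...     },
# ...     target_names={
# ...         "turbulence": "turbulence_echofilter",
# ...         "bottom": "bottom_echofilter",
# ...     },
# ...     offsets={"turbulence": 1.0, "bottom": 1.0},
# ...     overwrite=True,
# ... )
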
def get_color_palette(include_xkcd=True, sort_colors=True):
    """
    Provide a mapping of named colors from matplotlib.

    Parameters
    ----------
    include_xkcd : bool, default=True
        Whether to include the XKCD color palette in the output.
        Note that XKCD colors have ``"xkcd:"`` prepended to their names to
        prevent collisions with official named colors from CSS4.
        See https://xkcd.com/color/rgb/ and
        https://blog.xkcd.com/2010/05/03/color-survey-results/
        for the XKCD colors.
    sort_colors : bool, default=True
        Whether to sort the colors by hue. Otherwise the colors are grouped
        together by source, and maintain their default ordering
        (alphabetized).

    Returns
    -------
    colors : dict
        Mapping from names of colors as strings to color value, either as
        an RGB tuple (fractional, 0 to 1 range) or a hexadecimal string.
    """
    colors = dict(mcolors.BASE_COLORS, **mcolors.CSS4_COLORS)
    if include_xkcd:
        colors.update(**mcolors.XKCD_COLORS)

    if not sort_colors:
        return colors

    # Sort colors by hue, saturation, value and name
    by_hsv = sorted(
        (tuple(mcolors.rgb_to_hsv(mcolors.to_rgb(color))), name)
        for name, color in colors.items()
    )
    names = [name for hsv, name in by_hsv]
    colors = {name: colors[name] for name in names}
    return colors
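
# Example usage of ``get_color_palette`` (the values shown come from
# matplotlib's CSS4 and XKCD palettes):
#
# >>> colors = get_color_palette()
# >>> colors["orangered"]
# '#FF4500'
# >>> colors["xkcd:azure"]
# '#069af3'
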
def hexcolor2rgb8(color):
    """
    Map hexadecimal colors to uint8 RGB.

    Parameters
    ----------
    color : str
        A hexadecimal color string, with leading ``"#"``. If the input is
        not a string beginning with ``"#"``, it is returned as-is without
        raising an error.

    Returns
    -------
    tuple
        RGB color tuple, in uint8 format (0--255).
    """
    if isinstance(color, str):
        if color[0] != "#":
            # Return non-hexadecimal strings (e.g. "(0,255,0)") as-is
            return color
        color = mcolors.to_rgba(color)[:3]
    if isinstance(color, tuple) and (
        any(isinstance(c, float) for c in color) or all(c <= 1 for c in color)
    ):
        # Rescale fractional (0--1) values to the uint8 (0--255) range
        color = tuple(max(0, min(255, int(np.round(c * 255)))) for c in color)
    return color
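
# Example usage of ``hexcolor2rgb8``:
#
# >>> hexcolor2rgb8("#FF4500")
# (255, 69, 0)
# >>> hexcolor2rgb8("(0,255,0)")  # non-hex strings pass through unchanged
# '(0,255,0)'
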
if __name__ == "__main__": main()