Source code for das.predict_oom

"""Code for predicting song annotations with trained networks, with chunk-wise (zarr/dask) storage and processing of the predictions."""
import logging
import os
import scipy
import flammkuchen
import numpy as np
from . import utils, data, models, event_utils, segment_utils, annot
from typing import List, Optional, Dict, Any, Sequence, Union
import glob
import tensorflow
import zarr
from tqdm.autonotebook import tqdm
import dask
from dask.diagnostics import ProgressBar


def predict_probabilities(x: np.ndarray,
                          model: tensorflow.keras.Model,
                          params: Dict[str, Any],
                          verbose: Optional[int] = 1,
                          prepend_data_padding: bool = True):
    """Run the network over `x` batch by batch and collect the class probabilities.

    Each batch is predicted, unpacked to [time, nb_classes] and appended to a
    zarr array backed by a temporary store. The result is returned as a lazy
    dask array wrapping that store.

    Args:
        x ([samples, ...]): Input data, passed to `data.AudioSequence`.
        model (tf.keras.Model): Network used for `predict_on_batch`.
        params (Dict[str, Any]): Passed as keyword arguments to `data.AudioSequence`.
                                 Must contain 'nb_classes' and, if `prepend_data_padding`
                                 is True, 'data_padding'.
        verbose (int, optional): Show a progress bar if >= 1. None or values < 1 disable it.
                                 Defaults to 1.
        prepend_data_padding (bool, optional): Restores samples that are ignored
                                 in the beginning of the first and the end of the last chunk
                                 because of "ignore_boundaries" by zero-padding the predictions.
                                 Defaults to True.

    Returns:
        y_pred - output of network for each sample [samples, nb_classes] (dask array)
    """
    pred_gen = data.AudioSequence(x=x, y=None, shuffle=False, **params)  # prep data
    nb_batches = len(pred_gen)
    # `verbose` may be None according to the signature - treat that as "quiet"
    show_progress = verbose is not None and verbose >= 1

    nb_classes = params['nb_classes']
    temp_store = zarr.storage.TempStore()
    class_probabilities = zarr.zeros(shape=(0, nb_classes),
                                     chunks=(100_000, nb_classes),
                                     dtype='float',
                                     store=temp_store,
                                     overwrite=True)

    # predict and unpack batch by batch, appending to the zarr array
    for batch_number, batch_data in tqdm(enumerate(pred_gen), total=nb_batches, disable=not show_progress):
        y_pred_batch = model.predict_on_batch(batch_data)  # run the network
        # reshape from [batches, nb_hist, ...] to [time, ...]
        y_pred_unpacked_batch = data.unpack_batches(y_pred_batch, pred_gen.data_padding)

        if prepend_data_padding:
            # to account for loss of samples at the first and last chunks;
            # both apply to the same batch if there is only a single batch
            pad_before = params['data_padding'] if batch_number == 0 else 0
            pad_after = params['data_padding'] if batch_number == nb_batches - 1 else 0
            if pad_before or pad_after:
                y_pred_unpacked_batch = np.pad(y_pred_unpacked_batch,
                                               pad_width=((pad_before, pad_after), (0, 0)),
                                               mode='constant',
                                               constant_values=0)
        class_probabilities.append(y_pred_unpacked_batch, axis=0)

    class_probabilities = dask.array.from_zarr(class_probabilities, inline_array=True)
    class_probabilities = class_probabilities.rechunk((1_000_000, nb_classes))
    return class_probabilities
def labels_from_probabilities(probabilities,
                              threshold: Optional[float] = None,
                              indices: Optional[Sequence[int]] = None) -> np.ndarray:
    """Convert class-wise probabilities into labels.

    Args:
        probabilities ([type]): [samples, classes] or [samples, ]
        threshold (float, Optional): Argmax over all classes (Default, 2D - corresponds to 1/nb_classes or 0.5 if 1D).
                                     If float, each class probability is compared to the threshold.
                                     First class to cross threshold wins.
                                     If no class crosses threshold label will default to the first class.
        indices: (List[int], Optional): List of indices into axis 1 for which to compute the labels.
                                     Defaults to None (use all indices).
                                     Ignored for 1D probabilities, which have no axis 1.

    Raises:
        ValueError: if probabilities are neither 1D nor 2D.

    Returns:
        labels [samples,] - index of "winning" dimension for each sample
    """
    if probabilities.ndim == 1:
        if threshold is None:
            threshold = 0.5
        # 1D input has no class axis, so it must not be indexed with `[:, indices]`
        return (probabilities > threshold).astype(np.intp)

    if probabilities.ndim == 2:
        if indices is None:
            indices = slice(None)  # equivalent to ":"
        selected = probabilities[:, indices]
        if threshold is None:
            return np.argmax(selected, axis=1)
        # argmax over booleans returns the first True per row, or 0 if none is True
        return np.argmax(selected > threshold, axis=1)

    raise ValueError(
        f'Probabilities have to many dimensions ({probabilities.ndim}). Can only be 1D or 2D.'
    )
def predict_segments(class_probabilities: np.ndarray,
                     samplerate: float = 1.0,
                     segment_dims: Optional[Sequence[int]] = None,
                     segment_names: Optional[Sequence[str]] = None,
                     segment_ref_onsets: Optional[List[float]] = None,
                     segment_ref_offsets: Optional[List[float]] = None,
                     segment_thres: float = 0.5,
                     segment_minlen: Optional[float] = None,
                     segment_fillgap: Optional[float] = None,
                     segment_labels_by_majority: bool = True) -> Dict:
    """Detect segments (on- and offsets, labels) from class probabilities or labels.

    TODO: document different approaches for single-type vs. multi-type segment detection

    Args:
        class_probabilities ([type]): [T, nb_classes] with probabilities for each class and sample
                                      or [T,] with integer entries as class labels
        samplerate (float, optional): Hz. Defaults to 1.0.
        segment_dims (Optional[List[int]], optional): set of indices into class_probabilities corresponding
                                                      to segment-like song types.
                                                      Needs to include the noise dim.
                                                      Required to ignore event-like song types.
                                                      Defaults to None (all classes are considered segment-like).
        segment_names (Optional[List[str]], optional): Names for segment-like classes.
                                                       Defaults to None (use indices of segment-like classes).
        segment_ref_onsets (Optional[List[float]], optional):
                            Syllable onsets (in seconds) to use for estimating labels.
                            Defaults to None (will use onsets est from class_probabilities as ref).
        segment_ref_offsets (Optional[List[float]], optional):
                            Syllable offsets (in seconds) to use for estimating labels.
                            Defaults to None (will use offsets est from class_probabilities as ref).
        segment_thres (float, optional): Threshold passed to `labels_from_probabilities`. Defaults to 0.5.
        segment_minlen (Optional[float], optional): seconds. Defaults to None.
        segment_fillgap (Optional[float], optional): seconds. Defaults to None.
        segment_labels_by_majority (bool, optional): Segment labels given by majority of label values within on- and offsets.
                                                     Defaults to True.

    Returns:
        dict with keys 'samplerate_Hz', 'index', 'names', 'probabilities' (always None here),
        'onsets_seconds', 'offsets_seconds', 'sequence' and 'samples'.
        Empty dict if `segment_dims` is empty.
    """
    probs_are_labels = class_probabilities.ndim == 1

    if segment_dims is None:
        if probs_are_labels:
            # class_probabilities is [T,] with integer entries as class labels:
            # labels run from 0 to max(label), so there are max(label) + 1 classes
            nb_classes = int(dask.array.max(class_probabilities).compute()) + 1
        else:
            # class_probabilities is [T, nb_classes]
            nb_classes = class_probabilities.shape[1]
        segment_dims = list(range(nb_classes))

    if segment_names is None:
        segment_names = [str(sd) for sd in segment_dims]

    segments: Dict[str, Any] = dict()
    if not len(segment_dims):
        return segments

    segments['samplerate_Hz'] = samplerate
    segments['index'] = segment_dims
    segments['names'] = segment_names
    # probabilities are not stored in the result
    segments['probabilities'] = None

    if probs_are_labels:
        labels = class_probabilities
    else:
        labels = dask.array.map_blocks(labels_from_probabilities,
                                       class_probabilities,
                                       segment_thres,
                                       segment_dims,
                                       dtype=np.intp,
                                       drop_axis=1)

    # turn into song (0), no song (1) sequence to detect onsets (0->1) and offsets (1->0)
    song_binary = (labels > 0).astype(np.int8)

    if segment_fillgap is not None:
        song_binary = dask.array.map_overlap(segment_utils.fill_gaps,
                                             song_binary,
                                             gap_dur=int(segment_fillgap * samplerate),
                                             depth=(int(segment_fillgap * samplerate + 1), 0),
                                             boundary=None,
                                             trim=True,
                                             align_arrays=True)

    if segment_minlen is not None:
        song_binary = dask.array.map_overlap(segment_utils.remove_short,
                                             song_binary,
                                             min_len=int(segment_minlen * samplerate),
                                             depth=(int(segment_minlen * samplerate + 1), 0),
                                             boundary=None,
                                             trim=True,
                                             align_arrays=True)

    # detect syllable on- and offsets
    # pre- and post-pend 0 so we detect on and offsets at boundaries
    onsets = dask.array.where(dask.array.diff(song_binary, prepend=0) == 1)[0]
    offsets = dask.array.where(dask.array.diff(song_binary, append=0) == -1)[0]
    logging.info(" Detecting syllable on and offsets:")
    with ProgressBar():
        onsets, offsets = dask.array.compute(onsets, offsets)

    segments['onsets_seconds'] = onsets.astype(float) / samplerate
    segments['offsets_seconds'] = offsets.astype(float) / samplerate

    if len(segment_dims) == 2:
        # there is just a single segment type plus noise - in that case
        # we use the gap-filled, short-deleted pred
        labels = song_binary
        # syllable-type for each syllable
        segments['sequence'] = [str(segment_names[1])] * len(segments['offsets_seconds'])
    elif len(segment_dims) > 2 and segment_labels_by_majority:
        # if >1 segment type (plus noise) label sylls by majority vote on un-smoothed labels
        # if no refs provided, use on/offsets from smoothed labels
        if segment_ref_onsets is None:
            segment_ref_onsets = segments['onsets_seconds']
        if segment_ref_offsets is None:
            segment_ref_offsets = segments['offsets_seconds']

        logging.info(" Labeling by majority:")
        # smallest unsigned int type that can hold all class indices
        cast_to = next((dtype for dtype in (np.uint8, np.uint16, np.uint32)
                        if len(segment_dims) < np.iinfo(dtype).max), None)
        if cast_to is not None:
            logging.info(f" Casting labels to {cast_to}:")
            # astype keeps the dask metadata consistent with the actual dtype
            labels = labels.astype(cast_to)

        with ProgressBar():
            labels = labels.compute()

        sequence, labels = segment_utils.label_syllables_by_majority(labels,
                                                                     segment_ref_onsets,
                                                                     segment_ref_offsets,
                                                                     samplerate)
        # syllable-type for each syllable
        segments['sequence'] = sequence
    else:
        segments['sequence'] = None

    segments['samples'] = labels
    return segments
def _detect_events_oom(event_probability: np.ndarray,
                       thres: float = 0.70,
                       min_dist: int = 100,
                       index: int = 0,
                       block_info=None,
                       pad=0) -> np.ndarray:
    """Wrapper around detect_events that returns 2D np.ndarray for use in predict_oom.

    Args:
        event_probability (np.ndarray): Passed to `event_utils.detect_events`.
        thres (float, optional): Passed to `event_utils.detect_events`. Defaults to 0.70.
        min_dist (int, optional): Passed to `event_utils.detect_events`. Defaults to 100.
        index (int, optional): Passed to `event_utils.detect_events`. Defaults to 0.
        block_info (optional): If not None, `block_info[None]['array-location'][0]` is read as
                               the (start, end) position of the chunk and used to shift and
                               filter the detected indices. Defaults to None (no shift/filter).
        pad (int, optional): Subtracted from the shifted indices to correct for the overlap. Defaults to 0.

    Returns:
        np.ndarray: event indices (column 0) and event confidence (column 1).
    """
    indices, confidence = event_utils.detect_events(event_probability, thres, min_dist, index)

    if block_info is None:
        return np.stack((indices, confidence), axis=1)

    chunk_location = block_info[None]['array-location'][0]
    chunk_start = chunk_location[0]
    chunk_end = chunk_location[1]

    # add offset from position of chunk, then correct for the overlap
    indices += chunk_start
    indices -= pad

    # delete detections in overlapping samples
    inside_chunk = (indices >= chunk_start) & (indices < chunk_end)
    if len(inside_chunk):
        confidence = confidence[inside_chunk]
        indices = indices[inside_chunk]
    else:
        confidence, indices = [], []

    return np.stack((indices, confidence), axis=1)
def predict_events(class_probabilities: np.ndarray,
                   samplerate: float = 1.0,
                   event_dims: Sequence[int] = None,
                   event_names: Sequence[str] = None,
                   event_thres: float = 0.5,
                   events_offset: float = 0,
                   event_dist: float = 100,
                   event_dist_min: float = 0,
                   event_dist_max: float = np.inf) -> Dict[str, Any]:
    """Detect events in class probabilities, one event type at a time.

    Args:
        class_probabilities (np.ndarray): [samples, classes]
        samplerate (float, optional): Hz
        event_dims (List[int], optional): Indices of the event classes. Defaults to np.arange(1, nb_classes).
        event_names ([type], optional): Name for each entry in event_dims. Defaults to event_dims.
        event_thres (float, optional): Detection threshold. Defaults to 0.5.
        events_offset (float, optional): Added to all event times. Defaults to 0 seconds.
        event_dist (float, optional): minimal distance between events for detection (in seconds).
                                      Defaults to 100 seconds.
        event_dist_min (float, optional): minimal distance to nearest event for post detection interval filter (in seconds).
                                          Defaults to 0 seconds.
        event_dist_max (float, optional): maximal distance to nearest event for post detection interval filter (in seconds).
                                          None is treated as np.inf (no upper limit). Defaults to np.inf.

    Returns:
        Dict[str, Any] with keys 'samplerate_Hz', 'index', 'names', 'seconds',
        'probabilities' and 'sequence'. Empty dict if there are no event dims.
    """
    if event_dims is None:
        event_dims = np.arange(1, class_probabilities.shape[1])
    if event_names is None:
        event_names = event_dims
    if event_dist_max is None:
        event_dist_max = np.inf

    events: Dict[str, Any] = {}
    if not len(event_dims):
        return events

    events['samplerate_Hz'] = samplerate
    events['index'] = event_dims
    events['names'] = event_names
    events['seconds'] = []
    events['probabilities'] = []
    events['sequence'] = []

    # chunks overlap by a bit more than the minimal event distance
    overlap = int(event_dist * samplerate + 1)
    for dim, name in zip(event_dims, event_names):
        lazy_detections = dask.array.map_overlap(_detect_events_oom,
                                                 class_probabilities,
                                                 thres=event_thres,
                                                 min_dist=event_dist * samplerate,
                                                 index=dim,
                                                 pad=overlap,
                                                 depth=(overlap, 0),
                                                 boundary="none",
                                                 trim=False,
                                                 dtype=int,
                                                 meta=np.array(()))
        with ProgressBar():
            detections = dask.array.compute(lazy_detections)[0]

        # column 0 holds the indices, column 1 the probabilities
        detected_indices = detections[:, 0]
        detected_probabilities = detections[:, 1]

        seconds = detected_indices.astype(float) / samplerate
        seconds += events_offset

        keep = event_utils.event_interval_filter(seconds, event_dist_min, event_dist_max)
        kept_seconds = seconds[keep]

        events['seconds'].extend(kept_seconds)
        events['probabilities'].extend(detected_probabilities[keep])
        events['sequence'].extend([name] * len(kept_seconds))

    return events
def predict_song(class_probabilities: np.ndarray,
                 params: dict[str, Any],
                 event_thres: float = 0.5,
                 event_dist: float = 0.01,
                 event_dist_min: float = 0,
                 event_dist_max: float = np.inf,
                 segment_ref_onsets: Optional[List[float]] = None,
                 segment_ref_offsets: Optional[List[float]] = None,
                 segment_thres: float = 0.5,
                 segment_minlen: float = None,
                 segment_fillgap: float = None):
    """Predict segments and events from class probabilities.

    Classes are split into 'segment' and 'event' types according to
    `params['class_types']` and handed to `predict_segments` and
    `predict_events`, respectively.

    Args:
        class_probabilities (np.ndarray): [T, nb_classes]
        params (dict[str, Any]): Needs the keys 'samplerate_x_Hz', 'class_types' and 'class_names'.
        event_thres (float): Passed to `predict_events`. Defaults to 0.5.
        event_dist (float): Passed to `predict_events`. Defaults to 0.01.
        event_dist_min (float): Passed to `predict_events`. Defaults to 0.
        event_dist_max (float): Passed to `predict_events`. Defaults to np.inf.
        segment_ref_onsets (Optional[List[float]]): Passed to `predict_segments`. Defaults to None.
        segment_ref_offsets (Optional[List[float]]): Passed to `predict_segments`. Defaults to None.
        segment_thres (float): Passed to `predict_segments`. Defaults to 0.5.
        segment_minlen (float): Passed to `predict_segments`. Defaults to None.
        segment_fillgap (float): Passed to `predict_segments`. Defaults to None.

    Returns:
        events, segments: results of `predict_events` and `predict_segments`.
    """
    samplerate = params['samplerate_x_Hz']

    def _dims_and_names(class_type):
        # indices and names of all classes of the given type
        dims = np.where([val == class_type for val in params['class_types']])[0]
        names = [str(params['class_names'][dim]) for dim in dims]
        return dims, names

    segment_dims, segment_names = _dims_and_names('segment')
    segments = predict_segments(class_probabilities=class_probabilities,
                                samplerate=samplerate,
                                segment_dims=segment_dims,
                                segment_names=segment_names,
                                segment_ref_onsets=segment_ref_onsets,
                                segment_ref_offsets=segment_ref_offsets,
                                segment_thres=segment_thres,
                                segment_minlen=segment_minlen,
                                segment_fillgap=segment_fillgap)

    event_dims, event_names = _dims_and_names('event')
    events = predict_events(class_probabilities=class_probabilities,
                            samplerate=samplerate,
                            event_dims=event_dims,
                            event_names=event_names,
                            event_thres=event_thres,
                            events_offset=0,
                            event_dist=event_dist,
                            event_dist_min=event_dist_min,
                            event_dist_max=event_dist_max)

    return events, segments
def predict(x: np.ndarray,
            model_save_name: str = None,
            verbose: int = 1,
            batch_size: int = None,
            model: models.keras.models.Model = None,
            params: dict = None,
            event_thres: float = 0.5,
            event_dist: float = 0.01,
            event_dist_min: float = 0,
            event_dist_max: float = None,
            segment_thres: float = 0.5,
            segment_use_optimized: bool = True,
            segment_minlen: float = None,
            segment_fillgap: float = None,
            pad: bool = True,
            prepend_data_padding: bool = True):
    """Predict class probabilities, events and segments for a recording.

    Usage:
    Calling predict with the path to the model will load the model and the
    associated params and run inference:
    `predict(x=data, model_save_name='tata')`

    To re-use the same model with multiple recordings, load the model and params
    once and pass them to `predict`
    ```my_model, my_params = das.utils.load_model_and_params(model_save_name)
    for data in data_list:
        predict(x=data, model=my_model, params=my_params)
    ```

    Args:
        x (np.array): Audio data [samples, channels]
        model_save_name (str): path with the trunk name of the model. Defaults to None.
        model (keras.model.Models): Defaults to None.
        params (dict): Defaults to None. Not modified by this function.
        verbose (int): display progress bar during prediction. Defaults to 1.
        batch_size (int): number of chunks processed at once. Defaults to None (the default used during training).
                         Larger batches lead to faster inference. Limited by memory size, in particular for GPUs which typically have 8GB.
                         Large batch sizes lead to loss of samples since only complete batches are used.
        event_thres (float): Confidence threshold for detecting peaks. Range 0..1. Defaults to 0.5.
        event_dist (float): Minimal distance between adjacent events during thresholding.
                            Prevents detecting duplicate events when the confidence trace is a little noisy.
                            Defaults to 0.01.
        event_dist_min (float): MINimal inter-event interval for the event filter run during post processing.
                                Defaults to 0.
        event_dist_max (float): MAXimal inter-event interval for the event filter run during post processing.
                                Defaults to None (no upper limit).
        segment_thres (float): Confidence threshold for detecting segments. Range 0..1. Defaults to 0.5.
        segment_use_optimized (bool): Use minlen and fillgap values from param file if they exist.
                                      If segment_minlen and segment_fillgap are provided,
                                      then they will override the values from the param file.
                                      Defaults to True.
        segment_minlen (float): Minimal duration in seconds of a segment used for filtering out spurious detections. Defaults to None.
        segment_fillgap (float): Gap in seconds between adjacent segments to be filled. Useful for correcting brief lapses. Defaults to None.
        pad (bool): Append values (repeat last sample value) to fill the last batch. Otherwise, the end
                    of the data will not be annotated because the last, non-full batch will be skipped.
                    Defaults to True.
        prepend_data_padding (bool, optional): Restores samples that are ignored
                    in the beginning of the first and the end of the last chunk
                    because of "ignore_boundaries". Defaults to True.

    Raises:
        ValueError: if `model_save_name` is None and `model` is not a keras model or `params` is not a dict.

    Returns:
        events: result of `predict_events`
        segments: result of `predict_segments`
        class_probabilities: [T, nb_classes]
        class_names (List[str]): [nb_classes]
    """
    if model_save_name is not None:
        model, params = utils.load_model_and_params(model_save_name)
    else:
        # explicit checks instead of asserts, which are stripped under `python -O`
        if not isinstance(model, models.keras.models.Model):
            raise ValueError('If model_save_name is None, model must be a keras model.')
        if not isinstance(params, dict):
            raise ValueError('If model_save_name is None, params must be a dict.')

    # use postprocessing values from params and/or args
    if segment_use_optimized and 'post_opt' in params and isinstance(params['post_opt'], dict):
        if segment_minlen is None:
            segment_minlen = params['post_opt']['min_len']
        if segment_fillgap is None:
            segment_fillgap = params['post_opt']['gap_dur']

    if batch_size is not None:
        # work on a copy so the params dict of the caller is left untouched
        params = {**params, 'batch_size': batch_size}

    if pad:
        # figure out length in multiples of batches
        batch_len = params['batch_size'] * params['nb_hist'] + params['nb_hist']
        x_len_original = len(x)
        remainder = np.remainder(len(x), batch_len)
        pad_len = batch_len - remainder if remainder > 0 else 0
        # pad with end val to fill
        x = np.pad(x, ((0, pad_len), (0, 0)), mode='edge')

    class_probabilities = predict_probabilities(x, model, params, verbose, prepend_data_padding)

    if pad:
        # trim probs to original len of x - this removes predictions for the padded section
        class_probabilities = class_probabilities[:x_len_original, :]

    events, segments = predict_song(class_probabilities=class_probabilities,
                                    params=params,
                                    event_thres=event_thres,
                                    event_dist=event_dist,
                                    event_dist_min=event_dist_min,
                                    event_dist_max=event_dist_max,
                                    segment_ref_onsets=None,
                                    segment_ref_offsets=None,
                                    segment_thres=segment_thres,
                                    segment_minlen=segment_minlen,
                                    segment_fillgap=segment_fillgap)

    return events, segments, class_probabilities, params['class_names']
def cli_predict(path: str,
                model_save_name: str,
                *,
                save_filename: Optional[str] = None,
                save_format: str = 'csv',
                verbose: int = 1,
                batch_size: Optional[int] = None,
                event_thres: float = 0.5,
                event_dist: float = 0.01,
                event_dist_min: float = 0,
                event_dist_max: Optional[float] = None,
                segment_thres: float = 0.5,
                segment_use_optimized: Optional[bool] = None,
                segment_minlen: Optional[float] = None,
                segment_fillgap: Optional[float] = None,
                resample: bool = True):
    """Predict song labels for a wav file or a folder of wav files.

    Saves hdf5 files with keys: events, segments, class_probabilities
    OR csv files with columns: label/start_seconds/stop_seconds

    Errors while processing an individual file are logged and the file is skipped.

    Args:
        path (str): Path to a single WAV file with the audio data or to a folder with WAV files.
        model_save_name (str): Stem of the path for the model (and parameters). File to load will be MODEL_SAVE_NAME + _model.h5.
        save_filename (Optional[str]): Path to save annotations to.
                                       If omitted, will construct save_filename by
                                       stripping the extension from recording_filename and adding '_das.h5' or '_annotations.csv'.
                                       Will be ignored if `path` is a folder.
        save_format (str): 'csv' or 'h5'.
                           csv: tabular text file with label, start and end seconds for each predicted song.
                           h5: same information as in csv plus confidence values for each sample and song type.
                           Defaults to 'csv'.
        verbose (int): Display progress bar during prediction. Defaults to 1.
        batch_size (Optional[int]): Number of chunks processed at once.
                                    Defaults to None (the default used during training).
        event_thres (float): Confidence threshold for detecting events. Range 0..1. Defaults to 0.5.
        event_dist (float): Minimal distance between adjacent events during thresholding.
                            Prevents detecting duplicate events when the confidence trace is a little noisy.
                            Defaults to 0.01.
        event_dist_min (float): MINimal inter-event interval for the event filter run during post processing.
                                Defaults to 0.
        event_dist_max (Optional[float]): MAXimal inter-event interval for the event filter run during post processing.
                                          Defaults to None (no upper limit).
        segment_thres (float): Confidence threshold for detecting segments. Range 0..1. Defaults to 0.5.
        segment_use_optimized (Optional[bool]): Use minlen and fillgap values from param file if they exist.
                                      If segment_minlen and segment_fillgap are provided,
                                      then they will override the values from the param file.
                                      Defaults to None, which `predict` treats like False.
        segment_minlen (Optional[float]): Minimal duration of a segment used for filtering out spurious detections.
                                          Defaults to None (keep all segments).
        segment_fillgap (Optional[float]): Gap between adjacent segments to be filled. Useful for correcting brief lapses.
                                           Defaults to None (do not fill gaps).
        resample (bool): Resample audio data to the rate expected by the model. Defaults to True.

    Raises:
        ValueError: on unknown save_format
        FileNotFoundError: if `path` is neither an existing file nor a folder
    """
    if save_format not in ('csv', 'h5'):
        raise ValueError(
            f"Unknown save_format '{save_format}'. Should be either 'csv' or 'h5'."
        )

    if os.path.isdir(path):
        if save_filename is not None:
            logging.warning(
                f'{path} is a folder. Will ignore save_filename argument {save_filename}.'
            )
            # actually ignore it - otherwise the first file would still be saved to save_filename
            save_filename = None
        # if path is folder: glob contents - all wav files
        filenames = [
            filename for filename in glob.glob(f'{path}/*.wav')
            if not os.path.isdir(filename)
        ]
    elif os.path.isfile(path):
        # if path is file - predict only on file but make it single-item list
        filenames = [path]
    else:
        raise FileNotFoundError(f'{path} is neither an existing file nor a folder.')

    # import the submodule explicitly - a bare `import scipy` does not guarantee
    # that `scipy.io.wavfile` is available in all scipy versions
    from scipy.io import wavfile

    logging.info(f"Loading model from {model_save_name}.")
    model, params = utils.load_model_and_params(model_save_name)
    fs_model = params['samplerate_x_Hz']

    for recording_filename in filenames:
        logging.info(f" Loading data from {recording_filename}.")
        try:
            fs_audio, x = wavfile.read(recording_filename)
            if x.ndim == 1:
                x = x[:, np.newaxis]  # predict expects [samples, channels]

            if resample and fs_audio != fs_model:
                logging.info(
                    f" Resampling. Audio rate is {fs_audio}Hz but model was trained on data with {fs_model}Hz."
                )
                x = utils.resample(x, fs_audio, fs_model)

            logging.info(f" Annotating using model at {model_save_name}.")
            # the model is loaded once above and passed directly
            events, segments, class_probabilities, class_names = predict(
                x, None, verbose, batch_size, model, params, event_thres,
                event_dist, event_dist_min, event_dist_max, segment_thres,
                segment_use_optimized, segment_minlen, segment_fillgap)

            if 'event' in params["class_types"]:
                logging.info(
                    f" found {len(events['seconds'])} instances of events '{list(set(events['sequence']))}'."
                )
            if 'segment' in params["class_types"]:
                logging.info(
                    f" found {len(segments['onsets_seconds'])} instances of segments '{list(set(segments['sequence']))}'."
                )

            recording_stem = os.path.splitext(recording_filename)[0]
            if save_format == 'h5':
                # NOTE(review): class_probabilities comes from predict_probabilities as a dask array -
                # confirm that flammkuchen can save it as is.
                d = {
                    'events': events,
                    'segments': segments,
                    'class_probabilities': class_probabilities,
                    'class_names': class_names
                }
                out_filename = save_filename if save_filename is not None else recording_stem + '_das.h5'
                logging.info(f" Saving results to {out_filename}.")
                flammkuchen.save(out_filename, d)
                logging.info("Done.")
            elif save_format == 'csv':
                evt = annot.Events.from_predict(events, segments)
                out_filename = save_filename if save_filename is not None else recording_stem + '_annotations.csv'
                logging.info(f" Saving results to {out_filename}.")
                evt.to_df().to_csv(out_filename)
                logging.info("Done.")
        except Exception:
            # best effort: log the error and continue with the next file
            logging.exception(f'Error processing file {recording_filename}.')