Source code for das.event_utils

"""Utilities for handling events."""
import numpy as np
import peakutils
from typing import Iterable, Optional, List, Tuple, Union


[docs]def find_nearest(array, values): """Find nearest occurrence of each item of values in array. Args: array: find nearest in this list values: queries Returns: val: nearest val in array to each item in values idx: index of nearest val in array to each item in values dist: distance to nearest val in array for each item in values NOTE: Returns nan-arrays of the same size as values if `array` is empty. """ if len(values) and len(array): # only do this if both inputs are non-empty lists values = np.atleast_1d(values) abs_dist = np.abs(np.int64(np.subtract.outer(array, values))) idx = abs_dist.argmin(0) dist = abs_dist.min(0) val = array[idx] else: idx = np.full_like(values, fill_value=np.nan) dist = np.full_like(values, fill_value=np.nan) val = np.full_like(values, fill_value=np.nan) return val, idx, dist
[docs]def detect_events( event_probability: np.ndarray, thres: float = 0.5, min_dist: int = 100, index: int = 0 ) -> Tuple[np.ndarray, np.ndarray]: """Detect events as peaks in probabilitiy. Args: event_probability ([np.ndarray]): [T, nb_classes] thres (float, optional): [description]. Defaults to 0.5. min_dist (int, optional): [description]. Defaults to 100 samples. index: (int, Optional): List of indices into axis 1 for which to compute the labels. Defaults to None (use all indices). Returns: event_indices: index of each detected event event_confidence: event_probability at the event_index """ event_indices = peakutils.indexes(event_probability[:, index], thres=thres, min_dist=min_dist, thres_abs=True) if len(event_indices): # guard against empty event_indices event_confidence = event_probability[event_indices, index] else: event_confidence = [] return event_indices, event_confidence
[docs]def match_events(eventindices_true, eventindices_pred, tol=100): """Find events eventindices_pred that match those (within tol) in eventindices_true. Args: eventindices_true: list of reference event indices eventindices_pred: list of detected event indices tol: n samples within which events are deemed identical Returns: nearest_event: masked array copy of eventindices_pred, mask=True indicates entries in pred not closest within tol in true nearest_dist: dist of each eventindices_pred to the nearest true_event """ nearest_dist = np.zeros_like(eventindices_pred) nearest_event = np.zeros_like(eventindices_pred) # find nearest true event for each predicted event _, nearest_event, nearest_dist = find_nearest(eventindices_true, eventindices_pred) nearest_event = nearest_event.astype(float) # flag those that have no nearby event nearest_event = np.ma.masked_array(nearest_event, mask=nearest_dist > tol) if len(eventindices_true) == 0: nearest_event.mask = True # all detections are false positives else: # flag doublettes - keep only nearest for idx in np.unique(nearest_event[nearest_event >= 0]): hits = np.where(nearest_event == idx)[0] if len(hits) > 1: nearest = np.argmin(nearest_dist[nearest_event == idx]) # find closest hit hits = np.delete(hits, nearest) nearest_event.mask[hits] = True return nearest_event.astype(np.uintp), nearest_dist
[docs]def event_interval_filter(events: Iterable, event_dist_min: float = 0, event_dist_max: float = np.inf) -> Iterable: """[summary] Args: events (Iterable): Iterable (list, np.array) of event times in seconds. event_dist_min (float, optional): [description]. Defaults to 0 second. event_dist_max (float, optional): [description]. Defaults to np.inf seconds. Returns: good_event_indices (Iterable): indices of events to keep `events[good_event_indices]`. """ ipi_pre = np.diff(events, prepend=np.inf) ipi_post = np.diff(events, append=np.inf) ipi_too_long = np.logical_and(ipi_pre > event_dist_max, ipi_post > event_dist_max) ipi_too_short = np.logical_or(ipi_pre < event_dist_min, ipi_post < event_dist_min) if len(ipi_too_short): ipi_too_short[0] = False # otherwise first event will always be removed good_event_indices = ~np.logical_or(ipi_too_long, ipi_too_short) return good_event_indices
[docs]def evaluate_eventtimes(eventtimes_true, eventtimes_pred, samplerate, tol=0.01): """[summary] Args: eventtimes_true ([type]): in seconds eventtimes_pred ([type]): in seconds samplerate (float): in Hz tol (int, optional): in seconds [description]. Defaults to 0.01 seconds. Returns: [type]: [description] """ # match_events works with indices - so need to convert eventtimes to indices and distance back to times eventindices_true = eventtimes_true * samplerate eventindices_pred = eventtimes_pred * samplerate nearest_pred_event, nearest_dist_indices = match_events(eventindices_true, eventindices_pred, tol * samplerate) nearest_true_event, _ = match_events(eventindices_pred, eventindices_true, tol * samplerate) nearest_dist = nearest_dist_indices / samplerate # convert back to seconds d = dict() # pred events that have no nearby true event (or there is another pred event nearer to the true event) d["FP"] = np.sum(nearest_pred_event.mask) d["TP"] = len(nearest_pred_event[np.isfinite(nearest_dist)].compressed()) d["FN"] = max(0, np.sum(nearest_true_event.mask)) if d["FP"] == 0: # if there are no false positives (even for no detections), then precision is 1.0 d["precision"] = 1.0 elif (d["TP"] + d["FP"]) == 0: d["precision"] = 0.0 else: d["precision"] = d["TP"] / (d["TP"] + d["FP"]) if d["FN"] == 0: # if there are no false negatives (even for no detections), the recall is 1.0 (?!) d["recall"] = 1.0 elif d["TP"] + d["FN"] == 0: d["recall"] = 0 else: d["recall"] = d["TP"] / (d["TP"] + d["FN"]) if (d["precision"] + d["recall"]) == 0: d["f1_score"] = 0 else: d["f1_score"] = 2 * (d["precision"] * d["recall"]) / (d["precision"] + d["recall"]) return d, nearest_pred_event, nearest_true_event, nearest_dist