import copy import numpy as np from wfdb.processing.basic import smooth def find_peaks(sig): """ Find hard peaks and soft peaks in a signal, defined as follows: - Hard peak: a peak that is either /\ or \/. - Soft peak: a peak that is either /-*\ or \-*/. In this case we define the middle as the peak. Parameters ---------- sig : np array The 1d signal array. Returns ------- hard_peaks : ndarray Array containing the indices of the hard peaks. soft_peaks : ndarray Array containing the indices of the soft peaks. """ if len(sig) == 0: return np.empty([0]), np.empty([0]) tmp = sig[1:] tmp = np.append(tmp, [sig[-1]]) tmp = sig - tmp tmp[np.where(tmp>0)] = 1 tmp[np.where(tmp==0)] = 0 tmp[np.where(tmp<0)] = -1 tmp2 = tmp[1:] tmp2 = np.append(tmp2, [0]) tmp = tmp-tmp2 hard_peaks = np.where(np.logical_or(tmp==-2, tmp==+2))[0] + 1 soft_peaks = [] for iv in np.where(np.logical_or(tmp==-1,tmp==+1))[0]: t = tmp[iv] i = iv+1 while True: if i==len(tmp) or tmp[i] == -t or tmp[i] == -2 or tmp[i] == 2: break if tmp[i] == t: soft_peaks.append(int(iv + (i - iv)/2)) break i += 1 soft_peaks = np.array(soft_peaks, dtype='int') + 1 return hard_peaks, soft_peaks def find_local_peaks(sig, radius): """ Find all local peaks in a signal. A sample is a local peak if it is the largest value within the samples on its left and right. In cases where it shares the max value with nearby samples, the middle sample is classified as the local peak. Parameters ---------- sig : ndarray 1d numpy array of the signal. radius : int The radius in which to search for defining local maxima. Returns ------- ndarray The locations of all of the local peaks of the input signal. """ # TODO: Fix flat mountain scenarios. if np.min(sig) == np.max(sig): return np.empty(0) peak_inds = [] i = 0 while i < radius + 1: if sig[i] == max(sig[:i + radius]): peak_inds.append(i) i += radius else: i += 1 while i < len(sig): if sig[i] == max(sig[i - radius:i + radius]): peak_inds.append(i) i += radius else: i += 1 while i < len(sig): if sig[i] == max(sig[i - radius:]): peak_inds.append(i) i += radius else: i += 1 return (np.array(peak_inds)) def correct_peaks(sig, peak_inds, search_radius, smooth_window_size, peak_dir='compare'): """ Adjust a set of detected peaks to coincide with local signal maxima. Parameters ---------- sig : ndarray The 1d signal array. peak_inds : np array Array of the original peak indices. search_radius : int The radius within which the original peaks may be shifted. smooth_window_size : int The window size of the moving average filter applied on the signal. Peak distance is calculated on the difference between the original and smoothed signal. peak_dir : str, optional The expected peak direction: 'up' or 'down', 'both', or 'compare'. - If 'up', the peaks will be shifted to local maxima. - If 'down', the peaks will be shifted to local minima. - If 'both', the peaks will be shifted to local maxima of the rectified signal. - If 'compare', the function will try both 'up' and 'down' options, and choose the direction that gives the largest mean distance from the smoothed signal. Returns ------- shifted_peak_inds : ndarray Array of the corrected peak indices. """ sig_len = sig.shape[0] n_peaks = len(peak_inds) # Subtract the smoothed signal from the original sig = sig - smooth(sig=sig, window_size=smooth_window_size) # Shift peaks to local maxima if peak_dir == 'up': shifted_peak_inds = shift_peaks(sig=sig, peak_inds=peak_inds, search_radius=search_radius, peak_up=True) elif peak_dir == 'down': shifted_peak_inds = shift_peaks(sig=sig, peak_inds=peak_inds, search_radius=search_radius, peak_up=False) elif peak_dir == 'both': shifted_peak_inds = shift_peaks(sig=np.abs(sig), peak_inds=peak_inds, search_radius=search_radius, peak_up=True) else: shifted_peak_inds_up = shift_peaks(sig=sig, peak_inds=peak_inds, search_radius=search_radius, peak_up=True) shifted_peak_inds_down = shift_peaks(sig=sig, peak_inds=peak_inds, search_radius=search_radius, peak_up=False) # Choose the direction with the biggest deviation up_dist = np.mean(np.abs(sig[shifted_peak_inds_up])) down_dist = np.mean(np.abs(sig[shifted_peak_inds_down])) if up_dist >= down_dist: shifted_peak_inds = shifted_peak_inds_up else: shifted_peak_inds = shifted_peak_inds_down return shifted_peak_inds def shift_peaks(sig, peak_inds, search_radius, peak_up): """ Helper function for correct_peaks. Return the shifted peaks to local maxima or minima within a radius. Parameters ---------- sig : ndarray The 1d signal array. peak_inds : np array Array of the original peak indices. search_radius : int The radius within which the original peaks may be shifted. peak_up : bool Whether the expected peak direction is up. Returns ------- shifted_peak_inds : ndarray Array of the corrected peak indices. """ sig_len = sig.shape[0] n_peaks = len(peak_inds) # The indices to shift each peak ind by shift_inds = np.zeros(n_peaks, dtype='int') # Iterate through peaks for i in range(n_peaks): ind = peak_inds[i] local_sig = sig[max(0, ind - search_radius):min(ind + search_radius, sig_len-1)] if peak_up: shift_inds[i] = np.argmax(local_sig) else: shift_inds[i] = np.argmin(local_sig) # May have to adjust early values for i in range(n_peaks): ind = peak_inds[i] if ind >= search_radius: break shift_inds[i] -= search_radius - ind shifted_peak_inds = peak_inds + shift_inds - search_radius return shifted_peak_inds