# -*- coding: utf-8 -*- #%% Download Data import wfdb import h5py import numpy as np import pandas as pd import matplotlib.pyplot as plt import scipy.io as sio from scipy import signal as sg #%% Pan Tompkins Algorithm class Pan_Tompkins_QRS(): def __init__(self, fs): self.fs = fs def band_pass_filter(self,signal): ''' Band Pass Filter :param signal: input signal :return: prcoessed signal Methodology/Explaination: Bandpass filter is used to attenuate the noise in the input signal. To acheive a passband of 5-15 Hz, the input signal is first passed through a low pass filter having a cutoff frequency of 11 Hz and then through a high pass filter with a cutoff frequency of 5 Hz, thus achieving the required thresholds. The low pass filter has the recursive equation: y(nT) = 2y(nT - T) - y(nT - 2T) + x(nT) - 2x(nT - 6T) + x(nT - 12T) The high pass filter has the recursive equation: y(nT) = 32x(nT - 16T) - y(nT - T) - x(nT) + x(nT - 32T) ''' # Initialize result result = None # Create a copy of the input signal sig = signal.copy() # Apply the low pass filter using the equation given for index in range(len(signal)): sig[index] = signal[index] if (index >= 1): sig[index] += 2*sig[index-1] if (index >= 2): sig[index] -= sig[index-2] if (index >= 6): sig[index] -= 2*signal[index-6] if (index >= 12): sig[index] += signal[index-12] # Copy the result of the low pass filter result = sig.copy() # Apply the high pass filter using the equation given for index in range(len(signal)): result[index] = -1*sig[index] if (index >= 1): result[index] -= result[index-1] if (index >= 16): result[index] += 32*sig[index-16] if (index >= 32): result[index] += sig[index-32] # Normalize the result from the high pass filter max_val = max(max(result),-min(result)) result = result/max_val return result def derivative(self,signal): ''' Derivative Filter :param signal: input signal :return: prcoessed signal Methodology/Explaination: The derivative of the input signal is taken to obtain the information of the slope of the signal. Thus, the rate of change of input is obtain in this step of the algorithm. The derivative filter has the recursive equation: y(nT) = [-x(nT - 2T) - 2x(nT - T) + 2x(nT + T) + x(nT + 2T)]/(8T) ''' # Initialize result result = signal.copy() # Apply the derivative filter using the equation given for index in range(len(signal)): result[index] = 0 if (index >= 1): result[index] -= 2*signal[index-1] if (index >= 2): result[index] -= signal[index-2] if (index >= 2 and index <= len(signal)-2): result[index] += 2*signal[index+1] if (index >= 2 and index <= len(signal)-3): result[index] += signal[index+2] result[index] = (result[index]*self.fs)/8 return result def squaring(self,signal): ''' Squaring the Signal :param signal: input signal :return: prcoessed signal Methodology/Explaination: The squaring process is used to intensify the slope of the frequency response curve obtained in the derivative step. This step helps in restricting false positives which may be caused by T waves in the input signal. The squaring filter has the recursive equation: y(nT) = [x(nT)]^2 ''' # Initialize result result = signal.copy() # Apply the squaring using the equation given for index in range(len(signal)): result[index] = signal[index]**2 return result def moving_window_integration(self,signal): ''' Moving Window Integrator :param signal: input signal :return: prcoessed signal Methodology/Explaination: The moving window integration process is done to obtain information about both the slope and width of the QRS complex. A window size of 0.15*(sample frequency) is used for more accurate results. The moving window integration has the recursive equation: y(nT) = [y(nT - (N-1)T) + x(nT - (N-2)T) + ... + x(nT)]/N where N is the number of samples in the width of integration window. ''' # Initialize result and window size for integration result = signal.copy() #print(0.150 * self.fs) win_size = round(0.150 * self.fs) sum = 0 # Calculate the sum for the first N terms for j in range(win_size): sum += signal[j]/win_size result[j] = sum # Apply the moving window integration using the equation given for index in range(win_size,len(signal)): sum += signal[index]/win_size sum -= signal[index-win_size]/win_size result[index] = sum return result def solve(self,signal): ''' Solver, Combines all the above functions :param signal: input signal :return: prcoessed signal Methodology/Explaination: The peak detection algorithm works on the moving window and bandpass filtered signal. So the input signal is first bandpassed, then the output of the bandpass filter is given to the derivative function and the result is squared. Finally the output of the squaring function is given to the moving window integration function and returned. ''' # Convert the input signal into numpy array input_signal = signal.iloc[:,1].to_numpy() # Bandpass Filter global bpass bpass = self.band_pass_filter(input_signal.copy()) # Derivative Function global der der = self.derivative(bpass.copy()) # Squaring Function global sqr sqr = self.squaring(der.copy()) # Moving Window Integration Function global mwin mwin = self.moving_window_integration(sqr.copy()) return mwin #%% Calculating heart rate # Importing Libraries class heart_rate(): def __init__(self,signal,samp_freq): ''' Initialize Variables :param signal: input signal :param samp_freq: sample frequency of input signal ''' # Initialize variables self.RR1, self.RR2, self.probable_peaks, self.r_locs, self.peaks, self.result = ([] for i in range(6)) self.SPKI, self.NPKI, self.Threshold_I1, self.Threshold_I2, self.SPKF, self.NPKF, self.Threshold_F1, self.Threshold_F2 = (0 for i in range(8)) self.T_wave = False self.m_win = mwin self.b_pass = bpass self.samp_freq = samp_freq self.signal = signal self.win_150ms = round(0.15*self.samp_freq) self.RR_Low_Limit = 0 self.RR_High_Limit = 0 self.RR_Missed_Limit = 0 self.RR_Average1 = 0 def approx_peak(self): ''' Approximate peak locations ''' # FFT convolution slopes = sg.fftconvolve(self.m_win, np.full((25,), 1) / 25, mode='same') # Finding approximate peak locations for i in range(round(0.5*self.samp_freq) + 1,len(slopes)-1): if (slopes[i] > slopes[i-1]) and (slopes[i+1] < slopes[i]): self.peaks.append(i) def adjust_rr_interval(self,ind): ''' Adjust RR Interval and Limits :param ind: current index in peaks array ''' # Finding the eight most recent RR intervals self.RR1 = np.diff(self.peaks[max(0,ind - 8) : ind + 1])/self.samp_freq # Calculating RR Averages self.RR_Average1 = np.mean(self.RR1) RR_Average2 = self.RR_Average1 # Finding the eight most recent RR intervals lying between RR Low Limit and RR High Limit if (ind >= 8): for i in range(0, 8): if (self.RR_Low_Limit < self.RR1[i] < self.RR_High_Limit): self.RR2.append(self.RR1[i]) if (len(self.RR2) > 8): self.RR2.remove(self.RR2[0]) RR_Average2 = np.mean(self.RR2) # Adjusting the RR Low Limit and RR High Limit if (len(self.RR2) > 7 or ind < 8): self.RR_Low_Limit = 0.92 * RR_Average2 self.RR_High_Limit = 1.16 * RR_Average2 self.RR_Missed_Limit = 1.66 * RR_Average2 def searchback(self,peak_val,RRn,sb_win): ''' Searchback :param peak_val: peak location in consideration :param RRn: the most recent RR interval :param sb_win: searchback window ''' # Check if the most recent RR interval is greater than the RR Missed Limit if (RRn > self.RR_Missed_Limit): # Initialize a window to searchback win_rr = self.m_win[peak_val - sb_win + 1 : peak_val + 1] # Find the x locations inside the window having y values greater than Threshold I1 coord = np.asarray(win_rr > self.Threshold_I1).nonzero()[0] # Find the x location of the max peak value in the search window if (len(coord) > 0): for pos in coord: if (win_rr[pos] == max(win_rr[coord])): x_max = pos break else: x_max = None # If the max peak value is found if (x_max is not None): # Update the thresholds corresponding to moving window integration self.SPKI = 0.25 * self.m_win[x_max] + 0.75 * self.SPKI self.Threshold_I1 = self.NPKI + 0.25 * (self.SPKI - self.NPKI) self.Threshold_I2 = 0.5 * self.Threshold_I1 # Initialize a window to searchback win_rr = self.b_pass[x_max - self.win_150ms: min(len(self.b_pass) -1, x_max)] # Find the x locations inside the window having y values greater than Threshold F1 coord = np.asarray(win_rr > self.Threshold_F1).nonzero()[0] # Find the x location of the max peak value in the search window if (len(coord) > 0): for pos in coord: if (win_rr[pos] == max(win_rr[coord])): r_max = pos break else: r_max = None # If the max peak value is found if (r_max is not None): # Update the thresholds corresponding to bandpass filter if self.b_pass[r_max] > self.Threshold_F2: self.SPKF = 0.25 * self.b_pass[r_max] + 0.75 * self.SPKF self.Threshold_F1 = self.NPKF + 0.25 * (self.SPKF - self.NPKF) self.Threshold_F2 = 0.5 * self.Threshold_F1 # Append the probable R peak location self.r_locs.append(r_max) def find_t_wave(self,peak_val,RRn,ind,prev_ind): ''' T Wave Identification :param peak_val: peak location in consideration :param RRn: the most recent RR interval :param ind: current index in peaks array :param prev_ind: previous index in peaks array ''' if (self.m_win[peak_val] >= self.Threshold_I1): if (ind > 0 and 0.20 < RRn < 0.36): # Find the slope of current and last waveform detected curr_slope = max(np.diff(self.m_win[peak_val - round(self.win_150ms/2) : peak_val + 1])) last_slope = max(np.diff(self.m_win[self.peaks[prev_ind] - round(self.win_150ms/2) : self.peaks[prev_ind] + 1])) # If current waveform slope is less than half of last waveform slope if (curr_slope < 0.5*last_slope): # T Wave is found and update noise threshold self.T_wave = True self.NPKI = 0.125 * self.m_win[peak_val] + 0.875 * self.NPKI if (not self.T_wave): # T Wave is not found and update signal thresholds if (self.probable_peaks[ind] > self.Threshold_F1): self.SPKI = 0.125 * self.m_win[peak_val] + 0.875 * self.SPKI self.SPKF = 0.125 * self.b_pass[ind] + 0.875 * self.SPKF # Append the probable R peak location self.r_locs.append(self.probable_peaks[ind]) else: self.SPKI = 0.125 * self.m_win[peak_val] + 0.875 * self.SPKI self.NPKF = 0.125 * self.b_pass[ind] + 0.875 * self.NPKF # Update noise thresholds elif (self.m_win[peak_val] < self.Threshold_I1) or (self.Threshold_I1 < self.m_win[peak_val] < self.Threshold_I2): self.NPKI = 0.125 * self.m_win[peak_val] + 0.875 * self.NPKI self.NPKF = 0.125 * self.b_pass[ind] + 0.875 * self.NPKF def adjust_thresholds(self,peak_val,ind): ''' Adjust Noise and Signal Thresholds During Learning Phase :param peak_val: peak location in consideration :param ind: current index in peaks array ''' if (self.m_win[peak_val] >= self.Threshold_I1): # Update signal threshold self.SPKI = 0.125 * self.m_win[peak_val] + 0.875 * self.SPKI if (self.probable_peaks[ind] > self.Threshold_F1): self.SPKF = 0.125 * self.b_pass[ind] + 0.875 * self.SPKF # Append the probable R peak location self.r_locs.append(self.probable_peaks[ind]) else: # Update noise threshold self.NPKF = 0.125 * self.b_pass[ind] + 0.875 * self.NPKF # Update noise thresholds elif (self.m_win[peak_val] < self.Threshold_I2) or (self.Threshold_I2 < self.m_win[peak_val] < self.Threshold_I1): self.NPKI = 0.125 * self.m_win[peak_val] + 0.875 * self.NPKI self.NPKF = 0.125 * self.b_pass[ind] + 0.875 * self.NPKF def update_thresholds(self): ''' Update Noise and Signal Thresholds for next iteration ''' self.Threshold_I1 = self.NPKI + 0.25 * (self.SPKI - self.NPKI) self.Threshold_F1 = self.NPKF + 0.25 * (self.SPKF - self.NPKF) self.Threshold_I2 = 0.5 * self.Threshold_I1 self.Threshold_F2 = 0.5 * self.Threshold_F1 self.T_wave = False def ecg_searchback(self): ''' Searchback in ECG signal to increase efficiency ''' # Filter the unique R peak locations self.r_locs = np.unique(np.array(self.r_locs).astype(int)) # Initialize a window to searchback win_200ms = round(0.2*self.samp_freq) for r_val in self.r_locs: coord = np.arange(r_val - win_200ms, min(len(self.signal), r_val + win_200ms + 1), 1) # Find the x location of the max peak value if (len(coord) > 0): for pos in coord: if (self.signal[pos] == max(self.signal[coord])): x_max = pos break else: x_max = None # Append the peak location if (x_max is not None): self.result.append(x_max) def find_r_peaks(self): ''' R Peak Detection ''' # Find approximate peak locations self.approx_peak() # Iterate over possible peak locations for ind in range(len(self.peaks)): # Initialize the search window for peak detection peak_val = self.peaks[ind] win_300ms = np.arange(max(0, self.peaks[ind] - self.win_150ms), min(self.peaks[ind] + self.win_150ms, len(self.b_pass)-1), 1) max_val = max(self.b_pass[win_300ms], default = 0) # Find the x location of the max peak value if (max_val != 0): x_coord = np.asarray(self.b_pass == max_val).nonzero() self.probable_peaks.append(x_coord[0][0]) if (ind < len(self.probable_peaks) and ind != 0): # Adjust RR interval and limits self.adjust_rr_interval(ind) # Adjust thresholds in case of irregular beats if (self.RR_Average1 < self.RR_Low_Limit or self.RR_Average1 > self.RR_Missed_Limit): self.Threshold_I1 /= 2 self.Threshold_F1 /= 2 RRn = self.RR1[-1] # Searchback self.searchback(peak_val,RRn,round(RRn*self.samp_freq)) # T Wave Identification self.find_t_wave(peak_val,RRn,ind,ind-1) else: # Adjust threholds self.adjust_thresholds(peak_val,ind) # Update threholds for next iteration self.update_thresholds() # Searchback in ECG signal self.ecg_searchback() return self.result #%% Compute Parameters if __name__ == "__main__": # Convert ecg signal to numpy array samplFreq = 200 QRS_detector = Pan_Tompkins_QRS(samplFreq) f = h5py.File('./59146237/measure/CH07_59146237_s0000029.h5', 'r') ecg_signal = f['measure']['value']['_030']['value']['data']['value'][0,:] timestamp = f['measure']['value']['_030']['value']['time']['value'][0,:] ecg = pd.DataFrame({'TimeStamp': timestamp, 'ecg': ecg_signal}) output_singal = QRS_detector.solve(ecg) signal = ecg.iloc[:,1].to_numpy() # Find the R peak locations hr = heart_rate(signal,samplFreq) result = hr.find_r_peaks() result = np.array(result) # Clip the x locations less than 0 (Learning Phase) result = result[result > 0] # Calculate the heart rate heartRate = (60*samplFreq)/np.average(np.diff(result[1:])) print("Heart Rate",heartRate, "BPM") # Plotting the R peak locations in ECG signal plt.figure(figsize = (20,4), dpi = 100) plt.xticks(np.arange(0, len(signal)+1, 150)) plt.plot(signal, color = 'blue') plt.scatter(result, signal[result], color = 'red', s = 50, marker= '*') plt.xlabel('Samples') plt.ylabel('mV') plt.title("R Peak Locations")