Waveform Database Software Package (WFDB) for Python 3.3.0
(4,179 bytes)
import numpy as np
import wfdb
from wfdb import processing
class test_processing():
"""
Test processing functions
"""
def test_resample_single(self):
sig, fields = wfdb.rdsamp('sample-data/100')
ann = wfdb.rdann('sample-data/100', 'atr')
fs = fields['fs']
fs_target = 50
new_sig, new_ann = processing.resample_singlechan(sig[:, 0], ann, fs,
fs_target)
expected_length = int(sig.shape[0] * fs_target / fs)
assert new_sig.shape[0] == expected_length
def test_resample_multi(self):
sig, fields = wfdb.rdsamp('sample-data/100')
ann = wfdb.rdann('sample-data/100', 'atr')
fs = fields['fs']
fs_target = 50
new_sig, new_ann = processing.resample_multichan(sig, ann, fs, fs_target)
expected_length = int(sig.shape[0]*fs_target/fs)
assert new_sig.shape[0] == expected_length
assert new_sig.shape[1] == sig.shape[1]
def test_normalize_bound(self):
sig, _ = wfdb.rdsamp('sample-data/100')
lb = -5
ub = 15
x = processing.normalize_bound(sig[:, 0], lb, ub)
assert x.shape[0] == sig.shape[0]
assert np.min(x) >= lb
assert np.max(x) <= ub
def test_find_peaks(self):
x = [0, 2, 1, 0, -10, -15, -15, -15, 9, 8, 0, 0, 1, 2, 10]
hp, sp = processing.find_peaks(x)
assert np.array_equal(hp, [1, 8])
assert np.array_equal(sp, [6, 10])
def test_find_peaks_empty(self):
x = []
hp, sp = processing.find_peaks(x)
assert hp.shape == (0,)
assert sp.shape == (0,)
def test_gqrs(self):
record = wfdb.rdrecord('sample-data/100', channels=[0],
sampfrom=9998, sampto=19998, physical=False)
expected_peaks = [271, 580, 884, 1181, 1469, 1770, 2055, 2339, 2634,
2939, 3255, 3551, 3831, 4120, 4412, 4700, 5000, 5299,
5596, 5889, 6172, 6454, 6744, 7047, 7347, 7646, 7936,
8216, 8503, 8785, 9070, 9377, 9682]
peaks = processing.gqrs_detect(d_sig=record.d_signal[:,0],
fs=record.fs,
adc_gain=record.adc_gain[0],
adc_zero=record.adc_zero[0],
threshold=1.0)
assert np.array_equal(peaks, expected_peaks)
def test_correct_peaks(self):
sig, fields = wfdb.rdsamp('sample-data/100')
ann = wfdb.rdann('sample-data/100', 'atr')
fs = fields['fs']
min_bpm = 10
max_bpm = 350
min_gap = fs*60/min_bpm
max_gap = fs * 60 / max_bpm
y_idxs = processing.correct_peaks(sig=sig[:,0], peak_inds=ann.sample,
search_radius=int(max_gap),
smooth_window_size=150)
yz = np.zeros(sig.shape[0])
yz[y_idxs] = 1
yz = np.where(yz[:10000]==1)[0]
expected_peaks = [77, 370, 663, 947, 1231, 1515, 1809, 2045, 2403,
2706, 2998, 3283, 3560, 3863, 4171, 4466, 4765, 5061,
5347, 5634, 5919, 6215, 6527, 6824, 7106, 7393, 7670,
7953, 8246, 8539, 8837, 9142, 9432, 9710, 9998]
assert np.array_equal(yz, expected_peaks)
class test_qrs():
"""
Testing QRS detectors
"""
def test_xqrs(self):
"""
Run XQRS detector on record 100 and compare to reference annotations
"""
sig, fields = wfdb.rdsamp('sample-data/100', channels=[0])
ann_ref = wfdb.rdann('sample-data/100','atr')
xqrs = processing.XQRS(sig=sig[:,0], fs=fields['fs'])
xqrs.detect()
comparitor = processing.compare_annotations(ann_ref.sample[1:],
xqrs.qrs_inds,
int(0.1 * fields['fs']))
assert comparitor.sensitivity > 0.99
assert comparitor.positive_predictivity > 0.99