Waveform Database Software Package (WFDB) for Python 3.3.0
(7,052 bytes)
import numpy as np
from scipy import signal
import pandas as pd
import pdb
from wfdb.io.annotation import Annotation
def resample_ann(resampled_t, ann_sample):
"""
Compute the new annotation indices.
Parameters
----------
resampled_t : ndarray
Array of signal locations as returned by scipy.signal.resample.
ann_sample : ndarray
Array of annotation locations.
Returns
-------
ndarray
Array of resampled annotation locations.
"""
tmp = np.zeros(len(resampled_t), dtype='int16')
j = 0
break_loop = 0
tprec = resampled_t[j]
for i, v in enumerate(ann_sample):
break_loop = 0
while True:
d = False
if v < tprec:
j -= 1
tprec = resampled_t[j]
if j+1 == len(resampled_t):
tmp[j] += 1
break
tnow = resampled_t[j+1]
if tprec <= v and v <= tnow:
if v-tprec < tnow-v:
tmp[j] += 1
else:
tmp[j+1] += 1
d = True
j += 1
tprec = tnow
break_loop += 1
if (break_loop > 1000):
tmp[j] += 1
break
if d:
break
idx = np.where(tmp>0)[0].astype('int64')
res = []
for i in idx:
for j in range(tmp[i]):
res.append(i)
assert len(res) == len(ann_sample)
return np.asarray(res, dtype='int64')
def resample_sig(x, fs, fs_target):
"""
Resample a signal to a different frequency.
Parameters
----------
x : ndarray
Array containing the signal.
fs : int, float
The original sampling frequency.
fs_target : int, float
The target frequency.
Returns
-------
resampled_x : ndarray
Array of the resampled signal values.
resampled_t : ndarray
Array of the resampled signal locations.
"""
t = np.arange(x.shape[0]).astype('float64')
if fs == fs_target:
return x, t
new_length = int(x.shape[0]*fs_target/fs)
# Resample the array if NaN values are present
if np.isnan(x).any():
x = pd.Series(x.reshape((-1,))).interpolate().values
resampled_x, resampled_t = signal.resample(x, num=new_length, t=t)
assert resampled_x.shape == resampled_t.shape and resampled_x.shape[0] == new_length
assert np.all(np.diff(resampled_t) > 0)
return resampled_x, resampled_t
def resample_singlechan(x, ann, fs, fs_target):
"""
Resample a single-channel signal with its annotations.
Parameters
----------
x: ndarray
The signal array.
ann : WFDB Annotation
The WFDB annotation object.
fs : int, float
The original frequency.
fs_target : int, float
The target frequency.
Returns
-------
resampled_x : ndarray
Array of the resampled signal values.
resampled_ann : WFDB Annotation
Annotation containing resampled annotation locations.
"""
resampled_x, resampled_t = resample_sig(x, fs, fs_target)
new_sample = resample_ann(resampled_t, ann.sample)
assert ann.sample.shape == new_sample.shape
resampled_ann = Annotation(record_name=ann.record_name,
extension=ann.extension,
sample=new_sample,
symbol=ann.symbol,
subtype=ann.subtype,
chan=ann.chan,
num=ann.num,
aux_note=ann.aux_note,
fs=fs_target)
return resampled_x, resampled_ann
def resample_multichan(xs, ann, fs, fs_target, resamp_ann_chan=0):
"""
Resample multiple channels with their annotations.
Parameters
----------
xs: ndarray
The signal array.
ann : WFDB Annotation
The WFDB annotation object.
fs : int, float
The original frequency.
fs_target : int, float
The target frequency.
resample_ann_channel : int, optional
The signal channel used to compute new annotation indices.
Returns
-------
ndarray
Array of the resampled signal values.
resampled_ann : WFDB Annotation
Annotation containing resampled annotation locations.
"""
assert resamp_ann_chan < xs.shape[1]
lx = []
lt = None
for chan in range(xs.shape[1]):
resampled_x, resampled_t = resample_sig(xs[:, chan], fs, fs_target)
lx.append(resampled_x)
if chan == resamp_ann_chan:
lt = resampled_t
new_sample = resample_ann(lt, ann.sample)
assert ann.sample.shape == new_sample.shape
resampled_ann = Annotation(record_name=ann.record_name,
extension=ann.extension,
sample=new_sample,
symbol=ann.symbol,
subtype=ann.subtype,
chan=ann.chan,
num=ann.num,
aux_note=ann.aux_note,
fs=fs_target)
return np.column_stack(lx), resampled_ann
def normalize_bound(sig, lb=0, ub=1):
"""
Normalize a signal between the lower and upper bound.
Parameters
----------
sig : ndarray
Original signal to be normalized.
lb : int, float, optional
Lower bound.
ub : int, float, optional
Upper bound.
Returns
-------
ndarray
Normalized signal.
"""
mid = ub - (ub - lb) / 2
min_v = np.min(sig)
max_v = np.max(sig)
mid_v = max_v - (max_v - min_v) / 2
coef = (ub - lb) / (max_v - min_v)
return sig * coef - (mid_v * coef) + mid
def smooth(sig, window_size):
"""
Apply a uniform moving average filter to a signal.
Parameters
----------
sig : ndarray
The signal to smooth.
window_size : int
The width of the moving average filter.
Returns
-------
ndarray
The convolved input signal with the desired box waveform.
"""
box = np.ones(window_size)/window_size
return np.convolve(sig, box, mode='same')
def get_filter_gain(b, a, f_gain, fs):
"""
Given filter coefficients, return the gain at a particular
frequency.
Parameters
----------
b : list
List of linear filter b coefficients.
a : list
List of linear filter a coefficients.
f_gain : int, float, optional
The frequency at which to calculate the gain.
fs : int, float, optional
The sampling frequency of the system.
Returns
-------
gain : int, float
The passband gain at the desired frequency.
"""
# Save the passband gain
w, h = signal.freqz(b, a)
w_gain = f_gain * 2 * np.pi / fs
ind = np.where(w >= w_gain)[0][0]
gain = abs(h[ind])
return gain