Waveform Database Software Package (WFDB) for Python 3.3.0

File: <base>/wfdb/processing/basic.py (7,052 bytes)
import numpy as np
from scipy import signal
import pandas as pd
import pdb

from wfdb.io.annotation import Annotation


def resample_ann(resampled_t, ann_sample):
    """
    Compute the new annotation indices.

    Parameters
    ----------
    resampled_t : ndarray
        Array of signal locations as returned by scipy.signal.resample.
    ann_sample : ndarray
        Array of annotation locations.

    Returns
    -------
    ndarray
        Array of resampled annotation locations.

    """
    tmp = np.zeros(len(resampled_t), dtype='int16')
    j = 0
    break_loop = 0
    tprec = resampled_t[j]
    for i, v in enumerate(ann_sample):
        break_loop = 0
        while True:
            d = False
            if v < tprec:
                j -= 1
                tprec = resampled_t[j]

            if j+1 == len(resampled_t):
                tmp[j] += 1
                break

            tnow = resampled_t[j+1]
            if tprec <= v and v <= tnow:
                if v-tprec < tnow-v:
                    tmp[j] += 1
                else:
                    tmp[j+1] += 1
                d = True
            j += 1
            tprec = tnow
            break_loop += 1
            if (break_loop > 1000):
                tmp[j] += 1
                break
            if d:
                break

    idx = np.where(tmp>0)[0].astype('int64')
    res = []
    for i in idx:
        for j in range(tmp[i]):
            res.append(i)
    assert len(res) == len(ann_sample)

    return np.asarray(res, dtype='int64')


def resample_sig(x, fs, fs_target):
    """
    Resample a signal to a different frequency.

    Parameters
    ----------
    x : ndarray
        Array containing the signal.
    fs : int, float
        The original sampling frequency.
    fs_target : int, float
        The target frequency.

    Returns
    -------
    resampled_x : ndarray
        Array of the resampled signal values.
    resampled_t : ndarray
        Array of the resampled signal locations.

    """
    t = np.arange(x.shape[0]).astype('float64')

    if fs == fs_target:
        return x, t

    new_length = int(x.shape[0]*fs_target/fs)
    # Resample the array if NaN values are present
    if np.isnan(x).any():
        x = pd.Series(x.reshape((-1,))).interpolate().values
    resampled_x, resampled_t = signal.resample(x, num=new_length, t=t)
    assert resampled_x.shape == resampled_t.shape and resampled_x.shape[0] == new_length
    assert np.all(np.diff(resampled_t) > 0)

    return resampled_x, resampled_t


def resample_singlechan(x, ann, fs, fs_target):
    """
    Resample a single-channel signal with its annotations.

    Parameters
    ----------
    x: ndarray
        The signal array.
    ann : WFDB Annotation
        The WFDB annotation object.
    fs : int, float
        The original frequency.
    fs_target : int, float
        The target frequency.

    Returns
    -------
    resampled_x : ndarray
        Array of the resampled signal values.
    resampled_ann : WFDB Annotation
        Annotation containing resampled annotation locations.

    """
    resampled_x, resampled_t = resample_sig(x, fs, fs_target)

    new_sample = resample_ann(resampled_t, ann.sample)
    assert ann.sample.shape == new_sample.shape

    resampled_ann = Annotation(record_name=ann.record_name,
                               extension=ann.extension,
                               sample=new_sample,
                               symbol=ann.symbol,
                               subtype=ann.subtype,
                               chan=ann.chan,
                               num=ann.num,
                               aux_note=ann.aux_note,
                               fs=fs_target)

    return resampled_x, resampled_ann


def resample_multichan(xs, ann, fs, fs_target, resamp_ann_chan=0):
    """
    Resample multiple channels with their annotations.

    Parameters
    ----------
    xs: ndarray
        The signal array.
    ann : WFDB Annotation
        The WFDB annotation object.
    fs : int, float
        The original frequency.
    fs_target : int, float
        The target frequency.
    resample_ann_channel : int, optional
        The signal channel used to compute new annotation indices.

    Returns
    -------
    ndarray
        Array of the resampled signal values.
    resampled_ann : WFDB Annotation
        Annotation containing resampled annotation locations.

    """
    assert resamp_ann_chan < xs.shape[1]

    lx = []
    lt = None
    for chan in range(xs.shape[1]):
        resampled_x, resampled_t = resample_sig(xs[:, chan], fs, fs_target)
        lx.append(resampled_x)
        if chan == resamp_ann_chan:
            lt = resampled_t

    new_sample = resample_ann(lt, ann.sample)
    assert ann.sample.shape == new_sample.shape

    resampled_ann = Annotation(record_name=ann.record_name,
                               extension=ann.extension,
                               sample=new_sample,
                               symbol=ann.symbol,
                               subtype=ann.subtype,
                               chan=ann.chan,
                               num=ann.num,
                               aux_note=ann.aux_note,
                               fs=fs_target)

    return np.column_stack(lx), resampled_ann


def normalize_bound(sig, lb=0, ub=1):
    """
    Normalize a signal between the lower and upper bound.

    Parameters
    ----------
    sig : ndarray
        Original signal to be normalized.
    lb : int, float, optional
        Lower bound.
    ub : int, float, optional
        Upper bound.

    Returns
    -------
    ndarray
        Normalized signal.

    """
    mid = ub - (ub - lb) / 2
    min_v = np.min(sig)
    max_v = np.max(sig)
    mid_v =  max_v - (max_v - min_v) / 2
    coef = (ub - lb) / (max_v - min_v)
    return sig * coef - (mid_v * coef) + mid


def smooth(sig, window_size):
    """
    Apply a uniform moving average filter to a signal.

    Parameters
    ----------
    sig : ndarray
        The signal to smooth.
    window_size : int
        The width of the moving average filter.

    Returns
    -------
    ndarray
        The convolved input signal with the desired box waveform.

    """
    box = np.ones(window_size)/window_size
    return np.convolve(sig, box, mode='same')


def get_filter_gain(b, a, f_gain, fs):
    """
    Given filter coefficients, return the gain at a particular
    frequency.

    Parameters
    ----------
    b : list
        List of linear filter b coefficients.
    a : list
        List of linear filter a coefficients.
    f_gain : int, float, optional
        The frequency at which to calculate the gain.
    fs : int, float, optional
        The sampling frequency of the system.
    
    Returns
    -------
    gain : int, float
        The passband gain at the desired frequency.

    """
    # Save the passband gain
    w, h = signal.freqz(b, a)
    w_gain = f_gain * 2 * np.pi / fs

    ind = np.where(w >= w_gain)[0][0]
    gain = abs(h[ind])

    return gain