Waveform Database Software Package (WFDB) for Python 4.0.0

File: <base>/tests/test_record.py (33,683 bytes)
import datetime
import os
import shutil
import unittest

import numpy as np

import wfdb


class TestRecord(unittest.TestCase):
    """
    Test read and write of single segment WFDB records, including
    PhysioNet streaming.

    Target files created using the original WFDB Software Package
    version 10.5.24

    """

    # ----------------------- 1. Basic Tests -----------------------#

    def test_1a(self):
        """
        Format 16, entire signal, digital.

        Target file created with:
            rdsamp -r sample-data/test01_00s | cut -f 2- > record-1a
        """
        record = wfdb.rdrecord(
            "sample-data/test01_00s", physical=False, return_res=16
        )
        sig = record.d_signal
        sig_target = np.genfromtxt("tests/target-output/record-1a")

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "test01_00s", pn_dir="macecgdb", physical=False, return_res=16
        )

        # Test file writing
        record_2 = wfdb.rdrecord(
            "sample-data/test01_00s", physical=False, return_res=16
        )
        record_2.sig_name = ["ECG_1", "ECG_2", "ECG_3", "ECG_4"]
        record_2.wrsamp()
        record_write = wfdb.rdrecord(
            "test01_00s", physical=False, return_res=16
        )

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record_2.__eq__(record_write)

    def test_1b(self):
        """
        Format 16, byte offset, selected duration, selected channels,
        physical.

        Target file created with:
            rdsamp -r sample-data/a103l -f 50 -t 160 -s 2 0 -P | cut -f 2- > record-1b
        """
        sig, fields = wfdb.rdsamp(
            "sample-data/a103l", sampfrom=12500, sampto=40000, channels=[2, 0]
        )
        sig_round = np.round(sig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-1b")

        # Compare data streaming from Physionet
        sig_pn, fields_pn = wfdb.rdsamp(
            "a103l",
            pn_dir="challenge-2015/training",
            sampfrom=12500,
            sampto=40000,
            channels=[2, 0],
        )

        # Option of selecting channels by name
        sig_named, fields_named = wfdb.rdsamp(
            "sample-data/a103l",
            sampfrom=12500,
            sampto=40000,
            channel_names=["PLETH", "II"],
        )

        assert np.array_equal(sig_round, sig_target)
        assert np.array_equal(sig, sig_pn) and fields == fields_pn
        assert np.array_equal(sig, sig_named) and fields == fields_named

    def test_1c(self):
        """
        Format 16, byte offset, selected duration, selected channels,
        digital, expanded format.

        Target file created with:
            rdsamp -r sample-data/a103l -f 80 -s 0 1 | cut -f 2- > record-1c
        """
        record = wfdb.rdrecord(
            "sample-data/a103l",
            sampfrom=20000,
            channels=[0, 1],
            physical=False,
            smooth_frames=False,
        )
        # convert expanded to uniform array
        sig = np.zeros((record.sig_len, record.n_sig))
        for i in range(record.n_sig):
            sig[:, i] = record.e_d_signal[i]

        sig_target = np.genfromtxt("tests/target-output/record-1c")

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "a103l",
            pn_dir="challenge-2015/training",
            sampfrom=20000,
            channels=[0, 1],
            physical=False,
            smooth_frames=False,
        )

        # Test file writing
        record.wrsamp(expanded=True)
        record_write = wfdb.rdrecord(
            "a103l", physical=False, smooth_frames=False
        )

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record.__eq__(record_write)

    def test_1d(self):
        """
        Format 80, selected duration, selected channels, physical

        Target file created with:
            rdsamp -r sample-data/3000003_0003 -f 1 -t 8 -s 1 -P | cut -f 2- > record-1d
        """
        sig, fields = wfdb.rdsamp(
            "sample-data/3000003_0003", sampfrom=125, sampto=1000, channels=[1]
        )
        sig_round = np.round(sig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-1d")
        sig_target = sig_target.reshape(len(sig_target), 1)

        # Compare data streaming from Physionet
        sig_pn, fields_pn = wfdb.rdsamp(
            "3000003_0003",
            pn_dir="mimic3wdb/30/3000003/",
            sampfrom=125,
            sampto=1000,
            channels=[1],
        )

        assert np.array_equal(sig_round, sig_target)
        assert np.array_equal(sig, sig_pn) and fields == fields_pn

    def test_1e(self):
        """
        Format 24, entire signal, digital.

        Target file created with:
            rdsamp -r sample-data/n8_evoked_raw_95_F1_R9 | cut -f 2- |
            gzip -9 -n > record-1e.gz
        """
        record = wfdb.rdrecord(
            "sample-data/n8_evoked_raw_95_F1_R9", physical=False
        )
        sig = record.d_signal
        sig_target = np.genfromtxt("tests/target-output/record-1e.gz")
        sig_target[sig_target == -32768] = -(2**23)

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "n8_evoked_raw_95_F1_R9", physical=False, pn_dir="earndb/raw/N8"
        )

        # Test file writing
        record_2 = wfdb.rdrecord(
            "sample-data/n8_evoked_raw_95_F1_R9", physical=False
        )
        record_2.wrsamp()
        record_write = wfdb.rdrecord("n8_evoked_raw_95_F1_R9", physical=False)

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record_2.__eq__(record_write)

    def test_1f(self):
        """
        All binary formats, multiple signal files in one record.

        Target file created with:
            rdsamp -r sample-data/binformats | cut -f 2- |
            gzip -9 -n > record-1f.gz
        """
        record = wfdb.rdrecord("sample-data/binformats", physical=False)
        sig_target = np.genfromtxt("tests/target-output/record-1f.gz")

        for n, name in enumerate(record.sig_name):
            np.testing.assert_array_equal(
                record.d_signal[:, n], sig_target[:, n], "Mismatch in %s" % name
            )

        for sampfrom in range(0, 3):
            for sampto in range(record.sig_len - 3, record.sig_len):
                record_2 = wfdb.rdrecord(
                    "sample-data/binformats",
                    physical=False,
                    sampfrom=sampfrom,
                    sampto=sampto,
                )
                for n, name in enumerate(record.sig_name):
                    if record.fmt[n] != "8":
                        np.testing.assert_array_equal(
                            record_2.d_signal[:, n],
                            sig_target[sampfrom:sampto, n],
                            "Mismatch in %s" % name,
                        )

    def test_read_flac(self):
        """
        All FLAC formats, multiple signal files in one record.

        Target file created with:
            rdsamp -r sample-data/flacformats | cut -f 2- |
            gzip -9 -n > record-flac.gz
        """
        record = wfdb.rdrecord("sample-data/flacformats", physical=False)
        sig_target = np.genfromtxt("tests/target-output/record-flac.gz")

        for n, name in enumerate(record.sig_name):
            np.testing.assert_array_equal(
                record.d_signal[:, n], sig_target[:, n], f"Mismatch in {name}"
            )

        for sampfrom in range(0, 3):
            for sampto in range(record.sig_len - 3, record.sig_len):
                record_2 = wfdb.rdrecord(
                    "sample-data/flacformats",
                    physical=False,
                    sampfrom=sampfrom,
                    sampto=sampto,
                )
                for n, name in enumerate(record.sig_name):
                    np.testing.assert_array_equal(
                        record_2.d_signal[:, n],
                        sig_target[sampfrom:sampto, n],
                        f"Mismatch in {name}",
                    )

    def test_read_flac_longduration(self):
        """
        Three signals multiplexed in a FLAC file, over 2**24 samples.

        Input file created with:
            yes 25 50 75 | head -5600000 |
            wrsamp -O 508 -o flac_3_constant 0 1 2

        Note that the total number of samples (across the three
        channels) exceeds 2**24.  There is a bug in libsndfile that
        causes it to break if we try to read more than 2**24 total
        samples at a time, when the number of channels is not a power
        of two.
        """
        record = wfdb.rdrecord("sample-data/flac_3_constant")
        sig_target = np.repeat(
            np.array([[0.125, 0.25, 0.375]], dtype="float64"),
            5600000,
            axis=0,
        )
        np.testing.assert_array_equal(record.p_signal, sig_target)

    # ------------------ 2. Special format records ------------------ #

    def test_2a(self):
        """
        Format 212, entire signal, physical.

        Target file created with:
            rdsamp -r sample-data/100 -P | cut -f 2- > record-2a
        """
        sig, fields = wfdb.rdsamp("sample-data/100")
        sig_round = np.round(sig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-2a")

        # Compare data streaming from Physionet
        sig_pn, fields_pn = wfdb.rdsamp("100", pn_dir="mitdb")
        # This comment line was manually added and is not present in the
        # original PhysioNet record
        del fields["comments"][0]

        assert np.array_equal(sig_round, sig_target)
        assert np.array_equal(sig, sig_pn) and fields == fields_pn

    def test_2b(self):
        """
        Format 212, selected duration, selected channel, digital.

        Target file created with:
            rdsamp -r sample-data/100 -f 0.002 -t 30 -s 1 | cut -f 2- > record-2b
        """
        record = wfdb.rdrecord(
            "sample-data/100",
            sampfrom=1,
            sampto=10800,
            channels=[1],
            physical=False,
        )
        sig = record.d_signal
        sig_target = np.genfromtxt("tests/target-output/record-2b")
        sig_target = sig_target.reshape(len(sig_target), 1)

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "100",
            sampfrom=1,
            sampto=10800,
            channels=[1],
            physical=False,
            pn_dir="mitdb",
        )
        # This comment line was manually added and is not present in the
        # original PhysioNet record
        del record.comments[0]

        # Option of selecting channels by name
        record_named = wfdb.rdrecord(
            "sample-data/100",
            sampfrom=1,
            sampto=10800,
            channel_names=["V5"],
            physical=False,
        )
        del record_named.comments[0]

        # Test file writing
        record.wrsamp()
        record_write = wfdb.rdrecord("100", physical=False)

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record.__eq__(record_named)
        assert record.__eq__(record_write)

    def test_2c(self):
        """
        Format 212, entire signal, physical, odd sampled record.

        Target file created with:
            rdsamp -r sample-data/100_3chan -P | cut -f 2- > record-2c
        """
        record = wfdb.rdrecord("sample-data/100_3chan")
        sig_round = np.round(record.p_signal, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-2c")

        # Test file writing
        record.d_signal = record.adc()
        record.wrsamp()
        record_write = wfdb.rdrecord("100_3chan")
        record.d_signal = None

        assert np.array_equal(sig_round, sig_target)
        assert record.__eq__(record_write)

    def test_2d(self):
        """
        Format 310, selected duration, digital
        Target file created with:
            rdsamp -r sample-data/3000003_0003 -f 0 -t 8.21 | cut -f 2- | wrsamp -o 310derive -O 310
            rdsamp -r 310derive -f 0.007 | cut -f 2- > record-2d
        """
        record = wfdb.rdrecord(
            "sample-data/310derive", sampfrom=2, physical=False
        )
        sig = record.d_signal
        sig_target = np.genfromtxt("tests/target-output/record-2d")
        assert np.array_equal(sig, sig_target)

    def test_2e(self):
        """
        Format 311, selected duration, physical.

        Target file created with:
            rdsamp -r sample-data/3000003_0003 -f 0 -t 8.21 -s 1 | cut -f 2- | wrsamp -o 311derive -O 311
            rdsamp -r 311derive -f 0.005 -t 3.91 -P | cut -f 2- > record-2e
        """
        sig, fields = wfdb.rdsamp(
            "sample-data/311derive", sampfrom=1, sampto=978
        )
        sig = np.round(sig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-2e")
        sig_target = sig_target.reshape([977, 1])
        assert np.array_equal(sig, sig_target)

    # --------------------- 3. Multi-dat records --------------------- #

    def test_3a(self):
        """
        Multi-dat, entire signal, digital
        Target file created with:
            rdsamp -r sample-data/s0010_re | cut -f 2- > record-3a
        """
        record = wfdb.rdrecord("sample-data/s0010_re", physical=False)
        sig = record.d_signal
        sig_target = np.genfromtxt("tests/target-output/record-3a")

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "s0010_re", physical=False, pn_dir="ptbdb/patient001"
        )

        # Test file writing
        record.wrsamp()
        record_write = wfdb.rdrecord("s0010_re", physical=False)

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record.__eq__(record_write)

    def test_3b(self):
        """
        Multi-dat, selected duration, selected channels, physical.

        Target file created with:
            rdsamp -r sample-data/s0010_re -f 5 -t 38 -P -s 13 0 4 8 3 | cut -f 2- > record-3b
        """
        sig, fields = wfdb.rdsamp(
            "sample-data/s0010_re",
            sampfrom=5000,
            sampto=38000,
            channels=[13, 0, 4, 8, 3],
        )
        sig_round = np.round(sig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-3b")

        # Compare data streaming from Physionet
        sig_pn, fields_pn = wfdb.rdsamp(
            "s0010_re",
            sampfrom=5000,
            pn_dir="ptbdb/patient001",
            sampto=38000,
            channels=[13, 0, 4, 8, 3],
        )

        assert np.array_equal(sig_round, sig_target)
        assert np.array_equal(sig, sig_pn) and fields == fields_pn

    # -------------- 4. Skew and multiple samples/frame -------------- #

    def test_4a(self):
        """
        Format 16, multi-samples per frame, skew, digital.

        Target file created with:
            rdsamp -r sample-data/test01_00s_skewframe | cut -f 2- > record-4a
        """
        record = wfdb.rdrecord(
            "sample-data/test01_00s_skewframe", physical=False
        )
        sig = record.d_signal
        # The WFDB library rdsamp does not return the final N samples for all
        # channels due to the skew. The WFDB python rdsamp does return the final
        # N samples, filling in NANs for end of skewed channels only.
        sig = sig[:-3, :]

        sig_target = np.genfromtxt("tests/target-output/record-4a")

        # Test file writing. Multiple samples per frame and skew.
        # Have to read all the samples in the record, ignoring skew
        record_no_skew = wfdb.rdrecord(
            "sample-data/test01_00s_skewframe",
            physical=False,
            smooth_frames=False,
            ignore_skew=True,
        )
        record_no_skew.wrsamp(expanded=True)
        # Read the written record
        record_write = wfdb.rdrecord("test01_00s_skewframe", physical=False)

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_write)

    def test_4b(self):
        """
        Format 12, multi-samples per frame, skew, entire signal, digital.

        Target file created with:
            rdsamp -r sample-data/03700181 | cut -f 2- > record-4b
        """
        record = wfdb.rdrecord("sample-data/03700181", physical=False)
        sig = record.d_signal
        # The WFDB library rdsamp does not return the final N samples for all
        # channels due to the skew.
        sig = sig[:-4, :]
        # The WFDB python rdsamp does return the final N samples, filling in
        # NANs for end of skewed channels only.
        sig_target = np.genfromtxt("tests/target-output/record-4b")

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "03700181", physical=False, pn_dir="mimicdb/037"
        )

        # Test file writing. Multiple samples per frame and skew.
        # Have to read all the samples in the record, ignoring skew
        record_no_skew = wfdb.rdrecord(
            "sample-data/03700181",
            physical=False,
            smooth_frames=False,
            ignore_skew=True,
        )
        record_no_skew.wrsamp(expanded=True)
        # Read the written record
        record_write = wfdb.rdrecord("03700181", physical=False)

        assert np.array_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record.__eq__(record_write)

    def test_4c(self):
        """
        Format 12, multi-samples per frame, skew, selected suration,
        selected channels, physical.

        Target file created with:
            rdsamp -r sample-data/03700181 -f 8 -t 128 -s 0 2 -P | cut -f 2- > record-4c
        """
        sig, fields = wfdb.rdsamp(
            "sample-data/03700181", channels=[0, 2], sampfrom=1000, sampto=16000
        )
        sig_round = np.round(sig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-4c")

        # Compare data streaming from Physionet
        sig_pn, fields_pn = wfdb.rdsamp(
            "03700181",
            pn_dir="mimicdb/037",
            channels=[0, 2],
            sampfrom=1000,
            sampto=16000,
        )

        # Test file writing. Multiple samples per frame and skew.
        # Have to read all the samples in the record, ignoring skew
        record_no_skew = wfdb.rdrecord(
            "sample-data/03700181",
            physical=False,
            smooth_frames=False,
            ignore_skew=True,
        )
        record_no_skew.wrsamp(expanded=True)
        # Read the written record
        writesig, writefields = wfdb.rdsamp(
            "03700181", channels=[0, 2], sampfrom=1000, sampto=16000
        )

        assert np.array_equal(sig_round, sig_target)
        assert np.array_equal(sig, sig_pn) and fields == fields_pn
        assert np.array_equal(sig, writesig) and fields == writefields

    def test_4d(self):
        """
        Format 16, multi-samples per frame, skew, read expanded signals

        Target file created with:
            rdsamp -r sample-data/test01_00s_skewframe -P -H | cut -f 2- > record-4d
        """
        record = wfdb.rdrecord(
            "sample-data/test01_00s_skewframe", smooth_frames=False
        )

        # Upsample the channels with lower samples/frame
        expandsig = np.zeros((7994, 3))
        expandsig[:, 0] = np.repeat(record.e_p_signal[0][:-3], 2)
        expandsig[:, 1] = record.e_p_signal[1][:-6]
        expandsig[:, 2] = np.repeat(record.e_p_signal[2][:-3], 2)

        sig_round = np.round(expandsig, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-4d")

        assert np.array_equal(sig_round, sig_target)

    def test_header_with_non_utf8(self):
        """
        Ignores non-utf8 characters in the header part.
        """
        record = wfdb.rdrecord("sample-data/test_generator_2")
        sig_units_target = [
            "uV",
            "uV",
            "uV",
            "uV",
            "uV",
            "uV",
            "uV",
            "uV",
            "mV",
            "mV",
            "uV",
            "mV",
        ]
        assert record.units.__eq__(sig_units_target)

    @classmethod
    def tearDownClass(cls):
        "Clean up written files"
        writefiles = [
            "03700181.dat",
            "03700181.hea",
            "100.dat",
            "100.hea",
            "100_3chan.dat",
            "100_3chan.hea",
            "a103l.hea",
            "a103l.mat",
            "s0010_re.dat",
            "s0010_re.hea",
            "s0010_re.xyz",
            "test01_00s.dat",
            "test01_00s.hea",
            "test01_00s_skewframe.hea",
            "n8_evoked_raw_95_F1_R9.dat",
            "n8_evoked_raw_95_F1_R9.hea",
        ]

        for file in writefiles:
            if os.path.isfile(file):
                os.remove(file)


class TestMultiRecord(unittest.TestCase):
    """
    Test read and write of multi segment WFDB records, including
    PhysioNet streaming.

    Target files created using the original WFDB Software Package
    version 10.5.24

    """

    def test_multi_fixed_a(self):
        """
        Multi-segment, fixed layout, read entire signal.

        Target file created with:
            rdsamp -r sample-data/multi-segment/fixed1/v102s -P | cut -f 2- > record-multi-fixed-a
        """
        record = wfdb.rdrecord("sample-data/multi-segment/fixed1/v102s")
        sig_round = np.round(record.p_signal, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-multi-fixed-a")

        np.testing.assert_equal(sig_round, sig_target)

    def test_multi_fixed_b(self):
        """
        Multi-segment, fixed layout, selected duration, samples read
        from one segment.

        Target file created with:
            rdsamp -r sample-data/multi-segment/fixed1/v102s -t s75000 -P | cut -f 2- > record-multi-fixed-b
        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/fixed1/v102s", sampto=75000
        )
        sig_round = np.round(record.p_signal, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-multi-fixed-b")

        np.testing.assert_equal(sig_round, sig_target)

    def test_multi_fixed_c(self):
        """
        Multi-segment, fixed layout, selected duration and channels,
        samples read from multiple segments

        Target file created with:
            rdsamp -r sample-data/multi-segment/fixed1/v102s -f s70000 -t s80000 -s 1 0 3 -P | cut -f 2- > record-multi-fixed-c
        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/fixed1/v102s",
            sampfrom=70000,
            sampto=80000,
            channels=[1, 0, 3],
        )
        sig_round = np.round(record.p_signal, decimals=8)
        sig_target = np.genfromtxt("tests/target-output/record-multi-fixed-c")

        # Option of selecting channels by name
        record_named = wfdb.rdrecord(
            "sample-data/multi-segment/fixed1/v102s",
            sampfrom=70000,
            sampto=80000,
            channel_names=["V", "II", "RESP"],
        )

        np.testing.assert_equal(sig_round, sig_target)
        assert record.__eq__(record_named)

    def test_multi_fixed_d(self):
        """
        Multi-segment, fixed layout, multi-frequency, selected channels

        Target file created with:
            rdsamp -r sample-data/multi-segment/041s/ -s 3 2 1 -H |
            cut -f 2- | sed s/-32768/-2048/ |
            gzip -9 -n > tests/target-output/record-multi-fixed-d.gz
        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/041s/041s",
            channels=[3, 2, 1],
            physical=False,
            smooth_frames=False,
        )

        # Convert expanded to uniform array (high-resolution)
        sig = np.zeros((record.sig_len * 4, record.n_sig), dtype=int)
        for i, s in enumerate(record.e_d_signal):
            sig[:, i] = np.repeat(s, len(sig[:, i]) // len(s))

        sig_target = np.genfromtxt(
            "tests/target-output/record-multi-fixed-d.gz"
        )

        record_named = wfdb.rdrecord(
            "sample-data/multi-segment/041s/041s",
            channel_names=["ABP", "V", "I"],
            physical=False,
            smooth_frames=False,
        )

        # Sample values should match the output of rdsamp -H
        np.testing.assert_array_equal(sig, sig_target)
        # channel_names=[...] should give the same result as channels=[...]
        self.assertEqual(record, record_named)

    def test_multi_variable_a(self):
        """
        Multi-segment, variable layout, selected duration, samples read
        from one segment only.

        Target file created with:
            rdsamp -r sample-data/multi-segment/s00001/s00001-2896-10-10-00-31 -f s14428365 -t s14428375 -P | cut -f 2- > record-multi-variable-a
        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/s00001/s00001-2896-10-10-00-31",
            sampfrom=14428365,
            sampto=14428375,
        )
        sig_round = np.round(record.p_signal, decimals=8)
        sig_target = np.genfromtxt(
            "tests/target-output/record-multi-variable-a"
        )

        np.testing.assert_equal(sig_round, sig_target)

    def test_multi_variable_b(self):
        """
        Multi-segment, variable layout, selected duration, samples read
        from several segments.

        Target file created with:
            rdsamp -r sample-data/multi-segment/s00001/s00001-2896-10-10-00-31 -f s14428364 -t s14428375 -P | cut -f 2- > record-multi-variable-b
        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/s00001/s00001-2896-10-10-00-31",
            sampfrom=14428364,
            sampto=14428375,
        )
        sig_round = np.round(record.p_signal, decimals=8)
        sig_target = np.genfromtxt(
            "tests/target-output/record-multi-variable-b"
        )

        np.testing.assert_equal(sig_round, sig_target)

    def test_multi_variable_c(self):
        """
        Multi-segment, variable layout, entire signal, physical, expanded

        The reference signal creation cannot be made with rdsamp
        directly because the WFDB c package (10.5.24) applies the single
        adcgain and baseline values from the layout specification
        header, which is undesired in multi-segment signals with
        different adcgain/baseline values across segments.

        Target file created with:
        ```
        for i in {01..18}
        do
            rdsamp -r sample-data/multi-segment/s25047/3234460_00$i -P | cut -f 2- >> record-multi-variable-c
        done
        ```

        Entire signal has 543240 samples.
        - 25740 length empty segment.
        - First 16 segments have same 2 channels, length 420000
        - Last 2 segments have same 3 channels, length 97500

        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/s25047/s25047-2704-05-04-10-44",
            smooth_frames=False,
        )

        # convert expanded to uniform array and round to 8 digits
        sig_round = np.zeros((record.sig_len, record.n_sig))
        for i in range(record.n_sig):
            sig_round[:, i] = np.round(record.e_p_signal[i], decimals=8)

        sig_target_a = np.full((25740, 3), np.nan)
        sig_target_b = np.concatenate(
            (
                np.genfromtxt(
                    "tests/target-output/record-multi-variable-c",
                    skip_footer=97500,
                ),
                np.full((420000, 1), np.nan),
            ),
            axis=1,
        )
        sig_target_c = np.genfromtxt(
            "tests/target-output/record-multi-variable-c", skip_header=420000
        )
        sig_target = np.concatenate((sig_target_a, sig_target_b, sig_target_c))

        np.testing.assert_equal(sig_round, sig_target)

    def test_multi_variable_d(self):
        """
        Multi-segment, variable layout, selected duration, selected
        channels, digital. There are two channels: PLETH, and II. Their
        fmt, adc_gain, and baseline do not change between the segments.

        Target file created with:
            rdsamp -r sample-data/multi-segment/p000878/p000878-2137-10-26-16-57 -f s3550 -t s7500 -s 0 1 | cut -f 2- | perl -p -e 's/-32768/  -128/g;' > record-multi-variable-d

        """
        record = wfdb.rdrecord(
            "sample-data/multi-segment/p000878/p000878-2137-10-26-16-57",
            sampfrom=3550,
            sampto=7500,
            channels=[0, 1],
            physical=False,
        )
        sig = record.d_signal

        # Compare data streaming from Physionet
        record_pn = wfdb.rdrecord(
            "p000878-2137-10-26-16-57",
            pn_dir="mimic3wdb/matched/p00/p000878/",
            sampfrom=3550,
            sampto=7500,
            channels=[0, 1],
            physical=False,
        )
        sig_target = np.genfromtxt(
            "tests/target-output/record-multi-variable-d"
        )

        # Option of selecting channels by name
        record_named = wfdb.rdrecord(
            "sample-data/multi-segment/p000878/p000878-2137-10-26-16-57",
            sampfrom=3550,
            sampto=7500,
            physical=False,
            channel_names=["PLETH", "II"],
        )

        np.testing.assert_equal(sig, sig_target)
        assert record.__eq__(record_pn)
        assert record.__eq__(record_named)


class TestTimeConversion(unittest.TestCase):
    """
    Test cases for time conversion
    """

    def test_single(self):
        """
        Time conversion for a single-segment record

        This checks the get_frame_number, get_elapsed_time, and
        get_absolute_time methods for a Record object.  The example record
        has no base date defined, so attempting to convert to/from absolute
        time should raise an exception.

        """
        header = wfdb.rdheader("sample-data/test01_00s")

        # these time values should be equivalent
        n = 123 * header.fs
        t = datetime.timedelta(seconds=123)
        self.assertEqual(header.get_frame_number(n), n)
        self.assertEqual(header.get_frame_number(t), n)
        self.assertEqual(header.get_elapsed_time(n), t)
        self.assertEqual(header.get_elapsed_time(t), t)

        # record test01_00s has no base date, so absolute time conversions
        # should fail
        self.assertIsNone(header.base_date)
        d = datetime.datetime(2001, 1, 1, 12, 0, 0)
        self.assertRaises(ValueError, header.get_frame_number, d)
        self.assertRaises(ValueError, header.get_absolute_time, n)
        self.assertRaises(ValueError, header.get_absolute_time, t)

    def test_multisegment_with_date(self):
        """
        Time conversion for a multi-segment record with base date

        This checks the get_frame_number, get_elapsed_time, and
        get_absolute_time methods for a MultiRecord object.  The example
        record has a base date, so we can convert timestamps between all
        three of the supported representations.

        """
        header = wfdb.rdheader(
            "sample-data/multi-segment/p000878/p000878-2137-10-26-16-57"
        )

        # these time values should be equivalent
        n = 123 * header.fs
        t = datetime.timedelta(seconds=123)
        d = t + header.base_datetime
        self.assertEqual(header.get_frame_number(n), n)
        self.assertEqual(header.get_frame_number(t), n)
        self.assertEqual(header.get_frame_number(d), n)
        self.assertEqual(header.get_elapsed_time(n), t)
        self.assertEqual(header.get_elapsed_time(t), t)
        self.assertEqual(header.get_elapsed_time(d), t)
        self.assertEqual(header.get_absolute_time(n), d)
        self.assertEqual(header.get_absolute_time(t), d)
        self.assertEqual(header.get_absolute_time(d), d)


class TestSignal(unittest.TestCase):
    """
    For lower level signal tests

    """

    def test_infer_sig_len(self):
        """
        Infer the signal length of a record without the sig_len header
        Read two headers. The records should be the same.
        """

        record = wfdb.rdrecord("sample-data/drive02")
        record_2 = wfdb.rdrecord("sample-data/drive02-no-len")
        record_2.record_name = record.record_name

        assert record_2.__eq__(record)

        record = wfdb.rdrecord("sample-data/a103l")
        record_2 = wfdb.rdrecord("sample-data/a103l-no-len")
        record_2.record_name = record.record_name

        assert record_2.__eq__(record)


class TestDownload(unittest.TestCase):
    # Test that we can download records with no "dat" file
    # Regression test for https://github.com/MIT-LCP/wfdb-python/issues/118
    def test_dl_database_no_dat_file(self):
        wfdb.dl_database("afdb", "./download-tests/", ["00735"])

    # Test that we can download records that *do* have a "dat" file.
    def test_dl_database_with_dat_file(self):
        wfdb.dl_database("afdb", "./download-tests/", ["04015"])

    # Cleanup written files
    @classmethod
    def tearDownClass(self):
        if os.path.isdir("./download-tests/"):
            shutil.rmtree("./download-tests/")


if __name__ == "__main__":
    unittest.main()
    print("Everything passed!")