import os import posixpath import struct import numpy as np from wfdb.io import Record from wfdb.io import download from wfdb.io import _url from wfdb.io.record import rdrecord def wfdb_to_wav( record_name, pn_dir=None, sampfrom=0, sampto=None, channels=None, output_filename="", write_header=False, ): """ This program converts a WFDB record into .wav format (format 16, multiplexed signals, with embedded header information). Parameters ---------- record_name : str The name of the input WFDB record to be read. Can also work with both EDF and WAV files. pn_dir : str, optional Option used to stream data from Physionet. The Physionet database directory from which to find the required record files. eg. For record '100' in 'http://physionet.org/content/mitdb' pn_dir='mitdb'. sampfrom : int, optional The starting sample number to read for all channels. sampto : int, 'end', optional The sample number at which to stop reading for all channels. Reads the entire duration by default. channels : list, optional List of integer indices specifying the channels to be read. Reads all channels by default. output_filename : str, optional The desired name of the output file. If this value set to the default value of '', then the output filename will be 'REC.wav'. write_header : bool, optional Whether to write (True) or not to write (False) a header file to accompany the generated WAV file. The default value is 'False'. Returns ------- N/A Examples -------- >>> wfdb_to_wav('100', pn_dir='pwave') The output file name is '100.wav' """ record = rdrecord( record_name, pn_dir=pn_dir, sampfrom=sampfrom, sampto=sampto, smooth_frames=False, ) record_name_out = record_name.split(os.sep)[-1].replace("-", "_") # Get information needed for the header and format chunks num_samps = record.sig_len samps_per_second = record.fs frame_length = record.n_sig * 2 chunk_bytes = num_samps * frame_length file_bytes = chunk_bytes + 36 bits_per_sample = max(record.adc_res) offset = record.adc_zero shift = [(16 - v) for v in record.adc_res] # Start writing the file if output_filename != "": if not output_filename.endswith(".wav"): raise Exception("Name of output file must end in '.wav'") else: output_filename = record_name_out + ".wav" with open(output_filename, "wb") as f: # Write the WAV file identifier f.write(struct.pack(">4s", b"RIFF")) # Write the number of bytes to follow in the file # (num_samps*frame_length) sample bytes, and 36 more bytes of miscellaneous embedded header f.write(struct.pack("8s", b"WAVEfmt ")) # Number of bytes to follow in the format chunk f.write(struct.pack("4s", b"data")) # The number of bytes in the signal data chunk f.write(struct.pack(">> record = read_wav('sample-data/SC4001E0-PSG.wav') """ if not record_name.endswith(".wav"): raise Exception("Name of the input file must end in .wav") if pn_dir is not None: if "." not in pn_dir: dir_list = pn_dir.split("/") pn_dir = posixpath.join( dir_list[0], download.get_version(dir_list[0]), *dir_list[1:] ) file_url = posixpath.join(download.PN_INDEX_URL, pn_dir, record_name) # Currently must download file to read it though can give the # user the option to delete it immediately afterwards with _url.openurl(file_url, "rb") as f: open(record_name, "wb").write(f.read()) wave_file = open(record_name, mode="rb") record_name_out = ( record_name.split(os.sep)[-1].replace("-", "_").replace(".wav", "") ) chunk_ID = "".join( [s.decode() for s in struct.unpack(">4s", wave_file.read(4))] ) if chunk_ID != "RIFF": raise Exception("{} is not a .wav-format file".format(record_name)) correct_chunk_size = os.path.getsize(record_name) - 8 chunk_size = struct.unpack("4s", wave_file.read(4))[0].decode() if fmt != "WAVE": raise Exception("{} is not a .wav-format file".format(record_name)) subchunk1_ID = struct.unpack(">4s", wave_file.read(4))[0].decode() if subchunk1_ID != "fmt ": raise Exception("Format chunk missing or corrupt") subchunk1_size = struct.unpack(" 1: print("PCM has compression of {}".format(audio_format)) if (subchunk1_size != 16) or (audio_format != 1): raise Exception("Unsupported format {}".format(audio_format)) num_channels = struct.unpack("4s", wave_file.read(4))[0].decode() if subchunk2_ID != "data": raise Exception("Format chunk missing or corrupt") correct_subchunk2_size = os.path.getsize(record_name) - 44 subchunk2_size = struct.unpack("