GRABMyoFlow - Dataset extension 1.0.0

File: <base>/grabmyoflow_wfdb_to_mat_dynamic.py (5,869 bytes)
import sys
import os
import shutil
import wfdb
import numpy as np
from scipy.io import savemat
import tkinter as tk
from tkinter import filedialog

# =============================================================================
# README: WFDB to MAT Conversion Script (GRABMyoFlow dynamic Dataset)
# =============================================================================
# This script processes raw WFDB data files (.dat, .hea) from the GRABMyoFlow
# dynamic dataset (30 gestures, 2 trials each) and converts them into MATLAB (.mat) files.
#
# UPDATED LOGIC:
# - Removes channels 1, 8, 9, 16 (1-based numbering; corresponding to U1, U2, U3, U4).
# - Keeps only the 12 Wrist channels.
# =============================================================================


# =============================================================================
# STEP 0: CONSTANTS AND INITIAL SETUP
# =============================================================================

# Create the Tk root window and hide it immediately, so that the folder
# selection dialog opened later in the script is the only window shown.
root = tk.Tk()
root.withdraw()

# --- Dataset Constants ---
# Number of session folders, gestures per participant, and trials per gesture.
NSESSION, NGEESTURE, NTRIALS = 3, 30, 2

# --- Dataset Mask (16 Channels) ---
# 0-based column indices of the 12 wrist channels that are kept:
# columns 1-6 and 9-14. Columns 0, 7, 8 and 15 are dropped.
EXTENSION_INDICES_KEEP = [*range(1, 7), *range(9, 15)]


# =============================================================================
# STEP 1: USER CHOICE AND PATH SELECTION
# =============================================================================

# Raw string: the Windows-style example path contains backslashes that must be
# printed literally. In a normal string "\G" and "\d" are invalid escape
# sequences, which newer Python versions report with a warning.
print(r"Please select the base folder ...\GRABMyoFlow_1.0\dynamic")
BASE_INPUT = filedialog.askdirectory(title="Select base folder containing session data")

# An empty result means that no folder was chosen in the dialog.
if not BASE_INPUT:
    print("No input folder selected. Exiting Script!")
    sys.exit()

# --- Output Path Determination ---
# The converted data is written to a "dynamic_MATLAB" folder located next to
# the selected input folder. Trailing path separators are stripped first so
# that dirname() yields the parent folder rather than the selection itself.
parent_dir = os.path.dirname(BASE_INPUT.rstrip("\\/"))
OUTPUT_ROOT = os.path.join(parent_dir, "dynamic_MATLAB")

print(f"\nSelected input: {BASE_INPUT}")
print(f"Output folder: {OUTPUT_ROOT}")


# =============================================================================
# STEP 2: INPUT VALIDATION AND OUTPUT FOLDER HANDLING (Overwrite check)
# =============================================================================

# --- Input validation ---
# Performed BEFORE the output folder is touched, so that selecting a wrong
# input folder can never wipe the results of an earlier conversion.
session_paths = [os.path.join(BASE_INPUT, f"session{i}") for i in range(1, NSESSION + 1)]
session1_path = session_paths[0]

if not os.path.isdir(session1_path):
    print(f"[ERROR] Session folder not found: {session1_path}")
    sys.exit(1)

# Participant folders are named "session1_participant<N>". The highest N found
# is used as the participant count: unrelated sub-folders are not counted and
# a gap in the numbering does not cut off the participants after the gap.
_participant_prefix = "session1_participant"
_participant_ids = [
    int(entry[len(_participant_prefix):])
    for entry in os.listdir(session1_path)
    if entry.startswith(_participant_prefix)
    and entry[len(_participant_prefix):].isdecimal()
    and os.path.isdir(os.path.join(session1_path, entry))
]
nsub = max(_participant_ids, default=0)

if nsub == 0:
    print(f"[ERROR] No participant folders found in: {session1_path}")
    sys.exit(1)

# --- Output folder handling ---
if not os.path.exists(OUTPUT_ROOT):
    os.makedirs(OUTPUT_ROOT)
    print(f"Created output folder: {OUTPUT_ROOT}")
else:
    # Keep asking until the answer is an unambiguous Y or N.
    while True:
        print(f"Found existing folder in: {OUTPUT_ROOT}")
        cont = input("Overwrite it (Y/N)? ").strip().upper()
        if cont == 'Y':
            print("Overwriting...")
            # Start from an empty folder so no stale .mat files survive.
            shutil.rmtree(OUTPUT_ROOT)
            os.makedirs(OUTPUT_ROOT)
            break
        if cont == 'N':
            print("Exiting Script!")
            sys.exit()


# =============================================================================
# STEP 3: MAIN CONVERSION LOOP
# =============================================================================


def _read_wrist_channels(filepath, filename):
    """Read one WFDB record and return its samples for the wrist channels.

    Args:
        filepath: Record path without extension, as passed to wfdb.rdrecord.
        filename: Record name, used only in the printed message.

    Returns:
        The record's p_signal array. If it has at least 16 columns, only the
        columns listed in EXTENSION_INDICES_KEEP are returned; otherwise a
        message is printed and all columns are returned unchanged.

    Raises:
        Whatever wfdb.rdrecord raises for an unreadable record; the caller
        handles it.
    """
    data_emg = wfdb.rdrecord(filepath).p_signal
    if data_emg.shape[1] >= 16:
        return data_emg[:, EXTENSION_INDICES_KEEP]
    print(f"{filename}: Unexpected channel count ({data_emg.shape[1]}ch). Using all available channels.")
    return data_emg


# Number of participant files written successfully (across all sessions).
count = 0

for isession in range(1, NSESSION + 1):
    converted_folder = f"session{isession}"
    output_session_dir = os.path.join(OUTPUT_ROOT, converted_folder)
    os.makedirs(output_session_dir, exist_ok=True)

    session_dir = os.path.join(BASE_INPUT, f"session{isession}")
    if not os.path.exists(session_dir):
        print(f"[WARN] Input session folder not found: {session_dir}")
        continue

    for isub in range(1, nsub + 1):
        foldername = f"session{isession}_participant{isub}"
        participant_dir = os.path.join(session_dir, foldername)

        if not os.path.exists(participant_dir):
            print(f"[WARN] Participant folder missing: {participant_dir}")
            continue

        # One cell per (trial, gesture). Every cell starts as an empty array:
        # savemat cannot write None, so a single missing or unreadable trial
        # would otherwise make the save of the whole participant fail.
        matrices_wrist = np.empty((NTRIALS, NGEESTURE), dtype=object)
        for cell in np.ndindex(matrices_wrist.shape):
            matrices_wrist[cell] = np.empty((0, 0))
        loaded = 0

        for igesture in range(1, NGEESTURE + 1):
            for itrial in range(1, NTRIALS + 1):
                filename = f"session{isession}_participant{isub}_gesture{igesture}_trial{itrial}"
                filepath = os.path.join(participant_dir, filename)

                if not os.path.exists(filepath + ".dat"):
                    print(f"[WARN] Missing file: {filepath}.dat")
                    continue

                # Best effort: one unreadable record must not abort the batch.
                try:
                    data_wrist = _read_wrist_channels(filepath, filename)
                except Exception as e:
                    print(f"[ERROR] Reading {filename}: {e}")
                else:
                    matrices_wrist[itrial - 1, igesture - 1] = data_wrist
                    loaded += 1

        if loaded == 0:
            print(f"[WARN] No readable recordings for {foldername}; nothing saved.")
            continue

        # =============================================================================
        # STEP 4: SAVE OUTPUT
        # =============================================================================
        mat_filename = f"{foldername}.mat"
        try:
            savemat(os.path.join(output_session_dir, mat_filename),
                    {"DATA_WRIST": matrices_wrist})
        except Exception as e:
            print(f"[FATAL ERROR] Could not save {mat_filename}: {e}")
            continue

        # Report success only after the file has actually been written.
        count += 1
        print(f"[OK] Converted participant {isub} of session {isession} ({count} total)")

print("\nEnd of Script.")