#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon May 13 11:46:23 2019 @author: Sean Maudsley-Barton A set of utilities that help to calculate common sway metrics """ import numpy as np import pandas as pd import os import math import scipy from scipy import signal from scipy.spatial import distance from scipy.misc import electrocardiogram from scipy.signal import find_peaks import matplotlib.pyplot as plt from matplotlib.patches import Ellipse import matplotlib.transforms as transforms from enum import Enum #%% class SOTTrial(Enum): ''' Enumeration for SOT trial types ''' Eyes_Open_Fixed_Surround_and_Support = 1 Eyes_Closed_Fixed_Support = 2 Open_Sway_Referenced_Surrond = 3 Eyes_Open_Sway_Referenced_Support = 4 Eyes_Closed_Sway_Referenced_Support = 5 Eyes_Open_Sway_Referenced_Surround_and_Support = 6 class SwayMetric(Enum): ''' Enumeration for sway metric ''' ALL = 0 RDIST = 1 RDIST_ML = 2 RDIST_AP = 3 MDIST = 4 MDIST_ML = 5 MDIST_AP = 6 TOTEX = 7 TOTEX_ML = 8 TOTEX_AP = 9 MVELO = 10 MVELO_ML = 11 MVELO_AP = 12 MFREQ = 13 MFREQ_ML = 14 MFREQ_AP = 15 AREA_CE = 16 FRAC_DIM = 17 ROMBERG_RATIO = 18 ROMBERG_RATIO_FOAM = 19 class DeviceType(Enum): ''' Enumeration for sway metric ''' BALANCE_MASTER = 1 KINECT = 2 class SwayGroup(Enum): ''' Enumeration for sway groups ''' All = 0 All_healthy = 1 All_fallers = 2 Young = 3 Middle = 4 Old = 5 Faller_by_history = 6 Faller_by_history_single = 7 Faller_by_history_muliple = 8 Faller_by_miniBEStest = 9 Young_vs_old = 10 Old_vs_all_fallers = 11 Old_vs_single_fallers = 12 Young_vs_all_fallers = 13 Young_and_Middle = 14 SPINEBASE = 0 SPINEMID = 1 NECK = 2 HEAD = 3 SHOULDERLEFT = 4 ELBOWLEFT = 5 WRISTLEFT = 6 HANDLEFT = 7 SHOULDERRIGHT = 8 ELBOWRIGHT = 9 WRISTRIGHT = 10 HANDRIGHT = 11 HIPLEFT = 12 KNEELEFT = 13 ANKLELEFT = 14 FOOTLEFT = 15 HIPRIGHT = 16 KNEERIGHT = 17 ANKLERIGHT = 18 FOOTRIGHT = 19 SPINESHOULDER = 20 HANDTIPLEFT = 21 THUMBLEFT = 22 HANDTIPRIGHT = 23 THUMBRIGHT = 24 X = 0 Y = 1 Z = 2 #%% def normalise_skeleton(skel_frame, spine_base_joint): normalised_skel_frame = np.copy(skel_frame) # x = 0 # y = 1 # z = 2 for i in range(skel_frame.shape[0]): normalised_skel_frame[i][2] = str(100*(float(skel_frame[i][2]) - spine_base_joint[X])) normalised_skel_frame[i][3] = str(100*(float(skel_frame[i][3]) - spine_base_joint[Y])) normalised_skel_frame[i][4] = str(100*(float(skel_frame[i][4]) - spine_base_joint[Z])) return normalised_skel_frame def calculate_com(skelFrame): _X = 2 _Y = 3 _Z = 4 spine_shoulder = np.stack([float(skelFrame[SPINESHOULDER, _X]), float(skelFrame[SPINESHOULDER, _Y]), float(skelFrame[SPINESHOULDER, _Z])], axis=0) spine_base = np.stack([float(skelFrame[SPINEBASE, _X]), float(skelFrame[SPINEBASE, _Y]), float(skelFrame[SPINEBASE, _Z])], axis=0) spine_mid = np.stack([float(skelFrame[SPINEMID, _X]), float(skelFrame[SPINEMID, _Y]), float(skelFrame[SPINEMID, _Z])], axis=0) hip_left = np.stack([float(skelFrame[HIPLEFT, _X,]), float(skelFrame[HIPLEFT, _Y,]), float(skelFrame[HIPLEFT, _Z])], axis=0) hip_right = np.stack([float(skelFrame[HIPRIGHT, _X]), float(skelFrame[HIPRIGHT, _Y]), float(skelFrame[HIPRIGHT, _Z])], axis=0) xMean = np.mean([spine_mid[0],hip_left[0],hip_right[0]]) yMean = np.mean([spine_mid[1],hip_left[1],hip_right[1]]) zMean = np.mean([spine_mid[2],hip_left[2],hip_right[2]]) com = np.stack([xMean,yMean,zMean],axis=0) return com.tolist() def euclidean_distance_between_joints(j1, j2): ed = np.sqrt(np.square(j1[X] - j2[X]) + np.square(j1[Y] - j2[Y]) + np.square(j1[Z] - j2[Z])) return ed def cosine_distance_between_joints(j1, j2): cd = distance.cosine(j1, j2) return cd def normalised_magnitude_between_joints(j, j_ref): mag_j = np.linalg.norm(j) mag_j_ref = np.linalg.norm(j_ref) nn = mag_j / mag_j_ref return nn def get_joint_XYZ(skel_frame_row): x = float(skel_frame_row[2]) y = float(skel_frame_row[3]) z = float(skel_frame_row[4]) return [x, y, z] def eulidean_distance_2d(j1, j2): ed = np.sqrt(np.square(j1[0] - j2[0]) + np.square(j1[1] - j2[1])) return ed def get_angle(j1, j2, j3): ba = j1 - j2 bc = j3 - j2 cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc)) rad_angle = np.arccos(cosine_angle) deg_angle = np.degrees(rad_angle) return deg_angle def get_AP_angle_between_three_joints_matrix(j1, j2, j3): l1 = np.sqrt(np.square(j1[2] - j2[2])) l2 = np.sqrt(np.square(j2[2] - j3[2])) rad_between_three_joints = np.arccos(l2/l1) deg_between_three_joints = np.degrees(rad_between_three_joints) return deg_between_three_joints def get_2d_angle_between_three_joints(j1, j2, j3, plane='SP', deg=True): #here #AP = [Z, Y] #ML = [X, Y] if plane == 'SP': co_plane = [Z, Y] # SI, AP | Sagital Plane elif plane == 'FP': co_plane = [X, Y] # [X, Y] # ML, SI | Frontal Plane elif plane == 'TP': co_plane = [X, Z] # [X, Z] # ML, AP | Transverse Plane a = eulidean_distance_2d([j1[co_plane[0]], j1[co_plane[1]]], [j2[co_plane[0]], j2[co_plane[1]]]) b = eulidean_distance_2d([j2[co_plane[0]], j2[co_plane[1]]], [j3[co_plane[0]], j3[co_plane[1]]]) #j2, j3) c = eulidean_distance_2d([j3[co_plane[0]], j3[co_plane[1]]], [j1[co_plane[0]], j1[co_plane[1]]]) #j3, j1) angle_C = np.arccos((a**2 + b**2 - c**2) / (2 * a * b)) if deg: angle_C = np.degrees(angle_C) return angle_C def get_3d_angle_between_three_joints(j1, j2, j3, deg=True): SP_angle = get_2d_angle_between_three_joints(j1, j2, j3, plane='SP', deg=deg) FP_angle = get_2d_angle_between_three_joints(j1, j2, j3, plane='FP', deg=deg) TP_angle = get_2d_angle_between_three_joints(j1, j2, j3, plane='TP', deg=deg) #eular_angle_3d = np.hstack([x_YZ_euler, y_XZ_euler, z_XY_euler]) SP_FP_TP_angle_3d = np.hstack([SP_angle, FP_angle, TP_angle]) return SP_FP_TP_angle_3d #%% def get_unique_values(full_list): ''' Gets unique values for a list, but preserves order, unlike numpy.unnique ''' unnique_list = [] for item in full_list: if item not in unnique_list: unnique_list.append(item) return unnique_list def eigsorted(cov): ''' Eigenvalues and eigenvectors of the covariance matrix. ''' vals, vecs = np.linalg.eigh(cov) order = vals.argsort()[::-1] return vals[order], vecs[:, order] def confidence_ellipse(x, y, ax, n_std=1.96, facecolor='none', return_ellipse=True, **kwargs): ''' Create a plot of the covariance confidence ellipse of `x` and `y` Parameters ---------- x, y : array_like, shape (n, ) Input data. ax : matplotlib.axes.Axes The axes object to draw the ellipse into. n_std : float The number of standard deviations to determine the ellipse's radiuses. 1.96 SD = 95% confidence ellipse Returns ------- matplotlib.patches.Ellipse Other parameters ---------------- kwargs : `~matplotlib.patches.Patch` properties ''' if x.size != y.size: raise ValueError("x and y must be the same size") cov = np.cov(x, y) #eigvals, eigvecs = eigsorted(cov) eigvals, eigvecs = np.linalg.eigh(cov) angle = np.degrees(np.arctan2(*eigvecs[:, 0][::-1])) eigvals = [ [eigvals[0], 0], [0, eigvals[1]] ] axes = 2.4478 * np.sqrt(scipy.linalg.svdvals(eigvals)) height = axes[0] * 2 width = axes[1] * 2 area = np.pi * np.prod(axes) ellipse = Ellipse(xy=(np.mean(x), np.mean(y)), width=width, height=height, angle=angle, facecolor=facecolor, **kwargs) if return_ellipse == True: return ax.add_patch(ellipse), height, width, area, angle else: return height, width, area, angle def filter_signal(ML_path, AP_path=[], CC_path=np.array([]), N=2, fc=10, fs=30): ''' N = order of the filer - usually 1, 2 or 4 fc = Cut-off frequency of the filter - usually 10 or 6 fs = 30 Wn = fc / (fs / 2) # Normalize the frequency ''' Wn = np.pi * fc / (2 * fs) # Normalize the frequency #b, a = signal.butter(N, Wn, btype='low', fs=fs) b, a = signal.butter(N, Wn, btype='low') filtered_ML_path = signal.filtfilt(b, a, ML_path) if len(AP_path) != 0: filtered_AP_path = signal.filtfilt(b, a, AP_path) if len(CC_path) != 0: filtered_CC_path = signal.filtfilt(b, a, CC_path) if len(CC_path) != 0: return filtered_ML_path, filtered_AP_path, filtered_CC_path if len(AP_path) != 0: return filtered_ML_path, filtered_AP_path else: return filtered_ML_path def calculate_RD(selected_recording, deviceType = DeviceType.KINECT, rd_path = '', part_id = '', SOT_trial_type = ''): ''' Calulate resultant distance, that is the distance from each point on the raw CoM path to the mean of the time series ad displays RD path and sway area Saves AREA_CE image if rd_parh is filled in ''' if deviceType == DeviceType.KINECT: ML_path = selected_recording['CoGx'].values.astype(float) AP_path = selected_recording['CoGz'].values.astype(float) ML_path, AP_path = filter_signal(ML_path, AP_path, fc=8) elif deviceType == DeviceType.BALANCE_MASTER: ML_path = selected_recording['CoGx'].values.astype(float) AP_path = selected_recording['CoGy'].values.astype(float) mean_ML = np.mean(ML_path) mean_AP = np.mean(AP_path) ML = np.abs(np.subtract(ML_path, mean_ML)) AP = np.abs(np.subtract(AP_path, mean_AP)) #ML = np.sqrt(np.square(np.subtract(ML_path, mean_ML))) #AP = np.sqrt(np.square(np.subtract(AP_path, mean_AP))) #get hypotinuse of ML_RD and AP_RD RD = np.sqrt(np.add(np.square(ML),np.square(AP))) fig = plt.figure() ax = fig.add_subplot(111) ax.set_xlim([-8, 8]) ax.set_ylim([-8, 8]) elp, width, height, AREA_CE = confidence_ellipse(ML_path, AP_path, ax, n_std=1.96, edgecolor='red') ax.scatter(ML_path, AP_path, s=3) AREA_CE = round(AREA_CE, 2) ax.set_title(str.replace(SOT_trial_type, '-', ' ') + ' ' + part_id + ' CoM 95% CE' + '\nW:' + str(round(width, 2)) + ' cm' + ' x ' + 'H:' + str(round(height, 2)) + ' cm' + ' Ar:' + str(AREA_CE) + ' cm sq') ax.set_aspect('equal') if rd_path != '': plt.savefig(rd_path) plt.show() fig = plt.figure() ax = fig.add_subplot(111) elp, width, height, AREA_CE = confidence_ellipse(ML_path, AP_path, ax, n_std=1.96, edgecolor='red') ax.scatter(ML_path, AP_path, s=3) AREA_CE = round(AREA_CE, 2) ax.set_title(str.replace(SOT_trial_type, '-', ' ') + ' ' + part_id + ' CoM 95% CE' + '\nW:' + str(round(width, 2)) + ' cm' + ' x ' + 'H:' + str(round(height, 2)) + ' cm' + ' Ar:' + str(AREA_CE) + ' cm sq') ax.set_aspect('equal') if rd_path != '': detailed_rd_path = rd_path.replace('confidence_ellipse', 'confidence_ellipse_detailed') plt.savefig(detailed_rd_path) plt.show() return ML, AP, RD, AREA_CE def calculate_RD_3D(selected_recording, deviceType = DeviceType.KINECT,): ''' Calulate resultant distance ''' if deviceType == DeviceType.KINECT: ML_path = selected_recording['CoGx'].values.astype(float) AP_path = selected_recording['CoGz'].values.astype(float) UD_path = selected_recording['CoGy'].values.astype(float) ML_path, AP_path = filter_signal(ML_path, AP_path) elif deviceType == DeviceType.BALANCE_MASTER: ML_path = selected_recording['CoGx'].values.astype(float) AP_path = selected_recording['CoGy'].values.astype(float) mean_ML = np.mean(ML_path) mean_AP = np.mean(AP_path) mean_UD = np.mean(UD_path) ML_RD = np.sqrt(np.square(np.subtract(ML_path, mean_ML))) AP_RD = np.sqrt(np.square(np.subtract(AP_path, mean_AP))) UD_RD = np.sqrt(np.square(np.subtract(UD_path, mean_UD))) RD = np.sqrt(np.square(np.subtract(ML_path, mean_ML)) + np.square(np.subtract(AP_path, mean_AP)) + np.square(np.subtract(UD_path, mean_UD))) return ML_RD, AP_RD, UD_RD, RD def calculate_TOTEX(ML, AP): arr_ML_diff = [] arr_AP_diff = [] arr_tot_diff = [] ML_TOTEX = 0 AP_TOTEX = 0 TOTEX = 0 for step in range(1, len(AP)): ML_curr = ML[step] AP_curr = AP[step] ML_prev = ML[step - 1] AP_prev = AP[step - 1] ML_diff = abs(ML_curr - ML_prev) AP_diff = abs(AP_curr - AP_prev) #ML_diff = np.sqrt(np.square(ML_curr - ML_prev)) #AP_diff = np.sqrt(np.square(AP_curr - AP_prev)) arr_ML_diff.append(ML_diff) arr_AP_diff.append(AP_diff) # get hypotinuse of ML and AP diff RD_diff = np.sqrt(np.add(np.square(ML_diff), np.square(AP_diff))) arr_tot_diff.append(RD_diff) ML_TOTEX = np.sum(arr_ML_diff) AP_TOTEX = np.sum(arr_AP_diff) TOTEX = np.sum(arr_tot_diff) N = len(AP) d = max(arr_tot_diff) FD = np.log(N) / np.log((N * d) / TOTEX) return ML_TOTEX, AP_TOTEX, TOTEX, FD def calculate_sway_from_recording(selected_recording, selected_recording_name, pID, age, sex, SOT_trial_type, tNum, swayMetric = SwayMetric.ALL, deviceType = DeviceType.KINECT, impairment_self = 'healthy', impairment_confedence = 'healthy', impairment_clinical = 'healthy', impairment_stats = 'healthy', dp = -1, rd_path = '', start = 0, end = 600): ''' Calculates sway from Kinect or Balance master recordings ''' cliped_recording = selected_recording[start : end] ML, AP, RD, AREA_CE = calculate_RD(cliped_recording, deviceType, rd_path, pID, SOT_trial_type) recording_length = len(RD) #--mean DIST and rms DIST MDIST_ML = np.sum(ML) / recording_length MDIST_AP = np.sum(AP) / recording_length MDIST = np.sum(RD) / recording_length RDIST_ML = np.sqrt(np.sum(np.square(ML) / recording_length)) RDIST_AP = np.sqrt(np.sum(np.square(AP) / recording_length)) RDIST = np.sqrt(np.sum(np.square(RD) / recording_length)) #--Total Excursion - TOTEX TOTEX_ML, TOTEX_AP, TOTEX, FD = calculate_TOTEX(ML, AP) #--Mean Velocity - MVELO if deviceType == DeviceType.KINECT: T = recording_length / 30 elif deviceType == DeviceType.BALANCE_MASTER: T = recording_length / 100 MVELO_ML = TOTEX_ML / T MVELO_AP = TOTEX_AP / T MVELO = TOTEX / T #--Mean Fequency - MFREQ MFREQ_ML = MVELO_ML / (4*(np.sqrt(2 * MDIST_ML))) MFREQ_AP = MVELO_AP / (4*(np.sqrt(2 * MDIST_AP))) MFREQ = MVELO / (2 * np.pi * MDIST) #TOTEX MVELO RDIST if swayMetric == SwayMetric.RDIST: swayVal = RDIST elif swayMetric == SwayMetric.RDIST_ML: swayVal = RDIST_ML elif swayMetric == SwayMetric.RDIST_AP: swayVal = RDIST_AP elif swayMetric == SwayMetric.MDIST: swayVal = MDIST elif swayMetric == SwayMetric.MDIST_ML: swayVal = MDIST_ML elif swayMetric == SwayMetric.MDIST_AP: swayVal = MDIST_AP elif swayMetric == SwayMetric.TOTEX: swayVal = TOTEX elif swayMetric == SwayMetric.TOTEX_ML: swayVal = TOTEX_ML elif swayMetric == SwayMetric.TOTEX_AP: swayVal = TOTEX_AP elif swayMetric == SwayMetric.MVELO: swayVal = MVELO elif swayMetric == SwayMetric.MVELO_ML: swayVal = MVELO_ML elif swayMetric == SwayMetric.MVELO_AP: swayVal = MVELO_AP elif swayMetric == SwayMetric.MFREQ: swayVal = MFREQ elif swayMetric == SwayMetric.MFREQ_ML: swayVal = MFREQ_ML elif swayMetric == SwayMetric.MVELO_AP: swayVal = MFREQ_AP elif swayMetric == SwayMetric.AREA_CE: swayVal = AREA_CE elif swayMetric == SwayMetric.FRAC_DIM: swayVal = FD tmpSway = [] if dp != -1: swayVal = round(swayVal, dp) if swayMetric == SwayMetric.ALL: tmpSway.append([pID, selected_recording_name, tNum, age, sex, impairment_self, impairment_confedence, impairment_clinical, impairment_stats, swayMetric.name, RDIST_ML, RDIST_AP, RDIST, MDIST_ML, MDIST_AP, MDIST, TOTEX_ML, TOTEX_AP, TOTEX, MVELO_ML, MVELO_AP, MVELO, MFREQ_ML, MFREQ_AP, MFREQ, AREA_CE]) else: tmpSway.append([pID, selected_recording_name, tNum, age, sex, impairment_self, impairment_confedence, impairment_clinical, impairment_stats, swayMetric.name, swayVal]) return tmpSway #%% Balance master Utils def load_balance_master_file(rootDir, participantID, age, kinectTrialType, trialNumber = 1, swayMetric = SwayMetric.RDIST): root = '' dirs = [] columns = '' arrayOfRows = [] dfFinal = pd.DataFrame(arrayOfRows) for root, dirs, _ in os.walk(rootDir): break dirs.sort() selected_trial = '' for dirName in dirs: if participantID in dirName: print(dirName) rootFilePath = os.path.join(root, dirName, 'cm') trialRoot = '' trialfiles = [] for trialRoot, _, trialfiles in os.walk(rootFilePath): break found = False for trial in trialfiles: #if 'SOT' in trial and str('T'+ str(kinectTrialType.value)) in trial: if 'SOT' in trial and str('C'+ str(kinectTrialType.value)) in trial and str('T'+ str(trialNumber)) in trial: print('Collating:', trialRoot + '/' + trial) selected_trial = trial trialFilePath = os.path.join(trialRoot, trial) bMTrial = pd.read_csv(trialFilePath , sep="\t") i = 0 for row in bMTrial[25:].iterrows(): parsedRow = row[1][0] arrRow = parsedRow.split('\t') strArrRow = str.split(arrRow[0]) if i == 0: #columns = strArrRow columns = ['DP', 'LF', 'RR', 'SH', 'LR', 'RF', 'CoFx', 'CoFy', 'CoGx', 'CoGy'] else: arrayOfRows.append(strArrRow) i += 1 found = True break if found: dfFinal = pd.DataFrame(arrayOfRows, columns=columns) else: print('can not find ', trialRoot + '/' + trial) #break return dfFinal, selected_trial