# PTB-XL+, a comprehensive electrocardiographic feature dataset, version 1.0.0
# (11,117 bytes)
import ast

import networkx as nx
import numpy as np
import pandas as pd
# Set paths
# SNOMED vocabulary tables (presumably an OMOP/Athena-style export -- TODO confirm)
path_snomed_concept = "CONCEPT.csv"
path_snomed_relationship = "CONCEPT_RELATIONSHIP.csv"
# GE 12SL statements and their mapping table to SNOMED ids
path_12sl = "../12sl_statements.csv"
path_12sl_mapped_to_snomed = "../12slv23ToSNOMED.csv"
# PTB-XL statements and their mapping table to SNOMED ids
path_ptbxl = "../ptbxl_statements.csv"
path_ptbxl_mapped_to_snomed = "../ptbxlToSNOMED.csv"
# Utility functions
def flatten(l):
    """Concatenate a list of lists into a single flat list."""
    out = []
    for sub in l:
        out.extend(sub)
    return out
def flatten_unique(l):
    """Flatten a list of lists and deduplicate (result order is arbitrary)."""
    return list({item for sub in l for item in sub})
def convert_to_int(x):
    """Best-effort conversion of x to int; return -1 if conversion fails.

    -1 is the sentinel used downstream for invalid SNOMED ids/codes.
    The original used a bare `except:`; narrowed here to the exceptions
    int() actually raises (ValueError for bad strings/NaN, TypeError for
    non-numeric objects, OverflowError for infinities).
    """
    try:
        return int(x)
    except (ValueError, TypeError, OverflowError):
        return -1
def get_parents(concept_id, df_relationship):
    """Return the 'Is a' parent concept ids of concept_id (self-loops removed)."""
    mask = (df_relationship.concept_id_1 == concept_id) & \
           (df_relationship.relationship_id == "Is a")
    parents = list(df_relationship.loc[mask, "concept_id_2"])
    return [p for p in parents if p != concept_id]
def get_name_from_id(concept_id, df_concept, include_id=False):
    """Look up the concept_name for concept_id in df_concept.

    Returns a placeholder string (and prints a warning) for unknown ids.
    With include_id=True the id is appended in brackets.
    """
    rows = df_concept[df_concept.concept_id == concept_id]
    if len(rows) == 0:
        print("Invalid ID:", concept_id)
        return "invalid id [" + str(concept_id) + "]"
    name = rows.iloc[0]["concept_name"]
    if include_id:
        return name + "[" + str(concept_id) + "]"
    return name
def get_id_from_code(concept_code, df_concept):
    """Look up the concept_id for concept_code; -1 (with a message) if absent."""
    rows = df_concept[df_concept.concept_code == concept_code]
    if len(rows) == 0:
        print("Invalid code:", concept_code)
        return -1
    return rows.iloc[0]["concept_id"]
def get_name_from_code(concept_code, df_concept, include_id=False):
    """Resolve a concept_code to its name via the id lookup helpers."""
    concept_id = get_id_from_code(concept_code, df_concept)
    return get_name_from_id(concept_id, df_concept, include_id)
def populate_graph(lst, G=None, lst_processed=None, include_id=False, only_id=False):
    """Recursively add "Is a" edges (parent -> child) for every concept in lst.

    Reads the module-level df_concept / df_relationship tables. Nodes are raw
    SNOMED ids when only_id=True, otherwise resolved names (optionally with
    the id appended).

    Fixes two defects of the original:
    - mutable default argument `lst_processed=[]` (shared across calls);
    - in-place mutation (append/pop) of the caller's `lst`, which corrupted
      the id list the caller reuses after the call.
    """
    if G is None:
        G = nx.DiGraph()
    if lst_processed is None:
        lst_processed = []
    lst = list(lst)  # defensive copy: never mutate the caller's list
    if len(lst) == 0:
        return G
    head = lst[0]
    tag = head if only_id else get_name_from_id(head, df_concept, include_id=include_id)
    for parent in get_parents(head, df_relationship):
        tagp = parent if only_id else get_name_from_id(parent, df_concept, include_id=include_id)
        if parent not in lst_processed:
            lst.append(parent)  # schedule unprocessed parents for a later pass
        G.add_edge(tagp, tag)
    lst.pop(0)
    return populate_graph(list(set(lst)), G, lst_processed + [head],
                          include_id=include_id, only_id=only_id)
def get_uppropagated_labels(key_lst, G, exclude_snomed_id_lst=None):
    """Return key_lst plus all ancestors in G, minus excluded ids (deduplicated).

    G is assumed to be a DAG. Fixes the mutable default argument and removes
    the redundant single-element inner loop of the original; set-based
    exclusion avoids repeated O(n) list membership tests.
    """
    excluded = set(exclude_snomed_id_lst or ())
    result = []
    for key in key_lst:
        # assumes DAG
        result += [a for a in nx.ancestors(G, key) if a not in excluded]
        if key not in excluded:
            result.append(key)
    return list(set(result))
def reformat_ge(x, remove_brackets=True):
    """Join a 12SL token list into one string, binding qualifier tokens with ';'.

    remove_brackets=True drops COMMA/LPAREN/RPAREN tokens; otherwise they are
    rendered as human-readable punctuation. Replacement order matters and is
    kept exactly as in the original.
    """
    text = " ".join(x)
    if remove_brackets:
        for tok in (" COMMA ", " LPAREN ", " RPAREN "):
            text = text.replace(tok, " ")
    else:
        # human readable
        text = text.replace(" COMMA ", ";,;").replace(" LPAREN ", ";(;").replace(" RPAREN", ";)")
    # with, or, and
    for tok in ("$SWITH", "$SOR", "$SAND"):
        text = text.replace(" " + tok + " ", ";" + tok + ";")
    # infarction qualifiers: combine with previous token
    for tok in ("AC", "AU", "OLD", "NEW"):
        text = text.replace(" " + tok, ";" + tok)
    # combine with next token
    for tok in ("BLKED", "ACCEL", "PO", "CRO"):
        text = text.replace(tok + " ", tok + ";")
    # "with ..." qualifiers: combine with previous token
    for tok in ("FAV", "SPR", "MBZI", "MBZII", "SAV", "CHB", "VAVB",
                "AVDIS", "W2T1", "W3T1", "W4T1", "RVR", "SVR", "CJP",
                "IRREG", "ABER", "PROAV", "CSEC", "BIGEM", "JESC", "VESC",
                "$SRETC", "SAR", "MSAR", "RVE+", "QRSW", "2ST", "QRSW-2ST", "MAFB"):
        text = text.replace(" " + tok, ";" + tok)
    text = text.replace(" OCC ", ";OCC;")
    text = text.replace(" FREQ ", ";FREQ;")
    text = text.replace("ST& ", "ST&;")
    return text
def minimal_extension(x, with_uncert=True):
    """Reduce a reformat_ge string so only AC/AU (and optionally PO/CRO) stay bound.

    with_uncert=True keeps the PO;/CRO; certainty prefixes bound to the
    following statement; with_uncert=False drops them entirely.
    """
    if with_uncert:
        x = x.replace("PO; ", "PO;").replace("CRO; ", "CRO;")
    else:
        x = x.replace("PO;", "").replace("CRO;", "")
    # remove non-informative binder tokens
    for token in ("$SWITH;", "$SOR;", "$SAND;", "OCC;", "FREQ;"):
        x = x.replace(token, "")
    x = x.replace("LVH ", "")
    # remove measurement/criteria tokens wherever they occur
    for token in ("CSEC", "RAVL", "SOKOLYON", "CORNPROD", "ROMESTES",
                  "QRSV", "LVH3", "LPAREN", "RPAREN"):
        x = x.replace(" " + token, "")
        x = x.replace(token + " ", "")
    # fix ACCEL binding and swap MAFB/AU order first
    x = x.replace(";ACCEL", " ACCEL").replace("MAFB;AU", "AU;MAFB")
    # protect the bindings we keep (AU/AC suffixes, PO/CRO prefixes) ...
    x = x.replace(";AU", "xxxAU").replace(";AC", "xxxAC")
    x = x.replace("PO;", "POxxx").replace("CRO;", "CROxxx")
    # ... split everything else, then restore the protected bindings
    return x.replace(";", " ").replace("xxx", ";").strip()
def apply_snomed_mapping_ge(x):
    """Map a 12SL statement (optionally 'PO;'/'CRO;' prefixed) to SNOMED pairs.

    Returns a list of (snomed_id, certainty) tuples from the module-level
    ge_to_snomed table. Reuses map_certainty_statements_ext instead of
    duplicating its prefix parsing, and renames the comprehension variable
    that shadowed the parameter `x` in the original.
    """
    statement, cert = map_certainty_statements_ext(x)
    # in doubt take the less certain value
    return [(snomed_id, min(conf, cert)) for snomed_id, conf in ge_to_snomed[statement]]
def map_certainty_statements_ext(x):
    """Split an optional 'PO;'/'CRO;' certainty prefix off a statement.

    Returns (statement, certainty) using the module-level uncertainty_mapping;
    unprefixed statements get certainty 100.
    """
    for prefix in ("PO;", "CRO;"):
        if x.startswith(prefix):
            return (x[len(prefix):], uncertainty_mapping[prefix[:-1]])
    return (x, 100.)
def map_infarction_stadium(stadium1, stadium2):
    """Collapse two free-text infarction stadium fields into an ordinal code.

    Returns 1/2/3 for acute/intermediate/old (checking the earlier stage
    first, and stadium1 before stadium2 within a stage), or 0 when neither
    field matches a known stadium.
    """
    stage_groups = (
        (1, ('Stadium I', 'Stadium I-II')),
        (2, ('Stadium II',)),
        (3, ('Stadium III', 'Stadium II-III')),
    )
    for code, names in stage_groups:
        if stadium1 in names or stadium2 in names:
            return code
    return 0
def map_mi_labels(labels, infarction_stadium):
    """Append stadium-specific MI label variants to a (label, conf) list.

    Stadium 1 adds `<label>_AC` (acute) and stadium 2 adds `<label>_OLD`
    for every myocardial-infarction label present; other stadium values
    return the input unchanged.
    """
    mi_labels = {"IMI", "ASMI", "ILMI", "AMI", "ALMI", "INJAS", "LMI",
                 "INJAL", "IPLMI", "IPMI", "INJIN", "PMI", "INJLA", "INJIL"}
    suffix = {1: "_AC", 2: "_OLD"}.get(infarction_stadium)
    if suffix is None:
        return labels
    return labels + [(name + suffix, conf) for name, conf in labels if name in mi_labels]
def apply_snomed_mapping_ptbxl(x):
    """Expand a (label, confidence) pair into SNOMED (id, confidence) pairs.

    Uses the module-level ptbxl_to_snomed lookup table.
    """
    label, confidence = x[0], x[1]
    return [(snomed_id, confidence) for snomed_id in ptbxl_to_snomed[label]]
def apply_uppropagation_dict(lst):
    """Extend each (label, conf) pair with its ancestor labels and deduplicate.

    Ancestors come from the module-level uppropagation_dict; duplicated
    labels keep the maximum (most certain) confidence. Output is sorted by
    label (via np.unique).
    """
    expanded = []
    for label, conf in lst:
        expanded.append((label, conf))
        for ancestor in uppropagation_dict[label]:
            expanded.append((ancestor, conf))
    labels = np.array([e[0] for e in expanded])
    confidences = np.array([e[1] for e in expanded])
    # remove duplicate entries (in doubt take the more certain value)
    return [(lbl, max(confidences[labels == lbl])) for lbl in np.unique(labels)]
# Parsing the SNOMED label tree
df_concept = pd.read_csv(path_snomed_concept, sep='\t')
# coerce ids/codes to ints; unparsable entries become the -1 sentinel
# (redundant `lambda x: f(x)` wrappers removed -- apply the function directly)
df_concept.concept_id = df_concept.concept_id.apply(convert_to_int)
df_concept.concept_code = df_concept.concept_code.apply(convert_to_int)
df_relationship = pd.read_csv(path_snomed_relationship, sep='\t')
# Load 12SL labels and mapping
# Note (according to the PTB-XL docs): certainty weights are — cannot rule out (CRO): 0.15, consider: 0.35, possible (PO): 0.5, probable: 0.5
# Load and normalize the 12SL statements.
# (The stray `df_labels.columns` notebook expression that referenced
# df_labels before its definition has been removed.)
df_labels = pd.read_csv(path_12sl)
# "statements" holds string representations of Python lists;
# ast.literal_eval parses them safely (eval would execute arbitrary CSV content)
df_labels["statements"] = df_labels["statements"].apply(ast.literal_eval)
df_labels["statements_cat"] = df_labels["statements"].apply(reformat_ge)
df_labels["statements_ext"] = df_labels["statements_cat"].apply(lambda x: minimal_extension(x, with_uncert=True).split(" "))
#finally split into lists again
df_labels["statements_cat"] = df_labels["statements_cat"].apply(lambda x: x.split(" "))
# qualifier text -> certainty weight (np.nan key covers missing qualifiers)
uncertainty_mapping = {np.nan: 100., "consider": 35., "possible": 50., "probable": 50., "probably": 50., "PO": 50., "CRO": 15.}
df_ge_snomed = pd.read_csv(path_12sl_mapped_to_snomed)
# acronym -> list of (snomed_id, certainty) pairs; up to five mapped ids
# per acronym (the five copy-pasted id/qualifier branches became a loop)
ge_to_snomed = {}
for _, row in df_ge_snomed.iterrows():
    if not isinstance(row["Acronym"], str):  # skip rows without an acronym
        continue
    tmp = []
    for i in range(1, 6):
        concept = row["id" + str(i)]
        if not np.isnan(concept):
            tmp.append((int(concept), uncertainty_mapping[row["qualifier" + str(i)]]))
    ge_to_snomed[row["Acronym"]] = tmp
# attach SNOMED ids and per-statement certainties to the 12SL records
df_labels["statements_ext_snomed"] = df_labels["statements_ext"].apply(lambda x: flatten([apply_snomed_mapping_ge(l) for l in x]))
df_labels["statements_ext"] = df_labels["statements_ext"].apply(lambda x: [map_certainty_statements_ext(y) for y in x])
# Load PTB-XL labels and mapping
df_ptbxl = pd.read_csv(path_ptbxl)
df_ptbxl_snomed = pd.read_csv(path_ptbxl_mapped_to_snomed)
ptbxl_to_snomed = {}
for _,row in df_ptbxl_snomed.iterrows():
if(not type(row["Acronym"])==str):
continue
tmp = []
if(not np.isnan(row["id1"])):
tmp.append(int(row["id1"]))
if(not np.isnan(row["id2"])):
tmp.append(int(row["id2"]))
if(not np.isnan(row["id3"])):
tmp.append(int(row["id3"]))
if(not np.isnan(row["id4"])):
tmp.append(int(row["id4"]))
ptbxl_to_snomed[row["Acronym"]]=tmp
df_ptbxl["scp_codes"]=df_ptbxl.scp_codes.apply(lambda x: eval(x))
df_ptbxl["scp_codes_ext"]=df_ptbxl.scp_codes_ext.apply(lambda x: eval(x))
df_ptbxl["scp_codes_ext_snomed"]=df_ptbxl["scp_codes_ext"].apply(lambda x:flatten([apply_snomed_mapping_ptbxl(l) for l in x]))
## Populate SNOMED label tree
# collect every SNOMED id referenced by either the 12SL or the PTB-XL mapping
ge_snomed_ids = flatten_unique([[l[0] for l in x] for x in df_labels.statements_ext_snomed])
ptbxl_snomed_ids = flatten_unique([[l[0] for l in x] for x in df_ptbxl.scp_codes_ext_snomed])
all_snomed_ids = list(np.unique(ge_snomed_ids+ptbxl_snomed_ids))
# build the directed "Is a" graph over raw ids (only_id=True skips name lookup)
# NOTE(review): populate_graph as written pops/appends its `lst` argument in
# place, so all_snomed_ids may no longer be the full id list afterwards --
# verify before relying on it below
G = populate_graph(all_snomed_ids, only_id=True)
# every id plus all of its ancestors in the hierarchy
all_snomed_ids_uppropagated=flatten_unique([get_uppropagated_labels([i],G) for i in all_snomed_ids])
# one row per node: id, human-readable name, and its ancestor ids
node_description = []
for i in all_snomed_ids_uppropagated:
    node_description.append({"snomed_id":i, "description":get_name_from_id(i, df_concept, include_id=False), "ancestors":[x for x in get_uppropagated_labels([i],G) if x!=i]})
df_snomed_description=pd.DataFrame(node_description)
#save new snomed description table
df_snomed_description.to_csv("snomed_description_new.csv",index=False)
# snomed_id -> list of ancestor ids, used for label up-propagation below
uppropagation_dict={}
for _,row in df_snomed_description.iterrows():
    uppropagation_dict[row["snomed_id"]]=row["ancestors"]
## replace snomed labels by uppropagated labels
# attach all ancestor labels to every record (duplicates keep max confidence)
df_labels["statements_ext_snomed"]=df_labels["statements_ext_snomed"].apply(lambda x: apply_uppropagation_dict(x))
df_ptbxl["scp_codes_ext_snomed"]=df_ptbxl["scp_codes_ext_snomed"].apply(lambda x: apply_uppropagation_dict(x))
# keep only the record id plus the three statement columns
df_ptbxl = df_ptbxl[["ecg_id","scp_codes","scp_codes_ext","scp_codes_ext_snomed"]]
#save new mapped labels
df_labels.to_csv("./output/labels/12sl_statements_new.csv",index=False)
df_ptbxl.to_csv("./output/labels/ptbxl_statements_new.csv",index=False)