PTB-XL+, a comprehensive electrocardiographic feature dataset 1.0.0

File: <base>/labels/mapping/apply_snomed_mapping.py (11,117 bytes)
import pandas as pd
import numpy as np
import networkx as nx

# Set paths

# SNOMED vocabulary tables; both are read further down as tab-separated files.
path_snomed_concept = "CONCEPT.csv"
path_snomed_relationship = "CONCEPT_RELATIONSHIP.csv"

# 12SL statements and the table mapping 12SL acronyms to SNOMED ids.
path_12sl = "../12sl_statements.csv"
path_12sl_mapped_to_snomed = "../12slv23ToSNOMED.csv"

# PTB-XL statements and the table mapping PTB-XL acronyms to SNOMED ids.
path_ptbxl = "../ptbxl_statements.csv"
path_ptbxl_mapped_to_snomed = "../ptbxlToSNOMED.csv"


# Utility functions

def flatten(l):
    """Concatenate the items of every sub-iterable of ``l`` into one flat list."""
    flat = []
    for sublist in l:
        flat.extend(sublist)
    return flat

def flatten_unique(l):
    """Return the distinct items found across all sub-iterables of ``l``.

    The result is a list built from a set, so its order is unspecified.
    """
    distinct = {item for sublist in l for item in sublist}
    return list(distinct)

def convert_to_int(x):
    """Convert ``x`` to ``int``, returning -1 when it cannot be converted.

    Only the errors ``int()`` raises for unconvertible input are handled
    (``TypeError`` for e.g. ``None``, ``ValueError`` for non-numeric text or
    NaN, ``OverflowError`` for infinities), so unrelated exceptions such as
    ``KeyboardInterrupt`` are no longer swallowed.
    """
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        return -1

def get_parents(concept_id, df_relationship):
    """Return the ``concept_id_2`` values of all "Is a" rows whose ``concept_id_1`` is ``concept_id``.

    Self-references (a concept listed as its own parent) are dropped.
    """
    is_child = df_relationship.concept_id_1 == concept_id
    is_a_relation = df_relationship.relationship_id == "Is a"
    candidates = list(df_relationship[is_child & is_a_relation]["concept_id_2"])
    return [parent for parent in candidates if parent != concept_id]

def get_name_from_id(concept_id, df_concept,include_id=False):
    """Look up the ``concept_name`` of the first row whose ``concept_id`` matches.

    With ``include_id`` the id is appended in square brackets. When no row
    matches, a message is printed and a placeholder string is returned.
    """
    matches = df_concept[df_concept.concept_id == concept_id]
    if len(matches) == 0:
        print("Invalid ID:",concept_id)
        return "invalid id ["+str(concept_id)+"]"
    name = matches.iloc[0]["concept_name"]
    if include_id:
        return name + "["+str(concept_id)+"]"
    return name

def get_id_from_code(concept_code, df_concept):
    """Return the ``concept_id`` of the first row whose ``concept_code`` matches.

    When no row matches, a message is printed and -1 is returned.
    """
    matches = df_concept[df_concept.concept_code == concept_code]
    if len(matches) == 0:
        print("Invalid code:",concept_code)
        return -1
    return matches.iloc[0]["concept_id"]

def get_name_from_code(concept_code, df_concept,include_id=False):
    """Resolve a concept code to its name via ``get_id_from_code`` and ``get_name_from_id``."""
    return get_name_from_id(get_id_from_code(concept_code, df_concept), df_concept, include_id)

def populate_graph(lst,G=None,lst_processed=None,include_id=False,only_id=False):
    """Build (or extend) a directed graph of SNOMED "Is a" relations.

    Starting from the concept ids in ``lst``, the parents returned by
    ``get_parents`` are followed transitively and an edge ``parent -> child``
    is added for every relation found. Reads the module-level ``df_concept``
    and ``df_relationship`` tables.

    The traversal is iterative, so deep or large hierarchies cannot hit the
    interpreter's recursion limit, and the caller's ``lst`` is left untouched.

    Args:
        lst: concept ids to start from (not modified).
        G: graph to extend; a new ``nx.DiGraph`` is created when ``None``.
        lst_processed: ids regarded as already expanded. Such ids are not
            queued when encountered as parents (the edge is still added).
        include_id: forwarded to ``get_name_from_id`` when building labels.
        only_id: use the raw concept ids as node labels instead of names.

    Returns:
        The populated graph.
    """
    if G is None:
        G = nx.DiGraph()

    def node_label(concept_id):
        if only_id:
            return concept_id
        return get_name_from_id(concept_id, df_concept, include_id=include_id)

    done = set(lst_processed) if lst_processed else set()
    # De-duplicated private copy: the work list must never alias the caller's list.
    pending = list(dict.fromkeys(lst))
    queued = set(pending)  # every id that has ever been put on the work list

    while pending:
        current = pending.pop()
        child_label = node_label(current)
        # Register the node even if it has no parents, so that concepts without
        # any "Is a" relation are still present in the graph.
        G.add_node(child_label)
        for parent in get_parents(current, df_relationship):
            G.add_edge(node_label(parent), child_label)
            if parent not in done and parent not in queued:
                queued.add(parent)
                pending.append(parent)
        done.add(current)
    return G

def get_uppropagated_labels(key_lst,G,exclude_snomed_id_lst=None):
    """Return every key in ``key_lst`` together with all of its ancestors in ``G``.

    Ancestors are obtained with ``nx.ancestors`` (the original code notes that
    this assumes ``G`` is a DAG).

    Args:
        key_lst: nodes of ``G`` to start from.
        G: directed graph to search.
        exclude_snomed_id_lst: optional nodes to leave out of the result,
            whether they appear as keys or as ancestors.

    Returns:
        A de-duplicated list (order unspecified) of keys and ancestors.
    """
    excluded = set(exclude_snomed_id_lst) if exclude_snomed_id_lst else set()
    collected = set()
    for key in key_lst:
        collected.update(node for node in nx.ancestors(G, key) if node not in excluded)
        if key not in excluded:
            collected.add(key)
    return list(collected)

def reformat_ge(x,remove_brackets=True):
    """Join a list of 12SL tokens into one string and glue related tokens with ';'.

    Tokens are first joined with single spaces. Punctuation tokens (COMMA,
    LPAREN, RPAREN) are either replaced by a space (``remove_brackets=True``)
    or by ';'-delimited symbols. Afterwards a fixed sequence of string
    replacements turns selected spaces into ';' so that modifiers stay
    attached to the neighbouring token. The replacements are applied in a
    fixed order; later ones see the output of earlier ones.
    """
    text = " ".join(x)

    if remove_brackets:
        punctuation = ((" COMMA "," "), (" LPAREN "," "), (" RPAREN "," "))
    else:
        # human readable
        punctuation = ((" COMMA ",";,;"), (" LPAREN ",";(;"), (" RPAREN",";)"))
    for old, new in punctuation:
        text = text.replace(old, new)

    # with, or, and
    for connective in ("$SWITH", "$SOR", "$SAND"):
        text = text.replace(" "+connective+" ", ";"+connective+";")

    # infarction qualifiers: combine with previous token
    for token in ("AC", "AU", "OLD", "NEW"):
        text = text.replace(" "+token, ";"+token)

    # combine with next token
    for token in ("BLKED", "ACCEL", "PO", "CRO"):
        text = text.replace(token+" ", token+";")

    # "with ..." modifiers: combine with previous token
    attach_to_previous = (
        "FAV","SPR","MBZI","MBZII","SAV","CHB","VAVB",
        "AVDIS","W2T1","W3T1","W4T1","RVR","SVR","CJP",
        "IRREG","ABER","PROAV","CSEC","BIGEM","JESC","VESC",
        "$SRETC","SAR","MSAR","RVE+","QRSW","2ST","QRSW-2ST","MAFB",
    )
    for token in attach_to_previous:
        text = text.replace(" "+token, ";"+token)

    for old, new in ((" OCC ",";OCC;"), (" FREQ ",";FREQ;"), ("ST& ","ST&;")):
        text = text.replace(old, new)
    return text

def minimal_extension(x,with_uncert=True):#only bind AC and AU
    """Reduce a ``reformat_ge`` string so that ';' only binds AC/AU (and PO/CRO).

    With ``with_uncert`` the uncertainty prefixes ``PO;``/``CRO;`` are kept
    (a blank following them is removed); otherwise they are deleted. A fixed
    set of tokens is stripped, and finally every ';' becomes a space except
    those in front of AC/AU and after PO/CRO. The result is stripped of
    surrounding whitespace.
    """
    if with_uncert:
        prefix_rules = (("PO; ","PO;"), ("CRO; ","CRO;"))
    else:
        prefix_rules = (("PO;",""), ("CRO;",""))
    for old, new in prefix_rules:
        x = x.replace(old, new)

    # remove non-informative tokens
    for token in ("$SWITH;", "$SOR;", "$SAND;", "OCC;", "FREQ;"):
        x = x.replace(token, "")
    x = x.replace("LVH ", "")
    for token in ("CSEC","RAVL","SOKOLYON","CORNPROD","ROMESTES","QRSV","LVH3","LPAREN","RPAREN"):
        x = x.replace(" "+token, "").replace(token+" ", "")

    # fix issues with ACCEL and swap MAFB and AU first; "xxx" temporarily
    # protects the ';' separators that must survive the blanket ';' -> ' ' step
    final_rules = (
        (";ACCEL", " ACCEL"),
        ("MAFB;AU", "AU;MAFB"),
        (";AU", "xxxAU"),
        (";AC", "xxxAC"),
        ("PO;", "POxxx"),
        ("CRO;", "CROxxx"),
        (";", " "),
        ("xxx", ";"),
    )
    for old, new in final_rules:
        x = x.replace(old, new)
    return x.strip()

def apply_snomed_mapping_ge(x):
    """Map one (optionally ``PO;``/``CRO;``-prefixed) 12SL statement to SNOMED pairs.

    The statement without its prefix is looked up in the module-level
    ``ge_to_snomed``; each resulting ``(id, certainty)`` pair has its
    certainty capped by the certainty implied by the prefix (100 without one).
    """
    cert = 100.
    statement = x
    for prefix in ("PO", "CRO"):
        marker = prefix + ";"
        if x.startswith(marker):
            cert = uncertainty_mapping[prefix]
            statement = x[len(marker):]
            break
    # in doubt take the less certain value
    return [(entry[0], min(entry[1], cert)) for entry in ge_to_snomed[statement]]

def map_certainty_statements_ext(x):
    """Split a statement into ``(statement, certainty)``.

    A leading ``PO;`` or ``CRO;`` is removed and translated through the
    module-level ``uncertainty_mapping``; statements without such a prefix
    are returned unchanged with certainty 100.
    """
    head, separator, remainder = x.partition(";")
    if separator and head in ("PO", "CRO"):
        return (remainder, uncertainty_mapping[head])
    return (x, 100.)

def map_infarction_stadium(stadium1, stadium2):
    """Collapse two infarction stadium strings into a single code 0-3.

    The earliest stage wins: 1 if either argument is 'Stadium I' or
    'Stadium I-II', else 2 if either is 'Stadium II', else 3 if either is
    'Stadium III' or 'Stadium II-III', else 0.
    """
    stage_names = (
        (1, ('Stadium I', 'Stadium I-II')),
        (2, ('Stadium II',)),
        (3, ('Stadium III', 'Stadium II-III')),
    )
    for code, names in stage_names:
        if stadium1 in names or stadium2 in names:
            return code
    return 0

def map_mi_labels(labels, infarction_stadium):
    """Append stage-specific variants of myocardial-infarction labels.

    ``labels`` is a list of ``(label, value)`` pairs. For stadium 1 every MI
    label additionally appears with suffix ``_AC``, for stadium 2 with suffix
    ``_OLD``; for any other stadium ``labels`` is returned unchanged.
    """
    mi_labels = ("IMI","ASMI","ILMI","AMI","ALMI","INJAS","LMI","INJAL","IPLMI","IPMI","INJIN","PMI","INJLA","INJIL")
    if infarction_stadium == 1:
        suffix = "_AC"  # add more specific acute labels
    elif infarction_stadium == 2:
        suffix = "_OLD"  # add more specific old labels
    else:
        return labels
    specific = [(entry[0]+suffix, entry[1]) for entry in labels if entry[0] in mi_labels]
    return labels + specific

def apply_snomed_mapping_ptbxl(x):
    """Expand one ``(acronym, value)`` pair into ``(snomed_id, value)`` pairs.

    The SNOMED ids come from the module-level ``ptbxl_to_snomed`` mapping.
    """
    mapped = []
    for snomed_id in ptbxl_to_snomed[x[0]]:
        mapped.append((snomed_id, x[1]))
    return mapped

def apply_uppropagation_dict(lst):
    """Extend ``(label, confidence)`` pairs by all ancestors of each label.

    Every label contributes itself plus the ancestors listed for it in the
    module-level ``uppropagation_dict``, each carrying the label's confidence.
    When a label is reached more than once, the highest confidence is kept
    (in doubt take the more certain value).

    The aggregation uses a plain dict instead of NumPy arrays, so the returned
    labels and confidences keep the types they were passed in with. NumPy
    scalars would otherwise end up in the saved CSV as ``np.int64(...)`` /
    ``np.float64(...)`` under NumPy >= 2.

    Returns:
        A list of ``(label, confidence)`` tuples sorted by label.
    """
    best = {}
    for entry in lst:
        label, confidence = entry[0], entry[1]
        for propagated in [label] + uppropagation_dict[label]:
            if propagated not in best or confidence > best[propagated]:
                best[propagated] = confidence
    return sorted(best.items(), key=lambda pair: pair[0])
    

# Parsing the SNOMED label tree

# Concept table: ids and codes are coerced to int via convert_to_int
# (which yields -1 for values that cannot be converted).
df_concept = pd.read_csv(path_snomed_concept, sep='\t')
df_concept["concept_id"] = df_concept["concept_id"].apply(convert_to_int)
df_concept["concept_code"] = df_concept["concept_code"].apply(convert_to_int)
# Relationship table; get_parents filters its "Is a" rows.
df_relationship = pd.read_csv(path_snomed_relationship, sep='\t')

# Load 12SL labels and mapping

# Note (according to the PTB-XL docs): "cannot rule out" (CRO) has weight 0.15,
# "consider" weight 0.35, "possible" (PO) weight 0.5 and "probable" weight 0.5.

# Read the 12SL statements table. (A stray `df_labels.columns` expression used
# to precede this line and raised NameError because df_labels did not exist yet.)
df_labels = pd.read_csv(path_12sl)

# The "statements" column holds Python list literals stored as text.
# NOTE(review): eval() executes arbitrary expressions; only run this on trusted
# files, or switch to ast.literal_eval if the column contains plain literals.
df_labels["statements"]=df_labels["statements"].apply(lambda x: eval(x))

# One ';'-glued string per record, and the reduced token list derived from it.
df_labels["statements_cat"]=df_labels["statements"].apply(reformat_ge)
df_labels["statements_ext"]=df_labels["statements_cat"].apply(lambda x: minimal_extension(x,with_uncert=True).split(" "))

#finally split into lists again
df_labels["statements_cat"]=df_labels["statements_cat"].apply(lambda x:x.split(" "))

# Certainty (0-100) attached to each qualifier; a missing qualifier means fully certain.
uncertainty_mapping = {np.nan:100., "consider":35., "possible":50., "probable":50., "probably":50., "PO":50., "CRO":15.}

df_ge_snomed = pd.read_csv(path_12sl_mapped_to_snomed)

# 12SL acronym -> list of (snomed_id, certainty), built from the id1..id5 /
# qualifier1..qualifier5 column pairs of the mapping table.
ge_to_snomed = {}

for _,row in df_ge_snomed.iterrows():
    if not isinstance(row["Acronym"], str):
        continue  # rows without an acronym (e.g. NaN) carry no mapping
    tmp = []
    for slot in range(1, 6):
        snomed_id = row["id%d" % slot]
        if pd.isna(snomed_id):
            continue
        qualifier = row["qualifier%d" % slot]
        # A missing qualifier is detected explicitly: using the NaN value as a
        # dict key only works when it is the very same object as np.nan,
        # because NaN never compares equal to itself.
        if pd.isna(qualifier):
            certainty = uncertainty_mapping[np.nan]
        else:
            certainty = uncertainty_mapping[qualifier]
        tmp.append((int(snomed_id), certainty))
    ge_to_snomed[row["Acronym"]]=tmp
    

# Map every extended statement to its SNOMED (id, certainty) pairs, one flat list per record.
df_labels["statements_ext_snomed"] = df_labels["statements_ext"].apply(
    lambda statements: [pair for statement in statements for pair in apply_snomed_mapping_ge(statement)]
)

# Replace the raw extended statements by (statement, certainty) tuples.
df_labels["statements_ext"] = df_labels["statements_ext"].apply(
    lambda statements: list(map(map_certainty_statements_ext, statements))
)

# Load PTB-XL labels and mapping

df_ptbxl = pd.read_csv(path_ptbxl)
df_ptbxl_snomed = pd.read_csv(path_ptbxl_mapped_to_snomed)

# PTB-XL acronym -> list of SNOMED ids taken from the non-missing id1..id4 columns.
ptbxl_to_snomed = {}
for _,row in df_ptbxl_snomed.iterrows():
    if type(row["Acronym"]) == str:
        tmp = [int(row[column]) for column in ("id1", "id2", "id3", "id4") if not np.isnan(row[column])]
        ptbxl_to_snomed[row["Acronym"]] = tmp

# Both code columns are stored as text and are turned back into Python objects with eval.
df_ptbxl["scp_codes"] = df_ptbxl["scp_codes"].apply(lambda x: eval(x))
df_ptbxl["scp_codes_ext"] = df_ptbxl["scp_codes_ext"].apply(lambda x: eval(x))

# One flat list of (snomed_id, value) pairs per record.
df_ptbxl["scp_codes_ext_snomed"] = df_ptbxl["scp_codes_ext"].apply(
    lambda codes: [pair for code in codes for pair in apply_snomed_mapping_ptbxl(code)]
)

## Populate SNOMED label tree

# Distinct SNOMED ids referenced by either label source.
ge_snomed_ids = flatten_unique([[l[0] for l in x] for x in df_labels.statements_ext_snomed])
ptbxl_snomed_ids = flatten_unique([[l[0] for l in x] for x in df_ptbxl.scp_codes_ext_snomed])
# Sorted union of plain Python ints. np.unique would turn the ids into NumPy
# scalars, which end up as graph nodes and are written to the CSV below as
# "np.int64(...)" under NumPy >= 2.
all_snomed_ids = sorted(set(ge_snomed_ids) | set(ptbxl_snomed_ids))

# Pass a copy: populate_graph may modify the list it receives, and
# all_snomed_ids is iterated again below.
G = populate_graph(list(all_snomed_ids), only_id=True)

# Every referenced id plus all of its ancestors in the graph.
all_snomed_ids_uppropagated=flatten_unique([get_uppropagated_labels([i],G) for i in all_snomed_ids])

# One row per node: id, human-readable name and the list of its ancestors.
node_description = [
    {
        "snomed_id": i,
        "description": get_name_from_id(i, df_concept, include_id=False),
        "ancestors": [x for x in get_uppropagated_labels([i],G) if x!=i],
    }
    for i in all_snomed_ids_uppropagated
]
df_snomed_description=pd.DataFrame(node_description)

#save new snomed description table
df_snomed_description.to_csv("snomed_description_new.csv",index=False)

# snomed_id -> list of ancestor ids, used by apply_uppropagation_dict.
uppropagation_dict={}

for _,row in df_snomed_description.iterrows():
    uppropagation_dict[row["snomed_id"]]=row["ancestors"]

## replace snomed labels by uppropagated labels

# Extend the SNOMED labels of both sources by their ancestors.
df_labels["statements_ext_snomed"] = df_labels["statements_ext_snomed"].apply(apply_uppropagation_dict)
df_ptbxl["scp_codes_ext_snomed"] = df_ptbxl["scp_codes_ext_snomed"].apply(apply_uppropagation_dict)

# Only these PTB-XL columns are exported.
df_ptbxl = df_ptbxl[["ecg_id","scp_codes","scp_codes_ext","scp_codes_ext_snomed"]]

#save new mapped labels
df_labels.to_csv("./output/labels/12sl_statements_new.csv", index=False)
df_ptbxl.to_csv("./output/labels/ptbxl_statements_new.csv", index=False)