Module teomim.teomim
Expand source code
import pandas as pd
import numpy as np
from quasinet.qnet import load_qnet
from quasinet.qsampling import qsample
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor
import argparse
from scipy.spatial.distance import cosine
import pkg_resources
import glob
import random
def bhattacharyya_coefficient(pmf1, pmf2):
    """Return the Bhattacharyya coefficient of two discrete distributions.

    The coefficient is the sum, over all positions, of ``sqrt(p * q)``.
    The inputs are used exactly as given; no normalisation happens here.
    """
    overlap = np.array(pmf1) * np.array(pmf2)
    return np.sum(np.sqrt(overlap.astype(float)))
# Global variables
# Sampling configuration shared with parallel_qsample(); all three names are
# assigned by init_globals(), which generate() also passes to the process
# pool as its worker initializer.
global_model = None
global_steps = None
global_alpha = None
def select_key_by_probability(prob_dict):
    """
    Select a key from a dictionary where the keys are the items to be selected
    and the values are the probabilities of each key.

    The values do not have to sum to 1; they are normalised before drawing.

    A plain real number is also accepted and returned unchanged. This keeps
    callers working when they hold a single fixed value (for example a scalar
    ``alpha`` such as ``1.3``) instead of a ``{value: probability}`` mapping.
    """
    import numbers  # local import: only needed for the scalar check below

    if isinstance(prob_dict, numbers.Real):
        # A single fixed value: nothing to draw.
        return prob_dict

    # Normalize the probabilities to ensure they sum up to 1
    total = sum(prob_dict.values())
    keys = list(prob_dict)
    weights = [prob_dict[key] / total for key in keys]
    # Randomly select a key based on the probabilities
    return random.choices(keys, weights=weights, k=1)[0]
def init_globals(model, steps, alpha):
    """Store the sampling configuration in the module-level globals.

    Assigns ``global_model``, ``global_steps`` and ``global_alpha`` so that
    ``parallel_qsample`` can read them later.
    """
    global global_model, global_steps, global_alpha
    global_model, global_steps, global_alpha = model, steps, alpha
def select_random_row(arr):
    """Return one row of ``arr`` chosen uniformly at random.

    ``arr`` is indexed by position along its first axis. pandas DataFrames
    are supported as well: they are converted to their underlying values
    first, because ``df[i]`` would look up a column label instead of a row.

    When there is exactly one row it is returned directly, without drawing
    from NumPy's global random state.
    """
    if isinstance(arr, pd.DataFrame):
        arr = arr.values
    n_rows = arr.shape[0]
    row_index = 0 if n_rows == 1 else np.random.randint(n_rows)
    return arr[row_index]
def parallel_qsample(seed):
    """Run one ``qsample`` call for ``seed`` using the module-level settings.

    Reads ``global_model``, ``global_steps`` and ``global_alpha``. The alpha
    handed to ``qsample`` is drawn anew on every call through
    ``select_key_by_probability(global_alpha)``.
    """
    sampled_alpha = select_key_by_probability(global_alpha)
    return qsample(
        seed,
        global_model,
        steps=global_steps,
        random_seed=True,
        alpha=sampled_alpha,
    )
def generate(modelpath, gz=True, alpha=1.3, outfile=None,
             steps=200000, numworkers=11, num_patients=1000,
             seed=None):
    """Sample ``num_patients`` records from the qnet stored at ``modelpath``.

    Parameters
    ----------
    modelpath, gz
        Passed to ``load_qnet`` to load the model.
    alpha
        Either a ``{alpha_value: probability}`` mapping, from which one value
        is drawn per sample, or a single value that is used for every sample.
    outfile
        If truthy, the resulting DataFrame is also written there as CSV.
    steps
        Forwarded to ``qsample`` as ``steps``.
    numworkers
        Number of worker processes in the pool.
    num_patients
        Number of samples to draw.
    seed
        ``None`` to start every sample from an all-empty-string vector, or a
        collection of rows from which one is picked at random per sample.

    Returns
    -------
    tuple
        ``(DataFrame of samples with the model's feature names as columns,
        'EMPTY STR' or 'DATAFRAME' depending on which seed was used)``.
    """
    model = load_qnet(modelpath, gz=gz)
    featurenames = np.array(model.feature_names)

    # The workers draw alpha from a mapping; wrap a single value (such as the
    # default 1.3) as a one-entry distribution so it is always selected.
    if not hasattr(alpha, 'items'):
        alpha = {alpha: 1.0}

    if seed is None:
        empty_seed = np.array([''] * len(featurenames)).astype('U100')
        seeds = [empty_seed for _ in range(num_patients)]
        seed_used = 'EMPTY STR'
    else:
        seeds = [select_random_row(seed) for _ in range(num_patients)]
        seed_used = 'DATAFRAME'

    # Initialize global variables in this process as well as in each worker
    # (the pool runs the same initializer when a worker starts).
    init_globals(model, steps, alpha)
    with ProcessPoolExecutor(max_workers=numworkers,
                             initializer=init_globals,
                             initargs=(model, steps, alpha)) as executor:
        results = list(tqdm(executor.map(parallel_qsample, seeds),
                            total=num_patients))

    Sf = pd.DataFrame(results, columns=featurenames)
    if outfile:
        Sf.to_csv(outfile)
    return Sf, seed_used
def evaluate__(df, code_prefixes, suffix=None, age_prefix=''):
    """Return the fraction of rows of ``df`` that carry every given prefix.

    For each prefix, the columns whose name starts with
    ``prefix + '_' + age_prefix`` are selected. A row counts for that prefix
    when at least one of those cells is non-empty. Cells equal to ``'.'`` or
    ``''`` are treated as empty, as are cells equal to any entry of
    ``suffix``. A row is counted in the result only if it counts for all
    prefixes.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to evaluate; column names must be strings.
    code_prefixes : str, list or numpy.ndarray
        One prefix or a collection of prefixes.
    suffix : scalar, list, numpy.ndarray or None
        Cell value(s) that should additionally be treated as empty.
    age_prefix : str
        Extra text required right after ``prefix + '_'`` in the column name.

    Returns
    -------
    float
        Number of qualifying rows divided by the number of rows; NaN when
        ``df`` has no rows.
    """
    if not isinstance(code_prefixes, (np.ndarray, list)):
        code_prefixes = [code_prefixes]
    if suffix is not None and not isinstance(suffix, (np.ndarray, list)):
        suffix = [suffix]

    n_rows = df.index.size
    if n_rows == 0:
        # A proportion over zero rows is undefined.
        return np.nan

    # Explicit bool dtype: np.array([True] * n) is float64 when n == 0.
    valid_rows = np.ones(n_rows, dtype=bool)

    for code_prefix in code_prefixes:
        column_start = code_prefix + '_' + age_prefix
        af = df[[col for col in df.columns if col.startswith(column_start)]]
        af = af.replace('.', '').replace('', np.nan)
        # "is not None" rather than truthiness: a multi-element ndarray has
        # no unambiguous truth value.
        if suffix is not None:
            for s in suffix:
                af = af.replace(s, np.nan)
        # Determine if any non-NaN values exist in the row after handling suffixes
        current_valid = af.notna().sum(axis=1).astype(bool)
        # Perform an AND operation between the currently valid rows and the overall valid_rows
        valid_rows &= current_valid.to_numpy()

    return valid_rows.sum() / n_rows
class teomim:
    """Generate synthetic patient records from a qnet model and score them.

    Typical use: construct, optionally call ``set_modelpath``, then
    ``generate()`` (or ``load()`` an existing CSV), then ``evaluate()`` and
    ``quality()``.
    """

    def __init__(self, modelpath=None, gz=True, alpha=1.3,
                 outfile=None, steps=200000,
                 numworkers=11,
                 num_patients=1000, seed=None):
        """Store the generation settings; nothing is loaded or sampled yet.

        All arguments are kept as attributes of the same name and later
        forwarded to the module-level ``generate`` function.
        """
        self.modelpath = modelpath
        self.gz = gz
        self.alpha = alpha
        self.outfile = outfile
        self.steps = steps
        self.numworkers = numworkers
        self.num_patients = num_patients
        self.seed = seed
        self.patients = None
        self.seed_used = None
        # Filled in by evaluate(); None until then.
        self.evaldf = None
        # Default evaluation table: column-name prefix -> expected prevalence.
        # NOTE(review): keys look like ICD-10 code prefixes - confirm.
        self.EVAL_PREFIXES = {'I10': .7, 'I25': .4, 'I50': .25, 'E11': .46,
                              'E66': .3, 'I63': .4, 'G20': .15, 'F32': .5,
                              'F41': .4, 'M81': .25, 'J44': .55, 'J84': 0.005}
        self.asset_path = pkg_resources.resource_filename('teomim', 'assets/')

    def set_modelpath(self, specifier, path=None, gz=None):
        """Choose the model file to use.

        If ``path`` is falsy, ``specifier`` is treated as a substring and the
        first file in the package asset directory whose name contains it is
        used (``IndexError`` if nothing matches). If ``path`` is truthy,
        ``specifier`` itself is stored as the model path.

        ``gz`` overrides ``self.gz`` whenever it is not ``None``, so that
        ``gz=False`` is honoured too.
        """
        if gz is not None:
            self.gz = gz
        if not path:
            self.modelpath = glob.glob(self.asset_path+'/*'+specifier+'*')[0]
        else:
            # NOTE(review): ``path`` only acts as a flag here; the stored
            # value is ``specifier`` - confirm this is intended.
            self.modelpath = specifier
        return

    def load(self, patientdata):
        """Read previously generated patients from the CSV ``patientdata``."""
        self.patients = pd.read_csv(patientdata)

    def generate(self):
        """Run the module-level ``generate`` with the stored settings.

        Stores the sampled DataFrame in ``self.patients`` and the seed label
        in ``self.seed_used``.
        """
        self.patients, self.seed_used = generate(
            modelpath=self.modelpath,
            gz=self.gz, alpha=self.alpha,
            outfile=self.outfile,
            steps=self.steps,
            seed=self.seed,
            numworkers=self.numworkers,
            num_patients=self.num_patients)

    def set_model(self):
        """Load the qnet at ``self.modelpath`` and cache its feature names."""
        self.model = load_qnet(self.modelpath, gz=self.gz)
        self.featurenames = np.array(self.model.feature_names)

    def evaluate(self, EVAL=None):
        """Compare observed prevalences in ``self.patients`` with expectations.

        Parameters
        ----------
        EVAL : dict[str, float] or None
            Maps a column-name prefix to its expected prevalence. Defaults to
            ``self.EVAL_PREFIXES``.

        Returns
        -------
        pandas.DataFrame
            A copy of ``self.evaldf``: one row per prefix with the columns
            ``prevalences`` (observed, via ``evaluate__``) and
            ``prevalence_expected``.

        Raises
        ------
        ValueError
            If ``EVAL`` is not a dict of ``str`` keys and ``float`` values.
        """
        if EVAL is None:
            EVAL = self.EVAL_PREFIXES
        elif not isinstance(EVAL, dict) or not all(
                isinstance(key, str) and isinstance(value, float)
                for key, value in EVAL.items()):
            raise ValueError("EVAL must be a dictionary "
                             "with keys as strings and values as floats.")
        prefixes = list(EVAL)
        observed = [evaluate__(self.patients, prefix) for prefix in prefixes]
        # list(...) is required: np.array(dict.values()) would produce a 0-d
        # object array instead of one expected value per prefix.
        expected = np.array(list(EVAL.values()))
        self.evaldf = pd.DataFrame(
            observed, prefixes,
            columns=['prevalences']).assign(prevalence_expected=expected)
        return self.evaldf.copy()

    def quality(self, df=None):
        """Return the Bhattacharyya coefficient of two columns, in percent.

        Parameters
        ----------
        df : pandas.DataFrame or None
            Two-column table; each column is normalised to sum to 1 and
            treated as a PMF. Defaults to the result of the last
            ``evaluate()`` call.

        Returns
        -------
        float
            ``100 *`` Bhattacharyya coefficient, rounded to two decimals.

        Raises
        ------
        ValueError
            If no table is available or it does not have exactly two columns.
        """
        # "is None" rather than truthiness: a DataFrame has no unambiguous
        # truth value.
        if df is None:
            df = getattr(self, 'evaldf', None)
        if df is None:
            raise ValueError("No evaluation available: call evaluate() "
                             "first or pass a DataFrame.")
        if df.shape[1] != 2:
            raise ValueError("DataFrame should have exactly "
                             "two columns representing two PMFs.")
        # Extracting PMFs from DataFrame columns
        pmf1 = df.iloc[:, 0]
        pmf2 = df.iloc[:, 1]
        # Normalize PMFs to ensure they sum to 1
        pmf1 = np.array(pmf1) / np.sum(pmf1)
        pmf2 = np.array(pmf2) / np.sum(pmf2)
        # Calculate Bhattacharyya Coefficient
        b_coeff = bhattacharyya_coefficient(pmf1, pmf2)*100
        return np.round(b_coeff, 2)
Functions
def bhattacharyya_coefficient(pmf1, pmf2)-
Expand source code
def bhattacharyya_coefficient(pmf1, pmf2): return np.sum(np.sqrt((np.array(pmf1) * np.array(pmf2)).astype(float))) def evaluate__(df, code_prefixes, suffix=None, age_prefix='')-
Expand source code
def evaluate__(df, code_prefixes, suffix=None, age_prefix=''): if not isinstance(code_prefixes, (np.ndarray, list)): code_prefixes = [code_prefixes] valid_rows = np.array([True] * df.index.size) if suffix is not None and not isinstance(suffix, (np.ndarray, list)): suffix = [suffix] for code_prefix in code_prefixes: af = df[[col for col in df.columns if col.startswith(code_prefix+'_'+age_prefix)]] af=af.replace('.','').replace('',np.nan) if suffix: for s in suffix: af = af.replace(s, np.nan) # Determine if any non-NaN values exist in the row after handling suffixes current_valid = af.notna().sum(axis=1).astype(bool) # Perform an AND operation between the currently valid rows and the overall valid_rows valid_rows &= current_valid num_valid_rows = valid_rows.sum() return num_valid_rows / df.index.size def generate(modelpath, gz=True, alpha=1.3, outfile=None, steps=200000, numworkers=11, num_patients=1000, seed=None)-
Expand source code
def generate(modelpath, gz=True, alpha=1.3, outfile=None, steps=200000, numworkers=11, num_patients=1000, seed=None): model = load_qnet(modelpath, gz=gz) featurenames = np.array(model.feature_names) if seed is None: seed = np.array([''] * len(featurenames)).astype('U100') seeds = [seed for _ in range(num_patients)] seed_used='EMPTY STR' else: seeds = [select_random_row(seed) for _ in range(num_patients)] seed_used = 'DATAFRAME' # Initialize global variables init_globals(model, steps, alpha) with ProcessPoolExecutor(max_workers=numworkers, initializer=init_globals, initargs=(model, steps, alpha)) as executor: results = list(tqdm(executor.map(parallel_qsample, seeds), total=num_patients)) Sf = pd.DataFrame(results, columns=featurenames) if outfile: Sf.to_csv(outfile) return Sf,seed_used def init_globals(model, steps, alpha)-
Expand source code
def init_globals(model, steps, alpha): global global_model, global_steps, global_alpha global_model = model global_steps = steps global_alpha = alpha def parallel_qsample(seed)-
Expand source code
def parallel_qsample(seed): return qsample(seed, global_model, steps=global_steps, random_seed=True, alpha=select_key_by_probability(global_alpha)) def select_key_by_probability(prob_dict)-
Select a key from a dictionary where the keys are the items to be selected and the values are the probabilities of each key.
Expand source code
def select_key_by_probability(prob_dict): """ Select a key from a dictionary where the keys are the items to be selected and the values are the probabilities of each key. """ # Normalize the probabilities to ensure they sum up to 1 total = sum(prob_dict.values()) normalized_probs = {k: v / total for k, v in prob_dict.items()} # Randomly select a key based on the probabilities return random.choices(list(normalized_probs.keys()), weights=normalized_probs.values(), k=1)[0] def select_random_row(arr)-
Expand source code
def select_random_row(arr):
    """Return one row of ``arr`` chosen uniformly at random.

    ``arr`` is indexed by position along its first axis. pandas DataFrames
    are supported as well: they are converted to their underlying values
    first, because ``df[i]`` would look up a column label instead of a row.

    When there is exactly one row it is returned directly, without drawing
    from NumPy's global random state.
    """
    if isinstance(arr, pd.DataFrame):
        arr = arr.values
    n_rows = arr.shape[0]
    row_index = 0 if n_rows == 1 else np.random.randint(n_rows)
    return arr[row_index]
Classes
class teomim (modelpath=None, gz=True, alpha=1.3, outfile=None, steps=200000, numworkers=11, num_patients=1000, seed=None)-
Expand source code
class teomim: def __init__(self, modelpath=None, gz=True, alpha=1.3, outfile=None, steps=200000, numworkers=11, num_patients=1000,seed=None): self.modelpath = modelpath self.gz = gz self.alpha = alpha self.outfile = outfile self.steps = steps self.numworkers = numworkers self.num_patients = num_patients self.seed = seed self.patients = None self.seed_used = None self.EVAL_PREFIXES={'I10':.7,'I25':.4,'I50':.25,'E11':.46, 'E66':.3,'I63':.4,'G20':.15,'F32':.5, 'F41':.4,'M81':.25,'J44':.55,'J84':0.005} self.asset_path = pkg_resources.resource_filename('teomim', 'assets/') def set_modelpath(self,specifier,path=None,gz=None): if gz: self.gz = gz if not path: self.modelpath = glob.glob(self.asset_path+'/*'+specifier+'*')[0] else: self.modelpath = specifier return def load(self,patientdata): self.patients = pd.read_csv(patientdata) def generate(self): self.patients,self.seed_used\ = generate(modelpath=self.modelpath, gz=self.gz, alpha=self.alpha, outfile=self.outfile, steps=self.steps, seed=self.seed, numworkers=self.numworkers, num_patients=self.num_patients) def set_model(self): self.model = load_qnet(self.modelpath, gz=self.gz) self.featurenames = np.array(self.model.feature_names) def evaluate(self,EVAL=None): if EVAL is None: EVAL = self.EVAL_PREFIXES elif not isinstance(EVAL, dict) or not all(isinstance(key, str) and isinstance(value, float) for key, value in EVAL.items()): raise ValueError("EVAL must be a dictionary\ with keys as strings and values as floats.") self.evaldf = pd.DataFrame([evaluate__(self.patients,x) for x in EVAL.keys()], list(EVAL.keys()), columns=[ 'prevalences']).assign( prevalence_expected =(np.array(EVAL.values()))) return self.evaldf.copy() def quality(self,df=None): if not df: df=self.evaldf if df.shape[1] != 2: raise ValueError("DataFrame should have exactly\ two columns representing two PMFs.") # Extracting PMFs from DataFrame columns pmf1 = df.iloc[:, 0] pmf2 = df.iloc[:, 1] # Normalize PMFs to ensure they sum to 1 pmf1 = np.array(pmf1) / 
np.sum(pmf1) pmf2 = np.array(pmf2) / np.sum(pmf2) # Calculate Bhattacharyya Coefficient b_coeff = bhattacharyya_coefficient(pmf1, pmf2)*100 return np.round(b_coeff,2)Methods
def evaluate(self, EVAL=None)-
Expand source code
def evaluate(self,EVAL=None): if EVAL is None: EVAL = self.EVAL_PREFIXES elif not isinstance(EVAL, dict) or not all(isinstance(key, str) and isinstance(value, float) for key, value in EVAL.items()): raise ValueError("EVAL must be a dictionary\ with keys as strings and values as floats.") self.evaldf = pd.DataFrame([evaluate__(self.patients,x) for x in EVAL.keys()], list(EVAL.keys()), columns=[ 'prevalences']).assign( prevalence_expected =(np.array(EVAL.values()))) return self.evaldf.copy() def generate(self)-
Expand source code
def generate(self): self.patients,self.seed_used\ = generate(modelpath=self.modelpath, gz=self.gz, alpha=self.alpha, outfile=self.outfile, steps=self.steps, seed=self.seed, numworkers=self.numworkers, num_patients=self.num_patients) def load(self, patientdata)-
Expand source code
def load(self,patientdata): self.patients = pd.read_csv(patientdata) def quality(self, df=None)-
Expand source code
def quality(self,df=None): if not df: df=self.evaldf if df.shape[1] != 2: raise ValueError("DataFrame should have exactly\ two columns representing two PMFs.") # Extracting PMFs from DataFrame columns pmf1 = df.iloc[:, 0] pmf2 = df.iloc[:, 1] # Normalize PMFs to ensure they sum to 1 pmf1 = np.array(pmf1) / np.sum(pmf1) pmf2 = np.array(pmf2) / np.sum(pmf2) # Calculate Bhattacharyya Coefficient b_coeff = bhattacharyya_coefficient(pmf1, pmf2)*100 return np.round(b_coeff,2) def set_model(self)-
Expand source code
def set_model(self): self.model = load_qnet(self.modelpath, gz=self.gz) self.featurenames = np.array(self.model.feature_names) def set_modelpath(self, specifier, path=None, gz=None)-
Expand source code
def set_modelpath(self, specifier, path=None, gz=None):
    """Choose the model file to use.

    If ``path`` is falsy, ``specifier`` is treated as a substring and the
    first file in ``self.asset_path`` whose name contains it is used
    (``IndexError`` if nothing matches). If ``path`` is truthy,
    ``specifier`` itself is stored as the model path.

    ``gz`` overrides ``self.gz`` whenever it is not ``None``, so that
    ``gz=False`` is honoured too.
    """
    if gz is not None:
        self.gz = gz
    if not path:
        self.modelpath = glob.glob(self.asset_path+'/*'+specifier+'*')[0]
    else:
        self.modelpath = specifier
    return