Module zedstat.zedstat
Source code
import pandas as pd
import numpy as np
from sklearn.metrics import auc
from scipy.interpolate import UnivariateSpline
class processRoc(object):
"""process ROC datafile"""
def __init__(self,
df=None,
fprcol='fpr',
tprcol='tpr',
thresholdcol='threshold',
prevalence=None,
order=2,
total_samples=None,
positive_samples=None,
alpha=0.05):
"""Initialization
Args:
df (pandas.DataFrame): dataframe with columns tabulating fpr, tpr, and optionally threshold values
fprcol (str): string name of fpr column
tprcol (str): string name of tpr column
thresholdcol (str): string name of threshold column
prevalence (float): prevalence of positive cases in population (need not be the data ratio)
order (int): order of polynomial/spline for smoothing
total_samples (int): total number of samples in the original data
            positive_samples (int): number of positive cases in the original data
alpha (float): significance level e.g. 0.05
"""
#print('local version 1')
if df.index.name==fprcol:
self.df=df.copy()
else:
if fprcol in df.columns:
self.df=df.set_index(fprcol).copy()
else:
                raise ValueError('fpr not in columns or index')
self.thresholdcol = thresholdcol
if thresholdcol not in self.df.columns:
self.thresholdcol=None
        self.df = self.df.sort_index()
self.fprcol = fprcol
self.tprcol = tprcol
self.raw_df = self.df.copy()
self.prevalence = prevalence
self.order = order
self.df_lim = {}
self._auc = {'U':[],'L':[]}
self.total_samples = total_samples
self.positive_samples = positive_samples
self.alpha = alpha
        self.df=self.df.groupby(self.fprcol).max()  # keep fpr as the index
def get(self):
'''
return dataframe currently in class
Returns:
pandas.DataFrame
'''
return self.df.copy()
def nominal_auc(self):
'''
calculate nominal auc
'''
from sklearn.metrics import auc
self._auc['nominal']=auc(self.df.index.values,self.df.tpr.values)
return
def auc(self,
total_samples=None,
positive_samples=None,
alpha=None):
'''
        calculate auc with confidence bounds. By default, the arguments are read from class initialization.
Args:
            total_samples (int): total number of samples, default None
            positive_samples (int): number of positive samples, default None
alpha (float): significance level, default None
Returns:
float: nominal auc
float: upper bound
float: lower bound
'''
self.nominal_auc()
self._auc['U']=[]
self._auc['L']=[]
self.getBounds(total_samples=total_samples,
positive_samples=positive_samples,
alpha=alpha)
self.__auc_cb2(total_samples=total_samples,
positive_samples=positive_samples,
alpha=alpha)
return self._auc['nominal'], self._auc['U'].min(), self._auc['L'].max()
def __convexify(self):
'''
compute convex hull of the roc curve
'''
#print(self.df)
from scipy.spatial import ConvexHull
if self.df.index.name==self.fprcol:
rf=self.df
else:
if self.fprcol in self.df.columns:
rf=self.df.set_index(self.fprcol)#.drop('threshold',axis=1)
else:
                raise ValueError('fpr not in columns or index')
rf = rf.reset_index()
rf=pd.concat([rf,pd.DataFrame({self.fprcol:0, self.tprcol:0},index=[0])])
rf=pd.concat([rf,pd.DataFrame({self.fprcol:1, self.tprcol:0},index=[0])])
rf=pd.concat([rf,pd.DataFrame({self.fprcol:1, self.tprcol:1},index=[0])])
rf=rf.drop_duplicates()
rf=rf.sort_values(self.fprcol)
rf=rf.sort_values(self.tprcol)
pts=rf[[self.fprcol,self.tprcol]].values
#print(pts)
if len(pts)<3:
rf_=rf.copy()
rf_.columns=[self.tprcol,self.fprcol]
rf_=rf_.sort_values(self.fprcol)
rf_=rf_.sort_values(self.tprcol)
self.df=rf_.set_index(self.fprcol).copy()
return #self.df
hull = ConvexHull(pts)
rf_=pd.DataFrame(pts[hull.vertices, 0], pts[hull.vertices, 1]).reset_index()
rf_.columns=[self.tprcol,self.fprcol]
rf_ = rf_.set_index(self.fprcol)
rf_ = rf_.drop(1.0).sort_index()
rf_.loc[1.0]=1.0
self.df=rf_.copy()
return
def smooth(self,
STEP=0.0001,
interpolate=True,
convexify=True):
'''
smooth roc curves and update processRoc.df which is accessible using processRoc.get()
Args:
            STEP (float): smoothing step, default 0.0001
interpolate (bool): if True, interpolate missing values, default True
convexify (bool): if True, replace ROC with convex hull, default True
'''
self.df=self.raw_df.copy()
VAR=self.fprcol
df_=self.df.reset_index()
DF=pd.concat([pd.DataFrame(
df_[df_[VAR].between(i,i+STEP)].max()).transpose()
for i in np.arange(0,1,STEP)]).set_index(VAR)
DF=DF.dropna()
DF.loc[0]=pd.Series([],dtype=float)
DF.loc[1]=pd.Series([],dtype=float)
DF.loc[0,'tpr']=0
DF.loc[1,'tpr']=1
DF=DF.sort_index()
if interpolate:
DF=DF.interpolate(limit_direction='both',method='spline',order=self.order)
DF[DF < 0] = 0
self.df=DF
if convexify:
self.__convexify()
return
def allmeasures(self,prevalence=None,interpolate=False):
'''
compute accuracy, PPV, NPV, positive and negative likelihood ratios, and update processRoc.df, which can be accessed using processRoc.get()
Args:
            prevalence (float): prevalence of positive cases in population
interpolate (bool): if True interpolate missing values, default False
'''
if prevalence is not None:
p=prevalence
self.prevalence=p
else:
p=self.prevalence
if p is None:
            raise ValueError('prevalence undefined')
df__=self.df.copy()
if self.fprcol == df__.index.name:
df__=df__.reset_index()
df__['ppv']=1/(1+((df__.fpr/df__.tpr)*((1/p)-1)))
df__['acc']=p*df__.tpr + (1-p)*(1-df__.fpr)
df__['npv']=1/(1+((1-df__.tpr)/(1-df__.fpr))*(1/((1/p)-1)))
df__['LR+']=(df__.tpr)/(df__.fpr)
df__['LR-']=(1-df__.tpr)/(1-df__.fpr)
else:
            raise ValueError('set fpr as index')
df__=df__.set_index(self.fprcol)
#display(df__)
df__=df__.replace(np.inf,np.nan)
#display(df__)
if interpolate:
try:
df__=df__.interpolate(limit_direction='both',method='spline',order=self.order)
except:
print('interpolation failed')
#display(df__)
if self.thresholdcol is not None:
if self.thresholdcol not in df__.columns:
df__=df__.join(self.raw_df[self.thresholdcol])
self.df=df__
self.__correctPPV()
self.df[self.df < 0] = 0
        self.df=self.df.reset_index().groupby(self.fprcol).max()
self.df.ppv=UnivariateSpline(self.df.index.values,self.df.ppv.values,k=1,s=None)(self.df.index.values)
self.df[self.df < 0] = 0
#display(self.df)
return #self.df
def __correctPPV(self,df=None):
'''
make ppv monotonic
'''
if 'ppv' not in self.df.columns:
return
if df is None:
df__=self.df
else:
df__=df.copy()
#df__.loc[0].ppv=1.0
#df__.loc[1].ppv=0.0
arr=df__.ppv.values
a_=[]
for i in np.arange(len(arr)-1):
if arr[i+1]>arr[i]:
a_.append(1.05*arr[i+1])
else:
a_.append(arr[i])
a_.append(arr[-1])
df__.ppv=a_
#df__.loc[0].ppv=1.0
#df__.loc[1].ppv=0.0
return df__
def scoretoprobability(self, score, regen=True, **kwargs):
'''
Map computed score to probability of sample being in the positive class.
This is simply the PPV corresponding to the threshold which equals the score.
Now supports both single scores and lists/numpy arrays of scores.
Args:
score (float or list or numpy.ndarray): computed score(s)
regen (bool): if True, regenerate roc curve
kwargs (dict): values passed for regeneration of smoothed roc
Return:
float or numpy.ndarray representing probability of being in positive cohort
'''
if score is None:
return None
if regen:
STEP = 0.01
precision = 3
interpolate = True
STEP = kwargs.get('STEP', STEP)
precision = kwargs.get('precision', precision)
interpolate = kwargs.get('interpolate', interpolate)
self.smooth(STEP=STEP)
self.allmeasures(interpolate=interpolate)
self.usample(precision=precision)
df = self.get()
if 'threshold' not in df.columns:
raise ValueError('Threshold not in columns or index')
if 'ppv' not in df.columns:
raise ValueError('PPV not in columns or index')
def compute_val(score):
if score is None:
return None
if score > df.threshold.max():
val = df.ppv.values.max()
else:
val = df[df.threshold > score].ppv.tail(1).values[0]
return (val - df.ppv.values.min()) / (df.ppv.values.max() - df.ppv.values.min())
if isinstance(score, (list, np.ndarray)):
return np.array([compute_val(s) for s in score])
else:
return compute_val(score)
def usample(self,
df=None,
precision=3):
'''
        estimate performance measures at regular intervals of the false positive rate
Args:
            df (pandas.DataFrame): dataframe with performance values, fpr as index. Default: None, in which case the dataframe stored in the class is used
            precision (int): number of digits after the decimal point used to sample the fpr range
Returns:
pandas.DataFrame: uniformly sampled performance dataframe
'''
step=10**(-precision)
fpr=[np.round(x,precision) for x in np.arange(0,1+step,step)]
if df is None:
fpr_=[x for x in fpr if x not in self.df.index]
else:
fpr_=[x for x in fpr if x not in df.index]
if df is None:
df___=self.df.copy()
else:
df___=df.copy()
for x in fpr_:
df___.loc[x]=pd.Series([],dtype=float)
df___=df___.sort_index().interpolate()
df___=df___.loc[fpr]
if df is None:
self.df=df___
return df___
def __getDelta(self,
total_samples=None,
positive_samples=None,
alpha=None):
'''
        confidence bounds on specificity and sensitivity using a Wald-type approach
'''
if total_samples is None:
total_samples=self.total_samples
if positive_samples is None:
positive_samples = self.positive_samples
if alpha is None:
alpha=self.alpha
n=total_samples
n_pos=positive_samples
if self.fprcol not in self.df.columns:
if self.fprcol != self.df.index.name:
return
if self.tprcol not in self.df.columns:
return
import scipy.stats as stats
        z=stats.norm.ppf(1 - (alpha/2))
delta_=pd.DataFrame()
df_=self.df.copy()
if self.fprcol == df_.index.name:
df_=df_.reset_index()
delta_['fprdel']=z*np.sqrt((df_.fpr*(1-df_.fpr))/n)
delta_['tprdel']=z*np.sqrt((df_.tpr*(1-df_.tpr))/n_pos)
self.delta_=delta_
return
def getBounds(self,
total_samples=None,
positive_samples=None,
alpha=None,
prevalence=None):
'''
compute confidence bounds on performance measures
Args:
            total_samples (int): total number of samples, default None
            positive_samples (int): number of positive samples, default None
alpha (float): significance level, default None
prevalence (float): prevalence of positive cases in population, default None
'''
self.__getDelta(total_samples=total_samples,
positive_samples=positive_samples,
alpha=alpha)
if prevalence is None:
p=self.prevalence
else:
p=prevalence
if p is None:
            raise ValueError('prevalence undefined')
for direction in ['U','L']:
df__=self.df.copy().reset_index()
if direction=='U':
df__.tpr=df__.tpr+self.delta_.tprdel
#df__.fpr=df__.fpr-self.delta_.fprdel
else:
df__.tpr=df__.tpr-self.delta_.tprdel
#df__.fpr=df__.fpr+self.delta_.fprdel
df__['ppv']=1/(1+((df__.fpr/df__.tpr)*((1/p)-1)))
df__['acc']=p*df__.tpr + (1-p)*(1-df__.fpr)
df__['npv']=1/(1+((1-df__.tpr)/(1-df__.fpr))*(1/((1/p)-1)))
df__['LR+']=(df__.tpr)/(df__.fpr)
df__['LR-']=(1-df__.tpr)/(1-df__.fpr)
df__=df__.replace(np.inf,np.nan)
df__=df__.interpolate(limit_direction='both',method='spline',
order=self.order).set_index(self.fprcol)
df__[df__ < 0] = 0
self.df_lim[direction]=self.__correctPPV(df__)
if direction=='U':
self._auc[direction]=np.array([np.append(self._auc[direction],
auc(df__.index.values,
df__.tpr.values)).min()])
if direction=='L':
self._auc[direction]=np.array([np.append(self._auc[direction],
auc(df__.index.values,
df__.tpr.values)).max()])
        # adjust dataframe to center of upper and lower bounds
#self.df=(self.df_lim['U']+ self.df_lim['L'] )/2
#self.__correctvalues()
return
def __correctvalues(self):
#ppv
        if 'ppv' in self.df.columns:
            self.df.loc[self.df.ppv>1,'ppv']=1.0
        if 'npv' in self.df.columns:
            self.df.loc[self.df.npv>1,'npv']=1.0
        if 'LR-' in self.df.columns:
            self.df.loc[self.df['LR-']>1,'LR-']=1.0
def __auc_cb2(self,
total_samples=None,
positive_samples=None,
alpha=None):
'''
compute auc confidence bounds using Danzig bounds
Args:
            total_samples (int): total number of samples, default None
            positive_samples (int): number of positive samples, default None
alpha (float): significance level, default None
'''
if total_samples is None:
total_samples=self.total_samples
if positive_samples is None:
positive_samples = self.positive_samples
if alpha is None:
alpha=self.alpha
n=total_samples
n_pos=positive_samples
if 'nominal' not in self._auc.keys():
            raise ValueError('calculate nominal auc first')
import scipy.stats as stats
auc=self._auc['nominal']
        z=stats.norm.ppf(1 - (alpha/2))
eta=1+(n_pos/(z*z))
b=(auc-.5)/eta
auc_U=auc+b+ (1/eta)*np.sqrt((auc-.5)**2 + (auc*(1-auc)*eta))
auc_L=auc+b- (1/eta)*np.sqrt((auc-.5)**2 + (auc*(1-auc)*eta))
self._auc['L']=np.array([np.append(self._auc['L'],auc_L).max()])
self._auc['U']=np.array([np.append(self._auc['U'],auc_U).min()])
return
def operating_zone(self,
n=1,
LRplus=10,
LRminus=0.6):
'''
compute the end points of the operating zone,
        one for maximizing precision, and one for maximizing sensitivity
Args:
            n (int): number of operating points per condition returned, default 1
LRplus (float): lower bound on positive likelihood ratio, default 10.0
LRminus (float): upper bound on negative likelihood ratio, default 0.6
'''
wf=self.df.copy()
opf=pd.concat([wf[(wf['LR+']>LRplus)
& (wf['LR-']<LRminus) ]\
.sort_values('ppv',ascending=False).head(n),
wf[(wf['LR+']>LRplus)
& (wf['LR-']<LRminus) ]\
.sort_values('tpr',ascending=False).head(n)])
if opf.empty:
self._operating_zone=opf.copy()
return #self._operating_zone.copy()
self._operating_zone=opf.reset_index()
self._operating_zone.index=['high precision']*n + ['high sensitivity']*n
return #self._operating_zone.copy()
def samplesize(self,
delta_auc=0.1,
target_auc=None,
alpha=None):
'''
        estimate sample size for attaining the auc bound at a given significance level
Args:
delta_auc (float): maximum perturbation from estimated auc, default 0.1
            target_auc (float): if None, use the current estimated nominal auc
            alpha (float): significance level. If None, use processRoc.alpha
Returns:
float: minimum sample size
'''
if alpha is None:
alpha=self.alpha
if target_auc is None:
if 'nominal' not in self._auc.keys():
self.auc()
target_auc=self._auc['nominal']
import scipy.stats as stats
        z=stats.norm.ppf(1 - (alpha/2))
required_npos = (z*z)*target_auc*(1-target_auc)/(delta_auc*delta_auc)
return required_npos
def pvalue(self,
delta_auc=0.1,
twosided=True):
'''
compute p-value for given auc bounds
Args:
delta_auc (float): maximum perturbation from estimated auc, default 0.1
twosided (bool): one sided or twosided confidence bounds
Returns:
            float: p-value for the null hypothesis that the true auc is lower than the estimated nominal auc by more than delta_auc
'''
if 'nominal' not in self._auc.keys():
self.auc()
auc=self._auc['nominal']
import scipy.stats as stats
z=np.sqrt(self.positive_samples/(auc*(1-auc)/(delta_auc*delta_auc)) )
pvalue=stats.norm.sf(abs(z))
if twosided:
pvalue=2*pvalue
return pvalue
def interpret(self,
fpr=0.01,
number_of_positives=10,
five_yr_survival=None,
factor=1):
'''
generate simple interpretation of inferred model, based on a number of positive cases
Args:
            fpr (float): the false positive rate or 1-specificity of the operating point
number_of_positives (int): interpret assuming this many positive cases, default 10
five_yr_survival (float): fraction not experiencing severe event after 5 years (default: None)
factor (float): fraction of TP who avert the severe outcome due to correct screen
'''
wf=self.df.copy()
wf.loc[fpr]=pd.Series([],dtype=float)
wf=wf.sort_index().interpolate(method='spline',order=self.order,limit_direction='both')
row=wf.loc[fpr]
# POS=number_of_positives
# TP=POS*row.tpr
# FP = TP*((1/row.ppv) -1)
# NEG=FP/fpr
# TOTALFLAGS=TP+FP
# FN=POS-TP
# TN=POS/self.prevalence
#factor=0.21*(0.95-0.69) + 0.08*(0.95-0.17)
#factor=0.33*(0.95-0.17)
POS=number_of_positives
NEG=POS*(1-self.prevalence)*(1/self.prevalence)
TP=POS*row.tpr
TOTALFLAGS=TP/row.ppv
FP=TOTALFLAGS-TP
FN=POS-TP
TN=NEG-FP
if five_yr_survival is not None:
NNS=TOTALFLAGS/(TP*factor*(1- five_yr_survival))
else:
NNS=np.nan
resdf=pd.DataFrame.from_dict({"POS":np.round(POS),"TP":np.round(TP),
"FP":np.round(FP),"NEG":np.round(NEG),
"FLAGS":np.round(TOTALFLAGS),"FN":np.round(FN),
"TN":np.round(TN),
"NNS":np.round(NNS),
"FLAGGED_FRACTION":np.round(TOTALFLAGS/(POS+NEG),
2)},orient='index',columns=['estimates'])
rf=pd.DataFrame({'pos':np.round(POS),
'flags':int(np.round(TOTALFLAGS)),
'tp':int(np.round(TP)),
'fp':int(np.round(FP)),
'fn':int(np.round(FN)),
'tn':int(np.round(TN))},index=['numbers'])
pos=rf.pos.values[0]
flags=rf['flags'].values[0]
fp=rf['fp'].values[0]
tp=rf['tp'].values[0]
fn=rf['fn'].values[0]
txt=[f"For every {pos} positive instances",
f"we raise {flags} flags,",
f"out of which {tp} are true positives",
f"{fp} are false alarms",
f"{fn} cases are missed"]
if five_yr_survival is not None:
txt.append(f"Number needed to screen is {NNS}")
return rf,txt,resdf
def genroc(df,
risk='predicted_risk',
target='target',
steps=1000,
TARGET=[1],
outfile=None):
'''
compute roc curve from raw observation of risk-target information on samples
Args:
df (pandas.DataFrame): dataframe of raw samples identified as positive or negative with computed risk
target (str): name of target column
risk (str): name of risk column
TARGET (list): list of values of target column that define the positive case, default [1]
        steps (int): number of threshold steps between min and max of the risk value, default 1000
        outfile (str): path to write a dataframe with fpr, tpr, threshold columns, default None
Returns:
pandas.DataFrame: roc dataframe
int: total number of samples
int: total number of positive samples
'''
threshold={}
df_=df[[risk,target]].rename(columns={risk:'risk',target:'target'})
delta=(df_.risk.max()-df_.risk.min())/steps
for r in np.arange(df_.risk.min(),df_.risk.max()+delta,delta):
#print(r)
fn=df_[(df_.risk<r) & df_.target.isin(TARGET)].index.size
tp=df_[(df_.risk>=r) & df_.target.isin(TARGET)].index.size
fp=df_[(df_.risk>=r) & ~df_.target.isin(TARGET)].index.size
tn=df_[(df_.risk<r) & ~df_.target.isin(TARGET)].index.size
threshold[r]={'tp':tp,'fp':fp,'tn':tn,'fn':fn}
xf=pd.DataFrame.from_dict(threshold).transpose()
xf.index.name='threshold'
xf=xf.assign(tpr=(xf.tp)/(xf.tp+xf.fn)).assign(fpr=(xf.fp)/(xf.fp+xf.tn))
xf=xf[['fpr','tpr']].reset_index()#.set_index('fpr')
if outfile is not None:
xf.to_csv(outfile)
return xf,df_.index.size,df_[df_.target.isin(TARGET)].index.size
def pipeline(df,
risk='predicted_risk',
target='target',
steps=1000,
TARGET=[1],
order=3,
alpha=0.05,
prevalence=.002,
precision=3,
outfile=None):
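    '''
    end-to-end helper: compute the roc curve from raw risk scores, smooth it,
    compute all performance measures at the given prevalence, sample the fpr
    axis uniformly, attach confidence bounds, and return the joined
    performance table along with the (nominal, upper, lower) auc triple
    '''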
    rf,total_samples,positive_samples=genroc(df,risk=risk,
                                             target=target,
                                             steps=steps,
                                             TARGET=TARGET)
zt=processRoc(rf,
order=order,
total_samples=total_samples,
positive_samples=positive_samples,
alpha=alpha,
prevalence=prevalence)
zt.smooth(STEP=0.001)
zt.allmeasures(interpolate=True)
zt.usample(precision=precision)
zt.getBounds()
df_=zt.get()
df_u=zt.df_lim['U']
df_l=zt.df_lim['L']
df_=df_.join(df_u,rsuffix='_upper').join(df_l,rsuffix='_lower')
if outfile is not None:
df_.to_csv(outfile)
return df_,zt.auc()
def score_to_probability(scores,df,
prevalence,
total_samples,
positive_samples,
alpha=0.05):
'''
    Standalone mapping of scores to probability (PPV) with upper and
    lower confidence bounds.
    Implements: Li J, Fine JP, Safdar N. Prevalence-dependent diagnostic
    accuracy measures. Statist. Med. 2007; 26:3258–3273.
    DOI: 10.1002/sim.2812
'''
from scipy.stats import norm
z = norm.ppf(1 - alpha / 2)
se=df.tpr
sp=1-df.fpr
var_se = se * (1 - se) / positive_samples
var_sp = sp * (1 - sp) / (total_samples - positive_samples)
df['g1']= (1 - sp) / se
sigma_1 = np.sqrt(total_samples * (1 - sp)**2 * var_se / (se**4) + total_samples * var_sp / (se**2))
df['ci_g1'] = z * (sigma_1 / np.sqrt(total_samples))
df['ppv'] = se * prevalence / (se * prevalence + (1 - sp) * (1 - prevalence))
df['ppv_lower'] = 1/(1+ ((1-prevalence)/prevalence)*(df['g1']-df['ci_g1']))
df['ppv_upper'] = 1/(1+ ((1-prevalence)/prevalence)*(df['g1']+df['ci_g1']))
df=df.dropna().drop(['g1','ci_g1'],axis=1)
return [df[df.threshold>=score].tail(1)[['ppv','ppv_upper','ppv_lower']].round(2).values[0] for score in scores]
Functions
def genroc(df, risk='predicted_risk', target='target', steps=1000, TARGET=[1], outfile=None)
-
compute roc curve from raw observation of risk-target information on samples
Args
df
:pandas.DataFrame
- dataframe of raw samples identified as positive or negative with computed risk
target
:str
- name of target column
risk
:str
- name of risk column
TARGET
:list
- list of values of target column that define the positive case, default [1]
steps
:int
- number of threshold steps between min and max of the risk value, default 1000
outfile
:str
- path to write a dataframe with fpr, tpr, threshold columns, default None
Returns
pandas.DataFrame
- roc dataframe
int
- total number of samples
int
- total number of positive samples
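A minimal usage sketch (hypothetical data; assumes the package is importable as zedstat.zedstat):

import numpy as np
import pandas as pd
from zedstat.zedstat import genroc

# hypothetical scored cohort: positives score higher on average
rng = np.random.default_rng(0)
df = pd.DataFrame({'predicted_risk': np.r_[rng.normal(0.7, 0.15, 50),
                                           rng.normal(0.3, 0.15, 450)],
                   'target': np.r_[np.ones(50, dtype=int),
                                   np.zeros(450, dtype=int)]})
roc, n_total, n_pos = genroc(df, risk='predicted_risk', target='target', steps=100)
print(n_total, n_pos)          # 500 50
print(roc.columns.tolist())    # ['threshold', 'fpr', 'tpr']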
def pipeline(df, risk='predicted_risk', target='target', steps=1000, TARGET=[1], order=3, alpha=0.05, prevalence=0.002, precision=3, outfile=None)
-
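pipeline chains genroc with processRoc (smooth, allmeasures, usample, getBounds) and returns the joined performance table together with the auc triple. A sketch, reusing the synthetic df from the genroc example above:

from zedstat.zedstat import pipeline

perf, (auc_nominal, auc_upper, auc_lower) = pipeline(
    df, risk='predicted_risk', target='target',
    order=3, alpha=0.05, prevalence=0.002, precision=3)
print(perf.filter(['tpr', 'ppv', 'ppv_upper', 'ppv_lower']).head())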
def score_to_probability(scores, df, prevalence, total_samples, positive_samples, alpha=0.05)
-
Standalone mapping of scores to probability (PPV) with upper and lower confidence bounds. Implements: Li J, Fine JP, Safdar N. Prevalence-dependent diagnostic accuracy measures. Statist. Med. 2007; 26:3258–3273. DOI: 10.1002/sim.2812
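A sketch of the standalone path, reusing the genroc output above; the function scans the table with tail(1), so sort by fpr first (threshold then decreases down the table):

from zedstat.zedstat import genroc, score_to_probability

roc, n_total, n_pos = genroc(df, steps=100)
roc = roc.sort_values('fpr')   # threshold now decreasing
triples = score_to_probability(scores=[0.4, 0.6],
                               df=roc,
                               prevalence=0.002,
                               total_samples=n_total,
                               positive_samples=n_pos,
                               alpha=0.05)
# one [ppv, ppv_upper, ppv_lower] array per input score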
Classes
class processRoc (df=None, fprcol='fpr', tprcol='tpr', thresholdcol='threshold', prevalence=None, order=2, total_samples=None, positive_samples=None, alpha=0.05)
-
process ROC datafile
Initialization
Args
df
:pandas.DataFrame
- dataframe with columns tabulating fpr, tpr, and optionally threshold values
fprcol
:str
- string name of fpr column
tprcol
:str
- string name of tpr column
thresholdcol
:str
- string name of threshold column
prevalence
:float
- prevalence of positive cases in population (need not be the data ratio)
order
:int
- order of polynomial/spline for smoothing
total_samples
:int
- total number of samples in the original data
positive_samples
:int
- number of positive cases in the original data
alpha
:float
- significance level e.g. 0.05
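A minimal end-to-end sketch on a toy curve (hypothetical numbers; real ROC tables should carry many more points):

import numpy as np
import pandas as pd
from zedstat.zedstat import processRoc

fpr = np.linspace(0, 1, 21)
roc = pd.DataFrame({'fpr': fpr,
                    'tpr': np.sqrt(fpr),   # concave toy ROC
                    'threshold': 1 - fpr})
zt = processRoc(roc, prevalence=0.01,
                total_samples=10000, positive_samples=100, alpha=0.05)
zt.smooth(STEP=0.01)
zt.allmeasures(interpolate=True)
zt.usample(precision=2)
print(zt.auc())          # (nominal, upper, lower)
print(zt.get().head())   # fpr-indexed table with tpr, ppv, npv, acc, LR+, LR-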
Expand source code
class processRoc(object): """process ROC datafile""" def __init__(self, df=None, fprcol='fpr', tprcol='tpr', thresholdcol='threshold', prevalence=None, order=2, total_samples=None, positive_samples=None, alpha=0.05): """Initialization Args: df (pandas.DataFrame): dataframe with columns tabulating fpr, tpr, and optionally threshold values fprcol (str): string name of fpr column tprcol (str): string name of tpr column thresholdcol (str): string name of threshold column prevalence (float): prevalence of positive cases in population (need not be the data ratio) order (int): order of polynomial/spline for smoothing total_samples (int): total number of samples in the original data positive samples (int): number of positive cases in the original data alpha (float): significance level e.g. 0.05 """ #print('local version 1') if df.index.name==fprcol: self.df=df.copy() else: if fprcol in df.columns: self.df=df.set_index(fprcol).copy() else: raise('fpr not in columns or index') self.thresholdcol = thresholdcol if thresholdcol not in self.df.columns: self.thresholdcol=None self.df = self.df.sort_values('fpr') self.fprcol = fprcol self.tprcol = tprcol self.raw_df = self.df.copy() self.prevalence = prevalence self.order = order self.df_lim = {} self._auc = {'U':[],'L':[]} self.total_samples = total_samples self.positive_samples = positive_samples self.alpha = alpha self.df=self.df.groupby(self.fprcol).max().reset_index() def get(self): ''' return dataframe currently in class Returns: pandas.DataFrame ''' return self.df.copy() def nominal_auc(self): ''' calculate nominal auc ''' from sklearn.metrics import auc self._auc['nominal']=auc(self.df.index.values,self.df.tpr.values) return def auc(self, total_samples=None, positive_samples=None, alpha=None): ''' calculate auc with confidence bounds. As default, the arguments are read from class initializtion. 
Args: total_samples (int): total number fo samples, default None positive_samples (int): number fo positive samples, default None alpha (float): significance level, default None Returns: float: nominal auc float: upper bound float: lower bound ''' self.nominal_auc() self._auc['U']=[] self._auc['L']=[] self.getBounds(total_samples=total_samples, positive_samples=positive_samples, alpha=alpha) self.__auc_cb2(total_samples=total_samples, positive_samples=positive_samples, alpha=alpha) return self._auc['nominal'], self._auc['U'].min(), self._auc['L'].max() def __convexify(self): ''' compute convex hull of the roc curve ''' #print(self.df) from scipy.spatial import ConvexHull if self.df.index.name==self.fprcol: rf=self.df else: if self.fprcol in self.df.columns: rf=self.df.set_index(self.fprcol)#.drop('threshold',axis=1) else: raise('fpr not in columns or index') rf = rf.reset_index() rf=pd.concat([rf,pd.DataFrame({self.fprcol:0, self.tprcol:0},index=[0])]) rf=pd.concat([rf,pd.DataFrame({self.fprcol:1, self.tprcol:0},index=[0])]) rf=pd.concat([rf,pd.DataFrame({self.fprcol:1, self.tprcol:1},index=[0])]) rf=rf.drop_duplicates() rf=rf.sort_values(self.fprcol) rf=rf.sort_values(self.tprcol) pts=rf[[self.fprcol,self.tprcol]].values #print(pts) if len(pts)<3: rf_=rf.copy() rf_.columns=[self.tprcol,self.fprcol] rf_=rf_.sort_values(self.fprcol) rf_=rf_.sort_values(self.tprcol) self.df=rf_.set_index(self.fprcol).copy() return #self.df hull = ConvexHull(pts) rf_=pd.DataFrame(pts[hull.vertices, 0], pts[hull.vertices, 1]).reset_index() rf_.columns=[self.tprcol,self.fprcol] rf_ = rf_.set_index(self.fprcol) rf_ = rf_.drop(1.0).sort_index() rf_.loc[1.0]=1.0 self.df=rf_.copy() return def smooth(self, STEP=0.0001, interpolate=True, convexify=True): ''' smooth roc curves and update processRoc.df which is accessible using processRoc.get() Args: STEP (float): smooting step, default 0.0001 interpolate (bool): if True, interpolate missing values, default True convexify (bool): if True, replace ROC with convex hull, default True ''' self.df=self.raw_df.copy() VAR=self.fprcol df_=self.df.reset_index() DF=pd.concat([pd.DataFrame( df_[df_[VAR].between(i,i+STEP)].max()).transpose() for i in np.arange(0,1,STEP)]).set_index(VAR) DF=DF.dropna() DF.loc[0]=pd.Series([],dtype=float) DF.loc[1]=pd.Series([],dtype=float) DF.loc[0,'tpr']=0 DF.loc[1,'tpr']=1 DF=DF.sort_index() if interpolate: DF=DF.interpolate(limit_direction='both',method='spline',order=self.order) DF[DF < 0] = 0 self.df=DF if convexify: self.__convexify() return def allmeasures(self,prevalence=None,interpolate=False): ''' compute accuracy, PPV, NPV, positive and negative likelihood ratios, and update processRoc.df, which can be accessed using processRoc.get() Args: prevalence (float): prevalence od positive cases in population interpolate (bool): if True interpolate missing values, default False ''' if prevalence is not None: p=prevalence self.prevalence=p else: p=self.prevalence if p is None: raise('prevalence undefined') df__=self.df.copy() if self.fprcol == df__.index.name: df__=df__.reset_index() df__['ppv']=1/(1+((df__.fpr/df__.tpr)*((1/p)-1))) df__['acc']=p*df__.tpr + (1-p)*(1-df__.fpr) df__['npv']=1/(1+((1-df__.tpr)/(1-df__.fpr))*(1/((1/p)-1))) df__['LR+']=(df__.tpr)/(df__.fpr) df__['LR-']=(1-df__.tpr)/(1-df__.fpr) else: assert('set fpr as index') df__=df__.set_index(self.fprcol) #display(df__) df__=df__.replace(np.inf,np.nan) #display(df__) if interpolate: try: df__=df__.interpolate(limit_direction='both',method='spline',order=self.order) except: 
print('interpolation failed') #display(df__) if self.thresholdcol is not None: if self.thresholdcol not in df__.columns: df__=df__.join(self.raw_df[self.thresholdcol]) self.df=df__ self.__correctPPV() self.df[self.df < 0] = 0 self.df=self.df.reset_index().groupby('fpr').max() self.df.ppv=UnivariateSpline(self.df.index.values,self.df.ppv.values,k=1,s=None)(self.df.index.values) self.df[self.df < 0] = 0 #display(self.df) return #self.df def __correctPPV(self,df=None): ''' make ppv monotonic ''' if 'ppv' not in self.df.columns: return if df is None: df__=self.df else: df__=df.copy() #df__.loc[0].ppv=1.0 #df__.loc[1].ppv=0.0 arr=df__.ppv.values a_=[] for i in np.arange(len(arr)-1): if arr[i+1]>arr[i]: a_.append(1.05*arr[i+1]) else: a_.append(arr[i]) a_.append(arr[-1]) df__.ppv=a_ #df__.loc[0].ppv=1.0 #df__.loc[1].ppv=0.0 return df__ def scoretoprobability(self, score, regen=True, **kwargs): ''' Map computed score to probability of sample being in the positive class. This is simply the PPV corresponding to the threshold which equals the score. Now supports both single scores and lists/numpy arrays of scores. Args: score (float or list or numpy.ndarray): computed score(s) regen (bool): if True, regenerate roc curve kwargs (dict): values passed for regeneration of smoothed roc Return: float or numpy.ndarray representing probability of being in positive cohort ''' if score is None: return None if regen: STEP = 0.01 precision = 3 interpolate = True STEP = kwargs.get('STEP', STEP) precision = kwargs.get('precision', precision) interpolate = kwargs.get('interpolate', interpolate) self.smooth(STEP=STEP) self.allmeasures(interpolate=interpolate) self.usample(precision=precision) df = self.get() if 'threshold' not in df.columns: raise ValueError('Threshold not in columns or index') if 'ppv' not in df.columns: raise ValueError('PPV not in columns or index') def compute_val(score): if score is None: return None if score > df.threshold.max(): val = df.ppv.values.max() else: val = df[df.threshold > score].ppv.tail(1).values[0] return (val - df.ppv.values.min()) / (df.ppv.values.max() - df.ppv.values.min()) if isinstance(score, (list, np.ndarray)): return np.array([compute_val(s) for s in score]) else: return compute_val(score) def usample(self, df=None, precision=3): ''' make performance measures estimated at regular intervals of false positive rate Args: df (pandas.DataFrame): dataframe woth performance values, fpr as index. 
default: None, when the dataframe entered at initialization is used precision (int): number of digits after decismal point used to sample fpr range Returns: pandas.DataFrame: uniformly sampled performance dataframe ''' step=10**(-precision) fpr=[np.round(x,precision) for x in np.arange(0,1+step,step)] if df is None: fpr_=[x for x in fpr if x not in self.df.index] else: fpr_=[x for x in fpr if x not in df.index] if df is None: df___=self.df.copy() else: df___=df.copy() for x in fpr_: df___.loc[x]=pd.Series([],dtype=float) df___=df___.sort_index().interpolate() df___=df___.loc[fpr] if df is None: self.df=df___ return df___ def __getDelta(self, total_samples=None, positive_samples=None, alpha=None): ''' confidence bounds on specificity and sensitivity using Wald-type approach ''' if total_samples is None: total_samples=self.total_samples if positive_samples is None: positive_samples = self.positive_samples if alpha is None: alpha=self.alpha n=total_samples n_pos=positive_samples if self.fprcol not in self.df.columns: if self.fprcol != self.df.index.name: return if self.tprcol not in self.df.columns: return import scipy.stats as stats z=stats. norm. ppf(1 - (alpha/2)) delta_=pd.DataFrame() df_=self.df.copy() if self.fprcol == df_.index.name: df_=df_.reset_index() delta_['fprdel']=z*np.sqrt((df_.fpr*(1-df_.fpr))/n) delta_['tprdel']=z*np.sqrt((df_.tpr*(1-df_.tpr))/n_pos) self.delta_=delta_ return def getBounds(self, total_samples=None, positive_samples=None, alpha=None, prevalence=None): ''' compute confidence bounds on performance measures Args: total_samples (int): total number fo samples, default None positive_samples (int): number fo positive samples, default None alpha (float): significance level, default None prevalence (float): prevalence of positive cases in population, default None ''' self.__getDelta(total_samples=total_samples, positive_samples=positive_samples, alpha=alpha) if prevalence is None: p=self.prevalence else: p=prevalence if p is None: raise('prevalence undefined') for direction in ['U','L']: df__=self.df.copy().reset_index() if direction=='U': df__.tpr=df__.tpr+self.delta_.tprdel #df__.fpr=df__.fpr-self.delta_.fprdel else: df__.tpr=df__.tpr-self.delta_.tprdel #df__.fpr=df__.fpr+self.delta_.fprdel df__['ppv']=1/(1+((df__.fpr/df__.tpr)*((1/p)-1))) df__['acc']=p*df__.tpr + (1-p)*(1-df__.fpr) df__['npv']=1/(1+((1-df__.tpr)/(1-df__.fpr))*(1/((1/p)-1))) df__['LR+']=(df__.tpr)/(df__.fpr) df__['LR-']=(1-df__.tpr)/(1-df__.fpr) df__=df__.replace(np.inf,np.nan) df__=df__.interpolate(limit_direction='both',method='spline', order=self.order).set_index(self.fprcol) df__[df__ < 0] = 0 self.df_lim[direction]=self.__correctPPV(df__) if direction=='U': self._auc[direction]=np.array([np.append(self._auc[direction], auc(df__.index.values, df__.tpr.values)).min()]) if direction=='L': self._auc[direction]=np.array([np.append(self._auc[direction], auc(df__.index.values, df__.tpr.values)).max()]) # adjust datframe to cneter of upper and lowwr bounds #self.df=(self.df_lim['U']+ self.df_lim['L'] )/2 #self.__correctvalues() return def __correctvalues(self): #ppv if 'ppv' in self.df.columns: self.df.ppv[self.df.ppv>1]=1.0 if 'npv' in self.df.columns: self.df.npv[self.df.npv>1]=1.0 if 'LR-' in self.df.columns: self.df['LR-'][self.df['LR-']>1]=1.0 def __auc_cb2(self, total_samples=None, positive_samples=None, alpha=None): ''' compute auc confidence bounds using Danzig bounds Args: total_samples (int): total number fo samples, default None positive_samples (int): number fo positive samples, default 
None alpha (float): significance level, default None ''' if total_samples is None: total_samples=self.total_samples if positive_samples is None: positive_samples = self.positive_samples if alpha is None: alpha=self.alpha n=total_samples n_pos=positive_samples if 'nominal' not in self._auc.keys(): assert('calculate nominal auc first') import scipy.stats as stats auc=self._auc['nominal'] z=stats. norm. ppf(1 - (alpha/2)) eta=1+(n_pos/(z*z)) b=(auc-.5)/eta auc_U=auc+b+ (1/eta)*np.sqrt((auc-.5)**2 + (auc*(1-auc)*eta)) auc_L=auc+b- (1/eta)*np.sqrt((auc-.5)**2 + (auc*(1-auc)*eta)) self._auc['L']=np.array([np.append(self._auc['L'],auc_L).max()]) self._auc['U']=np.array([np.append(self._auc['U'],auc_U).min()]) return def operating_zone(self, n=1, LRplus=10, LRminus=0.6): ''' compute the end points of the operating zone, one for maximizing precions, and one for maximizing sensitivity Args: n (int): number of operting points per condition returned, default 1 LRplus (float): lower bound on positive likelihood ratio, default 10.0 LRminus (float): upper bound on negative likelihood ratio, default 0.6 ''' wf=self.df.copy() opf=pd.concat([wf[(wf['LR+']>LRplus) & (wf['LR-']<LRminus) ]\ .sort_values('ppv',ascending=False).head(n), wf[(wf['LR+']>LRplus) & (wf['LR-']<LRminus) ]\ .sort_values('tpr',ascending=False).head(n)]) if opf.empty: self._operating_zone=opf.copy() return #self._operating_zone.copy() self._operating_zone=opf.reset_index() self._operating_zone.index=['high precision']*n + ['high sensitivity']*n return #self._operating_zone.copy() def samplesize(self, delta_auc=0.1, target_auc=None, alpha=None): ''' estimate sample size for atataing auc bound under given significance level Args: delta_auc (float): maximum perturbation from estimated auc, default 0.1 target_auc (float): if None, using estimate current nominal auc alpha (float): significanec level. If None use processRoc.alpha Returns: float: minimum sample size ''' if alpha is None: alpha=self.alpha if target_auc is None: if 'nominal' not in self._auc.keys(): self.auc() target_auc=self._auc['nominal'] import scipy.stats as stats z=stats. norm. 
ppf(1 - (alpha/2)) required_npos = (z*z)*target_auc*(1-target_auc)/(delta_auc*delta_auc) return required_npos def pvalue(self, delta_auc=0.1, twosided=True): ''' compute p-value for given auc bounds Args: delta_auc (float): maximum perturbation from estimated auc, default 0.1 twosided (bool): one sided or twosided confidence bounds Returns: float: pvalue for the null hypothesis that estimated nominal auc is lower by more than delta_auc ''' if 'nominal' not in self._auc.keys(): self.auc() auc=self._auc['nominal'] import scipy.stats as stats z=np.sqrt(self.positive_samples/(auc*(1-auc)/(delta_auc*delta_auc)) ) pvalue=stats.norm.sf(abs(z)) if twosided: pvalue=2*pvalue return pvalue def interpret(self, fpr=0.01, number_of_positives=10, five_yr_survival=None, factor=1): ''' generate simple interpretation of inferred model, based on a number of positive cases Args: fpr (float): the false psotive rate or 1-specificity of the operating point number_of_positives (int): interpret assuming this many positive cases, default 10 five_yr_survival (float): fraction not experiencing severe event after 5 years (default: None) factor (float): fraction of TP who avert the severe outcome due to correct screen ''' wf=self.df.copy() wf.loc[fpr]=pd.Series([],dtype=float) wf=wf.sort_index().interpolate(method='spline',order=self.order,limit_direction='both') row=wf.loc[fpr] # POS=number_of_positives # TP=POS*row.tpr # FP = TP*((1/row.ppv) -1) # NEG=FP/fpr # TOTALFLAGS=TP+FP # FN=POS-TP # TN=POS/self.prevalence #factor=0.21*(0.95-0.69) + 0.08*(0.95-0.17) #factor=0.33*(0.95-0.17) POS=number_of_positives NEG=POS*(1-self.prevalence)*(1/self.prevalence) TP=POS*row.tpr TOTALFLAGS=TP/row.ppv FP=TOTALFLAGS-TP FN=POS-TP TN=NEG-FP if five_yr_survival is not None: NNS=TOTALFLAGS/(TP*factor*(1- five_yr_survival)) else: NNS=np.nan resdf=pd.DataFrame.from_dict({"POS":np.round(POS),"TP":np.round(TP), "FP":np.round(FP),"NEG":np.round(NEG), "FLAGS":np.round(TOTALFLAGS),"FN":np.round(FN), "TN":np.round(TN), "NNS":np.round(NNS), "FLAGGED_FRACTION":np.round(TOTALFLAGS/(POS+NEG), 2)},orient='index',columns=['estimates']) rf=pd.DataFrame({'pos':np.round(POS), 'flags':int(np.round(TOTALFLAGS)), 'tp':int(np.round(TP)), 'fp':int(np.round(FP)), 'fn':int(np.round(FN)), 'tn':int(np.round(TN))},index=['numbers']) pos=rf.pos.values[0] flags=rf['flags'].values[0] fp=rf['fp'].values[0] tp=rf['tp'].values[0] fn=rf['fn'].values[0] txt=[f"For every {pos} positive instances", f"we raise {flags} flags,", f"out of which {tp} are true positives", f"{fp} are false alarms", f"{fn} cases are missed"] if five_yr_survival is not None: txt.append(f"Number needed to screen is {NNS}") return rf,txt,resdf
Methods
def allmeasures(self, prevalence=None, interpolate=False)
-
compute accuracy, PPV, NPV, positive and negative likelihood ratios, and update processRoc.df, which can be accessed using processRoc.get()
Args
prevalence
:float
- prevalence of positive cases in population
interpolate
:bool
- if True interpolate missing values, default False
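The measures are computed from the assumed population prevalence via Bayes' rule rather than from the sample class ratio; a quick numeric check of the formulas used in the source:

p, tpr, fpr = 0.01, 0.8, 0.1
ppv = 1 / (1 + (fpr / tpr) * ((1 / p) - 1))    # = p*tpr / (p*tpr + (1-p)*fpr)
npv = 1 / (1 + ((1 - tpr) / (1 - fpr)) * (1 / ((1 / p) - 1)))
acc = p * tpr + (1 - p) * (1 - fpr)
print(round(ppv, 4), round(npv, 4), round(acc, 3))   # 0.0748 0.9978 0.899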
def auc(self, total_samples=None, positive_samples=None, alpha=None)
-
calculate auc with confidence bounds. By default, the arguments are read from class initialization.
Args
total_samples
:int
- total number of samples, default None
positive_samples
:int
- number of positive samples, default None
alpha
:float
- significance level, default None
Returns
float
- nominal auc
float
- upper bound
float
- lower bound
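A sketch, reusing the zt instance from the class example above:

nominal, upper, lower = zt.auc(total_samples=10000,
                               positive_samples=100,
                               alpha=0.05)
print(f'AUC {nominal:.3f}, bounds [{lower:.3f}, {upper:.3f}]')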
def get(self)
-
return dataframe currently in class
Returns
pandas.DataFrame
def getBounds(self, total_samples=None, positive_samples=None, alpha=None, prevalence=None)
-
compute confidence bounds on performance measures
Args
total_samples
:int
- total number of samples, default None
positive_samples
:int
- number of positive samples, default None
alpha
:float
- significance level, default None
prevalence
:float
- prevalence of positive cases in population, default None
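The per-point bands are Wald-type intervals, e.g. the tpr half-width is z*sqrt(tpr*(1-tpr)/n_pos); a minimal check of the arithmetic:

import numpy as np
from scipy import stats

alpha, n_pos, tpr = 0.05, 100, 0.8
z = stats.norm.ppf(1 - alpha / 2)                  # ~1.96
delta_tpr = z * np.sqrt(tpr * (1 - tpr) / n_pos)
print(round(delta_tpr, 3))                         # 0.078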
def interpret(self, fpr=0.01, number_of_positives=10, five_yr_survival=None, factor=1)
-
generate simple interpretation of inferred model, based on a number of positive cases
Args
fpr
:float
- the false positive rate or 1-specificity of the operating point
number_of_positives
:int
- interpret assuming this many positive cases, default 10
five_yr_survival
:float
- fraction not experiencing severe event after 5 years (default: None)
factor
:float
- fraction of TP who avert the severe outcome due to correct screen
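A sketch, reusing zt from the class example above (allmeasures must have been run so that ppv is available):

rf, txt, resdf = zt.interpret(fpr=0.05,
                              number_of_positives=10,
                              five_yr_survival=0.8,
                              factor=1)
print(' '.join(txt))
print(resdf)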
def nominal_auc(self)
-
calculate nominal auc
def operating_zone(self, n=1, LRplus=10, LRminus=0.6)
-
compute the end points of the operating zone, one for maximizing precision, and one for maximizing sensitivity
Args
n
:int
- number of operating points per condition returned, default 1
LRplus
:float
- lower bound on positive likelihood ratio, default 10.0
LRminus
:float
- upper bound on negative likelihood ratio, default 0.6
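A sketch, again assuming allmeasures has populated the LR+ and LR- columns of zt:

zt.operating_zone(n=1, LRplus=10, LRminus=0.6)
print(zt._operating_zone)   # rows labelled 'high precision' and 'high sensitivity', if any point qualifies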
def pvalue(self, delta_auc=0.1, twosided=True)
-
compute p-value for given auc bounds
Args
delta_auc
:float
- maximum perturbation from estimated auc, default 0.1
twosided
:bool
- one sided or twosided confidence bounds
Returns
float
- p-value for the null hypothesis that the true auc is lower than the estimated nominal auc by more than delta_auc
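The test statistic is z = sqrt(n_pos * delta_auc**2 / (auc*(1-auc))); a worked check of the arithmetic:

import numpy as np
from scipy import stats

n_pos, auc_hat, delta_auc = 100, 0.85, 0.1
z = np.sqrt(n_pos / (auc_hat * (1 - auc_hat) / delta_auc**2))
p = 2 * stats.norm.sf(abs(z))     # two-sided
print(round(z, 2), round(p, 4))   # 2.8 0.0051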
def samplesize(self, delta_auc=0.1, target_auc=None, alpha=None)
-
estimate sample size for attaining the auc bound at a given significance level
Args
delta_auc
:float
- maximum perturbation from estimated auc, default 0.1
target_auc
:float
- if None, use the current estimated nominal auc
alpha
:float
- significance level. If None, use processRoc.alpha
Returns
float
- minimum sample size
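For example, pinning a target auc of 0.8 to within 0.05 at alpha = 0.05 needs roughly 246 positive samples:

from scipy import stats

alpha, target_auc, delta_auc = 0.05, 0.8, 0.05
z = stats.norm.ppf(1 - alpha / 2)   # ~1.96
n_pos = z**2 * target_auc * (1 - target_auc) / delta_auc**2
print(round(n_pos))                 # 246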
def scoretoprobability(self, score, regen=True, **kwargs)
-
Map computed score to probability of sample being in the positive class. This is simply the PPV corresponding to the threshold which equals the score. Now supports both single scores and lists/numpy arrays of scores.
Args
score
:float, list, or numpy.ndarray
- computed score(s)
regen
:bool
- if True, regenerate roc curve
kwargs
:dict
- values passed for regeneration of smoothed roc
Return
float or numpy.ndarray representing probability of being in positive cohort
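A sketch, reusing zt from the class example above (the underlying ROC table must include a threshold column); note the returned values are min-max normalised PPVs rather than raw PPVs:

probs = zt.scoretoprobability([0.4, 0.7], regen=True,
                              STEP=0.01, precision=2)
print(probs)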
def smooth(self, STEP=0.0001, interpolate=True, convexify=True)
-
smooth roc curves and update processRoc.df which is accessible using processRoc.get()
Args
STEP
:float
- smoothing step, default 0.0001
interpolate
:bool
- if True, interpolate missing values, default True
convexify
:bool
- if True, replace ROC with convex hull, default True
def usample(self, df=None, precision=3)
-
estimate performance measures at regular intervals of the false positive rate
Args
df
:pandas.DataFrame
- dataframe with performance values, fpr as index. Default: None, in which case the dataframe stored in the class is used
precision
:int
- number of digits after the decimal point used to sample the fpr range
Returns
pandas.DataFrame
- uniformly sampled performance dataframe