Module truthnet.truthnet

Expand source code
from quasinet.qnet import load_qnet, save_qnet
from quasinet.qnet import qdistance
from quasinet.qsampling import qsample
from quasinet.qnet import membership_degree
import pandas as pd
import numpy as np
from tqdm import tqdm
from quasinet.qnet import Qnet
import dill as pickle 
import gzip
import shap
from scipy.stats import t, lognorm
from .truthfinder import reveal, funcm, funcw, dissonance_distr_median
from distfit import distfit
from sklearn import metrics
from zedstat import zedstat
from concurrent.futures import ProcessPoolExecutor
 
global_NSTR = None
global_steps = None
global_model = None

def init_globals(model, steps, NSTR):
    '''
    global variable initialization necessary for 
    getting maximum paralleization in calibration

    Parameters:
    model: The model to be used globally across parallel tasks.
    steps: The number of steps to be used for a specific operation, globally.
    NSTR: Network String Representation, a global variable to represent the network state.
    '''
    global global_model, global_steps,  global_NSTR
    global_model = model
    global_steps = steps
    global_NSTR = NSTR

def task(seed):
    '''
    Helper function for parallelization

    Parameters:
    seed: An integer seed for random number generation to ensure reproducibility.

    Returns:
    A tuple containing the function 'm' output and the median dissonance distribution for a sample.
    '''
    s=qsample(global_NSTR, global_model, steps=global_steps)
    return funcm(s,global_model),dissonance_distr_median(s,global_model)

class truthnet:
    """
    The truthnet class is designed to train the Veritas model which is used to determine if a
    subject is being deceptive or untruthful or insincere in a structured interview. 
    Examples of target scenarios include identifying
    adversarial responses in contexts like mental health diagnosis interviews 
    or automated computer-aided diagnostic tests.
    
    Attributes:
        - datapath (str): Path to the survey or interview database with logged responses.
        - target_label (str): Name of the column in the dataset that identifies the ground truth.
        - index_present (bool): Specifies if the first column in the dataset is an index column.
        - target_label_positive (int): Value indicating a positive case for the target condition.
        - target_label_negative (int): Value indicating a negative case for the target condition.
        - training_fraction (float): Fraction of data used as training data to learn the Q-net models.
        - query_limit (int): Number of features used to determine malingering status in deployment.
        - shap_index (list): Ordered list of indices based on SHAP values for feature importance.
        - problem (str): A description or identifier for the type of problem being addressed.
        - threshold_alpha (float): Significance level for lower decision threshold.
        - threshold_alpha_veritas (float): Significance level for Veritas threshold.
        - veritas_model (dict): model, see detailed documentation on veritas model.
        - problem (str): descriptive string for problem
        - VERBOSE (bool): flag to denote if there should be verbose output
    """
    def __init__(self, datapath,
                 target_label,
                 problem='',
                 index_present=True,
                 target_label_positive=1,
                 target_label_negative=0,
                 training_fraction=0.3,
                 threshold_alpha=0.1,
                 threshold_alpha_veritas=1-0.015,
                 query_limit=None,
                 VERBOSE=False,
                 shap_index=None):
        self.datapath = datapath
        self.target_label = target_label
        self.index_present = index_present
        self.target_label_positive = target_label_positive
        self.target_label_negative = target_label_negative
        self.training_fraction = training_fraction
        self.query_limit = query_limit
        if query_limit is None:
            self.query_limit = -1
        self.shap_index = shap_index
        self.data = None  # Placeholder for the data once loaded
        self.veritas_model={}
        self.data = None
        self.problem = problem
        self.training_index = None
        self.threshold_alpha = threshold_alpha
        self.threshold_alpha_veritas = threshold_alpha_veritas
        self.VERBOSE = VERBOSE
        
    def fit(self,
            alpha=0.1,
            shap_index=None,
            shapnum=10,
            nullsteps=100000,
            veritas_version='0.0.1'):
        """
        Fits the Veritas model to the provided data. It involves training Q-net models
        for both positive and negative cases and determining feature importance using SHAP values.

        Parameters:
            - alpha (float): The significance level for Qnet.
            - shap_index (list): Predefined list of SHAP indices if available.
            - shapnum (int): Number of samples to calculate SHAP values for.
            - nullsteps (int): Number of steps for q-sampling in the background distribution.
            - veritas_version (str): Version identifier for the model.
        """
        
        if self.index_present:
            self.data = pd.read_csv(self.datapath,index_col=0, dtype=str, na_filter=False).fillna('').astype(str)
        else:
            self.data = pd.read_csv(self.datapath, dtype=str, na_filter=False).fillna('').astype(str)

        if self.VERBOSE:
            print('data reading complete')
            
        self.data = self.synccols(self.data)
        
        if self.VERBOSE:
            print(self.data)
        
        num_training=np.rint(self.training_fraction*self.data.index.size).astype(int)
        training_index=np.random.choice(self.data.index.values,num_training, replace=False)

        self.training_index=training_index
        df_training=self.data.loc[training_index,:]
        df_training = self.synccols(df_training)
        df_test = self.data.loc[[x for x in self.data.index.values
                                 if x not in training_index],:][df_training.columns]

        if self.target_label:
        
            df_training_pos=df_training[df_training[self.target_label]==str(self.target_label_positive)]
            df_training_neg=df_training[df_training[self.target_label]==str(self.target_label_negative)]
            Xpos_training=df_training_pos.drop(self.target_label,
                                               axis=1)\
                                         .values.astype(str)
            Xneg_training=df_training_neg.drop(self.target_label,
                                               axis=1)\
                                         .values.astype(str)

            featurenames = df_training_pos.drop(self.target_label,
                                                axis=1).columns

            if self.VERBOSE:
                print("training qnets")
            
            modelneg=Qnet(feature_names=featurenames,alpha=alpha)
            modelneg.fit(Xneg_training)
            modelpos=Qnet(feature_names=featurenames,alpha=alpha)
            modelpos.fit(Xpos_training)
            modelneg.training_index=training_index
            modelpos.training_index=training_index
            
        else:
            featurenames = df_training.columns
            X_training=df_training.values.astype(str)

            model=Qnet(feature_names=featurenames,alpha=alpha)
            model.fit(X_training)
            model.training_index=training_index

        
        def funcw_(S):
            return np.array([membership_degree(s,modelneg)
                             /membership_degree(s,modelpos) for s in S])

        def funcm_(S):
            return funcm(S,model)

        if self.target_label:
            X=df_test.drop(self.target_label,
                           axis=1).values.astype(str)
        
            NULLSTR=np.array(['']*len(modelneg.feature_names))
            s_background=qsample(NULLSTR,modelneg,steps=nullsteps)
            explainer = shap.KernelExplainer(funcw_,np.array([s_background]))
            shap_values = explainer.shap_values(X[:shapnum])
            
            self.shap_index=pd.DataFrame(shap_values.mean(axis=0),
                                         columns=['shap'])\
                              .sort_values('shap',
                                           ascending=False).index.values
            
            modelneg.shap_index=self.shap_index
            modelpos.shap_index=self.shap_index

            # save veritas model
            self.veritas_model['version']=veritas_version
            self.veritas_model['model']=modelpos
            self.veritas_model['model_neg']=modelneg
            self.veritas_model['problem']=self.problem
            self.veritas_model['shapvalues']=shap_values

        else: 

            X=df_test.values.astype(str)
        
            NULLSTR=np.array(['']*len(model.feature_names))
            s_background=qsample(NULLSTR,model,steps=nullsteps)
            explainer = shap.KernelExplainer(funcm_,np.array([s_background]))
            shap_values = explainer.shap_values(X[:shapnum])
            
            self.shap_index=pd.DataFrame(shap_values.mean(axis=0),
                                         columns=['shap'])\
                              .sort_values('shap',
                                           ascending=False).index.values
            
            model.shap_index=self.shap_index

            self.veritas_model['version']=veritas_version
            self.veritas_model['model']=model
            self.veritas_model['problem']=self.problem
        
        return

    def save(self, filepath):
        '''
        save veritas model

        Parameters:
        filepath (str): The path where the model should be saved.
        '''
        with gzip.open(filepath, 'wb') as file:
            M=self.veritas_model
            pickle.dump(M, file)
    
    def calibrate(self,
                  qsteps=1000,num_workers=11,
                  calibration_num=10000):
        """
        Calibrates the decision thresholds for the Veritas model using the distribution of scores
        from the trained model. It involves sampling, revealing, and fitting distributions 
        to determine appropriate thresholds.


        Parameters:
        qsteps (int): Steps for q-sampling during calibration.
        num_workers (int): Number of parallel workers for calibration.
        calibration_num (int): Number of calibration samples.
        """

        featurenames = self.veritas_model['model'].feature_names
        NSTR = np.array([''] * len(featurenames)).astype('U100')
        model = self.veritas_model['model']

        if self.VERBOSE:
            print('calibrating...')

        seed=0
        init_globals(model, qsteps, NSTR)
        with ProcessPoolExecutor(max_workers=num_workers,
                                 initializer=init_globals,
                                 initargs=(model, qsteps, NSTR)) as executor:
            seeds = [seed for _ in range(calibration_num)]
            results = list(tqdm(executor.map(task, seeds),
                            total=calibration_num))


        lower_ = np.array([x[0] for x in results])
        veritas_ = np.array([x[1] for x in results])
        
        self.veritas_model['calibration_lower']=lower_
        self.veritas_model['calibration_veritas']=veritas_

        # Fitting distributions to lower and veritas thresholds
        dfit = distfit(distr='lognorm',verbose=None)
        dfit.fit_transform(lower_)
        df, loc, scale = dfit.model['params']
        dist = lognorm(df, loc=loc, scale=scale)
        self.veritas_model['dist_lower'] = dist        
        self.veritas_model['LOWER_THRESHOLD'] = dist.ppf(self.threshold_alpha)

        dfitv = distfit(smooth=10, distr='lognorm',verbose=None)
        dfitv.fit_transform(veritas_)
        dfv, locv, scalev = dfitv.model['params']
        distv = lognorm(dfv, loc=locv, scale=scalev)
        self.veritas_model['dist_veritas'] = distv        
        self.veritas_model['VERITAS_THRESHOLD'] = distv.ppf(self.threshold_alpha_veritas)

        if self.VERBOSE:
            print(self.veritas_model)
        
        # Using test data to infer the decision threshold for the upper threshold
        if self.target_label:
            df_test = self.data.loc[[x for x in self.data.index.values if x not in self.training_index], :]
            featurenames = df_test.drop(self.target_label, axis=1, errors='ignore').columns
            labels = df_test[self.target_label].values.astype(int)

            df_test = df_test.drop(self.target_label, axis=1, errors='ignore')
            df_test = pd.concat([pd.DataFrame(columns=featurenames),
                                  df_test[featurenames[self.shap_index[:self.query_limit]]]]).fillna('')
            X= df_test.values.astype(str)

            pred = np.array([funcw(s,
                         self.veritas_model['model'],
                         self.veritas_model['model_neg']) for s in X])

            # Calculating metrics and determining the upper threshold
            fpr, tpr, thresholds = metrics.roc_curve(labels, pred, pos_label=1)
            rf = pd.DataFrame(tpr, fpr, columns=['tpr']).assign(threshold=thresholds)
            rf.index.name = 'fpr'
            rf=rf.reset_index()
            zt = zedstat.processRoc(df=rf, order=3, total_samples=2*calibration_num,
                                    positive_samples=calibration_num, alpha=0.01, prevalence=0.5)
            zt.smooth(STEP=0.001)
            zt.allmeasures(interpolate=True)
            zt.usample(precision=3)
            Z = zt.get()
            
            if self.VERBOSE:
                rf.to_csv('tmp.csv')
                print(X,labels,pred,rf,Z)
                
            self.veritas_model['upper_scoretoprobability'] = zt.scoretoprobability
            
            if Z.ppv.values[0] > 0.85:
                THR=0.85
            else:
                THR=Z.ppv.values[2]
                
            self.veritas_model['UPPER_THRESHOLD'] = Z[Z.ppv > THR].threshold.values[-1]
            self.veritas_model['AUC'] = zt.auc()
        return 
    
    def synccols(self, df_):
        """
        Synchronize columns between positive and negative cases.

        Parameters:
        df_ (DataFrame): The DataFrame to process.

        Returns:
        DataFrame: A DataFrame with synchronized columns.
        """
        df=df_.copy()
        if self.target_label:
            df1 = df[df[self.target_label] ==  str(self.target_label_positive)]
            df0 = df[df[self.target_label] ==  str(self.target_label_negative)]
            col1 = df1.replace('', pd.NA).dropna(axis=1, how='all').columns
            col0 = df0.replace('', pd.NA).dropna(axis=1, how='all').columns
            col = [x for x in col0 if x in col1]
            return df[col]
        else:
            return remove_identical_columns(df_)    
        
def load_veritas_model(filepath):
    '''
    Load a Veritas model from a specified file.

    Parameters:
    filepath (str): The path to the file containing the saved Veritas model.

    Returns:
    The loaded Veritas model.
    '''
    with gzip.open(filepath, 'rb') as file:
        model = pickle.load(file)
    return model

def remove_identical_columns(df):
    '''
    Remove columns from a DataFrame that have identical values across all rows.

    Parameters:
    df (DataFrame): The DataFrame to process.

    Returns:
    DataFrame: A DataFrame with identical columns removed.
    '''
    columns_to_drop = [col for col in df.columns if df[col].nunique() == 1]
    df_cleaned = df.drop(columns=columns_to_drop)
    
    return df_cleaned
 

def train(datapath,modelpath,
          shapnum=10,target_label=None,
          query_limit=20,calibration_num=5000):
    '''
    Train a Veritas model with specified parameters.

    Parameters:
    datapath (str): Path to the data file.
    modelpath (str): Path to save the trained model.
    shapnum (int): Number of samples for SHAP value calculation.
    target_label (str): Target label column name.
    query_limit (int): Limit on the number of features to use.
    calibration_num (int): Number of samples for calibration.

    '''
    TR=truthnet(datapath=datapath,
                target_label=target_label,
                query_limit=query_limit,VERBOSE=False)
    TR.fit(shapnum=shapnum)
    rf=TR.calibrate(calibration_num=calibration_num)
    TR.save(modelpath)

Functions

def init_globals(model, steps, NSTR)

global variable initialization necessary for getting maximum paralleization in calibration

Parameters: model: The model to be used globally across parallel tasks. steps: The number of steps to be used for a specific operation, globally. NSTR: Network String Representation, a global variable to represent the network state.

Expand source code
def init_globals(model, steps, NSTR):
    '''
    global variable initialization necessary for 
    getting maximum paralleization in calibration

    Parameters:
    model: The model to be used globally across parallel tasks.
    steps: The number of steps to be used for a specific operation, globally.
    NSTR: Network String Representation, a global variable to represent the network state.
    '''
    global global_model, global_steps,  global_NSTR
    global_model = model
    global_steps = steps
    global_NSTR = NSTR
def load_veritas_model(filepath)

Load a Veritas model from a specified file.

Parameters: filepath (str): The path to the file containing the saved Veritas model.

Returns: The loaded Veritas model.

Expand source code
def load_veritas_model(filepath):
    '''
    Load a Veritas model from a specified file.

    Parameters:
    filepath (str): The path to the file containing the saved Veritas model.

    Returns:
    The loaded Veritas model.
    '''
    with gzip.open(filepath, 'rb') as file:
        model = pickle.load(file)
    return model
def remove_identical_columns(df)

Remove columns from a DataFrame that have identical values across all rows.

Parameters: df (DataFrame): The DataFrame to process.

Returns: DataFrame: A DataFrame with identical columns removed.

Expand source code
def remove_identical_columns(df):
    '''
    Remove columns from a DataFrame that have identical values across all rows.

    Parameters:
    df (DataFrame): The DataFrame to process.

    Returns:
    DataFrame: A DataFrame with identical columns removed.
    '''
    columns_to_drop = [col for col in df.columns if df[col].nunique() == 1]
    df_cleaned = df.drop(columns=columns_to_drop)
    
    return df_cleaned
def task(seed)

Helper function for parallelization

Parameters: seed: An integer seed for random number generation to ensure reproducibility.

Returns: A tuple containing the function 'm' output and the median dissonance distribution for a sample.

Expand source code
def task(seed):
    '''
    Helper function for parallelization

    Parameters:
    seed: An integer seed for random number generation to ensure reproducibility.

    Returns:
    A tuple containing the function 'm' output and the median dissonance distribution for a sample.
    '''
    s=qsample(global_NSTR, global_model, steps=global_steps)
    return funcm(s,global_model),dissonance_distr_median(s,global_model)
def train(datapath, modelpath, shapnum=10, target_label=None, query_limit=20, calibration_num=5000)

Train a Veritas model with specified parameters.

Parameters: datapath (str): Path to the data file. modelpath (str): Path to save the trained model. shapnum (int): Number of samples for SHAP value calculation. target_label (str): Target label column name. query_limit (int): Limit on the number of features to use. calibration_num (int): Number of samples for calibration.

Expand source code
def train(datapath,modelpath,
          shapnum=10,target_label=None,
          query_limit=20,calibration_num=5000):
    '''
    Train a Veritas model with specified parameters.

    Parameters:
    datapath (str): Path to the data file.
    modelpath (str): Path to save the trained model.
    shapnum (int): Number of samples for SHAP value calculation.
    target_label (str): Target label column name.
    query_limit (int): Limit on the number of features to use.
    calibration_num (int): Number of samples for calibration.

    '''
    TR=truthnet(datapath=datapath,
                target_label=target_label,
                query_limit=query_limit,VERBOSE=False)
    TR.fit(shapnum=shapnum)
    rf=TR.calibrate(calibration_num=calibration_num)
    TR.save(modelpath)

Classes

class truthnet (datapath, target_label, problem='', index_present=True, target_label_positive=1, target_label_negative=0, training_fraction=0.3, threshold_alpha=0.1, threshold_alpha_veritas=0.985, query_limit=None, VERBOSE=False, shap_index=None)

The truthnet class is designed to train the Veritas model which is used to determine if a subject is being deceptive or untruthful or insincere in a structured interview. Examples of target scenarios include identifying adversarial responses in contexts like mental health diagnosis interviews or automated computer-aided diagnostic tests.

Attributes

  • datapath (str): Path to the survey or interview database with logged responses.
  • target_label (str): Name of the column in the dataset that identifies the ground truth.
  • index_present (bool): Specifies if the first column in the dataset is an index column.
  • target_label_positive (int): Value indicating a positive case for the target condition.
  • target_label_negative (int): Value indicating a negative case for the target condition.
  • training_fraction (float): Fraction of data used as training data to learn the Q-net models.
  • query_limit (int): Number of features used to determine malingering status in deployment.
  • shap_index (list): Ordered list of indices based on SHAP values for feature importance.
  • problem (str): A description or identifier for the type of problem being addressed.
  • threshold_alpha (float): Significance level for lower decision threshold.
  • threshold_alpha_veritas (float): Significance level for Veritas threshold.
  • veritas_model (dict): model, see detailed documentation on veritas model.
  • problem (str): descriptive string for problem
  • VERBOSE (bool): flag to denote if there should be verbose output
Expand source code
class truthnet:
    """
    The truthnet class is designed to train the Veritas model which is used to determine if a
    subject is being deceptive or untruthful or insincere in a structured interview. 
    Examples of target scenarios include identifying
    adversarial responses in contexts like mental health diagnosis interviews 
    or automated computer-aided diagnostic tests.
    
    Attributes:
        - datapath (str): Path to the survey or interview database with logged responses.
        - target_label (str): Name of the column in the dataset that identifies the ground truth.
        - index_present (bool): Specifies if the first column in the dataset is an index column.
        - target_label_positive (int): Value indicating a positive case for the target condition.
        - target_label_negative (int): Value indicating a negative case for the target condition.
        - training_fraction (float): Fraction of data used as training data to learn the Q-net models.
        - query_limit (int): Number of features used to determine malingering status in deployment.
        - shap_index (list): Ordered list of indices based on SHAP values for feature importance.
        - problem (str): A description or identifier for the type of problem being addressed.
        - threshold_alpha (float): Significance level for lower decision threshold.
        - threshold_alpha_veritas (float): Significance level for Veritas threshold.
        - veritas_model (dict): model, see detailed documentation on veritas model.
        - problem (str): descriptive string for problem
        - VERBOSE (bool): flag to denote if there should be verbose output
    """
    def __init__(self, datapath,
                 target_label,
                 problem='',
                 index_present=True,
                 target_label_positive=1,
                 target_label_negative=0,
                 training_fraction=0.3,
                 threshold_alpha=0.1,
                 threshold_alpha_veritas=1-0.015,
                 query_limit=None,
                 VERBOSE=False,
                 shap_index=None):
        self.datapath = datapath
        self.target_label = target_label
        self.index_present = index_present
        self.target_label_positive = target_label_positive
        self.target_label_negative = target_label_negative
        self.training_fraction = training_fraction
        self.query_limit = query_limit
        if query_limit is None:
            self.query_limit = -1
        self.shap_index = shap_index
        self.data = None  # Placeholder for the data once loaded
        self.veritas_model={}
        self.data = None
        self.problem = problem
        self.training_index = None
        self.threshold_alpha = threshold_alpha
        self.threshold_alpha_veritas = threshold_alpha_veritas
        self.VERBOSE = VERBOSE
        
    def fit(self,
            alpha=0.1,
            shap_index=None,
            shapnum=10,
            nullsteps=100000,
            veritas_version='0.0.1'):
        """
        Fits the Veritas model to the provided data. It involves training Q-net models
        for both positive and negative cases and determining feature importance using SHAP values.

        Parameters:
            - alpha (float): The significance level for Qnet.
            - shap_index (list): Predefined list of SHAP indices if available.
            - shapnum (int): Number of samples to calculate SHAP values for.
            - nullsteps (int): Number of steps for q-sampling in the background distribution.
            - veritas_version (str): Version identifier for the model.
        """
        
        if self.index_present:
            self.data = pd.read_csv(self.datapath,index_col=0, dtype=str, na_filter=False).fillna('').astype(str)
        else:
            self.data = pd.read_csv(self.datapath, dtype=str, na_filter=False).fillna('').astype(str)

        if self.VERBOSE:
            print('data reading complete')
            
        self.data = self.synccols(self.data)
        
        if self.VERBOSE:
            print(self.data)
        
        num_training=np.rint(self.training_fraction*self.data.index.size).astype(int)
        training_index=np.random.choice(self.data.index.values,num_training, replace=False)

        self.training_index=training_index
        df_training=self.data.loc[training_index,:]
        df_training = self.synccols(df_training)
        df_test = self.data.loc[[x for x in self.data.index.values
                                 if x not in training_index],:][df_training.columns]

        if self.target_label:
        
            df_training_pos=df_training[df_training[self.target_label]==str(self.target_label_positive)]
            df_training_neg=df_training[df_training[self.target_label]==str(self.target_label_negative)]
            Xpos_training=df_training_pos.drop(self.target_label,
                                               axis=1)\
                                         .values.astype(str)
            Xneg_training=df_training_neg.drop(self.target_label,
                                               axis=1)\
                                         .values.astype(str)

            featurenames = df_training_pos.drop(self.target_label,
                                                axis=1).columns

            if self.VERBOSE:
                print("training qnets")
            
            modelneg=Qnet(feature_names=featurenames,alpha=alpha)
            modelneg.fit(Xneg_training)
            modelpos=Qnet(feature_names=featurenames,alpha=alpha)
            modelpos.fit(Xpos_training)
            modelneg.training_index=training_index
            modelpos.training_index=training_index
            
        else:
            featurenames = df_training.columns
            X_training=df_training.values.astype(str)

            model=Qnet(feature_names=featurenames,alpha=alpha)
            model.fit(X_training)
            model.training_index=training_index

        
        def funcw_(S):
            return np.array([membership_degree(s,modelneg)
                             /membership_degree(s,modelpos) for s in S])

        def funcm_(S):
            return funcm(S,model)

        if self.target_label:
            X=df_test.drop(self.target_label,
                           axis=1).values.astype(str)
        
            NULLSTR=np.array(['']*len(modelneg.feature_names))
            s_background=qsample(NULLSTR,modelneg,steps=nullsteps)
            explainer = shap.KernelExplainer(funcw_,np.array([s_background]))
            shap_values = explainer.shap_values(X[:shapnum])
            
            self.shap_index=pd.DataFrame(shap_values.mean(axis=0),
                                         columns=['shap'])\
                              .sort_values('shap',
                                           ascending=False).index.values
            
            modelneg.shap_index=self.shap_index
            modelpos.shap_index=self.shap_index

            # save veritas model
            self.veritas_model['version']=veritas_version
            self.veritas_model['model']=modelpos
            self.veritas_model['model_neg']=modelneg
            self.veritas_model['problem']=self.problem
            self.veritas_model['shapvalues']=shap_values

        else: 

            X=df_test.values.astype(str)
        
            NULLSTR=np.array(['']*len(model.feature_names))
            s_background=qsample(NULLSTR,model,steps=nullsteps)
            explainer = shap.KernelExplainer(funcm_,np.array([s_background]))
            shap_values = explainer.shap_values(X[:shapnum])
            
            self.shap_index=pd.DataFrame(shap_values.mean(axis=0),
                                         columns=['shap'])\
                              .sort_values('shap',
                                           ascending=False).index.values
            
            model.shap_index=self.shap_index

            self.veritas_model['version']=veritas_version
            self.veritas_model['model']=model
            self.veritas_model['problem']=self.problem
        
        return

    def save(self, filepath):
        '''
        save veritas model

        Parameters:
        filepath (str): The path where the model should be saved.
        '''
        with gzip.open(filepath, 'wb') as file:
            M=self.veritas_model
            pickle.dump(M, file)
    
    def calibrate(self,
                  qsteps=1000,num_workers=11,
                  calibration_num=10000):
        """
        Calibrates the decision thresholds for the Veritas model using the distribution of scores
        from the trained model. It involves sampling, revealing, and fitting distributions 
        to determine appropriate thresholds.


        Parameters:
        qsteps (int): Steps for q-sampling during calibration.
        num_workers (int): Number of parallel workers for calibration.
        calibration_num (int): Number of calibration samples.
        """

        featurenames = self.veritas_model['model'].feature_names
        NSTR = np.array([''] * len(featurenames)).astype('U100')
        model = self.veritas_model['model']

        if self.VERBOSE:
            print('calibrating...')

        seed=0
        init_globals(model, qsteps, NSTR)
        with ProcessPoolExecutor(max_workers=num_workers,
                                 initializer=init_globals,
                                 initargs=(model, qsteps, NSTR)) as executor:
            seeds = [seed for _ in range(calibration_num)]
            results = list(tqdm(executor.map(task, seeds),
                            total=calibration_num))


        lower_ = np.array([x[0] for x in results])
        veritas_ = np.array([x[1] for x in results])
        
        self.veritas_model['calibration_lower']=lower_
        self.veritas_model['calibration_veritas']=veritas_

        # Fitting distributions to lower and veritas thresholds
        dfit = distfit(distr='lognorm',verbose=None)
        dfit.fit_transform(lower_)
        df, loc, scale = dfit.model['params']
        dist = lognorm(df, loc=loc, scale=scale)
        self.veritas_model['dist_lower'] = dist        
        self.veritas_model['LOWER_THRESHOLD'] = dist.ppf(self.threshold_alpha)

        dfitv = distfit(smooth=10, distr='lognorm',verbose=None)
        dfitv.fit_transform(veritas_)
        dfv, locv, scalev = dfitv.model['params']
        distv = lognorm(dfv, loc=locv, scale=scalev)
        self.veritas_model['dist_veritas'] = distv        
        self.veritas_model['VERITAS_THRESHOLD'] = distv.ppf(self.threshold_alpha_veritas)

        if self.VERBOSE:
            print(self.veritas_model)
        
        # Using test data to infer the decision threshold for the upper threshold
        if self.target_label:
            df_test = self.data.loc[[x for x in self.data.index.values if x not in self.training_index], :]
            featurenames = df_test.drop(self.target_label, axis=1, errors='ignore').columns
            labels = df_test[self.target_label].values.astype(int)

            df_test = df_test.drop(self.target_label, axis=1, errors='ignore')
            df_test = pd.concat([pd.DataFrame(columns=featurenames),
                                  df_test[featurenames[self.shap_index[:self.query_limit]]]]).fillna('')
            X= df_test.values.astype(str)

            pred = np.array([funcw(s,
                         self.veritas_model['model'],
                         self.veritas_model['model_neg']) for s in X])

            # Calculating metrics and determining the upper threshold
            fpr, tpr, thresholds = metrics.roc_curve(labels, pred, pos_label=1)
            rf = pd.DataFrame(tpr, fpr, columns=['tpr']).assign(threshold=thresholds)
            rf.index.name = 'fpr'
            rf=rf.reset_index()
            zt = zedstat.processRoc(df=rf, order=3, total_samples=2*calibration_num,
                                    positive_samples=calibration_num, alpha=0.01, prevalence=0.5)
            zt.smooth(STEP=0.001)
            zt.allmeasures(interpolate=True)
            zt.usample(precision=3)
            Z = zt.get()
            
            if self.VERBOSE:
                rf.to_csv('tmp.csv')
                print(X,labels,pred,rf,Z)
                
            self.veritas_model['upper_scoretoprobability'] = zt.scoretoprobability
            
            if Z.ppv.values[0] > 0.85:
                THR=0.85
            else:
                THR=Z.ppv.values[2]
                
            self.veritas_model['UPPER_THRESHOLD'] = Z[Z.ppv > THR].threshold.values[-1]
            self.veritas_model['AUC'] = zt.auc()
        return 
    
    def synccols(self, df_):
        """
        Synchronize columns between positive and negative cases.

        Parameters:
        df_ (DataFrame): The DataFrame to process.

        Returns:
        DataFrame: A DataFrame with synchronized columns.
        """
        df=df_.copy()
        if self.target_label:
            df1 = df[df[self.target_label] ==  str(self.target_label_positive)]
            df0 = df[df[self.target_label] ==  str(self.target_label_negative)]
            col1 = df1.replace('', pd.NA).dropna(axis=1, how='all').columns
            col0 = df0.replace('', pd.NA).dropna(axis=1, how='all').columns
            col = [x for x in col0 if x in col1]
            return df[col]
        else:
            return remove_identical_columns(df_)    

Methods

def calibrate(self, qsteps=1000, num_workers=11, calibration_num=10000)

Calibrates the decision thresholds for the Veritas model using the distribution of scores from the trained model. It involves sampling, revealing, and fitting distributions to determine appropriate thresholds.

Parameters: qsteps (int): Steps for q-sampling during calibration. num_workers (int): Number of parallel workers for calibration. calibration_num (int): Number of calibration samples.

Expand source code
def calibrate(self,
              qsteps=1000,num_workers=11,
              calibration_num=10000):
    """
    Calibrates the decision thresholds for the Veritas model using the distribution of scores
    from the trained model. It involves sampling, revealing, and fitting distributions 
    to determine appropriate thresholds.


    Parameters:
    qsteps (int): Steps for q-sampling during calibration.
    num_workers (int): Number of parallel workers for calibration.
    calibration_num (int): Number of calibration samples.
    """

    featurenames = self.veritas_model['model'].feature_names
    NSTR = np.array([''] * len(featurenames)).astype('U100')
    model = self.veritas_model['model']

    if self.VERBOSE:
        print('calibrating...')

    seed=0
    init_globals(model, qsteps, NSTR)
    with ProcessPoolExecutor(max_workers=num_workers,
                             initializer=init_globals,
                             initargs=(model, qsteps, NSTR)) as executor:
        seeds = [seed for _ in range(calibration_num)]
        results = list(tqdm(executor.map(task, seeds),
                        total=calibration_num))


    lower_ = np.array([x[0] for x in results])
    veritas_ = np.array([x[1] for x in results])
    
    self.veritas_model['calibration_lower']=lower_
    self.veritas_model['calibration_veritas']=veritas_

    # Fitting distributions to lower and veritas thresholds
    dfit = distfit(distr='lognorm',verbose=None)
    dfit.fit_transform(lower_)
    df, loc, scale = dfit.model['params']
    dist = lognorm(df, loc=loc, scale=scale)
    self.veritas_model['dist_lower'] = dist        
    self.veritas_model['LOWER_THRESHOLD'] = dist.ppf(self.threshold_alpha)

    dfitv = distfit(smooth=10, distr='lognorm',verbose=None)
    dfitv.fit_transform(veritas_)
    dfv, locv, scalev = dfitv.model['params']
    distv = lognorm(dfv, loc=locv, scale=scalev)
    self.veritas_model['dist_veritas'] = distv        
    self.veritas_model['VERITAS_THRESHOLD'] = distv.ppf(self.threshold_alpha_veritas)

    if self.VERBOSE:
        print(self.veritas_model)
    
    # Using test data to infer the decision threshold for the upper threshold
    if self.target_label:
        df_test = self.data.loc[[x for x in self.data.index.values if x not in self.training_index], :]
        featurenames = df_test.drop(self.target_label, axis=1, errors='ignore').columns
        labels = df_test[self.target_label].values.astype(int)

        df_test = df_test.drop(self.target_label, axis=1, errors='ignore')
        df_test = pd.concat([pd.DataFrame(columns=featurenames),
                              df_test[featurenames[self.shap_index[:self.query_limit]]]]).fillna('')
        X= df_test.values.astype(str)

        pred = np.array([funcw(s,
                     self.veritas_model['model'],
                     self.veritas_model['model_neg']) for s in X])

        # Calculating metrics and determining the upper threshold
        fpr, tpr, thresholds = metrics.roc_curve(labels, pred, pos_label=1)
        rf = pd.DataFrame(tpr, fpr, columns=['tpr']).assign(threshold=thresholds)
        rf.index.name = 'fpr'
        rf=rf.reset_index()
        zt = zedstat.processRoc(df=rf, order=3, total_samples=2*calibration_num,
                                positive_samples=calibration_num, alpha=0.01, prevalence=0.5)
        zt.smooth(STEP=0.001)
        zt.allmeasures(interpolate=True)
        zt.usample(precision=3)
        Z = zt.get()
        
        if self.VERBOSE:
            rf.to_csv('tmp.csv')
            print(X,labels,pred,rf,Z)
            
        self.veritas_model['upper_scoretoprobability'] = zt.scoretoprobability
        
        if Z.ppv.values[0] > 0.85:
            THR=0.85
        else:
            THR=Z.ppv.values[2]
            
        self.veritas_model['UPPER_THRESHOLD'] = Z[Z.ppv > THR].threshold.values[-1]
        self.veritas_model['AUC'] = zt.auc()
    return 
def fit(self, alpha=0.1, shap_index=None, shapnum=10, nullsteps=100000, veritas_version='0.0.1')

Fits the Veritas model to the provided data. It involves training Q-net models for both positive and negative cases and determining feature importance using SHAP values.

Parameters

  • alpha (float): The significance level for Qnet.
  • shap_index (list): Predefined list of SHAP indices if available.
  • shapnum (int): Number of samples to calculate SHAP values for.
  • nullsteps (int): Number of steps for q-sampling in the background distribution.
  • veritas_version (str): Version identifier for the model.
Expand source code
def fit(self,
        alpha=0.1,
        shap_index=None,
        shapnum=10,
        nullsteps=100000,
        veritas_version='0.0.1'):
    """
    Fits the Veritas model to the provided data. It involves training Q-net models
    for both positive and negative cases and determining feature importance using SHAP values.

    Parameters:
        - alpha (float): The significance level for Qnet.
        - shap_index (list): Predefined list of SHAP indices if available.
        - shapnum (int): Number of samples to calculate SHAP values for.
        - nullsteps (int): Number of steps for q-sampling in the background distribution.
        - veritas_version (str): Version identifier for the model.
    """
    
    if self.index_present:
        self.data = pd.read_csv(self.datapath,index_col=0, dtype=str, na_filter=False).fillna('').astype(str)
    else:
        self.data = pd.read_csv(self.datapath, dtype=str, na_filter=False).fillna('').astype(str)

    if self.VERBOSE:
        print('data reading complete')
        
    self.data = self.synccols(self.data)
    
    if self.VERBOSE:
        print(self.data)
    
    num_training=np.rint(self.training_fraction*self.data.index.size).astype(int)
    training_index=np.random.choice(self.data.index.values,num_training, replace=False)

    self.training_index=training_index
    df_training=self.data.loc[training_index,:]
    df_training = self.synccols(df_training)
    df_test = self.data.loc[[x for x in self.data.index.values
                             if x not in training_index],:][df_training.columns]

    if self.target_label:
    
        df_training_pos=df_training[df_training[self.target_label]==str(self.target_label_positive)]
        df_training_neg=df_training[df_training[self.target_label]==str(self.target_label_negative)]
        Xpos_training=df_training_pos.drop(self.target_label,
                                           axis=1)\
                                     .values.astype(str)
        Xneg_training=df_training_neg.drop(self.target_label,
                                           axis=1)\
                                     .values.astype(str)

        featurenames = df_training_pos.drop(self.target_label,
                                            axis=1).columns

        if self.VERBOSE:
            print("training qnets")
        
        modelneg=Qnet(feature_names=featurenames,alpha=alpha)
        modelneg.fit(Xneg_training)
        modelpos=Qnet(feature_names=featurenames,alpha=alpha)
        modelpos.fit(Xpos_training)
        modelneg.training_index=training_index
        modelpos.training_index=training_index
        
    else:
        featurenames = df_training.columns
        X_training=df_training.values.astype(str)

        model=Qnet(feature_names=featurenames,alpha=alpha)
        model.fit(X_training)
        model.training_index=training_index

    
    def funcw_(S):
        return np.array([membership_degree(s,modelneg)
                         /membership_degree(s,modelpos) for s in S])

    def funcm_(S):
        return funcm(S,model)

    if self.target_label:
        X=df_test.drop(self.target_label,
                       axis=1).values.astype(str)
    
        NULLSTR=np.array(['']*len(modelneg.feature_names))
        s_background=qsample(NULLSTR,modelneg,steps=nullsteps)
        explainer = shap.KernelExplainer(funcw_,np.array([s_background]))
        shap_values = explainer.shap_values(X[:shapnum])
        
        self.shap_index=pd.DataFrame(shap_values.mean(axis=0),
                                     columns=['shap'])\
                          .sort_values('shap',
                                       ascending=False).index.values
        
        modelneg.shap_index=self.shap_index
        modelpos.shap_index=self.shap_index

        # save veritas model
        self.veritas_model['version']=veritas_version
        self.veritas_model['model']=modelpos
        self.veritas_model['model_neg']=modelneg
        self.veritas_model['problem']=self.problem
        self.veritas_model['shapvalues']=shap_values

    else: 

        X=df_test.values.astype(str)
    
        NULLSTR=np.array(['']*len(model.feature_names))
        s_background=qsample(NULLSTR,model,steps=nullsteps)
        explainer = shap.KernelExplainer(funcm_,np.array([s_background]))
        shap_values = explainer.shap_values(X[:shapnum])
        
        self.shap_index=pd.DataFrame(shap_values.mean(axis=0),
                                     columns=['shap'])\
                          .sort_values('shap',
                                       ascending=False).index.values
        
        model.shap_index=self.shap_index

        self.veritas_model['version']=veritas_version
        self.veritas_model['model']=model
        self.veritas_model['problem']=self.problem
    
    return
def save(self, filepath)

save veritas model

Parameters: filepath (str): The path where the model should be saved.

Expand source code
def save(self, filepath):
    '''
    save veritas model

    Parameters:
    filepath (str): The path where the model should be saved.
    '''
    with gzip.open(filepath, 'wb') as file:
        M=self.veritas_model
        pickle.dump(M, file)
def synccols(self, df_)

Synchronize columns between positive and negative cases.

Parameters: df_ (DataFrame): The DataFrame to process.

Returns: DataFrame: A DataFrame with synchronized columns.

Expand source code
def synccols(self, df_):
    """
    Synchronize columns between positive and negative cases.

    Parameters:
    df_ (DataFrame): The DataFrame to process.

    Returns:
    DataFrame: A DataFrame with synchronized columns.
    """
    df=df_.copy()
    if self.target_label:
        df1 = df[df[self.target_label] ==  str(self.target_label_positive)]
        df0 = df[df[self.target_label] ==  str(self.target_label_negative)]
        col1 = df1.replace('', pd.NA).dropna(axis=1, how='all').columns
        col0 = df0.replace('', pd.NA).dropna(axis=1, how='all').columns
        col = [x for x in col0 if x in col1]
        return df[col]
    else:
        return remove_identical_columns(df_)