Module truthnet.truthfinder

Expand source code
import gzip
import dill as pickle
import pandas as pd
from quasinet.qnet import load_qnet
from quasinet.qnet import qdistance
from quasinet.qsampling import qsample
from quasinet.qnet import membership_degree
import numpy as np


# interpret() treats a 'score' strictly above this value as PTSD indicated.
DIAGNOSIS_THRESHOLD=1.35
# interpret() reports malingering when 'veritas' is strictly above this value
# (only checked once the score has exceeded DIAGNOSIS_THRESHOLD).
VERITAS_THRESHOLD=0.76
# interpret() reports fabrication when 'lower_threshold' is strictly below this value.
# NOTE: the name is misspelled ("THREHOLD") but is referenced under this exact
# spelling by interpret(), so it must not be renamed in isolation.
LOWER_FABRICATION_THREHOLD=1


def check_json_format(json_data):
    """
    Validate the structure of a batch of questionnaire responses.

    Expected layout:
    - the top level is a list (or numpy array)
    - each item is a dict keyed by patient ID (str or int)
    - each patient ID maps to a dict keyed by question ID (str or int)
    - each question ID maps to an answer that is a str or an int

    The first violation found is printed and ``False`` is returned. When no
    violation is found a confirmation is printed and ``True`` is returned.
    """
    id_types = (str, int)

    def _first_violation():
        # Return the message of the first rule that is broken, or None.
        if not isinstance(json_data, (np.ndarray, list)):
            return 'not nd.array or list'
        for record in json_data:
            if not isinstance(record, dict):
                return 'items not dict'
            for pid, answers in record.items():
                if not isinstance(pid, id_types):
                    return 'pid not str or int'
                if not isinstance(answers, dict):
                    return 'questions not dict'
                for qid, value in answers.items():
                    if not isinstance(qid, id_types):
                        return 'question id not str or int'
                    if not isinstance(value, id_types):
                        return 'question response not str or int'
        return None

    violation = _first_violation()
    if violation is not None:
        print(violation)
        return False

    print('ckeck passed')
    return True
    
    
def make_str_format(resp_json):
    """
    Normalise every response to the string form of an integer.

    Non-empty responses are converted with ``str(int(response))``; empty
    strings are passed through unchanged so that missing answers stay
    recognisable.

    :param resp_json: list of ``{patient_id: {question_id: response}}`` dicts.
    :return: a new list with the same nesting and normalised responses.
    :raises ValueError: if ``resp_json`` fails ``check_json_format``, or if a
        non-empty response cannot be converted by ``int()``.
    """
    # Fail loudly here: falling through would return None, and the caller
    # would then crash later with an unrelated "NoneType is not iterable".
    if not check_json_format(resp_json):
        raise ValueError(
            'resp_json does not follow the expected format: a list of '
            '{patient_id: {question_id: response}} dictionaries'
        )

    return [
        {
            patient_id: {
                question_id: str(int(response)) if response != '' else response
                for question_id, response in patient_responses.items()
            }
            for patient_id, patient_responses in patient.items()
        }
        for patient in resp_json
    ]


def load_from_pkl_gz(filename):
    """
    Read a gzip-compressed pickle from disk and return its contents.

    :param filename: The path to the .pkl.gz file.
    :return: The unpickled data.
    """
    # NOTE(security): unpickling can execute arbitrary code, so only load
    # files that come from a trusted source.
    with gzip.open(filename, 'rb') as handle:
        return pickle.load(handle)


def dissonance(pos, seq, model):
    """
    Dissonance of the response at index ``pos``.

    Computed as one minus the probability that the distribution predicted by
    ``model`` for position ``pos`` assigns to the observed value. A value
    absent from that distribution counts as probability 0.

    :param pos: index into ``seq``.
    :param seq: sequence of responses; the empty string marks a missing one.
    :param model: object exposing ``predict_distributions(seq)``.
    :return: the dissonance, or NaN when the response at ``pos`` is empty.
    """
    observed = seq[pos]
    if observed == '':
        return np.nan

    predicted = model.predict_distributions(seq)
    probability = predicted[pos].get(str(observed), 0)
    return 1 - probability

def dissonance_distr(seq, model):
    """
    Dissonance of every response in ``seq``.

    For each answered position this is one minus the probability that the
    model's predicted distribution for that position assigns to the observed
    value (0 if the value is absent). Missing responses (empty strings) give
    NaN.

    The predicted distributions depend only on ``seq``, so they are computed
    once rather than once per position. If nothing is answered the model is
    not queried at all.

    :param seq: sequence of responses; the empty string marks a missing one.
    :param model: object exposing ``predict_distributions(seq)``.
    :return: float numpy array with one dissonance value per position.
    """
    length = len(seq)
    result = np.full(length, np.nan)

    answered = [pos for pos in range(length) if not seq[pos] == '']
    if answered:
        # Loop-invariant: a single model evaluation serves every position.
        distributions = model.predict_distributions(seq)
        for pos in answered:
            result[pos] = 1 - distributions[pos].get(str(seq[pos]), 0)

    return result

def dissonance_distr_median(seq, model):
    """
    Median dissonance over the answered positions of ``seq``.

    NaN entries returned by ``dissonance_distr`` are dropped before the
    median is taken.

    :param seq: sequence of responses.
    :param model: model forwarded to ``dissonance_distr``.
    :return: ``np.median`` of the non-NaN dissonance values.
    """
    scores = dissonance_distr(seq, model)
    answered = np.logical_not(np.isnan(scores))
    return np.median(scores[answered])


# Flatten the already-parsed response list into one record per subject:
# each top-level key is the subject ID and its dictionary holds the responses
def extract_ptsd_items(jsondata):
    """
    Flatten a list of ``{subject_id: responses}`` dicts into subject records.

    :param jsondata: iterable of dictionaries keyed by subject ID.
    :return: list of ``{"subject_id": ..., "responses": ...}`` dictionaries,
        one per (subject ID, responses) pair, in iteration order.
    """
    return [
        {"subject_id": subject_id, "responses": responses}
        for entry in jsondata
        for subject_id, responses in entry.items()
    ]

def funcw(s, model_pos, model_neg):
    '''
    Ratio of the membership degree of ``s`` under ``model_neg`` to its
    membership degree under ``model_pos``.

    funcw should be greater than DIAGNOSIS_THRESHOLD

    :param s: response sequence passed to ``membership_degree``.
    :param model_pos: model used for the denominator.
    :param model_neg: model used for the numerator.
    :return: ``membership_degree(s, model_neg) / membership_degree(s, model_pos)``.
    '''
    numerator = membership_degree(s, model_neg)
    denominator = membership_degree(s, model_pos)
    return numerator / denominator

def funcm(array, model_pos, model_neg=None):
    '''
    Negated membership degree under ``model_pos`` per answered item.

    funcm should be greater than LOWER_FABRICATION_THREHOLD=1. Lower values indicate fabrication

    :param array: numpy array of strings. A 1-d array is one response
        sequence; an array of arrays is treated as one sequence per row.
        The empty string marks an unanswered item.
    :param model_pos: model passed to ``membership_degree``.
    :param model_neg: unused; kept for signature compatibility.
    :return: a single value for a 1-d input, or a numpy array with one value
        per row otherwise.
    :raises TypeError: if ``array`` is not a numpy array of strings.
    '''
    if isinstance(array, np.ndarray):
        first = array[0]
        if isinstance(first, str):
            return -membership_degree(array, model_pos) / (array != '').sum()
        if isinstance(first, np.ndarray) and isinstance(first[0], str):
            return np.array([-membership_degree(row, model_pos) / (row != '').sum()
                             for row in array])
    # Raising a bare string is itself a TypeError ("exceptions must derive
    # from BaseException") and loses the message, so raise a real exception.
    raise TypeError('incorrect datatype. must be 1d or 2d numpy array of strings')


    
def reveal(resp_json,
           veritas_model_path,
           perturb=3,
           score=True,
           ci=True,
           model_path=True):
    """
    Score every subject's responses and attach an interpretation message.

    :param resp_json: list of ``{subject_id: {question_id: response}}`` dicts.
    :param veritas_model_path: path to a .pkl.gz model bundle when
        ``model_path`` is truthy, otherwise the already-loaded bundle. The
        bundle is indexed with the keys 'model', 'model_neg', 'dist_veritas'
        and 'dist_lower'.
    :param perturb: when greater than 0, the response vector is passed
        through ``qsample`` with this many steps before scoring.
    :param score: when truthy compute 'score' with ``funcw``; otherwise the
        score is set to 0.
    :param ci: when truthy also store 'veritas_prob' and 'lower_prob',
        obtained from the ``cdf`` of the bundle's distributions.
    :param model_path: whether ``veritas_model_path`` is a path to load.
    :return: tuple ``(subject records with the computed fields added,
        list of interpretation messages)``.
    """
    subjects = extract_ptsd_items(make_str_format(resp_json))

    # NOTE(security): loading a path unpickles the file; use trusted files only.
    if model_path:
        bundle = load_from_pkl_gz(veritas_model_path)
    else:
        bundle = veritas_model_path

    message = []
    for record in subjects:
        qnet = bundle['model']

        # Align the answers with the model's feature order; features without
        # an answer become empty strings.
        template = pd.DataFrame(columns=qnet.feature_names)
        answers = pd.DataFrame(record['responses'], index=['response'])
        s = pd.concat([template, answers]).fillna('').values[0].astype(str)

        if perturb > 0:
            s = qsample(s, qnet, steps=perturb)

        record['veritas'] = dissonance_distr_median(s, qnet)
        record['score'] = funcw(s, qnet, bundle['model_neg']) if score else 0
        record['lower_threshold'] = funcm(s, qnet)

        if ci:
            record['veritas_prob'] = bundle['dist_veritas'].cdf(record['veritas'])
            record['lower_prob'] = bundle['dist_lower'].cdf(record['lower_threshold'])

        message.append(interpret(record))

    return subjects, message


def interpret(calculated_score):
    """
    Turn the computed scores of one subject into a human-readable verdict.

    Decision order:
    1. 'lower_threshold' below LOWER_FABRICATION_THREHOLD -> fabrication.
    2. 'score' not above DIAGNOSIS_THRESHOLD -> no PTSD indicated.
    3. 'veritas' above VERITAS_THRESHOLD -> malingering, otherwise true PTSD.

    :param calculated_score: dict with the keys 'lower_threshold', 'score'
        and 'veritas', and optionally 'veritas_prob'.
    :return: the verdict message.
    :raises TypeError: if a value needed for a comparison is missing (None).
    """
    lower_threshold = calculated_score.get('lower_threshold', None)
    score = calculated_score.get('score', None)
    veritas = calculated_score.get('veritas', None)
    veritas_prob = calculated_score.get('veritas_prob', None)

    if lower_threshold < LOWER_FABRICATION_THREHOLD:
        return 'Fabrication detected'

    # Written as "not >" so a NaN score takes the same branch as before.
    if not score > DIAGNOSIS_THRESHOLD:
        return 'No PTSD indicated. Malingering test unnecessary'

    if veritas > VERITAS_THRESHOLD:
        verdict = 'Malingering detected.'
        # The probability is only available when it was computed upstream;
        # omit the clause rather than print "probability > None".
        if veritas_prob is not None:
            verdict += (' You are likely lying with probability > '
                        + str(veritas_prob)[:5])
        return verdict

    return 'No Malingering detected. True PTSD indicated'

Functions

def check_json_format(json_data)

Checks if the given JSON data follows the specified format: - JSON is a list (or numpy array) - Each item in the list is an object with a string or int key (patient ID) - Each value corresponding to a patient ID is an object with string or int keys (question IDs) and string or int values (answers)

Expand source code
def check_json_format(json_data):
    """
    Validate the structure of a batch of questionnaire responses.

    Expected layout:
    - the top level is a list (or numpy array)
    - each item is a dict keyed by patient ID (str or int)
    - each patient ID maps to a dict keyed by question ID (str or int)
    - each question ID maps to an answer that is a str or an int

    The first violation found is printed and ``False`` is returned. When no
    violation is found a confirmation is printed and ``True`` is returned.
    """
    id_types = (str, int)

    def _first_violation():
        # Return the message of the first rule that is broken, or None.
        if not isinstance(json_data, (np.ndarray, list)):
            return 'not nd.array or list'
        for record in json_data:
            if not isinstance(record, dict):
                return 'items not dict'
            for pid, answers in record.items():
                if not isinstance(pid, id_types):
                    return 'pid not str or int'
                if not isinstance(answers, dict):
                    return 'questions not dict'
                for qid, value in answers.items():
                    if not isinstance(qid, id_types):
                        return 'question id not str or int'
                    if not isinstance(value, id_types):
                        return 'question response not str or int'
        return None

    violation = _first_violation()
    if violation is not None:
        print(violation)
        return False

    print('ckeck passed')
    return True
def dissonance(pos, seq, model)
Expand source code
def dissonance(pos, seq, model):
    """
    Dissonance of the response at index ``pos``.

    Computed as one minus the probability that the distribution predicted by
    ``model`` for position ``pos`` assigns to the observed value. A value
    absent from that distribution counts as probability 0.

    :param pos: index into ``seq``.
    :param seq: sequence of responses; the empty string marks a missing one.
    :param model: object exposing ``predict_distributions(seq)``.
    :return: the dissonance, or NaN when the response at ``pos`` is empty.
    """
    observed = seq[pos]
    if observed == '':
        return np.nan

    predicted = model.predict_distributions(seq)
    probability = predicted[pos].get(str(observed), 0)
    return 1 - probability
def dissonance_distr(seq, model)
Expand source code
def dissonance_distr(seq, model):
    """
    Dissonance of every response in ``seq``.

    For each answered position this is one minus the probability that the
    model's predicted distribution for that position assigns to the observed
    value (0 if the value is absent). Missing responses (empty strings) give
    NaN.

    The predicted distributions depend only on ``seq``, so they are computed
    once rather than once per position. If nothing is answered the model is
    not queried at all.

    :param seq: sequence of responses; the empty string marks a missing one.
    :param model: object exposing ``predict_distributions(seq)``.
    :return: float numpy array with one dissonance value per position.
    """
    length = len(seq)
    result = np.full(length, np.nan)

    answered = [pos for pos in range(length) if not seq[pos] == '']
    if answered:
        # Loop-invariant: a single model evaluation serves every position.
        distributions = model.predict_distributions(seq)
        for pos in answered:
            result[pos] = 1 - distributions[pos].get(str(seq[pos]), 0)

    return result
def dissonance_distr_median(seq, model)
Expand source code
def dissonance_distr_median(seq, model):
    """
    Median dissonance over the answered positions of ``seq``.

    NaN entries returned by ``dissonance_distr`` are dropped before the
    median is taken.

    :param seq: sequence of responses.
    :param model: model forwarded to ``dissonance_distr``.
    :return: ``np.median`` of the non-NaN dissonance values.
    """
    scores = dissonance_distr(seq, model)
    answered = np.logical_not(np.isnan(scores))
    return np.median(scores[answered])
def extract_ptsd_items(jsondata)
Expand source code
def extract_ptsd_items(jsondata):
    """
    Flatten a list of ``{subject_id: responses}`` dicts into subject records.

    :param jsondata: iterable of dictionaries keyed by subject ID.
    :return: list of ``{"subject_id": ..., "responses": ...}`` dictionaries,
        one per (subject ID, responses) pair, in iteration order.
    """
    return [
        {"subject_id": subject_id, "responses": responses}
        for entry in jsondata
        for subject_id, responses in entry.items()
    ]
def funcm(array, model_pos, model_neg=None)

funcm should be greater than LOWER_FABRICATION_THREHOLD=1. Lower values indicate fabrication

Expand source code
def funcm(array, model_pos, model_neg=None):
    '''
    Negated membership degree under ``model_pos`` per answered item.

    funcm should be greater than LOWER_FABRICATION_THREHOLD=1. Lower values indicate fabrication

    :param array: numpy array of strings. A 1-d array is one response
        sequence; an array of arrays is treated as one sequence per row.
        The empty string marks an unanswered item.
    :param model_pos: model passed to ``membership_degree``.
    :param model_neg: unused; kept for signature compatibility.
    :return: a single value for a 1-d input, or a numpy array with one value
        per row otherwise.
    :raises TypeError: if ``array`` is not a numpy array of strings.
    '''
    if isinstance(array, np.ndarray):
        first = array[0]
        if isinstance(first, str):
            return -membership_degree(array, model_pos) / (array != '').sum()
        if isinstance(first, np.ndarray) and isinstance(first[0], str):
            return np.array([-membership_degree(row, model_pos) / (row != '').sum()
                             for row in array])
    # Raising a bare string is itself a TypeError ("exceptions must derive
    # from BaseException") and loses the message, so raise a real exception.
    raise TypeError('incorrect datatype. must be 1d or 2d numpy array of strings')
def funcw(s, model_pos, model_neg)

funcw should be greater than DIAGNOSIS_THRESHOLD

Expand source code
def funcw(s, model_pos, model_neg):
    '''
    Ratio of the membership degree of ``s`` under ``model_neg`` to its
    membership degree under ``model_pos``.

    funcw should be greater than DIAGNOSIS_THRESHOLD

    :param s: response sequence passed to ``membership_degree``.
    :param model_pos: model used for the denominator.
    :param model_neg: model used for the numerator.
    :return: ``membership_degree(s, model_neg) / membership_degree(s, model_pos)``.
    '''
    numerator = membership_degree(s, model_neg)
    denominator = membership_degree(s, model_pos)
    return numerator / denominator
def interpret(calculated_score)
Expand source code
def interpret(calculated_score):
    """
    Turn the computed scores of one subject into a human-readable verdict.

    Decision order:
    1. 'lower_threshold' below LOWER_FABRICATION_THREHOLD -> fabrication.
    2. 'score' not above DIAGNOSIS_THRESHOLD -> no PTSD indicated.
    3. 'veritas' above VERITAS_THRESHOLD -> malingering, otherwise true PTSD.

    :param calculated_score: dict with the keys 'lower_threshold', 'score'
        and 'veritas', and optionally 'veritas_prob'.
    :return: the verdict message.
    :raises TypeError: if a value needed for a comparison is missing (None).
    """
    lower_threshold = calculated_score.get('lower_threshold', None)
    score = calculated_score.get('score', None)
    veritas = calculated_score.get('veritas', None)
    veritas_prob = calculated_score.get('veritas_prob', None)

    if lower_threshold < LOWER_FABRICATION_THREHOLD:
        return 'Fabrication detected'

    # Written as "not >" so a NaN score takes the same branch as before.
    if not score > DIAGNOSIS_THRESHOLD:
        return 'No PTSD indicated. Malingering test unnecessary'

    if veritas > VERITAS_THRESHOLD:
        verdict = 'Malingering detected.'
        # The probability is only available when it was computed upstream;
        # omit the clause rather than print "probability > None".
        if veritas_prob is not None:
            verdict += (' You are likely lying with probability > '
                        + str(veritas_prob)[:5])
        return verdict

    return 'No Malingering detected. True PTSD indicated'
def load_from_pkl_gz(filename)

Unpickles and loads the contents of a .pkl.gz file.

:param filename: The path to the .pkl.gz file. :return: The unpickled data.

Expand source code
def load_from_pkl_gz(filename):
    """
    Read a gzip-compressed pickle from disk and return its contents.

    :param filename: The path to the .pkl.gz file.
    :return: The unpickled data.
    """
    # NOTE(security): unpickling can execute arbitrary code, so only load
    # files that come from a trusted source.
    with gzip.open(filename, 'rb') as handle:
        return pickle.load(handle)
def make_str_format(resp_json)
Expand source code
def make_str_format(resp_json):
    """
    Normalise every response to the string form of an integer.

    Non-empty responses are converted with ``str(int(response))``; empty
    strings are passed through unchanged so that missing answers stay
    recognisable.

    :param resp_json: list of ``{patient_id: {question_id: response}}`` dicts.
    :return: a new list with the same nesting and normalised responses.
    :raises ValueError: if ``resp_json`` fails ``check_json_format``, or if a
        non-empty response cannot be converted by ``int()``.
    """
    # Fail loudly here: falling through would return None, and the caller
    # would then crash later with an unrelated "NoneType is not iterable".
    if not check_json_format(resp_json):
        raise ValueError(
            'resp_json does not follow the expected format: a list of '
            '{patient_id: {question_id: response}} dictionaries'
        )

    return [
        {
            patient_id: {
                question_id: str(int(response)) if response != '' else response
                for question_id, response in patient_responses.items()
            }
            for patient_id, patient_responses in patient.items()
        }
        for patient in resp_json
    ]
def reveal(resp_json, veritas_model_path, perturb=3, score=True, ci=True, model_path=True)
Expand source code
def reveal(resp_json,
           veritas_model_path,
           perturb=3,
           score=True,
           ci=True,
           model_path=True):
    """
    Score every subject's responses and attach an interpretation message.

    :param resp_json: list of ``{subject_id: {question_id: response}}`` dicts.
    :param veritas_model_path: path to a .pkl.gz model bundle when
        ``model_path`` is truthy, otherwise the already-loaded bundle. The
        bundle is indexed with the keys 'model', 'model_neg', 'dist_veritas'
        and 'dist_lower'.
    :param perturb: when greater than 0, the response vector is passed
        through ``qsample`` with this many steps before scoring.
    :param score: when truthy compute 'score' with ``funcw``; otherwise the
        score is set to 0.
    :param ci: when truthy also store 'veritas_prob' and 'lower_prob',
        obtained from the ``cdf`` of the bundle's distributions.
    :param model_path: whether ``veritas_model_path`` is a path to load.
    :return: tuple ``(subject records with the computed fields added,
        list of interpretation messages)``.
    """
    subjects = extract_ptsd_items(make_str_format(resp_json))

    # NOTE(security): loading a path unpickles the file; use trusted files only.
    if model_path:
        bundle = load_from_pkl_gz(veritas_model_path)
    else:
        bundle = veritas_model_path

    message = []
    for record in subjects:
        qnet = bundle['model']

        # Align the answers with the model's feature order; features without
        # an answer become empty strings.
        template = pd.DataFrame(columns=qnet.feature_names)
        answers = pd.DataFrame(record['responses'], index=['response'])
        s = pd.concat([template, answers]).fillna('').values[0].astype(str)

        if perturb > 0:
            s = qsample(s, qnet, steps=perturb)

        record['veritas'] = dissonance_distr_median(s, qnet)
        record['score'] = funcw(s, qnet, bundle['model_neg']) if score else 0
        record['lower_threshold'] = funcm(s, qnet)

        if ci:
            record['veritas_prob'] = bundle['dist_veritas'].cdf(record['veritas'])
            record['lower_prob'] = bundle['dist_lower'].cdf(record['lower_threshold'])

        message.append(interpret(record))

    return subjects, message