Module qbiome.hypothesis
Expand source code
import pygraphviz as pgv
import networkx as nx
import numpy as np
import re
import sys
import os
from scipy import stats
import warnings
import pandas as pd
import glob
from tqdm import tqdm
warnings.filterwarnings('ignore')
class Hypothesis(object):
"""Generate and analyze hypotheses from models inferred. Assume biomes_timestamp format is biome_timestamp.
Also assume that dotname or decision tree name format is biome_timestamp.dot
Mathematical model of causal hypothesis:
---
```
Local Marginal Regulation Coefficient: Aiming to estimate the up-regulatory/down-regulatory influence of a source organism/entity on a target organism/entity, where regulation effects are causally localized in time (future cannot affect the past) with limited memory, and potential confounding effects from other entities/organisms are marginalized out.
```
Let us assume a general dependency between stochastic processes \(\\nu,u, \omega \) :
$$ \\nu_t = \\phi(u_{\leftarrow t},\omega_{\leftarrow t}) $$
We estimate the sign of \( \\alpha_t\) in a locally linear marginalized relationship \( \\nu_t = \\alpha_t u_{t'} + c \) with \(t' \in [ t-\delta, t] \) as follows:
Attributes:
qnet_orchestrator (qbiome.QnetOrchestrator): instance of qbiome.QnetOrchestrator with trained qnet model
model_path (str, optional): ath to directory containing generated decision trees in dot format (Default value = None)
no_self_loops (bool, optional): If True do not report self-loops in hypotheses (Default value = True)
causal_constraint (float, optional): lag of source inputs from target effects. >= 0 is causal (Default value = 0)
total_samples (int, optional): total number of samples used to construct decision model (Default value = 100)
detailed_labels (bool, optional): if True, decision tree models have detailed output (Default value = False)
MAPNAME (str): path to dequantization map
"""
#### def __init__(self,qnet_orchestrator,
# model_path=None,
# no_self_loops=True,
# causal_constraint=0,
# total_samples=100,
# detailed_labels=False):
def __init__(self,
qnet_orchestrator=None,
quantizer=None,
quantizer_mapfile=None,
model_path=None,
no_self_loops=True,
causal_constraint=0,
total_samples=100,
detailed_labels=False):
"""
"""
self.time_start = None
self.time_end = None
self.model_path = model_path
self.qnet_orchestrator = qnet_orchestrator
self.quantizer = quantizer
if all(v is None for v in[qnet_orchestrator,quantizer,quantizer_mapfile]):
raise Exception('Either qnet_orchestrator or quantizer or quantizer_mapfile must be provided to Hypothesis')
if self.qnet_orchestrator is not None:
self.quantizer = self.qnet_orchestrator.quantizer
if self.quantizer is not None:
self.variable_bin_map = self.quantizer.variable_bin_map
else:
self.variable_bin_map = np.load(quantizer_mapfile, allow_pickle=True)
self.biomes_timestamp = [x for x in self.variable_bin_map.keys()]
self.biomes = list(set(['_'.join(x.split('_')[:-1])
for x in self.biomes_timestamp]))
self.NMAP = self.variable_bin_map
if self.quantizer is not None:
self.LABELS = quantizer.labels
else:
warnings.warn("Using manually coded labels. Provide quantizer to Hypothesis")
self.LABELS = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4}
self.total_samples = total_samples
self.detailed_labels = detailed_labels
self.decision_tree = None
self.tree_labels = None
self.tree_edgelabels = None
self.TGT = None
self.SRC = None
self.no_self_loops = no_self_loops
self.causal_constraint = causal_constraint
self.hypotheses=pd.DataFrame(columns=['src','tgt','time_tgt','lomar','pvalue'])
def src_time_constraint(self,source,target):
"""Constrain which time points for source can be
considered as inputs when computing source to target influences.
If causal_constraint is None, there are no restrictions.
Otherwise, only sources that are at least causal_constraint
units of time prior to the target may be considered. Default is 0.
Negative values are possible.
Args:
source (str): full source name in biome_timestamp format
target (str): full target name in biome_timestamp format
Returns:
bool: Truth value of src acceptance
"""
_, source_biome_full, source_timestamp, _ \
= re.split(r'(.*)_(\d+)', source)
_, target_biome_full, target_timestamp, _ \
= re.split(r'(.*)_(\d+)', target)
if self.causal_constraint is not None:
if (float(source_timestamp)
+ self.causal_constraint
<= float(target_timestamp)):
return True
return False
return True
def deQuantize_lowlevel(self,
letter,
bin_arr):
"""Low level dequantizer function
Args:
letter (str): quantized level (str) or nan
bin_arr (numpy.ndarray): 1D array of floats, which are quantization levels from trained quantizer.
Returns:
float: dequantized value
"""
if letter is np.nan or letter == 'nan':
return np.nan
lo = self.LABELS[letter]
hi = lo + 1
val = (bin_arr[lo] + bin_arr[hi]) / 2
return val
def deQuantizer(self,
letter,
biome_prefix,
timestamp_list=None,
time_start=None,
time_end=None):
"""Dequantizer function calling low level deQuantize_lowlevel to account
for the possibility of multiple timestamps being averaged over,
and ability to operate with incomplete biome names.
Args:
letter (str): quantized level (str) or nan
biome_prefix (str): prefix of biome name
timestamp_list (numpy.ndarray, optional): 1D array of int time-stamps to consider. (Default value = None)
time_start (int, optional): Start time. (Default value = None)
time_end (int, optional): End time. (Default value = None)
Returns:
float: median of dequantized value
"""
vals = []
if timestamp_list is None:
if time_start is None and self.time_end is None:
# average over all
for biome_key in self.NMAP:
if biome_prefix in biome_key:
vals.append(
self.deQuantize_lowlevel(letter,
self.NMAP[biome_key]))
elif time_start is None:
time_end = int(time_end)
for biome_key in self.NMAP:
_, biome_full, time, _ \
= re.split(r'(.*)_(\d+)', biome_key)
time = int(time)
if (time <= time_end) and (biome_prefix in biome_key):
vals.append(
self.deQuantize_lowlevel(letter, self.NMAP[biome_key]))
elif time_end is None:
time_start = int(time_start)
for biome_key in self.NMAP:
_, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key)
time = int(time)
if (time_start <= time) and (biome_prefix in biome_key):
vals.append(
self.deQuantize_lowlevel(letter, self.NMAP[biome_key]))
else: # both present
time_start = int(time_start)
time_end = int(time_end)
for biome_key in self.NMAP:
_, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key)
time = int(time)
if (time_start <= time <= time_end) and (
biome_prefix in biome_key):
vals.append(
self.deQuantize_lowlevel(letter,
self.NMAP[biome_key]))
else:
for biome_key in self.NMAP:
_, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key)
time = int(time)
if time in timestamp_list:
vals.append(self.deQuantize_lowlevel(letter, self.NMAP[biome_key]))
return np.median(vals)
def getNumeric_internal(self,
dict_id_reached_by_edgelabel,
bin_name,
timestamp_list=None,
t0=None,
t1=None):
"""Dequantize labels on graph non-leaf nodes
Args:
dict_id_reached_by_edgelabel (dict[int,list[str]]): dict mapping nodeid to array of letters with str type
bin_name (str): biome name in biome_timestamp format
timestamp_list (numpy.ndarray, optional): 1D array of int Time stamps to consider (Default value = None)
t0 (int, optional): Start time (Default value = None)
t1 (int, optional): End time (Default value = None)
Returns:
dict[int,float]: dict mapping nodeid to dequantized values of float type
"""
biome_prefix = '_'.join(bin_name.split('_')[:-1])
if timestamp_list is None:
if (t0 is None) and (t1 is None):
t0 = int(bin_name.split('_')[-1])-1
t1 = int(bin_name.split('_')[-1])+1
R={}
for k in dict_id_reached_by_edgelabel:
v = dict_id_reached_by_edgelabel[k]
R[k]=np.median(
np.array([self.deQuantizer(
str(x).strip(),
biome_prefix,
timestamp_list=timestamp_list,
time_start=t0,
time_end=t1) for x in v]))
return R
def getNumeric_at_leaf(self,
Probability_distribution_dict,
Sample_fraction,
timestamp_list=None,
t0=None,
t1=None):
"""Dequantize labels on graph leaf nodes to return mean and sample standard deviation of outputs
Args:
Probability_distribution_dict (dict[int, numpy.ndarray[float]]): dict mapping nodeid to probability distribution over output labels at that leaf node
Sample_fraction (dict[int,float]): dict mapping nodeids to sample fraction captured by that leaf node
timestamp_list (numpy.ndarray[int], optional): 1D array of int. Time stamps to consider (Default value = None)
t0 (int, optional): start time (Default value = None)
t1 (int, optional): end time (Default value = None)
Returns:
float,float: mean and sample standard deviation
"""
bin_name=self.TGT
biome_prefix = '_'.join(bin_name.split('_')[:-1])
if timestamp_list is None:
if (t0 is None) and (t1 is None):
t0 = int(bin_name.split('_')[-1])-1
t1 = int(bin_name.split('_')[-1])+1
# ----------------------------------------
# Q is 1D array of dequantized values
# corresponding to levels for TGT
# ----------------------------------------
Q=np.array([self.deQuantizer(
str(x).strip(),
biome_prefix,
timestamp_list=timestamp_list,
time_start=t0,
time_end=t1)
for x in self.LABELS.keys()]).reshape(
len(self.LABELS),1)
mux=0
varx=0
for k in Probability_distribution_dict:
p = Probability_distribution_dict[k]
mu_k=np.dot(p.transpose(),Q)
var_k=np.dot(p.transpose(),(Q*Q))-mu_k*mu_k
mux = mux + Sample_fraction[k]*mu_k
varx = varx + Sample_fraction[k]*var_k
return mux,np.sqrt(varx/self.total_samples)
def regularize_distribution(self,prob,l,e=0.005):
"""Regularize probability distribution
using exponential decay to map non-detailed output of a
single maximum likelihood label to a probability distribution.
Used when detailed output is not available.
Args:
prob (float): probability of single output label of type str
e (float, optional): small value to regularize return probability of 1.0 (Default value = 0.005)
l (str): output label
Returns:
numpy.ndarray: probability distribution
"""
labels=np.array(list(self.LABELS.keys()))
yy=np.ones(len(labels))*((1-prob-e)/(len(labels)-1))
yy[np.where(labels==l)[0][0]]=prob-e
dy=pd.DataFrame(yy).ewm(alpha=.8).mean()
dy=dy/dy.sum()
return dy.values
def leaf_output_on_subgraph(self,nodeset):
"""Find the mean and sample standard deviation of output
in leafnodes reachable from nodeset, along with fraction of samples
captures by this subgraph
Args:
nodeset (numpy.ndarray): 1D array of nodeids
Returns:
tuple(float,float), float: mean, sample standard deviation and sample fraction
"""
## cLeaf is the set of leaf nodes reachable from nodeset
# oLabels is the output labels for target and
# is a dict mapping leafnode id to output label letter
#
# frac and prob are sample fraction and probability of
# output label in leaf node, parsed from dotfile
#
# SUM is the total sample fraction captured by nodeset
cLeaf=[x for x in nodeset
if self.decision_tree.out_degree(x)==0
and self.decision_tree.in_degree(x)==1]
oLabels={k:str(v.split('\n')[0])
for (k,v) in self.tree_labels.items()
if k in cLeaf}
frac={k:float(v.split('\n')[2].replace('Frac:',''))
for (k,v) in self.tree_labels.items()
if k in cLeaf}
if not self.detailed_labels:
prob={k:float(v.split('\n')[1].replace('Prob:',''))
for (k,v) in self.tree_labels.items()
if k in cLeaf}
## Get a kernel based distribution here.
# self.alphabet=['A',...,'E']
# prob is regularize_distributioned to get a dict {nodeid: [p1,..,pm]}
prob__={k:self.regularize_distribution(prob[k],oLabels[k])
for k in prob}
prob=prob__
else:
prob={k:self.get_vector_from_dict(v.split('\n')[1].replace('Prob:',''))
for (k,v) in self.tree_labels.items()
if k in cLeaf}
SUM=np.array(frac.values()).sum()
## mean and sample estimate of standard deviation
mu_X,sigma_X=self.getNumeric_at_leaf(prob,frac)
return (mu_X,sigma_X),SUM
def getHypothesisSlice(self,nid):
"""Generating impact of node nid with source label prefix. Note that there can be multiple
nodes in the tree with label that match with the source label prefix.
Args:
nid (int): nodeid
Returns:
[pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html): dataframe of hypotheses fragment with xvalue, ymean and y std dev
"""
cNodes=list(nx.descendants(
self.decision_tree,nid))
nextNodes=nx.neighbors(
self.decision_tree,nid)
nextedge={}
edgeProp={}
SUM=0.
for nn in list(nextNodes):
nextedge[nn]=[str(x) for x in
self.tree_edgelabels[(
nid,nn)].split('\\n')]
if len(list(nx.descendants(
self.decision_tree,nn))) == 0:
res,s=self.leaf_output_on_subgraph([nn])
else:
res,s=self.leaf_output_on_subgraph(list(
nx.descendants(self.decision_tree,nn)))
edgeProp[nn]=res
SUM=SUM+list(s)[0]
# nextedge is dict: {nodeid nn: letter array by which child nn is reached}
num_nextedge=self.getNumeric_internal(
nextedge,
bin_name
=self.tree_labels[str(
nid)])
for (k,v) in edgeProp.items():
num_nextedge[k]=np.append(
num_nextedge[k],[v[0],v[1]])
RF=pd.DataFrame(num_nextedge)
RF.index=['x', 'y','sigmay']
return RF
def createTargetList(self,
source,
target,
time_end=None,
time_start=None):
"""Create list of decision trees available within time points in model_path
Args:
source (str): source name in biome_timestamp format
target (str): target name in biome_timestamp format
time_end (int, optional): start time (Default value = None)
time_start (int, optional): end time (Default value = None)
Returns:
list[str]: list of paths to decision tree models in qnet
"""
self.TGT = target
self.SRC = source
if self.model_path is not None:
decision_trees = glob.glob(
os.path.join(self.model_path,
target)+'*.dot')
decision_trees_ = [x for x in decision_trees
if (time_start
<= float(re.split(
r'(.*)_(\d+)',x)[-2])
<= time_end)]
else:
raise Exception('self.model_path is not set')
return decision_trees_
def get_lowlevel(self,
source,
target,
time_end=None,
time_start=None):
"""Low level evaluation call to estimate local marginal regulation \( \\alpha \)
Args:
source (str): source
target (str): target
time_end (int, optional): end time (Default value = None)
time_start (int, optional): start time (Default value = None)
Returns:
"""
self.TGT = target
self.SRC = source
decision_trees = self.createTargetList(
source,
target,
time_end,
time_start)
grad_=[]
# can we do this in parallel
for tree in decision_trees:
self.TGT = os.path.basename(tree).replace('.dot','')
gv = pgv.AGraph(tree,
strict=False,
directed=True)
self.decision_tree = nx.DiGraph(gv)
self.time_start = time_start
self.time_end = time_end
self.tree_labels = nx.get_node_attributes(
self.decision_tree,'label')
self.tree_edgelabels = nx.get_edge_attributes(
self.decision_tree,"label")
nodes_with_src=[]
for (k,v) in self.tree_labels.items():
if self.SRC in v:
if self.src_time_constraint(v,self.TGT):
nodes_with_src=nodes_with_src+[k]
if len(nodes_with_src)==0:
continue
RES=pd.concat([self.getHypothesisSlice(i).transpose()
for i in nodes_with_src])
grad,pvalue=self.getAlpha(RES)
#RES.to_csv('tmp.csv')
#if RES.index.size > 2:
# quit()
#grad=stats.linregress(
# RES.x_.values,
# RES.muy.values).slope
if np.isnan(grad):
warnings.warn(
"Nan encountered in causal inferrence")
grad=np.median(
RES.y.values)/np.median(
RES.x.values)
ns_ = re.split(r'(.*)_(\d+)', self.TGT)
self.hypotheses = pd.concat([self.hypotheses,
pd.DataFrame([{'src':self.SRC,
'tgt':''.join(ns_[:-2]),
'time_tgt':float(ns_[-2]),
'lomar':float(grad),
'pvalue':pvalue}])],
ignore_index=True)
return
def get(self,
source=None,
target=None,
time_end=None,
time_start=None):
"""Calculate local marginal regulation \( \\alpha \). When source or target is not specified, we calculate for all entities available on model path. Populates self.hypotheses.
Args:
source (str, optional): source (Default value = None)
target (str, optional): target (Default value = None)
time_end (int, optional): end time (Default value = None)
time_start (int optional): start time (Default value = None)
Returns:
"""
if source is None:
source = self.biomes
else:
if isinstance(source,str):
source=[source]
if target is None:
target = self.biomes
else:
if isinstance(target,str):
target=[target]
for tgt_biome_ in tqdm(target):
for src_biome_ in source:
if (src_biome_ == tgt_biome_) and self.no_self_loops:
continue
self.get_lowlevel(src_biome_,
tgt_biome_,
time_end=time_end,
time_start=time_start)
if self.no_self_loops:
self.hypotheses=self.hypotheses[~(self.hypotheses.src==self.hypotheses.tgt)]
return
def to_csv(self, *args, **kwargs):
"""Output csv of hypotheses inferred. Arguments are passed to pandas.DataFrame.to_csv()
Args:
*args: optional arguments to [pandas.to_csv()]( https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html)
**kwargs: optional keywords to [pandas.to_csv()]( https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html)
Returns:
"""
self.hypotheses.to_csv(*args, **kwargs)
def to_dot(self,filename='tmp.dot',
hypotheses=None,
square_mat=False):
"""Output dot file of hypotheses inferred.
Args:
filename (str, optional): filename of dot outpt (Default value = 'tmp.dot')
hypotheses ([pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), optional): If provided use this instead of self.hypotheses (Default value = None)
square_mat (bool, optional): If True resturn a heatmap matrix as filename+'sq.csv' (Default value = False)
Returns:
"""
if hypotheses is None:
df=self.hypotheses.copy()
else:
df=hypotheses
df=df.groupby(['src','tgt']).median().reset_index()
df=df.pivot(index='src',columns='tgt',values='lomar')
df=df.fillna(0)
index = df.index.union(df.columns)
df = df.reindex(index=index, columns=index, fill_value=0)
df=df.sort_index()
if square_mat:
df.to_csv(filename.replace('.dot','sq.csv'))
G = nx.from_pandas_adjacency(df,create_using=nx.DiGraph())
from networkx.drawing.nx_agraph import write_dot
write_dot(G,filename)
return
def getAlpha(self,dataframe_x_y_sigmay,N=500):
"""Carryout regression to estimate \( \\alpha \). Given mean and variance of each y observation, we
increase the number of pints by drawing N samples from a normal distribution of mean y and std dev sigma_y.
The slope and p-value of a linear regression fit is returned
Args:
dataframe_x_y_sigmax (pandas DataFrame): columns x,y,sigmay
N (int): number of samples drawn for each x to set up regression problem
Returns:
float,float: slope and p-value of fit
"""
gf=pd.DataFrame(np.random.normal
(dataframe_x_y_sigmay.y,
dataframe_x_y_sigmay.sigmay,
[N,dataframe_x_y_sigmay.index.size]))
RES=[dataframe_x_y_sigmay[['y','x']]]
for i in gf.columns:
xf=pd.DataFrame(gf.iloc[:,i])
xf.columns=['y']
xf['x']=dataframe_x_y_sigmay.x[i]
RES.append(xf)
RES=pd.concat(RES).dropna()
if RES.x.max() == RES.x.min():
return 0,1.0
lr=stats.linregress(RES.x,RES.y)
return lr.slope,lr.pvalue
def get_vector_from_dict(self,str_alph_val):
"""Calculate a probability distribution from string representation of
alphabet : value read from decision tree models
"""
vec_alph_val=str_alph_val.split()
dict_label_float={}
for x in vec_alph_val:
y=x.split(':')
dict_label_float[y[0]]=float(y[1])
prob_dist = np.zeros(len(self.LABELS.keys()))
for i in dict_label_float:
prob_dist[self.LABELS[i]] = dict_label_float[i]
return prob_dist/prob_dist.sum()
def trim_hypothesis(self,alternate_hypothesis_dataframe):
"""Compate current hypothesis dataframe with alternate_hypothesis_dataframe
Args:
alternate_hypothesis_dataframe ([pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)): alternate dataframe
Returns:
[pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html): manipulated dataframe
"""
df=self.hypotheses.copy()
#df.set_index(['src','tgt']).merge
return df
Classes
class Hypothesis (qnet_orchestrator=None, quantizer=None, quantizer_mapfile=None, model_path=None, no_self_loops=True, causal_constraint=0, total_samples=100, detailed_labels=False)
-
Generate and analyze hypotheses from models inferred. Assume biomes_timestamp format is biome_timestamp. Also assume that dotname or decision tree name format is biome_timestamp.dot
Mathematical model of causal hypothesis:
Local Marginal Regulation Coefficient: Aiming to estimate the up-regulatory/down-regulatory influence of a source organism/entity on a target organism/entity, where regulation effects are causally localized in time (future cannot affect the past) with limited memory, and potential confounding effects from other entities/organisms are marginalized out.
Let us assume a general dependency between stochastic processes \nu,u, \omega :
\nu_t = \phi(u_{\leftarrow t},\omega_{\leftarrow t})
We estimate the sign of \alpha_t in a locally linear marginalized relationship \nu_t = \alpha_t u_{t'} + c with t' \in [ t-\delta, t] as follows:
Attributes
qnet_orchestrator
:qbiome.QnetOrchestrator
- instance of qbiome.QnetOrchestrator with trained qnet model
model_path
:str
, optional- ath to directory containing generated decision trees in dot format (Default value = None)
no_self_loops
:bool
, optional- If True do not report self-loops in hypotheses (Default value = True)
causal_constraint
:float
, optional- lag of source inputs from target effects. >= 0 is causal (Default value = 0)
total_samples
:int
, optional- total number of samples used to construct decision model (Default value = 100)
detailed_labels
:bool
, optional- if True, decision tree models have detailed output (Default value = False)
MAPNAME
:str
- path to dequantization map
Expand source code
class Hypothesis(object): """Generate and analyze hypotheses from models inferred. Assume biomes_timestamp format is biome_timestamp. Also assume that dotname or decision tree name format is biome_timestamp.dot Mathematical model of causal hypothesis: --- ``` Local Marginal Regulation Coefficient: Aiming to estimate the up-regulatory/down-regulatory influence of a source organism/entity on a target organism/entity, where regulation effects are causally localized in time (future cannot affect the past) with limited memory, and potential confounding effects from other entities/organisms are marginalized out. ``` Let us assume a general dependency between stochastic processes \(\\nu,u, \omega \) : $$ \\nu_t = \\phi(u_{\leftarrow t},\omega_{\leftarrow t}) $$ We estimate the sign of \( \\alpha_t\) in a locally linear marginalized relationship \( \\nu_t = \\alpha_t u_{t'} + c \) with \(t' \in [ t-\delta, t] \) as follows: Attributes: qnet_orchestrator (qbiome.QnetOrchestrator): instance of qbiome.QnetOrchestrator with trained qnet model model_path (str, optional): ath to directory containing generated decision trees in dot format (Default value = None) no_self_loops (bool, optional): If True do not report self-loops in hypotheses (Default value = True) causal_constraint (float, optional): lag of source inputs from target effects. >= 0 is causal (Default value = 0) total_samples (int, optional): total number of samples used to construct decision model (Default value = 100) detailed_labels (bool, optional): if True, decision tree models have detailed output (Default value = False) MAPNAME (str): path to dequantization map """ #### def __init__(self,qnet_orchestrator, # model_path=None, # no_self_loops=True, # causal_constraint=0, # total_samples=100, # detailed_labels=False): def __init__(self, qnet_orchestrator=None, quantizer=None, quantizer_mapfile=None, model_path=None, no_self_loops=True, causal_constraint=0, total_samples=100, detailed_labels=False): """ """ self.time_start = None self.time_end = None self.model_path = model_path self.qnet_orchestrator = qnet_orchestrator self.quantizer = quantizer if all(v is None for v in[qnet_orchestrator,quantizer,quantizer_mapfile]): raise Exception('Either qnet_orchestrator or quantizer or quantizer_mapfile must be provided to Hypothesis') if self.qnet_orchestrator is not None: self.quantizer = self.qnet_orchestrator.quantizer if self.quantizer is not None: self.variable_bin_map = self.quantizer.variable_bin_map else: self.variable_bin_map = np.load(quantizer_mapfile, allow_pickle=True) self.biomes_timestamp = [x for x in self.variable_bin_map.keys()] self.biomes = list(set(['_'.join(x.split('_')[:-1]) for x in self.biomes_timestamp])) self.NMAP = self.variable_bin_map if self.quantizer is not None: self.LABELS = quantizer.labels else: warnings.warn("Using manually coded labels. Provide quantizer to Hypothesis") self.LABELS = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4} self.total_samples = total_samples self.detailed_labels = detailed_labels self.decision_tree = None self.tree_labels = None self.tree_edgelabels = None self.TGT = None self.SRC = None self.no_self_loops = no_self_loops self.causal_constraint = causal_constraint self.hypotheses=pd.DataFrame(columns=['src','tgt','time_tgt','lomar','pvalue']) def src_time_constraint(self,source,target): """Constrain which time points for source can be considered as inputs when computing source to target influences. If causal_constraint is None, there are no restrictions. Otherwise, only sources that are at least causal_constraint units of time prior to the target may be considered. Default is 0. Negative values are possible. Args: source (str): full source name in biome_timestamp format target (str): full target name in biome_timestamp format Returns: bool: Truth value of src acceptance """ _, source_biome_full, source_timestamp, _ \ = re.split(r'(.*)_(\d+)', source) _, target_biome_full, target_timestamp, _ \ = re.split(r'(.*)_(\d+)', target) if self.causal_constraint is not None: if (float(source_timestamp) + self.causal_constraint <= float(target_timestamp)): return True return False return True def deQuantize_lowlevel(self, letter, bin_arr): """Low level dequantizer function Args: letter (str): quantized level (str) or nan bin_arr (numpy.ndarray): 1D array of floats, which are quantization levels from trained quantizer. Returns: float: dequantized value """ if letter is np.nan or letter == 'nan': return np.nan lo = self.LABELS[letter] hi = lo + 1 val = (bin_arr[lo] + bin_arr[hi]) / 2 return val def deQuantizer(self, letter, biome_prefix, timestamp_list=None, time_start=None, time_end=None): """Dequantizer function calling low level deQuantize_lowlevel to account for the possibility of multiple timestamps being averaged over, and ability to operate with incomplete biome names. Args: letter (str): quantized level (str) or nan biome_prefix (str): prefix of biome name timestamp_list (numpy.ndarray, optional): 1D array of int time-stamps to consider. (Default value = None) time_start (int, optional): Start time. (Default value = None) time_end (int, optional): End time. (Default value = None) Returns: float: median of dequantized value """ vals = [] if timestamp_list is None: if time_start is None and self.time_end is None: # average over all for biome_key in self.NMAP: if biome_prefix in biome_key: vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) elif time_start is None: time_end = int(time_end) for biome_key in self.NMAP: _, biome_full, time, _ \ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if (time <= time_end) and (biome_prefix in biome_key): vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) elif time_end is None: time_start = int(time_start) for biome_key in self.NMAP: _, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if (time_start <= time) and (biome_prefix in biome_key): vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) else: # both present time_start = int(time_start) time_end = int(time_end) for biome_key in self.NMAP: _, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if (time_start <= time <= time_end) and ( biome_prefix in biome_key): vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) else: for biome_key in self.NMAP: _, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if time in timestamp_list: vals.append(self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) return np.median(vals) def getNumeric_internal(self, dict_id_reached_by_edgelabel, bin_name, timestamp_list=None, t0=None, t1=None): """Dequantize labels on graph non-leaf nodes Args: dict_id_reached_by_edgelabel (dict[int,list[str]]): dict mapping nodeid to array of letters with str type bin_name (str): biome name in biome_timestamp format timestamp_list (numpy.ndarray, optional): 1D array of int Time stamps to consider (Default value = None) t0 (int, optional): Start time (Default value = None) t1 (int, optional): End time (Default value = None) Returns: dict[int,float]: dict mapping nodeid to dequantized values of float type """ biome_prefix = '_'.join(bin_name.split('_')[:-1]) if timestamp_list is None: if (t0 is None) and (t1 is None): t0 = int(bin_name.split('_')[-1])-1 t1 = int(bin_name.split('_')[-1])+1 R={} for k in dict_id_reached_by_edgelabel: v = dict_id_reached_by_edgelabel[k] R[k]=np.median( np.array([self.deQuantizer( str(x).strip(), biome_prefix, timestamp_list=timestamp_list, time_start=t0, time_end=t1) for x in v])) return R def getNumeric_at_leaf(self, Probability_distribution_dict, Sample_fraction, timestamp_list=None, t0=None, t1=None): """Dequantize labels on graph leaf nodes to return mean and sample standard deviation of outputs Args: Probability_distribution_dict (dict[int, numpy.ndarray[float]]): dict mapping nodeid to probability distribution over output labels at that leaf node Sample_fraction (dict[int,float]): dict mapping nodeids to sample fraction captured by that leaf node timestamp_list (numpy.ndarray[int], optional): 1D array of int. Time stamps to consider (Default value = None) t0 (int, optional): start time (Default value = None) t1 (int, optional): end time (Default value = None) Returns: float,float: mean and sample standard deviation """ bin_name=self.TGT biome_prefix = '_'.join(bin_name.split('_')[:-1]) if timestamp_list is None: if (t0 is None) and (t1 is None): t0 = int(bin_name.split('_')[-1])-1 t1 = int(bin_name.split('_')[-1])+1 # ---------------------------------------- # Q is 1D array of dequantized values # corresponding to levels for TGT # ---------------------------------------- Q=np.array([self.deQuantizer( str(x).strip(), biome_prefix, timestamp_list=timestamp_list, time_start=t0, time_end=t1) for x in self.LABELS.keys()]).reshape( len(self.LABELS),1) mux=0 varx=0 for k in Probability_distribution_dict: p = Probability_distribution_dict[k] mu_k=np.dot(p.transpose(),Q) var_k=np.dot(p.transpose(),(Q*Q))-mu_k*mu_k mux = mux + Sample_fraction[k]*mu_k varx = varx + Sample_fraction[k]*var_k return mux,np.sqrt(varx/self.total_samples) def regularize_distribution(self,prob,l,e=0.005): """Regularize probability distribution using exponential decay to map non-detailed output of a single maximum likelihood label to a probability distribution. Used when detailed output is not available. Args: prob (float): probability of single output label of type str e (float, optional): small value to regularize return probability of 1.0 (Default value = 0.005) l (str): output label Returns: numpy.ndarray: probability distribution """ labels=np.array(list(self.LABELS.keys())) yy=np.ones(len(labels))*((1-prob-e)/(len(labels)-1)) yy[np.where(labels==l)[0][0]]=prob-e dy=pd.DataFrame(yy).ewm(alpha=.8).mean() dy=dy/dy.sum() return dy.values def leaf_output_on_subgraph(self,nodeset): """Find the mean and sample standard deviation of output in leafnodes reachable from nodeset, along with fraction of samples captures by this subgraph Args: nodeset (numpy.ndarray): 1D array of nodeids Returns: tuple(float,float), float: mean, sample standard deviation and sample fraction """ ## cLeaf is the set of leaf nodes reachable from nodeset # oLabels is the output labels for target and # is a dict mapping leafnode id to output label letter # # frac and prob are sample fraction and probability of # output label in leaf node, parsed from dotfile # # SUM is the total sample fraction captured by nodeset cLeaf=[x for x in nodeset if self.decision_tree.out_degree(x)==0 and self.decision_tree.in_degree(x)==1] oLabels={k:str(v.split('\n')[0]) for (k,v) in self.tree_labels.items() if k in cLeaf} frac={k:float(v.split('\n')[2].replace('Frac:','')) for (k,v) in self.tree_labels.items() if k in cLeaf} if not self.detailed_labels: prob={k:float(v.split('\n')[1].replace('Prob:','')) for (k,v) in self.tree_labels.items() if k in cLeaf} ## Get a kernel based distribution here. # self.alphabet=['A',...,'E'] # prob is regularize_distributioned to get a dict {nodeid: [p1,..,pm]} prob__={k:self.regularize_distribution(prob[k],oLabels[k]) for k in prob} prob=prob__ else: prob={k:self.get_vector_from_dict(v.split('\n')[1].replace('Prob:','')) for (k,v) in self.tree_labels.items() if k in cLeaf} SUM=np.array(frac.values()).sum() ## mean and sample estimate of standard deviation mu_X,sigma_X=self.getNumeric_at_leaf(prob,frac) return (mu_X,sigma_X),SUM def getHypothesisSlice(self,nid): """Generating impact of node nid with source label prefix. Note that there can be multiple nodes in the tree with label that match with the source label prefix. Args: nid (int): nodeid Returns: [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html): dataframe of hypotheses fragment with xvalue, ymean and y std dev """ cNodes=list(nx.descendants( self.decision_tree,nid)) nextNodes=nx.neighbors( self.decision_tree,nid) nextedge={} edgeProp={} SUM=0. for nn in list(nextNodes): nextedge[nn]=[str(x) for x in self.tree_edgelabels[( nid,nn)].split('\\n')] if len(list(nx.descendants( self.decision_tree,nn))) == 0: res,s=self.leaf_output_on_subgraph([nn]) else: res,s=self.leaf_output_on_subgraph(list( nx.descendants(self.decision_tree,nn))) edgeProp[nn]=res SUM=SUM+list(s)[0] # nextedge is dict: {nodeid nn: letter array by which child nn is reached} num_nextedge=self.getNumeric_internal( nextedge, bin_name =self.tree_labels[str( nid)]) for (k,v) in edgeProp.items(): num_nextedge[k]=np.append( num_nextedge[k],[v[0],v[1]]) RF=pd.DataFrame(num_nextedge) RF.index=['x', 'y','sigmay'] return RF def createTargetList(self, source, target, time_end=None, time_start=None): """Create list of decision trees available within time points in model_path Args: source (str): source name in biome_timestamp format target (str): target name in biome_timestamp format time_end (int, optional): start time (Default value = None) time_start (int, optional): end time (Default value = None) Returns: list[str]: list of paths to decision tree models in qnet """ self.TGT = target self.SRC = source if self.model_path is not None: decision_trees = glob.glob( os.path.join(self.model_path, target)+'*.dot') decision_trees_ = [x for x in decision_trees if (time_start <= float(re.split( r'(.*)_(\d+)',x)[-2]) <= time_end)] else: raise Exception('self.model_path is not set') return decision_trees_ def get_lowlevel(self, source, target, time_end=None, time_start=None): """Low level evaluation call to estimate local marginal regulation \( \\alpha \) Args: source (str): source target (str): target time_end (int, optional): end time (Default value = None) time_start (int, optional): start time (Default value = None) Returns: """ self.TGT = target self.SRC = source decision_trees = self.createTargetList( source, target, time_end, time_start) grad_=[] # can we do this in parallel for tree in decision_trees: self.TGT = os.path.basename(tree).replace('.dot','') gv = pgv.AGraph(tree, strict=False, directed=True) self.decision_tree = nx.DiGraph(gv) self.time_start = time_start self.time_end = time_end self.tree_labels = nx.get_node_attributes( self.decision_tree,'label') self.tree_edgelabels = nx.get_edge_attributes( self.decision_tree,"label") nodes_with_src=[] for (k,v) in self.tree_labels.items(): if self.SRC in v: if self.src_time_constraint(v,self.TGT): nodes_with_src=nodes_with_src+[k] if len(nodes_with_src)==0: continue RES=pd.concat([self.getHypothesisSlice(i).transpose() for i in nodes_with_src]) grad,pvalue=self.getAlpha(RES) #RES.to_csv('tmp.csv') #if RES.index.size > 2: # quit() #grad=stats.linregress( # RES.x_.values, # RES.muy.values).slope if np.isnan(grad): warnings.warn( "Nan encountered in causal inferrence") grad=np.median( RES.y.values)/np.median( RES.x.values) ns_ = re.split(r'(.*)_(\d+)', self.TGT) self.hypotheses = pd.concat([self.hypotheses, pd.DataFrame([{'src':self.SRC, 'tgt':''.join(ns_[:-2]), 'time_tgt':float(ns_[-2]), 'lomar':float(grad), 'pvalue':pvalue}])], ignore_index=True) return def get(self, source=None, target=None, time_end=None, time_start=None): """Calculate local marginal regulation \( \\alpha \). When source or target is not specified, we calculate for all entities available on model path. Populates self.hypotheses. Args: source (str, optional): source (Default value = None) target (str, optional): target (Default value = None) time_end (int, optional): end time (Default value = None) time_start (int optional): start time (Default value = None) Returns: """ if source is None: source = self.biomes else: if isinstance(source,str): source=[source] if target is None: target = self.biomes else: if isinstance(target,str): target=[target] for tgt_biome_ in tqdm(target): for src_biome_ in source: if (src_biome_ == tgt_biome_) and self.no_self_loops: continue self.get_lowlevel(src_biome_, tgt_biome_, time_end=time_end, time_start=time_start) if self.no_self_loops: self.hypotheses=self.hypotheses[~(self.hypotheses.src==self.hypotheses.tgt)] return def to_csv(self, *args, **kwargs): """Output csv of hypotheses inferred. Arguments are passed to pandas.DataFrame.to_csv() Args: *args: optional arguments to [pandas.to_csv()]( https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html) **kwargs: optional keywords to [pandas.to_csv()]( https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html) Returns: """ self.hypotheses.to_csv(*args, **kwargs) def to_dot(self,filename='tmp.dot', hypotheses=None, square_mat=False): """Output dot file of hypotheses inferred. Args: filename (str, optional): filename of dot outpt (Default value = 'tmp.dot') hypotheses ([pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), optional): If provided use this instead of self.hypotheses (Default value = None) square_mat (bool, optional): If True resturn a heatmap matrix as filename+'sq.csv' (Default value = False) Returns: """ if hypotheses is None: df=self.hypotheses.copy() else: df=hypotheses df=df.groupby(['src','tgt']).median().reset_index() df=df.pivot(index='src',columns='tgt',values='lomar') df=df.fillna(0) index = df.index.union(df.columns) df = df.reindex(index=index, columns=index, fill_value=0) df=df.sort_index() if square_mat: df.to_csv(filename.replace('.dot','sq.csv')) G = nx.from_pandas_adjacency(df,create_using=nx.DiGraph()) from networkx.drawing.nx_agraph import write_dot write_dot(G,filename) return def getAlpha(self,dataframe_x_y_sigmay,N=500): """Carryout regression to estimate \( \\alpha \). Given mean and variance of each y observation, we increase the number of pints by drawing N samples from a normal distribution of mean y and std dev sigma_y. The slope and p-value of a linear regression fit is returned Args: dataframe_x_y_sigmax (pandas DataFrame): columns x,y,sigmay N (int): number of samples drawn for each x to set up regression problem Returns: float,float: slope and p-value of fit """ gf=pd.DataFrame(np.random.normal (dataframe_x_y_sigmay.y, dataframe_x_y_sigmay.sigmay, [N,dataframe_x_y_sigmay.index.size])) RES=[dataframe_x_y_sigmay[['y','x']]] for i in gf.columns: xf=pd.DataFrame(gf.iloc[:,i]) xf.columns=['y'] xf['x']=dataframe_x_y_sigmay.x[i] RES.append(xf) RES=pd.concat(RES).dropna() if RES.x.max() == RES.x.min(): return 0,1.0 lr=stats.linregress(RES.x,RES.y) return lr.slope,lr.pvalue def get_vector_from_dict(self,str_alph_val): """Calculate a probability distribution from string representation of alphabet : value read from decision tree models """ vec_alph_val=str_alph_val.split() dict_label_float={} for x in vec_alph_val: y=x.split(':') dict_label_float[y[0]]=float(y[1]) prob_dist = np.zeros(len(self.LABELS.keys())) for i in dict_label_float: prob_dist[self.LABELS[i]] = dict_label_float[i] return prob_dist/prob_dist.sum() def trim_hypothesis(self,alternate_hypothesis_dataframe): """Compate current hypothesis dataframe with alternate_hypothesis_dataframe Args: alternate_hypothesis_dataframe ([pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)): alternate dataframe Returns: [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html): manipulated dataframe """ df=self.hypotheses.copy() #df.set_index(['src','tgt']).merge return df
Methods
def createTargetList(self, source, target, time_end=None, time_start=None)
-
Create list of decision trees available within time points in model_path
Args
source
:str
- source name in biome_timestamp format
target
:str
- target name in biome_timestamp format
time_end
:int
, optional- start time (Default value = None)
time_start
:int
, optional- end time (Default value = None)
Returns
list[str]
- list of paths to decision tree models in qnet
Expand source code
def createTargetList(self, source, target, time_end=None, time_start=None): """Create list of decision trees available within time points in model_path Args: source (str): source name in biome_timestamp format target (str): target name in biome_timestamp format time_end (int, optional): start time (Default value = None) time_start (int, optional): end time (Default value = None) Returns: list[str]: list of paths to decision tree models in qnet """ self.TGT = target self.SRC = source if self.model_path is not None: decision_trees = glob.glob( os.path.join(self.model_path, target)+'*.dot') decision_trees_ = [x for x in decision_trees if (time_start <= float(re.split( r'(.*)_(\d+)',x)[-2]) <= time_end)] else: raise Exception('self.model_path is not set') return decision_trees_
def deQuantize_lowlevel(self, letter, bin_arr)
-
Low level dequantizer function
Args
letter
:str
- quantized level (str) or nan
bin_arr
:numpy.ndarray
- 1D array of floats, which are quantization levels from trained quantizer.
Returns
float
- dequantized value
Expand source code
def deQuantize_lowlevel(self, letter, bin_arr): """Low level dequantizer function Args: letter (str): quantized level (str) or nan bin_arr (numpy.ndarray): 1D array of floats, which are quantization levels from trained quantizer. Returns: float: dequantized value """ if letter is np.nan or letter == 'nan': return np.nan lo = self.LABELS[letter] hi = lo + 1 val = (bin_arr[lo] + bin_arr[hi]) / 2 return val
def deQuantizer(self, letter, biome_prefix, timestamp_list=None, time_start=None, time_end=None)
-
Dequantizer function calling low level deQuantize_lowlevel to account for the possibility of multiple timestamps being averaged over, and ability to operate with incomplete biome names.
Args
letter
:str
- quantized level (str) or nan
biome_prefix
:str
- prefix of biome name
timestamp_list
:numpy.ndarray
, optional- 1D array of int time-stamps to consider. (Default value = None)
time_start
:int
, optional- Start time. (Default value = None)
time_end
:int
, optional- End time. (Default value = None)
Returns
float
- median of dequantized value
Expand source code
def deQuantizer(self, letter, biome_prefix, timestamp_list=None, time_start=None, time_end=None): """Dequantizer function calling low level deQuantize_lowlevel to account for the possibility of multiple timestamps being averaged over, and ability to operate with incomplete biome names. Args: letter (str): quantized level (str) or nan biome_prefix (str): prefix of biome name timestamp_list (numpy.ndarray, optional): 1D array of int time-stamps to consider. (Default value = None) time_start (int, optional): Start time. (Default value = None) time_end (int, optional): End time. (Default value = None) Returns: float: median of dequantized value """ vals = [] if timestamp_list is None: if time_start is None and self.time_end is None: # average over all for biome_key in self.NMAP: if biome_prefix in biome_key: vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) elif time_start is None: time_end = int(time_end) for biome_key in self.NMAP: _, biome_full, time, _ \ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if (time <= time_end) and (biome_prefix in biome_key): vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) elif time_end is None: time_start = int(time_start) for biome_key in self.NMAP: _, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if (time_start <= time) and (biome_prefix in biome_key): vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) else: # both present time_start = int(time_start) time_end = int(time_end) for biome_key in self.NMAP: _, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if (time_start <= time <= time_end) and ( biome_prefix in biome_key): vals.append( self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) else: for biome_key in self.NMAP: _, biome_full, time, _ = re.split(r'(.*)_(\d+)', biome_key) time = int(time) if time in timestamp_list: vals.append(self.deQuantize_lowlevel(letter, self.NMAP[biome_key])) return np.median(vals)
def get(self, source=None, target=None, time_end=None, time_start=None)
-
Calculate local marginal regulation \alpha . When source or target is not specified, we calculate for all entities available on model path. Populates self.hypotheses.
Args
source
:str
, optional- source (Default value = None)
target
:str
, optional- target (Default value = None)
time_end
:int
, optional- end time (Default value = None)
time_start
:int optional
- start time (Default value = None)
Returns:
Expand source code
def get(self, source=None, target=None, time_end=None, time_start=None): """Calculate local marginal regulation \( \\alpha \). When source or target is not specified, we calculate for all entities available on model path. Populates self.hypotheses. Args: source (str, optional): source (Default value = None) target (str, optional): target (Default value = None) time_end (int, optional): end time (Default value = None) time_start (int optional): start time (Default value = None) Returns: """ if source is None: source = self.biomes else: if isinstance(source,str): source=[source] if target is None: target = self.biomes else: if isinstance(target,str): target=[target] for tgt_biome_ in tqdm(target): for src_biome_ in source: if (src_biome_ == tgt_biome_) and self.no_self_loops: continue self.get_lowlevel(src_biome_, tgt_biome_, time_end=time_end, time_start=time_start) if self.no_self_loops: self.hypotheses=self.hypotheses[~(self.hypotheses.src==self.hypotheses.tgt)] return
def getAlpha(self, dataframe_x_y_sigmay, N=500)
-
Carryout regression to estimate \alpha . Given mean and variance of each y observation, we increase the number of pints by drawing N samples from a normal distribution of mean y and std dev sigma_y. The slope and p-value of a linear regression fit is returned
Args
dataframe_x_y_sigmax
:pandas DataFrame
- columns x,y,sigmay
N
:int
- number of samples drawn for each x to set up regression problem
Returns
float,float
- slope and p-value of fit
Expand source code
def getAlpha(self,dataframe_x_y_sigmay,N=500): """Carryout regression to estimate \( \\alpha \). Given mean and variance of each y observation, we increase the number of pints by drawing N samples from a normal distribution of mean y and std dev sigma_y. The slope and p-value of a linear regression fit is returned Args: dataframe_x_y_sigmax (pandas DataFrame): columns x,y,sigmay N (int): number of samples drawn for each x to set up regression problem Returns: float,float: slope and p-value of fit """ gf=pd.DataFrame(np.random.normal (dataframe_x_y_sigmay.y, dataframe_x_y_sigmay.sigmay, [N,dataframe_x_y_sigmay.index.size])) RES=[dataframe_x_y_sigmay[['y','x']]] for i in gf.columns: xf=pd.DataFrame(gf.iloc[:,i]) xf.columns=['y'] xf['x']=dataframe_x_y_sigmay.x[i] RES.append(xf) RES=pd.concat(RES).dropna() if RES.x.max() == RES.x.min(): return 0,1.0 lr=stats.linregress(RES.x,RES.y) return lr.slope,lr.pvalue
def getHypothesisSlice(self, nid)
-
Generating impact of node nid with source label prefix. Note that there can be multiple nodes in the tree with label that match with the source label prefix.
Args
nid
:int
- nodeid
Returns
pandas.DataFrame: dataframe of hypotheses fragment with xvalue, ymean and y std dev
Expand source code
def getHypothesisSlice(self,nid): """Generating impact of node nid with source label prefix. Note that there can be multiple nodes in the tree with label that match with the source label prefix. Args: nid (int): nodeid Returns: [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html): dataframe of hypotheses fragment with xvalue, ymean and y std dev """ cNodes=list(nx.descendants( self.decision_tree,nid)) nextNodes=nx.neighbors( self.decision_tree,nid) nextedge={} edgeProp={} SUM=0. for nn in list(nextNodes): nextedge[nn]=[str(x) for x in self.tree_edgelabels[( nid,nn)].split('\\n')] if len(list(nx.descendants( self.decision_tree,nn))) == 0: res,s=self.leaf_output_on_subgraph([nn]) else: res,s=self.leaf_output_on_subgraph(list( nx.descendants(self.decision_tree,nn))) edgeProp[nn]=res SUM=SUM+list(s)[0] # nextedge is dict: {nodeid nn: letter array by which child nn is reached} num_nextedge=self.getNumeric_internal( nextedge, bin_name =self.tree_labels[str( nid)]) for (k,v) in edgeProp.items(): num_nextedge[k]=np.append( num_nextedge[k],[v[0],v[1]]) RF=pd.DataFrame(num_nextedge) RF.index=['x', 'y','sigmay'] return RF
def getNumeric_at_leaf(self, Probability_distribution_dict, Sample_fraction, timestamp_list=None, t0=None, t1=None)
-
Dequantize labels on graph leaf nodes to return mean and sample standard deviation of outputs
Args
Probability_distribution_dict
:dict[int, numpy.ndarray[float]]
- dict mapping nodeid to probability distribution over output labels at that leaf node
Sample_fraction
:dict[int,float]
- dict mapping nodeids to sample fraction captured by that leaf node
timestamp_list
:numpy.ndarray[int]
, optional- 1D array of int. Time stamps to consider (Default value = None)
t0
:int
, optional- start time (Default value = None)
t1
:int
, optional- end time (Default value = None)
Returns
float,float
- mean and sample standard deviation
Expand source code
def getNumeric_at_leaf(self, Probability_distribution_dict, Sample_fraction, timestamp_list=None, t0=None, t1=None): """Dequantize labels on graph leaf nodes to return mean and sample standard deviation of outputs Args: Probability_distribution_dict (dict[int, numpy.ndarray[float]]): dict mapping nodeid to probability distribution over output labels at that leaf node Sample_fraction (dict[int,float]): dict mapping nodeids to sample fraction captured by that leaf node timestamp_list (numpy.ndarray[int], optional): 1D array of int. Time stamps to consider (Default value = None) t0 (int, optional): start time (Default value = None) t1 (int, optional): end time (Default value = None) Returns: float,float: mean and sample standard deviation """ bin_name=self.TGT biome_prefix = '_'.join(bin_name.split('_')[:-1]) if timestamp_list is None: if (t0 is None) and (t1 is None): t0 = int(bin_name.split('_')[-1])-1 t1 = int(bin_name.split('_')[-1])+1 # ---------------------------------------- # Q is 1D array of dequantized values # corresponding to levels for TGT # ---------------------------------------- Q=np.array([self.deQuantizer( str(x).strip(), biome_prefix, timestamp_list=timestamp_list, time_start=t0, time_end=t1) for x in self.LABELS.keys()]).reshape( len(self.LABELS),1) mux=0 varx=0 for k in Probability_distribution_dict: p = Probability_distribution_dict[k] mu_k=np.dot(p.transpose(),Q) var_k=np.dot(p.transpose(),(Q*Q))-mu_k*mu_k mux = mux + Sample_fraction[k]*mu_k varx = varx + Sample_fraction[k]*var_k return mux,np.sqrt(varx/self.total_samples)
def getNumeric_internal(self, dict_id_reached_by_edgelabel, bin_name, timestamp_list=None, t0=None, t1=None)
-
Dequantize labels on graph non-leaf nodes
Args
dict_id_reached_by_edgelabel
:dict[int,list[str]]
- dict mapping nodeid to array of letters with str type
bin_name
:str
- biome name in biome_timestamp format
timestamp_list
:numpy.ndarray
, optional- 1D array of int Time stamps to consider (Default value = None)
t0
:int
, optional- Start time (Default value = None)
t1
:int
, optional- End time (Default value = None)
Returns
dict[int,float]
- dict mapping nodeid to dequantized values of float type
Expand source code
def getNumeric_internal(self, dict_id_reached_by_edgelabel, bin_name, timestamp_list=None, t0=None, t1=None): """Dequantize labels on graph non-leaf nodes Args: dict_id_reached_by_edgelabel (dict[int,list[str]]): dict mapping nodeid to array of letters with str type bin_name (str): biome name in biome_timestamp format timestamp_list (numpy.ndarray, optional): 1D array of int Time stamps to consider (Default value = None) t0 (int, optional): Start time (Default value = None) t1 (int, optional): End time (Default value = None) Returns: dict[int,float]: dict mapping nodeid to dequantized values of float type """ biome_prefix = '_'.join(bin_name.split('_')[:-1]) if timestamp_list is None: if (t0 is None) and (t1 is None): t0 = int(bin_name.split('_')[-1])-1 t1 = int(bin_name.split('_')[-1])+1 R={} for k in dict_id_reached_by_edgelabel: v = dict_id_reached_by_edgelabel[k] R[k]=np.median( np.array([self.deQuantizer( str(x).strip(), biome_prefix, timestamp_list=timestamp_list, time_start=t0, time_end=t1) for x in v])) return R
def get_lowlevel(self, source, target, time_end=None, time_start=None)
-
Low level evaluation call to estimate local marginal regulation \alpha
Args
source
:str
- source
target
:str
- target
time_end
:int
, optional- end time (Default value = None)
time_start
:int
, optional- start time (Default value = None)
Returns:
Expand source code
def get_lowlevel(self, source, target, time_end=None, time_start=None): """Low level evaluation call to estimate local marginal regulation \( \\alpha \) Args: source (str): source target (str): target time_end (int, optional): end time (Default value = None) time_start (int, optional): start time (Default value = None) Returns: """ self.TGT = target self.SRC = source decision_trees = self.createTargetList( source, target, time_end, time_start) grad_=[] # can we do this in parallel for tree in decision_trees: self.TGT = os.path.basename(tree).replace('.dot','') gv = pgv.AGraph(tree, strict=False, directed=True) self.decision_tree = nx.DiGraph(gv) self.time_start = time_start self.time_end = time_end self.tree_labels = nx.get_node_attributes( self.decision_tree,'label') self.tree_edgelabels = nx.get_edge_attributes( self.decision_tree,"label") nodes_with_src=[] for (k,v) in self.tree_labels.items(): if self.SRC in v: if self.src_time_constraint(v,self.TGT): nodes_with_src=nodes_with_src+[k] if len(nodes_with_src)==0: continue RES=pd.concat([self.getHypothesisSlice(i).transpose() for i in nodes_with_src]) grad,pvalue=self.getAlpha(RES) #RES.to_csv('tmp.csv') #if RES.index.size > 2: # quit() #grad=stats.linregress( # RES.x_.values, # RES.muy.values).slope if np.isnan(grad): warnings.warn( "Nan encountered in causal inferrence") grad=np.median( RES.y.values)/np.median( RES.x.values) ns_ = re.split(r'(.*)_(\d+)', self.TGT) self.hypotheses = pd.concat([self.hypotheses, pd.DataFrame([{'src':self.SRC, 'tgt':''.join(ns_[:-2]), 'time_tgt':float(ns_[-2]), 'lomar':float(grad), 'pvalue':pvalue}])], ignore_index=True) return
def get_vector_from_dict(self, str_alph_val)
-
Calculate a probability distribution from string representation of alphabet : value read from decision tree models
Expand source code
def get_vector_from_dict(self,str_alph_val): """Calculate a probability distribution from string representation of alphabet : value read from decision tree models """ vec_alph_val=str_alph_val.split() dict_label_float={} for x in vec_alph_val: y=x.split(':') dict_label_float[y[0]]=float(y[1]) prob_dist = np.zeros(len(self.LABELS.keys())) for i in dict_label_float: prob_dist[self.LABELS[i]] = dict_label_float[i] return prob_dist/prob_dist.sum()
def leaf_output_on_subgraph(self, nodeset)
-
Find the mean and sample standard deviation of output in leafnodes reachable from nodeset, along with fraction of samples captures by this subgraph
Args
nodeset
:numpy.ndarray
- 1D array of nodeids
Returns
tuple(float,float), float: mean, sample standard deviation and sample fraction
Expand source code
def leaf_output_on_subgraph(self,nodeset): """Find the mean and sample standard deviation of output in leafnodes reachable from nodeset, along with fraction of samples captures by this subgraph Args: nodeset (numpy.ndarray): 1D array of nodeids Returns: tuple(float,float), float: mean, sample standard deviation and sample fraction """ ## cLeaf is the set of leaf nodes reachable from nodeset # oLabels is the output labels for target and # is a dict mapping leafnode id to output label letter # # frac and prob are sample fraction and probability of # output label in leaf node, parsed from dotfile # # SUM is the total sample fraction captured by nodeset cLeaf=[x for x in nodeset if self.decision_tree.out_degree(x)==0 and self.decision_tree.in_degree(x)==1] oLabels={k:str(v.split('\n')[0]) for (k,v) in self.tree_labels.items() if k in cLeaf} frac={k:float(v.split('\n')[2].replace('Frac:','')) for (k,v) in self.tree_labels.items() if k in cLeaf} if not self.detailed_labels: prob={k:float(v.split('\n')[1].replace('Prob:','')) for (k,v) in self.tree_labels.items() if k in cLeaf} ## Get a kernel based distribution here. # self.alphabet=['A',...,'E'] # prob is regularize_distributioned to get a dict {nodeid: [p1,..,pm]} prob__={k:self.regularize_distribution(prob[k],oLabels[k]) for k in prob} prob=prob__ else: prob={k:self.get_vector_from_dict(v.split('\n')[1].replace('Prob:','')) for (k,v) in self.tree_labels.items() if k in cLeaf} SUM=np.array(frac.values()).sum() ## mean and sample estimate of standard deviation mu_X,sigma_X=self.getNumeric_at_leaf(prob,frac) return (mu_X,sigma_X),SUM
def regularize_distribution(self, prob, l, e=0.005)
-
Regularize probability distribution using exponential decay to map non-detailed output of a single maximum likelihood label to a probability distribution. Used when detailed output is not available.
Args
prob
:float
- probability of single output label of type str
e
:float
, optional- small value to regularize return probability of 1.0 (Default value = 0.005)
l
:str
- output label
Returns
numpy.ndarray
- probability distribution
Expand source code
def regularize_distribution(self,prob,l,e=0.005): """Regularize probability distribution using exponential decay to map non-detailed output of a single maximum likelihood label to a probability distribution. Used when detailed output is not available. Args: prob (float): probability of single output label of type str e (float, optional): small value to regularize return probability of 1.0 (Default value = 0.005) l (str): output label Returns: numpy.ndarray: probability distribution """ labels=np.array(list(self.LABELS.keys())) yy=np.ones(len(labels))*((1-prob-e)/(len(labels)-1)) yy[np.where(labels==l)[0][0]]=prob-e dy=pd.DataFrame(yy).ewm(alpha=.8).mean() dy=dy/dy.sum() return dy.values
def src_time_constraint(self, source, target)
-
Constrain which time points for source can be considered as inputs when computing source to target influences. If causal_constraint is None, there are no restrictions. Otherwise, only sources that are at least causal_constraint units of time prior to the target may be considered. Default is 0. Negative values are possible.
Args
source
:str
- full source name in biome_timestamp format
target
:str
- full target name in biome_timestamp format
Returns
bool
- Truth value of src acceptance
Expand source code
def src_time_constraint(self,source,target): """Constrain which time points for source can be considered as inputs when computing source to target influences. If causal_constraint is None, there are no restrictions. Otherwise, only sources that are at least causal_constraint units of time prior to the target may be considered. Default is 0. Negative values are possible. Args: source (str): full source name in biome_timestamp format target (str): full target name in biome_timestamp format Returns: bool: Truth value of src acceptance """ _, source_biome_full, source_timestamp, _ \ = re.split(r'(.*)_(\d+)', source) _, target_biome_full, target_timestamp, _ \ = re.split(r'(.*)_(\d+)', target) if self.causal_constraint is not None: if (float(source_timestamp) + self.causal_constraint <= float(target_timestamp)): return True return False return True
def to_csv(self, *args, **kwargs)
-
Output csv of hypotheses inferred. Arguments are passed to pandas.DataFrame.to_csv()
Args
*args
- optional arguments to pandas.to_csv()
**kwargs
- optional keywords to pandas.to_csv()
Returns:
Expand source code
def to_csv(self, *args, **kwargs): """Output csv of hypotheses inferred. Arguments are passed to pandas.DataFrame.to_csv() Args: *args: optional arguments to [pandas.to_csv()]( https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html) **kwargs: optional keywords to [pandas.to_csv()]( https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_csv.html) Returns: """ self.hypotheses.to_csv(*args, **kwargs)
def to_dot(self, filename='tmp.dot', hypotheses=None, square_mat=False)
-
Output dot file of hypotheses inferred.
Args
filename
:str
, optional- filename of dot outpt (Default value = 'tmp.dot')
- hypotheses (pandas.DataFrame, optional): If provided use this instead of self.hypotheses (Default value = None)
square_mat
:bool
, optional- If True resturn a heatmap matrix as filename+'sq.csv' (Default value = False)
Returns:
Expand source code
def to_dot(self,filename='tmp.dot', hypotheses=None, square_mat=False): """Output dot file of hypotheses inferred. Args: filename (str, optional): filename of dot outpt (Default value = 'tmp.dot') hypotheses ([pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), optional): If provided use this instead of self.hypotheses (Default value = None) square_mat (bool, optional): If True resturn a heatmap matrix as filename+'sq.csv' (Default value = False) Returns: """ if hypotheses is None: df=self.hypotheses.copy() else: df=hypotheses df=df.groupby(['src','tgt']).median().reset_index() df=df.pivot(index='src',columns='tgt',values='lomar') df=df.fillna(0) index = df.index.union(df.columns) df = df.reindex(index=index, columns=index, fill_value=0) df=df.sort_index() if square_mat: df.to_csv(filename.replace('.dot','sq.csv')) G = nx.from_pandas_adjacency(df,create_using=nx.DiGraph()) from networkx.drawing.nx_agraph import write_dot write_dot(G,filename) return
def trim_hypothesis(self, alternate_hypothesis_dataframe)
-
Compate current hypothesis dataframe with alternate_hypothesis_dataframe
Args
alternate_hypothesis_dataframe (pandas.DataFrame): alternate dataframe
Returns
pandas.DataFrame: manipulated dataframe
Expand source code
def trim_hypothesis(self,alternate_hypothesis_dataframe): """Compate current hypothesis dataframe with alternate_hypothesis_dataframe Args: alternate_hypothesis_dataframe ([pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)): alternate dataframe Returns: [pandas.DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html): manipulated dataframe """ df=self.hypotheses.copy() #df.set_index(['src','tgt']).merge return df