Module qbiome.qnet_orchestrator
Expand source code
import os
import re
import numpy as np
from quasinet import qnet
class QnetOrchestrator:
"""Manages utilities related to the Quasinet model, for example, training, saving, loading, and different methods of predicting
"""
def __init__(self, quantizer):
"""Initialization
Args:
quantizer (qbiome.Quantizer): an instance with populated quantization map and other states
"""
self.model = None
"""qnet model"""
self.quantizer = quantizer
def train_qnet(self, features, data, alpha, min_samples_split, time_column_name='week', out_fname=None, PACK_QUANTIZER=True):
"""Train the qnet model. If `out_fname` is present, also saves the model. The inputs `features, data` are produced by `Quantizer.get_qnet_inputs`. See [Quasinet documentations](https://zeroknowledgediscovery.github.io/quasinet/build/html/quasinet.html#module-quasinet.qnet) for the other parameters
Args:
features (list): list: a list of feature names, ex. `['Acidobacteriota_35', 'Actinobacteriota_1', 'Actinobacteriota_2']`
data (numpy.ndarray): 2D matrix of label strings
alpha (float): threshold value for selecting feature with permutation tests. Smaller values correspond to shallower trees
min_samples_split (int): minimum samples required for a split
out_fname (str, optional): save file name. Defaults to None.
PACK_QUANTIZER (bool, optional): pack quantizer within qnet data structure with attribute "quantizer"
"""
self.model = qnet.Qnet(feature_names=features, alpha=alpha,
min_samples_split = min_samples_split, n_jobs=-1)
self.model.fit(data)
if PACK_QUANTIZER:
self.model.quantizer = self.quantizer
self.model.train_data = data
if out_fname:
self.save_qnet(out_fname)
def get_max_timestamp(self):
"""Return the maximum timestamp in qnet model's feature names
Returns:
int: max timestamp
"""
assert self.model is not None
pattern = r'[\D|\d]+_(\d+)'
timestamps = [
int(re.findall(pattern, feature)[0]) for feature
in self.model.feature_names
]
return max(timestamps)
def load_qnet(self, in_fname, GZIP=False):
"""Load `self.model` from file
Args:
in_fname (str): input file containing a saved qnet model
gzip (bool): file is gunzipped, and must be decompressed (default: False)
"""
if GZIP:
import gzip, shutil
with gzip.open(in_fname, 'r') as f_in, open(in_fname.replace('.gz',''), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
self.model = qnet.load_qnet(in_fname)
if GZIP:
import os
os.remove(in_fname.replace('.gz',''))
def save_qnet(self, out_fname, GZIP=False):
"""Save `self.model` to file
Args:
out_fname (str): save file name
gzip (bool): gzip outfile if True (default: False)
"""
assert self.model is not None
qnet.save_qnet(self.model, f=out_fname, low_mem=False)
if GZIP:
import gzip
def gzip_file(src_path, dst_path):
with open(src_path, 'rb') as src, gzip.open(dst_path, 'wb') as dst:
for chunk in iter(lambda: src.read(4096), b""):
dst.write(chunk)
gzip_file(out_fname,out_fname+'.gz')
def export_qnet_tree_dotfiles(self, out_dirname):
"""Generate tree dotfiles for each feature of the model
Args:
out_dirname (str): the output directory, make one if doesn't exist
"""
assert self.model is not None
if not os.path.exists(out_dirname):
os.mkdir(out_dirname)
for idx, feature_name in enumerate(self.model.feature_names):
qnet.export_qnet_tree(self.model, idx,
os.path.join(out_dirname, '{}.dot'.format(feature_name)),
outformat='graphviz', detailed_output=True)
# the following functions can only be called when
# self.model is not None
# TODO: add tqdm for progress tracking
def predict_value_given_distributions(self, seq, idx, distribs, n_samples=100):
"""Predict a numeric value for the specified index of the label sequence, given the label distributions generated by the qnet. Sample `n_samples` times from the predictions, dequantize the sampled labels and take average
Args:
seq (numpy.ndarray): 1D array of label strings
idx (int): index into the input `seq`
distribs (list): Produced by `quasinet.qnet.Qnet.predict_distributions(seq)`. See [Quasinet documentations](https://zeroknowledgediscovery.github.io/quasinet/build/html/quasinet.html#quasinet.qnet.Qnet.predict_distributions)
n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns:
float: predicted numeric value
"""
distrib_dict = distribs[idx]
bin_arr = self.quantizer.get_bin_array_of_index(idx)
# sample n_samples
samples = np.empty(n_samples)
for i in range(n_samples):
sampled = np.random.choice(
list(distrib_dict.keys()),
p=list(distrib_dict.values()))
samples[i] = self.quantizer.dequantize_label(sampled, bin_arr)
ret = samples.mean()
return ret
def predict_sequence(self, seq, indices_to_predict=None, n_samples=100):
"""Convert the label sequence into a numeric one by filling qnet predictions for masked entries (represented as an empty string) or simply dequantizing the non-masked entries
Args:
seq (numpy.ndarray): 1D array of label strings
indices_to_predict (list, optional): a list of indices at which masks have been applied, for which we need to make qnet predictions. Defaults to None.
n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns:
numpy.ndarray: 1D array of floats
"""
predicted = np.empty(seq.shape)
distribs = self.model.predict_distributions(seq)
if not indices_to_predict: # predict everything in the sequence
indices_to_predict = range(len(seq))
for idx in indices_to_predict:
label = seq[idx]
if label == '': # this is masked, predict
num = self.predict_value_given_distributions(seq, idx, distribs, n_samples=n_samples)
else: # not masked, simpily dequantize
bin_arr = self.quantizer.get_bin_array_of_index(idx)
num = self.quantizer.dequantize_label(label, bin_arr)
predicted[idx] = num
return predicted
# sequantial prediction, i.e., the predicted sequence remain in labels for the iterative process
def predict_sequence_at_week(self, seq, week, n_samples=100):
"""For a given week, predict all `{biome}_{week}` columns. Note that the return array consists of label strings instead of floats, as it is just an intermediate state and will be used for sequential prediction.
Args:
seq (numpy.ndarray): 1D array of label strings
week (int): the week number
n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns:
numpy.ndarray: 1D array of label strings
"""
predicted = seq.copy()
distribs = self.model.predict_distributions(seq)
col_indices = np.where(self.model.feature_names.str.endswith('_'+str(week)))[0]
for idx in col_indices:
# predict
num = self.predict_value_given_distributions(seq, idx, distribs, n_samples=n_samples)
# re-quantize qnet-predicted numeric values
bin_arr = self.quantizer.get_bin_array_of_index(idx)
label = self.quantizer.quantize_value(num, bin_arr)
# fill the spot in masked for sequential feeding into qnet
predicted[idx] = label
return predicted
def _mask_at_week(self, seq, week, fill_value=''):
"""Mask out all biome observations at the specified week
Args:
seq (numpy.ndarray): 1D array of label strings
week (int): fill in empty strings for all biome_week column
fill_value (str, optional): mask value, can be empty string, None, np.nan etc. Defaults to ''.
Returns:
[type]: [description]
"""
masked = seq.copy()
col_indices = np.where(self.model.feature_names.str.contains(str(week)))[0]
for idx in col_indices:
masked[idx] = fill_value
return masked
def mask_sequence_at_weeks(self, seq, start_week, end_week, fill_value=''):
"""Mask out all biome observations between [start_week, end_week]
Args:
seq (numpy.ndarray): 1D array of label strings
start_week (int): start masking from this week
end_week (int): end masking after this week
fill_value (str, optional): mask value, can be empty string, None, np.nan etc. Defaults to ''.
Returns:
numpy.ndarray: 1D array of label strings
"""
masked = seq.copy()
for week in range(start_week, end_week + 1):
col_indices = np.where(self.model.feature_names.str.contains(str(week)))[0]
for idx in col_indices:
masked[idx] = fill_value
return masked
def predict_sequentially_by_week(self, seq, start_week, end_week, n_samples=100):
"""Use qnet to generate sequential, iterative prediction of the sequence from `start_week` to `end_week`. This is accomplished by masking the current week to predict, use the qnet to predict a label for this masked entry (after which the qnet can update its prediction for the label distributions), masking the next week, and repeat.
Args:
seq (numpy.ndarray): 1D array of label strings
start_week (int): start predicting from this week
end_week (int): end predicting after this week
n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns:
numpy.ndarray: 1D array of floats
"""
# apply mask
masked = self.mask_sequence_at_weeks(seq, start_week, end_week)
# feed into qnet sequentially, filling one week every iteration
for week in range(start_week, end_week + 1):
masked = self.predict_sequence_at_week(masked, week, n_samples=n_samples)
# to generate a numeric seq result, dequantize all the labels
ret = self.quantizer.dequantize_sequence(masked)
return ret
Classes
class QnetOrchestrator (quantizer)
-
Manages utilities related to the Quasinet model, for example, training, saving, loading, and different methods of predicting
Initialization
Args
quantizer
:qbiome.Quantizer
- an instance with populated quantization map and other states
Expand source code
class QnetOrchestrator: """Manages utilities related to the Quasinet model, for example, training, saving, loading, and different methods of predicting """ def __init__(self, quantizer): """Initialization Args: quantizer (qbiome.Quantizer): an instance with populated quantization map and other states """ self.model = None """qnet model""" self.quantizer = quantizer def train_qnet(self, features, data, alpha, min_samples_split, time_column_name='week', out_fname=None, PACK_QUANTIZER=True): """Train the qnet model. If `out_fname` is present, also saves the model. The inputs `features, data` are produced by `Quantizer.get_qnet_inputs`. See [Quasinet documentations](https://zeroknowledgediscovery.github.io/quasinet/build/html/quasinet.html#module-quasinet.qnet) for the other parameters Args: features (list): list: a list of feature names, ex. `['Acidobacteriota_35', 'Actinobacteriota_1', 'Actinobacteriota_2']` data (numpy.ndarray): 2D matrix of label strings alpha (float): threshold value for selecting feature with permutation tests. Smaller values correspond to shallower trees min_samples_split (int): minimum samples required for a split out_fname (str, optional): save file name. Defaults to None. PACK_QUANTIZER (bool, optional): pack quantizer within qnet data structure with attribute "quantizer" """ self.model = qnet.Qnet(feature_names=features, alpha=alpha, min_samples_split = min_samples_split, n_jobs=-1) self.model.fit(data) if PACK_QUANTIZER: self.model.quantizer = self.quantizer self.model.train_data = data if out_fname: self.save_qnet(out_fname) def get_max_timestamp(self): """Return the maximum timestamp in qnet model's feature names Returns: int: max timestamp """ assert self.model is not None pattern = r'[\D|\d]+_(\d+)' timestamps = [ int(re.findall(pattern, feature)[0]) for feature in self.model.feature_names ] return max(timestamps) def load_qnet(self, in_fname, GZIP=False): """Load `self.model` from file Args: in_fname (str): input file containing a saved qnet model gzip (bool): file is gunzipped, and must be decompressed (default: False) """ if GZIP: import gzip, shutil with gzip.open(in_fname, 'r') as f_in, open(in_fname.replace('.gz',''), 'wb') as f_out: shutil.copyfileobj(f_in, f_out) self.model = qnet.load_qnet(in_fname) if GZIP: import os os.remove(in_fname.replace('.gz','')) def save_qnet(self, out_fname, GZIP=False): """Save `self.model` to file Args: out_fname (str): save file name gzip (bool): gzip outfile if True (default: False) """ assert self.model is not None qnet.save_qnet(self.model, f=out_fname, low_mem=False) if GZIP: import gzip def gzip_file(src_path, dst_path): with open(src_path, 'rb') as src, gzip.open(dst_path, 'wb') as dst: for chunk in iter(lambda: src.read(4096), b""): dst.write(chunk) gzip_file(out_fname,out_fname+'.gz') def export_qnet_tree_dotfiles(self, out_dirname): """Generate tree dotfiles for each feature of the model Args: out_dirname (str): the output directory, make one if doesn't exist """ assert self.model is not None if not os.path.exists(out_dirname): os.mkdir(out_dirname) for idx, feature_name in enumerate(self.model.feature_names): qnet.export_qnet_tree(self.model, idx, os.path.join(out_dirname, '{}.dot'.format(feature_name)), outformat='graphviz', detailed_output=True) # the following functions can only be called when # self.model is not None # TODO: add tqdm for progress tracking def predict_value_given_distributions(self, seq, idx, distribs, n_samples=100): """Predict a numeric value for the specified index of the label sequence, given the label distributions generated by the qnet. Sample `n_samples` times from the predictions, dequantize the sampled labels and take average Args: seq (numpy.ndarray): 1D array of label strings idx (int): index into the input `seq` distribs (list): Produced by `quasinet.qnet.Qnet.predict_distributions(seq)`. See [Quasinet documentations](https://zeroknowledgediscovery.github.io/quasinet/build/html/quasinet.html#quasinet.qnet.Qnet.predict_distributions) n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: float: predicted numeric value """ distrib_dict = distribs[idx] bin_arr = self.quantizer.get_bin_array_of_index(idx) # sample n_samples samples = np.empty(n_samples) for i in range(n_samples): sampled = np.random.choice( list(distrib_dict.keys()), p=list(distrib_dict.values())) samples[i] = self.quantizer.dequantize_label(sampled, bin_arr) ret = samples.mean() return ret def predict_sequence(self, seq, indices_to_predict=None, n_samples=100): """Convert the label sequence into a numeric one by filling qnet predictions for masked entries (represented as an empty string) or simply dequantizing the non-masked entries Args: seq (numpy.ndarray): 1D array of label strings indices_to_predict (list, optional): a list of indices at which masks have been applied, for which we need to make qnet predictions. Defaults to None. n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: numpy.ndarray: 1D array of floats """ predicted = np.empty(seq.shape) distribs = self.model.predict_distributions(seq) if not indices_to_predict: # predict everything in the sequence indices_to_predict = range(len(seq)) for idx in indices_to_predict: label = seq[idx] if label == '': # this is masked, predict num = self.predict_value_given_distributions(seq, idx, distribs, n_samples=n_samples) else: # not masked, simpily dequantize bin_arr = self.quantizer.get_bin_array_of_index(idx) num = self.quantizer.dequantize_label(label, bin_arr) predicted[idx] = num return predicted # sequantial prediction, i.e., the predicted sequence remain in labels for the iterative process def predict_sequence_at_week(self, seq, week, n_samples=100): """For a given week, predict all `{biome}_{week}` columns. Note that the return array consists of label strings instead of floats, as it is just an intermediate state and will be used for sequential prediction. Args: seq (numpy.ndarray): 1D array of label strings week (int): the week number n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: numpy.ndarray: 1D array of label strings """ predicted = seq.copy() distribs = self.model.predict_distributions(seq) col_indices = np.where(self.model.feature_names.str.endswith('_'+str(week)))[0] for idx in col_indices: # predict num = self.predict_value_given_distributions(seq, idx, distribs, n_samples=n_samples) # re-quantize qnet-predicted numeric values bin_arr = self.quantizer.get_bin_array_of_index(idx) label = self.quantizer.quantize_value(num, bin_arr) # fill the spot in masked for sequential feeding into qnet predicted[idx] = label return predicted def _mask_at_week(self, seq, week, fill_value=''): """Mask out all biome observations at the specified week Args: seq (numpy.ndarray): 1D array of label strings week (int): fill in empty strings for all biome_week column fill_value (str, optional): mask value, can be empty string, None, np.nan etc. Defaults to ''. Returns: [type]: [description] """ masked = seq.copy() col_indices = np.where(self.model.feature_names.str.contains(str(week)))[0] for idx in col_indices: masked[idx] = fill_value return masked def mask_sequence_at_weeks(self, seq, start_week, end_week, fill_value=''): """Mask out all biome observations between [start_week, end_week] Args: seq (numpy.ndarray): 1D array of label strings start_week (int): start masking from this week end_week (int): end masking after this week fill_value (str, optional): mask value, can be empty string, None, np.nan etc. Defaults to ''. Returns: numpy.ndarray: 1D array of label strings """ masked = seq.copy() for week in range(start_week, end_week + 1): col_indices = np.where(self.model.feature_names.str.contains(str(week)))[0] for idx in col_indices: masked[idx] = fill_value return masked def predict_sequentially_by_week(self, seq, start_week, end_week, n_samples=100): """Use qnet to generate sequential, iterative prediction of the sequence from `start_week` to `end_week`. This is accomplished by masking the current week to predict, use the qnet to predict a label for this masked entry (after which the qnet can update its prediction for the label distributions), masking the next week, and repeat. Args: seq (numpy.ndarray): 1D array of label strings start_week (int): start predicting from this week end_week (int): end predicting after this week n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: numpy.ndarray: 1D array of floats """ # apply mask masked = self.mask_sequence_at_weeks(seq, start_week, end_week) # feed into qnet sequentially, filling one week every iteration for week in range(start_week, end_week + 1): masked = self.predict_sequence_at_week(masked, week, n_samples=n_samples) # to generate a numeric seq result, dequantize all the labels ret = self.quantizer.dequantize_sequence(masked) return ret
Instance variables
var model
-
qnet model
Methods
def export_qnet_tree_dotfiles(self, out_dirname)
-
Generate tree dotfiles for each feature of the model
Args
out_dirname
:str
- the output directory, make one if doesn't exist
Expand source code
def export_qnet_tree_dotfiles(self, out_dirname): """Generate tree dotfiles for each feature of the model Args: out_dirname (str): the output directory, make one if doesn't exist """ assert self.model is not None if not os.path.exists(out_dirname): os.mkdir(out_dirname) for idx, feature_name in enumerate(self.model.feature_names): qnet.export_qnet_tree(self.model, idx, os.path.join(out_dirname, '{}.dot'.format(feature_name)), outformat='graphviz', detailed_output=True)
def get_max_timestamp(self)
-
Return the maximum timestamp in qnet model's feature names
Returns
int
- max timestamp
Expand source code
def get_max_timestamp(self): """Return the maximum timestamp in qnet model's feature names Returns: int: max timestamp """ assert self.model is not None pattern = r'[\D|\d]+_(\d+)' timestamps = [ int(re.findall(pattern, feature)[0]) for feature in self.model.feature_names ] return max(timestamps)
def load_qnet(self, in_fname, GZIP=False)
-
Load
self.model
from fileArgs
in_fname
:str
- input file containing a saved qnet model
gzip
:bool
- file is gunzipped, and must be decompressed (default: False)
Expand source code
def load_qnet(self, in_fname, GZIP=False): """Load `self.model` from file Args: in_fname (str): input file containing a saved qnet model gzip (bool): file is gunzipped, and must be decompressed (default: False) """ if GZIP: import gzip, shutil with gzip.open(in_fname, 'r') as f_in, open(in_fname.replace('.gz',''), 'wb') as f_out: shutil.copyfileobj(f_in, f_out) self.model = qnet.load_qnet(in_fname) if GZIP: import os os.remove(in_fname.replace('.gz',''))
def mask_sequence_at_weeks(self, seq, start_week, end_week, fill_value='')
-
Mask out all biome observations between [start_week, end_week]
Args
seq
:numpy.ndarray
- 1D array of label strings
start_week
:int
- start masking from this week
end_week
:int
- end masking after this week fill_value (str, optional): mask value, can be empty string, None, np.nan etc. Defaults to ''.
Returns
numpy.ndarray
- 1D array of label strings
Expand source code
def mask_sequence_at_weeks(self, seq, start_week, end_week, fill_value=''): """Mask out all biome observations between [start_week, end_week] Args: seq (numpy.ndarray): 1D array of label strings start_week (int): start masking from this week end_week (int): end masking after this week fill_value (str, optional): mask value, can be empty string, None, np.nan etc. Defaults to ''. Returns: numpy.ndarray: 1D array of label strings """ masked = seq.copy() for week in range(start_week, end_week + 1): col_indices = np.where(self.model.feature_names.str.contains(str(week)))[0] for idx in col_indices: masked[idx] = fill_value return masked
def predict_sequence(self, seq, indices_to_predict=None, n_samples=100)
-
Convert the label sequence into a numeric one by filling qnet predictions for masked entries (represented as an empty string) or simply dequantizing the non-masked entries
Args
seq
:numpy.ndarray
- 1D array of label strings
indices_to_predict
:list
, optional- a list of indices at which masks have been applied, for which we need to make qnet predictions. Defaults to None.
n_samples
:int
, optional- the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns
numpy.ndarray
- 1D array of floats
Expand source code
def predict_sequence(self, seq, indices_to_predict=None, n_samples=100): """Convert the label sequence into a numeric one by filling qnet predictions for masked entries (represented as an empty string) or simply dequantizing the non-masked entries Args: seq (numpy.ndarray): 1D array of label strings indices_to_predict (list, optional): a list of indices at which masks have been applied, for which we need to make qnet predictions. Defaults to None. n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: numpy.ndarray: 1D array of floats """ predicted = np.empty(seq.shape) distribs = self.model.predict_distributions(seq) if not indices_to_predict: # predict everything in the sequence indices_to_predict = range(len(seq)) for idx in indices_to_predict: label = seq[idx] if label == '': # this is masked, predict num = self.predict_value_given_distributions(seq, idx, distribs, n_samples=n_samples) else: # not masked, simpily dequantize bin_arr = self.quantizer.get_bin_array_of_index(idx) num = self.quantizer.dequantize_label(label, bin_arr) predicted[idx] = num return predicted
def predict_sequence_at_week(self, seq, week, n_samples=100)
-
For a given week, predict all
{biome}_{week}
columns. Note that the return array consists of label strings instead of floats, as it is just an intermediate state and will be used for sequential prediction.Args
seq
:numpy.ndarray
- 1D array of label strings
week
:int
- the week number
n_samples
:int
, optional- the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns
numpy.ndarray
- 1D array of label strings
Expand source code
def predict_sequence_at_week(self, seq, week, n_samples=100): """For a given week, predict all `{biome}_{week}` columns. Note that the return array consists of label strings instead of floats, as it is just an intermediate state and will be used for sequential prediction. Args: seq (numpy.ndarray): 1D array of label strings week (int): the week number n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: numpy.ndarray: 1D array of label strings """ predicted = seq.copy() distribs = self.model.predict_distributions(seq) col_indices = np.where(self.model.feature_names.str.endswith('_'+str(week)))[0] for idx in col_indices: # predict num = self.predict_value_given_distributions(seq, idx, distribs, n_samples=n_samples) # re-quantize qnet-predicted numeric values bin_arr = self.quantizer.get_bin_array_of_index(idx) label = self.quantizer.quantize_value(num, bin_arr) # fill the spot in masked for sequential feeding into qnet predicted[idx] = label return predicted
def predict_sequentially_by_week(self, seq, start_week, end_week, n_samples=100)
-
Use qnet to generate sequential, iterative prediction of the sequence from
start_week
toend_week
. This is accomplished by masking the current week to predict, use the qnet to predict a label for this masked entry (after which the qnet can update its prediction for the label distributions), masking the next week, and repeat.Args
seq
:numpy.ndarray
- 1D array of label strings
start_week
:int
- start predicting from this week
end_week
:int
- end predicting after this week
n_samples
:int
, optional- the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns
numpy.ndarray
- 1D array of floats
Expand source code
def predict_sequentially_by_week(self, seq, start_week, end_week, n_samples=100): """Use qnet to generate sequential, iterative prediction of the sequence from `start_week` to `end_week`. This is accomplished by masking the current week to predict, use the qnet to predict a label for this masked entry (after which the qnet can update its prediction for the label distributions), masking the next week, and repeat. Args: seq (numpy.ndarray): 1D array of label strings start_week (int): start predicting from this week end_week (int): end predicting after this week n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: numpy.ndarray: 1D array of floats """ # apply mask masked = self.mask_sequence_at_weeks(seq, start_week, end_week) # feed into qnet sequentially, filling one week every iteration for week in range(start_week, end_week + 1): masked = self.predict_sequence_at_week(masked, week, n_samples=n_samples) # to generate a numeric seq result, dequantize all the labels ret = self.quantizer.dequantize_sequence(masked) return ret
def predict_value_given_distributions(self, seq, idx, distribs, n_samples=100)
-
Predict a numeric value for the specified index of the label sequence, given the label distributions generated by the qnet. Sample
n_samples
times from the predictions, dequantize the sampled labels and take averageArgs
seq
:numpy.ndarray
- 1D array of label strings
idx
:int
- index into the input
seq
distribs
:list
- Produced by
quasinet.qnet.Qnet.predict_distributions(seq)
. See Quasinet documentations n_samples
:int
, optional- the number of times to sample from qnet predictions for one masked entry. Defaults to 100.
Returns
float
- predicted numeric value
Expand source code
def predict_value_given_distributions(self, seq, idx, distribs, n_samples=100): """Predict a numeric value for the specified index of the label sequence, given the label distributions generated by the qnet. Sample `n_samples` times from the predictions, dequantize the sampled labels and take average Args: seq (numpy.ndarray): 1D array of label strings idx (int): index into the input `seq` distribs (list): Produced by `quasinet.qnet.Qnet.predict_distributions(seq)`. See [Quasinet documentations](https://zeroknowledgediscovery.github.io/quasinet/build/html/quasinet.html#quasinet.qnet.Qnet.predict_distributions) n_samples (int, optional): the number of times to sample from qnet predictions for one masked entry. Defaults to 100. Returns: float: predicted numeric value """ distrib_dict = distribs[idx] bin_arr = self.quantizer.get_bin_array_of_index(idx) # sample n_samples samples = np.empty(n_samples) for i in range(n_samples): sampled = np.random.choice( list(distrib_dict.keys()), p=list(distrib_dict.values())) samples[i] = self.quantizer.dequantize_label(sampled, bin_arr) ret = samples.mean() return ret
def save_qnet(self, out_fname, GZIP=False)
-
Save
self.model
to fileArgs
out_fname
:str
- save file name
gzip
:bool
- gzip outfile if True (default: False)
Expand source code
def save_qnet(self, out_fname, GZIP=False): """Save `self.model` to file Args: out_fname (str): save file name gzip (bool): gzip outfile if True (default: False) """ assert self.model is not None qnet.save_qnet(self.model, f=out_fname, low_mem=False) if GZIP: import gzip def gzip_file(src_path, dst_path): with open(src_path, 'rb') as src, gzip.open(dst_path, 'wb') as dst: for chunk in iter(lambda: src.read(4096), b""): dst.write(chunk) gzip_file(out_fname,out_fname+'.gz')
def train_qnet(self, features, data, alpha, min_samples_split, time_column_name='week', out_fname=None, PACK_QUANTIZER=True)
-
Train the qnet model. If
out_fname
is present, also saves the model. The inputsfeatures, data
are produced byQuantizer.get_qnet_inputs
. See Quasinet documentations for the other parametersArgs
features
:list
- list: a list of feature names, ex.
['Acidobacteriota_35', 'Actinobacteriota_1', 'Actinobacteriota_2']
data
:numpy.ndarray
- 2D matrix of label strings
alpha
:float
- threshold value for selecting feature with permutation tests. Smaller values correspond to shallower trees
min_samples_split
:int
- minimum samples required for a split
out_fname
:str
, optional- save file name. Defaults to None.
PACK_QUANTIZER
:bool
, optional- pack quantizer within qnet data structure with attribute "quantizer"
Expand source code
def train_qnet(self, features, data, alpha, min_samples_split, time_column_name='week', out_fname=None, PACK_QUANTIZER=True): """Train the qnet model. If `out_fname` is present, also saves the model. The inputs `features, data` are produced by `Quantizer.get_qnet_inputs`. See [Quasinet documentations](https://zeroknowledgediscovery.github.io/quasinet/build/html/quasinet.html#module-quasinet.qnet) for the other parameters Args: features (list): list: a list of feature names, ex. `['Acidobacteriota_35', 'Actinobacteriota_1', 'Actinobacteriota_2']` data (numpy.ndarray): 2D matrix of label strings alpha (float): threshold value for selecting feature with permutation tests. Smaller values correspond to shallower trees min_samples_split (int): minimum samples required for a split out_fname (str, optional): save file name. Defaults to None. PACK_QUANTIZER (bool, optional): pack quantizer within qnet data structure with attribute "quantizer" """ self.model = qnet.Qnet(feature_names=features, alpha=alpha, min_samples_split = min_samples_split, n_jobs=-1) self.model.fit(data) if PACK_QUANTIZER: self.model.quantizer = self.quantizer self.model.train_data = data if out_fname: self.save_qnet(out_fname)