Module emergenet.emergenet

Emergenet: Fast Scalable Emergence Risk Assessment of Influenza A Strains Circulating In Non-human Hosts

Expand source code
"""
Emergenet: Fast Scalable Emergence Risk Assessment of Influenza A Strains Circulating In Non-human Hosts
"""
import re, os, json, joblib
import numpy as np
import pandas as pd
from typing import Tuple, Union
from datetime import date, datetime
from collections import Counter
from pkg_resources import resource_filename
from quasinet.qnet import Qnet, qdistance, qdistance_matrix
from .utils import filter_by_date_range, load_model, save_model


# Lengths to truncate sequences
HA_TRUNC = 560
NA_TRUNC = 460


class Enet(object):
    ''' Emergenet architecture for predicting emergence risk.
    '''

    def __init__(self, analysis_date:str, ha_seq:str, na_seq:str,
                 pretrained_enet_path:str=None, save_data:str=None, random_state:int=None):
        ''' Initializes an Emergenet instance.

        Parameters
        ----------
        analysis_date - `PRESENT` or date of analysis (YYYY-MM-DD), supported from 2010-01-01 to 2024-01-01

        ha_seq - The target's HA sequence to be analysed by Emergenet

        na_seq - The target's NA sequence to be analysed by Emergenet

        pretrained_enet_path - Base path for all pretrained Enet models

        save_data - Directory to save data to (Enet models, sequences used for training, etc.)

        random_state - Sets seed for random number generator
        '''
        # Date: 'PRESENT' skips validation; otherwise must be an ISO date in the
        # supported window (ISO date strings compare correctly with < and >)
        if analysis_date != 'PRESENT':
            if not re.match(r'^[0-9]{4}-[0-9]{2}-[0-9]{2}$', analysis_date):
                raise ValueError('Date must be in format YYYY-MM-DD! or "PRESENT"')
            if analysis_date > '2024-01-01' or analysis_date < '2010-01-01':
                raise ValueError('Emergenet only supports sequences from 2010-01-01 to 2024-01-01!')
        self.analysis_date = analysis_date
        # Target sequences, padded with Xs if short and truncated to fixed lengths
        self.ha_seq = self._pad_and_truncate(ha_seq, HA_TRUNC, 'HA')
        self.na_seq = self._pad_and_truncate(na_seq, NA_TRUNC, 'NA')
        # Pretrained Enet path
        self.pretrained_enet_path = pretrained_enet_path
        # Output directories; only the retrain workflow needs sub-directories
        self.save_data = save_data
        if save_data is not None:
            os.makedirs(save_data, exist_ok=True)
            if self.analysis_date != 'PRESENT' and self.pretrained_enet_path is None:
                self.save_model = os.path.join(save_data, 'enet_models')
                self.save_sequences = os.path.join(save_data, 'data')
                self.save_results = os.path.join(save_data, 'results')
                for directory in (self.save_model, self.save_sequences, self.save_results):
                    os.makedirs(directory, exist_ok=True)
        # Random state: enforce the full documented range (numpy seeds are 32-bit)
        if random_state is not None and not 0 <= random_state <= 2**32 - 1:
            raise ValueError('Seed must be between 0 and 2**32 - 1!')
        self.random_state = random_state


    def __repr__(self):
        return 'emergenet.Emergenet'


    def __str__(self):
        return self.__repr__()


    @staticmethod
    def _pad_and_truncate(seq:str, trunc:int, name:str) -> str:
        ''' Pads a sequence with Xs if shorter than `trunc`, then upper-cases and truncates.

        Parameters
        ----------
        seq - Amino acid sequence

        trunc - Required length (HA_TRUNC or NA_TRUNC)

        name - Segment name used in the warning message ('HA' or 'NA')

        Returns
        -------
        Normalized sequence of exactly `trunc` characters
        '''
        if trunc > len(seq):
            print(f'{name} sequence is shorter than required length ({trunc}), padding the end with Xs')
            seq = seq.ljust(trunc, 'X')
        return seq.upper()[:trunc]


    @staticmethod
    def _trunc_for(segment:str) -> int:
        ''' Returns the truncation length for a segment; raises on anything but HA/NA. '''
        if segment == 'HA':
            return HA_TRUNC
        if segment == 'NA':
            return NA_TRUNC
        raise ValueError('Segment must be either HA or NA!')


    def _target_seq(self, segment:str) -> str:
        ''' Returns the target sequence for `segment`, truncated to its required length. '''
        trunc = self._trunc_for(segment)
        seq = self.ha_seq if segment == 'HA' else self.na_seq
        return seq[:trunc]


    @staticmethod
    def _resolve_sample_size(size:Union[int, float], n:int) -> int:
        ''' Converts a count or proportion into a concrete sample size.

        Parameters
        ----------
        size - Number of samples (int) or proportion in (0, 1] (float)

        n - Population size

        Returns
        -------
        Concrete number of samples, capped at `n`

        Raises
        ------
        ValueError - If `size` is neither an int nor a float proportion <= 1
            (previously such values silently left the sample size undefined)
        '''
        if isinstance(size, int):
            return min(size, n)
        if isinstance(size, float) and size <= 1:
            return int(size * n)
        raise ValueError('Sample size must be an int count or a float proportion <= 1!')


    def _sample_df(self, df:pd.DataFrame, size:Union[int, float]) -> pd.DataFrame:
        ''' Samples rows without replacement; returns `df` unchanged if `size` is None. '''
        if size is None:
            return df
        n = self._resolve_sample_size(size, len(df))
        return df.sample(n=n, replace=False, random_state=self.random_state)


    def _segment_pool(self, human:pd.DataFrame, segment:str) -> pd.DataFrame:
        ''' Restricts a sequence DataFrame to one segment and to full-length sequences. '''
        trunc = self._trunc_for(segment)
        pool = human[human['segment'] == segment]
        return pool[pool['sequence'].str.len() >= trunc]


    def _load_sequences(self, yearsbefore:int) -> pd.DataFrame:
        ''' Loads human sequences within `yearsbefore` years of the analysis date.

        Parameters
        ----------
        yearsbefore - Number of years prior to analysis_date to consider

        Returns
        -------
        filtered - DataFrame of human sequences within `yearsbefore` years of the analysis date
        '''
        filepath = resource_filename('emergenet', 'data/human.csv.gz')
        human = pd.read_csv(filepath, na_filter=False)
        end = datetime.strptime(self.analysis_date, '%Y-%m-%d').date()
        # NOTE(review): this fails for Feb 29 analysis dates when the start year
        # is not a leap year — confirm the supported dates exclude that case
        start = date(end.year - yearsbefore, end.month, end.day)
        filtered = filter_by_date_range(human, 'date', str(start), str(end))
        print('Number of human sequences:', len(filtered))
        print(Counter(filtered[filtered['segment'] == 'HA']['HA']))
        print(Counter(filtered[filtered['segment'] == 'NA']['NA']))
        return filtered


    def _sequence_array(self, segment:str, seq_df:pd.DataFrame,
                        sample_size:int=None, include_target:bool=True) -> np.ndarray:
        ''' Extracts array of sequence arrays from DataFrame.

        Sequences shorter than the segment's truncation length are skipped.

        Parameters
        ----------
        segment - Either 'HA' or 'NA'

        seq_df - DataFrame containing sequences

        sample_size - Number of sequences to sample randomly

        include_target - If true, includes target sequence as the first row

        Returns
        -------
        seq_lst - 2D array of single-character arrays
        '''
        if 'sequence' not in seq_df.columns:
            raise ValueError('The DataFrame must store sequences in `sequence` column!')
        if sample_size is not None:
            seq_df = seq_df.sample(min(sample_size, len(seq_df)), random_state=self.random_state)
        trunc = self._trunc_for(segment)
        seq_lst = []
        if include_target:
            seq_lst.append(np.array(list(self._target_seq(segment))))
        for seq in seq_df['sequence'].values:
            # Skip sequences too short to fill every feature position
            if len(seq) >= trunc:
                seq_lst.append(np.array(list(seq[:trunc])))
        return np.array(seq_lst)


    def _compute_risks(self, segment:str, seq_df:pd.DataFrame, enet:Qnet) -> pd.DataFrame:
        ''' Computes risk score of each sequence against the target with qdistance.

        Parameters
        ----------
        segment - Either 'HA' or 'NA'

        seq_df - DataFrame of sequences

        enet - Emergenet that sequences in `seq_df` belong to

        Returns
        -------
        seq_df - Copy of the input `seq_df` (too-short sequences dropped) with extra risk column
        '''
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        trunc = self._trunc_for(segment)
        # Drop too-short sequences up front: _sequence_array silently skips them,
        # so filtering here keeps the risk column aligned with the remaining rows.
        # Copy so the caller's frame is never mutated.
        seq_df = seq_df[seq_df['sequence'].str.len() >= trunc].copy()
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        seq_arr = self._sequence_array(segment, seq_df, include_target=False)
        target_seq = np.array(list(self._target_seq(segment)))
        qdist = qdistance_matrix(seq_arr, np.array([target_seq]), enet, enet)
        seq_df['risk'] = qdist.ravel()
        return seq_df


    def train(self, segment:str, seq_df:pd.DataFrame, sample_size:int=None,
              include_target:bool=True, n_jobs:int=1) -> Qnet:
        ''' Trains an Emergenet model.

        Parameters
        ----------
        segment - Either 'HA' or 'NA'

        seq_df - DataFrame of sequences

        sample_size - Number of sequences to train Emergenet on, sampled randomly

        include_target - If true, includes target sequence

        n_jobs - Number of CPUs to use when training

        Returns
        -------
        enet - Trained Emergenet
        '''
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        trunc = self._trunc_for(segment)
        seq_arr = self._sequence_array(segment, seq_df, sample_size, include_target)
        # One categorical feature per residue position
        enet = Qnet(feature_names=['x' + str(i) for i in np.arange(trunc)],
                    random_state=self.random_state, n_jobs=n_jobs)
        enet.fit(seq_arr)
        return enet


    def risk(self, yearsbefore:int=1, enet_sample_size:Union[int, float]=None,
             risk_sample_size:Union[int, float]=None) -> Tuple[float, float]:
        ''' Computes risk scores for the target sequence.
        If `save_data` is not None, `analysis_date` is not 'PRESENT' and pretrained_enet_path is None, saves the following:
        1. Emergenet models: `save_data/enet_models/<subtype>.joblib`
        2. DataFrames of sequences used for training: `save_data/data/<subtype>.csv`
        3. Results (sequence DataFrames with extra `risk_score` column): `save_data/results/<subtype>.csv`
        4. Minimum risks for each HA and NA subtype: `save_data/results/<segment>_min_risks.csv`

        If `save_data` is not None, and `analysis_date` is 'PRESENT' or pretrained_enet_path is not None, saves the following:
        1. Results (sequence DataFrames with extra `risk_score` column): `save_data/<subtype>.csv`
        2. Minimum risks for each HA and NA subtype: `save_data/<segment>_min_risks.csv`

        Parameters
        ----------
        yearsbefore - Number of years prior to analysis_date to consider

        enet_sample_size - Number of or proportion sequences of each subtype to train Emergenet on

        risk_sample_size - Number or proportion of unique sequences of each subtype to sample for risk estimation

        Returns
        -------
        ha_risk - Risk score for HA segment

        na_risk - Risk score for NA segment
        '''
        if self.analysis_date == 'PRESENT':
            risks = self._risk_present(risk_sample_size)
        elif self.pretrained_enet_path is not None:
            risks = self._risk_pretrained(yearsbefore, risk_sample_size)
        else:
            risks = self._risk_retrain(yearsbefore, enet_sample_size, risk_sample_size)
        return risks['HA'], risks['NA']


    def _risk_present(self, risk_sample_size:Union[int, float]) -> dict:
        ''' Risk computation against packaged current-subtype data and bundled pretrained models. '''
        filepath = resource_filename('emergenet', 'data/current_subtypes.json')
        with open(filepath, 'r') as file:
            current_subtypes = json.load(file)
        out = {}
        for segment in ['HA', 'NA']:
            risks = pd.DataFrame()
            for subtype in current_subtypes[segment]:
                # Load human sequences and the pretrained Enet model for the current subtype
                human_filepath = resource_filename('emergenet', f'data/current/{subtype}.csv')
                model_filepath = resource_filename('emergenet', f'models/{subtype}.joblib.gz')
                df = pd.read_csv(human_filepath, na_filter=False)
                # Sample from human sequences if needed
                df = self._sample_df(df, risk_sample_size)
                # Load Enet and compute risks
                enet = load_model(model_filepath, gz=True)
                df = self._compute_risks(segment, df, enet)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_data, subtype + '.csv'), index=False)
                # Save minimum risk for current subtype
                risks[subtype] = [np.min(df['risk'])]
            # Save overall minimum risk
            if self.save_data is not None:
                risks.to_csv(os.path.join(self.save_data, segment + '_min_risks.csv'), index=False)
            # Overall risk: smallest qdistance across subtypes, offset to stay positive
            out[segment] = risks.min(axis=1).values[0] + 1e-5
        return out


    def _risk_pretrained(self, yearsbefore:int, risk_sample_size:Union[int, float]) -> dict:
        ''' Risk computation with user-supplied pretrained Enet models. '''
        human = self._load_sequences(yearsbefore)
        out = {}
        for segment in ['HA', 'NA']:
            risks = pd.DataFrame()
            human1 = self._segment_pool(human, segment)
            subtypes = Counter(human1[segment])
            for subtype in subtypes:
                # Skip subtypes with less than 15 sequences
                if subtypes[subtype] < 15:
                    continue
                # Use only unique sequences for inference
                df = human1[human1[segment] == subtype].drop_duplicates(subset=['sequence'])
                # Sample from human sequences if needed
                df = self._sample_df(df, risk_sample_size)
                # Load Enet and compute risks
                enet = load_model(os.path.join(self.pretrained_enet_path, subtype + '.joblib'))
                df = self._compute_risks(segment, df, enet)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_data, subtype + '.csv'), index=False)
                # Save minimum risk for current subtype
                risks[subtype] = [np.min(df['risk'])]
            # Save overall minimum risk
            if self.save_data is not None:
                risks.to_csv(os.path.join(self.save_data, segment + '_min_risks.csv'), index=False)
            out[segment] = risks.min(axis=1).values[0] + 1e-5
        return out


    def _risk_retrain(self, yearsbefore:int, enet_sample_size:Union[int, float],
                      risk_sample_size:Union[int, float]) -> dict:
        ''' Risk computation that trains one Enet per subtype from scratch. '''
        human = self._load_sequences(yearsbefore)
        out = {}
        for segment in ['HA', 'NA']:
            risks = pd.DataFrame()
            human1 = self._segment_pool(human, segment)
            subtypes = Counter(human1[segment])
            for subtype in subtypes:
                # Skip subtypes with less than 15 sequences
                if subtypes[subtype] < 15:
                    continue
                # Load human strains for constructing enet, sampling if needed
                df = self._sample_df(human1[human1[segment] == subtype], enet_sample_size)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_sequences, subtype + '.csv'), index=False)
                # Train Enet
                enet = self.train(segment, df)
                if self.save_data is not None:
                    save_model(enet, os.path.join(self.save_model, subtype + '.joblib'))
                # Use only unique sequences for inference
                df = df.drop_duplicates(subset=['sequence'])
                # Sample again only if we did not already sample for training the Enet
                if enet_sample_size is None:
                    df = self._sample_df(df, risk_sample_size)
                # Compute risks
                df = self._compute_risks(segment, df, enet)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_results, subtype + '.csv'), index=False)
                # Save minimum risk for current subtype
                risks[subtype] = [np.min(df['risk'])]
            # Save overall minimum risk
            if self.save_data is not None:
                risks.to_csv(os.path.join(self.save_results, segment + '_min_risks.csv'), index=False)
            out[segment] = risks.min(axis=1).values[0] + 1e-5
        return out
        

def predict_irat_emergence(ha_risk:float, na_risk:float) -> Tuple[float, float, float]:
    ''' Computes IRAT emergence risk score.

    Parameters
    ----------
    ha_risk - Risk score for HA segment

    na_risk - Risk score for NA segment

    Returns
    -------
    irat_emergence - Predicted IRAT emergence risk score

    irat_emergence_low - Lower bound IRAT emergence risk score

    irat_emergence_high - Upper bound IRAT emergence risk score
    '''
    # Regression feature: negative log of the geometric-mean risk (offset keeps log finite)
    feature = -np.log(np.sqrt(ha_risk * na_risk) + 5e-4)
    predictions = []
    # Each saved model is a dict holding a fitted line's intercept and slope
    for stem in ('emergence_model', 'emergence_low_model', 'emergence_high_model'):
        params = joblib.load(resource_filename('emergenet', 'models/' + stem + '.joblib'))
        predictions.append(params['intercept'] + params['slope'] * feature)
    return predictions[0], predictions[1], predictions[2]
        

Functions

def predict_irat_emergence(ha_risk: float, na_risk: float) -> Tuple[float, float, float]

Computes IRAT emergence risk score.

Parameters

ha_risk - Risk score for HA segment

na_risk - Risk score for NA segment

Returns

irat_emergence - Predicted IRAT emergence risk score
 
irat_emergence_low - Lower bound IRAT emergence risk score
 
irat_emergence_high - Upper bound IRAT emergence risk score
 
Expand source code
def predict_irat_emergence(ha_risk:float, na_risk:float) -> Tuple[float, float, float]:
    ''' Computes IRAT emergence risk score.

    Parameters
    ----------
    ha_risk - Risk score for HA segment

    na_risk - Risk score for NA segment

    Returns
    -------
    irat_emergence - Predicted IRAT emergence risk score

    irat_emergence_low - Lower bound IRAT emergence risk score

    irat_emergence_high - Upper bound IRAT emergence risk score
    '''
    # Negative log of the geometric-mean risk is the single regression feature
    x = -np.log(np.sqrt(ha_risk * na_risk) + 5e-4)

    def _linear(name):
        # Each packaged model is a dict with a fitted line's coefficients
        params = joblib.load(resource_filename('emergenet', 'models/' + name))
        return params['intercept'] + params['slope'] * x

    return (_linear('emergence_model.joblib'),
            _linear('emergence_low_model.joblib'),
            _linear('emergence_high_model.joblib'))

Classes

class Enet (analysis_date: str, ha_seq: str, na_seq: str, pretrained_enet_path: str = None, save_data: str = None, random_state: int = None)

Emergenet architecture for predicting emergence risk.

Initializes an Emergenet instance.

Parameters

analysis_date - PRESENT or date of analysis (YYYY-MM-DD), supported from 2010-01-01 to 2024-01-01

ha_seq - The target's HA sequence to be analysed by Emergenet

na_seq - The target's NA sequence to be analysed by Emergenet

save_data - Directory to save data to (Enet models, sequences used for training, etc.)

random_state - Sets seed for random number generator

pretrained_enet_path - Base path for all pretrained Enet models

Expand source code
class Enet(object):
    ''' Emergenet architecture for predicting emergence risk.
    ''' 

    def __init__(self, analysis_date:str, ha_seq:str, na_seq:str, 
                 pretrained_enet_path:str=None, save_data:str=None, random_state:int=None):
        ''' Initializes an Emergenet instance.

        Parameters
        ----------
        analysis_date - `PRESENT` or date of analysis (YYYY-MM-DD), supported from 2010-01-01 to 2024-01-01

        ha_seq - The target's HA sequence to be analysed by Emergenet
            
        na_seq - The target's NA sequence to be analysed by Emergenet

        save_data - Directory to save data to (Enet models, sequences used for training, etc.)

        random_state - Sets seed for random number generator
        
        pretrained_enet_path - Base path for all pretrained Enet models
        '''
        # Date: 'PRESENT' skips validation; otherwise it must be an ISO date in the
        # supported window (ISO date strings compare correctly with < and >)
        if analysis_date != 'PRESENT':
            if not re.match(r'^[0-9]{4}-[0-9]{2}-[0-9]{2}$', analysis_date):
                raise ValueError('Date must be in format YYYY-MM-DD! or "PRESENT"')
            if analysis_date > '2024-01-01' or analysis_date < '2010-01-01':
                raise ValueError('Emergenet only supports sequences from 2010-01-01 to 2024-01-01!')
        self.analysis_date = analysis_date
        # Sequences: pad short inputs with 'X', then upper-case and truncate to the
        # fixed lengths (HA_TRUNC/NA_TRUNC) the models expect
        if HA_TRUNC > len(ha_seq):
            print('HA sequence is shorter than required length (560), padding the end with Xs')
            ha_seq = ha_seq.ljust(HA_TRUNC, 'X')
        self.ha_seq = ha_seq.upper()[:HA_TRUNC]
        if NA_TRUNC > len(na_seq):
            print('NA sequence is shorter than required length (460), padding the end with Xs')
            na_seq = na_seq.ljust(NA_TRUNC, 'X')
        self.na_seq = na_seq.upper()[:NA_TRUNC]
        # Pretrained Enet path
        self.pretrained_enet_path = pretrained_enet_path
        # Save data: only the retrain workflow (dated analysis, no pretrained models)
        # gets dedicated sub-directories for models, training data, and results
        self.save_data = save_data
        if save_data is not None:
            if not os.path.exists(save_data):
                os.makedirs(save_data, exist_ok=True)
            if self.analysis_date != 'PRESENT' and self.pretrained_enet_path is None:
                self.save_model = os.path.join(save_data, 'enet_models')
                self.save_sequences = os.path.join(save_data, 'data')
                self.save_results = os.path.join(save_data, 'results')
                os.makedirs(self.save_model, exist_ok=True)
                os.makedirs(self.save_sequences, exist_ok=True)
                os.makedirs(self.save_results, exist_ok=True)
        # Random state
        # NOTE(review): only the lower bound is enforced; values above 2**32 - 1
        # are not rejected despite the message — confirm intent
        if random_state is not None and random_state < 0:
            raise ValueError('Seed must be between 0 and 2**32 - 1!')
        self.random_state = random_state

        
    def __repr__(self):
        # Fixed label; does not expose any instance state.
        return 'emergenet.Emergenet'


    def __str__(self):
        # Delegate to __repr__ so str() and repr() agree.
        return self.__repr__()
    

    def _load_sequences(self, yearsbefore:int) -> pd.DataFrame:
        ''' Loads human sequences within yearsbefore years of the analysis date.

        Parameters
        ----------
        yearsbefore - Number of years prior to analysis_date to consider 

        Returns
        -------
        filtered - DataFrame of human sequences within `yearsbefore` years of the analysis date
        '''
        # Packaged reference dataset of human strains
        filepath = resource_filename('emergenet', 'data/human.csv.gz')
        human = pd.read_csv(filepath, na_filter=False)
        end = datetime.strptime(self.analysis_date, '%Y-%m-%d').date()
        # Same calendar day, `yearsbefore` years earlier
        # NOTE(review): raises for Feb 29 when the start year is not a leap year — confirm
        start = date(end.year - yearsbefore, end.month, end.day)
        filtered = filter_by_date_range(human, 'date', str(start), str(end))
        # Log the window size and per-segment subtype counts
        print('Number of human sequences:', len(filtered))
        print(Counter(filtered[filtered['segment'] == 'HA']['HA']))
        print(Counter(filtered[filtered['segment'] == 'NA']['NA']))
        return filtered
    

    def _sequence_array(self, segment:str, seq_df:pd.DataFrame, 
                        sample_size:int=None, include_target:bool=True) -> np.ndarray:
        ''' Extracts array of sequence arrays from DataFrame.

        Sequences shorter than the segment's truncation length are silently skipped,
        so the output may have fewer rows than `seq_df`.

        Parameters
        ----------
        segment - Either 'HA' or 'NA'
            
        seq_df - DataFrame containing sequences

        sample_size - Number of sequences to sample randomly
            
        include_target - If true, includes target sequence as the first row

        Returns
        -------
        seq_lst - Array of sequence arrays
        ''' 
        if 'sequence' not in seq_df.columns:
            raise ValueError('The DataFrame must store sequences in `sequence` column!')
        if sample_size is not None:
            # Cap at the population size so .sample never raises
            sample_size = min(sample_size, len(seq_df))
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        seqs = seq_df['sequence'].values
        seq_lst = []
        if segment == 'HA':
            TRUNC = HA_TRUNC
            if include_target:
                seq_lst.append(np.array(list(self.ha_seq[:TRUNC])))
        elif segment == 'NA':
            TRUNC = NA_TRUNC
            if include_target:
                seq_lst.append(np.array(list(self.na_seq[:TRUNC])))
        for seq in seqs:
            # Skip sequences too short to fill every feature position
            if len(seq) < TRUNC:
                continue
            seq_lst.append(np.array(list(seq[:TRUNC])))
        seq_lst = np.array(seq_lst)
        return seq_lst

    
    def _compute_risks(self, segment:str, seq_df:pd.DataFrame, enet:Qnet) -> pd.DataFrame:
        ''' Computes risk score with qdistance.

        Parameters
        ----------
        segment - Either 'HA' or 'NA'

        seq_df - DataFrame of sequences

        enet - Emergenet that sequences in `seq_df` belong to

        Returns
        -------
        seq_df - The input `seq_df` with extra risk column
        ''' 
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        # NOTE(review): _sequence_array skips sequences shorter than TRUNC, so the
        # risk column below only aligns with seq_df's rows if every sequence is
        # full-length — callers appear to pre-filter by length; confirm for all paths
        seq_arr = self._sequence_array(segment, seq_df, include_target=False)
        if segment == 'HA':
            TRUNC = HA_TRUNC
            target_seq = np.array(list(self.ha_seq[:TRUNC]))
        elif segment == 'NA':
            TRUNC = NA_TRUNC
            target_seq = np.array(list(self.na_seq[:TRUNC]))
        # NOTE(review): the two prints below look like leftover debug output
        print('eo')
        # qdistance of every sequence to the single target sequence
        qdist = qdistance_matrix(seq_arr, np.array([target_seq]), enet, enet)
        print('e1')

        seq_df['risk'] = qdist.ravel() 
        return seq_df
    

    def train(self, segment:str, seq_df:pd.DataFrame, sample_size:int=None, 
              include_target:bool=True, n_jobs:int=1) -> Qnet:
        ''' Trains an Emergenet model.

        Parameters
        ----------
        segment - Either 'HA' or 'NA'

        seq_df - DataFrame of sequences

        sample_size - Number of sequences to train Emergenet on, sampled randomly

        include_target - If true, includes target sequence

        n_jobs - Number of CPUs to use when training

        Returns
        -------
        enet - Trained Emergenet
        ''' 
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        if segment not in ['HA', 'NA']:
            raise ValueError('Segment must be either HA or NA!')
        if segment == 'HA':
            TRUNC = HA_TRUNC
        elif segment == 'NA':
            TRUNC = NA_TRUNC
        seq_arr = self._sequence_array(segment, seq_df, sample_size, include_target)
        # One categorical feature per residue position (x0..x{TRUNC-1})
        enet = Qnet(feature_names=['x' + str(i) for i in np.arange(TRUNC)],
                    random_state=self.random_state, n_jobs=n_jobs)
        enet.fit(seq_arr)
        return enet
    

    def risk(self, yearsbefore:int=1, enet_sample_size:Union[int, float]=None, 
             risk_sample_size:Union[int, float]=None) -> Tuple[float, float]:
        ''' Computes risk scores for the target sequence.
        If `save_data` is not None, `analysis_date` is not 'PRESENT' and pretrained_enet_path is None, saves the following:
        1. Emergenet models: `save_data/enet_models/<subtype>.joblib`
        2. DataFrames of sequences used for training: `save_data/data/<subtype>.csv`
        3. Results (sequence DataFrames with extra `risk_score` column): `save_data/results/<subtype>.csv`
        4. Minimum risks for each HA and NA subtype: `save_data/results/<segment>_min_risks.csv`

        If `save_data` is not None, and `analysis_date` is 'PRESENT' or pretrained_enet_path is not None, saves the following:
        1. Results (sequence DataFrames with extra `risk_score` column): `save_data/<subtype>.csv`
        2. Minimum risks for each HA and NA subtype: `save_data/<segment>_min_risks.csv`

        Parameters
        ----------
        yearsbefore - Number of years prior to analysis_date to consider 
        
        enet_sample_size - Number of or proportion sequences of each subtype to train Emergenet on
        
        risk_sample_size - Number or proportion of unique sequences of each subtype to sample for risk estimation

        Returns
        -------
        ha_risk - Risk score for HA segment
        
        na_risk - Risk score for NA segment
        '''
        if self.analysis_date == 'PRESENT':
            filepath = resource_filename('emergenet', 'data/current_subtypes.json')
            with open(filepath, 'r') as file:
                current_subtypes = json.load(file)
            for segment in ['HA', 'NA']:
                risks = pd.DataFrame()
                if segment == 'HA':
                    TRUNC = HA_TRUNC
                elif segment == 'NA':
                    TRUNC = NA_TRUNC
                for subtype in current_subtypes[segment]:
                    # Load human sequences for and pretrained Enet models for current subtype
                    human_filepath = resource_filename('emergenet', f'data/current/{subtype}.csv')
                    model_filepath = resource_filename('emergenet', f'models/{subtype}.joblib.gz')
                    df = pd.read_csv(human_filepath, na_filter=False)
                    # Sample from human sequences if needed
                    if risk_sample_size is not None:
                        if isinstance(risk_sample_size, int):
                            sample_size = min(risk_sample_size, len(df))
                        elif isinstance(risk_sample_size, float) and risk_sample_size <= 1:
                            sample_size = int(risk_sample_size * len(df))
                        df = df.sample(n=sample_size, replace=False, random_state=self.random_state)
                    # Load Enet and compute risks
                    enet = load_model(model_filepath, gz=True)
                    df = self._compute_risks(segment, df, enet)
                    if self.save_data is not None:
                        df.to_csv(os.path.join(self.save_data, subtype + '.csv'), index=False)
                    # Save minimum risk for current subtype
                    risks[subtype] = [np.min(df['risk'])]
                # Save overall minimum risk
                if self.save_data is not None:
                    risks.to_csv(os.path.join(self.save_data, segment + '_min_risks.csv'), index=False)
                if segment == 'HA':
                    ha_risk = risks.min(axis=1).values[0] + 1e-5
                elif segment == 'NA':
                    na_risk = risks.min(axis=1).values[0] + 1e-5
            return ha_risk, na_risk

        elif self.pretrained_enet_path is not None:
            human = self._load_sequences(yearsbefore)
            for segment in ['HA', 'NA']:
                risks = pd.DataFrame()
                if segment == 'HA':
                    TRUNC = HA_TRUNC
                elif segment == 'NA':
                    TRUNC = NA_TRUNC
                human1 = human[human['segment'] == segment]
                human1 = human1[human1['sequence'].str.len() >= TRUNC]
                subtypes = Counter(human1[segment])
                for subtype in subtypes:
                    # Skip subtypes with less than 15 sequences
                    if subtypes[subtype] < 15:
                        continue                    
                    # Use only unique sequences for inference
                    df = human1[human1[segment] == subtype].drop_duplicates(subset=['sequence'])
                    # Sample from human sequences if needed
                    if risk_sample_size is not None:
                        if isinstance(risk_sample_size, int):
                            sample_size = min(risk_sample_size, len(df))
                        elif isinstance(risk_sample_size, float) and risk_sample_size <= 1:
                            sample_size = int(risk_sample_size * len(df))
                        df = df.sample(n=sample_size, replace=False, random_state=self.random_state)
                    # Load Enet and compute risks
                    enet = load_model(os.path.join(self.pretrained_enet_path, subtype + '.joblib'))
                    df = self._compute_risks(segment, df, enet)
                    if self.save_data is not None:
                        df.to_csv(os.path.join(self.save_data, subtype + '.csv'), index=False)
                    # Save minimum risk for current subtype
                    risks[subtype] = [np.min(df['risk'])]
                # Save overall minimum risk
                if self.save_data is not None:
                    risks.to_csv(os.path.join(self.save_data, segment + '_min_risks.csv'), index=False)
                if segment == 'HA':
                    ha_risk = risks.min(axis=1).values[0] + 1e-5
                elif segment == 'NA':
                    na_risk = risks.min(axis=1).values[0] + 1e-5
            return ha_risk, na_risk
        
        else:
            human = self._load_sequences(yearsbefore)
            for segment in ['HA', 'NA']:
                risks = pd.DataFrame()
                if segment == 'HA':
                    TRUNC = HA_TRUNC
                elif segment == 'NA':
                    TRUNC = NA_TRUNC
                human1 = human[human['segment'] == segment]
                human1 = human1[human1['sequence'].str.len() >= TRUNC]
                subtypes = Counter(human1[segment])
                for subtype in subtypes:
                    # Skip subtypes with less than 15 sequences
                    if subtypes[subtype] < 15:
                        continue
                    # Load human strains for constructing enet, sampling if needed
                    df = human1[human1[segment] == subtype]
                    if enet_sample_size is not None:
                        if isinstance(enet_sample_size, int):
                            sample_size = min(enet_sample_size, len(df))
                        elif isinstance(enet_sample_size, float) and enet_sample_size <= 1:
                            sample_size = int(enet_sample_size * len(df))
                        df = df.sample(n=sample_size, replace=False, random_state=self.random_state)
                    if self.save_data is not None:
                        df.to_csv(os.path.join(self.save_sequences, subtype + '.csv'), index=False)
                    # Train Enet
                    enet = self.train(segment, df)
                    if self.save_data is not None:
                        save_model(enet, os.path.join(self.save_model, subtype + '.joblib'))
                    # Use only unique sequences for inference
                    df = df.drop_duplicates(subset=['sequence'])
                    # Sample from human sequences if needed (only if we did not already sample for training Enet)
                    if risk_sample_size is not None and enet_sample_size is None:
                        if isinstance(risk_sample_size, int):
                            sample_size = min(risk_sample_size, len(df))
                        elif isinstance(risk_sample_size, float) and risk_sample_size <= 1:
                            sample_size = int(risk_sample_size * len(df))
                        df = df.sample(n=sample_size, replace=False, random_state=self.random_state)
                    # Compute risks
                    df = self._compute_risks(segment, df, enet)
                    if self.save_data is not None:
                        df.to_csv(os.path.join(self.save_results, subtype + '.csv'), index=False)
                    # Save minimum risk for current subtype
                    risks[subtype] = [np.min(df['risk'])]
                # Save overall minimum risk
                if self.save_data is not None:
                    risks.to_csv(os.path.join(self.save_results, segment + '_min_risks.csv'), index=False)
                if segment == 'HA':
                    ha_risk = risks.min(axis=1).values[0] + 1e-5
                elif segment == 'NA':
                    na_risk = risks.min(axis=1).values[0] + 1e-5
            return ha_risk, na_risk

Methods

def risk(self, yearsbefore: int = 1, enet_sample_size: Union[int, float] = None, risk_sample_size: Union[int, float] = None) ‑> Tuple[float, float]

Computes risk scores for the target sequence. If save_data is not None, analysis_date is not 'PRESENT' and pretrained_enet_path is None, saves the following: 1. Emergenet models: save_data/enet_models/<subtype>.joblib 2. DataFrames of sequences used for training: save_data/data/<subtype>.csv 3. Results (sequence DataFrames with extra risk_score column): save_data/results/<subtype>.csv 4. Minimum risks for each HA and NA subtype: save_data/results/<segment>_min_risks.csv

If save_data is not None, and analysis_date is 'PRESENT' or pretrained_enet_path is not None, saves the following: 1. Results (sequence DataFrames with extra risk_score column): save_data/<subtype>.csv 2. Minimum risks for each HA and NA subtype: save_data/<segment>_min_risks.csv

Parameters

yearsbefore - Number of years prior to analysis_date to consider

enet_sample_size - Number or proportion of sequences of each subtype to train Emergenet on

risk_sample_size - Number or proportion of unique sequences of each subtype to sample for risk estimation

Returns

ha_risk - Risk score for HA segment
 
na_risk - Risk score for NA segment
 
Expand source code
def risk(self, yearsbefore:int=1, enet_sample_size:Union[int, float]=None, 
         risk_sample_size:Union[int, float]=None) -> Tuple[float, float]:
    ''' Computes risk scores for the target sequence.
    If `save_data` is not None, `analysis_date` is not 'PRESENT' and `pretrained_enet_path` is None, saves the following:
    1. Emergenet models: `save_data/enet_models/<subtype>.joblib`
    2. DataFrames of sequences used for training: `save_data/data/<subtype>.csv`
    3. Results (sequence DataFrames with extra `risk` column): `save_data/results/<subtype>.csv`
    4. Minimum risks for each HA and NA subtype: `save_data/results/<segment>_min_risks.csv`

    If `save_data` is not None, and `analysis_date` is 'PRESENT' or `pretrained_enet_path` is not None, saves the following:
    1. Results (sequence DataFrames with extra `risk` column): `save_data/<subtype>.csv`
    2. Minimum risks for each HA and NA subtype: `save_data/<segment>_min_risks.csv`

    Parameters
    ----------
    yearsbefore - Number of years prior to analysis_date to consider 
    
    enet_sample_size - Number or proportion of sequences of each subtype to train Emergenet on
    
    risk_sample_size - Number or proportion of unique sequences of each subtype to sample for risk estimation

    Returns
    -------
    ha_risk - Risk score for HA segment
    
    na_risk - Risk score for NA segment

    Raises
    ------
    ValueError - If a sample size is neither an int nor a float in (0, 1]
    '''
    def _subsample(df: pd.DataFrame, size: Union[int, float]) -> pd.DataFrame:
        # Return `df`, or a reproducible random subsample of it.
        # An int is an absolute count (capped at len(df)); a float in (0, 1]
        # is a proportion. Anything else raises, instead of silently reusing
        # a stale sample size from a previous loop iteration (previous bug:
        # `sample_size` stayed unbound/stale for e.g. a float > 1).
        if size is None:
            return df
        if isinstance(size, int):
            n = min(size, len(df))
        elif isinstance(size, float) and 0 < size <= 1:
            n = int(size * len(df))
        else:
            raise ValueError('Sample size must be an int or a float in (0, 1]!')
        return df.sample(n=n, replace=False, random_state=self.random_state)

    def _segment_min(risks: pd.DataFrame) -> float:
        # Overall minimum risk across subtypes for one segment.
        # NOTE(review): raises IndexError when no subtype produced a risk
        # (e.g. every subtype had < 15 sequences) — same as the original code;
        # confirm whether that situation can occur in practice.
        return risks.min(axis=1).values[0] + 1e-5

    # Minimum risk per segment, filled in by whichever branch runs below.
    segment_risks = {}

    if self.analysis_date == 'PRESENT':
        # Use bundled current human sequences and pretrained (gzipped) Enets.
        filepath = resource_filename('emergenet', 'data/current_subtypes.json')
        with open(filepath, 'r') as file:
            current_subtypes = json.load(file)
        for segment in ['HA', 'NA']:
            risks = pd.DataFrame()
            for subtype in current_subtypes[segment]:
                # Load human sequences and the pretrained Enet for this subtype
                human_filepath = resource_filename('emergenet', f'data/current/{subtype}.csv')
                model_filepath = resource_filename('emergenet', f'models/{subtype}.joblib.gz')
                df = pd.read_csv(human_filepath, na_filter=False)
                # Sample from human sequences if needed
                df = _subsample(df, risk_sample_size)
                # Load Enet and compute risks
                enet = load_model(model_filepath, gz=True)
                df = self._compute_risks(segment, df, enet)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_data, subtype + '.csv'), index=False)
                # Save minimum risk for current subtype
                risks[subtype] = [np.min(df['risk'])]
            # Save overall minimum risk
            if self.save_data is not None:
                risks.to_csv(os.path.join(self.save_data, segment + '_min_risks.csv'), index=False)
            segment_risks[segment] = _segment_min(risks)
        return segment_risks['HA'], segment_risks['NA']

    elif self.pretrained_enet_path is not None:
        # Historical analysis using user-supplied pretrained Enets.
        human = self._load_sequences(yearsbefore)
        for segment in ['HA', 'NA']:
            risks = pd.DataFrame()
            trunc = HA_TRUNC if segment == 'HA' else NA_TRUNC
            # Keep only this segment's sequences that are long enough to truncate
            human1 = human[human['segment'] == segment]
            human1 = human1[human1['sequence'].str.len() >= trunc]
            subtypes = Counter(human1[segment])
            for subtype in subtypes:
                # Skip subtypes with less than 15 sequences
                if subtypes[subtype] < 15:
                    continue
                # Use only unique sequences for inference
                df = human1[human1[segment] == subtype].drop_duplicates(subset=['sequence'])
                # Sample from human sequences if needed
                df = _subsample(df, risk_sample_size)
                # Load Enet and compute risks
                enet = load_model(os.path.join(self.pretrained_enet_path, subtype + '.joblib'))
                df = self._compute_risks(segment, df, enet)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_data, subtype + '.csv'), index=False)
                # Save minimum risk for current subtype
                risks[subtype] = [np.min(df['risk'])]
            # Save overall minimum risk
            if self.save_data is not None:
                risks.to_csv(os.path.join(self.save_data, segment + '_min_risks.csv'), index=False)
            segment_risks[segment] = _segment_min(risks)
        return segment_risks['HA'], segment_risks['NA']

    else:
        # Historical analysis: train an Enet per subtype from scratch.
        human = self._load_sequences(yearsbefore)
        for segment in ['HA', 'NA']:
            risks = pd.DataFrame()
            trunc = HA_TRUNC if segment == 'HA' else NA_TRUNC
            # Keep only this segment's sequences that are long enough to truncate
            human1 = human[human['segment'] == segment]
            human1 = human1[human1['sequence'].str.len() >= trunc]
            subtypes = Counter(human1[segment])
            for subtype in subtypes:
                # Skip subtypes with less than 15 sequences
                if subtypes[subtype] < 15:
                    continue
                # Load human strains for constructing enet, sampling if needed
                df = human1[human1[segment] == subtype]
                df = _subsample(df, enet_sample_size)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_sequences, subtype + '.csv'), index=False)
                # Train Enet
                enet = self.train(segment, df)
                if self.save_data is not None:
                    save_model(enet, os.path.join(self.save_model, subtype + '.joblib'))
                # Use only unique sequences for inference
                df = df.drop_duplicates(subset=['sequence'])
                # Sample for risk estimation only if we did not already sample for training
                if enet_sample_size is None:
                    df = _subsample(df, risk_sample_size)
                # Compute risks
                df = self._compute_risks(segment, df, enet)
                if self.save_data is not None:
                    df.to_csv(os.path.join(self.save_results, subtype + '.csv'), index=False)
                # Save minimum risk for current subtype
                risks[subtype] = [np.min(df['risk'])]
            # Save overall minimum risk
            if self.save_data is not None:
                risks.to_csv(os.path.join(self.save_results, segment + '_min_risks.csv'), index=False)
            segment_risks[segment] = _segment_min(risks)
        return segment_risks['HA'], segment_risks['NA']
def train(self, segment: str, seq_df: pandas.core.frame.DataFrame, sample_size: int = None, include_target: bool = True, n_jobs: int = 1) ‑> quasinet.qnet.Qnet

Trains an Emergenet model.

Parameters

segment - Either 'HA' or 'NA'

seq_df - DataFrame of sequences

sample_size - Number of sequences to train Emergenet on, sampled randomly

include_target - If true, includes target sequence

n_jobs - Number of CPUs to use when training

Returns

enet - Trained Emergenet
 
Expand source code
def train(self, segment:str, seq_df:pd.DataFrame, sample_size:int=None, 
          include_target:bool=True, n_jobs:int=1) -> Qnet:
    ''' Trains an Emergenet model.

    Parameters
    ----------
    segment - Either 'HA' or 'NA'

    seq_df - DataFrame of sequences

    sample_size - Number of sequences to train Emergenet on, sampled randomly

    include_target - If true, includes target sequence

    n_jobs - Number of CPUs to use when training

    Returns
    -------
    enet - Trained Emergenet

    Raises
    ------
    ValueError - If `seq_df` is empty or `segment` is not 'HA'/'NA'
    '''
    # Guard clauses: reject empty input and unknown segments up front.
    if len(seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
    if segment not in ['HA', 'NA']:
        raise ValueError('Segment must be either HA or NA!')
    # Sequences are truncated to a fixed, segment-specific length, so the
    # model has one feature per residue position: x0, x1, ..., x<trunc-1>.
    trunc = HA_TRUNC if segment == 'HA' else NA_TRUNC
    feature_names = [f'x{position}' for position in range(trunc)]
    training_array = self._sequence_array(segment, seq_df, sample_size, include_target)
    model = Qnet(feature_names=feature_names,
                 random_state=self.random_state, n_jobs=n_jobs)
    model.fit(training_array)
    return model