Module emergenet.domseq

Expand source code
import os
import numpy as np
import pandas as pd
from distance import hamming
from itertools import cycle
import shapely
import alphashape
from quasinet.qnet import Qnet, qdistance_matrix, membership_degree
from sklearn.cluster import MeanShift, estimate_bandwidth
from sklearn.manifold import MDS
from collections import Counter
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')


# Area multiplier for alpha shapes
AREA_MULTIPLIER = 100


class DomSeq(object):
    ''' DomSeq architecture for predicting dominant sequences.
    ''' 

    def __init__(self, seq_trunc_length:int, random_state:int=42):
        ''' Initializes a DomSeq instance.

        Parameters
        ----------
        seq_trunc_length - Length to truncate sequences in Emergenet analysis
            (Sequences used to train Emergenet and compute E-distance must be of same length)

        random_state - Sets seed for random number generator
        '''
        if seq_trunc_length <= 0:
            raise ValueError('Length to truncate sequences must be positive!')
        self.seq_trunc_length = seq_trunc_length

        if random_state is not None and random_state < 0:
            raise ValueError('Seed must be between 0 and 2**32 - 1!')
        self.random_state = random_state

        
    def __repr__(self):
        return 'emergenet.DomSeq'

    
    def __str__(self):
        return self.__repr__()
    
    
    @staticmethod
    def _dm_to_df(dm:np.ndarray) -> pd.DataFrame:
        ''' Converts a distance matrix to a DataFrame.

        Parameters
        ----------
        dm - Distance matrix as 2D array

        Returns
        -------
        df - Distance matrix as DataFrame
        ''' 
        columns = np.arange(0, dm.shape[1])
        index = np.arange(0, dm.shape[0])
        df = pd.DataFrame(dm, columns=columns, index=index)
        return df

    
    def _sequence_array(self, seq_df:pd.DataFrame, sample_size:int=None) -> np.ndarray:
        ''' Extracts an array of truncated sequence arrays from a DataFrame.

        Parameters
        ----------
        seq_df - DataFrame containing sequences

        sample_size - Number of strains to sample randomly

        Returns
        -------
        seq_lst - Array of sequence arrays
        ''' 
        if 'sequence' not in seq_df.columns:
            raise ValueError('The DataFrame must store sequences in `sequence` column!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        seqs = seq_df['sequence'].apply(lambda x: np.array(list(x[:self.seq_trunc_length])))
        seq_lst = []
        for seq in seqs:
            seq_lst.append(seq)
        seq_lst = np.array(seq_lst)
        return seq_lst

    
    def train(self, seq_df:pd.DataFrame, sample_size:int=None, n_jobs:int=1) -> Qnet:
        ''' Trains an Emergenet model.

        Parameters
        ----------
        seq_df - DataFrame of sequences

        sample_size - Number of strains to train Emergenet on, sampled randomly

        n_jobs - Number of CPUs to use when training

        Returns
        -------
        enet - Trained Emergenet
        ''' 
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        seq_arr = self._sequence_array(seq_df, sample_size)
        enet = Qnet(feature_names=['x' + str(i) for i in np.arange(self.seq_trunc_length)],
                    random_state=self.random_state, n_jobs=n_jobs)
        enet.fit(seq_arr)
        return enet
    
    
    def _compute_risk_for_predict_domseq(self, seq_df:pd.DataFrame, 
                                         pred_seq_df:pd.DataFrame, enet:Qnet) -> pd.DataFrame:
        ''' Computes risk scores for potential prediction sequences.

        Parameters
        ----------
        seq_df - DataFrame of current population sequences
            
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to

        Returns
        -------
        pred_seq_df - Potential prediction sequences, with additional columns for risk score
        ''' 
        # Distance matrix for computing prediction
        seq_arr = self._sequence_array(seq_df)
        pred_seq_arr = self._sequence_array(pred_seq_df)
        dist_matrix = qdistance_matrix(pred_seq_arr, seq_arr, enet, enet) 
        
        # Convert dist_matrix to dataframe
        dm = self._dm_to_df(dist_matrix)
        assert len(dm) == len(pred_seq_arr)
        
        # Centroid objective: for each candidate (row), sum of distances to the current population (columns)
        first_term = dm.sum(axis='columns').values
        H = len(seq_df)
        A = 0.95/(np.sqrt(8) * self.seq_trunc_length**2)
        second_term = np.array([membership_degree(seq, enet) * H * A for seq in pred_seq_arr])
        sums = first_term - second_term
        pred_seq_df['first_term'] = first_term
        pred_seq_df['second_term'] = second_term
        pred_seq_df['sum'] = sums
        pred_seq_df = pred_seq_df.sort_values(by='sum',ascending=True)
        return pred_seq_df
    
    @staticmethod
    def _plot_embedding(dm_embed:np.ndarray, unique_clusters:list, clustering_predictions:np.ndarray, 
                        n_clusters:int, save_data:str=None, alpha:int=4) -> list:
        ''' Plots the 2D embedding and draws alpha shapes for the largest n_clusters clusters.

        Parameters
        ----------
        dm_embed - Embedding points
            
        unique_clusters - List of (class, size) tuples of unique clusters, sorted by size (descending)

        clustering_predictions - Array of clustering predictions

        n_clusters - Number of cluster alphashapes to draw

        save_data - Directory to save plot to
        
        alpha - Alpha parameter, default 4
            
        Returns
        -------
        cluster_areas - Areas of the n_clusters largest clusters (0 for clusters with fewer than 3 points)
        '''
        # Add noise to offset equal points
        noise = np.random.normal(0, 0.02, size=dm_embed.shape)
        dm_embed += noise

        # Find unique clusters
        pts = []
        for class_ in unique_clusters:
            cluster_idx = np.where(clustering_predictions == class_[0])[0]
            pt = dm_embed[cluster_idx]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))
            pts.append(points)

        # Plot clusters
        plt.figure(figsize=(10, 10))
        colors = cycle(['skyblue','red','orange','green', 'purple'])
        markers = cycle(['o', 'v'])
        
        # Plot at most 5 clusters
        cluster_areas = []
        for i in range(min(len(pts), 5)):
            pt = pts[i]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))
            
            # Draw alphashape for the first n clusters if >= 3 points
            # Otherwise just scatter the points
            if i < n_clusters:
                if len(x) < 3:
                    cluster_areas.append(0)
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                                marker = next(markers), label = f'Size: {len(pt)}')
                else:
                    alpha_shape = alphashape.alphashape(points, alpha=alpha)
                    area = alpha_shape.area * AREA_MULTIPLIER
                    cluster_areas.append(area)
                    if isinstance(alpha_shape, shapely.geometry.multipolygon.MultiPolygon) or \
                       isinstance(alpha_shape, shapely.GeometryCollection):
                        for poly in alpha_shape.geoms:
                            plt.plot(*poly.exterior.xy, 'k-', linewidth=1)
                    else:
                        plt.plot(*alpha_shape.exterior.xy, 'k-', linewidth=1)
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, marker = next(markers),
                                label = f'Size: {len(pt)}, Area: {round(area, 3)}')
            else:
                if len(x) == 1:
                    continue
                plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                            marker = next(markers), label = f'Size: {len(pt)}')
        # Save figure
        plt.axis('auto')
        plt.legend(loc='upper left', bbox_to_anchor=(1, 1));
        if save_data is not None and os.path.isdir(save_data): 
            plt.savefig(os.path.join(save_data, 'embedding.png'), dpi=300, 
                        bbox_inches = 'tight', pad_inches = 0, transparent=True)
        return cluster_areas
    
    
    def predict_domseq(self, seq_df:pd.DataFrame, pred_seq_df:pd.DataFrame, enet:Qnet, 
                       n_clusters:int=3, sample_size:int=None, save_data:str=None) -> pd.DataFrame:
        ''' Predicts the future dominant sequence with multiple clusters.

        Parameters
        ----------
        seq_df - DataFrame of current population sequences
            
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to
            
        n_clusters - Number of largest clusters on which to predict dominant strains; default 3

        sample_size - Number of strains to sample randomly from seq_df
            
        save_data - If directory is given, save the following files:
            1. seq_df.csv (post-sampling, if sample_size is given)
            2. dm.csv (the distance matrix corresponding to seq_df)
            3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
            4. clusters.txt (clusters and cluster counts)
            5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

        Returns
        -------
        pred_seqs - Emergenet recommended sequences, with two additional columns for cluster counts and areas
        ''' 
        if len(seq_df) < 1 or len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        if n_clusters <= 1:
            raise ValueError('For single cluster, please use predict_single_domseq!')
        if len(pred_seq_df) > 20000:
            pred_seq_df = pred_seq_df.sample(20000, random_state=self.random_state)
        
        # Parameter for alphashape
        ALPHA = 4
            
        # Distance matrix for clustering strains from current season
        seq_arr = self._sequence_array(seq_df)
        dist_matrix = qdistance_matrix(seq_arr, seq_arr, enet, enet)
        dm = self._dm_to_df(dist_matrix)
        if save_data is not None and os.path.isdir(save_data):
            seq_df.to_csv(os.path.join(save_data, 'seq_df.csv'), index=False)
            dm.to_csv(os.path.join(save_data, 'dm.csv'), index=False)
            
        # Clustering
        embedding = MDS(n_components=2, dissimilarity='precomputed', random_state=self.random_state)
        dm_embed = embedding.fit_transform(dm)
        bandwidth = estimate_bandwidth(dm_embed, quantile=0.3, random_state=self.random_state)
        bandwidth = max(bandwidth, 1e-5)
        clustering = MeanShift(bandwidth=bandwidth)
        clustering_predictions = clustering.fit_predict(dm_embed)
        
        # Normalize points (skip when any coordinate has zero spread, to avoid division by zero)
        mean = np.mean(dm_embed, axis=0)
        std = np.std(dm_embed, axis=0)
        if np.any(std == 0):
            dm_embed_normalized = dm_embed
        else:
            dm_embed_normalized = (dm_embed - mean) / std
        
        # Sort unique clusters by size
        label_counts = Counter(clustering_predictions)
        unique_clusters = sorted(label_counts.items(), key=lambda x: x[1], reverse=True)
        if save_data is not None and os.path.isdir(save_data):
            with open(os.path.join(save_data, 'clusters.txt'), 'w') as file:
                for item in unique_clusters:
                    file.write(str(item) + '\n')
        
        # Predictions
        pred_seqs = pd.DataFrame(columns=pred_seq_df.columns)
        cluster_counts = []
        for i, class_ in enumerate(unique_clusters[:n_clusters]):
            # Take riskiest strain using this cluster
            wanted_names = dm.columns[clustering_predictions == class_[0]]
            cluster_seq_df = seq_df.iloc[wanted_names]
            pred_seq_df_with_risk = self._compute_risk_for_predict_domseq(cluster_seq_df, pred_seq_df, enet)
            if save_data is not None and os.path.isdir(save_data):
                pred_seq_df_with_risk.to_csv(os.path.join(save_data, 'pred_seq_df_'+str(i+1)+'.csv'), index=False)
            
            # Save predictions
            pred_seqs = pd.concat([pred_seqs, pred_seq_df_with_risk.iloc[[0]]])
            cluster_counts.append(len(wanted_names))
        
        # Cluster areas
        cluster_areas = self._plot_embedding(dm_embed_normalized, unique_clusters, 
                                             clustering_predictions, n_clusters, save_data, ALPHA)
            
        pred_seqs['cluster_count'] = cluster_counts
        pred_seqs['cluster_area'] = cluster_areas
        return pred_seqs


    def predict_single_domseq(self, pred_seqs:pd.DataFrame, pred_seq_df:pd.DataFrame) -> pd.DataFrame:
        ''' Predicts a single future dominant sequence with a single cluster.
        
        Parameters
        ----------
        pred_seqs - Emergenet recommended sequences, with an additional 'cluster_count' column.
                    This should have at least two rows (i.e., predictions from two clusters)
        
        pred_seq_df - DataFrame of candidate sequences
            
        Returns
        -------
        pred_seq - Emergenet recommended sequence
        ''' 
        if len(pred_seqs) < 2:
            raise ValueError('Not enough prediction sequences!')
        if len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
            
        # Sort by cluster count and get the top two cluster predictions
        pred_seqs = pred_seqs.sort_values(by='cluster_count',ascending=False)
        cluster_sizes = pred_seqs['cluster_count'].values
        cluster_ratio = cluster_sizes[1]/cluster_sizes[0]
        seq1 = pred_seqs['sequence'].iloc[0][:self.seq_trunc_length]
        seq2 = pred_seqs['sequence'].iloc[1][:self.seq_trunc_length]
        dist = hamming(seq1, seq2)
        
        # If there is one dominant cluster, return the prediction from that cluster
        if seq1 == seq2 or cluster_ratio <= 1/10:
            return pred_seqs.head(1)
        ratios = []
        
        # Find the strain x closest to satisfying dist(seq1,x)/dist(seq2,x) = cluster2/cluster1
        for seq in pred_seq_df['sequence'].values:
            dist1 = hamming(seq1, seq[:self.seq_trunc_length])
            dist2 = hamming(seq2, seq[:self.seq_trunc_length])
            if dist2 == 0 or dist1 + dist2 > dist:
                ratios.append(1e5)
                continue
            ratios.append(abs(dist1/dist2 - cluster_ratio))
        pred_seq_df['ratio'] = ratios
        pred_seq_df = pred_seq_df.sort_values(by='ratio', ascending=True)
        pred_seq = pred_seq_df.head(1)
        return pred_seq

Classes

class DomSeq (seq_trunc_length: int, random_state: int = 42)

DomSeq architecture for predicting dominant sequences.

Initializes a DomSeq instance.

Parameters

seq_trunc_length - Length to truncate sequences in Emergenet analysis (Sequences used to train Emergenet and compute E-distance must be of same length)

random_state - Sets seed for random number generator
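
A minimal usage sketch (the truncation length below is illustrative; choose a value no longer than your shortest sequence):

from emergenet.domseq import DomSeq

# Truncate all sequences to a common length before any Emergenet computation
domseq = DomSeq(seq_trunc_length=565, random_state=42)
print(domseq)  # emergenet.DomSeq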

Expand source code
class DomSeq(object):
    ''' DomSeq architecture for predicting dominant sequences.
    ''' 

    def __init__(self, seq_trunc_length:int, random_state:int=42):
        ''' Initializes a DomSeq instance.

        Parameters
        ----------
        seq_trunc_length - Length to truncate sequences in Emergenet analysis
            (Sequences used to train Emergenet and compute E-distance must be of same length)

        random_state - Sets seed for random number generator
        '''
        if seq_trunc_length <= 0:
            raise ValueError('Length to truncate sequences must be positive!')
        self.seq_trunc_length = seq_trunc_length

        if random_state is not None and random_state < 0:
            raise ValueError('Seed must be between 0 and 2**32 - 1!')
        self.random_state = random_state

        
    def __repr__(self):
        return 'emergenet.DomSeq'

    
    def __str__(self):
        return self.__repr__()
    
    
    @staticmethod
    def _dm_to_df(dm:np.ndarray) -> pd.DataFrame:
        ''' Converts a distance matrix to a DataFrame.

        Parameters
        ----------
        dm - Distance matrix as 2D array

        Returns
        -------
        df - Distance matrix as DataFrame
        ''' 
        columns = np.arange(0, dm.shape[1])
        index = np.arange(0, dm.shape[0])
        df = pd.DataFrame(dm, columns=columns, index=index)
        return df

    
    def _sequence_array(self, seq_df:pd.DataFrame, sample_size:int=None) -> np.ndarray:
        ''' Extracts an array of truncated sequence arrays from a DataFrame.

        Parameters
        ----------
        seq_df - DataFrame containing sequences

        sample_size - Number of strains to sample randomly

        Returns
        -------
        seq_lst - Array of sequence arrays
        ''' 
        if 'sequence' not in seq_df.columns:
            raise ValueError('The DataFrame must store sequences in `sequence` column!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        seqs = seq_df['sequence'].apply(lambda x: np.array(list(x[:self.seq_trunc_length])))
        seq_lst = []
        for seq in seqs:
            seq_lst.append(seq)
        seq_lst = np.array(seq_lst)
        return seq_lst

    
    def train(self, seq_df:pd.DataFrame, sample_size:int=None, n_jobs:int=1) -> Qnet:
        ''' Trains an Emergenet model.

        Parameters
        ----------
        seq_df - DataFrame of sequences

        sample_size - Number of strains to train Emergenet on, sampled randomly

        n_jobs - Number of CPUs to use when training

        Returns
        -------
        enet - Trained Emergenet
        ''' 
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        seq_arr = self._sequence_array(seq_df, sample_size)
        enet = Qnet(feature_names=['x' + str(i) for i in np.arange(self.seq_trunc_length)],
                    random_state=self.random_state, n_jobs=n_jobs)
        enet.fit(seq_arr)
        return enet
    
    
    def _compute_risk_for_predict_domseq(self, seq_df:pd.DataFrame, 
                                         pred_seq_df:pd.DataFrame, enet:Qnet) -> pd.DataFrame:
        ''' Computes risk scores for potential prediction sequences.

        Parameters
        ----------
        seq_df - DataFrame of current population sequences
            
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to

        Returns
        -------
        pred_seq_df - Potential prediction sequences, with additional columns for risk score
        ''' 
        # Distance matrix for computing prediction
        seq_arr = self._sequence_array(seq_df)
        pred_seq_arr = self._sequence_array(pred_seq_df)
        dist_matrix = qdistance_matrix(pred_seq_arr, seq_arr, enet, enet) 
        
        # Convert dist_matrix to dataframe
        dm = self._dm_to_df(dist_matrix)
        assert len(dm) == len(pred_seq_arr)
        
        # Centroid objective: for each candidate (row), sum of distances to the current population (columns)
        first_term = dm.sum(axis='columns').values
        H = len(seq_df)
        A = 0.95/(np.sqrt(8) * self.seq_trunc_length**2)
        second_term = np.array([membership_degree(seq, enet) * H * A for seq in pred_seq_arr])
        sums = first_term - second_term
        pred_seq_df['first_term'] = first_term
        pred_seq_df['second_term'] = second_term
        pred_seq_df['sum'] = sums
        pred_seq_df = pred_seq_df.sort_values(by='sum',ascending=True)
        return pred_seq_df
    
    @staticmethod
    def _plot_embedding(dm_embed:np.ndarray, unique_clusters:list, clustering_predictions:np.ndarray, 
                        n_clusters:int, save_data:str=None, alpha:int=4) -> list:
        ''' Plots the 2D embedding and draws alpha shapes for the largest n_clusters clusters.

        Parameters
        ----------
        dm_embed - Embedding points
            
        unique_clusters - List of (class, size) tuples of unique clusters, sorted by size (descending)

        clustering_predictions - Array of clustering predictions

        n_clusters - Number of cluster alphashapes to draw

        save_data - Directory to save plot to
        
        alpha - Alpha parameter, default 4
            
        Returns
        -------
        cluster_areas - Areas of the n_clusters largest clusters (0 for clusters with fewer than 3 points)
        '''
        # Add noise to offset equal points
        noise = np.random.normal(0, 0.02, size=dm_embed.shape)
        dm_embed += noise

        # Find unique clusters
        pts = []
        for class_ in unique_clusters:
            cluster_idx = np.where(clustering_predictions == class_[0])[0]
            pt = dm_embed[cluster_idx]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))
            pts.append(points)

        # Plot clusters
        plt.figure(figsize=(10, 10))
        colors = cycle(['skyblue','red','orange','green', 'purple'])
        markers = cycle(['o', 'v'])
        
        # Plot at most 5 clusters
        cluster_areas = []
        for i in range(min(len(pts), 5)):
            pt = pts[i]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))
            
            # Draw alphashape for the first n clusters if >= 3 points
            # Otherwise just scatter the points
            if i < n_clusters:
                if len(x) < 3:
                    cluster_areas.append(0)
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                                marker = next(markers), label = f'Size: {len(pt)}')
                else:
                    alpha_shape = alphashape.alphashape(points, alpha=alpha)
                    area = alpha_shape.area * AREA_MULTIPLIER
                    cluster_areas.append(area)
                    if isinstance(alpha_shape, shapely.geometry.multipolygon.MultiPolygon) or \
                       isinstance(alpha_shape, shapely.GeometryCollection):
                        for poly in alpha_shape.geoms:
                            plt.plot(*poly.exterior.xy, 'k-', linewidth=1)
                    else:
                        plt.plot(*alpha_shape.exterior.xy, 'k-', linewidth=1)
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, marker = next(markers),
                                label = f'Size: {len(pt)}, Area: {round(area, 3)}')
            else:
                if len(x) == 1:
                    continue
                plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                            marker = next(markers), label = f'Size: {len(pt)}')
        # Save figure
        plt.axis('auto')
        plt.legend(loc='upper left', bbox_to_anchor=(1, 1));
        if save_data is not None and os.path.isdir(save_data): 
            plt.savefig(os.path.join(save_data, 'embedding.png'), dpi=300, 
                        bbox_inches = 'tight', pad_inches = 0, transparent=True)
        return cluster_areas
    
    
    def predict_domseq(self, seq_df:pd.DataFrame, pred_seq_df:pd.DataFrame, enet:Qnet, 
                       n_clusters:int=3, sample_size:int=None, save_data:str=None) -> pd.DataFrame:
        ''' Predicts the future dominant sequence with multiple clusters.

        Parameters
        ----------
        seq_df - DataFrame of current population sequences
            
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to
            
        n_clusters - Number of largest clusters on which to predict dominant strains; default 3

        sample_size - Number of strains to sample randomly from seq_df
            
        save_data - If directory is given, save the following files:
            1. seq_df.csv (post-sampling, if sample_size is given)
            2. dm.csv (the distance matrix corresponding to seq_df)
            3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
            4. clusters.txt (clusters and cluster counts)
            5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

        Returns
        -------
        pred_seqs - Emergenet recommended sequences, with two additional columns for cluster counts and areas
        ''' 
        if len(seq_df) < 1 or len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        if n_clusters <= 1:
            raise ValueError('For single cluster, please use predict_single_domseq!')
        if len(pred_seq_df) > 20000:
            pred_seq_df = pred_seq_df.sample(20000, random_state=self.random_state)
        
        # Parameter for alphashape
        ALPHA = 4
            
        # Distance matrix for clustering strains from current season
        seq_arr = self._sequence_array(seq_df)
        dist_matrix = qdistance_matrix(seq_arr, seq_arr, enet, enet)
        dm = self._dm_to_df(dist_matrix)
        if save_data is not None and os.path.isdir(save_data):
            seq_df.to_csv(os.path.join(save_data, 'seq_df.csv'), index=False)
            dm.to_csv(os.path.join(save_data, 'dm.csv'), index=False)
            
        # Clustering
        embedding = MDS(n_components=2, dissimilarity='precomputed', random_state=self.random_state)
        dm_embed = embedding.fit_transform(dm)
        bandwidth = estimate_bandwidth(dm_embed, quantile=0.3, random_state=self.random_state)
        bandwidth = max(bandwidth, 1e-5)
        clustering = MeanShift(bandwidth=bandwidth)
        clustering_predictions = clustering.fit_predict(dm_embed)
        
        # Normalize points (skip when any coordinate has zero spread, to avoid division by zero)
        mean = np.mean(dm_embed, axis=0)
        std = np.std(dm_embed, axis=0)
        if np.any(std == 0):
            dm_embed_normalized = dm_embed
        else:
            dm_embed_normalized = (dm_embed - mean) / std
        
        # Sort unique clusters by size
        label_counts = Counter(clustering_predictions)
        unique_clusters = sorted(label_counts.items(), key=lambda x: x[1], reverse=True)
        if save_data is not None and os.path.isdir(save_data):
            with open(os.path.join(save_data, 'clusters.txt'), 'w') as file:
                for item in unique_clusters:
                    file.write(str(item) + '\n')
        
        # Predictions
        pred_seqs = pd.DataFrame(columns=pred_seq_df.columns)
        cluster_counts = []
        for i, class_ in enumerate(unique_clusters[:n_clusters]):
            # Take riskiest strain using this cluster
            wanted_names = dm.columns[clustering_predictions == class_[0]]
            cluster_seq_df = seq_df.iloc[wanted_names]
            pred_seq_df_with_risk = self._compute_risk_for_predict_domseq(cluster_seq_df, pred_seq_df, enet)
            if save_data is not None and os.path.isdir(save_data):
                pred_seq_df_with_risk.to_csv(os.path.join(save_data, 'pred_seq_df_'+str(i+1)+'.csv'), index=False)
            
            # Save predictions
            pred_seqs = pd.concat([pred_seqs, pred_seq_df_with_risk.iloc[[0]]])
            cluster_counts.append(len(wanted_names))
        
        # Cluster areas
        cluster_areas = self._plot_embedding(dm_embed_normalized, unique_clusters, 
                                             clustering_predictions, n_clusters, save_data, ALPHA)
            
        pred_seqs['cluster_count'] = cluster_counts
        pred_seqs['cluster_area'] = cluster_areas
        return pred_seqs


    def predict_single_domseq(self, pred_seqs:pd.DataFrame, pred_seq_df:pd.DataFrame) -> pd.DataFrame:
        ''' Predicts a single future dominant sequence with a single cluster.
        
        Parameters
        ----------
        pred_seqs - Emergenet recommended sequences, with an additional 'cluster_count' column.
                    This should have at least two rows (i.e., predictions from two clusters)
        
        pred_seq_df - DataFrame of candidate sequences
            
        Returns
        -------
        pred_seq - Emergenet recommended sequence
        ''' 
        if len(pred_seqs) < 2:
            raise ValueError('Not enough prediction sequences!')
        if len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
            
        # Sort by cluster count and get the top two cluster predictions
        pred_seqs = pred_seqs.sort_values(by='cluster_count',ascending=False)
        cluster_sizes = pred_seqs['cluster_count'].values
        cluster_ratio = cluster_sizes[1]/cluster_sizes[0]
        seq1 = pred_seqs['sequence'].iloc[0][:self.seq_trunc_length]
        seq2 = pred_seqs['sequence'].iloc[1][:self.seq_trunc_length]
        dist = hamming(seq1, seq2)
        
        # If there is one dominant cluster, return the prediction from that cluster
        if seq1 == seq2 or cluster_ratio <= 1/10:
            return pred_seqs.head(1)
        ratios = []
        
        # Find the strain x closest to satisfying dist(seq1,x)/dist(seq2,x) = cluster2/cluster1
        for seq in pred_seq_df['sequence'].values:
            dist1 = hamming(seq1, seq[:self.seq_trunc_length])
            dist2 = hamming(seq2, seq[:self.seq_trunc_length])
            if dist2 == 0 or dist1 + dist2 > dist:
                ratios.append(1e5)
                continue
            ratios.append(abs(dist1/dist2 - cluster_ratio))
        pred_seq_df['ratio'] = ratios
        pred_seq_df = pred_seq_df.sort_values(by='ratio', ascending=True)
        pred_seq = pred_seq_df.head(1)
        return pred_seq

Methods

def predict_domseq(self, seq_df: pandas.core.frame.DataFrame, pred_seq_df: pandas.core.frame.DataFrame, enet: quasinet.qnet.Qnet, n_clusters: int = 3, sample_size: int = None, save_data: str = None) ‑> pandas.core.frame.DataFrame

Predicts the future dominant sequence with multiple clusters.

Parameters

seq_df - DataFrame of current population sequences

pred_seq_df - DataFrame of candidate sequences

enet - Emergenet that sequences in seq_df belong to

n_clusters - Number of largest clusters on which to predict dominant strains; default 3

sample_size - Number of strains to sample randomly from seq_df

save_data - If directory is given, save the following files:
    1. seq_df.csv (post-sampling, if sample_size is given)
    2. dm.csv (the distance matrix corresponding to seq_df)
    3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
    4. clusters.txt (clusters and cluster counts)
    5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

Returns

pred_seqs - Emergenet recommended sequences, with two additional columns for cluster counts and areas
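
A hedged sketch of a typical call; `domseq`, `current_df`, `candidate_df`, and `enet` are illustrative names, with both DataFrames holding a `sequence` column and `enet` trained on `current_df` via train:

import os

save_dir = 'domseq_results'          # illustrative output directory
os.makedirs(save_dir, exist_ok=True)

pred_seqs = domseq.predict_domseq(seq_df=current_df,
                                  pred_seq_df=candidate_df,
                                  enet=enet,
                                  n_clusters=3,
                                  sample_size=1000,
                                  save_data=save_dir)

# One recommended strain per cluster (largest clusters first),
# plus the appended 'cluster_count' and 'cluster_area' columns
print(pred_seqs[['cluster_count', 'cluster_area']])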
 
Expand source code
def predict_domseq(self, seq_df:pd.DataFrame, pred_seq_df:pd.DataFrame, enet:Qnet, 
                   n_clusters:int=3, sample_size:int=None, save_data:str=None) -> pd.DataFrame:
    ''' Predicts the future dominant sequence with multiple clusters.

    Parameters
    ----------
    seq_df - DataFrame of current population sequences
        
    pred_seq_df - DataFrame of candidate sequences

    enet - Emergenet that sequences in seq_df belong to
        
    n_clusters - Number of largest clusters on which to predict dominant strains; default 3

    sample_size - Number of strains to sample randomly from seq_df
        
    save_data - If directory is given, save the following files:
        1. seq_df.csv (post-sampling, if sample_size is given)
        2. dm.csv (the distance matrix corresponding to seq_df)
        3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
        4. clusters.txt (clusters and cluster counts)
        5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

    Returns
    -------
    pred_seqs - Emergenet recommended sequences, with two additional columns for cluster counts and areas
    ''' 
    if len(seq_df) < 1 or len(pred_seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
    if sample_size is not None and sample_size < len(seq_df):
        seq_df = seq_df.sample(sample_size, random_state=self.random_state)
    if n_clusters <= 1:
        raise ValueError('For single cluster, please use predict_single_domseq!')
    if len(pred_seq_df) > 20000:
        pred_seq_df = pred_seq_df.sample(20000, random_state=self.random_state)
    
    # Parameter for alphashape
    ALPHA = 4
        
    # Distance matrix for clustering strains from current season
    seq_arr = self._sequence_array(seq_df)
    dist_matrix = qdistance_matrix(seq_arr, seq_arr, enet, enet)
    dm = self._dm_to_df(dist_matrix)
    if save_data is not None and os.path.isdir(save_data):
        seq_df.to_csv(os.path.join(save_data, 'seq_df.csv'), index=False)
        dm.to_csv(os.path.join(save_data, 'dm.csv'), index=False)
        
    # Clustering
    embedding = MDS(n_components=2, dissimilarity='precomputed', random_state=self.random_state)
    dm_embed = embedding.fit_transform(dm)
    bandwidth = estimate_bandwidth(dm_embed, quantile=0.3, random_state=self.random_state)
    bandwidth = max(bandwidth, 1e-5)
    clustering = MeanShift(bandwidth=bandwidth)
    clustering_predictions = clustering.fit_predict(dm_embed)
    
    # Normalize points (skip when any coordinate has zero spread, to avoid division by zero)
    mean = np.mean(dm_embed, axis=0)
    std = np.std(dm_embed, axis=0)
    if np.any(std == 0):
        dm_embed_normalized = dm_embed
    else:
        dm_embed_normalized = (dm_embed - mean) / std
    
    # Sort unique clusters by size
    label_counts = Counter(clustering_predictions)
    unique_clusters = sorted(label_counts.items(), key=lambda x: x[1], reverse=True)
    if save_data is not None and os.path.isdir(save_data):
        with open(os.path.join(save_data, 'clusters.txt'), 'w') as file:
            for item in unique_clusters:
                file.write(str(item) + '\n')
    
    # Predictions
    pred_seqs = pd.DataFrame(columns=pred_seq_df.columns)
    cluster_counts = []
    for i, class_ in enumerate(unique_clusters[:n_clusters]):
        # Take riskiest strain using this cluster
        wanted_names = dm.columns[clustering_predictions == class_[0]]
        cluster_seq_df = seq_df.iloc[wanted_names]
        pred_seq_df_with_risk = self._compute_risk_for_predict_domseq(cluster_seq_df, pred_seq_df, enet)
        if save_data is not None and os.path.isdir(save_data):
            pred_seq_df_with_risk.to_csv(os.path.join(save_data, 'pred_seq_df_'+str(i+1)+'.csv'), index=False)
        
        # Save predictions
        pred_seqs = pd.concat([pred_seqs, pred_seq_df_with_risk.iloc[[0]]])
        cluster_counts.append(len(wanted_names))
    
    # Cluster areas
    cluster_areas = self._plot_embedding(dm_embed_normalized, unique_clusters, 
                                         clustering_predictions, n_clusters, save_data, ALPHA)
        
    pred_seqs['cluster_count'] = cluster_counts
    pred_seqs['cluster_area'] = cluster_areas
    return pred_seqs
def predict_single_domseq(self, pred_seqs: pandas.core.frame.DataFrame, pred_seq_df: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame

Predicts a single future dominant sequence with a single cluster.

Parameters

pred_seqs - Emergenet recommended sequences, with an additional 'cluster_count' column. This should have at least two rows (i.e., predictions from two clusters)

pred_seq_df - DataFrame of candidate sequences

Returns

pred_seq - Emergenet recommended sequence
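
Continuing the illustrative names from the predict_domseq example above, the per-cluster recommendations can be collapsed into a single prediction:

# pred_seqs must carry the 'cluster_count' column added by predict_domseq
pred_seq = domseq.predict_single_domseq(pred_seqs=pred_seqs,
                                        pred_seq_df=candidate_df)
print(pred_seq['sequence'].values[0])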
 
Expand source code
def predict_single_domseq(self, pred_seqs:pd.DataFrame, pred_seq_df:pd.DataFrame) -> pd.DataFrame:
    ''' Predicts a single future dominant sequence with a single cluster.
    
    Parameters
    ----------
    pred_seqs - Emergenet recommended sequences, with an additional 'cluster_count' column.
                This should have at least two rows (i.e., predictions from two clusters)
    
    pred_seq_df - DataFrame of candidate sequences
        
    Returns
    -------
    pred_seq - Emergenet recommended sequence
    ''' 
    if len(pred_seqs) < 2:
        raise ValueError('Not enough prediction sequences!')
    if len(pred_seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
        
    # Sort by cluster count and get the top two cluster predictions
    pred_seqs = pred_seqs.sort_values(by='cluster_count',ascending=False)
    cluster_sizes = pred_seqs['cluster_count'].values
    cluster_ratio = cluster_sizes[1]/cluster_sizes[0]
    seq1 = pred_seqs['sequence'].iloc[0][:self.seq_trunc_length]
    seq2 = pred_seqs['sequence'].iloc[1][:self.seq_trunc_length]
    dist = hamming(seq1, seq2)
    
    # If there is one dominant cluster, return the prediction from that cluster
    if seq1 == seq2 or cluster_ratio <= 1/10:
        return pred_seqs.head(1)
    ratios = []
    
    # Find the strain x closest to satisfying dist(seq1,x)/dist(seq2,x) = cluster2/cluster1
    for seq in pred_seq_df['sequence'].values:
        dist1 = hamming(seq1, seq[:self.seq_trunc_length])
        dist2 = hamming(seq2, seq[:self.seq_trunc_length])
        if dist2 == 0 or dist1 + dist2 > dist:
            ratios.append(1e5)
            continue
        ratios.append(abs(dist1/dist2 - cluster_ratio))
    pred_seq_df['ratio'] = ratios
    pred_seq_df = pred_seq_df.sort_values(by='ratio', ascending=True)
    pred_seq = pred_seq_df.head(1)
    return pred_seq
def train(self, seq_df: pandas.core.frame.DataFrame, sample_size: int = None, n_jobs: int = 1) ‑> quasinet.qnet.Qnet

Trains an Emergenet model.

Parameters

seq_df - DataFrame of sequences

sample_size - Number of strains to train Emergenet on, sampled randomly

n_jobs - Number of CPUs to use when training

Returns

enet - Trained Emergenet
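
A brief sketch, assuming `current_df` is a DataFrame with a `sequence` column of strings at least seq_trunc_length long (the sample size and CPU count are arbitrary); the returned Qnet is the `enet` argument expected by predict_domseq:

enet = domseq.train(seq_df=current_df, sample_size=2000, n_jobs=4)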
 
Expand source code
def train(self, seq_df:pd.DataFrame, sample_size:int=None, n_jobs:int=1) -> Qnet:
    ''' Trains an Emergenet model.

    Parameters
    ----------
    seq_df - DataFrame of sequences

    sample_size - Number of strains to train Emergenet on, sampled randomly

    n_jobs - Number of CPUs to use when training

    Returns
    -------
    enet - Trained Emergenet
    ''' 
    if len(seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
    seq_arr = self._sequence_array(seq_df, sample_size)
    enet = Qnet(feature_names=['x' + str(i) for i in np.arange(self.seq_trunc_length)],
                random_state=self.random_state, n_jobs=n_jobs)
    enet.fit(seq_arr)
    return enet