Module emergenet.domseq

Expand source code
import os
import numpy as np
import pandas as pd
from distance import hamming
from itertools import cycle
import shapely
import alphashape
from quasinet.qnet import Qnet, qdistance_matrix, membership_degree
from sklearn.cluster import MeanShift, estimate_bandwidth
from sklearn.manifold import MDS
from collections import Counter
import matplotlib.pyplot as plt'fivethirtyeight')

# Area multiplier for alpha shapes

class DomSeq(object):
    ''' DomSeq architecture for predicting dominant sequences.

    def __init__(self, seq_trunc_length:int, random_state:int=42):
        ''' Initializes an DomSeq instance.

        seq_trunc_length - Length to truncate sequences in Emergenet analysis
            (Sequences used to train Emergenet and compute E-distance must be of same length)

        random_state - Sets seed for random number generator
        if seq_trunc_length <= 0:
            raise ValueError('Length to truncate sequences must be positive!')
        self.seq_trunc_length = seq_trunc_length

        if random_state is not None and random_state < 0:
            raise ValueError('Seed must be between 0 and 2**32 - 1!')
        self.random_state = random_state

    def __repr__(self):
        return 'emergenet.DomSeq'

    def __str__(self):
        return self.__repr__()
    def _dm_to_df(dm:np.ndarray) -> pd.DataFrame:
        ''' Converts a distance matrix to a DataFrame.

        dm - Distance matrix as 2D array

        df - Distance matrix as DataFrame
        columns = np.arange(0, dm.shape[1])
        index = np.arange(0, dm.shape[0])
        df = pd.DataFrame(dm, columns=columns, index=index)
        return df

    def _sequence_array(self, seq_df:pd.DataFrame, sample_size:int=None) -> np.ndarray:
        ''' Extracts array of sequence arrays from DataFrame; includes target sequence.

        seq_df - DataFrame containing sequences

        sample_size - Number of strains to sample randomly

        seq_lst - Array of sequence arrays
        if 'sequence' not in seq_df.columns:
            raise ValueError('The DataFrame must store sequences in `sequence` column!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        seqs = seq_df['sequence'].apply(lambda x: np.array(list(x[:self.seq_trunc_length])))
        seq_lst = []
        for seq in seqs:
        seq_lst = np.array(seq_lst)
        return seq_lst

    def train(self, seq_df:pd.DataFrame, sample_size:int=None, n_jobs:int=1) -> Qnet:
        ''' Trains an Emergenet model.

        seq_df - DataFrame of sequences

        sample_size - Number of strains to train Emergenet on, sampled randomly

        n_jobs - Number of CPUs to use when training

        enet - Trained Emergenet
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        seq_arr = self._sequence_array(seq_df, sample_size)
        enet = Qnet(feature_names=['x' + str(i) for i in np.arange(self.seq_trunc_length)],
                    random_state=self.random_state, n_jobs=n_jobs)
        return enet
    def _compute_risk_for_predict_domseq(self, seq_df:pd.DataFrame, 
                                         pred_seq_df:pd.DataFrame, enet:Qnet) -> pd.DataFrame:
        ''' Computes risk scores for potential prediction sequences.

        seq_df - DataFrame of current population sequences
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to

        pred_seq_df - Potential prediction sequences, with additional columns for risk score
        # Distance matrix for computing prediction
        seq_arr = self._sequence_array(seq_df)
        pred_seq_arr = self._sequence_array(pred_seq_df)
        dist_matrix = qdistance_matrix(pred_seq_arr, seq_arr, enet, enet) 
        # Convert dist_matrix to dataframe
        dm = self._dm_to_df(dist_matrix)
        assert len(dm) == len(pred_seq_arr)
        # Find centroid data (sum across rows)
        first_term = dm.sum(axis='columns').values
        H = len(seq_df)
        A = 0.95/(np.sqrt(8) * self.seq_trunc_length**2)
        second_term = np.array([membership_degree(seq, enet) * H * A for seq in pred_seq_arr])
        sums = first_term - second_term
        pred_seq_df['first_term'] = first_term
        pred_seq_df['second_term'] = second_term
        pred_seq_df['sum'] = sums
        pred_seq_df = pred_seq_df.sort_values(by='sum',ascending=True)
        return pred_seq_df
    def _plot_embedding(dm_embed:np.ndarray, unique_clusters:Counter, clustering_predictions:np.array, 
                        n_clusters:int, save_data:str=None, alpha:int=4) -> list:
        ''' Plots the embedding and shows Enet and WHO predictions.

        dm_embed - Embedding points
        unique_clusters - Counter (class, size) of unique clusters, sorted by size

        clustering_predictions - Array of clustering predictions

        n_clusters - Number of cluster alphashapes to draw

        save_data - Directory to save plot to
        alpha - Alpha parameter, default 4
        cluster_areas - Areas of largest n_clusters
        # Add noise to offset equal points
        noise = np.random.normal(0, 0.02, size=dm_embed.shape)
        dm_embed += noise

        # Find unique clusters
        pts = []
        for class_ in unique_clusters:
            cluster_idx = np.where(clustering_predictions == class_[0])[0]
            pt = dm_embed[cluster_idx]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))

        # Plot clusters
        plt.figure(figsize=(10, 10))
        colors = cycle(['skyblue','red','orange','green', 'purple'])
        markers = cycle(['o', 'v'])
        # Plot at most 5 clusters
        cluster_areas = []
        for i in range(min(len(pts), 5)):
            pt = pts[i]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))
            # Draw alphashape for the first n clusters if >= 3 points
            # Otherwise just scatter the points
            if i < n_clusters:
                if len(x) < 3:
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                                marker = next(markers), label = f'Size: {len(pt)}')
                    alpha_shape = alphashape.alphashape(points, alpha=alpha)
                    area = alpha_shape.area * AREA_MULTIPLIER
                    if isinstance(alpha_shape, shapely.geometry.multipolygon.MultiPolygon) or \
                       isinstance(alpha_shape, shapely.GeometryCollection):
                        for poly in alpha_shape.geoms:
                            plt.plot(*poly.exterior.xy, 'k-', linewidth=1)
                        plt.plot(*alpha_shape.exterior.xy, 'k-', linewidth=1)
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, marker = next(markers),
                                label = f'Size: {len(pt)}, Area: {round(area, 3)}')
                if len(x) == 1:
                plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                            marker = next(markers), label = f'Size: {len(pt)}')
        # Save figure
        plt.legend(loc='upper left', bbox_to_anchor=(1, 1));
        if save_data is not None and os.path.isdir(save_data): 
            plt.savefig(os.path.join(save_data, 'embedding.png'), dpi=300, 
                        bbox_inches = 'tight', pad_inches = 0, transparent=True)
        return cluster_areas
    def predict_domseq(self, seq_df:pd.DataFrame, pred_seq_df:pd.DataFrame, enet:Qnet, 
                       n_clusters:int=3, sample_size:int=None, save_data:str=None) -> pd.DataFrame:
        ''' Predicts the future dominant sequence with multiple clusters.

        seq_df - DataFrame of current population sequences
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to
        n_clusters - Number of clusters to predict dominant strain on; default 3

        sample_size - Number of strains to sample randomly from seq_df
        save_data - If directory is given, save the following files:
            1. seq_df.csv (post-sampling, if sample_size is given)
            2. dm.csv (the distance matrix corresponding to seq_df)
            3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
            4. clusters.txt (clusters and cluster counts)
            5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

        pred_seqs - Emergenet recommended sequences, with additional two columns for cluster counts and areas
        if len(seq_df) < 1 or len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        if n_clusters <= 1:
            raise ValueError('For single cluster, please use predict_single_domseq!')
        if len(pred_seq_df) > 20000:
            pred_seq_df = pred_seq_df.sample(20000, random_state=self.random_state)
        # Parameter for alphashape
        ALPHA = 4
        # Distance matrix for clustering strains from current season
        seq_arr = self._sequence_array(seq_df)
        dist_matrix = qdistance_matrix(seq_arr, seq_arr, enet, enet)
        dm = self._dm_to_df(dist_matrix)
        if save_data is not None and os.path.isdir(save_data):
            seq_df.to_csv(os.path.join(save_data, 'seq_df.csv'), index=False)
            dm.to_csv(os.path.join(save_data, 'dm.csv'), index=False)
        # Clustering
        embedding = MDS(n_components=2, dissimilarity='precomputed', random_state=self.random_state)
        dm_embed = embedding.fit_transform(dm)
        bandwidth = estimate_bandwidth(dm_embed, quantile=0.3, random_state=self.random_state)
        bandwidth = max(bandwidth, 1e-5)
        clustering = MeanShift(bandwidth=bandwidth)
        clustering_predictions = clustering.fit_predict(dm_embed)
        # Normalize points
        mean = np.mean(dm_embed, axis=0)
        std = np.std(dm_embed, axis=0)
        dm_embed_normalized = (dm_embed - mean) / std
        if sum(std) == 0:
            dm_embed_normalized = dm_embed
        # Sort unique clusters by size
        label_counts = Counter(clustering_predictions)
        unique_clusters = sorted(label_counts.items(), key=lambda x: x[1], reverse=True)
        if save_data is not None and os.path.isdir(save_data):
            with open(os.path.join(save_data, 'clusters.txt'), 'w') as file:
                for item in unique_clusters:
                    file.write(str(item) + '\n')
        # Predictions
        pred_seqs = pd.DataFrame(columns=pred_seq_df.columns)
        cluster_counts = []
        for class_ in unique_clusters[:n_clusters]:
            # Take riskiest strain using this cluster
            wanted_names = dm.columns[clustering_predictions == class_[0]]
            cluster_seq_df = seq_df.iloc[wanted_names]
            pred_seq_df_with_risk = self._compute_risk_for_predict_domseq(cluster_seq_df, pred_seq_df, enet)
            if save_data is not None and os.path.isdir(save_data):
                pred_seq_df_with_risk.to_csv(os.path.join(save_data, 'pred_seq_df_'+str(class_)+'.csv'), index=False)
            # Save predictions
            pred_seqs = pred_seqs.append(pred_seq_df_with_risk.iloc[0])
        # Cluster areas
        cluster_areas = self._plot_embedding(dm_embed_normalized, unique_clusters, 
                                             clustering_predictions, n_clusters, save_data, ALPHA)
        pred_seqs['cluster_count'] = cluster_counts
        pred_seqs['cluster_area'] = cluster_areas
        return pred_seqs

    def predict_single_domseq(self, pred_seqs:pd.DataFrame, pred_seq_df:pd.DataFrame) -> pd.DataFrame:
        ''' Predicts a single future dominant sequence with a single cluster.
        pred_seqs - Emergenet recommended sequences, with additional column 'cluster_count'
                    This should have at least two rows (i.e. predictions from two clusters)
        pred_seq_df - DataFrame of candidate sequences
        pred_seq - Emergenet recommended sequence
        if len(pred_seqs) < 2:
            raise ValueError('Not enough prediction sequences!')
        if len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        # Sort by cluster area and get top two cluster predictions
        pred_seqs = pred_seqs.sort_values(by='cluster_count',ascending=False)
        cluster_sizes = pred_seqs['cluster_count'].values
        cluster_ratio = cluster_sizes[1]/cluster_sizes[0]
        seq1 = pred_seqs['sequence'][0][:self.seq_trunc_length]
        seq2 = pred_seqs['sequence'][1][:self.seq_trunc_length]
        dist = hamming(seq1, seq2)
        # If there is one dominant cluster, return the prediction from that cluster
        if seq1 == seq2 or cluster_ratio <= 1/10:
            return pred_seqs.head(1)
        ratios = []
        # Find the strain x closest to satisfying dist(seq1,x)/dist(seq2,x) = cluster2/cluster1
        for seq in pred_seq_df['sequence'].values:
            dist1 = hamming(seq1, seq[:self.seq_trunc_length])
            dist2 = hamming(seq2, seq[:self.seq_trunc_length])
            if dist2 == 0 or dist1 + dist2 > dist:
            ratios.append(abs(dist1/dist2 - cluster_ratio))
        pred_seq_df['ratio'] = ratios
        pred_seq_df = pred_seq_df.sort_values(by='ratio', ascending=True)
        pred_seq = pred_seq_df.head(1)
        return pred_seq


class DomSeq (seq_trunc_length: int, random_state: int = 42)

DomSeq architecture for predicting dominant sequences.

Initializes an DomSeq instance.


seq_trunc_length - Length to truncate sequences in Emergenet analysis (Sequences used to train Emergenet and compute E-distance must be of same length)

random_state - Sets seed for random number generator

Expand source code
class DomSeq(object):
    ''' DomSeq architecture for predicting dominant sequences.

    def __init__(self, seq_trunc_length:int, random_state:int=42):
        ''' Initializes an DomSeq instance.

        seq_trunc_length - Length to truncate sequences in Emergenet analysis
            (Sequences used to train Emergenet and compute E-distance must be of same length)

        random_state - Sets seed for random number generator
        if seq_trunc_length <= 0:
            raise ValueError('Length to truncate sequences must be positive!')
        self.seq_trunc_length = seq_trunc_length

        if random_state is not None and random_state < 0:
            raise ValueError('Seed must be between 0 and 2**32 - 1!')
        self.random_state = random_state

    def __repr__(self):
        return 'emergenet.DomSeq'

    def __str__(self):
        return self.__repr__()
    def _dm_to_df(dm:np.ndarray) -> pd.DataFrame:
        ''' Converts a distance matrix to a DataFrame.

        dm - Distance matrix as 2D array

        df - Distance matrix as DataFrame
        columns = np.arange(0, dm.shape[1])
        index = np.arange(0, dm.shape[0])
        df = pd.DataFrame(dm, columns=columns, index=index)
        return df

    def _sequence_array(self, seq_df:pd.DataFrame, sample_size:int=None) -> np.ndarray:
        ''' Extracts array of sequence arrays from DataFrame; includes target sequence.

        seq_df - DataFrame containing sequences

        sample_size - Number of strains to sample randomly

        seq_lst - Array of sequence arrays
        if 'sequence' not in seq_df.columns:
            raise ValueError('The DataFrame must store sequences in `sequence` column!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        seqs = seq_df['sequence'].apply(lambda x: np.array(list(x[:self.seq_trunc_length])))
        seq_lst = []
        for seq in seqs:
        seq_lst = np.array(seq_lst)
        return seq_lst

    def train(self, seq_df:pd.DataFrame, sample_size:int=None, n_jobs:int=1) -> Qnet:
        ''' Trains an Emergenet model.

        seq_df - DataFrame of sequences

        sample_size - Number of strains to train Emergenet on, sampled randomly

        n_jobs - Number of CPUs to use when training

        enet - Trained Emergenet
        if len(seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        seq_arr = self._sequence_array(seq_df, sample_size)
        enet = Qnet(feature_names=['x' + str(i) for i in np.arange(self.seq_trunc_length)],
                    random_state=self.random_state, n_jobs=n_jobs)
        return enet
    def _compute_risk_for_predict_domseq(self, seq_df:pd.DataFrame, 
                                         pred_seq_df:pd.DataFrame, enet:Qnet) -> pd.DataFrame:
        ''' Computes risk scores for potential prediction sequences.

        seq_df - DataFrame of current population sequences
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to

        pred_seq_df - Potential prediction sequences, with additional columns for risk score
        # Distance matrix for computing prediction
        seq_arr = self._sequence_array(seq_df)
        pred_seq_arr = self._sequence_array(pred_seq_df)
        dist_matrix = qdistance_matrix(pred_seq_arr, seq_arr, enet, enet) 
        # Convert dist_matrix to dataframe
        dm = self._dm_to_df(dist_matrix)
        assert len(dm) == len(pred_seq_arr)
        # Find centroid data (sum across rows)
        first_term = dm.sum(axis='columns').values
        H = len(seq_df)
        A = 0.95/(np.sqrt(8) * self.seq_trunc_length**2)
        second_term = np.array([membership_degree(seq, enet) * H * A for seq in pred_seq_arr])
        sums = first_term - second_term
        pred_seq_df['first_term'] = first_term
        pred_seq_df['second_term'] = second_term
        pred_seq_df['sum'] = sums
        pred_seq_df = pred_seq_df.sort_values(by='sum',ascending=True)
        return pred_seq_df
    def _plot_embedding(dm_embed:np.ndarray, unique_clusters:Counter, clustering_predictions:np.array, 
                        n_clusters:int, save_data:str=None, alpha:int=4) -> list:
        ''' Plots the embedding and shows Enet and WHO predictions.

        dm_embed - Embedding points
        unique_clusters - Counter (class, size) of unique clusters, sorted by size

        clustering_predictions - Array of clustering predictions

        n_clusters - Number of cluster alphashapes to draw

        save_data - Directory to save plot to
        alpha - Alpha parameter, default 4
        cluster_areas - Areas of largest n_clusters
        # Add noise to offset equal points
        noise = np.random.normal(0, 0.02, size=dm_embed.shape)
        dm_embed += noise

        # Find unique clusters
        pts = []
        for class_ in unique_clusters:
            cluster_idx = np.where(clustering_predictions == class_[0])[0]
            pt = dm_embed[cluster_idx]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))

        # Plot clusters
        plt.figure(figsize=(10, 10))
        colors = cycle(['skyblue','red','orange','green', 'purple'])
        markers = cycle(['o', 'v'])
        # Plot at most 5 clusters
        cluster_areas = []
        for i in range(min(len(pts), 5)):
            pt = pts[i]
            x = pt[:,0]
            y = pt[:,1]
            points = np.column_stack((x, y))
            # Draw alphashape for the first n clusters if >= 3 points
            # Otherwise just scatter the points
            if i < n_clusters:
                if len(x) < 3:
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                                marker = next(markers), label = f'Size: {len(pt)}')
                    alpha_shape = alphashape.alphashape(points, alpha=alpha)
                    area = alpha_shape.area * AREA_MULTIPLIER
                    if isinstance(alpha_shape, shapely.geometry.multipolygon.MultiPolygon) or \
                       isinstance(alpha_shape, shapely.GeometryCollection):
                        for poly in alpha_shape.geoms:
                            plt.plot(*poly.exterior.xy, 'k-', linewidth=1)
                        plt.plot(*alpha_shape.exterior.xy, 'k-', linewidth=1)
                    plt.scatter(x, y, color = next(colors), alpha = 0.7, marker = next(markers),
                                label = f'Size: {len(pt)}, Area: {round(area, 3)}')
                if len(x) == 1:
                plt.scatter(x, y, color = next(colors), alpha = 0.7, 
                            marker = next(markers), label = f'Size: {len(pt)}')
        # Save figure
        plt.legend(loc='upper left', bbox_to_anchor=(1, 1));
        if save_data is not None and os.path.isdir(save_data): 
            plt.savefig(os.path.join(save_data, 'embedding.png'), dpi=300, 
                        bbox_inches = 'tight', pad_inches = 0, transparent=True)
        return cluster_areas
    def predict_domseq(self, seq_df:pd.DataFrame, pred_seq_df:pd.DataFrame, enet:Qnet, 
                       n_clusters:int=3, sample_size:int=None, save_data:str=None) -> pd.DataFrame:
        ''' Predicts the future dominant sequence with multiple clusters.

        seq_df - DataFrame of current population sequences
        pred_seq_df - DataFrame of candidate sequences

        enet - Emergenet that sequences in seq_df belong to
        n_clusters - Number of clusters to predict dominant strain on; default 3

        sample_size - Number of strains to sample randomly from seq_df
        save_data - If directory is given, save the following files:
            1. seq_df.csv (post-sampling, if sample_size is given)
            2. dm.csv (the distance matrix corresponding to seq_df)
            3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
            4. clusters.txt (clusters and cluster counts)
            5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

        pred_seqs - Emergenet recommended sequences, with additional two columns for cluster counts and areas
        if len(seq_df) < 1 or len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        if sample_size is not None and sample_size < len(seq_df):
            seq_df = seq_df.sample(sample_size, random_state=self.random_state)
        if n_clusters <= 1:
            raise ValueError('For single cluster, please use predict_single_domseq!')
        if len(pred_seq_df) > 20000:
            pred_seq_df = pred_seq_df.sample(20000, random_state=self.random_state)
        # Parameter for alphashape
        ALPHA = 4
        # Distance matrix for clustering strains from current season
        seq_arr = self._sequence_array(seq_df)
        dist_matrix = qdistance_matrix(seq_arr, seq_arr, enet, enet)
        dm = self._dm_to_df(dist_matrix)
        if save_data is not None and os.path.isdir(save_data):
            seq_df.to_csv(os.path.join(save_data, 'seq_df.csv'), index=False)
            dm.to_csv(os.path.join(save_data, 'dm.csv'), index=False)
        # Clustering
        embedding = MDS(n_components=2, dissimilarity='precomputed', random_state=self.random_state)
        dm_embed = embedding.fit_transform(dm)
        bandwidth = estimate_bandwidth(dm_embed, quantile=0.3, random_state=self.random_state)
        bandwidth = max(bandwidth, 1e-5)
        clustering = MeanShift(bandwidth=bandwidth)
        clustering_predictions = clustering.fit_predict(dm_embed)
        # Normalize points
        mean = np.mean(dm_embed, axis=0)
        std = np.std(dm_embed, axis=0)
        dm_embed_normalized = (dm_embed - mean) / std
        if sum(std) == 0:
            dm_embed_normalized = dm_embed
        # Sort unique clusters by size
        label_counts = Counter(clustering_predictions)
        unique_clusters = sorted(label_counts.items(), key=lambda x: x[1], reverse=True)
        if save_data is not None and os.path.isdir(save_data):
            with open(os.path.join(save_data, 'clusters.txt'), 'w') as file:
                for item in unique_clusters:
                    file.write(str(item) + '\n')
        # Predictions
        pred_seqs = pd.DataFrame(columns=pred_seq_df.columns)
        cluster_counts = []
        for class_ in unique_clusters[:n_clusters]:
            # Take riskiest strain using this cluster
            wanted_names = dm.columns[clustering_predictions == class_[0]]
            cluster_seq_df = seq_df.iloc[wanted_names]
            pred_seq_df_with_risk = self._compute_risk_for_predict_domseq(cluster_seq_df, pred_seq_df, enet)
            if save_data is not None and os.path.isdir(save_data):
                pred_seq_df_with_risk.to_csv(os.path.join(save_data, 'pred_seq_df_'+str(class_)+'.csv'), index=False)
            # Save predictions
            pred_seqs = pred_seqs.append(pred_seq_df_with_risk.iloc[0])
        # Cluster areas
        cluster_areas = self._plot_embedding(dm_embed_normalized, unique_clusters, 
                                             clustering_predictions, n_clusters, save_data, ALPHA)
        pred_seqs['cluster_count'] = cluster_counts
        pred_seqs['cluster_area'] = cluster_areas
        return pred_seqs

    def predict_single_domseq(self, pred_seqs:pd.DataFrame, pred_seq_df:pd.DataFrame) -> pd.DataFrame:
        ''' Predicts a single future dominant sequence with a single cluster.
        pred_seqs - Emergenet recommended sequences, with additional column 'cluster_count'
                    This should have at least two rows (i.e. predictions from two clusters)
        pred_seq_df - DataFrame of candidate sequences
        pred_seq - Emergenet recommended sequence
        if len(pred_seqs) < 2:
            raise ValueError('Not enough prediction sequences!')
        if len(pred_seq_df) < 1:
            raise ValueError('The DataFrame contains no sequences!')
        # Sort by cluster area and get top two cluster predictions
        pred_seqs = pred_seqs.sort_values(by='cluster_count',ascending=False)
        cluster_sizes = pred_seqs['cluster_count'].values
        cluster_ratio = cluster_sizes[1]/cluster_sizes[0]
        seq1 = pred_seqs['sequence'][0][:self.seq_trunc_length]
        seq2 = pred_seqs['sequence'][1][:self.seq_trunc_length]
        dist = hamming(seq1, seq2)
        # If there is one dominant cluster, return the prediction from that cluster
        if seq1 == seq2 or cluster_ratio <= 1/10:
            return pred_seqs.head(1)
        ratios = []
        # Find the strain x closest to satisfying dist(seq1,x)/dist(seq2,x) = cluster2/cluster1
        for seq in pred_seq_df['sequence'].values:
            dist1 = hamming(seq1, seq[:self.seq_trunc_length])
            dist2 = hamming(seq2, seq[:self.seq_trunc_length])
            if dist2 == 0 or dist1 + dist2 > dist:
            ratios.append(abs(dist1/dist2 - cluster_ratio))
        pred_seq_df['ratio'] = ratios
        pred_seq_df = pred_seq_df.sort_values(by='ratio', ascending=True)
        pred_seq = pred_seq_df.head(1)
        return pred_seq


def predict_domseq(self, seq_df: pandas.core.frame.DataFrame, pred_seq_df: pandas.core.frame.DataFrame, enet: quasinet.qnet.Qnet, n_clusters: int = 3, sample_size: int = None, save_data: str = None) ‑> pandas.core.frame.DataFrame

Predicts the future dominant sequence with multiple clusters.


seq_df - DataFrame of current population sequences

pred_seq_df - DataFrame of candidate sequences

enet - Emergenet that sequences in seq_df belong to

n_clusters - Number of clusters to predict dominant strain on; default 3

sample_size - Number of strains to sample randomly from seq_df

save_data - If directory is given, save the following files: 1. seq_df.csv (post-sampling, if sample_size is given) 2. dm.csv (the distance matrix corresponding to seq_df) 3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,…n_clusters) 4. clusters.txt (clusters and cluster counts) 5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)


pred_seqs - Emergenet recommended sequences, with additional two columns for cluster counts and areas
Expand source code
def predict_domseq(self, seq_df:pd.DataFrame, pred_seq_df:pd.DataFrame, enet:Qnet, 
                   n_clusters:int=3, sample_size:int=None, save_data:str=None) -> pd.DataFrame:
    ''' Predicts the future dominant sequence with multiple clusters.

    seq_df - DataFrame of current population sequences
    pred_seq_df - DataFrame of candidate sequences

    enet - Emergenet that sequences in seq_df belong to
    n_clusters - Number of clusters to predict dominant strain on; default 3

    sample_size - Number of strains to sample randomly from seq_df
    save_data - If directory is given, save the following files:
        1. seq_df.csv (post-sampling, if sample_size is given)
        2. dm.csv (the distance matrix corresponding to seq_df)
        3. pred_seq_df_i.csv (pred_seq_df with columns for risk score, i = 1,2,...n_clusters)
        4. clusters.txt (clusters and cluster counts)
        5. embedding.png (embedding plot with alpha shapes drawn for n_clusters)

    pred_seqs - Emergenet recommended sequences, with additional two columns for cluster counts and areas
    if len(seq_df) < 1 or len(pred_seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
    if sample_size is not None and sample_size < len(seq_df):
        seq_df = seq_df.sample(sample_size, random_state=self.random_state)
    if n_clusters <= 1:
        raise ValueError('For single cluster, please use predict_single_domseq!')
    if len(pred_seq_df) > 20000:
        pred_seq_df = pred_seq_df.sample(20000, random_state=self.random_state)
    # Parameter for alphashape
    ALPHA = 4
    # Distance matrix for clustering strains from current season
    seq_arr = self._sequence_array(seq_df)
    dist_matrix = qdistance_matrix(seq_arr, seq_arr, enet, enet)
    dm = self._dm_to_df(dist_matrix)
    if save_data is not None and os.path.isdir(save_data):
        seq_df.to_csv(os.path.join(save_data, 'seq_df.csv'), index=False)
        dm.to_csv(os.path.join(save_data, 'dm.csv'), index=False)
    # Clustering
    embedding = MDS(n_components=2, dissimilarity='precomputed', random_state=self.random_state)
    dm_embed = embedding.fit_transform(dm)
    bandwidth = estimate_bandwidth(dm_embed, quantile=0.3, random_state=self.random_state)
    bandwidth = max(bandwidth, 1e-5)
    clustering = MeanShift(bandwidth=bandwidth)
    clustering_predictions = clustering.fit_predict(dm_embed)
    # Normalize points
    mean = np.mean(dm_embed, axis=0)
    std = np.std(dm_embed, axis=0)
    dm_embed_normalized = (dm_embed - mean) / std
    if sum(std) == 0:
        dm_embed_normalized = dm_embed
    # Sort unique clusters by size
    label_counts = Counter(clustering_predictions)
    unique_clusters = sorted(label_counts.items(), key=lambda x: x[1], reverse=True)
    if save_data is not None and os.path.isdir(save_data):
        with open(os.path.join(save_data, 'clusters.txt'), 'w') as file:
            for item in unique_clusters:
                file.write(str(item) + '\n')
    # Predictions
    pred_seqs = pd.DataFrame(columns=pred_seq_df.columns)
    cluster_counts = []
    for class_ in unique_clusters[:n_clusters]:
        # Take riskiest strain using this cluster
        wanted_names = dm.columns[clustering_predictions == class_[0]]
        cluster_seq_df = seq_df.iloc[wanted_names]
        pred_seq_df_with_risk = self._compute_risk_for_predict_domseq(cluster_seq_df, pred_seq_df, enet)
        if save_data is not None and os.path.isdir(save_data):
            pred_seq_df_with_risk.to_csv(os.path.join(save_data, 'pred_seq_df_'+str(class_)+'.csv'), index=False)
        # Save predictions
        pred_seqs = pred_seqs.append(pred_seq_df_with_risk.iloc[0])
    # Cluster areas
    cluster_areas = self._plot_embedding(dm_embed_normalized, unique_clusters, 
                                         clustering_predictions, n_clusters, save_data, ALPHA)
    pred_seqs['cluster_count'] = cluster_counts
    pred_seqs['cluster_area'] = cluster_areas
    return pred_seqs
def predict_single_domseq(self, pred_seqs: pandas.core.frame.DataFrame, pred_seq_df: pandas.core.frame.DataFrame) ‑> pandas.core.frame.DataFrame

Predicts a single future dominant sequence with a single cluster.


pred_seqs - Emergenet recommended sequences, with additional column 'cluster_count' This should have at least two rows (i.e. predictions from two clusters)

pred_seq_df - DataFrame of candidate sequences


pred_seq - Emergenet recommended sequence
Expand source code
def predict_single_domseq(self, pred_seqs:pd.DataFrame, pred_seq_df:pd.DataFrame) -> pd.DataFrame:
    ''' Predicts a single future dominant sequence with a single cluster.
    pred_seqs - Emergenet recommended sequences, with additional column 'cluster_count'
                This should have at least two rows (i.e. predictions from two clusters)
    pred_seq_df - DataFrame of candidate sequences
    pred_seq - Emergenet recommended sequence
    if len(pred_seqs) < 2:
        raise ValueError('Not enough prediction sequences!')
    if len(pred_seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
    # Sort by cluster area and get top two cluster predictions
    pred_seqs = pred_seqs.sort_values(by='cluster_count',ascending=False)
    cluster_sizes = pred_seqs['cluster_count'].values
    cluster_ratio = cluster_sizes[1]/cluster_sizes[0]
    seq1 = pred_seqs['sequence'][0][:self.seq_trunc_length]
    seq2 = pred_seqs['sequence'][1][:self.seq_trunc_length]
    dist = hamming(seq1, seq2)
    # If there is one dominant cluster, return the prediction from that cluster
    if seq1 == seq2 or cluster_ratio <= 1/10:
        return pred_seqs.head(1)
    ratios = []
    # Find the strain x closest to satisfying dist(seq1,x)/dist(seq2,x) = cluster2/cluster1
    for seq in pred_seq_df['sequence'].values:
        dist1 = hamming(seq1, seq[:self.seq_trunc_length])
        dist2 = hamming(seq2, seq[:self.seq_trunc_length])
        if dist2 == 0 or dist1 + dist2 > dist:
        ratios.append(abs(dist1/dist2 - cluster_ratio))
    pred_seq_df['ratio'] = ratios
    pred_seq_df = pred_seq_df.sort_values(by='ratio', ascending=True)
    pred_seq = pred_seq_df.head(1)
    return pred_seq
def train(self, seq_df: pandas.core.frame.DataFrame, sample_size: int = None, n_jobs: int = 1) ‑> quasinet.qnet.Qnet

Trains an Emergenet model.


seq_df - DataFrame of sequences

sample_size - Number of strains to train Emergenet on, sampled randomly

n_jobs - Number of CPUs to use when training


enet - Trained Emergenet
Expand source code
def train(self, seq_df:pd.DataFrame, sample_size:int=None, n_jobs:int=1) -> Qnet:
    ''' Trains an Emergenet model.

    seq_df - DataFrame of sequences

    sample_size - Number of strains to train Emergenet on, sampled randomly

    n_jobs - Number of CPUs to use when training

    enet - Trained Emergenet
    if len(seq_df) < 1:
        raise ValueError('The DataFrame contains no sequences!')
    seq_arr = self._sequence_array(seq_df, sample_size)
    enet = Qnet(feature_names=['x' + str(i) for i in np.arange(self.seq_trunc_length)],
                random_state=self.random_state, n_jobs=n_jobs)
    return enet