Module patternly.detection
Expand source code
import dill
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from zedsuite.zutil import Llk, Lsmash, Prun, DrawPFSA
from zedsuite.genesess import GenESeSS
from zedsuite.quantizer import Quantizer
from patternly._utils import RANDOM_NAME, DirectedGraph
class AnomalyDetection:
""" Tool for anomaly detection """
def __init__(
self,
*,
anomaly_sensitivity=1,
n_clusters=1,
reduce_clusters=True,
clustering_alg=None,
quantize=True,
quantize_type='complex',
eps=0.1,
verbose=False
) -> None:
"""
Args:
anomaly_sensitivity (float, optional): how many standard deviations above the mean llk to consider
an anomaly (Default = 1)
n_clusters (int, optional): number of clusters to use with KMeans (Default = 1)
reduce_clusters (bool, optional): whether to attempt to reduce the number of clusters (Default = True)
clustering_alg (sklearn.cluster, optional): clustering algorithm to use, if None then KMeans (Default = None)
quantize (bool, optional): whether to quantize the data (Default = True)
quantize_type (str, optional): type of quantization to use ("complex" or "simple") (Default = "complex")
eps (float, optional): epsilon parameter for finding PFSAs (Default = 0.1)
verbose (bool, optional): whether to print verbose output (Default = False)
Attributes:
cluster_llks (np.ndarray): array of shape (n_clusters, n_predictions) containing the llk of each
prediction for each cluster after calling predict()
closest_match (np.ndarray): array of shape (n_predictions,) containing the index of the closest matching
cluster for each prediction after calling predict()
cluster_PFSA_pngs (list[str]): list of file paths to PFSA png files
"""
self.anomaly_sensitivity = anomaly_sensitivity
self.n_clusters = n_clusters
self.reduce_clusters = reduce_clusters
self.clustering_alg = clustering_alg
self.quantize = quantize
self.quantize_type = quantize_type
self.eps = eps
self.verbose = verbose
self.temp_dir = 'zed_temp'
self.PFSA_prop_titles = ['%ANN_ERR', '%MRG_EPS', '%SYN_STR', '%SYM_FRQ', '%PITILDE', '%CONNX']
# values calculated in self.fit()
self.fitted = False # whether model has been fit
self.quantizer = None # quantizer used for quantizing data
self.quantized_data = None # quantized data
self.dist_matrix = pd.DataFrame() # calculated by lsmash, used for clustering
self.cluster_labels = [] # list of cluster labels
self.cluster_counts = [] # number of instances in each cluster
self.cluster_PFSA_files = [] # list of file paths to cluster PFSAs
self.cluster_PFSA_info = [] # list of dicts of cluster PFSAs info for printing
self.cluster_PFSA_pngs = [] # list of file paths to PFSA pngs
self.PFSA_llk_means = [] # list of mean llk for each cluster
self.PFSA_llk_stds = [] # list of std of llk for each cluster
# can be accessed after calling self.predict()
self.cluster_llks = None # list of llk of each prediction for each cluster
self.closest_match = None # cluster label of closest match for each prediction
self.predicted_active_pfsas = {} # dictionary of each pfsa active at each unit of time in the sequence
def fit(self, X, y=None):
""" Fit an anomaly detection model
Args:
X (pd.DataFrame or pd.Series): time series data to be fit
y (pd.Series, optional): labels for X, provided only for sklearn API compatibility (Default = None)
Returns:
AnomalyDetection: fitted model
"""
self.quantized_data = self.__quantize(X)
self.__calculate_dist_matrix()
self.__calculate_cluster_labels()
self.__calculate_cluster_PFSAs()
self.__reduce_clusters()
self.__calculate_PFSA_stats()
self.fitted = True
if self.verbose:
print("Model fit.")
return self
def predict(self, X=None, *, clean=True):
""" Predict whether a time series sequence is anomalous
Args:
X (pd.DataFrame or pd.Series, optional): time series data to find anomalies, if None then
predicts on original data (Default = None)
clean (bool, optional): whether to remove temp files (Default = True)
Returns:
bool or list[bool]: True if the time series is an anomaly, False otherwise; output shape depends on input
"""
if not self.fitted:
raise ValueError("Model has not been fit yet.")
data = None
num_predictions = 0
# commonly want to find anomalies in original data
if X is None:
# occurs when model is loaded from file
if self.quantized_data is None:
raise ValueError("Original data not found. Pass data to predict().")
data = self.quantized_data.drop(columns=['cluster'], axis=1, errors='ignore')
# data.to_csv('error.csv', sep=' ', index=False, header=False)
num_predictions = self.quantized_data.shape[0]
else:
data = self.__quantize(X)
num_predictions = 1 if type(X) is pd.Series else data.shape[0]
cluster_llks = np.zeros(shape=(self.n_clusters, num_predictions), dtype=np.float32)
def count_clusters(row):
clusters = np.bincount(row)
return clusters.size
data['alphabet_size'] = data.apply(count_clusters, axis=1)
for i in range(self.n_clusters):
curr_alphabet_size = len(self.cluster_PFSA_info[i]['%SYM_FRQ'])
curr_data = data[data['alphabet_size'] <= curr_alphabet_size].drop(columns=['alphabet_size'], axis=1)
llks = Llk(data=curr_data, pfsafile=self.cluster_PFSA_files[i]).run()
llks = pd.concat(
[data[0], pd.DataFrame(llks, index=curr_data.index, columns=['llks'])], axis=1
).fillna(float('inf'))['llks'].values.tolist()
cluster_llks[i] = np.asarray(llks, dtype=np.float32)
# consider to be anomaly if all llks above specified upper bound (X standard deviations above the mean)
upper_bounds = self.PFSA_llk_means + (self.PFSA_llk_stds * self.anomaly_sensitivity)
predictions = np.all(cluster_llks.T > upper_bounds, axis=1)
self.cluster_llks = cluster_llks
self.closest_match = np.argmin(cluster_llks, axis=0)
self.predicted_active_pfsas = {}
for i, pfsa in enumerate(self.closest_match):
if pfsa in self.predicted_active_pfsas:
self.predicted_active_pfsas[pfsa].append(i)
else:
self.predicted_active_pfsas[pfsa] = [i]
if len(predictions) == 1:
return predictions[0]
return predictions
def save_model(self, path='patternly_model.dill'):
""" Save model to file
Args:
path (str): file path to save model to
"""
if not self.fitted:
raise ValueError("Model has not been fit yet.")
metadata = {
'modeltype': type(self),
'user_params': {
'anomaly_sensitivity': self.anomaly_sensitivity,
'n_clusters': self.n_clusters,
'reduce_clusters': self.reduce_clusters,
'clustering_alg': self.clustering_alg,
'quantize': self.quantize,
'quantize_type': self.quantize_type,
'eps': self.eps,
'verbose': self.verbose,
},
'fitted_params': {
'quantizer_parameters': None if self.quantizer is None else self.quantizer.parameters,
'quantizer_feature_order': None if self.quantizer is None else self.quantizer._feature_order,
'cluster_labels': self.cluster_labels,
'cluster_counts': self.cluster_counts,
'cluster_PFSA_info': self.cluster_PFSA_info,
'PFSA_llk_means': self.PFSA_llk_means.tolist(),
'PFSA_llk_stds': self.PFSA_llk_stds.tolist()
}
}
with open(path, 'wb') as f:
dill.dump(metadata, f)
@staticmethod
def load_model(path='patternly_model.dill'):
""" Load saved model
Args:
path (str): path to saved model
Returns:
AnomalyDetection or StreamingDetection: loaded model
"""
with open(path, 'rb') as f:
metadata = dill.load(f)
model = None
if metadata['modeltype'] is AnomalyDetection:
model = AnomalyDetection(**metadata['user_params'])
else:
model = StreamingDetection(**metadata['user_params'])
if model.quantize and model.quantize_type == 'complex':
model.quantizer = Quantizer(n_quantizations=1, eps=-1)
model.quantizer.parameters = metadata['fitted_params']['quantizer_parameters']
model.quantizer._feature_order = metadata['fitted_params']['quantizer_feature_order']
model.cluster_labels = metadata['fitted_params']['cluster_labels']
model.cluster_counts = metadata['fitted_params']['cluster_counts']
model.cluster_PFSA_info = metadata['fitted_params']['cluster_PFSA_info']
model.PFSA_llk_means = np.asarray(metadata['fitted_params']['PFSA_llk_means'])
model.PFSA_llk_stds = np.asarray(metadata['fitted_params']['PFSA_llk_stds'])
# write PFSA files
model.cluster_PFSA_files = []
for i in range(model.n_clusters):
model.cluster_PFSA_files.append(RANDOM_NAME(path=model.temp_dir))
with open(model.cluster_PFSA_files[i], 'w') as f:
f.write(f'{model.__format_PFSA_info(model.cluster_PFSA_info[i])}')
# model.generate_PFSA_pngs()
model.fitted = True
return model
def print_PFSAs(self):
""" Print PFSAs found for each cluster """
if not self.fitted:
raise ValueError("Model has not been fit yet.")
for i in range(self.n_clusters):
print(f"Cluster {i} PFSA:")
print(self.__format_PFSA_info(self.cluster_PFSA_info[i], indent_level=4))
def generate_PFSA_pngs(self):
""" Generates png files for PFSAs
Returns:
list[str]: list of file paths to png files
"""
self.cluster_PFSA_pngs = []
for i in range(self.n_clusters):
self.cluster_PFSA_pngs.append(RANDOM_NAME(path=self.temp_dir))
DrawPFSA(pfsafile=self.cluster_PFSA_files[i], graphpref=self.cluster_PFSA_pngs[i]).run()
return self.cluster_PFSA_pngs
def __quantize(self, X):
""" Quantize the data into finite alphabet
Args:
X (pd.DataFrame): time series data to be quantized
"""
if not self.quantize or self.quantize_type is None:
return X.copy(deep=False).astype(np.int8)
if self.verbose:
print("Quantizing...")
# Quantizer() expects a DataFrame
if type(X) is pd.Series:
X = X.copy().to_frame().reset_index(drop=True).T
if self.quantize_type == 'simple':
# basic differentiation
X = X.astype(float).diff(axis=1).fillna(0)
return X.apply(lambda row : row.apply(lambda n : 1 if n > 0 else 0), axis=1)
elif self.quantize_type == 'simple-second':
# second derivative
X = X.astype(float).diff(axis=1).fillna(0).diff(axis=1).fillna(0)
return X.apply(lambda row : row.apply(lambda n : 1 if n > 0 else 0), axis=1)
elif self.quantize_type == 'complex':
# use cythonized quantizer binary
if not self.fitted:
self.quantizer = Quantizer(n_quantizations=1, epsilon=-1)
self.quantizer.fit(X)
return pd.concat(
[quantized for quantized in self.quantizer.transform(X)],
axis=1,
copy=False
).astype(np.int8)
else:
raise ValueError(f"Unknown quantize type: {self.quantize_type}. Choose \"simple\" or \"complex\".")
def __calculate_dist_matrix(self):
""" Calculate distance matrix using lsmash """
if self.verbose:
print("Calculating distance matrix...")
if self.n_clusters == 1:
self.dist_matrix = self.quantized_data
else:
self.dist_matrix = pd.DataFrame(
Lsmash(data=self.quantized_data, data_type='symbolic', sae=False).run(),
dtype=np.float32
)
def __calculate_cluster_labels(self):
""" Cluster distance matrix """
if self.verbose:
print("Clustering distance matrix...")
if self.n_clusters == 1:
self.cluster_labels = [0 for i in range(self.dist_matrix.shape[0])]
cluster_labels = []
if self.clustering_alg is None:
cluster_labels = KMeans(n_clusters=self.n_clusters).fit(self.dist_matrix).labels_
else:
cluster_labels = self.clustering_alg.fit(self.dist_matrix).labels_
n_clusters = len(set(cluster_labels)) - (1 if -1 in cluster_labels else 0)
# reassign clusters such that cluster 0 is the most common label, 1 is the second most common, etc.
cluster_counts = np.zeros(n_clusters, dtype=np.int32)
for cluster in cluster_labels:
cluster_counts[cluster] += 1
cluster_rank = np.full(n_clusters, n_clusters - 1, dtype=np.int32) - np.argsort(np.argsort(cluster_counts))
cluster_labels = [cluster_rank[cluster] for cluster in cluster_labels]
cluster_counts = [cluster_counts[cluster] for cluster in np.argsort(cluster_counts)][::-1]
# def adjust_clusters(row):
# bins = np.bincount(row)
# offset = [0 for _ in range(len(bins))]
# preceeding_zeros = 0
# for i, bin in enumerate(bins):
# if bin == 0:
# preceeding_zeros += 1
# offset[i] = preceeding_zeros
# return row.apply(lambda val: val - offset[val])
# self.quantized_data = self.quantized_data.apply(adjust_clusters, axis=1)
self.n_clusters = n_clusters
self.cluster_counts = cluster_counts
self.cluster_labels = cluster_labels
self.quantized_data['cluster'] = cluster_labels
# self.quantized_data['alphabet_size'] = self.quantized_data.apply(count_clusters, axis=1)
# self.quantized_data[self.quantized_data['cluster'] == 0].to_csv('firstcluster.csv', sep=' ', header=False, index=False)
def __calculate_cluster_PFSAs(self):
""" Infer PFSAs from clusters using genESeSS """
cluster_PFSA_files = []
PFSA_pngs = []
for i in range(self.n_clusters):
if self.verbose:
print(f"Generating cluster PFSA {i + 1}/{self.n_clusters}...")
cluster_data = self.quantized_data[self.quantized_data['cluster'] == i].drop(
columns=['cluster'], axis=1
)
cluster_PFSA_file = RANDOM_NAME(path=self.temp_dir)
dot_file = RANDOM_NAME(path=self.temp_dir)
cluster_PFSA_files.append(cluster_PFSA_file)
PFSA_pngs.append(dot_file)
alg = GenESeSS(
data=cluster_data,
outfile=cluster_PFSA_file,
data_type='symbolic',
data_dir='row',
force=True,
eps=self.eps,
dot=dot_file,
configfile='./patternly/config.cfg'
)
# TODO: add error handling for when genESeSS fails to find a PFSA
PFSA_found = alg.run()
if PFSA_found:
prop_vals = [
alg.inference_error,
alg.epsilon_used,
alg.synchronizing_string_found,
alg.symbol_frequency,
alg.probability_morph_matrix,
alg.connectivity_matrix,
]
self.cluster_PFSA_info.append(dict(zip(self.PFSA_prop_titles, prop_vals)))
else:
raise ValueError(f"Unable to find PFSA for cluster {i + 1}/{self.n_clusters}. Try quantizing the data manually.")
self.cluster_PFSA_files = cluster_PFSA_files
self.cluster_PFSA_pngs = PFSA_pngs
def __reduce_clusters(self):
""" Attempt to reduce the number of clusters by combining clusters that
generate similar PFSAs
"""
if self.n_clusters == 1 or not self.reduce_clusters:
return
if self.verbose:
print("Attempting to reduce clusters...")
all_cluster_likelihoods = np.empty(shape=(self.n_clusters, self.n_clusters), dtype=np.float32)
all_ranked_likelihoods = np.empty(shape=(self.n_clusters, self.n_clusters), dtype=np.int32)
for i in range(self.n_clusters):
cluster_llks = []
for j in range(self.n_clusters):
curr_alphabet_size = len(self.cluster_PFSA_info[j]['%SYM_FRQ'])
cluster_data = self.quantized_data[self.quantized_data['cluster'] == i]
curr_data = cluster_data
# curr_data = cluster_data[cluster_data['alphabet_size'] <= curr_alphabet_size].drop(
# columns=['cluster', 'alphabet_size'], axis=1
# )
llks = [] if curr_data.shape[0] == 0 else Llk(data=curr_data, pfsafile=self.cluster_PFSA_files[j]).run()
# llks = pd.concat(
# [cluster_data[0], pd.DataFrame(llks, index=curr_data.index, columns=['llks'])], axis=1
# ).fillna(float('inf'))['llks'].values.tolist()
cluster_llks.append(np.asarray(llks, dtype=np.float32))
# which cluster PFSA each sequence most likely maps back to
closest_matches = np.argmin(cluster_llks, axis=0)
# the likelihoods of the sequences generated by the current PFSA mapping back to each cluster PFSA
cluster_likelihoods = np.count_nonzero(
(closest_matches.reshape(-1, 1) == np.arange(self.n_clusters).reshape(1, -1)),
axis=0
)
cluster_likelihoods = cluster_likelihoods / cluster_likelihoods.sum()
# list of cluster PFSAs sorted in descending order of likelihood
ranked_likelihoods = np.argsort(cluster_likelihoods)[::-1]
all_cluster_likelihoods[i] = cluster_likelihoods
all_ranked_likelihoods[i] = ranked_likelihoods
new_n_clusters = self.n_clusters
for i in range(self.n_clusters):
best_match = all_ranked_likelihoods[i][0]
if best_match != i:
all_cluster_likelihoods[best_match][i] += 1
graph = DirectedGraph(self.n_clusters)
graph.from_matrix(all_cluster_likelihoods, threshold=0.2)
new_n_clusters = graph.find_scc()
if new_n_clusters != self.n_clusters:
if self.verbose:
print(f"Reduced clusters from {self.n_clusters} to {new_n_clusters}.")
self.n_clusters = new_n_clusters
# reduced_clusters = set(graph.roots)
# self.cluster_labels = [graph.roots[cluster] for cluster in self.cluster_labels]
# self.quantized_data['cluster'] = self.cluster_labels
# self.cluster_PFSA_files = [self.cluster_PFSA_files[i] for i in reduced_clusters]
# self.cluster_PFSA_info = [self.cluster_PFSA_info[i] for i in reduced_clusters]
# self.cluster_PFSA_pngs = [self.cluster_PFSA_pngs[i] for i in reduced_clusters]
# new_cluster_counts = [0 for _ in range(graph.size)]
# for i, count in enumerate(self.cluster_counts):
# new_cluster_counts[graph.roots[i]] += count
# self.cluster_counts = [count for count in new_cluster_counts if count != 0]
self.__calculate_cluster_labels()
self.__calculate_cluster_PFSAs()
def __calculate_PFSA_stats(self):
""" Calculate the means and standard deviations of llks for each PFSA
to later determine if a sequence is an anomaly
"""
if self.verbose:
print("Calculating cluster PFSA means and stds...")
PFSA_llk_means = np.empty(shape=self.n_clusters)
PFSA_llk_stds = np.empty(shape=self.n_clusters)
for i in range(self.n_clusters):
cluster_data = self.quantized_data[self.quantized_data['cluster'] == i].drop(
columns=['cluster'], axis=1
)
llks = np.asarray(Llk(data=cluster_data, pfsafile=self.cluster_PFSA_files[i]).run(), dtype=np.float32)
PFSA_llk_means[i] = np.mean(llks)
PFSA_llk_stds[i] = np.std(llks, ddof=1)
# for i in range(self.n_clusters):
# curr_alphabet_size = len(self.cluster_PFSA_info[i]['%SYM_FRQ'])
# curr_data = data[data['alphabet_size'] <= curr_alphabet_size].drop(columns=['cluster', 'alphabet_size'], axis=1)
# llks = Llk(data=curr_data, pfsafile=self.cluster_PFSA_files[i]).run()
# llks = pd.concat(
# [data[0], pd.DataFrame(llks, index=curr_data.index, columns=['llks'])], axis=1
# ).fillna(float('inf'))['llks'].values.tolist()
# cluster_llks[i] = np.asarray(llks, dtype=np.float32)
self.PFSA_llk_means, self.PFSA_llk_stds = PFSA_llk_means, PFSA_llk_stds
def __format_PFSA_info(self, PFSA_info, indent_level=0):
""" Format the PFSA information for output
Args:
PFSA_info (dict): PFSA information to format
indent_level (int): indentation level for formatting
Returns:
str: formatted PFSA information
"""
indent = indent_level * ' '
syn_str_vals = ''
if PFSA_info['%SYN_STR'] is not None:
for syn_str in PFSA_info['%SYN_STR']:
syn_str_vals += (f'{syn_str} ')
sym_frq_vals = ''
for sym_frq in PFSA_info['%SYM_FRQ']:
sym_frq_vals += (f'{sym_frq} ')
pitilde_vals = indent
for row in PFSA_info['%PITILDE']:
for i, val in enumerate(row):
suffix = ' ' if (i+1) < len(row) else f' \n{indent}'
pitilde_vals += (f'{val}{suffix}')
pitilde_vals = pitilde_vals[:-len(indent)] if indent_level > 0 else pitilde_vals
connx_vals = indent
for row in PFSA_info['%CONNX']:
for i, val in enumerate(row):
suffix = ' ' if (i+1) < len(row) else f' \n{indent}'
connx_vals += (f'{val}{suffix}')
connx_vals = connx_vals[:-len(indent)] if indent_level > 0 else connx_vals
return (
f"{indent}%ANN_ERR: {PFSA_info['%ANN_ERR']}\n"
+ f"{indent}%MRG_EPS: {PFSA_info['%MRG_EPS']}\n"
+ f"{indent}%SYN_STR: {syn_str_vals}\n"
+ f"{indent}%SYM_FRQ: {sym_frq_vals}\n"
+ f"{indent}%PITILDE: size({len(PFSA_info['%PITILDE'])})\n"
+ f"{indent}#PITILDE\n{pitilde_vals}"
+ f"{indent}%CONNX: size({len(PFSA_info['%CONNX'])})\n"
+ f"{indent}#CONNX\n{connx_vals}\n"
)
class StreamingDetection(AnomalyDetection):
""" Tool for anomaly detection within a single data stream """
def __init__(self, *, window_size=1000, window_overlap=0, **kwargs):
"""
Args:
window_size (int): size of sliding window (Default = 1000)
window_overlap (int): overlap of sliding windows (Default = 0)
"""
super().__init__(**kwargs)
self.window_size = int(window_size)
self.window_overlap = int(window_overlap)
def fit(self, X, y=None):
if (self.verbose):
print("Splitting data into individual streams...")
X_split_streams = self.split_streams(X, self.window_size, self.window_overlap)
return super().fit(X_split_streams)
def predict(self, X=None):
if X is None:
return super().predict()
else:
if (self.verbose):
print("Splitting data into individual streams...")
X_split_streams = self.split_streams(X, self.window_size, self.window_overlap)
return super().predict(X_split_streams)
def save_model(self, path='patternly_model.dill'):
super().save_model(path=path)
with open(path, 'rb') as f:
metadata = dill.load(f)
metadata['user_params']['window_size'] = self.window_size
metadata['user_params']['window_overlap'] = self.window_overlap
with open(path, 'wb') as f:
dill.dump(metadata, f)
@staticmethod
def split_streams(X, window_size, window_overlap):
""" Split stream data into individual streams using windows with specified size
and overlap
Args:
X (pd.Series or pd.DataFrame): data to be split into streams
"""
if type(X) is not pd.DataFrame:
X = pd.DataFrame(X)
def beg(i): return int((window_size * i) - (window_overlap * i))
def end(i): return int(beg(i) + window_size)
size = int(X.shape[0] // (window_size - window_overlap))
return pd.concat(
[X[beg(i):end(i)].reset_index(drop=True) for i in range(size)],
axis=1
).T.reset_index(drop=True).dropna(how='any', axis=0)
class ContinuousStreamingDetection(StreamingDetection):
""" Tool for anomaly detection within a single continual data stream """
def __init__(self, **kwargs):
"""
Args: see StreamingDetection
"""
self.pattern_emergence_times = []
super().__init__(**kwargs)
def fit_stream(self, X, *, clean=True):
""" Fit new stream of data
Args:
X (pd.DataFrame or pd.Series): time series data stream to find anomalies
clean (bool, optional): whether to remove temp files (Default = True)
Returns:
ContinuousStreamingDetection: fitted model
"""
if X is None:
raise ValueError("Please pass data stream to fit.")
self.pattern_emergence_times = []
X_split_streams = self.split_streams(X, self.window_size, self.window_overlap)
data = self._AnomalyDetection__quantize(X_split_streams)
num_predictions = data.shape[0]
def count_clusters(row):
clusters = np.bincount(row)
return clusters.size
data['alphabet_size'] = data.apply(count_clusters, axis=1)
# if not fitted with previous data, create a PFSA from first stream
if not self.fitted:
self.n_clusters = 0
self.fitted = True
self.__add_to_PFSA_library(data.iloc[0])
self.pattern_emergence_times.append(0)
cluster_llks = np.zeros(shape=(self.n_clusters, num_predictions), dtype=np.float32)
for i, row in data.iterrows():
upper_bounds = self.PFSA_llk_means + (self.PFSA_llk_stds * self.anomaly_sensitivity)
for j in range(self.n_clusters):
curr_alphabet_size = len(self.cluster_PFSA_info[j]['%SYM_FRQ'])
if row['alphabet_size'] > curr_alphabet_size:
cluster_llks[j][i] = float('inf')
continue
cluster_llks[j][i] = Llk(data=[row.drop(index=['alphabet_size'])], pfsafile=self.cluster_PFSA_files[j]).run()[0]
# consider to be anomaly if all llks above specified upper bound (X standard deviations above the mean)
# in this case, we create a new model based on the anomalous pattern
if np.all(cluster_llks.T[i] > upper_bounds):
self.pattern_emergence_times.append(i)
self.__add_to_PFSA_library(row)
cluster_llks = np.append(cluster_llks, np.zeros((1, num_predictions))).reshape((self.n_clusters, num_predictions))
cluster_llks[self.n_clusters - 1][i] = Llk(data=[row.drop(index=['alphabet_size'])], pfsafile=self.cluster_PFSA_files[self.n_clusters-1]).run()[0]
upper_bounds = self.PFSA_llk_means + (self.PFSA_llk_stds * self.anomaly_sensitivity)
self.cluster_llks = cluster_llks
# self.closest_match = np.argmin(cluster_llks, axis=0)
# self.predicted_active_pfsas = {}
# for i, pfsa in enumerate(self.closest_match):
# if pfsa in self.predicted_active_pfsas:
# self.predicted_active_pfsas[pfsa].append(i)
# else:
# self.predicted_active_pfsas[pfsa] = [i]
self.quantized_data = pd.concat([self.quantized_data, data.drop(columns=['alphabet_size'])], axis=0).reset_index(drop=True)
return self
def __add_to_PFSA_library(self, data):
""" Infer PFSA from data using genESeSS and update the PFSA library """
cluster_PFSA_file = RANDOM_NAME(path=self.temp_dir)
PFSA_png = RANDOM_NAME(path=self.temp_dir)
alg = GenESeSS(
data=[data.drop(index=['alphabet_size'])],
outfile=cluster_PFSA_file,
data_type='symbolic',
data_dir='row',
force=True,
eps=self.eps,
dot=PFSA_png,
configfile='./patternly/config.cfg'
)
# TODO: add error handling for when genESeSS fails to find a PFSA
PFSA_found = alg.run()
if PFSA_found:
prop_vals = [
alg.inference_error,
alg.epsilon_used,
alg.synchronizing_string_found,
alg.symbol_frequency,
alg.probability_morph_matrix,
alg.connectivity_matrix,
]
self.cluster_PFSA_info.append(dict(zip(self.PFSA_prop_titles, prop_vals)))
else:
raise ValueError(f"Unable to find new PFSA for anomaly.")
self.cluster_PFSA_files.append(cluster_PFSA_file)
self.cluster_PFSA_pngs.append(PFSA_png)
self.n_clusters += 1
seqs = pd.DataFrame(Prun(pfsafile=cluster_PFSA_file, data_len=self.window_size, num_repeats=100).run())
new_data = pd.concat([pd.DataFrame(data.drop(index=['alphabet_size'])).T, seqs], axis=0)
llks = np.asarray(Llk(data=new_data, pfsafile=self.cluster_PFSA_files[self.n_clusters-1]).run())
self.PFSA_llk_means = np.mean(llks)
self.PFSA_llk_stds = np.std(llks, ddof=1)
Classes
class AnomalyDetection (*, anomaly_sensitivity=1, n_clusters=1, reduce_clusters=True, clustering_alg=None, quantize=True, quantize_type='complex', eps=0.1, verbose=False)
Tool for anomaly detection

Args
    anomaly_sensitivity : float, optional
        How many standard deviations above the mean llk to consider an anomaly (Default = 1)
    n_clusters : int, optional
        Number of clusters to use with KMeans (Default = 1)
    reduce_clusters : bool, optional
        Whether to attempt to reduce the number of clusters (Default = True)
    clustering_alg : sklearn.cluster, optional
        Clustering algorithm to use; if None then KMeans (Default = None)
    quantize : bool, optional
        Whether to quantize the data (Default = True)
    quantize_type : str, optional
        Type of quantization to use ("complex" or "simple") (Default = "complex")
    eps : float, optional
        Epsilon parameter for finding PFSAs (Default = 0.1)
    verbose : bool, optional
        Whether to print verbose output (Default = False)

Attributes
    cluster_llks : np.ndarray
        Array of shape (n_clusters, n_predictions) containing the llk of each prediction for each cluster after calling predict()
    closest_match : np.ndarray
        Array of shape (n_predictions,) containing the index of the closest matching cluster for each prediction after calling predict()
    cluster_PFSA_pngs : list[str]
        List of file paths to PFSA png files
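A minimal usage sketch based on the constructor parameters above and the fit()/predict() methods documented below. The row-per-sequence data layout is inferred from the source (each row is treated as one sequence, columns as time steps) and the synthetic data is illustrative only:

import numpy as np
import pandas as pd
from patternly.detection import AnomalyDetection

# Assumed layout: each row is one time-series sequence, columns are time steps.
rng = np.random.default_rng(0)
train = pd.DataFrame(rng.normal(size=(50, 500)))

model = AnomalyDetection(
    anomaly_sensitivity=1,   # flag sequences whose llk is > 1 std above the cluster mean
    n_clusters=1,            # assume a single behavioral cluster
    quantize=True,
    quantize_type="complex",
    verbose=True,
).fit(train)

# One boolean per row of the new data.
test = pd.DataFrame(rng.normal(size=(5, 500)))
print(model.predict(test))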
Subclasses
    StreamingDetection
Static methods
def load_model(path='patternly_model.dill')
Load saved model

Args
    path : str
        Path to saved model

Returns
    AnomalyDetection or StreamingDetection
        Loaded model
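Assuming a model fitted as in the sketch above, saving and restoring it is a simple round trip (the file name is arbitrary). The training data itself is not serialized, so a restored model must be given data explicitly when predicting:

model.save_model("detector.dill")

restored = AnomalyDetection.load_model("detector.dill")
# predict() on a restored model requires explicit data, since the original
# quantized training data is not stored in the file.
print(restored.predict(test))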
Methods
def fit(self, X, y=None)
Fit an anomaly detection model

Args
    X : pd.DataFrame or pd.Series
        Time series data to be fit
    y : pd.Series, optional
        Labels for X, provided only for sklearn API compatibility (Default = None)

Returns
    AnomalyDetection
        Fitted model
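fit() can also be driven by a user-supplied clustering estimator via the constructor's clustering_alg parameter; the source only requires the object to expose fit() and labels_. A sketch with several clusters, under the same assumed row-per-sequence layout:

from sklearn.cluster import AgglomerativeClustering

multi = AnomalyDetection(
    n_clusters=3,
    clustering_alg=AgglomerativeClustering(n_clusters=3),
    reduce_clusters=True,   # merge clusters whose PFSAs turn out to be similar
).fit(train)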
def generate_PFSA_pngs(self)
Generates png files for PFSAs

Returns
    list[str]
        List of file paths to png files
def predict(self, X=None, *, clean=True)
Predict whether a time series sequence is anomalous

Args
    X : pd.DataFrame or pd.Series, optional
        Time series data to find anomalies in; if None, predicts on the original training data (Default = None)
    clean : bool, optional
        Whether to remove temp files (Default = True)

Returns
    bool or list[bool]
        True if the time series is an anomaly, False otherwise; output shape depends on input
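After predict(), the per-cluster statistics listed under Attributes can be inspected directly; a short sketch continuing from the fitted model above:

preds = model.predict(test)

# llk of every prediction under every cluster PFSA, shape (n_clusters, n_predictions).
print(model.cluster_llks)

# Index of the closest-matching cluster PFSA for each prediction.
print(model.closest_match)

# For each PFSA, the indices of the predictions it matched best.
print(model.predicted_active_pfsas)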
def print_PFSAs(self)
Print PFSAs found for each cluster
def save_model(self, path='patternly_model.dill')
Save model to file

Args
    path : str
        File path to save model to
class ContinuousStreamingDetection (**kwargs)
Tool for anomaly detection within a single continual data stream

Args
    See StreamingDetection
Ancestors
    StreamingDetection
    AnomalyDetection
Methods
def fit_stream(self, X, *, clean=True)
Fit new stream of data

Args
    X : pd.DataFrame or pd.Series
        Time series data stream to find anomalies in
    clean : bool, optional
        Whether to remove temp files (Default = True)

Returns
    ContinuousStreamingDetection
        Fitted model
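A sketch of incremental use on a single long series; the window settings and data are illustrative. Each fit_stream() call scores the incoming windows against the current PFSA library and adds a new PFSA whenever an anomalous pattern emerges:

import numpy as np
import pandas as pd
from patternly.detection import ContinuousStreamingDetection

rng = np.random.default_rng(0)
detector = ContinuousStreamingDetection(window_size=1000, window_overlap=0)

detector.fit_stream(pd.Series(rng.normal(size=20_000)))
# Window indices at which a previously unseen pattern emerged
# (a new PFSA was added to the library at each of these times).
print(detector.pattern_emergence_times)

# Feed the next chunk of the stream as it arrives.
detector.fit_stream(pd.Series(rng.normal(size=5_000)))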
class StreamingDetection (*, window_size=1000, window_overlap=0, **kwargs)
Tool for anomaly detection within a single data stream

Args
    window_size : int
        Size of sliding window (Default = 1000)
    window_overlap : int
        Overlap of sliding windows (Default = 0)
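A sketch of windowed detection on one long series (window settings are illustrative): fit() and predict() first split the stream into fixed-size windows via split_streams() and then behave like AnomalyDetection on the resulting row-per-window frame:

import numpy as np
import pandas as pd
from patternly.detection import StreamingDetection

rng = np.random.default_rng(0)
series = pd.Series(rng.normal(size=50_000))

streaming = StreamingDetection(window_size=1000, window_overlap=100).fit(series)

# One boolean per window of the new data.
print(streaming.predict(pd.Series(rng.normal(size=10_000))))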
Ancestors
    AnomalyDetection

Subclasses
    ContinuousStreamingDetection
Static methods
def split_streams(X, window_size, window_overlap)
Split stream data into individual streams using windows with specified size and overlap

Args
    X : pd.Series or pd.DataFrame
        Data to be split into streams
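The windowing arithmetic follows directly from the source: window i starts at i * (window_size - window_overlap), spans window_size samples, and any trailing partial window is dropped. A self-contained sketch:

import pandas as pd
from patternly.detection import StreamingDetection

x = pd.Series(range(10))
windows = StreamingDetection.split_streams(x, window_size=4, window_overlap=2)
# Windows start every (4 - 2) = 2 samples:
#   [0 1 2 3], [2 3 4 5], [4 5 6 7], [6 7 8 9]
print(windows)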