Source code for scatcluster.processing.ica

"""ICA processing module"""
import os
import pickle
from glob import glob
from typing import List, Optional

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib import dates as mdates
from obspy.core import UTCDateTime
from scipy.signal import medfilt
from sklearn.decomposition import FastICA
from sklearn.metrics import explained_variance_score, mean_squared_error

from scatcluster.helper import demad, list_of_strings


[docs]def round_nearest(x, a): """ Rounds a number `x` to the nearest multiple of `a`. Parameters: x (float): The number to be rounded. a (float): The multiple to round to. Returns: float: The rounded number. """ return round(x / a) * a
[docs]class ICA:
[docs] def _get_index_from_UTC_timestamp(self, utc_timestamp: str): """ Get the index of the maximum value in the `data_times` list that is less than the given UTC timestamp. Parameters: utc_timestamp (str): The UTC timestamp to compare against the `data_times` list. Returns: int: The index of the maximum value in the `data_times` list that is less than the given UTC timestamp. """ df_time = pd.DataFrame({'time': self.data_times}) return max(df_time.loc[ df_time['time'] < mdates.date2num(UTCDateTime(utc_timestamp)), ].index)
[docs] def process_ICA_single(self, num_ICA: int, return_data: bool = False, exclude_timestamps: Optional[List[str]] = None, exclude_timestamps_skip: int = 5, **kwargs): """ Process the data for a single run of ICA. This function performs Independent Component Analysis (ICA) for a specified number of components. It fits the ICA model to the provided data after optionally excluding certain timestamps. The resulting features are then saved, and the model can be optionally returned. Parameters: num_ICA (int): The number of Independent Components to reduce the data to. return_data (bool): Flag indicating whether to return data after processing. Default is False. exclude_timestamps (Optional[List[str]]): List of timestamps to exclude from the data before fitting. exclude_timestamps_skip (int): The number of data points to skip for each excluded timestamp. **kwargs: Additional keyword arguments that can be passed to the FastICA model. Returns: Tuple: If return_data is True, it returns a tuple containing: - The Explained Variance (%) of the ICA. - The Mean Squared Error (MSE) of the ICA. - The trained ICA model. - The extracted features after transformation. Side Effects: - Saves the trained ICA model in a pickle file. - Saves the features in a numpy .npz file. """ print(f'Performing ICA for {num_ICA} ICAs') ica_model_path = (f'{self.data_savepath}ICA/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_dimension_{num_ICA}.pickle') # check if reduction exists already if os.path.exists(ica_model_path) and self.ica_overwrite_previous_models is False: print(' Using pre-calculated model') with open(ica_model_path, 'rb') as f: model = pickle.load(f) else: # else fit kwargs['max_iter'] = 1000 if kwargs.get('max_iter') is None else kwargs.get('max_iter') kwargs['whiten'] = 'unit-variance' if kwargs.get('whiten') is None else kwargs.get('whiten') model = FastICA(n_components=num_ICA, random_state=42, **kwargs) # Exclude any timestamps scat_coeff_data = self.data_scat_coef_vectorized.copy() if exclude_timestamps is not None: for ts in exclude_timestamps: print( f'Timestamp {UTCDateTime(ts)} - ' f'{UTCDateTime(ts) + (self.network_segment * exclude_timestamps_skip)} has been excluded from the ' f'Scaling fit and Model fit.') ts_index = self._get_index_from_UTC_timestamp(ts) scat_coeff_data = np.delete(scat_coeff_data, np.s_[ts_index:ts_index + exclude_timestamps_skip], 0) print('Fitting FastICA') model.fit(scat_coeff_data) features = model.transform(scat_coeff_data) features_inverse = model.inverse_transform(features) score_exp_var = explained_variance_score(scat_coeff_data, features_inverse) score_mse = mean_squared_error(scat_coeff_data, features_inverse) print(f' ICAs #{num_ICA}: Explained Variance {score_exp_var * 100:0.5f}%: MSE {score_mse:0.5f}') self.ica = model self.ica_number_components = num_ICA self.ica_features = features self.ica_explained_variance_score = score_exp_var # SAVE ICA MODEL IN PICKLE FILE with open(ica_model_path, 'wb') as handle: pickle.dump(model, handle, protocol=pickle.HIGHEST_PROTOCOL) # save the features np.savez( f'{self.data_savepath}data/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_features_{num_ICA}.npz', features=features, features_inverse=features_inverse, score_explained_variance=score_exp_var, score_mse=score_mse) if return_data: return score_exp_var, score_mse, model, features
[docs] def process_ICA_range(self, exclude_timestamps: Optional[List[str]] = None, exclude_timestamps_skip: int = 3, **kwargs) -> None: """ Process a range of Independent Component Analysis (ICA). Parameters: exclude_timestamps (Optional[List[str]]): List of timestamps to exclude from the data. exclude_timestamps_skip (int): The number of data points to skip for each excluded timestamp. **kwargs: Additional keyword arguments. """ score_exp_var = 0 ica_min_ICAs = 6 if self.ica_min_ICAs is None else self.ica_min_ICAs ica_max_ICAs = 20 if self.ica_max_ICAs is None else self.ica_max_ICAs num_ICA = ica_min_ICAs - 1 while score_exp_var <= self.ica_ev_limit and num_ICA < ica_max_ICAs: num_ICA += 1 score_exp_var, _, model, features = self.process_ICA_single(num_ICA=num_ICA, return_data=True, exclude_timestamps=exclude_timestamps, exclude_timestamps_skip=exclude_timestamps_skip, **kwargs) if num_ICA == ica_max_ICAs: print(f'Failed to reach an explained variance of {self.ica_ev_limit * 100:0.5f} with a max number of ' f'{self.ica_max_ICAs} ICAs. Either increase the maximum number of ICAs or decrease the Explained ' f'Variance Limit.') else: print(f'Recommended number of ICAs {num_ICA}.\nThe Explained Variance with these ICAs is {score_exp_var}.') self.ica = model self.ica_number_components = num_ICA self.ica_features = features print(f'Compressed Vectorised Scat. Coefficients Array after ICA: {features.shape}')
[docs] def preload_ICA(self, num_ICA: int) -> None: """ Load a pre-calculated ICA and set required variables Args: num_ICA (int): Desired number of ICAs """ if not os.path.exists(f'{self.data_savepath}ICA/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_dimension_{num_ICA}.pickle'): raise ValueError( f'The supplied number of ICAs {num_ICA} has not been computed. Choose another number for the ICAs or ' f"calcate using 'process_ICA_single'") ica_file = f'{self.data_savepath}ICA/{self.data_network}_{self.data_station}_{self.data_location}_' + \ f'{self.network_name}_dimension_{num_ICA}.pickle' with open(ica_file, 'rb') as f: self.ica = pickle.load(f) ica_results = np.load(f'{self.data_savepath}data/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_features_{num_ICA}.npz') self.ica_features = ica_results['features'] self.ica_features_inverse = ica_results['features_inverse'] self.ica_explained_variance_score = ica_results['score_explained_variance'] self.ica_mse = ica_results['score_mse'] self.ica_number_components = num_ICA print(f'Compressed Vectorised Scat. Coefficients Array after ICA: {self.ica_features.shape}') self.data_times = np.load( f'{self.data_savepath}data/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_times.npy')
[docs] def plot_ICA(self, **kwargs) -> None: """ Visualise the ICAs """ dimensions = self.ica_features.shape[1] # Preprocess features = self.ica_features.T features = demad(features) # features = latents FACTOR = 0.5 # Show in time n_features, _ = features.shape # Figure kwargs['figsize'] = (10, 7) if kwargs.get('figsize') is None else kwargs.get('figsize') _, ax = plt.subplots(1, **kwargs) # Show for index, feature in enumerate(features.copy()): color = f'C{index % 3}' feature *= 0.5 feature += index + 1 feature_filtered = medfilt( feature, self.ica_median_filter if self.ica_median_filter is not None else (int(3600 / self.network_segment) * 3) + 1) ax.plot(self.data_times, feature, '.', ms=3, alpha=0.5, mew=0, color=color) ax.plot(self.data_times, feature_filtered, lw=0.7, color='k') # Labels ax.grid() ax.set_ylim(0, n_features + FACTOR) ax.set_yticks(np.arange(n_features) + 1) ax.set_yticklabels(list_of_strings(n_features)) ax.set_ylabel('Independant component index') # Date labels dateticks = mdates.AutoDateLocator() datelabels = mdates.ConciseDateFormatter(dateticks) ax.xaxis.set_major_locator(dateticks) ax.xaxis.set_major_formatter(datelabels) ax.set_xlim(self.data_times.min(), self.data_times.max()) # Remove borders ax.spines['top'].set_visible(False) ax.spines['right'].set_visible(False) ax.spines['left'].set_visible(False) ax.tick_params(axis='y', length=0) plt.title( f'{self.data_network}_{self.data_station}_{self.data_location}_{self.network_name}_ICAs_' f'{dimensions}\n{dimensions} ICAs - Explained Variance: {self.ica_explained_variance_score * 100:0.5f}') plt.savefig(f'{self.data_savepath}figures/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICAs_{dimensions}.png') plt.show()
[docs] def plot_ICA_zoom(self, ICA_letter: str, zoom_time_start: str, zoom_time_end: str, **kwargs) -> None: """ Plots a zoomed view of a specific ICA component. Args: ICA_letter (str): The letter corresponding to the ICA component to plot. zoom_time_start (str): The start time of the zoomed view. zoom_time_end (str): The end time of the zoomed view. **kwargs: Additional keyword arguments to pass to `plt.subplots()`. Raises: IndexError: If the specified ICA component does not exist. Note: The `ICA_letter` argument should be an uppercase letter corresponding to the ICA component to plot. """ ica_number = [ x_enum for x_enum, x in enumerate(list_of_strings(self.ica.n_components)) if x == ICA_letter.upper() ][0] features = self.ica_features.T features_smoothed = demad(features) # Zoom into an ICA kwargs['figsize'] = (10, 7) if kwargs.get('figsize') is None else kwargs.get('figsize') _, ax = plt.subplots(**kwargs) ax.plot(self.data_times, features[ica_number], '.', ms=3, alpha=0.5, mew=0) ax.plot(self.data_times, features_smoothed[ica_number], lw=0.7, color='k') ax.set_xlim(min(self.data_times[self.data_times > int(mdates.date2num(UTCDateTime(zoom_time_start)))]), min(self.data_times[self.data_times > int(mdates.date2num(UTCDateTime(zoom_time_end)))])) ax.set_ylim(-2, 2) ax.grid() locator = mdates.AutoDateLocator() formatter = mdates.ConciseDateFormatter(locator) ax.xaxis.set_major_locator(locator) ax.xaxis.set_major_formatter(formatter) plt.savefig( f'{self.data_savepath}figures/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICAs_{self.ica.n_components}_{ICA_letter}_Zoom_{zoom_time_start}_{zoom_time_end}.png') plt.show()
[docs] def plot_ica_contribution(self, **kwargs) -> None: """ Visualise the ICA contribution to each cluster """ # Calculate centroid classes = np.unique(self.dendrogram_predictions) feature_usage = np.zeros((self.ica_features.shape[1], len(classes))) for cluster in classes: within_cluster = self.dendrogram_predictions == cluster cluster_samples = self.ica_features[within_cluster] contributions = np.mean(cluster_samples, axis=0) contributions /= np.abs(contributions).sum() feature_usage[:, cluster - 1] = contributions vmax = round_nearest(np.abs(feature_usage).max(), 0.05) cmap = plt.cm.RdYlBu bounds = np.arange(-vmax, vmax, .05) norm = mpl.colors.BoundaryNorm(bounds, cmap.N) kwargs['figsize'] = (8, 5) if kwargs.get('figsize') is None else kwargs.get('figsize') _, ax = plt.subplots(1, 1, **kwargs) img = ax.matshow(feature_usage, cmap=cmap, norm=norm) cb = plt.colorbar(img, orientation='horizontal') cb.minorticks_on() cb.set_label('Normalised contribution') ax.set_title('Contribution of ICA to prediction\nCluster') ax.set_ylabel('ICA component') ax.set_xticks(range(len(classes)), range(1, len(classes) + 1)) ax.set_yticks(range(self.ica_features.shape[1]), list_of_strings(self.ica_features.shape[1])) plt.savefig( f'{self.data_savepath}figures/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICA_{self.ica.n_components}_clustering_{self.ica.n_components}_' f'Waveform_contribution.png', bbox_inches='tight') plt.show()
[docs] def list_linkages(self): """ Lists all the linkage files for the given data network, station, location, and network name. This function uses the `glob` function to find all the linkage files in the clustering directory that match the pattern `f'{self.data_savepath}clustering/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICA_*linkage*'`. It prints the list of linkage files found. Parameters: self (object): The instance of the class. """ print( glob(f'{self.data_savepath}clustering/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICA_*linkage*'))