Source code for scatcluster.analysis.waveforms

"""Analysis Waveforms module."""
import datetime
from glob import glob
from typing import List

import matplotlib.pyplot as plt
import numpy as np
import obspy
import pandas as pd
from matplotlib import dates as mdates
from obspy.core import UTCDateTime
from scipy.spatial.distance import euclidean

from scatcluster.helper import COLORS, is_gpu_available


[docs]class Waveforms:
[docs] def plot_waveforms_per_cluster(self, clusters: List[int] = None, waveforms_n_samples: int = None, **kwargs): """ Visualise Waveforms for All or a select number of clusters. Args: clusters (List[int], optional): If specified, only plot waveforms for clusters in list of integers. For example, clusters = [1,3] will produce waveforms for cluster 1 and 3 only. Defaults to None will produce waveforms for all clusters. waveforms_n_samples (int, optional): Number of waveforms to plot. Defaults to None. **kwargs: Additional keyword arguments to pass to plt.subplots(). """ # get channel list from plotting stream = self.load_data(starttime=UTCDateTime(self.data_sample_starttime), endtime=UTCDateTime(self.data_sample_endtime), channel=self.data_channel) channel_list = sorted(trace.stats.channel for trace in stream) if clusters is not None: classes = clusters else: # Calculate centroid classes = np.unique(self.dendrogram_predictions) for cluster in classes: # Extract clusters within_cluster = self.dendrogram_predictions == cluster cluster_samples = self.ica_features[within_cluster] cluster_times = self.dendrogram_timestamps[within_cluster] # Centroid centroid = np.median(cluster_samples, axis=0) distances = [] for sample in cluster_samples: distances.append(euclidean(sample, centroid)) distances = np.array(distances) # Extract the best waveforms timestamps _waveforms_n_samples = 5 if waveforms_n_samples is None else waveforms_n_samples distances_argsort = np.argsort(distances) sorted_times = cluster_times[distances_argsort][:_waveforms_n_samples] sorted_times = mdates.num2date(sorted_times) kwargs['figsize'] = (20, 8) if kwargs.get('figsize') is None else kwargs.get('figsize') _, ax = plt.subplots(1, 4, gridspec_kw={'width_ratios': [2, 2, 2, 1]}, **kwargs) for ch, channel in enumerate(channel_list): yticks = [] for index, t in enumerate(sorted_times): start = obspy.UTCDateTime(t) end = start + self.network_segment stream = self.load_data(starttime=start, endtime=end, channel=self.data_channel) if len(stream) > 0: trace = stream[0] data = trace.data data /= np.abs(data).max() ax[ch].plot(trace.times(), data + index, lw=0.3) yticks.append(start) else: print(f'Trace is not available between {start}-{end} for supplied parametrization') ax[ch].set_title(f'channel {channel}') ax[0].set_yticks(range(len(yticks)), [x.strftime('%Y-%m-%d %H:%M:%S') for x in yticks]) self.show_dendrogram_waveforms(linkage=self.dendrogram_linkage, ax=ax[3], depth=len(np.unique(self.dendrogram_predictions))) ax[3].set_title('Dendrogram') self._set_share_axes(axs=ax[:2], sharex=True, sharey=True) plt.suptitle(f'{self.data_network}_{self.data_station}_{self.data_location}_{self.network_name}_ICA_' f'{self.ica.n_components}_clustering_{clusters}\nCluster {cluster}') plt.savefig( f'{self.data_savepath}figures/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICA_{self.ica.n_components}_clustering_{clusters}_waveforms_cluster_{cluster}.png' ) plt.show()
[docs] def plot_all_waveforms(self, channel='HHZ', waveforms_n_samples=3, **kwargs): """ Plots waveforms for all or a select number of clusters. Args: channel (str, optional): The channel to plot waveforms for. Defaults to 'HHZ'. waveforms_n_samples (int, optional): The number of waveforms to plot. Defaults to 3. **kwargs: Additional keyword arguments to pass to plt.subplots(). Prints: Trace is not available between {start}-{end} for supplied parametrization Saves: A figure of waveforms for all clusters as a PNG file. """ classes = np.unique(self.dendrogram_predictions) _waveforms_n_samples = 5 if waveforms_n_samples is None else waveforms_n_samples kwargs['figsize'] = (20, 8) if kwargs.get('figsize') is None else kwargs.get('figsize') _, ax = plt.subplots( _waveforms_n_samples, len(classes), # sharey=True, sharex=True, # gridspec_kw={'width_ratios': [2, 2, 2, 1]}, **kwargs) for cluster_enum, cluster in enumerate(classes): # Extract clusters within_cluster = self.dendrogram_predictions == cluster cluster_samples = self.ica_features[within_cluster] cluster_times = self.dendrogram_timestamps[within_cluster] # Centroid centroid = np.median(cluster_samples, axis=0) distances = [] for sample in cluster_samples: distances.append(euclidean(sample, centroid)) distances = np.array(distances) # Extract the best waveforms timestamps distances_argsort = np.argsort(distances) sorted_times = cluster_times[distances_argsort][:_waveforms_n_samples] sorted_times = mdates.num2date(sorted_times) for index, t in enumerate(sorted_times): start = obspy.UTCDateTime(t) end = start + self.network_segment stream = self.load_data(starttime=start, endtime=end, channel=channel) if len(stream) > 0: trace = stream[0] stream_data = trace.data ax[index, cluster_enum].plot(trace.times(), stream_data, lw=0.3, color=COLORS[cluster_enum]) ax[index, cluster_enum].annotate(start.strftime('%Y/%m/%d %H:%M:%S'), xy=(0, 1), xycoords='axes fraction', horizontalalignment='left', verticalalignment='top') else: print(f'Trace is not available between {start}-{end} for supplied parametrization') ax[0, cluster_enum].set_title(f'Cluster {cluster}') _ = [axc.set_axis_off() for axr in ax for axc in axr] plt.subplots_adjust(wspace=0, hspace=0) plt.suptitle(f'{self.data_network}_{self.data_station}_{self.data_location}_{self.network_name}_ICA_' f'{self.ica.n_components}_clustering_{len(classes)}') plt.savefig( f'{self.data_savepath}figures/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_ICA_{self.ica.n_components}_clustering_{len(classes)}_all_clusters_waveforms.png') plt.show()
[docs] def get_waveform(self, start_time='2022-01-01T00:00:00+00:00', time_second=60, channel='HHZ', envelope=False): """ Retrieves a waveform from the data source within a specified time range and channel. Args: start_time (str, optional): The start time of the waveform in ISO format. Defaults to '2022-01-01T00:00:00+00:00'. time_second (int, optional): The duration of the waveform in seconds. Defaults to 60. channel (str, optional): The channel of the waveform. Defaults to 'HHZ'. envelope (bool, optional): Whether to process the waveform with the envelope function. Defaults to False. Returns: obspy.core.stream.Stream: The waveform data as an ObsPy Stream object. """ time_start = UTCDateTime(str(start_time)) time_end = UTCDateTime(str(start_time)) + datetime.timedelta(seconds=time_second) wvf = self.load_data(starttime=time_start, endtime=time_end, channel=channel) if envelope: wvf = self.process_stream_envelope(wvf) return wvf
[docs] def retreive_scat_vector(self, scat_coef_vectorized, scat_coef_vectorized_index, num_channels=3, scat_vec_size_first_order=16, scat_vec_size_second_order=(16, 7)): """ Retrieves the scattering vector from the given `scat_coef_vectorized` array at the specified `scat_coef_vectorized_index`. Args: scat_coef_vectorized (ndarray): The array containing the scattering coefficients. scat_coef_vectorized_index (int): The index of the scattering coefficient vector to retrieve. num_channels (int, optional): The number of channels. Defaults to 3. scat_vec_size_first_order (int, optional): The size of the first-order scattering vector. Defaults to 16. scat_vec_size_second_order (tuple, optional): The size of the second-order scattering vector. Defaults to (16, 7). Returns: tuple: A tuple containing the first-order scattering vector and the second-order scattering vector. """ scat_vec = scat_coef_vectorized[scat_coef_vectorized_index, :] scat_vec_first_order = scat_vec[:(num_channels * scat_vec_size_first_order)].reshape( num_channels, scat_vec_size_first_order) scat_vec_second_order = scat_vec[(num_channels * scat_vec_size_first_order):].reshape( num_channels, scat_vec_size_second_order[0], scat_vec_size_second_order[1]) return scat_vec_first_order, scat_vec_second_order
[docs] def process_waveforms_envelopes_scat_coefficients(self, df_preds): """ Process waveforms, envelopes, and scattering coefficients. This function takes in a DataFrame `df_preds` containing predictions and processes waveforms, envelopes, and scattering coefficients. It loads the scattering coefficient vectorized data using the `load_scat_coef_vectorized` method. It then retrieves the scattering coefficient file list based on the data savepath, data network, data station, data location, and network name. It loads the first scattering coefficient file from the file list and prints the shape of the first-order and second-order scattering coefficient arrays. It calculates the number of channels, first-order scattering vector size, and second-order scattering vector size based on the shape of the scattering coefficient arrays. It creates a list of scattering vectors by iterating over the indices of `df_preds` and calling the `retreive_scat_vector` method with the scattering coefficient vectorized data, the current index, the number of channels, first-order scattering vector size, and second-order scattering vector size. It creates a DataFrame `df_scat_vec` with columns 'scat_vec_first_order' and 'scat_vec_second_order', where the values are obtained from the scattering vector list by extracting the first-order and second-order scattering vectors respectively. Finally, it returns the `df_scat_vec` DataFrame. Parameters: - df_preds (pandas.DataFrame): The DataFrame containing predictions. Returns: - df_scat_vec (pandas.DataFrame): The DataFrame with columns 'scat_vec_first_order' and 'scat_vec_second_order'. """ self.load_scat_coef_vectorized() file_list = glob( f'{self.data_savepath}scatterings/{self.data_network}_{self.data_station}_{self.data_location}_' f'{self.network_name}_scatterings_*.npz') scat_file = np.load(file_list[0]) print(f"First order: {scat_file['scat_coef_0'].shape[1:]}") print(f"Second order: {scat_file['scat_coef_1'].shape[1:]}") num_channels = scat_file['scat_coef_0'].shape[1:][0] first_order = scat_file['scat_coef_0'].shape[1:][1] second_order = scat_file['scat_coef_1'].shape[1:][1:] scat_vec_list = [ self.retreive_scat_vector(self.scat_coef_vectorized, i, num_channels, first_order, second_order) for i in df_preds.index ] df_scat_vec = pd.DataFrame({ 'scat_vec_first_order': [sv[0] for sv in scat_vec_list], 'scat_vec_second_order': [sv[1] for sv in scat_vec_list] }) return df_scat_vec
[docs] def waveforms_envelopes_scat_coefficients_df_merge( self, df_preds, df_scat_vec, ): """ Merge the DataFrames `df_preds` and `df_scat_vec` along the columns axis. Parameters: - df_preds (pandas.DataFrame): The DataFrame containing predictions. - df_scat_vec (pandas.DataFrame): The DataFrame containing scattering coefficients. Returns: - merged_df (pandas.DataFrame): The merged DataFrame with columns from `df_preds` and `df_scat_vec`. """ return pd.concat([df_preds, df_scat_vec], axis=1)
[docs] def plot_waveforms_envelopes_scat_coefficients(self, df_pred_scat_vec: pd.DataFrame, channel: str = 'HHZ', num_waveforms: int = 5, num_scat_coeff_stacking: int = 10, GPU: bool = is_gpu_available()): """ Plot waveforms, envelopes, and scattering coefficients for different clusters. Parameters: - df_pred_scat_vec (pandas.DataFrame): DataFrame with predictions and scattering coefficients. - channel (str): The channel to plot waveforms for (default is 'HHZ'). - num_waveforms (int): Number of waveforms to plot (default is 5). - num_scat_coeff_stacking (int): Number of scattering coefficients to stack (default is 10). - GPU (bool): Flag indicating if GPU is used for processing (default is determined by `is_gpu_available()`). Returns: None """ if GPU: self.net.banks[0].centers = self.net.banks[0].centers.get() self.net.banks[1].centers = self.net.banks[2].centers.get() _, ax = plt.subplots(len(set(df_pred_scat_vec.predictions)), 4, figsize=(20, 10), sharex='col', sharey='col') sv1_mean = np.mean(np.array([sv1[2, :] for sv1 in df_pred_scat_vec.scat_vec_first_order]), axis=0) sv2_mean = np.mean(np.array([sv2[2, :, :] for sv2 in df_pred_scat_vec.scat_vec_second_order]), axis=0) ax[0, 0].set_title('Waveforms') ax[0, 0].margins(x=0) ax[len(set(df_pred_scat_vec.predictions)) - 1, 0].set_xlabel(f'Time samples ({self.network_segment}s)') ax[0, 1].set_title('Envelopes') ax[0, 1].margins(x=0) ax[len(set(df_pred_scat_vec.predictions)) - 1, 1].set_xlabel(f'Time samples ({self.network_segment}s)') ax[0, 2].set_title('First Order Scat. Coeff.') # ax[0,2].set_xscale('log') # ax[0,2].set_yscale('log') ax[len(set(df_pred_scat_vec.predictions)) - 1, 2].set_xlabel('First-order frequency (Hz)') ax[len(set(df_pred_scat_vec.predictions)) - 1, 2].set_xticks(range(0, sv1_mean.shape[0])) first_order_scat = [f'{x:.1f}' for x in reversed(self.net.banks[0].centers)] ax[len(set(df_pred_scat_vec.predictions)) - 1, 2].set_xticklabels(labels=first_order_scat, rotation='vertical') ax[0, 3].set_title('Second Order Scat. Coeff.') # ax[0,3].set_xscale('log') # ax[0,3].set_yscale('log') ax[len(set(df_pred_scat_vec.predictions)) - 1, 3].set_xlabel('First-order frequency (Hz)') ax[len(set(df_pred_scat_vec.predictions)) - 1, 3].set_xticks(range(0, sv1_mean.shape[0])) ax[len(set(df_pred_scat_vec.predictions)) - 1, 3].set_xticklabels(labels=first_order_scat, rotation='vertical') ax[len(set(df_pred_scat_vec.predictions)) - 1, 3].set_yticks(range(0, sv2_mean.shape[1])) second_order_scat = [f'{x:.1f}' for x in self.net.banks[1].centers] ax[len(set(df_pred_scat_vec.predictions)) - 1, 3].set_yticklabels(labels=second_order_scat) for row in range(len(set(df_pred_scat_vec.predictions))): ax[row, 2].set_ylabel('First-order scat. coef.') ax[row, 3].set_ylabel('Second-order freq. (Hz)') num_clusters = list(set(df_pred_scat_vec.predictions)) for cluster_enum, cluster in enumerate(num_clusters): ax[cluster_enum, 0].set_ylabel(f'Cluster {cluster}') wvf0_time = str(df_pred_scat_vec.loc[(df_pred_scat_vec.predictions == cluster) & (df_pred_scat_vec.cluster_rank == 0), 'times_YYYYMMDD'].values[0]) wvf0 = self.get_waveform(start_time=str(wvf0_time), time_second=self.network_segment, channel=channel, envelope=False) if len(wvf0) < 1: print(f' The waveform in the centre of Cluster {cluster} contains no traces. This \n' + ' is most likely a cluster of damaged/missing data.') else: df_scat_vec = df_pred_scat_vec.loc[ (df_pred_scat_vec.predictions == cluster), ].sort_values(by='cluster_rank').reset_index() # WAVEFORMS ax[cluster_enum, 0].plot(self.min_max_vector(wvf0[0].data) - 0.5, 'k', alpha=0.5) wvfs = [ self.get_waveform(start_time=str(st).replace(' ', 'T'), time_second=self.network_segment, channel=channel, envelope=False)[0].data for st in df_scat_vec['times_YYYYMMDD'][1:num_waveforms] ] for wvf_enum, wvf in enumerate(wvfs): ax[cluster_enum, 0].plot(self.min_max_vector(wvf) + wvf_enum + 1 - 0.5, 'b', alpha=0.2) # ENVELOPES env0 = self.get_waveform(start_time=str(wvf0_time), time_second=self.network_segment, channel=channel, envelope=True) ax[cluster_enum, 1].plot(self.min_max_vector(env0[0].data), 'k', alpha=0.5) envs = [ self.get_waveform(start_time=str(st).replace(' ', 'T'), time_second=self.network_segment, channel=channel, envelope=True)[0].data for st in df_scat_vec['times_YYYYMMDD'][1:num_waveforms] ] for env_enum, env in enumerate(envs): ax[cluster_enum, 1].plot(self.min_max_vector(env) + env_enum + 1, 'b', alpha=0.2) # First Order Scat Coeffs sv1_array = np.array([sv1[2, :] for sv1 in df_scat_vec.scat_vec_first_order[:num_scat_coeff_stacking]]) sv1_array = sv1_array - sv1_mean for sv1_row in range(sv1_array.shape[0]): ax[cluster_enum, 2].plot(sv1_array[sv1_row, :], 'b', alpha=0.02) ax[cluster_enum, 2].plot(np.mean(sv1_array, axis=0), 'k', alpha=0.5) # Second Order Scat Coeffs sv2_array = np.array( [sv2[2, :, :] for sv2 in df_scat_vec.scat_vec_second_order[:num_scat_coeff_stacking]]) sv2_array = sv2_array - sv2_mean sv2_array = np.mean(sv2_array, axis=0).T sv2_array = np.flip(sv2_array, 0) print(f'MIN {sv2_array.min()} - MAX {sv2_array.max()}') im = ax[cluster_enum, 3].imshow(sv2_array, aspect='auto', cmap='RdYlBu') plt.colorbar(im, ax=ax[cluster_enum, 3]) filename = (f'{self.data_network}_{self.data_station}_{self.data_location}_{self.network_name}_ICA_' f'{self.ica.n_components}_Clustering_{len(num_clusters)}_Xcorr_ScatCoeff') plt.suptitle(filename) plt.savefig(f'{self.data_savepath}figures/{filename}.png', bbox_inches='tight') plt.show()