Source code for utils.simulate

import numpy as np
from utils import analyze
from scipy.stats import multivariate_normal
from scipy.special import softmax
import pandas as pd

def generate_usage(module_feature_object, n_samples, random_state=42, mode="log-normal", scale=10):
    """
    Generate a simulated pose module usage object

    :param module_feature_object: module feature object of class ModuleUsage (from analyze.get_module_usage)
        or ModuleTransitions (from analyze.get_module_transitions)
    :param n_samples: number of samples to generate
    :param random_state: random state seed (default: 42)
    :param mode: 'log-normal' or 'multivariate_gaussian' (default: 'log-normal')
    :param scale: scale factor for the spread in log-normal mode (the covariance is multiplied by scale**2);
        not used in multivariate_gaussian mode
    :return: module_feature_object of the same style as the one input
    """
    np.random.seed(random_state)

    # Pull the per-sample count matrix from the input feature object
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
    else:
        raise ValueError(
            f'module_feature_object class must be ModuleUsage or ModuleTransitions, '
            f'not {module_feature_object.__class__}')
    mean_X = np.mean(X, axis=0)

    if mode == "multivariate_gaussian":
        # Sample directly from a Gaussian fit to the data, clip negatives, and renormalise each row
        cov_X = np.cov(X.T)
        simulated_X = np.random.multivariate_normal(mean_X, cov_X, size=n_samples)
        simulated_X[simulated_X < 0] = 0
        simulated_X = simulated_X / np.sum(simulated_X, axis=1)[:, np.newaxis]
    elif mode == "log-normal":
        # Sample in an unconstrained space with inflated covariance, then map each row onto the simplex
        cov_X = np.cov(X.T) * (scale ** 2)
        Z = np.random.multivariate_normal(mean=mean_X, cov=cov_X, size=n_samples)
        simulated_X = softmax(Z, axis=1)
    else:
        raise ValueError("Mode invalid! Must be 'log-normal' or 'multivariate_gaussian'")

    if module_feature_object.__class__.__name__ == "ModuleUsage":
        return analyze.ModuleUsage(simulated_X, ["9999999"] * n_samples, ["9999999"] * n_samples,
                                   module_feature_object.feat_names, module_feature_object.group_dict, None)
    print("Warning - generating module transitions currently not tested for this function")
    return analyze.ModuleTransitions(simulated_X, simulated_X, ["9999999"] * n_samples, ["9999999"] * n_samples,
                                     module_feature_object.feat_names, module_feature_object.group_dict, None)
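# Minimal usage sketch: `config` and `labels_df` are assumed to come from earlier steps in the
# pipeline (analyze.get_module_labels); the returned object is assumed to expose the simulated
# matrix as .label_counts, mirroring the input object.
#
#   usage = analyze.get_module_usage(config, labels_df)
#   sim_usage = generate_usage(usage, n_samples=100, mode="log-normal", scale=10)
#   sim_usage.label_counts.shape   # (100, n_modules); each simulated row sums to ~1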
def generate_usage_labeled(module_feature_object, n_samples_per_bin, bins, regression, max_iters="default",
                           random_state=42, mode="log-normal", scale=10, verbosity="medium"):
    """
    Generate a simulated pose module usage object whose samples are labeled by a regression model

    :param module_feature_object: module feature object of class ModuleUsage (from analyze.get_module_usage)
        or ModuleTransitions (from analyze.get_module_transitions)
    :param n_samples_per_bin: number of samples to generate per bin
    :param bins: 2D array of bins, with the lower limit in bins[b, 0] and the upper limit in bins[b, 1]
    :param regression: regression model used to label the simulated samples
    :param max_iters: maximum number of iterations to attempt when generating samples; an error is raised
        if this is exceeded before enough samples are generated; a number, or 'default' for
        n_samples_total * 10
    :param random_state: random state seed (default: 42)
    :param mode: 'log-normal' or 'multivariate_gaussian' (default: 'log-normal')
    :param scale: scaling factor for the covariance in log-normal mode (the covariance is multiplied by
        scale**2); not used in multivariate_gaussian mode
    :param verbosity: 'low', 'medium', or 'high'
    :return: module_feature_object of the same style as the one input
    """
    if mode not in ("log-normal", "multivariate_gaussian"):
        raise ValueError("Mode invalid! Must be 'log-normal' or 'multivariate_gaussian'")

    if max_iters == "default":
        max_iters = bins.shape[0] * n_samples_per_bin * 10
    if max_iters < 250:
        print_interval = 5
    elif max_iters < 2000:
        print_interval = 50
    else:
        print_interval = 500

    np.random.seed(random_state)

    # Pull the per-sample count matrix from the input feature object
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
    else:
        raise ValueError(
            f'module_feature_object class must be ModuleUsage or ModuleTransitions, '
            f'not {module_feature_object.__class__}')
    mean_X = np.mean(X, axis=0)
    if mode == "multivariate_gaussian":
        cov_X = np.cov(X.T)
    else:  # log-normal
        cov_X = np.cov(X.T) * (scale ** 2)

    simulated_X = []
    group_labels = []
    n_in_bins = np.zeros(bins.shape[0])
    count = 0
    # Rejection sampling: draw one candidate at a time and keep it whenever the regression model
    # predicts a value that falls inside a bin that is not yet full
    while np.sum(n_in_bins < n_samples_per_bin) > 0:
        count += 1
        if count % print_interval == 0:
            if verbosity == "high":
                print(f"Iter {count}: {n_in_bins}")
            elif verbosity == "medium":
                perc_samples_gen = 100 * (np.sum(n_in_bins) / (n_samples_per_bin * bins.shape[0]))
                print(f"Iter {count}: {perc_samples_gen:.1f}% samples generated "
                      f"({np.sum(n_in_bins < n_samples_per_bin)} of {bins.shape[0]} bins not filled)")

        if mode == "multivariate_gaussian":
            simulated_X_i = np.random.multivariate_normal(mean_X, cov_X, size=1)
            simulated_X_i[simulated_X_i < 0] = 0
            simulated_X_i = simulated_X_i / np.sum(simulated_X_i, axis=1)[:, np.newaxis]
        else:  # log-normal
            Z = np.random.multivariate_normal(mean=mean_X, cov=cov_X, size=1)
            simulated_X_i = softmax(Z, axis=1)

        prediction = regression.predict(simulated_X_i)
        for b in range(bins.shape[0]):
            if n_in_bins[b] < n_samples_per_bin:
                if (prediction > bins[b, 0]) and (prediction < bins[b, 1]):
                    simulated_X.append(simulated_X_i)
                    n_in_bins[b] = n_in_bins[b] + 1
                    group_labels.append(bins[b, 0])
        if count > max_iters:
            raise ValueError(f"Maximum number of iterations ({max_iters}) reached!")

    simulated_X = np.array(simulated_X)
    simulated_X = np.squeeze(simulated_X, axis=1)
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        return analyze.ModuleUsage(simulated_X, group_labels, group_labels,
                                   module_feature_object.feat_names, {i: i for i in bins[:, 0]}, None)
    print("Warning - generating module transitions currently not tested for this function")
    return analyze.ModuleTransitions(simulated_X, np.nan, group_labels,
                                     module_feature_object.feat_names, {i: i for i in bins[:, 0]}, None)
def generate_sequence(config, labels_df, T, n_subjs=1, random_state=42, verbose=False):
    """
    Generate simulated pose module label sequences of length T
    (noting that T is the number of observations, not a duration in time)

    :param config: the config object
    :param labels_df: labels_df from analyze.get_module_labels
    :param T: length of each sequence to be generated
    :param n_subjs: number of subjects to generate (if labels_df has subgroups, this is the number of
        subjects PER GROUP)
    :param random_state: random state seed (default: 42)
    :param verbose: print progress (default: False)
    :return: sim_labels_df, a dataframe of simulated label sequences with one column per subject
    """
    if verbose:
        print("Analyzing input labels dataframe")
    np.random.seed(random_state)
    module_usage = analyze.get_module_usage(config, labels_df)
    module_transitions = analyze.get_module_transitions(config, labels_df)
    subgroups = np.unique(module_usage.group_labels)
    n_subgroups = len(subgroups)

    if n_subgroups == 1:
        # First-order Markov chain: start probabilities are the mean module usage, and the transition
        # matrix is the row-normalised mean transition count matrix
        start_prob = np.mean(module_usage.label_counts, axis=0)
        transition_mat = np.mean(module_transitions.transition_count_matrices, axis=0)
        transition_mat = transition_mat / np.sum(transition_mat, axis=1, keepdims=True)
        row_sums = np.sum(transition_mat, axis=1)
        transition_mat = transition_mat / row_sums[:, np.newaxis]
        transition_mat = np.nan_to_num(transition_mat, nan=1e-6)
        # Rows that no longer sum to one are flagged so that sampling from them fails and
        # falls back to the start probabilities below
        transition_mat[np.abs(np.sum(transition_mat, axis=1) - 1) > 1e-6] = 1
        header = []
        for i in range(n_subjs):
            if verbose:
                print(f"Generating data for subject {i+1} of {n_subjs}")
            current_state = np.random.choice(len(start_prob), p=start_prob)
            sequence = [current_state]
            for _ in range(1, T):
                try:
                    current_state = np.random.choice(len(transition_mat), p=transition_mat[current_state])
                    sequence.append(current_state)
                except ValueError:
                    sequence.append(np.random.choice(len(start_prob), p=start_prob))
            subj_i = f"subj_{i}"
            header.append(subj_i)
            labels_i = np.array(sequence)
            if i == 0:
                sim_labels_df = pd.DataFrame(labels_i, columns=[subj_i])
            else:
                sim_labels_df.insert(loc=i, column=subj_i, value=labels_i)
                sim_labels_df = sim_labels_df.copy()
    else:
        count = 0
        total_subjs = int(n_subjs) * int(n_subgroups)
        reverse_group_dict = {v: u for u, v in module_usage.group_dict.items()}
        header1 = []
        header2 = []
        for g in range(n_subgroups):
            # Fit a separate Markov chain per subgroup
            start_prob = np.mean(module_usage.label_counts[np.array(module_usage.group_labels) == g], axis=0)
            sub_transition_count_matrices = np.array(module_transitions.transition_count_matrices)[
                np.array(module_transitions.group_labels) == g, :, :]
            transition_mat = np.mean(sub_transition_count_matrices, axis=0)
            transition_mat = transition_mat / np.sum(transition_mat, axis=1, keepdims=True)
            row_sums = np.sum(transition_mat, axis=1)
            transition_mat = transition_mat / row_sums[:, np.newaxis]
            transition_mat = np.nan_to_num(transition_mat, nan=1e-6)
            transition_mat[np.abs(np.sum(transition_mat, axis=1) - 1) > 1e-6] = 1
            group = f"{reverse_group_dict[g]}"
            for i in range(n_subjs):
                subj_i = f"{group}_{i}"
                if verbose:
                    print(f"Generating data for {subj_i} ({count+1} of {total_subjs})")
                header = subj_i
                header2.append(header)
                header1.append(group)
                current_state = np.random.choice(len(start_prob), p=start_prob)
                sequence = [current_state]
                for _ in range(1, T):
                    try:
                        current_state = np.random.choice(len(transition_mat), p=transition_mat[current_state])
                        sequence.append(current_state)
                    except ValueError:
                        sequence.append(np.random.choice(len(start_prob), p=start_prob))
                labels_i = np.array(sequence)
                if count == 0:
                    sim_labels_df = pd.DataFrame(labels_i, columns=[header])
                else:
                    sim_labels_df.insert(loc=count, column=header, value=labels_i)
                    sim_labels_df = sim_labels_df.copy()
                count += 1
        # Two-level column header: (group, subject)
        sim_labels_df.columns = [header1, header2]
    return sim_labels_df
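# Minimal usage sketch: `config` and `labels_df` are assumed to come from earlier steps in the
# pipeline (analyze.get_module_labels). With two subgroups in labels_df and n_subjs=3, the result
# has 6 columns under a two-level (group, subject) header.
#
#   sim_labels_df = generate_sequence(config, labels_df, T=1000, n_subjs=3, verbose=True)
#   sim_labels_df.shape   # (1000, n_subjs * n_subgroups)
#   # the simulated dataframe mimics labels_df, so it can in principle be passed back through
#   # analyze.get_module_usage / analyze.get_module_transitions (an assumption, not verified here)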