Source code for utils.analyze

import numpy as np
import pandas as pd
import os
import copy
from sklearn.tree import plot_tree
from sklearn.decomposition import PCA
from sklearn.model_selection import LeaveOneOut, cross_val_predict
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.pipeline import make_pipeline
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.linear_model import LogisticRegression as LR
from sklearn.linear_model import LinearRegression, Lasso, Ridge
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from scipy import stats
from statsmodels.stats.multitest import multipletests
import pickle
import h5py

import scipy
from scipy.spatial import distance

def is_nonnum(value):
    try:
        float(value)
        return False
    except (ValueError, TypeError):
        return True


def packed_moving_average(x, w):
    x = np.asarray(x)
    n = len(x)
    x_new = np.zeros(n)
    for i in range(n):
        win_size = min(w, i + 1, n - i)  # Reduce window size at edges
        x_new[i] = np.mean(x[max(0, i - win_size // 2): min(n, i + win_size // 2 + 1)])
    return x_new


def pickle_dump(object, save_path):
    if not (save_path.endswith(".pkl") or save_path.endswith(".pickle")):
        save_path += ".pickle"
        print(f"Save path changed to {save_path}")
    with open(save_path, 'wb') as file:
        pickle.dump(object, file)


def pickle_load(object_path):
    with open(object_path, 'rb') as file:
        obj = pickle.load(file)
    return obj
class KeypointFeature:
    def __init__(self, keypoint_feature, group_labels, observation_labels, feat_names, group_dict, scaler, feature_type):
        self.keypoint_feature = keypoint_feature
        self.group_labels = group_labels
        self.observation_labels = observation_labels
        self.feat_names = feat_names
        self.group_dict = group_dict
        self.scaler = scaler
        mean_check = np.allclose(np.mean(keypoint_feature, axis=0), 0, atol=0.1)
        std_check = np.allclose(np.std(keypoint_feature, axis=0), 1, atol=0.1)
        self.scaled = mean_check and std_check
        self.feature_type = feature_type

    def to_df(self):
        colnames = []
        flip_group_dict = {v: k for k, v in self.group_dict.items()}
        bad_group_labels = []
        for i in self.group_labels:
            if i in flip_group_dict.keys():
                colnames.append(flip_group_dict[i])
            else:
                colnames.append("none")
                bad_group_labels.append(i)
        if len(bad_group_labels) > 0:
            bad_group_labels = list(np.unique(bad_group_labels))
            print(f"Warning: the following groups were not detected in the config and were labeled 'none'; continuing:\n{bad_group_labels}")
        df = pd.DataFrame(self.keypoint_feature, columns=self.feat_names, index=self.observation_labels)
        df["group"] = colnames
        return df

    def scale(self):
        scaler = StandardScaler()
        keypoint_feature_scaled = scaler.fit_transform(self.keypoint_feature)
        return KeypointFeature(keypoint_feature_scaled, self.group_labels, self.observation_labels, self.feat_names, self.group_dict, scaler, self.feature_type)

    def save(self, save_path):
        if not (save_path.endswith(".pkl") or save_path.endswith(".pickle")):
            save_path += ".pickle"
            print(f"Save path changed to {save_path}")
        with open(save_path, 'wb') as file:
            pickle.dump(self, file)
def get_module_labels(config, start, stop, subgroups=None):
    """
    Generates a pandas dataframe containing the labels for every frame in the specified time range
    for the video paths in defined groups.

    :param config: the config (frames per second are read from config["fps"])
    :param start: time in seconds to start dataframe from.
    :param stop: time in seconds to stop dataframe at.
    :param subgroups: subgroups to include; by default, None will result in an object without data subgrouped;
        could alternatively be a list of subgroup names from config or "all" (to include all subgroups present in config)
    :return: labels dataframe
    """
    fps = config["fps"]
    start_frame = start * int(fps)
    stop_frame = stop * int(fps)
    if subgroups is None:
        count = 0
        header = []
        if config["data_source"] == "B-SOiD":
            label_paths = [i for i in config["project_files"] if "labels_" in i]
            for i in range(len(label_paths)):
                header.append(label_paths[i])
                labels_i = np.loadtxt(str(config["data_directory"] + "/" + label_paths[i]), delimiter=",", skiprows=3, usecols=1)[start_frame:stop_frame]
                if i == 0:
                    labels_df = pd.DataFrame(labels_i, columns=[header])
                else:
                    labels_df = labels_df.copy()
                    labels_df.insert(loc=count, value=labels_i, column=label_paths[i])
                count = count + 1
        elif config["data_source"] == "Keypoint-MoSeq":
            label_paths = config["project_files"]
            for i in range(len(label_paths)):
                header.append(label_paths[i])
                labels_i = np.loadtxt(str(config["data_directory"] + "/" + label_paths[i]), delimiter=",", skiprows=1, usecols=0)[start_frame:stop_frame]
                if i == 0:
                    labels_df = pd.DataFrame(labels_i, columns=[header])
                else:
                    labels_df.insert(loc=count, value=labels_i, column=label_paths[i])
                    labels_df = labels_df.copy()
                count = count + 1
        elif config["data_source"] == "VAME":
            data_directory = config["data_directory"]
            model_path = "/VAME/" + config["vame_model_name"] + "/"
            for f in range(len(config["project_files"])):
                header.append(config["project_files"][f])
                datstr = [i for i in os.listdir(data_directory + "/" + config["project_files"][f] + model_path) if "_label_" in i][0]
                labels_i = np.load(data_directory + "/" + config["project_files"][f] + model_path + datstr)[start_frame:stop_frame]
                if f == 0:
                    labels_df = pd.DataFrame(labels_i, columns=[header])
                else:
                    labels_df.insert(loc=count, value=labels_i, column=config["project_files"][f])
                    labels_df = labels_df.copy()
                count = count + 1
        elif config["data_source"] == "MotionMapper":
            label_paths = config["project_files"]
            for i in range(len(label_paths)):
                header.append(label_paths[i])
                labels_i = scipy.io.loadmat(config["data_directory"] + "/" + label_paths[i])["ethogram_data"]
                labels_i = np.sum(labels_i, axis=1)
                if i == 0:
                    labels_df = pd.DataFrame(labels_i, columns=[header])
                else:
                    labels_df.insert(loc=count, value=labels_i, column=label_paths[i])
                    labels_df = labels_df.copy()
                count = count + 1
    else:
        if subgroups == "all":
            subgroups = list(config["subgroups"].keys())
        n_groups = len(subgroups)
        count = 0
        header1 = []
        header2 = []
        if config["data_source"] == "B-SOiD":
            for g in range(n_groups):
                label_paths_g = [i for i in config["subgroups"][subgroups[g]] if "labels_" in i]
                for i in range(len(label_paths_g)):
                    header = label_paths_g[i]
                    header2.append(header)
                    header1.append(subgroups[g])
                    labels_i = np.loadtxt(str(config["data_directory"] + "/" + label_paths_g[i]), delimiter=",", skiprows=3, usecols=1)[start_frame:stop_frame]
                    if i == 0 and g == 0:
                        labels_df = pd.DataFrame(labels_i, columns=[header])
                    else:
                        labels_df.insert(loc=count, value=labels_i, column=header)
                        labels_df = labels_df.copy()
                    count = count + 1
            labels_df.columns = [header1, header2]
        elif config["data_source"] == "Keypoint-MoSeq":
            for g in range(n_groups):
                label_paths_g = config["subgroups"][subgroups[g]]
                for i in range(len(label_paths_g)):
                    header = label_paths_g[i]
                    header2.append(header)
                    header1.append(subgroups[g])
                    labels_i = np.loadtxt(str(config["data_directory"] + "/" + label_paths_g[i]), delimiter=",", skiprows=1, usecols=0)[start_frame:stop_frame]
                    if i == 0 and g == 0:
                        labels_df = pd.DataFrame(labels_i, columns=[header])
                    else:
                        labels_df.insert(loc=count, value=labels_i, column=header)
                        labels_df = labels_df.copy()
                    count = count + 1
            labels_df.columns = [header1, header2]
        elif config["data_source"] == "VAME":
            data_directory = config["data_directory"]
            model_path = "/VAME/" + config["vame_model_name"] + "/"
            for g in range(n_groups):
                label_paths_g = config["subgroups"][subgroups[g]]
                for f in range(len(label_paths_g)):
                    header = label_paths_g[f]
                    header2.append(header)
                    header1.append(subgroups[g])
                    datstr = [i for i in os.listdir(data_directory + "/" + label_paths_g[f] + model_path) if "_label_" in i][0]
                    labels_i = np.load(data_directory + "/" + label_paths_g[f] + model_path + datstr)[start_frame:stop_frame]
                    if f == 0 and g == 0:
                        labels_df = pd.DataFrame(labels_i, columns=[header])
                    else:
                        labels_df.insert(loc=count, value=labels_i, column=header)
                        labels_df = labels_df.copy()
                    count = count + 1
            labels_df.columns = [header1, header2]
        elif config["data_source"] == "MotionMapper":
            for g in range(n_groups):
                label_paths_g = config["subgroups"][subgroups[g]]
                for i in range(len(label_paths_g)):
                    header = label_paths_g[i]
                    header2.append(header)
                    header1.append(subgroups[g])
                    labels_i = scipy.io.loadmat(config["data_directory"] + "/" + label_paths_g[i])["ethogram_data"]
                    labels_i = np.sum(labels_i, axis=1)
                    if i == 0 and g == 0:
                        labels_df = pd.DataFrame(labels_i, columns=[header])
                    else:
                        labels_df.insert(loc=count, value=labels_i, column=header)
                    count = count + 1
            labels_df.columns = [header1, header2]
    return labels_df
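# Example (illustrative sketch, not part of the module): pulling per-frame module labels for
# the first 10 minutes of every subgroup. The `config` dict and its keys ("fps", "data_source",
# "data_directory", "subgroups", ...) are assumed to come from the project's own config loader.
#
#     labels_df = get_module_labels(config, start=0, stop=600, subgroups="all")
#     print(labels_df.shape)  # (frames, sessions), with a (subgroup, file) column MultiIndex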
def get_keypoint_travel(PE_config, keypoint, start, end, binsize=None, thresh=70, selected_subgroups="all", return_as_df=True):
    """
    Measure keypoint distance travelled across a group of subjects

    :param PE_config: pose estimation config - used for FPS only
    :param keypoint: name of keypoint
    :param start: start time in seconds
    :param end: end time in seconds
    :param binsize: if binned output is desired, size of timebins (default is None for no binning)
    :param thresh: threshold for when a distance 'jump' is too large and should be excluded; default 70
    :param selected_subgroups: subgroups to analyze as defined in config; default "all"
    :param return_as_df: return as a dataframe or return as a KeypointFeature object (for embedding, classification)
    :return: dataframe or KeypointFeature
    """
    if selected_subgroups == "all":
        selected_subgroups = list(PE_config["subgroups"].keys())
    group_dict = {selected_subgroups[i]: i for i in range(len(selected_subgroups))}
    group_labels = []
    travel_data = []
    observation_labels = []
    for g, group in enumerate(selected_subgroups):
        for s, sess in enumerate(PE_config["subgroups"][group]):
            observation_labels.append(sess)
            group_labels.append(group_dict[group])
            filepath = PE_config["data_directory"] + "/" + sess
            if PE_config["data_source"] == "DeepLabCut":
                data = pd.read_csv(filepath, header=[1, 2])
            elif PE_config["data_source"] == "OpenFace":
                data = read_openface_csv(PE_config, filepath)
            elif PE_config["data_source"] == "SLEAP":
                final_filepath, track_name = tuple(filepath.split(":::"))
                data = read_sleap_csv(final_filepath, track_name)
            if binsize is None:
                binsize = end - start
                nbins = 1
            else:
                nbins = int((end - start) / binsize)
            dist_i = np.zeros(nbins)
            fps = int(PE_config["fps"])
            for b in range(nbins):
                x = np.array(data[keypoint]['x'])[(start + b * binsize) * fps:(start + (b + 1) * binsize) * fps]
                y = np.array(data[keypoint]['y'])[(start + b * binsize) * fps:(start + (b + 1) * binsize) * fps]
                for i in range(len(x) - 1):
                    if np.absolute(x[i + 1] - x[i]) > thresh:
                        x[i + 1] = x[i]
                    if np.absolute(y[i + 1] - y[i]) > thresh:
                        y[i + 1] = y[i]
                    dist_i[b] = dist_i[b] + np.sqrt((x[i + 1] - x[i]) ** 2 + (y[i + 1] - y[i]) ** 2)
            travel_data.append(dist_i)
    travel_data = np.array(travel_data)
    if binsize is not None:
        feat_names = np.arange(start / 60, end / 60, binsize / 60)
    else:
        feat_names = "travel"
    if return_as_df:
        kf = KeypointFeature(travel_data, group_labels, observation_labels, feat_names, group_dict, None, "travel")
        kf_df = kf.to_df()
        return kf_df
    else:
        return KeypointFeature(travel_data, group_labels, observation_labels, feat_names, group_dict, None, "travel")
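# Example (hedged sketch): distance travelled by a hypothetical "nose" keypoint in 60 s bins.
# `PE_config` is assumed to be a pose-estimation config with "subgroups", "data_directory",
# "data_source", and "fps"; the keypoint name is illustrative and depends on the user's data.
#
#     travel_df = get_keypoint_travel(PE_config, keypoint="nose", start=0, end=600,
#                                     binsize=60, return_as_df=True)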
def ego_center(config, data, keypoint_ego1, keypoint_ego2):
    """
    Ego centering a single pose estimation dataframe

    :param config: config
    :param data: pose estimation pandas dataframe in DLC format
    :param keypoint_ego1: first keypoint to use for establishing egocentric alignment axis
    :param keypoint_ego2: second keypoint to use for establishing egocentric alignment axis
    :return: ego_centered pandas dataframe
    """
    x_coords = data.xs("x", level=1, axis=1)
    y_coords = data.xs("y", level=1, axis=1)
    ego1 = np.stack([x_coords[keypoint_ego1], y_coords[keypoint_ego1]], axis=-1)
    ego2 = np.stack([x_coords[keypoint_ego2], y_coords[keypoint_ego2]], axis=-1)
    origin = (ego1 + ego2) / 2
    delta = ego2 - ego1
    theta = np.arctan2(delta[:, 0], delta[:, 1])
    sin_theta = np.sin(theta)
    cos_theta = np.cos(theta)
    new_data = {}
    for keypoint in config["keypoints"]:
        point = np.stack([x_coords[keypoint], y_coords[keypoint]], axis=-1)
        new_point = point - origin
        rot_x = new_point[:, 0] * cos_theta - new_point[:, 1] * sin_theta
        rot_y = new_point[:, 0] * sin_theta + new_point[:, 1] * cos_theta
        new_data[(keypoint, "x")] = rot_x
        new_data[(keypoint, "y")] = rot_y
    return pd.DataFrame(new_data, index=data.index)
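# Example (hedged sketch): egocentric alignment of a DeepLabCut-style dataframe along a
# hypothetical nose-to-tailbase axis. The file path and keypoint names are illustrative;
# `PE_config["keypoints"]` is assumed to list the tracked bodyparts.
#
#     data = pd.read_csv("session1.csv", header=[1, 2])   # illustrative path
#     data_ego = ego_center(PE_config, data, "nose", "tailbase")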
def read_openface_csv(of_config, filepath):
    data = pd.read_csv(filepath)
    data.columns = data.columns.str.replace(' ', '')
    data_dlc = {}
    for keypoint in of_config["keypoints"]:
        keypoints_x = ["x" + keypoint.split("kp")[1], "X" + keypoint.split("kp")[1]]
        keypoints_x = [i for i in keypoints_x if i in data.columns]
        data_dlc[(keypoint, "x")] = data[keypoints_x].to_numpy().T[0]
        keypoints_y = ["y" + keypoint.split("kp")[1], "Y" + keypoint.split("kp")[1]]
        keypoints_y = [i for i in keypoints_y if i in data.columns]
        data_dlc[(keypoint, "y")] = data[keypoints_y].to_numpy().T[0]
    return pd.DataFrame(data_dlc)


def read_sleap_csv(filepath, track_name):
    with h5py.File(filepath, "r") as f:
        node_names = f['node_names'][:]
        node_names = [node.decode('utf-8') for node in node_names]
        track_names = f['track_names'][:]
        track_names = [tr.decode('utf-8') for tr in track_names]
        track_occupancy = f['track_occupancy'][:]
        tracks = f['tracks'][:]
    track_idx = np.argmax([i == track_name for i in track_names])
    df = {}
    for n, node in enumerate(node_names):
        df[(node, "x")] = tracks[track_idx][0][n]
        df[(node, "y")] = tracks[track_idx][1][n]
    return pd.DataFrame(df)
def get_keypoint_kinematics(PE_config, keypoint_ego1, keypoint_ego2, start, end, binsize=None, metric="angle_m", thresh=70, selected_subgroups="all", return_as_df=True, verbose=False):
    """
    Measure keypoint kinematics (egocentrically aligned angle, distance, and travel of keypoints) across a group of subjects

    :param PE_config: pose estimation config - used for FPS only
    :param keypoint_ego1: first keypoint to use for establishing egocentric alignment axis
    :param keypoint_ego2: second keypoint to use for establishing egocentric alignment axis
    :param start: start time in seconds
    :param end: end time in seconds
    :param binsize: if binned output is desired, size of timebins (default is None for no binning)
    :param metric: metric to assess for all non-axis keypoints; options are "angle_m" (for mean angle), "angle_sd" (for std deviation of angle), "distance_m" (for mean distance of keypoints to ego-center), "distance_sd" (for std deviation of distance of keypoints to ego-center), or "travel" (for movement of keypoints relative to ego-center)
    :param thresh: threshold for when a distance 'jump' is too large and should be excluded; default 70
    :param selected_subgroups: subgroups to analyze as defined in config; default "all"
    :param return_as_df: return as a dataframe or return as a KeypointFeature object (for embedding, classification)
    :param verbose: print progress messages
    :return: dataframe or KeypointFeature
    """
    if metric == "travel":
        feature_type = "Keypoint travel relative to ego-center"
    elif metric == "angle_m":
        feature_type = "Keypoint mean angle relative to ego-center"
    elif metric == "angle_sd":
        feature_type = "Keypoint std deviation angle relative to ego-center"
    elif metric == "distance_m":
        feature_type = "Keypoint mean distance to ego-center"
    elif metric == "distance_sd":
        feature_type = "Keypoint std deviation distance to ego-center"
    else:
        raise ValueError(f"Invalid metric ({metric}) - must be one of: 'angle_m' (for mean angle), 'angle_sd' (for std deviation of angle), 'distance_m' (for mean distance of keypoints to ego-center), 'distance_sd' (for std deviation of distance of keypoints to ego-center), or 'travel' (for movement of keypoints relative to ego-center)")
    if selected_subgroups == "all":
        selected_subgroups = list(PE_config["subgroups"].keys())
    group_dict = {selected_subgroups[i]: i for i in range(len(selected_subgroups))}
    non_ego_keypoints = [i for i in PE_config["keypoints"] if ((i != keypoint_ego1) and (i != keypoint_ego2))]
    group_labels = []
    kinematic_data = []
    observation_labels = []
    n_observations = np.sum([len(PE_config["subgroups"][group]) for group in selected_subgroups])
    count = 0
    for g, group in enumerate(selected_subgroups):
        for s, sess in enumerate(PE_config["subgroups"][group]):
            count += 1
            observation_labels.append(sess)
            if verbose:
                print(f"Working on {metric} kinematics for observation {count} of {n_observations}")
            group_labels.append(group_dict[group])
            filepath = PE_config["data_directory"] + "/" + sess
            if PE_config["data_source"] == "DeepLabCut":
                data = pd.read_csv(filepath, header=[1, 2])
                if verbose:
                    print("Ego-centering")
                data = ego_center(PE_config, data, keypoint_ego1, keypoint_ego2)
            elif PE_config["data_source"] == "OpenFace":
                data = read_openface_csv(PE_config, filepath)
                if verbose:
                    print("Ego-centering")
                data = ego_center(PE_config, data, keypoint_ego1, keypoint_ego2)
            elif PE_config["data_source"] == "SLEAP":
                final_filepath, track_name = tuple(filepath.split(":::"))
                data = read_sleap_csv(final_filepath, track_name)
                if verbose:
                    print("Ego-centering")
                data = ego_center(PE_config, data, keypoint_ego1, keypoint_ego2)
            if binsize is None:
                binsize = end - start
                nbins = 1
            else:
                nbins = int((end - start) / binsize)
            kin_i = np.zeros(nbins * len(non_ego_keypoints))
            fps = int(PE_config["fps"])
            if verbose:
                print(f"Computing {metric}")
            for kp, keypoint in enumerate(non_ego_keypoints):
                for b in range(nbins):
                    x = np.array(data[keypoint]['x'])[(start + b * binsize) * fps:(start + (b + 1) * binsize) * fps]
                    y = np.array(data[keypoint]['y'])[(start + b * binsize) * fps:(start + (b + 1) * binsize) * fps]
                    for i in range(len(x) - 1):
                        if np.absolute(x[i + 1] - x[i]) > thresh:
                            x[i + 1] = x[i]
                        if np.absolute(y[i + 1] - y[i]) > thresh:
                            y[i + 1] = y[i]
                        if metric == "travel":
                            kin_i[(kp * nbins) + b] = kin_i[(kp * nbins) + b] + np.sqrt((x[i + 1] - x[i]) ** 2 + (y[i + 1] - y[i]) ** 2)
                    if metric == "angle_m":
                        kin_i[(kp * nbins) + b] = np.mean(np.arctan2(y, x))
                    elif metric == "angle_sd":
                        kin_i[(kp * nbins) + b] = np.std(np.arctan2(y, x))
                    elif metric == "distance_m":
                        kin_i[(kp * nbins) + b] = np.mean(np.sqrt((x ** 2) + (y ** 2)))
                    elif metric == "distance_sd":
                        kin_i[(kp * nbins) + b] = np.std(np.sqrt((x ** 2) + (y ** 2)))
            kinematic_data.append(kin_i)
    kinematic_data = np.array(kinematic_data)
    feat_names = []
    for kp in non_ego_keypoints:
        if binsize is not None:
            feat_name_kp = np.arange(start / 60, end / 60, binsize / 60)
            feat_names += [metric + "_" + kp + "_" + str(i) for i in feat_name_kp]
        else:
            feat_names.append(metric + "_" + kp)
    if return_as_df:
        kf = KeypointFeature(kinematic_data, group_labels, observation_labels, feat_names, group_dict, None, feature_type)
        return kf.to_df()
    else:
        return KeypointFeature(kinematic_data, group_labels, observation_labels, feat_names, group_dict, None, feature_type)
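# Example (hedged sketch): mean distance of each non-axis keypoint to the ego-center, returned
# as a KeypointFeature object for downstream embedding/classification. The "nose"/"tailbase"
# axis keypoints are illustrative and depend on the user's own tracking config.
#
#     kin = get_keypoint_kinematics(PE_config, "nose", "tailbase", start=0, end=600,
#                                   binsize=60, metric="distance_m", return_as_df=False)
#     kin_scaled = kin.scale()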
class ActionUnits:
    def __init__(self, action_units, group_labels, observation_labels, feat_names, group_dict, scaler):
        self.action_units = action_units
        self.group_labels = group_labels
        self.observation_labels = observation_labels
        self.feat_names = feat_names
        self.group_dict = group_dict
        self.scaler = scaler
        mean_check = np.allclose(np.mean(action_units, axis=0), 0, atol=0.1)
        std_check = np.allclose(np.std(action_units, axis=0), 1, atol=0.1)
        self.scaled = mean_check and std_check

    def to_df(self):
        colnames = []
        flip_group_dict = {v: k for k, v in self.group_dict.items()}
        bad_group_labels = []
        for i in self.group_labels:
            if i in flip_group_dict.keys():
                colnames.append(flip_group_dict[i])
            else:
                colnames.append("none")
                bad_group_labels.append(i)
        if len(bad_group_labels) > 0:
            bad_group_labels = list(np.unique(bad_group_labels))
            print(f"Warning: the following groups were not detected in the config and were labeled 'none'; continuing:\n{bad_group_labels}")
        df = pd.DataFrame(self.action_units, columns=self.feat_names, index=self.observation_labels)
        df["group"] = colnames
        return df

    def scale(self):
        scaler = StandardScaler()
        action_units_scaled = scaler.fit_transform(self.action_units)
        return ActionUnits(action_units_scaled, self.group_labels, self.observation_labels, self.feat_names, self.group_dict, scaler)

    def save(self, save_path):
        if not (save_path.endswith(".pkl") or save_path.endswith(".pickle")):
            save_path += ".pickle"
            print(f"Save path changed to {save_path}")
        with open(save_path, 'wb') as file:
            pickle.dump(self, file)
def get_action_units(config, start, end, binsize=None, selected_subgroups=None, aus_to_include="all"):
    """
    Function for getting mean action units

    :param config: project config
    :param start: start time (in s)
    :param end: end time (in s)
    :param binsize: optional - temporal bin size (default None for no binning)
    :param selected_subgroups: optional - selected subgroups
    :param aus_to_include: list of integers corresponding to action units to include, or "all" for all; default "all"
    :return: ActionUnits
    """
    all_au_data = {}
    group_labels = []
    observation_labels = []
    if binsize is None:
        binsize = end - start
        nbins = 1
    else:
        nbins = int((end - start) / binsize)
    if selected_subgroups is None:
        warning_printed = False
        group_dict = {"no_assigned_subgroup": 0}
        for subj in config["project_files"]:
            df = pd.read_csv(config["data_directory"] + "/" + subj)
            df.columns = df.columns.str.replace(' ', '')
            au_cols = [i for i in df.columns if ((i.startswith("AU")) and (i.endswith("r")))]
            if aus_to_include != "all":
                au_cols_subset = []
                full_au_cols = au_cols.copy()
                bad_aus = []
                for au_idx in aus_to_include:
                    au_to_add = [au_i for au_i in au_cols if ((au_i.lower() == f"au{int(au_idx)}_r") or (au_i.lower() == f"au0{int(au_idx)}_r"))]
                    au_cols_subset += au_to_add
                    if len(au_to_add) == 0:
                        bad_aus.append(au_idx)
                if ((len(bad_aus) > 0) and (warning_printed == False)):
                    print(f"Warning - the following AUs were not included as they were not found in the dataset: {bad_aus}; valid AUs are {full_au_cols}")
                    warning_printed = True
                au_cols = au_cols_subset
            au_df = df[au_cols]
            au_data = None
            for b in range(nbins):
                df_i = au_df.iloc[int(int(config["fps"]) * (start + b * binsize)):int(int(config["fps"]) * (start + (b + 1) * binsize))].mean(axis=0)
                df_i.index = [i + "_t" + str(start + b * binsize) + "-t" + str(start + (b + 1) * binsize) for i in list(df_i.index)]
                if au_data is None:
                    au_data = df_i
                else:
                    au_data = pd.concat([au_data, df_i])
            all_au_data[subj] = au_data
            group_labels.append(0)
            observation_labels.append(subj)
        au_df = pd.DataFrame(all_au_data)
        au_arr = np.array(au_df.T)
        feat_names = list(au_df.index)
    else:
        group_dict = {}
        warning_printed = False
        for g, group in enumerate(selected_subgroups):
            group_dict[group] = g
            for subj in config["subgroups"][group]:
                df = pd.read_csv(config["data_directory"] + "/" + subj)
                df.columns = df.columns.str.replace(' ', '')
                au_cols = [i for i in df.columns if ((i.startswith("AU")) and (i.endswith("r")))]
                if aus_to_include != "all":
                    au_cols_subset = []
                    full_au_cols = au_cols.copy()
                    bad_aus = []
                    for au_idx in aus_to_include:
                        au_to_add = [au_i for au_i in au_cols if ((au_i.lower() == f"au{int(au_idx)}_r") or (au_i.lower() == f"au0{int(au_idx)}_r"))]
                        au_cols_subset += au_to_add
                        if len(au_to_add) == 0:
                            bad_aus.append(au_idx)
                    if ((len(bad_aus) > 0) and (warning_printed == False)):
                        print(f"Warning - the following AUs were not included as they were not found in the dataset: {bad_aus}; valid AUs are {full_au_cols}")
                        warning_printed = True
                    au_cols = au_cols_subset
                au_df = df[au_cols]
                au_data = None
                for b in range(nbins):
                    df_i = au_df.iloc[int(int(config["fps"]) * (start + b * binsize)):int(int(config["fps"]) * (start + (b + 1) * binsize))].mean(axis=0)
                    df_i.index = [i + "_t" + str(start + b * binsize) + "-t" + str(start + (b + 1) * binsize) for i in list(df_i.index)]
                    if au_data is None:
                        au_data = df_i
                    else:
                        au_data = pd.concat([au_data, df_i])
                all_au_data[subj] = au_data
                group_labels.append(g)
                observation_labels.append(subj)
        au_df = pd.DataFrame(all_au_data)
        au_arr = np.array(au_df.T)
        feat_names = list(au_df.index)
    return ActionUnits(au_arr, group_labels, observation_labels, feat_names, group_dict, None)
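# Example (hedged sketch): mean intensities of a few OpenFace action units per subgroup, in
# 60 s bins. The AU numbers and subgroup names ("control", "treatment") are illustrative;
# `config` is assumed to point at OpenFace CSV outputs.
#
#     aus = get_action_units(config, start=0, end=600, binsize=60,
#                            selected_subgroups=["control", "treatment"],
#                            aus_to_include=[1, 2, 4, 12])
#     au_df = aus.to_df()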
def BORIS_to_pose(config, verbose=True):
    """
    Intake paired BORIS one-hot-encoded observation files and pose segmented files and align them
    to see which scored behaviors line up with which pose modules

    :param config: the config
    :param verbose: print progress and the final 'loss' score
    :return: results, normalized_results, loss_score
    """
    boris_directory = config["boris_directory"]
    boris_to_pose_pairings = config["boris_to_pose_pairings"]
    results = None
    config_modulo = copy.deepcopy(config)
    config_modulo["project_files"] = [pairing[1] for pairing in boris_to_pose_pairings if pairing[0] is not None]
    labels_df = get_module_labels(config_modulo, 0, 1200)
    labels_flat = np.array(labels_df)
    labels_flat = [item for sublist in labels_flat for item in sublist]
    ## old way
    # modules = np.unique(labels_flat)
    # n_modules = len(modules)
    # modules = np.unique(labels_df.values.flatten())
    n_modules = int(config["n_modules"])
    modules = np.arange(0, n_modules, 1)
    for pairing in boris_to_pose_pairings:
        if pairing[0] is None:
            continue
        config_modulo = copy.deepcopy(config)
        config_modulo["project_files"] = [pairing[1]]
        labels_df = get_module_labels(config_modulo, 0, 1200)
        boris_i = pd.read_csv(boris_directory + "/" + pairing[0])
        boris_i["frame"] = np.round(boris_i["time"] / (1 / int(config["fps"])))
        boris_i = boris_i.drop_duplicates(subset='frame', keep='first')
        fr = np.min([len(labels_df), np.max(boris_i["frame"])])
        boris_i = boris_i[0:int(fr)]
        labels_df = labels_df[0:int(fr)]
        boris_i = boris_i.drop(["time", "frame"], axis=1)
        boris_i = boris_i.reset_index(drop=True)
        behaviors = list(boris_i.columns)
        if results is None:
            results = pd.DataFrame(data=np.zeros([n_modules, len(behaviors)]), columns=behaviors, index=modules)
        results_i = pd.DataFrame(columns=behaviors, index=modules)
        for behavior in behaviors:
            mask = boris_i[behavior] > 0
            masked_labels = labels_df[mask == True]
            for module in range(n_modules):
                results_i.at[module, behavior] = np.sum(masked_labels == module).to_numpy()[0]
        results = results + results_i
        if verbose:
            print("done with " + str(pairing))
    results = results.T
    column_sums = results.sum()
    column_sums = column_sums.replace(0, 1)
    normalized_results = results / column_sums
    loss_score = np.sum(results.sum() - results.max()) / np.sum(results.sum())
    if verbose:
        print(config["data_source"] + " modules map onto scored behaviors with 'loss' of " + str(loss_score))
    return results, normalized_results, loss_score
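# Example (hedged sketch): aligning BORIS-scored behaviors with pose modules; assumes the config
# defines "boris_directory", "boris_to_pose_pairings", and "n_modules".
#
#     counts, normalized, loss = BORIS_to_pose(config, verbose=True)
#     print(normalized)  # rows = scored behaviors, columns = pose modules (column-normalized)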
def combine_pose_modules(config, labels_df, force_no_numeric=False):
    """
    Combine pose modules based on the remappings key in the config

    :param config: config
    :param labels_df: from label_counter (subgroups or no_subgroups)
    :param force_no_numeric: if True, any modules still carrying numeric labels after remapping are relabeled "other"
    :return: labels_df_remapped
    """
    try:
        n_rules = len(config["remappings"])
        print("Applying " + str(n_rules) + " remapping rules from config.")
        for r, remapping in enumerate(config["remappings"]):
            print(str(r + 1) + " of " + str(n_rules) + ": Remapping " + str(remapping[0]) + " to " + str(remapping[1]))
            if remapping[0] is not None:
                for old_val in remapping[0]:
                    for column in labels_df.columns:
                        labels_df.replace(old_val, remapping[1], inplace=True)
    except:
        raise ValueError("Could not apply module remappings - check to see if something is wrong in the config.")
    if force_no_numeric:
        labels_flat = np.array(labels_df)
        labels_flat = [item for sublist in labels_flat for item in sublist]
        modules = np.unique(labels_flat)
        nonnum_vals = [is_nonnum(i) == False for i in modules]
        if np.sum(nonnum_vals) > 0:
            print("Warning - numeric values detected and force_no_numeric set to True. The following modules will be changed to 'other':")
            print(modules[nonnum_vals])
            for module in modules[nonnum_vals]:
                for column in labels_df.columns:
                    labels_df.replace(module, "other", inplace=True)
                try:
                    labels_df.replace(int(module), "other", inplace=True)
                except:
                    pass
                try:
                    labels_df.replace(float(module), "other", inplace=True)
                except:
                    pass
    return labels_df
def make_remappings_from_BORIS(config, labels_df=None, BORIS_to_pose_mat=None, force_no_numeric=False):
    """
    Make remappings based on BORIS output and apply them to labels_df

    :param config: config
    :param labels_df: optional labels dataframe (e.g., from get_module_labels); if provided, the new remappings are applied to it
    :param BORIS_to_pose_mat: the non-normalized result matrix (first result option) from BORIS_to_pose
    :param force_no_numeric: passed through to combine_pose_modules
    :return: labels_df with remappings applied, or the updated config if no labels_df is given
    """
    BORIS_to_pose_mat_numeric = BORIS_to_pose_mat.apply(pd.to_numeric, errors='coerce')
    new_mappings = list(BORIS_to_pose_mat_numeric.idxmax())
    new_mappings = ['other' if x not in BORIS_to_pose_mat.index else x for x in new_mappings]
    old_mappings = list(BORIS_to_pose_mat_numeric.idxmax().index)
    remappings = [[[old_mappings[i]], new_mappings[i]] for i in range(len(old_mappings))]
    config["remappings"] = remappings
    if labels_df is not None:
        combine_pose_modules(config, labels_df, force_no_numeric=force_no_numeric)
        return labels_df
    else:
        return config
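# Example (hedged sketch): deriving module-to-behavior remappings from the BORIS alignment matrix
# and applying them to an existing labels dataframe.
#
#     counts, _, _ = BORIS_to_pose(config)
#     labels_df = get_module_labels(config, 0, 600, subgroups="all")
#     labels_df = make_remappings_from_BORIS(config, labels_df=labels_df,
#                                            BORIS_to_pose_mat=counts)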
class ModuleUsage:
    def __init__(self, label_counts, group_labels, observation_labels, feat_names, group_dict, scaler):
        self.label_counts = label_counts
        self.group_labels = group_labels
        self.observation_labels = observation_labels
        self.feat_names = feat_names
        self.group_dict = group_dict
        self.scaler = scaler
        mean_check = np.allclose(np.mean(label_counts, axis=0), 0, atol=0.1)
        std_check = np.allclose(np.std(label_counts, axis=0), 1, atol=1)
        self.scaled = mean_check and std_check

    def to_df(self):
        colnames = []
        flip_group_dict = {v: k for k, v in self.group_dict.items()}
        bad_group_labels = []
        for i in self.group_labels:
            if i in flip_group_dict.keys():
                colnames.append(flip_group_dict[i])
            else:
                colnames.append("none")
                bad_group_labels.append(i)
        if len(bad_group_labels) > 0:
            bad_group_labels = list(np.unique(bad_group_labels))
            print(f"Warning: the following groups were not detected in the config and were labeled 'none'; continuing:\n{bad_group_labels}")
        df = pd.DataFrame(self.label_counts, columns=self.feat_names, index=self.observation_labels)
        df["group"] = colnames
        return df

    def collapse_timebins(self):
        colnames = []
        flip_group_dict = {v: k for k, v in self.group_dict.items()}
        for i in self.group_labels:
            colnames.append(flip_group_dict[i])
        df = pd.DataFrame(self.label_counts, columns=self.feat_names, index=colnames)
        extracted = [i.split("_")[0] for i in df.columns]
        modules = pd.Series(extracted).unique()
        df_notimebins = pd.DataFrame(index=df.index, columns=modules)
        for module in modules:
            df_notimebins[module] = df.filter(like=module + "_").mean(axis=1)
        return ModuleUsage(np.array(df_notimebins), self.group_labels, self.observation_labels, df_notimebins.columns, self.group_dict, self.scaler)

    def usage_density(self, convolve=False, window=5):
        df = self.to_df()
        modules = np.unique([i.split("module")[1].split("_")[0] for i in list(df.columns) if "module" in i])
        if is_nonnum(modules[0]):
            modules = sorted(modules)
        else:
            modules = sorted([int(m) for m in modules])
        start_times = [int(col.split("_t")[1].split("-")[0]) for col in list(df.columns) if f"module{modules[0]}_" in col]
        subjs = list(df.index)
        data = []
        for subj in subjs:
            new_df = pd.DataFrame(columns=start_times, index=modules)
            for module in modules:
                subj_module_arr = np.array(df.filter(like=f"module{module}_", axis=1).loc[subj])
                if convolve:
                    subj_module_arr = packed_moving_average(subj_module_arr, window)
                new_df.loc[module] = subj_module_arr
            data.append(new_df)
        return data

    def apply_picks(self, pick_names):
        feats = self.to_df().columns
        if set(pick_names) <= set(feats):
            colnames = []
            flip_group_dict = {v: k for k, v in self.group_dict.items()}
            for i in self.group_labels:
                colnames.append(flip_group_dict[i])
            df = pd.DataFrame(self.label_counts, columns=self.feat_names, index=colnames)
            df_sub = df[pick_names]
            return ModuleUsage(np.array(df_sub), self.group_labels, self.observation_labels, df_sub.columns, self.group_dict, self.scaler)
        else:
            print("Picks not found in features, presuming that you are applying no-bin picks to binned data and adjusting accordingly...")
            print("Input picks: {}".format(pick_names))
            new_picks = []
            for pick in pick_names:
                new_picks.extend([i for i in feats if pick + "_" in i])
            print("Applied picks: {}".format(new_picks))
            picked_feats = self.apply_picks(new_picks)
            return picked_feats

    def scale(self):
        scaler = StandardScaler()
        label_counts_scaled = scaler.fit_transform(self.label_counts)
        return ModuleUsage(label_counts_scaled, self.group_labels, self.observation_labels, self.feat_names, self.group_dict, scaler)

    def f_oneway(self):
        df = self.to_df()
        all_mod_usage = {}
        module_names = [i for i in list(df.columns) if i != "group"]
        classes = list(df[("group")].unique())
        results_df = []
        for m in module_names:
            mod_m_usage = []
            for class_c in classes:
                mod_m_usage.append(df[df["group"] == class_c][m].to_numpy())
            all_mod_usage[m] = mod_m_usage
            result = stats.f_oneway(*all_mod_usage[m])
            results_df.append({"module": m, "f": result.statistic, "p_uncorr": result.pvalue})
        results_df = pd.DataFrame(results_df)
        p_uncorr = np.array(results_df["p_uncorr"])
        results_df["p_corr"] = multipletests(p_uncorr, method="fdr_bh")[1]
        return results_df

    def save(self, save_path):
        if not (save_path.endswith(".pkl") or save_path.endswith(".pickle")):
            save_path += ".pickle"
            print(f"Save path changed to {save_path}")
        with open(save_path, 'wb') as file:
            pickle.dump(self, file)
def get_module_usage(config, labels_df, binsize=None, modules_altered=False):
    """
    Reshape labels dataframe from label_counter_subgroups to be an array of features

    :param config: config object
    :param labels_df: labels dataframe from label_counter_subgroups
    :param binsize: width of bins in seconds; if None, no binning is performed
    :param modules_altered: must be True if modules have been remapped
    :return: object of class ModuleUsage
    """
    data_subgrouped = False
    try:
        list(labels_df.columns.get_level_values(1).unique())
        subgroups = list(labels_df.columns.get_level_values(0).unique())
        group_dict = {subgroups[i]: i for i in range(len(subgroups))}
        data_subgrouped = True
    except IndexError:
        subgroups = list(labels_df.columns.get_level_values(0).unique())
        group_dict = {"no_assigned_subgroup": 0}
        data_subgrouped = False
    n_groups = len(subgroups)
    fps = int(config["fps"])
    labels_flat = np.array(labels_df)
    labels_flat = [item for sublist in labels_flat for item in sublist]
    if modules_altered:
        modules = np.unique(labels_flat)
        n_modules = len(modules)
    else:
        n_modules = int(config["n_modules"])
        modules = np.arange(0, n_modules, 1)
        modules_detect = np.unique(labels_flat)
        for m in modules_detect:
            if is_nonnum(m):
                print("Warning, detected non-numeric modules - consider setting 'modules_altered' to True (False by default)")
                break
    label_counts = []
    if binsize is None:
        binsize = (labels_df.index.stop - labels_df.index.start) / int(config["fps"])
    nbins = int(labels_df.shape[0] / (binsize * fps))
    feat_names_made = False
    feat_names = []
    group_labels = []
    observation_labels = []
    for g in range(n_groups):
        for i in range(len(labels_df[subgroups[g]].columns)):
            label_counts_i = np.zeros(n_modules * nbins)
            for b in range(nbins):
                binstart = int(b * (binsize * fps))
                binstop = int((b + 1) * (binsize * fps))
                labels_df_sub = labels_df[binstart:binstop]
                for m, mod in enumerate(modules):
                    label_counts_i[m + n_modules * b] = np.count_nonzero(labels_df_sub[subgroups[g]][[labels_df_sub[subgroups[g]].columns[i]]] == mod) / (binsize * fps)
                    if feat_names_made == False:
                        if is_nonnum(mod):
                            modname = mod
                        else:
                            modname = str(int(mod))
                        if nbins > 1:
                            feat_names.append(f"module{modname}_t{int(binstart / fps)}-{int(binstop / fps)}")
                        else:
                            feat_names.append(f"module{modname}")
            label_counts.append(label_counts_i)
            if data_subgrouped:
                group_labels.append(g)
                observation_labels.append(labels_df[subgroups[g]].columns[i])
            else:
                group_labels.append(0)
                observation_labels.append(labels_df[subgroups[g]].columns[i][0])
            feat_names_made = True
    label_counts = np.array(label_counts)
    return ModuleUsage(label_counts, group_labels, observation_labels, feat_names, group_dict, None)
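# Example (hedged sketch): per-module usage fractions in 60 s bins, scaled for downstream
# classification or embedding.
#
#     labels_df = get_module_labels(config, 0, 600, subgroups="all")
#     usage = get_module_usage(config, labels_df, binsize=60)
#     usage_scaled = usage.scale()
#     print(usage_scaled.to_df().head())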
class ModuleTransitions:
    def __init__(self, transition_counts, transition_count_matrices, group_labels, observation_labels, feat_names, group_dict):
        self.transition_counts = transition_counts
        self.transition_count_matrices = transition_count_matrices
        self.group_labels = group_labels
        self.observation_labels = observation_labels
        self.feat_names = feat_names
        self.group_dict = group_dict
        mean_check = np.allclose(np.mean(transition_counts, axis=0), 0, atol=0.1)
        std_check = np.allclose(np.std(transition_counts, axis=0), 1, atol=1)
        self.scaled = mean_check and std_check

    def to_df(self):
        colnames = []
        flip_group_dict = {v: k for k, v in self.group_dict.items()}
        bad_group_labels = []
        for i in self.group_labels:
            if i in flip_group_dict.keys():
                colnames.append(flip_group_dict[i])
            else:
                colnames.append("none")
                bad_group_labels.append(i)
        if len(bad_group_labels) > 0:
            print(f"Warning: the following groups were not detected in the config and were labeled 'none'; continuing:\n{bad_group_labels}")
        df = pd.DataFrame(self.transition_counts, columns=self.feat_names, index=self.observation_labels)
        df["group"] = colnames
        return df

    def scale(self):
        scaler = StandardScaler()
        transition_counts_scaled = scaler.fit_transform(self.transition_counts)
        # transition_count_matrices_scaled = scaler.fit_transform(self.transition_count_matrices)
        return ModuleTransitions(transition_counts_scaled, self.transition_count_matrices, self.group_labels, self.observation_labels, self.feat_names, self.group_dict)

    def save(self, save_path):
        if not (save_path.endswith(".pkl") or save_path.endswith(".pickle")):
            save_path += ".pickle"
            print(f"Save path changed to {save_path}")
        with open(save_path, 'wb') as file:
            pickle.dump(self, file)
def get_module_transitions(config, labels_df, modules_altered=False):
    """
    Reshape labels dataframe from label_counter_subgroups to be an array of module-to-module transition counts

    :param config: config object
    :param labels_df: labels dataframe from label_counter_subgroups
    :param modules_altered: must be True if modules have been remapped
    :return: object of class ModuleTransitions
    """
    data_subgrouped = False
    try:
        list(labels_df.columns.get_level_values(1).unique())
        subgroups = list(labels_df.columns.get_level_values(0).unique())
        group_dict = {subgroups[i]: i for i in range(len(subgroups))}
        data_subgrouped = True
    except IndexError:
        subgroups = list(labels_df.columns.get_level_values(0).unique())
        group_dict = {"no_assigned_subgroup": 0}
        data_subgrouped = False
    n_groups = len(subgroups)
    fps = int(config["fps"])
    labels_flat = np.array(labels_df)
    labels_flat = [item for sublist in labels_flat for item in sublist]
    if modules_altered:
        modules = np.unique(labels_flat)
        n_modules = len(modules)
    else:
        n_modules = int(config["n_modules"])
        modules = np.arange(0, n_modules, 1)
        modules_detect = np.unique(labels_flat)
        for m in modules_detect:
            if is_nonnum(m):
                print("Warning, detected non-numeric modules - consider setting 'modules_altered' to True (False by default)")
                break
    transition_counts = []
    transition_count_matrices = []
    feat_names_made = False
    feat_names = []
    group_labels = []
    observation_labels = []
    for g in range(n_groups):
        for i in range(len(labels_df[subgroups[g]].columns)):
            transition_counts_i = np.zeros(n_modules * n_modules)
            transition_counts_i_matrix = np.zeros([n_modules, n_modules])
            comparison_idx = 0
            for m_i, mod_i in enumerate(modules):
                for m_j, mod_j in enumerate(modules):
                    if feat_names_made == False:
                        if is_nonnum(mod_i):
                            modname = f'{mod_i}_to_{mod_j}'
                        else:
                            modname = f'{str(int(mod_i))}_to_{str(int(mod_j))}'
                        feat_names.append(f"{modname}")
                    arr = np.array(labels_df[subgroups[g]][labels_df[subgroups[g]].columns[i]])
                    value = np.sum((arr[:-1] == mod_i) & (arr[1:] == mod_j))
                    transition_counts_i[comparison_idx] = value
                    transition_counts_i_matrix[m_i, m_j] = value
                    comparison_idx += 1
            transition_counts.append(transition_counts_i)
            transition_count_matrices.append(transition_counts_i_matrix)
            if data_subgrouped:
                group_labels.append(g)
                observation_labels.append(labels_df[subgroups[g]].columns[i])
            else:
                group_labels.append(0)
                observation_labels.append(labels_df[subgroups[g]].columns[i][0])
            feat_names_made = True
    transition_counts = np.array(transition_counts)
    return ModuleTransitions(transition_counts, transition_count_matrices, group_labels, observation_labels, feat_names, group_dict)
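# Example (hedged sketch): module-to-module transition counts per session.
#
#     labels_df = get_module_labels(config, 0, 600, subgroups="all")
#     transitions = get_module_transitions(config, labels_df)
#     first_matrix = transitions.transition_count_matrices[0]  # n_modules x n_modules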
def load_module_feature_object(module_feature_object_path):
    with open(module_feature_object_path, 'rb') as file:
        module_feature_object = pickle.load(file)
    return module_feature_object
def embed(module_feature_object, method="lda", n_components=2):
    """
    Get dimensionally reduced space embeddings of the data with LDA or PCA

    :param module_feature_object: module feature object
    :param method: "lda" or "pca"; default "lda"
    :param n_components: number of components
    :return: fitted sklearn LDA or PCA model
    """
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
        y = module_feature_object.group_labels
        obj_type = "module usage"
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
        y = module_feature_object.group_labels
        obj_type = "module transitions"
    elif module_feature_object.__class__.__name__ == "KeypointFeature":
        X = module_feature_object.keypoint_feature
        y = module_feature_object.group_labels
        obj_type = "keypoint feature"
    elif module_feature_object.__class__.__name__ == "ActionUnits":
        X = module_feature_object.action_units
        y = module_feature_object.group_labels
        obj_type = "action units"
    else:
        raise ValueError(f'module_feature_object class must be ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits, not {module_feature_object.__class__}')
    if method in ("lda", "LDA", "lineardiscriminantanalysis", "LinearDiscriminantAnalysis"):
        emb = LDA(n_components=n_components, store_covariance=True)
        emb.fit(X, y)
    elif method in ("pca", "PCA"):
        emb = PCA(n_components=n_components)
        emb.fit(X)
    else:
        raise ValueError(f'Invalid method "{method}"; must be "lda" or "pca"')
    return emb
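# Example (hedged sketch): 2-component LDA embedding of scaled module usage for plotting.
# Note that for LDA, n_components must be smaller than the number of subgroups.
#
#     usage = get_module_usage(config, labels_df, binsize=60).scale()
#     lda_model = embed(usage, method="lda", n_components=2)
#     coords = lda_model.transform(usage.label_counts)  # (observations, 2)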
def classify(module_feature_object, method="lda"):
    """
    Classify pose segmentation data using either module usage or transitions

    :param module_feature_object: ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits object
    :param method: classification method to use; options include "lda", "logisticregression", "mlp", "naivebayes", "knn", or "randomforest"
    :return: classifier
    """
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
        y = module_feature_object.group_labels
        obj_type = "module usage"
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
        y = module_feature_object.group_labels
        obj_type = "module transitions"
    elif module_feature_object.__class__.__name__ == "KeypointFeature":
        X = module_feature_object.keypoint_feature
        y = module_feature_object.group_labels
        obj_type = "keypoint feature"
    elif module_feature_object.__class__.__name__ == "ActionUnits":
        X = module_feature_object.action_units
        y = module_feature_object.group_labels
        obj_type = "action units"
    else:
        raise ValueError(f'module_feature_object class must be ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits, not {module_feature_object.__class__}')
    if method in ("lda", "LDA", "lineardiscriminantanalysis", "LinearDiscriminantAnalysis"):
        clf = LDA()
    elif method in ("lr", "LR", "LogisticRegression", "logisticregression"):
        clf = LR()
    elif method in ("mlp", "MLP", "nn", "NN"):
        clf = MLPClassifier(max_iter=1000)
    elif method in ("naivebayes", "NaiveBayes", "nb", "NB"):
        clf = GaussianNB()
    elif method in ("k", "knn", "KNN"):
        clf = KNeighborsClassifier()
    elif method in ("rf", "randomforest", "RF", "RandomForest"):
        clf = RandomForestClassifier()
    else:
        raise ValueError(f'Invalid method "{method}"; must be one of: lda, lr, mlp, nb, knn, or rf (or one of various alternative spellings)')
    clf.fit(X, y)
    return clf
def loocv(module_feature_object, method="lda"):
    """
    Perform leave-one-out cross-validation for a method of classifying pose segmentation data using either module usage or transitions

    :param module_feature_object: ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits object
    :param method: classification method to use; options include "lda", "logisticregression", "mlp", "naivebayes", "knn", or "randomforest"
    :return: accuracy, conf_mat
    """
    loocv_preds = []
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
        y = module_feature_object.group_labels
        obj_type = "module usage"
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
        y = module_feature_object.group_labels
        obj_type = "module transitions"
    elif module_feature_object.__class__.__name__ == "KeypointFeature":
        X = module_feature_object.keypoint_feature
        y = module_feature_object.group_labels
        obj_type = "keypoint feature"
    elif module_feature_object.__class__.__name__ == "ActionUnits":
        X = module_feature_object.action_units
        y = module_feature_object.group_labels
        obj_type = "action units"
    else:
        raise ValueError(f'module_feature_object class must be ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits, not {module_feature_object.__class__}')
    for obs in range(len(y)):
        sub_X = np.delete(X, obs, axis=0)
        sub_y = y[:obs] + y[obs + 1:]
        X_i = X[obs, :]
        y_i = y[obs]
        if method in ("lda", "LDA", "lineardiscriminantanalysis", "LinearDiscriminantAnalysis"):
            clf = LDA()
        elif method in ("lr", "LR", "LogisticRegression", "logisticregression"):
            clf = LR()
        elif method in ("mlp", "MLP", "nn", "NN"):
            clf = MLPClassifier(max_iter=1000)
        elif method in ("naivebayes", "NaiveBayes", "nb", "NB"):
            clf = GaussianNB()
        elif method in ("k", "knn", "KNN"):
            clf = KNeighborsClassifier()
        elif method in ("rf", "randomforest", "RF", "RandomForest"):
            clf = RandomForestClassifier()
        else:
            raise ValueError(f'Invalid method "{method}"; must be one of: lda, lr, mlp, nb, knn, or rf (or one of various alternative spellings)')
        clf.fit(sub_X, sub_y)
        loocv_preds.append(clf.predict(X_i.reshape(1, -1))[0])
    loocv_preds = np.array(loocv_preds)
    y = np.array(y)
    accuracy = np.mean(loocv_preds == y)
    classes = sorted(np.unique(y))
    conf_mat = np.zeros([len(classes), len(classes)])
    for i in range(len(classes)):
        for j in range(len(classes)):
            conf_mat[i, j] = np.sum(loocv_preds[y == classes[i]] == classes[j])
    return accuracy, conf_mat
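# Example (hedged sketch): leave-one-out cross-validated classification of subgroups from
# module usage, with a random-forest classifier.
#
#     usage = get_module_usage(config, labels_df, binsize=60).scale()
#     accuracy, conf_mat = loocv(usage, method="rf")
#     print(f"LOOCV accuracy: {accuracy:.2f}")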
def get_distance(module_feature_object, method="euclidean"):
    """
    Get the distance of each observation to every group centroid

    :param module_feature_object: ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits object
    :param method: distance metric; "euclidean" (default), "braycurtis", "cityblock", or "correlation"
    :return: matrix of distances (observations x groups)
    """
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
        y = module_feature_object.group_labels
        obj_type = "module usage"
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
        y = module_feature_object.group_labels
        obj_type = "module transitions"
    elif module_feature_object.__class__.__name__ == "KeypointFeature":
        X = module_feature_object.keypoint_feature
        y = module_feature_object.group_labels
        obj_type = "keypoint feature"
    elif module_feature_object.__class__.__name__ == "ActionUnits":
        X = module_feature_object.action_units
        y = module_feature_object.group_labels
        obj_type = "action units"
    else:
        raise ValueError(f'module_feature_object class must be ModuleUsage, ModuleTransitions, KeypointFeature, or ActionUnits, not {module_feature_object.__class__}')
    centroids = np.zeros([len(np.unique(module_feature_object.group_labels)), len(module_feature_object.feat_names)])
    for group in sorted(np.unique(module_feature_object.group_labels)):
        centroids[group, :] = np.mean(X[np.array(module_feature_object.group_labels) == group], axis=0)
    dist_mat = np.zeros([len(np.unique(module_feature_object.group_labels)), len(module_feature_object.group_labels)])
    for o, obs in enumerate(X):
        for group in sorted(np.unique(module_feature_object.group_labels)):
            if method == "euclidean":
                dist_mat[group, o] = distance.euclidean(obs, centroids[group, :])
            elif method == "braycurtis":
                dist_mat[group, o] = distance.braycurtis(obs, centroids[group, :])
            elif method == "cityblock":
                dist_mat[group, o] = distance.cityblock(obs, centroids[group, :])
            elif method == "correlation":
                dist_mat[group, o] = distance.correlation(obs, centroids[group, :])
    return dist_mat.T
def regress(module_feature_object, dose_dict, method="LinearRegression", degree=1, alpha=1):
    """
    Regress a continuous variable (e.g., dose, stimulus value) from a module or keypoint feature object

    :param module_feature_object: ModuleUsage, ModuleTransitions, or KeypointFeature object
    :param dose_dict: dictionary with keys corresponding to subgroups and items corresponding to the variable
    :param method: method of regression; either "LinearRegression" (default) or "Ridge" or "Lasso"
    :param degree: polynomial degree; default 1
    :param alpha: alpha (default 1; only applies for regularized regression, i.e. Ridge and Lasso)
    :return: regression model, dose_labels
    """
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
    elif module_feature_object.__class__.__name__ == "KeypointFeature":
        X = module_feature_object.keypoint_feature
    else:
        raise ValueError(f'module_feature_object class must be ModuleUsage or ModuleTransitions or KeypointFeature, not {module_feature_object.__class__}')
    if method not in ("LinearRegression", "Lasso", "Ridge"):
        raise ValueError(f'Invalid method "{method}"; must be "LinearRegression", "Ridge", or "Lasso"')
    reverse_grp_dict = {v: k for k, v in module_feature_object.group_dict.items()}
    dose_labels = [dose_dict[reverse_grp_dict[i]] for i in module_feature_object.group_labels]
    if ((degree <= 0) or (degree % 1 != 0)):
        raise ValueError("Degree must be a positive, non-zero integer!")
    if degree == 1:
        if method == "LinearRegression":
            reg = LinearRegression().fit(X, dose_labels)
        elif method == "Lasso":
            reg = Lasso(alpha=alpha).fit(X, dose_labels)
        elif method == "Ridge":
            reg = Ridge(alpha=alpha).fit(X, dose_labels)
    elif degree > 1:
        if method == "LinearRegression":
            reg = make_pipeline(PolynomialFeatures(degree=degree), LinearRegression())
            reg.fit(X, dose_labels)
        elif method == "Lasso":
            reg = make_pipeline(PolynomialFeatures(degree=degree), Lasso(alpha=alpha))
            reg.fit(X, dose_labels)
        elif method == "Ridge":
            reg = make_pipeline(PolynomialFeatures(degree=degree), Ridge(alpha=alpha))
            reg.fit(X, dose_labels)
    return reg, dose_labels
def loocv_regression(module_feature_object, dose_dict, method="LinearRegression", constrain_pos=True, degree=1, alpha=1):
    """
    Perform LOOCV for linear regression

    :param module_feature_object: ModuleUsage, ModuleTransitions, or KeypointFeature object
    :param dose_dict: dictionary with keys corresponding to subgroups and items corresponding to the variable
    :param method: method of regression; either "LinearRegression" (default) or "Ridge" or "Lasso"
    :param constrain_pos: constrain predictions to only positive values; True by default
    :param degree: degree of polynomial (default 1)
    :param alpha: alpha (default 1; only applies for regularized regression, i.e. Ridge and Lasso)
    :return: held-out predictions, squared error
    """
    loocv_preds = []
    if module_feature_object.__class__.__name__ == "ModuleUsage":
        X = module_feature_object.label_counts
        y = module_feature_object.group_labels
        obj_type = "module usage"
    elif module_feature_object.__class__.__name__ == "ModuleTransitions":
        X = module_feature_object.transition_counts
        y = module_feature_object.group_labels
        obj_type = "module transitions"
    elif module_feature_object.__class__.__name__ == "KeypointFeature":
        X = module_feature_object.keypoint_feature
        y = module_feature_object.group_labels
        obj_type = "keypoint feature"
    else:
        raise ValueError(f'module_feature_object class must be ModuleUsage or ModuleTransitions or KeypointFeature, not {module_feature_object.__class__}')
    if method not in ("LinearRegression", "Lasso", "Ridge"):
        raise ValueError(f'Invalid method "{method}"; must be "LinearRegression", "Ridge", or "Lasso"')
    reverse_grp_dict = {v: k for k, v in module_feature_object.group_dict.items()}
    y = [dose_dict[reverse_grp_dict[i]] for i in module_feature_object.group_labels]
    for obs in range(len(y)):
        sub_X = np.delete(X, obs, axis=0)
        sub_y = y[:obs] + y[obs + 1:]
        X_i = X[obs, :]
        y_i = y[obs]
        if ((degree <= 0) or (degree % 1 != 0)):
            raise ValueError("Degree must be a positive, non-zero integer!")
        if degree == 1:
            if method == "LinearRegression":
                reg = LinearRegression().fit(sub_X, sub_y)
            elif method == "Lasso":
                reg = Lasso(alpha=alpha).fit(sub_X, sub_y)
            elif method == "Ridge":
                reg = Ridge(alpha=alpha).fit(sub_X, sub_y)
        elif degree > 1:
            if method == "LinearRegression":
                reg = make_pipeline(PolynomialFeatures(degree=degree), LinearRegression())
                reg.fit(sub_X, sub_y)
            elif method == "Lasso":
                reg = make_pipeline(PolynomialFeatures(degree=degree), Lasso(alpha=alpha))
                reg.fit(sub_X, sub_y)
            elif method == "Ridge":
                reg = make_pipeline(PolynomialFeatures(degree=degree), Ridge(alpha=alpha))
                reg.fit(sub_X, sub_y)
        loocv_preds.append(reg.predict(X_i.reshape(1, -1))[0])
    loocv_preds = np.array(loocv_preds)
    if constrain_pos:
        loocv_preds[loocv_preds < 0] = 0
    y = np.array(y)
    sq_err = np.square(loocv_preds - y)
    return loocv_preds, sq_err
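# Example (hedged sketch): leave-one-out cross-validated ridge regression of a per-subgroup dose
# from module usage; the subgroup names and dose values below are purely illustrative and must
# match the subgroups defined in the user's own config.
#
#     dose_dict = {"saline": 0.0, "low_dose": 1.0, "high_dose": 3.0}
#     usage = get_module_usage(config, labels_df, binsize=60).scale()
#     preds, sq_err = loocv_regression(usage, dose_dict, method="Ridge", alpha=1)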