From 81d4a533356b3967d4727efabc20d4db997c60d9 Mon Sep 17 00:00:00 2001
From: Skudalen
Date: Fri, 9 Jul 2021 11:13:42 +0200
Subject: [PATCH] doc: make sufficient comments to all funcs in Handle_emg_data.py

---
 Handle_emg_data.py | 132 +++++++++++++++++++++++++++++----------------
 1 file changed, 86 insertions(+), 46 deletions(-)

diff --git a/Handle_emg_data.py b/Handle_emg_data.py
index 3a29abb..66b76fe 100644
--- a/Handle_emg_data.py
+++ b/Handle_emg_data.py
@@ -18,7 +18,10 @@ NR_COEFFICIENTS = 13 # Number of coefficients
 NR_MEL_BINS = 40 # Number of mel-filter-bins
 
 class Data_container:
-    
+
+    # Initiates a personal data container for each subject. One dict per session, with keys 'left' and 'right'
+    # and values equal to lists of EMG data indexed 0-7
+    # NB! More sessions have to be added here in the future
     def __init__(self, subject_nr:int, subject_name:str):
         self.subject_nr = subject_nr
         self.subject_name = subject_name
@@ -31,21 +34,27 @@ class Data_container:
             self.data_dict_round3,
             self.data_dict_round4
         ]
 
-    
+
+
 class CSV_handler:
+    # Initiates an object to store all datapoints in the experiment
     def __init__(self):
         self.working_dir = str(Path.cwd())
-        self.data_container_dict = {} # Dict with keys equal subject numbers and values equal the relvant datacontainer
-        self.data_type = None
+        self.data_container_dict = {} # Dict with keys equal to subject numbers and values equal to their respective data containers
+        self.data_type = None # String describing which type of data is stored in the object
 
     # Makes dataframe from the csv files in the working directory
+    # Input: filename of a csv-file
+    # Output: DataFrame
     def make_df(self, filename):
         filepath = self.working_dir + str(filename)
         df = pd.read_csv(filepath)
         return df
 
-    # Extracts out the timestamp and the selected emg signal into a new dataframe and stores the data on the subject
+    # Extracts the timestamp and the selected EMG signal into a new dataframe
+    # Input: filename of a csv-file, EMG nr
+    # Output: DataFrame(timestamp/EMG)
     def get_time_emg_table(self, filename:str, emg_nr:int):
         tot_data_frame = self.make_df(filename)
         emg_str = 'emg' + str(emg_nr)
@@ -53,29 +62,31 @@ class CSV_handler:
         return filtered_df
 
     # Takes in a df and stores the information in a Data_container object
-    def store_df_in_container(self, filename:str, emg_nr:int, which_arm:str, data_container:Data_container, round:int):
+    # Input: filename of a csv-file, EMG nr, left/right arm, subject's data_container, session nr
+    # Output: None -> stores EMG data in the data container
+    def store_df_in_container(self, filename:str, emg_nr:int, which_arm:str, data_container:Data_container, session:int):
         df = self.get_time_emg_table(filename, emg_nr+1)
 
         if df.isnull().values.any():
-            print('NaN in: subject', data_container.subject_nr, 'arm:', which_arm, 'session:', round, 'emg nr:', emg_nr)
+            print('NaN in: subject', data_container.subject_nr, 'arm:', which_arm, 'session:', session, 'emg nr:', emg_nr)
 
         # Places the data correctly:
-        if round == 1:
+        if session == 1:
             if which_arm == 'left':
                 data_container.data_dict_round1['left'][emg_nr] = df # Zero indexed emg_nr in the dict
             else:
                 data_container.data_dict_round1['right'][emg_nr] = df
-        elif round == 2:
+        elif session == 2:
            if which_arm == 'left':
                 data_container.data_dict_round2['left'][emg_nr] = df
             else:
                 data_container.data_dict_round2['right'][emg_nr] = df
-        elif round == 3:
+        elif session == 3:
             if which_arm == 'left':
                 data_container.data_dict_round3['left'][emg_nr] = df
             else:
                 data_container.data_dict_round3['right'][emg_nr] = df
-        elif round == 4:
+        elif session == 4:
             if which_arm == 'left':
                 data_container.data_dict_round4['left'][emg_nr] = df
             else:
@@ -83,14 +94,25 @@ class CSV_handler:
         else:
             raise IndexError('Not a valid index')
 
-    # Links the data container for a subject to the handler object
+    # Links the data container for a subject to the CSV_handler object
+    # Input: the subject's data_container
+    # Output: None -> places the data container in the CSV_handler's data_container_dict
     def link_container_to_handler(self, data_container:Data_container):
         # Links the retrieved data with the subjects data_container
         subject_nr = data_container.subject_nr
         self.data_container_dict[subject_nr] = data_container
 
-    # Loads the data from the csv files into a storing system in an CSV_handler object
-    # (hard, hardPP, soft and softPP)
+    # Retrieves df via the data_dict in the CSV_handler object
+    # Input: Experiment details
+    # Output: DataFrame
+    def get_df_from_data_dict(self, subject_nr, which_arm, session, emg_nr):
+        container:Data_container = self.data_container_dict.get(subject_nr)
+        df = container.dict_list[session - 1].get(which_arm)[emg_nr - 1]
+        return df
+
+    # Loads the data from the csv files into the storing system of the CSV_handler object
+    # Input: None (CSV_handler)
+    # Output: None -> loads and stores data
     def load_hard_PP_emg_data(self):
 
         # CSV data from subject 1
@@ -182,7 +204,6 @@ class CSV_handler:
         self.link_container_to_handler(data_container)
         self.data_type = 'hardPP'
         return self.data_container_dict
-
 
     def load_soft_PP_emg_data(self):
         # CSV data from subject 1
@@ -274,7 +295,6 @@ class CSV_handler:
         self.link_container_to_handler(data_container)
         self.data_type = 'softPP'
         return self.data_container_dict
-
 
     def load_hard_original_emg_data(self):
         # CSV data from subject 1
@@ -366,7 +386,6 @@ class CSV_handler:
         self.link_container_to_handler(data_container)
         self.data_type = 'hard'
         return self.data_container_dict
-
 
     def load_soft_original_emg_data(self):
         # CSV data from subject 1
@@ -459,14 +478,9 @@ class CSV_handler:
         self.data_type = 'soft'
         return self.data_container_dict
 
-    # Retrieves df via the data_dict in the handler object
-    def get_df_from_data_dict(self, subject_nr, which_arm, session, emg_nr):
-        container:Data_container = self.data_container_dict.get(subject_nr)
-        df = container.dict_list[session - 1].get(which_arm)[emg_nr - 1]
-        return df
-
-    # Loads in data to a CSV_handler. Choose data_type: hard, hardPP, soft og softPP as str.
-    # Returns None.
+    # Loads data into the CSV_handler (general load func). Choose data_type: hard, hardPP, soft or softPP as str.
+    # Input: String (the data type you want)
+    # Output: None -> loads and stores data
     def load_data(self, data_type):
         if data_type == 'hard':
             self.load_hard_original_emg_data()
@@ -480,28 +494,31 @@ class CSV_handler:
             raise Exception('Wrong input')
 
     # Retrieved data. Send in loaded csv_handler and data detailes you want. 
-    # Returns DataFrame and samplerate
+    # Input: Experiment details
+    # Output: DataFrame, samplerate:int
     def get_data(self, subject_nr, which_arm, session, emg_nr):
         data_frame = self.get_df_from_data_dict(subject_nr, which_arm, session, emg_nr)
         samplerate = get_samplerate(data_frame)
         return data_frame, samplerate
 
     # NOT IMPLEMENTED
-    '''
     def get_keyboard_data(self, filename:str, pres_or_release:str='pressed'):
         filepath = self.working_dir + str(filename)
         df = pd.read_csv(filepath)
         if pres_or_release == 'pressed':
             df = df[(df['event'] == 'KeyPressed') and (df['event'] == 'KeyPressed')]
-        else
-    '''
+        else: pass
+        pass
+
 
 class NN_handler:
+    # Paths for data storage in json for later use in Neural_Network_Analysis.py
     JSON_PATH_REG = "reg_data.json"
     JSON_PATH_MFCC = "mfcc_data.json"
-    
+    # Class to manipulate data from the CSV_handler and store it for further analysis
+    # NB! More subjects need to be added manually
     def __init__(self, csv_handler:CSV_handler) -> None:
         self.csv_handler = csv_handler
         # Should med 4 sessions * split nr of samples per person. Each sample is structured like this: [sample_df, samplerate]
@@ -519,13 +536,17 @@ class NN_handler:
             5: None
             }
 
-    def get_reg_samples_dict(self):
+    # GET method for reg_samples_dict
+    def get_reg_samples_dict(self) -> dict:
         return self.reg_samples_per_subject
-    
-    def get_mfcc_samples_dict(self):
+
+    # GET method for mfcc_samples_dict
+    def get_mfcc_samples_dict(self) -> dict:
         return self.mfcc_samples_per_subject
-    
+    # Retrieves all EMG data from one subject and one session, and makes a list of the DataFrames
+    # Input: Subject nr, Session nr (normal numbering, not 0-indexed)
+    # Output: List(df_1, ..., df_16)
     def get_emg_list(self, subject_nr, session_nr) -> list:
         list_of_emgs = []
         df, _ = self.csv_handler.get_data(subject_nr, 'left', session_nr, 1)
@@ -537,10 +558,13 @@ class NN_handler:
             df, _ = self.csv_handler.get_data(subject_nr, 'right', session_nr, emg_nr+1)
             list_of_emgs.append(DataFrame(df[get_emg_str(emg_nr+1)]))
 
-        return list_of_emgs # list of emg data where first element also has timestamp column
+        return list_of_emgs # list of emg data
 
+    # Creates one DataFrame of all EMG data (one session, one subject). One column for each EMG array
+    # Input: List(emg1, ..., emg16)
+    # Output: DataFrame(shape[1]=16)
     def make_subj_sample(self, list_of_emgs_):
-        # Test and fix if the emgs have different size
+        # Test and fix if the left/right EMGs have different size
         list_of_emgs = []
         length_left_emgs = int(len(list_of_emgs_[0].index))
         length_right_emgs = int(len(list_of_emgs_[-1].index))
@@ -568,6 +592,9 @@ class NN_handler:
 
         return tot_session_df
 
+    # Takes in a whole EMG session DataFrame and merges the EMG data into one column, creating one signal
+    # Input: DataFrame(shape[1]=16, EMG data)
+    # Output: DataFrame(signal), samplerate of it
     def reshape_session_df_to_signal(self, df:DataFrame):
         main_df = df[['timestamp', 1]].rename(columns={1: 'emg'})
         for i in range(2, 17):
@@ -576,6 +603,9 @@ class NN_handler:
         samplerate = get_samplerate(main_df)
         return main_df, samplerate
 
+    # Stores split, merged signals in the NN_handler's reg_samples_per_subject
+    # Input: split_nr:int (how many times to split each merged signal)
+    # Output: None -> stores in NN_handler
     def store_samples(self, split_nr) -> None:
         for subject_nr in range(5):
             subj_samples = []
@@ -595,11 +625,12 @@ class NN_handler:
 
             self.reg_samples_per_subject[subject_nr+1] = subj_samples
 
-    
+    # Takes in a whole EMG session DataFrame and creates a DataFrame of MFCC samples
+    # Input: DataFrame(shape[1]=16, EMG data)
+    # Output: DataFrame(merged MFCC data, shape: (n, 13*16))
     def make_mfcc_df_from_session_df(self, session_df) -> DataFrame:
         session_df.rename(columns={0:'timestamp'}, inplace=True)
         samplerate = get_samplerate(session_df)
-        #attach_func = lambda list_1, list_2: list_1.tolist().extend(list_2.tolist())
         attach_func = lambda list_1, list_2: list_1.extend(list_2)
 
         signal = session_df[1]
@@ -607,7 +638,6 @@ class NN_handler:
         df = DataFrame(mfcc_0).dropna()
         df['combined'] = df.values.tolist()
         result_df = df['combined']
-        #print(result_df)
 
         for i in range(2, 17):
             signal_i = session_df[i]
@@ -618,6 +648,10 @@ class NN_handler:
 
         return result_df
 
+    # Merges MFCC data from all sessions and stores the sample data in
+    # the NN_handler's mfcc_samples_per_subject dict
+    # Input: None (NN_handler)
+    # Output: None -> stores in NN_handler
    def store_mfcc_samples(self) -> None:
         for subject_nr in range(5):
             subj_samples = []
@@ -636,9 +670,12 @@ class NN_handler:
 
             self.mfcc_samples_per_subject[subject_nr+1] = result_df
 
+    # Makes MFCC data from reg_samples_per_subject and stores it in a json file
+    # Input: Path to the json file
+    # Output: None -> stores in json
     def save_json_reg(self, json_path=JSON_PATH_REG):
 
-        # dictionary to store mapping, labels, and MFCCs
+        # Dictionary to store mapping, labels, and MFCCs
         data = {
             "mapping": [],
             "labels": [],
@@ -647,7 +684,7 @@ class NN_handler:
 
         raw_data_dict = self.get_reg_samples_dict()
 
-        # loop through all subjects to get samples
+        # Loop through all subjects to get samples
         mfcc_list = []
         mfcc_frame_list = []
 
@@ -689,6 +726,9 @@ class NN_handler:
         with open(json_path, "w") as fp:
             json.dump(data, fp, indent=4)
 
+    # Stores MFCC data from mfcc_samples_per_subject in a json file
+    # Input: Path to the json file
+    # Output: None -> stores in json
     def save_json_mfcc(self, json_path=JSON_PATH_MFCC):
 
         # dictionary to store mapping, labels, and MFCCs
@@ -752,7 +792,7 @@ def get_samplerate(df:DataFrame):
     samplerate = samples / seconds
     return int(samplerate)
 
-# Takes in a df and outputs np arrays for x and y values
+# Help: takes in a df and outputs np arrays for x and y values
 def get_xory_from_df(x_or_y, df:DataFrame):
     swither = {
         'x': df.iloc[:,0].to_numpy(),
@@ -760,15 +800,15 @@ def get_xory_from_df(x_or_y, df:DataFrame):
         'y': df.iloc[:,1].to_numpy()
     }
     return swither.get(x_or_y, 0)
 
-# Slightly modified mfcc with inputs like below.
-# Returns N (x_values from original df) and mfcc_y_values
-def mfcc_custom(signal, samplesize, windowsize=MFCC_WINDOWSIZE,
+# Help: slightly modified mfcc with inputs like below. Returns N (x_values from original df) and mfcc_y_values
+def mfcc_custom(signal, samplerate, windowsize=MFCC_WINDOWSIZE,
                 stepsize=MFCC_STEPSIZE, nr_coefficients=NR_COEFFICIENTS, nr_mel_filters=NR_MEL_BINS):
 
-    return mfcc(signal, samplesize, windowsize, stepsize, nr_coefficients, nr_mel_filters)
+    return mfcc(signal, samplerate, windowsize, stepsize, nr_coefficients, nr_mel_filters)
 
+# Help: test for irregularities in DataFrame obj
 def test_df_for_bugs(signal, key, placement_index):
     df = DataFrame(signal)
     if df.isnull().values.any():
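
Note on mfcc_custom (not part of the patch): the rename samplesize -> samplerate matters because the wrapped mfcc() takes the sample rate as its second positional argument. The import is not visible in this diff, but the positional order (signal, samplerate, winlen, winstep, numcep, nfilt) matches python_speech_features.mfcc. A minimal standalone sketch under that assumption, with placeholder sample-rate and window/step values (the real MFCC_WINDOWSIZE and MFCC_STEPSIZE constants are defined outside this diff):

    import numpy as np
    from python_speech_features import mfcc   # assumed source of mfcc(), not shown in the diff

    NR_COEFFICIENTS = 13   # Number of coefficients (from Handle_emg_data.py)
    NR_MEL_BINS = 40       # Number of mel-filter-bins (from Handle_emg_data.py)
    windowsize = 0.5       # seconds per analysis window (placeholder, not the repo's MFCC_WINDOWSIZE)
    stepsize = 0.25        # seconds per step (placeholder, not the repo's MFCC_STEPSIZE)

    samplerate = 200                            # Hz, an assumed EMG sampling rate
    signal = np.random.randn(10 * samplerate)   # 10 s of dummy EMG-like data

    # Same call that mfcc_custom() forwards to: one row of NR_COEFFICIENTS per analysis window
    mfcc_y_values = mfcc(signal, samplerate, windowsize, stepsize, NR_COEFFICIENTS, NR_MEL_BINS)
    print(mfcc_y_values.shape)                  # (number_of_windows, 13)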
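
End-to-end usage sketch (not from the patched file): the Input/Output comments added above describe a pipeline of loading csv data into a CSV_handler, wrapping it in an NN_handler, and storing samples as json for Neural_Network_Analysis.py. A minimal sketch of that flow, assuming the script sits next to Handle_emg_data.py and that the csv files expected by the load_* functions are in place:

    from Handle_emg_data import CSV_handler, NN_handler

    # Load one of the four data types handled by load_data(): 'hard', 'hardPP', 'soft' or 'softPP'
    csv_handler = CSV_handler()
    csv_handler.load_data('soft')

    # Fetch a single EMG DataFrame and its samplerate (subject 2, left arm, session 3, EMG 1)
    df, samplerate = csv_handler.get_data(2, 'left', 3, 1)

    # Build MFCC samples for all subjects and dump them to mfcc_data.json (JSON_PATH_MFCC)
    nn_handler = NN_handler(csv_handler)
    nn_handler.store_mfcc_samples()
    nn_handler.save_json_mfcc()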