doc: make sufficient comments to all funcs

in Handle_emg_data.py
This commit is contained in:
Skudalen 2021-07-09 11:13:42 +02:00
parent 588fa7d282
commit 81d4a53335


@ -18,7 +18,10 @@ NR_COEFFICIENTS = 13 # Number of coefficients
NR_MEL_BINS = 40 # Number of mel-filter-bins
class Data_container:
# Initiates personal data container for each subject. Dict for each session with keys 'left' and 'right',
# and values equal to lists of EMG data indexed 0-7
# NB! More sessions have to be added here in the future
def __init__(self, subject_nr:int, subject_name:str):
self.subject_nr = subject_nr
self.subject_name = subject_name
@ -31,21 +34,27 @@ class Data_container:
self.data_dict_round3,
self.data_dict_round4
]
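For readers of this commit, a minimal sketch of the per-session layout the comment above describes (an assumption based on the comment, since the full __init__ is not shown in this hunk): each session dict maps an arm to a list of 8 EMG slots that store_df_in_container later fills with DataFrames.

def make_session_dict():
    # 8 EMG channels per arm, indexed 0-7 once filled
    return {'left': [None] * 8, 'right': [None] * 8}

example_container = {
    'round1': make_session_dict(),
    'round2': make_session_dict(),
}
assert len(example_container['round1']['left']) == 8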
class CSV_handler:
# Initiates object to store all datapoints in the experiment
def __init__(self):
self.working_dir = str(Path.cwd())
self.data_container_dict = {} # Dict with keys equal subject numbers and values equal the relvant datacontainer
self.data_type = None
self.data_container_dict = {} # Dict with keys equal subject numbers and values equal to its respective datacontainer
self.data_type = None # String describing which type of data is stored in the object
# Makes dataframe from the csv files in the working directory
# Input: filename of a csv-file
# Output: DataFrame
def make_df(self, filename):
filepath = self.working_dir + str(filename)
df = pd.read_csv(filepath)
return df
# Extracts out the timestamp and the selected emg signal into a new dataframe and stores the data on the subject
# Extracts out the timestamp and the selected emg signal into a new dataframe
# Input: filename of a csv-file, EMG nr
# Output: DataFrame(timestamp/EMG)
def get_time_emg_table(self, filename:str, emg_nr:int):
tot_data_frame = self.make_df(filename)
emg_str = 'emg' + str(emg_nr)
@ -53,29 +62,31 @@ class CSV_handler:
return filtered_df
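A self-contained sketch of the column selection this method appears to perform (the exact filtering line falls outside this hunk, so the 'timestamp'/'emgN' column names are assumptions; toy data):

import pandas as pd

tot_data_frame = pd.DataFrame({'timestamp': [0.000, 0.005],
                               'emg1': [12, -3],
                               'emg2': [7, 9]})
emg_nr = 2
emg_str = 'emg' + str(emg_nr)
filtered_df = tot_data_frame[['timestamp', emg_str]]
print(filtered_df)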
# Takes in a df and stores the information in a Data_container object
def store_df_in_container(self, filename:str, emg_nr:int, which_arm:str, data_container:Data_container, round:int):
# Input: filename of a csv-file, EMG nr, left/right arm, subject's data_container, session nr
# Output: None -> stores EMG data in data container
def store_df_in_container(self, filename:str, emg_nr:int, which_arm:str, data_container:Data_container, session:int):
df = self.get_time_emg_table(filename, emg_nr+1)
if df.isnull().values.any():
print('NaN in: subject', data_container.subject_nr, 'arm:', which_arm, 'session:', round, 'emg nr:', emg_nr)
print('NaN in: subject', data_container.subject_nr, 'arm:', which_arm, 'session:', session, 'emg nr:', emg_nr)
# Places the data correctly:
if round == 1:
if session == 1:
if which_arm == 'left':
data_container.data_dict_round1['left'][emg_nr] = df # Zero indexed emg_nr in the dict
else:
data_container.data_dict_round1['right'][emg_nr] = df
elif round == 2:
elif session == 2:
if which_arm == 'left':
data_container.data_dict_round2['left'][emg_nr] = df
else:
data_container.data_dict_round2['right'][emg_nr] = df
elif round == 3:
elif session == 3:
if which_arm == 'left':
data_container.data_dict_round3['left'][emg_nr] = df
else:
data_container.data_dict_round3['right'][emg_nr] = df
elif round == 4:
elif session == 4:
if which_arm == 'left':
data_container.data_dict_round4['left'][emg_nr] = df
else:
@ -83,14 +94,25 @@ class CSV_handler:
else:
raise IndexError('Not a valid index')
# Links the data container for a subject to the handler object
# Links the data container for a subject to the CSV_handler object
# Input: the subject's data_container
# Output: None -> places the data container correctly in the CSV_handler data_container_dict
def link_container_to_handler(self, data_container:Data_container):
# Links the retrieved data with the subjects data_container
subject_nr = data_container.subject_nr
self.data_container_dict[subject_nr] = data_container
# Loads the data from the csv files into a storing system in an CSV_handler object
# (hard, hardPP, soft and softPP)
# Retrieves df via the data_dict in the CSV_handler object
# Input: Experiment details
# Output: DataFrame
def get_df_from_data_dict(self, subject_nr, which_arm, session, emg_nr):
container:Data_container = self.data_container_dict.get(subject_nr)
df = container.dict_list[session - 1].get(which_arm)[emg_nr - 1]
return df
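A self-contained illustration of the 1-indexed-to-0-indexed lookup above, with plain lists standing in for the stored DataFrames (values are illustrative only):

dict_list = [
    {'left': ['s1-L-emg1', 's1-L-emg2'], 'right': ['s1-R-emg1', 's1-R-emg2']},
    {'left': ['s2-L-emg1', 's2-L-emg2'], 'right': ['s2-R-emg1', 's2-R-emg2']},
]
session, which_arm, emg_nr = 2, 'left', 1
print(dict_list[session - 1].get(which_arm)[emg_nr - 1])   # -> 's2-L-emg1'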
# Loads the data from the csv files into the storing system of the CSV_handler object
# Input: None(CSV_handler)
# Output: None -> loads and stores data
def load_hard_PP_emg_data(self):
# CSV data from subject 1
@ -182,7 +204,6 @@ class CSV_handler:
self.link_container_to_handler(data_container)
self.data_type = 'hardPP'
return self.data_container_dict
def load_soft_PP_emg_data(self):
# CSV data from subject 1
@ -274,7 +295,6 @@ class CSV_handler:
self.link_container_to_handler(data_container)
self.data_type = 'softPP'
return self.data_container_dict
def load_hard_original_emg_data(self):
# CSV data from subject 1
@ -366,7 +386,6 @@ class CSV_handler:
self.link_container_to_handler(data_container)
self.data_type = 'hard'
return self.data_container_dict
def load_soft_original_emg_data(self):
# CSV data from subject 1
@ -459,14 +478,9 @@ class CSV_handler:
self.data_type = 'soft'
return self.data_container_dict
# Retrieves df via the data_dict in the handler object
def get_df_from_data_dict(self, subject_nr, which_arm, session, emg_nr):
container:Data_container = self.data_container_dict.get(subject_nr)
df = container.dict_list[session - 1].get(which_arm)[emg_nr - 1]
return df
# Loads in data to a CSV_handler. Choose data_type: hard, hardPP, soft og softPP as str.
# Returns None.
# Loads the data into the CSV_handler (general load func). Choose data_type: hard, hardPP, soft or softPP as str.
# Input: String (the datatype you want)
# Output: None -> loads and stores data
def load_data(self, data_type):
if data_type == 'hard':
self.load_hard_original_emg_data()
@ -480,28 +494,31 @@ class CSV_handler:
raise Exception('Wrong input')
# Retrieves data. Send in a loaded csv_handler and the data details you want.
# Returns DataFrame and samplerate
# Input: Experiment details
# Output: DataFrame, samplerate:int
def get_data(self, subject_nr, which_arm, session, emg_nr):
data_frame = self.get_df_from_data_dict(subject_nr, which_arm, session, emg_nr)
samplerate = get_samplerate(data_frame)
return data_frame, samplerate
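A typical call sequence, assuming the experiment csv files referenced by the load functions are present under the working directory (subject/arm/session/emg values here are illustrative, not from the diff):

handler = CSV_handler()
handler.load_data('hardPP')      # fills data_container_dict for all subjects
df, samplerate = handler.get_data(subject_nr=1, which_arm='left', session=1, emg_nr=1)
print(df.head(), samplerate)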
# NOT IMPLEMENTED
'''
def get_keyboard_data(self, filename:str, pres_or_release:str='pressed'):
filepath = self.working_dir + str(filename)
df = pd.read_csv(filepath)
if pres_or_release == 'pressed':
df = df[(df['event'] == 'KeyPressed') and (df['event'] == 'KeyPressed')]
else
'''
else: pass
pass
class NN_handler:
# Paths for data storage in json for later use in Neural_Network_Analysis.py
JSON_PATH_REG = "reg_data.json"
JSON_PATH_MFCC = "mfcc_data.json"
# Class to manipulate data from the CSV_handler and store it for further analysis
# NB! More subjects need to be added manually
def __init__(self, csv_handler:CSV_handler) -> None:
self.csv_handler = csv_handler
# Should hold 4 sessions * split nr of samples per person. Each sample is structured like this: [sample_df, samplerate]
@ -519,13 +536,17 @@ class NN_handler:
5: None
}
def get_reg_samples_dict(self):
# GET method for reg_samples_dict
def get_reg_samples_dict(self) -> dict:
return self.reg_samples_per_subject
def get_mfcc_samples_dict(self):
# GET method for mfcc_samples_dict
def get_mfcc_samples_dict(self) -> dict:
return self.mfcc_samples_per_subject
# Retrieves all EMG data from one subject and one session, and makes a list of the DataFrames
# Input: Subject nr, Session nr (1-indexed, not 0-indexed)
# Output: List(df_1, ..., df_16)
def get_emg_list(self, subject_nr, session_nr) -> list:
list_of_emgs = []
df, _ = self.csv_handler.get_data(subject_nr, 'left', session_nr, 1)
@ -537,10 +558,13 @@ class NN_handler:
df, _ = self.csv_handler.get_data(subject_nr, 'right', session_nr, emg_nr+1)
list_of_emgs.append(DataFrame(df[get_emg_str(emg_nr+1)]))
return list_of_emgs # list of emg data where first element also has timestamp column
return list_of_emgs # list of emg data
# Creates one DataFrame of all EMG data (one session, one subject). One column for each EMG array
# Input: List(emg1, ..., emg16)
# Output: DataFrame(shape[1]=16)
def make_subj_sample(self, list_of_emgs_):
# Test and fix if the emgs have different size
# Test and fix if the left/right EMGs have different sizes
list_of_emgs = []
length_left_emgs = int(len(list_of_emgs_[0].index))
length_right_emgs = int(len(list_of_emgs_[-1].index))
@ -568,6 +592,9 @@ class NN_handler:
return tot_session_df
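A toy sketch of the side-by-side merge this method performs, using 3 channels instead of 16 (the exact concatenation call and column labels are outside this hunk, so treat them as assumptions):

import pandas as pd

emg1 = pd.Series([1, 2, 3])
emg2 = pd.Series([4, 5, 6])
emg3 = pd.Series([7, 8, 9])
tot_session_df = pd.concat([emg1, emg2, emg3], axis=1)
print(tot_session_df.shape)   # (3, 3) -> one column per EMG channel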
# Takes in a full EMG session DataFrame and merges the EMG data into one column, creating one signal
# Input: DataFrame(shape[1]=16, EMG data)
# Output: DataFrame(signal), samplerate of it
def reshape_session_df_to_signal(self, df:DataFrame):
main_df = df[['timestamp', 1]].rename(columns={1: 'emg'})
for i in range(2, 17):
@ -576,6 +603,9 @@ class NN_handler:
samplerate = get_samplerate(main_df)
return main_df, samplerate
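A rough, self-contained illustration of the stacking idea (toy data, 3 channels; the real method also keeps a timestamp column and hands the result to get_samplerate):

import pandas as pd

session_df = pd.DataFrame({1: [1, 2], 2: [3, 4], 3: [5, 6]})
signal = pd.concat([session_df[i] for i in session_df.columns], ignore_index=True)
main_df = signal.to_frame(name='emg')
print(main_df)   # 6 rows: the three 2-sample channels appended into one signal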
# Stores split, merged signals in the NN_handler's reg_samples_per_subject
# Input: Split_nr:int (how many times to split each merged signal)
# Output: None -> stores in NN_handler
def store_samples(self, split_nr) -> None:
for subject_nr in range(5):
subj_samples = []
@ -595,11 +625,12 @@ class NN_handler:
self.reg_samples_per_subject[subject_nr+1] = subj_samples
# Takes in a full EMG session DataFrame and creates a DataFrame of MFCC samples
# Input: DataFrame(shape[1]=16, EMG data)
# Output: DataFrame(merged MFCC data, shape: (n, 13*16))
def make_mfcc_df_from_session_df(self, session_df) -> DataFrame:
session_df.rename(columns={0:'timestamp'}, inplace=True)
samplerate = get_samplerate(session_df)
#attach_func = lambda list_1, list_2: list_1.tolist().extend(list_2.tolist())
attach_func = lambda list_1, list_2: list_1.extend(list_2)
signal = session_df[1]
@ -607,7 +638,6 @@ class NN_handler:
df = DataFrame(mfcc_0).dropna()
df['combined'] = df.values.tolist()
result_df = df['combined']
#print(result_df)
for i in range(2, 17):
signal_i = session_df[i]
@ -618,6 +648,10 @@ class NN_handler:
return result_df
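A toy illustration of the per-frame 'combined' idea used above, with 2 channels and 3 coefficients instead of 16 and 13 (list concatenation stands in for the attach_func lambda):

import pandas as pd

ch1 = pd.DataFrame([[1, 2, 3], [4, 5, 6]])      # frames x coefficients
ch2 = pd.DataFrame([[7, 8, 9], [10, 11, 12]])
ch1['combined'] = ch1.values.tolist()
ch2['combined'] = ch2.values.tolist()
merged = [a + b for a, b in zip(ch1['combined'], ch2['combined'])]
print(merged)   # each frame now holds 2*3 coefficients, mirroring the (n, 13*16) shape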
# Merges MFCC data from all sessions and stores the sample data in
# the NN_handler's mfcc_samples_per_subject dict
# Input: None(NN_handler)
# Output: None -> stores in NN_handler
def store_mfcc_samples(self) -> None:
for subject_nr in range(5):
subj_samples = []
@ -636,9 +670,12 @@ class NN_handler:
self.mfcc_samples_per_subject[subject_nr+1] = result_df
# Makes MFCC data from reg_samples_per_subject and stores it in a json file
# Input: Path to the json file
# Output: None -> stores in json
def save_json_reg(self, json_path=JSON_PATH_REG):
# dictionary to store mapping, labels, and MFCCs
# Dictionary to store mapping, labels, and MFCCs
data = {
"mapping": [],
"labels": [],
@ -647,7 +684,7 @@ class NN_handler:
raw_data_dict = self.get_reg_samples_dict()
# loop through all subjects to get samples
# Loop through all subjects to get samples
mfcc_list = []
mfcc_frame_list = []
@ -689,6 +726,9 @@ class NN_handler:
with open(json_path, "w") as fp:
json.dump(data, fp, indent=4)
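A hedged sketch of reading the stored file back (only the "mapping" and "labels" keys are visible in this hunk; any other keys in data are not assumed here):

import json

with open("reg_data.json") as fp:     # JSON_PATH_REG
    data = json.load(fp)
print(data["mapping"], len(data["labels"]))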
# Stores MFCC data from mfcc_samples_per_subject in a json file
# Input: Path to the json file
# Output: None -> stores in json
def save_json_mfcc(self, json_path=JSON_PATH_MFCC):
# dictionary to store mapping, labels, and MFCCs
@ -752,7 +792,7 @@ def get_samplerate(df:DataFrame):
samplerate = samples / seconds
return int(samplerate)
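A self-contained sketch of the samples-per-second idea behind get_samplerate (the real timestamp dtype and column handling are outside this hunk, so a datetime column is assumed here):

import pandas as pd

df = pd.DataFrame({'timestamp': pd.date_range('2021-07-09', periods=400, freq='5ms')})
seconds = (df['timestamp'].iloc[-1] - df['timestamp'].iloc[0]).total_seconds()
samples = len(df)
samplerate = int(samples / seconds)
print(samplerate)   # ~200 Hz for 5 ms spacing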
# Takes in a df and outputs np arrays for x and y values
# Help: takes in a df and outputs np arrays for x and y values
def get_xory_from_df(x_or_y, df:DataFrame):
switcher = {
'x': df.iloc[:,0].to_numpy(),
@ -760,15 +800,15 @@ def get_xory_from_df(x_or_y, df:DataFrame):
}
return switcher.get(x_or_y, 0)
# Slightly modified mfcc with inputs like below.
# Returns N (x_values from original df) and mfcc_y_values
def mfcc_custom(signal, samplesize, windowsize=MFCC_WINDOWSIZE,
# Help: slightly modified mfcc with inputs like below. Returns N (x_values from original df) and mfcc_y_values
def mfcc_custom(signal, samplerate, windowsize=MFCC_WINDOWSIZE,
stepsize=MFCC_STEPSIZE,
nr_coefficients=NR_COEFFICIENTS,
nr_mel_filters=NR_MEL_BINS):
return mfcc(signal, samplesize, windowsize, stepsize, nr_coefficients, nr_mel_filters)
return mfcc(signal, samplerate, windowsize, stepsize, nr_coefficients, nr_mel_filters)
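An illustrative call, assuming the imported mfcc behaves like python_speech_features.mfcc (positional samplerate, window and step in seconds); the 200 Hz rate and signal length are made-up stand-ins for real EMG data:

import numpy as np

fake_signal = np.random.randn(2000)        # ~10 s of fake EMG at an assumed 200 Hz
feats = mfcc_custom(fake_signal, 200)      # uses the module-level MFCC_* constants
print(np.asarray(feats).shape)             # expected: (n_frames, NR_COEFFICIENTS)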
# Help: test for irregularities in DataFrame obj
def test_df_for_bugs(signal, key, placement_index):
df = DataFrame(signal)
if df.isnull().values.any():