Compare commits


No commits in common. "d7c733e54f65b1b59aae4b8147f7f0996e7b0be4" and "439238e0708c1bd1c068424b12c01bc5e18fc5a9" have entirely different histories.

7 changed files with 165 additions and 425 deletions

.DS_Store (vendored): binary file not shown.

@@ -9,7 +9,6 @@ import sys
 sys.path.insert(0, '/Users/Markus/Prosjekter git/Slovakia 2021/psf_lib/python_speech_features/python_speech_features')
 from psf_lib.python_speech_features.python_speech_features import mfcc
 import json
-import os
 # Global variables for MFCC
@@ -22,39 +21,41 @@ class Data_container:
     # Initiates personal data container for each subject. Dict for each session with keys 'left' and 'right',
     # and values equal to lists of EMG data indexed 0-7
-    def __init__(self, subject_nr:int, subject_name:str, nr_sessions:int):
+    # NB! More sessions have to be added here in the future
+    def __init__(self, subject_nr:int, subject_name:str):
         self.subject_nr = subject_nr
         self.subject_name = subject_name
-        self.dict_list = [{'left': [None]*8, 'right': [None]*8} for i in range(nr_sessions)]
+        self.data_dict_round1 = {'left': [None]*8, 'right': [None]*8}
+        self.data_dict_round2 = {'left': [None]*8, 'right': [None]*8}
+        self.data_dict_round3 = {'left': [None]*8, 'right': [None]*8}
+        self.data_dict_round4 = {'left': [None]*8, 'right': [None]*8}
+        self.dict_list = [self.data_dict_round1,
+                          self.data_dict_round2,
+                          self.data_dict_round3,
+                          self.data_dict_round4
+                          ]
-    def __str__(self) -> str:
-        return 'Name: {}, \tID: {}'.format(self.subject_name, self.subject_nr)
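Both variants end up exposing the same lookup shape, so downstream code can index a container uniformly; a minimal sketch (the index values here are hypothetical):

    # dict_list[session - 1][arm][emg_nr] -> DataFrame for one sensor
    df = data_container.dict_list[0]['left'][3]  # session 1, left arm, EMG 4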
 class CSV_handler:
     # Initiates object to store all datapoints in the experiment
-    def __init__(self, nr_subjects:int, nr_sessions:int):
+    def __init__(self):
         self.working_dir = str(Path.cwd())
-        self.nr_subjects = nr_subjects
-        self.nr_sessions = nr_sessions
-        # Dict with keys equal subject numbers and values equal to its respective datacontainer
-        self.data_container_dict = {i: None for i in range(nr_subjects)}
-        # String describing which type of data is stored in the object
-        self.data_type = None
+        self.data_container_dict = {} # Dict with keys equal subject numbers and values equal to its respective datacontainer
+        self.data_type = None # String describing which type of data is stored in the object
     # Makes dataframe from the csv files in the working directory
     # Input: filename of a csv-file
     # Output: DataFrame
     def make_df(self, filename):
-        #filepath = self.working_dir + str(filename)
-        filepath = str(filename)
+        filepath = self.working_dir + str(filename)
         df = pd.read_csv(filepath)
         return df
     # Extracts out the timestamp and the selected emg signal into a new dataframe
     # Input: filename of a csv-file, EMG nr
     # Output: DataFrame(timestamp/EMG)
-    def get_emg_table_from_file(self, filename:str, emg_nr:int):
+    def get_time_emg_table(self, filename:str, emg_nr:int):
         tot_data_frame = self.make_df(filename)
         emg_str = 'emg' + str(emg_nr)
         filtered_df = tot_data_frame[["timestamp", emg_str]]
@@ -64,14 +65,34 @@ class CSV_handler:
     # Input: filename of a csv-file, EMG nr, left/right arm, subject's data_container, session nr
     # Output: None -> stores EMG data in data container
     def store_df_in_container(self, filename:str, emg_nr:int, which_arm:str, data_container:Data_container, session:int):
-        df = self.get_emg_table_from_file(filename, emg_nr+1)
+        df = self.get_time_emg_table(filename, emg_nr+1)
         if df.isnull().values.any():
             print('NaN in: subject', data_container.subject_nr, 'arm:', which_arm, 'session:', session, 'emg nr:', emg_nr)
         # Places the data correctly:
-        data_container.dict_list[session-1][which_arm][emg_nr] = df
+        if session == 1:
+            if which_arm == 'left':
+                data_container.data_dict_round1['left'][emg_nr] = df # Zero indexed emg_nr in the dict
+            else:
+                data_container.data_dict_round1['right'][emg_nr] = df
+        elif session == 2:
+            if which_arm == 'left':
+                data_container.data_dict_round2['left'][emg_nr] = df
+            else:
+                data_container.data_dict_round2['right'][emg_nr] = df
+        elif session == 3:
+            if which_arm == 'left':
+                data_container.data_dict_round3['left'][emg_nr] = df
+            else:
+                data_container.data_dict_round3['right'][emg_nr] = df
+        elif session == 4:
+            if which_arm == 'left':
+                data_container.data_dict_round4['left'][emg_nr] = df
+            else:
+                data_container.data_dict_round4['right'][emg_nr] = df
+        else:
+            raise IndexError('Not a valid index')
     # Links the data container for a subject to the csv_handler object
     # Input: the subject's data_container
@@ -89,55 +110,9 @@ class CSV_handler:
         df = container.dict_list[session - 1].get(which_arm)[emg_nr - 1]
         return df
-    # Loads data to the CSV_handler (general load func). Choose data_type: hard, hardPP, soft or softPP as str.
-    # Input: String(datatype you want), directory name of that type
+    # Loads the data from the csv files into the storing system of the CSV_handler object
+    # Input: None(CSV_handler)
     # Output: None -> load and stores data
-    def load_data(self, type:str, type_dir_name:str):
-        data_path = self.working_dir + '/data/' + type_dir_name
-        subject_id = 100
-        subject_name = 'bruh'
-        nr_sessions = 101
-        container = None
-        session_count = 0
-        for i, (path, subject_dir, session_dir) in enumerate(os.walk(data_path)):
-            if path is not data_path:
-                if subject_dir:
-                    session_count = 0
-                    subject_id = int(path[-1])
-                    subject_name = subject_dir[0].split('_')[0]
-                    nr_sessions = len(subject_dir)
-                    container = Data_container(subject_id, subject_name, nr_sessions)
-                    continue
-                else:
-                    session_count += 1
-                for f in session_dir:
-                    spes_path = os.path.join(path, f)
-                    if f == 'myoLeftEmg.csv':
-                        for emg_nr in range(8):
-                            self.store_df_in_container(spes_path, emg_nr, 'left', container, session_count)
-                    elif f == 'myoRightEmg.csv':
-                        for emg_nr in range(8):
-                            self.store_df_in_container(spes_path, emg_nr, 'right', container, session_count)
-        self.link_container_to_handler(container)
-        self.data_type = type
-        return self.data_container_dict
-    # Retrieves data. Send in the loaded csv_handler and the data details you want.
-    # Input: Experiment details
-    # Output: DataFrame, samplerate:int
-    def get_data(self, subject_nr, which_arm, session, emg_nr):
-        data_frame = self.get_df_from_data_dict(subject_nr, which_arm, session, emg_nr)
-        samplerate = get_samplerate(data_frame)
-        return data_frame, samplerate
-    # OBSOLETE
     def load_hard_PP_emg_data(self):
         # CSV data from subject 1
@@ -502,7 +477,11 @@ class CSV_handler:
         self.link_container_to_handler(data_container)
         self.data_type = 'soft'
         return self.data_container_dict
-    def load_data_OLD(self, data_type):
+    # Loads data to the CSV_handler (general load func). Choose data_type: hard, hardPP, soft or softPP as str.
+    # Input: String(datatype you want)
+    # Output: None -> load and stores data
+    def load_data(self, data_type):
         if data_type == 'hard':
             self.load_hard_original_emg_data()
         elif data_type == 'hardPP':
@@ -514,6 +493,13 @@
         else:
             raise Exception('Wrong input')
+    # Retrieves data. Send in the loaded csv_handler and the data details you want.
+    # Input: Experiment details
+    # Output: DataFrame, samplerate:int
+    def get_data(self, subject_nr, which_arm, session, emg_nr):
+        data_frame = self.get_df_from_data_dict(subject_nr, which_arm, session, emg_nr)
+        samplerate = get_samplerate(data_frame)
+        return data_frame, samplerate
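A typical retrieval call once data is loaded (the argument values here are hypothetical):

    data_frame, samplerate = csv_handler.get_data(subject_nr=1, which_arm='left', session=2, emg_nr=1)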
     # NOT IMPLEMENTED
     def get_keyboard_data(self, filename:str, pres_or_release:str='pressed'):
@@ -528,13 +514,31 @@
 class NN_handler:
     # Paths for data storage in json to later use in Neural_Network_Analysis.py
+    JSON_PATH_REG = "reg_data.json"
     JSON_PATH_MFCC = "mfcc_data.json"
     # Class to manipulate data from the CSV_handler and store it for further analysis
+    # NB! More subjects need to be added manually
     def __init__(self, csv_handler:CSV_handler) -> None:
         self.csv_handler = csv_handler
+        # Should be 4 sessions * split nr of samples per person. Each sample is structured like this: [sample_df, samplerate]
+        self.reg_samples_per_subject = {1: [],
+                                        2: [],
+                                        3: [],
+                                        4: [],
+                                        5: []
+                                        }
         # Should be 4 sessions * (~150, 208) of mfcc samples per person. One [DataFrame, session_length_list] per subject
-        self.mfcc_samples_per_subject = {k+1:[] for k in range(csv_handler.nr_subjects)}
+        self.mfcc_samples_per_subject = {1: [],
+                                         2: [],
+                                         3: [],
+                                         4: [],
+                                         5: []
+                                         }
+    # GET method for reg_samples_dict
+    def get_reg_samples_dict(self) -> dict:
+        return self.reg_samples_per_subject
     # GET method for mfcc_samples_dict
     def get_mfcc_samples_dict(self) -> dict:
@@ -588,6 +592,39 @@ class NN_handler:
         return tot_session_df
+    # Takes in all EMG session DataFrames and merges the EMG data into one column, creating one signal
+    # Input: DataFrame(shape[1]=16, EMG data)
+    # Output: DataFrame(signal), samplerate of it
+    def reshape_session_df_to_signal(self, df:DataFrame):
+        main_df = df[['timestamp', 1]].rename(columns={1: 'emg'})
+        for i in range(2, 17):
+            adding_df = df[['timestamp', i]].rename(columns={i: 'emg'})
+            main_df = pd.concat([main_df, adding_df], ignore_index=True)
+        samplerate = get_samplerate(main_df)
+        return main_df, samplerate
+    # Stores split, merged signals in the NN_handler's reg_samples_per_subject
+    # Input: split_nr:int (how many times to split this merged signal)
+    # Output: None -> stores in NN_handler
+    def store_samples(self, split_nr) -> None:
+        for subject_nr in range(5):
+            subj_samples = []
+            for session_nr in range(4):
+                list_of_emg = self.get_emg_list(subject_nr+1, session_nr+1)
+                tot_session_df = self.make_subj_sample(list_of_emg)
+                # TESTING FOR NAN
+                if tot_session_df.isnull().values.any():
+                    print('NaN in: subject', subject_nr+1, 'session:', session_nr+1, 'where? HERE')
+                samples = np.array_split(tot_session_df.to_numpy(), split_nr)
+                for array in samples:
+                    df = DataFrame(array).rename(columns={0:'timestamp'})
+                    df_finished, samplerate = self.reshape_session_df_to_signal(df)
+                    subj_samples.append([df_finished, samplerate])
+            self.reg_samples_per_subject[subject_nr+1] = subj_samples
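A single call then populates the dictionary with split_nr samples per session, e.g. (the split value is hypothetical):

    nn_handler.store_samples(split_nr=10)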
     # Takes in all EMG session DataFrames and creates a DataFrame of MFCC samples
     # Input: DataFrame(shape[1]=16, EMG data)
     # Output: DataFrame(merged MFCC data, shape: (n, 13*16)), length of session datapoints
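The 13*16 = 208 feature width here is what the networks in Neural_Network_Analysis.py consume as input_shape=(1, 208). A minimal sketch of assembling one such feature block with python_speech_features (the helper name and window settings are assumptions, not the repo's code):

    import numpy as np
    from psf_lib.python_speech_features.python_speech_features import mfcc

    def mfcc_features(channel_signals, samplerate):
        # 13 cepstral coefficients per frame and channel;
        # 16 channels side by side -> 13*16 = 208 columns
        # (assumes every channel yields the same number of frames)
        blocks = [mfcc(sig, samplerate, numcep=13) for sig in channel_signals]
        return np.hstack(blocks)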
@@ -636,75 +673,11 @@ class NN_handler:
         result_df = pd.concat(subj_samples, axis=0, ignore_index=True)
         self.mfcc_samples_per_subject[subject_nr+1] = [result_df, session_length_list]
-    # Stores MFCC data from mfcc_samples_per_subject in a json file
+    # Makes MFCC data from reg_samples_per_subject and stores it in a json file
     # Input: Path to the json file
     # Output: None -> stores in json
-    def save_json_mfcc(self, json_path=JSON_PATH_MFCC):
-        # dictionary to store mapping, labels, and MFCCs
-        data = {
-            "mapping": [],
-            "labels": [],
-            "mfcc": [],
-            "session_lengths": []
-        }
-        raw_data_dict = self.get_mfcc_samples_dict()
-        # loop through all subjects to get samples
-        for key, value in raw_data_dict.items():
-            # save subject label in the mapping
-            subject_label = 'Subject ' + str(key)
-            print("\nProcessing: {}".format(subject_label))
-            data["mapping"].append(subject_label) # Subject label
-            data["session_lengths"].append(value[1]) # List[subject][session_length_list]
-            # process all samples per subject
-            for i, sample in enumerate(value[0]):
-                data["labels"].append(key-1) # Subject nr
-                data["mfcc"].append(sample) # MFCC sample on same index
-                print("sample:{} is done".format(i+1))
-        #print(np.array(mfcc_data).shape)
-        # save MFCCs to json file
-        with open(json_path, "w") as fp:
-            json.dump(data, fp, indent=4)
-    # OBSOLETE
-    def get_reg_samples_dict(self) -> dict:
-        return self.reg_samples_per_subject
-    def reshape_session_df_to_signal(self, df:DataFrame):
-        main_df = df[['timestamp', 1]].rename(columns={1: 'emg'})
-        for i in range(2, 17):
-            adding_df = df[['timestamp', i]].rename(columns={i: 'emg'})
-            main_df = pd.concat([main_df, adding_df], ignore_index=True)
-        samplerate = get_samplerate(main_df)
-        return main_df, samplerate
-    def store_samples(self, split_nr) -> None:
-        for subject_nr in range(5):
-            subj_samples = []
-            for session_nr in range(4):
-                list_of_emg = self.get_emg_list(subject_nr+1, session_nr+1)
-                tot_session_df = self.make_subj_sample(list_of_emg)
-                # TESTING FOR NAN
-                if tot_session_df.isnull().values.any():
-                    print('NaN in: subject', subject_nr+1, 'session:', session_nr+1, 'where? HERE')
-                samples = np.array_split(tot_session_df.to_numpy(), split_nr)
-                for array in samples:
-                    df = DataFrame(array).rename(columns={0:'timestamp'})
-                    df_finished, samplerate = self.reshape_session_df_to_signal(df)
-                    subj_samples.append([df_finished, samplerate])
-            self.reg_samples_per_subject[subject_nr+1] = subj_samples
-    def save_json_reg(self, json_path):
+    def save_json_reg(self, json_path=JSON_PATH_REG):
         # Dictionary to store mapping, labels, and MFCCs
         data = {
@@ -757,6 +730,43 @@ class NN_handler:
         with open(json_path, "w") as fp:
             json.dump(data, fp, indent=4)
+    # Stores MFCC data from mfcc_samples_per_subject in a json file
+    # Input: Path to the json file
+    # Output: None -> stores in json
+    def save_json_mfcc(self, json_path=JSON_PATH_MFCC):
+        # dictionary to store mapping, labels, and MFCCs
+        data = {
+            "mapping": [],
+            "labels": [],
+            "mfcc": [],
+            "session_lengths": []
+        }
+        raw_data_dict = self.get_mfcc_samples_dict()
+        # loop through all subjects to get samples
+        for key, value in raw_data_dict.items():
+            # save subject label in the mapping
+            subject_label = 'Subject ' + str(key)
+            print("\nProcessing: {}".format(subject_label))
+            data["mapping"].append(subject_label) # Subject label
+            data["session_lengths"].append(value[1]) # List[subject][session_length_list]
+            # process all samples per subject
+            for i, sample in enumerate(value[0]):
+                data["labels"].append(key-1) # Subject nr
+                data["mfcc"].append(sample) # MFCC sample on same index
+                print("sample:{} is done".format(i+1))
+        #print(np.array(mfcc_data).shape)
+        # save MFCCs to json file
+        with open(json_path, "w") as fp:
+            json.dump(data, fp, indent=4)
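The file written here is what the network stage reads back. A minimal reload sketch, assuming the key layout produced above and equally shaped samples:

    import json
    import numpy as np

    with open("mfcc_data.json", "r") as fp:
        data = json.load(fp)

    X = np.array(data["mfcc"])                 # MFCC sample per row
    y = np.array(data["labels"])               # subject index per sample
    session_lengths = data["session_lengths"]  # per-subject session sizes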
 # HELP FUNCTIONS: ------------------------------------------------------------------------:


@@ -12,7 +12,6 @@ from keras.callbacks import Callback, CSVLogger, ModelCheckpoint
 from pathlib import Path
 import pandas as pd
 import matplotlib.pyplot as plt
-#from matplotlib.legend import _get_legend_handles_
 import statistics
 import csv
@@ -67,185 +66,6 @@ def plot_train_history(history, val_data=False):
     plt.show()
-# Plots the training history of four networks inverse cross-validated
-# Input: data, nr of sessions in total, batch_size and epochs
-# Output: None -> plot
-def plot_4_x_inverse_cross_val(X, y, session_lengths, nr_sessions, batch_size=64, epochs=30):
-    history_dict = {'GRU': [],
-                    'LSTM': [],
-                    'FFN': [],
-                    'CNN_1D': []}
-    for i in range(nr_sessions):
-        X_test_session, X_train_session, y_test_session, y_train_session = prepare_datasets_sessions(X, y, session_lengths, i)
-        model_GRU = GRU(input_shape=(1, 208))
-        GRU_h = train(model_GRU, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs)
-        history_dict['GRU'].append(GRU_h)
-        del model_GRU
-        K.clear_session()
-        model_LSTM = LSTM(input_shape=(1, 208))
-        LSTM_h = train(model_LSTM, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs)
-        history_dict['LSTM'].append(LSTM_h)
-        del model_LSTM
-        K.clear_session()
-        model_FFN = FFN(input_shape=(1, 208))
-        FFN_h = train(model_FFN, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs)
-        history_dict['FFN'].append(FFN_h)
-        del model_FFN
-        K.clear_session()
-        model_CNN_1D = CNN_1D(input_shape=(208, 1))
-        X_train_session = np.reshape(X_train_session, (X_train_session.shape[0], 208, 1))
-        X_test_session = np.reshape(X_test_session, (X_test_session.shape[0], 208, 1))
-        CNN_1D_h = train(model_CNN_1D, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs)
-        history_dict['CNN_1D'].append(CNN_1D_h)
-        del model_CNN_1D
-        K.clear_session()
-    fig, axs = plt.subplots(2, 2, sharey=True)
-    plt.ylim(0, 1)
-    # GRU plot:
-    axs[0, 0].plot(history_dict['GRU'][0].history["accuracy"])
-    axs[0, 0].plot(history_dict['GRU'][1].history["accuracy"], 'tab:orange')
-    axs[0, 0].plot(history_dict['GRU'][2].history["accuracy"], 'tab:green')
-    axs[0, 0].plot(history_dict['GRU'][3].history["accuracy"], 'tab:red')
-    axs[0, 0].set_title('GRU')
-    # LSTM plot:
-    axs[0, 1].plot(history_dict['LSTM'][0].history["accuracy"])
-    axs[0, 1].plot(history_dict['LSTM'][1].history["accuracy"], 'tab:orange')
-    axs[0, 1].plot(history_dict['LSTM'][2].history["accuracy"], 'tab:green')
-    axs[0, 1].plot(history_dict['LSTM'][3].history["accuracy"], 'tab:red')
-    axs[0, 1].set_title('LSTM')
-    # FFN plot:
-    axs[1, 0].plot(history_dict['FFN'][0].history["accuracy"])
-    axs[1, 0].plot(history_dict['FFN'][1].history["accuracy"], 'tab:orange')
-    axs[1, 0].plot(history_dict['FFN'][2].history["accuracy"], 'tab:green')
-    axs[1, 0].plot(history_dict['FFN'][3].history["accuracy"], 'tab:red')
-    axs[1, 0].set_title('FFN')
-    # CNN_1D plot:
-    axs[1, 1].plot(history_dict['CNN_1D'][0].history["accuracy"])
-    axs[1, 1].plot(history_dict['CNN_1D'][1].history["accuracy"], 'tab:orange')
-    axs[1, 1].plot(history_dict['CNN_1D'][2].history["accuracy"], 'tab:green')
-    axs[1, 1].plot(history_dict['CNN_1D'][3].history["accuracy"], 'tab:red')
-    axs[1, 1].set_title('CNN_1D')
-    for ax in axs.flat:
-        ax.set(xlabel='Epochs', ylabel='Accuracy')
-    # Hide x labels and tick labels for top plots and y ticks for right plots.
-    for ax in axs.flat:
-        ax.label_outer()
-    plt.show()
-# Plots the average training history of four networks inverse cross-validated
-# Input: data, nr of sessions in total, batch_size and epochs
-# Output: None -> plot
-def plot_4_x_average_val(X, y, session_lengths, nr_sessions, batch_size=64, epochs=30):
-    history_dict = {'GRU_train': [],
-                    'LSTM_train': [],
-                    'FFN_train': [],
-                    'CNN_1D_train': []}
-    history_dict_val = {'GRU_val': [],
-                        'LSTM_val': [],
-                        'FFN_val': [],
-                        'CNN_1D_val': []}
-    for i in range(nr_sessions):
-        # Prepare data
-        X_val_session, X_train_session, y_val_session, y_train_session = prepare_datasets_sessions(X, y, session_lengths, i)
-        # GRU
-        model_GRU = GRU(input_shape=(1, 208))
-        GRU_h = train(model_GRU, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs,
-                      X_validation=X_val_session, y_validation=y_val_session)
-        history_dict['GRU_train'].append(GRU_h.history['accuracy'])
-        history_dict_val['GRU_val'].append(GRU_h.history['val_accuracy'])
-        del model_GRU
-        K.clear_session()
-        # LSTM
-        model_LSTM = LSTM(input_shape=(1, 208))
-        LSTM_h = train(model_LSTM, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs,
-                       X_validation=X_val_session, y_validation=y_val_session)
-        history_dict['LSTM_train'].append(LSTM_h.history['accuracy'])
-        history_dict_val['LSTM_val'].append(LSTM_h.history['val_accuracy'])
-        del model_LSTM
-        K.clear_session()
-        # FFN
-        model_FFN = FFN(input_shape=(1, 208))
-        FFN_h = train(model_FFN, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs,
-                      X_validation=X_val_session, y_validation=y_val_session)
-        history_dict['FFN_train'].append(FFN_h.history['accuracy'])
-        history_dict_val['FFN_val'].append(FFN_h.history['val_accuracy'])
-        del model_FFN
-        K.clear_session()
-        # CNN_1D
-        model_CNN_1D = CNN_1D(input_shape=(208, 1))
-        X_train_session = np.reshape(X_train_session, (X_train_session.shape[0], 208, 1))
-        X_val_session = np.reshape(X_val_session, (X_val_session.shape[0], 208, 1))
-        CNN_1D_h = train(model_CNN_1D, X_train_session, y_train_session, 1, batch_size=batch_size, epochs=epochs,
-                         X_validation=X_val_session, y_validation=y_val_session)
-        history_dict['CNN_1D_train'].append(CNN_1D_h.history['accuracy'])
-        history_dict_val['CNN_1D_val'].append(CNN_1D_h.history['val_accuracy'])
-        del model_CNN_1D
-        K.clear_session()
-    # Averaging out session training for each network
-    for key in history_dict:
-        history_dict[key] = list(np.average([x, y, z, c]) for x, y, z, c in list(zip(*history_dict[key])))
-    for key in history_dict_val:
-        history_dict_val[key] = list(np.average([x, y, z, c]) for x, y, z, c in list(zip(*history_dict_val[key])))
-    '''
-    history_dict = {'GRU_train': [0.5, 0.8],
-                    'LSTM_train': [0.5, 0.9],
-                    'FFN_train': [0.75, 0.8],
-                    'CNN_1D_train': [0.8, 0.95]}
-    history_dict_val = {'GRU_val': [0.5, 0.8],
-                        'LSTM_val': [0.5, 0.9],
-                        'FFN_val': [0.75, 0.8],
-                        'CNN_1D_val': [0.8, 0.95]}
-    '''
-    # Plot:
-    fig, axs = plt.subplots(2, sharey=True)
-    plt.ylim(0, 1)
-    plt.subplots_adjust(hspace=1.0, top=0.85, bottom=0.15, right=0.75)
-    fig.suptitle('Average accuracy with cross-session-training', fontsize=16)
-    axs[0].plot(history_dict['GRU_train'], label='GRU')
-    axs[0].plot(history_dict['LSTM_train'], 'tab:orange', label='LSTM')
-    axs[0].plot(history_dict['FFN_train'], 'tab:green', label='FFN')
-    axs[0].plot(history_dict['CNN_1D_train'], 'tab:red', label='CNN_1D')
-    axs[0].set_title('Training accuracy')
-    axs[1].plot(history_dict_val['GRU_val'], label='GRU')
-    axs[1].plot(history_dict_val['LSTM_val'], 'tab:orange', label='LSTM')
-    axs[1].plot(history_dict_val['FFN_val'], 'tab:green', label='FFN')
-    axs[1].plot(history_dict_val['CNN_1D_val'], 'tab:red', label='CNN_1D')
-    axs[1].set_title('Validation accuracy')
-    for ax in axs.flat:
-        ax.set(xlabel='Epochs', ylabel='Accuracy')
-    plt.legend(bbox_to_anchor=(1.05, 1.5), title='Networks', loc='center left')
-    plt.show()
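The zip(*...) lines above average the four per-session accuracy curves element-wise, producing one mean curve per epoch. A schematic of that reduction:

    histories = [[0.5, 0.7], [0.6, 0.8], [0.4, 0.6], [0.5, 0.9]]  # accuracy per epoch, one list per session
    avg = [sum(vals) / len(vals) for vals in zip(*histories)]     # [0.5, 0.75]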
 # Takes in data and labels, and splits it into train, validation and test sets by percentage
 # Input: Data, labels, whether to shuffle, % validation, % test
 # Output: X_train, X_validation, X_test, y_train, y_validation, y_test
@@ -353,7 +173,7 @@ def train( model, X_train, y_train, verbose, batch_size=64, epochs=30,
     #csv_path = str(Path.cwd()) + '/logs/{}/{}_train_log.csv'.format(MODEL_NAME, MODEL_NAME)
     #csv_logger = CSVLogger(csv_path, append=False)
-    if X_validation.any():
+    if X_validation != None:
         history = model.fit(X_train,
                             y_train,
                             validation_data=(X_validation, y_validation),
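Neither guard above is robust: on a NumPy array `X_validation != None` compares element-wise, so its truth value is ambiguous, while `.any()` raises if no validation data (None) is passed. The idiomatic check would be a sketch like:

    if X_validation is not None:
        history = model.fit(X_train, y_train,
                            validation_data=(X_validation, y_validation),
                            batch_size=batch_size, epochs=epochs, verbose=verbose)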
@@ -465,57 +285,12 @@ def session_cross_validation(model_name:str, X, y, session_lengths, nr_sessions,
     return average_result, session_training_results
-# Retrieves data sets with each session as train set and evaluates on the others,
-# returning the average of the networks trained on them
-# Input: raw data, session_lengths list, total nr of sessions, batch_size, and nr of epochs
-# Output: tuple(cross validation average, list(result for each dataset(len=nr_sessions)))
-def inverse_session_cross_validation(model_name:str, X, y, session_lengths, nr_sessions, log_to_csv=True, batch_size=64, epochs=30):
-    session_training_results = []
-    for i in range(nr_sessions):
-        X_test_session, X_train_session, y_test_session, y_train_session = prepare_datasets_sessions(X, y, session_lengths, i)
-        # Model:
-        if model_name == 'LSTM':
-            model = LSTM(input_shape=(1, 208))
-        elif model_name == 'GRU':
-            model = GRU(input_shape=(1, 208))
-        elif model_name == 'CNN_1D':
-            X_train_session = np.reshape(X_train_session, (X_train_session.shape[0], 208, 1))
-            X_test_session = np.reshape(X_test_session, (X_test_session.shape[0], 208, 1))
-            model = CNN_1D(input_shape=(208, 1))
-        elif model_name == 'FFN':
-            model = FFN(input_shape=(1, 208))
-        else:
-            raise Exception('Model not found')
-        train(model, X_train_session, y_train_session, verbose=1, batch_size=batch_size, epochs=epochs)
-        test_loss, test_acc = model.evaluate(X_test_session, y_test_session, verbose=0)
-        session_training_results.append(test_acc)
-        if log_to_csv:
-            custom_path = '/{}_train_session{}_log.csv'
-            prediction_csv_logger(X_test_session, y_test_session, model_name, model, i, custom_path)
-        del model
-        K.clear_session()
-        #print('Session', i, 'as test data gives accuracy:', test_acc)
-    average_result = statistics.mean((session_training_results))
-    return average_result, session_training_results
 # Takes in test data and logs input data and the prediction from a model
 # Input: raw data, session_lengths list, total nr of sessions, batch_size, and nr of epochs
 # Output: tuple(cross validation average, list(result for each dataset(len=nr_sessions)))
-def prediction_csv_logger(X, y, model_name, model, session_nr, custom_path=None):
+def prediction_csv_logger(X, y, model_name, model, session_nr):
     csv_path = str(Path.cwd()) + '/logs/{}/{}_session{}_log.csv'.format(model_name, model_name, session_nr+1)
-    if custom_path:
-        path = str(Path.cwd()) + '/logs/{}' + custom_path
-        csv_path = path.format(model_name, model_name, session_nr+1)
     layerOutput = model.predict(X, verbose=0)
@@ -602,12 +377,12 @@ if __name__ == "__main__":
     NR_SUBJECTS = 5
     NR_SESSIONS = 4
     BATCH_SIZE = 64
-    EPOCHS = 10
+    EPOCHS = 5
     TEST_SESSION_NR = 4
     VERBOSE = 1
     MODEL_NAME = 'CNN_1D'
-    LOG = False
+    LOG = True
     # ----- Get prepared data: train, validation, and test ------
     # X_train.shape = (2806-X_test, 1, 208)
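For orientation: the recurrent and dense builders are called with input_shape=(1, 208), while CNN_1D takes (208, 1) and the data is reshaped to match. A minimal sketch of what such a CNN_1D builder could look like, assuming Keras and integer subject labels (the layer stack is an assumption, not the repo's architecture):

    import keras

    def CNN_1D(input_shape=(208, 1), nr_classes=5):
        # Hypothetical layer stack; the repo's actual architecture may differ
        model = keras.Sequential([
            keras.Input(shape=input_shape),
            keras.layers.Conv1D(32, kernel_size=5, activation='relu'),
            keras.layers.MaxPooling1D(pool_size=2),
            keras.layers.Flatten(),
            keras.layers.Dense(64, activation='relu'),
            keras.layers.Dense(nr_classes, activation='softmax'),  # one output per subject
        ])
        # Integer subject labels (0-4) -> sparse categorical cross-entropy
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        return model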
@@ -655,9 +430,8 @@ if __name__ == "__main__":
     '''
-    '''
+    #'''
     # ----- Cross validation ------
-    # Trained on three sessions, tested on one
     average_GRU = session_cross_validation('GRU', X, y, session_lengths, nr_sessions=NR_SESSIONS,
                                            log_to_csv=LOG,
                                            batch_size=BATCH_SIZE,
@@ -676,44 +450,10 @@
                                            epochs=EPOCHS)
     print('\n')
-    print('Cross-validated GRU:', average_GRU)
-    print('Cross-validated LSTM:', average_LSTM)
-    print('Cross-validated FFN:', average_FFN)
+    print('Crossvalidated GRU:', average_GRU)
+    print('Crossvalidated LSTM:', average_LSTM)
+    print('Crossvalidated FFN:', average_FFN)
     print('Cross-validated CNN_1D:', average_CNN)
     print('\n')
-    '''
+    #'''
-    '''
-    # ----- Inverse cross-validation ------
-    # Trained on one session, tested on three
-    average_GRU = inverse_session_cross_validation('GRU', X, y, session_lengths, nr_sessions=NR_SESSIONS,
-                                                   log_to_csv=LOG,
-                                                   batch_size=BATCH_SIZE,
-                                                   epochs=EPOCHS)
-    average_LSTM = inverse_session_cross_validation('LSTM', X, y, session_lengths, nr_sessions=NR_SESSIONS,
-                                                    log_to_csv=LOG,
-                                                    batch_size=BATCH_SIZE,
-                                                    epochs=EPOCHS)
-    average_FFN = inverse_session_cross_validation('FFN', X, y, session_lengths, nr_sessions=NR_SESSIONS,
-                                                   log_to_csv=LOG,
-                                                   batch_size=BATCH_SIZE,
-                                                   epochs=EPOCHS)
-    average_CNN = inverse_session_cross_validation('CNN_1D', X, y, session_lengths, nr_sessions=NR_SESSIONS,
-                                                   log_to_csv=LOG,
-                                                   batch_size=BATCH_SIZE,
-                                                   epochs=EPOCHS)
-    print('\n')
-    print('Cross-validated one-session-train GRU:', average_GRU)
-    print('Cross-validated one-session-train LSTM:', average_LSTM)
-    print('Cross-validated one-session-train FFN:', average_FFN)
-    print('Cross-validated one-session-train CNN_1D:', average_CNN)
-    print('\n')
-    '''
-    # ----- PLOTTING ------
-    #plot_4_x_inverse_cross_val(X, y, session_lengths, NR_SESSIONS, epochs=30)
-    plot_4_x_average_val(X, y, session_lengths, NR_SESSIONS, epochs=30)


@@ -125,9 +125,8 @@ def pretty(dict):
 # DATA FUNCTIONS: --------------------------------------------------------------:
-# The CSV_handler takes in nr of subjects and nr of sessions in the experiment
-# E.g. handler = CSV_handler(nr_subjects=5, nr_sessions=4)
-# Needs to load data: handler.load_data(<type>, <type_directory_name>)
+# The CSV_handler takes in data_type(soft, hard, softPP, hardPP)
+# E.g. handler = CSV_handler('soft')
 # Denoises one set of EMG data
 # Input: CSV_handler and details for ID
@@ -239,25 +238,15 @@ def mfcc_all_emg_plots(csv_handler:CSV_handler):
     plot_all_emg_mfcc(feat_list, label_list)
 # MAIN: ------------------------------------------------------------------------:
 if __name__ == "__main__":
-    NR_SUBJECTS = 5
-    NR_SESSIONS = 4
-    soft_dir_name = 'Exp20201205_2myo_softType'
-    hard_dir_name = 'Exp20201205_2myo_hardType'
-    JSON_TEST_NAME = 'TEST_mfcc.json'
-    csv_handler = CSV_handler(NR_SUBJECTS, NR_SESSIONS)
-    dict = csv_handler.load_data('soft', soft_dir_name)
+    csv_handler = CSV_handler()
+    csv_handler.load_data('soft')
     nn_handler = NN_handler(csv_handler)
     nn_handler.store_mfcc_samples()
-    nn_handler.save_json_mfcc(JSON_TEST_NAME)
+    nn_handler.save_json_mfcc()


@@ -11,10 +11,12 @@ Scripts to handle CSV files composed by 2 * 8 EMG sensors (left & right) divided
 * Community libs: Python_speech_features, Pywt
 #### Challenges in the module
-* The CSV handling requires a specific file structure. See "How to use it"
+* The CSV handling is for the moment hard-coded to fit the current project due to a very specific file structure and respective naming convention.
 * Preprocessing is still limited in Signal_prep.py
+* Neural_Network_Analysis.py lacks a more general way to access multiple types of networks
 #### Credits for inspirational code
 * Kapre: Keunwoochoi
 * Audio-Classification: seth814
 * DeepLearningForAudioWithPython: musikalkemist
@@ -33,8 +35,7 @@ Scripts to handle CSV files composed by 2 * 8 EMG sensors (left & right) divided
 1. Clone the repo
 2. Place the data files in the working directory
-3. Place the data files within the `data`-folder
-   (format: /`data`/<datatype>/<subject-folder+ID>/<session-folder>/<left/right-CSV-files>)
+3. (For now) Add the session filenames in the desired load_data() function
 4. Assuming NN analysis:
    1. Create a `CSV_handler` object
    2. Load data with `load_data(CSV_handler, <datatype>)` (see the sketch below)
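A minimal end-to-end sketch of the NN path, mirroring the `__main__` of the handler script (datatype being one of hard, hardPP, soft, softPP):

    csv_handler = CSV_handler()
    csv_handler.load_data('soft')
    nn_handler = NN_handler(csv_handler)
    nn_handler.store_mfcc_samples()
    nn_handler.save_json_mfcc()  # writes mfcc_data.json for Neural_Network_Analysis.py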