diff --git a/README.md b/README.md
index 30d74d2..3488658 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,21 @@
-test
\ No newline at end of file
+# COVID-19 speech diagnosis
+This repo contains the code created by Elien Martens during an IAESTE internship in summer 2021 at the Technical University of Kosice (Slovakia).
+
+## Dataset
+
+The COVID-19 datasets can be obtained through the University of Cambridge (see also compare.openaudio.eu and the Interspeech 2021 Computational Paralinguistics Challenge).
+
+## How it works
+- clone the repo
+- add the data (see the Dataset section above) so that the directory structure looks like this:
+    CovidSpeechChallenge
+       -> raw_files
+          -> train
+          -> test
+          -> devel
+       -> labels
+       -> vgg_features
+       -> hand_features
+       -> vggish
+       ...
+- run `.\run_experiments.sh`
\ No newline at end of file
diff --git a/extract_handcrafted_features.py b/extract_handcrafted_features.py
new file mode 100644
index 0000000..dff29a2
--- /dev/null
+++ b/extract_handcrafted_features.py
@@ -0,0 +1,230 @@
+import json
+import os
+import sys
+import warnings
+from math import pi
+
+import librosa
+import numpy as np
+import pandas as pd
+from scipy.fftpack import fft, hilbert
+
+warnings.filterwarnings("ignore")
+
+SR = 22050  # sample rate
+FRAME_LEN = int(SR / 10)  # 100 ms frame length
+HOP = int(FRAME_LEN / 2)  # 50% overlap, i.e. a 50 ms hop length
+MFCC_dim = 13  # the MFCC dimension
+
+def sta_fun(np_data):
+    """Extract various statistical features from the numpy array provided as input.
+
+    :param np_data: the numpy array to extract the features from
+    :type np_data: numpy.ndarray
+    :return: the extracted features as a vector
+    :rtype: numpy.ndarray
+    """
+
+    # perform a sanity check
+    if np_data is None:
+        raise ValueError("Input array cannot be None")
+
+    # perform the feature extraction
+    dat_min = np.min(np_data)
+    dat_max = np.max(np_data)
+    dat_mean = np.mean(np_data)
+    dat_rms = np.sqrt(np.sum(np.square(np_data)) / len(np_data))
+    dat_median = np.median(np_data)
+    dat_qrl1 = np.percentile(np_data, 25)
+    dat_qrl3 = np.percentile(np_data, 75)
+    dat_lower_q = np.quantile(np_data, 0.25, interpolation="lower")
+    dat_higher_q = np.quantile(np_data, 0.75, interpolation="higher")
+    dat_iqrl = dat_higher_q - dat_lower_q
+    dat_std = np.std(np_data)
+    s = pd.Series(np_data)
+    dat_skew = s.skew()
+    dat_kurt = s.kurt()
+
+    # finally return the features in a concatenated array (as a vector)
+    return np.array([dat_mean, dat_min, dat_max, dat_std, dat_rms,
+                     dat_median, dat_qrl1, dat_qrl3, dat_iqrl, dat_skew, dat_kurt])
+
+def get_period(signal, signal_sr):
+    """Extract the period from the provided signal.
+
+    :param signal: the signal to extract the period from
+    :type signal: numpy.ndarray
+    :param signal_sr: the sampling rate of the input signal
+    :type signal_sr: integer
+    :return: a vector containing the signal period
+    :rtype: numpy.ndarray
+    """
+
+    # perform a sanity check
+    if signal is None:
+        raise ValueError("Input signal cannot be None")
+
+    # transform the signal to the hilbert space
+    hy = hilbert(signal)
+
+    ey = np.sqrt(signal ** 2 + hy ** 2)
+    min_time = 1.0 / signal_sr
+    tot_time = len(ey) * min_time
+    pow_ft = np.abs(fft(ey))
+    peak_freq = pow_ft[3: int(len(pow_ft) / 2)]
+    peak_freq_pos = peak_freq.argmax()
+    peak_freq_val = 2 * pi * (peak_freq_pos + 2) / tot_time
+    period = 2 * pi / peak_freq_val
+
+    return np.array([period])
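As an illustration (an editor's aside, not part of the patch): assuming the two helpers above are in scope, a quick sanity check on a synthetic tone shows the shapes they produce; the tone and the printed values are made up for demonstration.

```python
import numpy as np

# illustrative only: a 1-second synthetic 440 Hz tone at the sample rate used above
sr = 22050
t = np.linspace(0, 1.0, sr, endpoint=False)
tone = np.sin(2 * np.pi * 440.0 * t)

stats = sta_fun(tone)          # mean, min, max, std, rms, median, q1, q3, iqr, skew, kurtosis
period = get_period(tone, sr)  # one-element array with the estimated dominant period

print(stats.shape)   # (11,)
print(period.shape)  # (1,)
```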
+
+def extract_signal_features(signal, signal_sr):
+    """Extract part of the handcrafted features from the input signal.
+
+    :param signal: the signal to extract the features from
+    :type signal: numpy.ndarray
+    :param signal_sr: the sample rate of the signal
+    :type signal_sr: integer
+    :return: the populated feature vector and the trimmed signal
+    :rtype: tuple
+    """
+
+    # normalise the sound signal before processing
+    signal = signal / np.max(np.abs(signal))
+    # trim leading and trailing silence from the signal
+    trimmed_signal, idc = librosa.effects.trim(signal, frame_length=FRAME_LEN, hop_length=HOP)
+    # extract the signal duration
+    signal_duration = librosa.get_duration(y=trimmed_signal, sr=signal_sr)
+    # use librosa to track the beats
+    tempo, beats = librosa.beat.beat_track(y=trimmed_signal, sr=signal_sr)
+    # find the onset strength of the trimmed signal
+    o_env = librosa.onset.onset_strength(trimmed_signal, sr=signal_sr)
+    # find the onset frames
+    onset_frames = librosa.onset.onset_detect(onset_envelope=o_env, sr=signal_sr)
+    # count the number of detected onsets
+    onsets = onset_frames.shape[0]
+    # decompose the signal into its magnitude and phase components such that signal = mag * phase
+    mag, phase = librosa.magphase(librosa.stft(trimmed_signal, n_fft=FRAME_LEN, hop_length=HOP))
+    # extract the rms from the trimmed signal
+    rms = librosa.feature.rms(y=trimmed_signal)[0]
+    # extract the spectral centroid of the magnitude
+    cent = librosa.feature.spectral_centroid(S=mag)[0]
+    # extract the spectral rolloff point from the magnitude
+    rolloff = librosa.feature.spectral_rolloff(S=mag, sr=signal_sr)[0]
+    # extract the zero crossing rate from the trimmed signal using the predefined frame and hop lengths
+    zcr = librosa.feature.zero_crossing_rate(trimmed_signal, frame_length=FRAME_LEN, hop_length=HOP)[0]
+
+    # pack the extracted features into the feature vector to be returned
+    signal_features = np.concatenate(
+        (
+            np.array([signal_duration, tempo, onsets]),
+            get_period(signal, signal_sr=signal_sr),
+            sta_fun(rms),
+            sta_fun(cent),
+            sta_fun(rolloff),
+            sta_fun(zcr),
+        ),
+        axis=0,
+    )
+
+    # finally, return the gathered features and the trimmed signal
+    return signal_features, trimmed_signal
+
+def extract_mfcc(signal, signal_sr=SR, n_fft=FRAME_LEN, hop_length=HOP, n_mfcc=MFCC_dim):
+    """Extract the Mel-frequency cepstral coefficients (MFCCs) from the provided signal.
+
+    :param signal: the signal to extract the mfcc from
+    :type signal: numpy.ndarray
+    :param signal_sr: the signal sample rate
+    :type signal_sr: integer
+    :param n_fft: the fft window size
+    :type n_fft: integer
+    :param hop_length: the hop length
+    :type hop_length: integer
+    :param n_mfcc: the dimension of the mfcc
+    :type n_mfcc: integer
+    :return: the populated feature vector
+    :rtype: list
+    """
+    # compute the mfcc of the input signal
+    mfcc = librosa.feature.mfcc(
+        y=signal, sr=signal_sr, n_fft=n_fft, hop_length=hop_length, n_mfcc=n_mfcc, dct_type=3
+    )
+
+    # extract the first and second order deltas from the retrieved mfcc's
+    mfcc_delta = librosa.feature.delta(mfcc, order=1)
+    mfcc_delta2 = librosa.feature.delta(mfcc, order=2)
+
+    # create the mfcc feature list
+    mfccs = []
+
+    # populate it with the statistics of each coefficient and its deltas
+    for i in range(n_mfcc):
+        mfccs.extend(sta_fun(mfcc[i, :]))
+    for i in range(n_mfcc):
+        mfccs.extend(sta_fun(mfcc_delta[i, :]))
+    for i in range(n_mfcc):
+        mfccs.extend(sta_fun(mfcc_delta2[i, :]))
+
+    # finally return the coefficients
+    return mfccs
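To make the size of the resulting handcrafted feature vector explicit, a short illustrative calculation (an editor's aside, not part of the patch) based on the constants and helpers above:

```python
# illustrative arithmetic only, derived from the functions above
N_STATS = 11                        # values returned by sta_fun
n_signal = 3 + 1 + 4 * N_STATS      # duration, tempo, onsets + period + stats of rms/centroid/rolloff/zcr = 48
n_mfcc = 3 * 13 * N_STATS           # mfcc, delta and delta-delta, 13 coefficients each = 429
print(n_signal + n_mfcc)            # 477 handcrafted features per recording
```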
+
+def extract_features(signal, signal_sr):
+    """Extract all handcrafted features from the input signal.
+
+    :param signal: the signal to extract the features from
+    :type signal: numpy.ndarray
+    :param signal_sr: the sample rate of the signal
+    :type signal_sr: integer
+    :return: the extracted feature vector
+    :rtype: numpy.ndarray
+    """
+
+    # extract the signal features
+    signal_features, trimmed_signal = extract_signal_features(signal, signal_sr)
+
+    # extract the mfcc's from the trimmed signal and compute their statistics
+    mfccs = extract_mfcc(trimmed_signal)
+
+    return np.concatenate((signal_features, mfccs), axis=0)
+
+
+if __name__ == "__main__":
+    # data path (raw_files\devel OR test OR train folder)
+    path = sys.argv[1]
+
+    x_data = []
+    y_label = []
+    y_uid = []
+
+    # extract features
+    files = os.listdir(path)
+    for file in files:
+        try:
+            sample_path = os.path.join(path, file)
+            file_b = sample_path
+            y, sr = librosa.load(
+                file_b, sr=SR, mono=True, offset=0.0, duration=None
+            )
+        except IOError:
+            print("file doesn't exist")
+            continue
+
+        yt, index = librosa.effects.trim(
+            y, frame_length=FRAME_LEN, hop_length=HOP
+        )
+        duration = librosa.get_duration(y=yt, sr=sr)
+        # skip recordings shorter than 2 seconds after trimming
+        if duration < 2:
+            continue
+        features = extract_features(signal=y, signal_sr=sr)
+
+        x_data.append(features.tolist())
+
+    # save the features as numpy arrays; the labels are taken wholesale from the CSV,
+    # so the file order in the folder is assumed to match the label file
+    x_data = np.array(x_data)
+    labels_path = 'labels\\' + os.path.basename(os.path.normpath(path)) + '.csv'
+    df = pd.read_csv(labels_path, sep=',')
+    y_label = df.label
+
+    np.save(os.path.join('hand_features', "x_" + os.path.basename(os.path.normpath(path)) + "_data.npy"), x_data)
+    np.save(os.path.join('hand_features', "y_" + os.path.basename(os.path.normpath(path)) + "_label.npy"), y_label)
\ No newline at end of file
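For orientation (an editor's aside, not part of the patch): the script above is run once per split, e.g. `python extract_handcrafted_features.py raw_files\train`, after which the saved arrays can be reloaded as sketched below. The shapes shown assume every recording passed the duration check.

```python
import numpy as np

# after running the extraction for the train split, the saved arrays can be reloaded like this
x_train = np.load("hand_features/x_train_data.npy", allow_pickle=True)
y_train = np.load("hand_features/y_train_label.npy", allow_pickle=True)

print(x_train.shape)  # (num_recordings, 477), one handcrafted feature vector per recording
print(y_train.shape)  # (num_labels,), taken from labels/train.csv
```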
diff --git a/extract_vgg_features.py b/extract_vgg_features.py
new file mode 100644
index 0000000..9fe7761
--- /dev/null
+++ b/extract_vgg_features.py
@@ -0,0 +1,130 @@
+from __future__ import print_function
+
+import tensorflow.compat.v1 as tf
+tf.disable_v2_behavior()
+
+import pandas as pd
+import os
+import json
+import sys
+import numpy as np
+
+import librosa
+
+import urllib.request
+sys.path.append('vggish')
+import vggish_input
+import vggish_params
+import vggish_slim
+
+SR = 22050  # sample rate
+SR_VGG = 16000  # sample rate expected by the pretrained VGGish model
+FRAME_LEN = int(SR / 10)  # 100 ms frame length
+HOP = int(FRAME_LEN / 2)  # 50% overlap, i.e. a 50 ms hop length
+
+
+def download(url, dst_dir):
+    """Download a file if it does not exist yet.
+
+    Args:
+        url: web location of the file.
+        dst_dir: directory to store the file in.
+    Returns: path to the downloaded file.
+    """
+    filename = url.split('/')[-1]
+    filepath = os.path.join(dst_dir, filename)
+    if not os.path.exists(filepath):
+        def _progress(count, block_size, total_size):
+            sys.stdout.write('\r>> Downloading %s %.1f%%' %
+                             (filename,
+                              float(count * block_size) / float(total_size) * 100.0))
+            sys.stdout.flush()
+
+        filepath, _ = urllib.request.urlretrieve(url, filepath, _progress)
+        statinfo = os.stat(filepath)
+        print('Successfully downloaded:', filename, statinfo.st_size, 'bytes.')
+    return filepath
+
+def sta_fun_2(npdata):  # 2D np array (frames x embedding_dim)
+    """Extract the per-dimension mean and standard deviation from the numpy array provided as input.
+
+    :param npdata: the numpy array to extract the features from
+    :type npdata: numpy.ndarray
+    :return: the extracted features as a (1, 2 * embedding_dim) vector
+    :rtype: numpy.ndarray
+    """
+
+    # perform a sanity check
+    if npdata is None:
+        raise ValueError("Input array cannot be None")
+
+    # perform the feature extraction
+    Mean = np.mean(npdata, axis=0)
+    Std = np.std(npdata, axis=0)
+
+    # finally return the features in a concatenated array (as a vector)
+    return np.concatenate((Mean, Std), axis=0).reshape(1, -1)
+
+print("\nTesting your install of VGGish\n")
+# Paths to downloaded VGGish files.
+checkpoint_path = "vggish/vggish_model.ckpt"
+
+if not os.path.exists(checkpoint_path):  # automatically download the checkpoint if it does not exist yet
+    url = 'https://storage.googleapis.com/audioset/vggish_model.ckpt'
+    download(url, './vggish/')
+
+
+if __name__ == "__main__":
+    # data path (raw_files\devel OR test OR train folder)
+    path = sys.argv[1]
+
+    # feature extraction
+    with tf.Graph().as_default(), tf.Session() as sess:
+        # load the pre-trained model
+        vggish_slim.define_vggish_slim()
+        vggish_slim.load_vggish_slim_checkpoint(sess, checkpoint_path)
+        features_tensor = sess.graph.get_tensor_by_name(vggish_params.INPUT_TENSOR_NAME)
+        embedding_tensor = sess.graph.get_tensor_by_name(
+            vggish_params.OUTPUT_TENSOR_NAME
+        )
+
+        x_data = []
+        y_label = []
+        y_uid = []
+
+        # extract features
+        files = os.listdir(path)
+        for file in files:
+            try:
+                sample_path = os.path.join(path, file)
+                file_b = sample_path
+                y, sr = librosa.load(
+                    file_b, sr=SR, mono=True, offset=0.0, duration=None
+                )
+            except IOError:
+                print("file doesn't exist")
+                continue
+
+            yt, index = librosa.effects.trim(
+                y, frame_length=FRAME_LEN, hop_length=HOP
+            )
+            duration = librosa.get_duration(y=yt, sr=sr)
+            # skip recordings shorter than 2 seconds after trimming
+            if duration < 2:
+                continue
+            input_batch = vggish_input.waveform_to_examples(
+                yt, SR_VGG
+            )  # waveform -> ?x96x64 log-mel examples
+            [features] = sess.run(
+                [embedding_tensor], feed_dict={features_tensor: input_batch}
+            )  # ?x96x64 -> ?x128 embeddings
+            features = sta_fun_2(features)
+
+            x_data.append(features.tolist())
+            y_uid.append(file)
+
+        # save the features as numpy arrays; as above, the label order is assumed to match the file order
+        x_data = np.array(x_data)
+        labels_path = 'labels\\' + os.path.basename(os.path.normpath(path)) + '.csv'
+        df = pd.read_csv(labels_path, sep=',')
+        y_label = df.label
+
+        np.save(os.path.join('vgg_features', "x_" + os.path.basename(os.path.normpath(path)) + "_data_vgg.npy"), x_data)
+        np.save(os.path.join('vgg_features', "y_" + os.path.basename(os.path.normpath(path)) + "_label_vgg.npy"), y_label)
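As a shape sketch (an editor's aside with made-up values, not part of the patch): `sta_fun_2` pools the per-frame VGGish embeddings of one recording into a single mean/std vector, which is why the saved arrays later have a singleton axis that `svm.py` squeezes out.

```python
import numpy as np

# illustrative values only: pretend one recording yielded 7 one-second VGGish frames
frame_embeddings = np.random.rand(7, 128)  # ?x128 embeddings for a single recording
pooled = sta_fun_2(frame_embeddings)       # per-dimension mean and std
print(pooled.shape)                        # (1, 256)

# the saved x_*_data_vgg.npy arrays therefore have shape (num_recordings, 1, 256)
```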
diff --git a/svm.py b/svm.py
new file mode 100644
index 0000000..1d0ac79
--- /dev/null
+++ b/svm.py
@@ -0,0 +1,137 @@
+from sklearn.svm import LinearSVC
+from sklearn.base import clone
+from sklearn.pipeline import Pipeline
+from sklearn.utils import resample
+from sklearn.model_selection import PredefinedSplit, GridSearchCV
+from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler
+from sklearn.metrics import classification_report, confusion_matrix, recall_score, make_scorer
+from joblib import Parallel, delayed
+import pandas as pd
+import scipy
+import os, yaml
+import json
+import sys
+import arff
+import numpy as np
+from tqdm import tqdm
+from glob import glob
+
+RANDOM_SEED = 42
+
+GRID = [
+    {'scaler': [StandardScaler(), None],
+     'estimator': [LinearSVC(random_state=RANDOM_SEED)],
+     'estimator__loss': ['squared_hinge'],
+     'estimator__C': np.logspace(-1, -5, num=5),
+     'estimator__class_weight': ['balanced', None],
+     'estimator__max_iter': [100000]
+     }
+]
+
+PIPELINE = Pipeline([('scaler', None),
+                     ('estimator', LinearSVC())])
+
+if __name__ == '__main__':
+
+    # load features and labels
+    devel_X_vgg = np.load(
+        "vgg_features\\x_devel_data_vgg.npy", allow_pickle=True
+    )
+
+    test_X_vgg = np.load(
+        "vgg_features\\x_test_data_vgg.npy", allow_pickle=True
+    )
+
+    train_X_vgg = np.load(
+        "vgg_features\\x_train_data_vgg.npy", allow_pickle=True
+    )
+
+    devel_X_hand = np.load(
+        "hand_features\\x_devel_data.npy", allow_pickle=True
+    )
+
+    test_X_hand = np.load(
+        "hand_features\\x_test_data.npy", allow_pickle=True
+    )
+
+    train_X_hand = np.load(
+        "hand_features\\x_train_data.npy", allow_pickle=True
+    )
+
+    devel_y = np.load(
+        "vgg_features\\y_devel_label_vgg.npy", allow_pickle=True
+    )
+
+    test_y = np.load(
+        "vgg_features\\y_test_label_vgg.npy", allow_pickle=True
+    )
+
+    train_y = np.load(
+        "vgg_features\\y_train_label_vgg.npy", allow_pickle=True
+    )
+
+    # build the predefined train/devel split: -1 = train fold, 0 = devel fold
+    num_train = train_X_vgg.shape[0]
+    num_devel = devel_X_vgg.shape[0]
+    split_indices = np.repeat([-1, 0], [num_train, num_devel])
+    split = PredefinedSplit(split_indices)
+
+    # the vgg features are saved as (num_samples, 1, 256), so drop the singleton axis
+    train_X_vgg = np.squeeze(train_X_vgg)
+    devel_X_vgg = np.squeeze(devel_X_vgg)
+    test_X_vgg = np.squeeze(test_X_vgg)
+
+    # concatenate handcrafted and vgg features per split
+    devel_X = np.concatenate(
+        (
+            devel_X_hand,
+            devel_X_vgg
+        ),
+        axis=1,
+    )
+
+    test_X = np.concatenate(
+        (
+            test_X_hand,
+            test_X_vgg
+        ),
+        axis=1,
+    )
+
+    train_X = np.concatenate(
+        (
+            train_X_hand,
+            train_X_vgg
+        ),
+        axis=1,
+    )
+
+    X = np.append(train_X, devel_X, axis=0)
+    y = np.append(train_y, devel_y, axis=0)
+
+    grid_search = GridSearchCV(estimator=PIPELINE, param_grid=GRID,
+                               scoring=make_scorer(recall_score, average='macro'),
+                               n_jobs=-1, cv=split, refit=True, verbose=1,
+                               return_train_score=False)
+
+    # run the grid search: each configuration is trained on train and validated on devel
+    # (the predefined split); with refit=True the best configuration is then refit on train+devel
+    grid_search.fit(X, y)
+    best_estimator = grid_search.best_estimator_
+
+    # fit a clone of the best estimator on train only to obtain devel predictions
+    estimator = clone(best_estimator, safe=False)
+    estimator.fit(train_X, train_y)
+    preds = estimator.predict(devel_X)
+
+    metrics = {'dev': {}, 'test': {}}
+
+    # devel metrics
+    print('DEVEL')
+    uar = recall_score(devel_y, preds, average='macro')
+    cm = confusion_matrix(devel_y, preds)
+    print(f'UAR: {uar}\n{classification_report(devel_y, preds)}\n\nConfusion Matrix:\n\n{cm}')
+
+    pd.DataFrame(grid_search.cv_results_).to_csv('grid_search.csv', index=False)
+
+    # test metrics
+    print('TEST')
+    preds = best_estimator.predict(test_X)
+    uar = recall_score(test_y, preds, average='macro')
+    cm = confusion_matrix(test_y, preds)
+    print(f'UAR: {uar}\n{classification_report(test_y, preds)}\n\nConfusion Matrix:\n\n{cm}')
\ No newline at end of file
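For readers unfamiliar with `PredefinedSplit`: a minimal, self-contained sketch (toy sizes, an editor's aside rather than part of the patch) of how the split built in `svm.py` yields exactly one train/devel fold for the grid search.

```python
import numpy as np
from sklearn.model_selection import PredefinedSplit

# toy version of the split built in svm.py: -1 marks samples never used for validation
num_train, num_devel = 4, 2
split_indices = np.repeat([-1, 0], [num_train, num_devel])  # [-1 -1 -1 -1  0  0]
split = PredefinedSplit(split_indices)

for train_idx, devel_idx in split.split():
    print(train_idx, devel_idx)  # [0 1 2 3] [4 5] -> train on train, validate on devel
```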