svm model for handcrafted and vggish features

This commit is contained in:
em474re 2021-08-19 10:39:47 +02:00
parent d366309629
commit c037c2ab0f
4 changed files with 518 additions and 1 deletion


@@ -1 +1,21 @@
test
# COVID-19 speech diagnosis
This repo contains the code created by Elien Martens during an IAESTE internship (summer 2021) at the Technical University Kosice, Slovakia.
## Dataset
The COVID-19 datasets can be obtained from the University of Cambridge (see also compare.openaudio.eu and the Interspeech 2021 Computational Paralinguistics Challenge).
## How it works
- clone the repo
- add the data (see the Dataset section above) so that the directory structure is as follows:
CovidSpeechChallenge
-> raw_files
-> train
-> test
-> devel
-> labels
-> vgg_features
-> hand_features
-> vggish
...
- run `.\run_experiments.sh` (see the invocation sketch below)
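
The pipeline can also be invoked step by step. Below is a minimal sketch of the assumed order of operations (mirroring what `run_experiments.sh` is expected to do); `extract_hand_features.py` is a placeholder name for the handcrafted feature script in this commit and may differ from the actual file name:

```python
# Hedged sketch of the pipeline order (assumed to mirror run_experiments.sh).
# "extract_hand_features.py" is a placeholder name for the handcrafted feature script.
import subprocess

for split in ("train", "devel", "test"):
    # handcrafted features -> hand_features/, VGGish embeddings -> vgg_features/
    subprocess.run(["python", "extract_hand_features.py", f"raw_files/{split}"], check=True)
    subprocess.run(["python", "extract_vgg_features.py", f"raw_files/{split}"], check=True)

# train the linear SVM on the concatenated features and report devel/test UAR
subprocess.run(["python", "svm.py"], check=True)
```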


@@ -0,0 +1,230 @@
import os
import sys
import warnings
from math import pi
import librosa
import numpy as np
import pandas as pd
from scipy.fftpack import fft, hilbert
warnings.filterwarnings("ignore")
SR = 22050 # sample rate
FRAME_LEN = int(SR / 10) # 100 ms
HOP = int(FRAME_LEN / 2)  # 50% overlap, i.e. a 50 ms hop length
MFCC_dim = 13 # the MFCC dimension
def sta_fun(np_data):
"""Extract various statistical features from the numpy array provided as input.
:param np_data: the numpy array to extract the features from
:type np_data: numpy.ndarray
:return: The extracted features as a vector
:rtype: numpy.ndarray
"""
# perform a sanity check
if np_data is None:
raise ValueError("Input array cannot be None")
# perform the feature extraction
dat_min = np.min(np_data)
dat_max = np.max(np_data)
dat_mean = np.mean(np_data)
dat_rms = np.sqrt(np.sum(np.square(np_data)) / len(np_data))
dat_median = np.median(np_data)
dat_qrl1 = np.percentile(np_data, 25)
dat_qrl3 = np.percentile(np_data, 75)
dat_lower_q = np.quantile(np_data, 0.25, interpolation="lower")
dat_higher_q = np.quantile(np_data, 0.75, interpolation="higher")
dat_iqrl = dat_higher_q - dat_lower_q
dat_std = np.std(np_data)
s = pd.Series(np_data)
dat_skew = s.skew()
dat_kurt = s.kurt()
# finally return the features in a concatenated array (as a vector)
return np.array([dat_mean, dat_min, dat_max, dat_std, dat_rms,
dat_median, dat_qrl1, dat_qrl3, dat_iqrl, dat_skew, dat_kurt])
def get_period(signal, signal_sr):
"""Extract the period from the the provided signal
:param signal: the signal to extract the period from
:type signal: numpy.ndarray
:param signal_sr: the sampling rate of the input signal
:type signal_sr: integer
:return: a vector containing the signal period
:rtype: numpy.ndarray
"""
# perform a sanity check
if signal is None:
raise ValueError("Input signal cannot be None")
# compute the Hilbert transform of the signal
hy = hilbert(signal)
ey = np.sqrt(signal ** 2 + hy ** 2)
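# ey approximates the amplitude envelope of the signal (magnitude of the analytic signal);
# the strongest FFT component of this envelope gives the dominant modulation frequency,
# whose inverse is returned below as the period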
min_time = 1.0 / signal_sr
tot_time = len(ey) * min_time
pow_ft = np.abs(fft(ey))
peak_freq = pow_ft[3: int(len(pow_ft) / 2)]
peak_freq_pos = peak_freq.argmax()
peak_freq_val = 2 * pi * (peak_freq_pos + 2) / tot_time
period = 2 * pi / peak_freq_val
return np.array([period])
def extract_signal_features(signal, signal_sr):
"""Extract part of handcrafted features from the input signal.
:param signal: the signal the extract features from
:type signal: numpy.ndarray
:param signal_sr: the sample rate of the signal
:type signal_sr: integer
:return: the populated feature vector
:rtype: numpy.ndarray
"""
# normalise the sound signal before processing
signal = signal / np.max(np.abs(signal))
# trim the signal to the appropriate length
trimmed_signal, idc = librosa.effects.trim(signal, frame_length=FRAME_LEN, hop_length=HOP)
# extract the signal duration
signal_duration = librosa.get_duration(y=trimmed_signal, sr=signal_sr)
# use librosa to track the beats
tempo, beats = librosa.beat.beat_track(y=trimmed_signal, sr=signal_sr)
# find the onset strength of the trimmed signal
o_env = librosa.onset.onset_strength(y=trimmed_signal, sr=signal_sr)
# find the frames of the onset
onset_frames = librosa.onset.onset_detect(onset_envelope=o_env, sr=signal_sr)
# count the number of detected onsets
onsets = onset_frames.shape[0]
# decompose the signal into its magnitude and the phase components such that signal = mag * phase
mag, phase = librosa.magphase(librosa.stft(trimmed_signal, n_fft=FRAME_LEN, hop_length=HOP))
# extract the frame-wise rms energy from the trimmed signal
rms = librosa.feature.rms(y=trimmed_signal)[0]
# extract the spectral centroid of the magnitude
cent = librosa.feature.spectral_centroid(S=mag)[0]
# extract the spectral rolloff point from the magnitude
rolloff = librosa.feature.spectral_rolloff(S=mag, sr=signal_sr)[0]
# extract the zero crossing rate from the trimmed signal using the predefined frame and hop lengths
zcr = librosa.feature.zero_crossing_rate(trimmed_signal, frame_length=FRAME_LEN, hop_length=HOP)[0]
# pack the extracted features into the feature vector to be returned
signal_features = np.concatenate(
(
np.array([signal_duration, tempo, onsets]),
get_period(signal, signal_sr=signal_sr),
sta_fun(rms),
sta_fun(cent),
sta_fun(rolloff),
sta_fun(zcr),
),
axis=0,
)
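# resulting vector: 3 scalars (duration, tempo, onset count) + 1 period value
# + 4 x 11 statistics (rms, centroid, rolloff, zcr) = 48 features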
# finally, return the gathered features and the trimmed signal
return signal_features, trimmed_signal
def extract_mfcc(signal, signal_sr=SR, n_fft=FRAME_LEN, hop_length=HOP, n_mfcc=MFCC_dim):
"""Extracts the Mel-frequency cepstral coefficients (MFCC) from the provided signal
:param signal: the signal to extract the mfcc from
:type signal: numpy.ndarray
:param signal_sr: the signal sample rate
:type signal_sr: integer
:param n_fft: the fft window size
:type n_fft: integer
:param hop_length: the hop length
:type hop_length: integer
:param n_mfcc: the dimension of the mfcc
:type n_mfcc: integer
:return: the populated feature vector
:rtype: numpy.ndarray
"""
# compute the mfcc of the input signal
mfcc = librosa.feature.mfcc(
y=signal, sr=signal_sr, n_fft=n_fft, hop_length=hop_length, n_mfcc=n_mfcc, dct_type=3
)
# extract the first and second order deltas from the retrieved mfcc's
mfcc_delta = librosa.feature.delta(mfcc, order=1)
mfcc_delta2 = librosa.feature.delta(mfcc, order=2)
# create the mfcc array
mfccs = []
# populate it using the extracted features
for i in range(n_mfcc):
mfccs.extend(sta_fun(mfcc[i, :]))
for i in range(n_mfcc):
mfccs.extend(sta_fun(mfcc_delta[i, :]))
for i in range(n_mfcc):
mfccs.extend(sta_fun(mfcc_delta2[i, :]))
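# 13 coefficients x 3 streams (mfcc, delta, delta2) x 11 statistics = 429 values in total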
# finally return the coefficients
return np.array(mfccs)
def extract_features(signal, signal_sr):
"""Extract all features from the input signal.
:param signal: the signal to extract features from
:type signal: numpy.ndarray
:param signal_sr: the sample rate of the signal
:type signal_sr: integer
:return: the extracted feature vector
:rtype: numpy.ndarray
"""
# extract the signal features
signal_features, trimmed_signal = extract_signal_features(signal, signal_sr)
# extract the mfccs from the trimmed signal and compute their statistical features
mfccs = extract_mfcc(trimmed_signal)
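# the combined handcrafted vector has 48 signal features + 429 mfcc statistics = 477 dimensions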
return np.concatenate((signal_features, mfccs), axis=0)
if __name__ == "__main__":
# data path (raw_files\devel OR test OR train folder)
path = sys.argv[1]
x_data = []
y_label = []
y_uid = []
#extract features
files = os.listdir(path)
for file in files:
try:
sample_path = os.path.join(path,file)
file_b = sample_path
y, sr = librosa.load(
file_b, sr=SR, mono=True, offset=0.0, duration=None
)
except IOError:
print("file doesn't exit")
continue
yt, index = librosa.effects.trim(
y, frame_length=FRAME_LEN, hop_length=HOP
)
duration = librosa.get_duration(y=yt, sr=sr)
if duration < 2:
continue
features = extract_features(signal=y, signal_sr=sr)
x_data.append(features.tolist())
#save features in numpy.array
x_data = np.array(x_data)
labels_path = 'labels\\' + os.path.basename(os.path.normpath(path)) + '.csv'
df = pd.read_csv(labels_path, sep =',')
y_label = df.label
np.save(os.path.join('hand_features',"x_" + os.path.basename(os.path.normpath(path)) + "_data.npy"), x_data)
np.save(os.path.join('hand_features',"y_" + os.path.basename(os.path.normpath(path)) + "_label.npy"), y_label)

extract_vgg_features.py (new file)

@@ -0,0 +1,130 @@
from __future__ import print_function
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
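# the VGGish reference code is written for the TF1 graph/session API, so TF2 behaviour is disabled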
import pandas as pd
import os
import sys
import numpy as np
import librosa
import urllib.request
sys.path.append('vggish')
import vggish_input
import vggish_params
import vggish_slim
SR = 22050 # sample rate
SR_VGG = 16000 # VGG pretrained model sample rate
FRAME_LEN = int(SR / 10) # 100 ms
HOP = int(FRAME_LEN / 2)  # 50% overlap, i.e. a 50 ms hop length
def download(url, dst_dir):
"""Download file.
If the file not exist then download it.
Args:url: Web location of the file.
Returns: path to downloaded file.
"""
filename = url.split('/')[-1]
filepath = os.path.join(dst_dir, filename)
if not os.path.exists(filepath):
def _progress(count, block_size, total_size):
sys.stdout.write('\r>> Downloading %s %.1f%%' %
(filename,
float(count * block_size) / float(total_size) * 100.0))
sys.stdout.flush()
filepath, _ = urllib.request.urlretrieve(url, filepath, _progress)
statinfo = os.stat(filepath)
print('Successfully downloaded:', filename, statinfo.st_size, 'bytes.')
return filepath
def sta_fun_2(npdata):  # npdata: 2D array of frame-level embeddings (frames x dims)
"""Extract various statistical features from the numpy array provided as input.
:param npdata: the numpy array to extract the features from
:type npdata: numpy.ndarray
:return: The extracted features as a vector
:rtype: numpy.ndarray
"""
# perform a sanity check
if npdata is None:
raise ValueError("Input array cannot be None")
# perform the feature extraction
Mean = np.mean(npdata, axis=0)
Std = np.std(npdata, axis=0)
# finally return the features in a concatenated array (as a vector)
return np.concatenate((Mean, Std), axis=0).reshape(1, -1)
print("\nTesting your install of VGGish\n")
# Paths to downloaded VGGish files.
checkpoint_path = "vggish/vggish_model.ckpt"
if not os.path.exists(checkpoint_path):  # automatically download the checkpoint if it does not exist
url = 'https://storage.googleapis.com/audioset/vggish_model.ckpt'
download(url, './vggish/')
if __name__ == "__main__":
# data path (raw_files\devel OR test OR train folder)
path = sys.argv[1]
##feature extraction
with tf.Graph().as_default(), tf.Session() as sess:
# load pre-trained model
vggish_slim.define_vggish_slim()
vggish_slim.load_vggish_slim_checkpoint(sess, checkpoint_path)
features_tensor = sess.graph.get_tensor_by_name(vggish_params.INPUT_TENSOR_NAME)
embedding_tensor = sess.graph.get_tensor_by_name(
vggish_params.OUTPUT_TENSOR_NAME
)
x_data = []
y_label = []
y_uid = []
# extract features
files = os.listdir(path)
for file in files:
try:
sample_path = os.path.join(path,file)
file_b = sample_path
y, sr = librosa.load(
file_b, sr=SR, mono=True, offset=0.0, duration=None
)
except IOError:
print("file doesn't exit")
continue
yt, index = librosa.effects.trim(
y, frame_length=FRAME_LEN, hop_length=HOP
)
duration = librosa.get_duration(y=yt, sr=sr)
if duration < 2:
continue
input_batch = vggish_input.waveform_to_examples(
yt, SR_VGG
) # ?x96x64 --> ?x128
[features] = sess.run(
[embedding_tensor], feed_dict={features_tensor: input_batch}
)
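# summarise the frame-level 128-dim VGGish embeddings with their mean and std -> one 256-dim vector per recording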
features = sta_fun_2(features)
x_data.append(features.tolist())
y_uid.append(file)
#save features in numpy.array
x_data = np.array(x_data)
labels_path = 'labels\\' + os.path.basename(os.path.normpath(path)) + '.csv'
df = pd.read_csv(labels_path, sep =',')
y_label = df.label
np.save(os.path.join('vgg_features',"x_" + os.path.basename(os.path.normpath(path)) + "_data_vgg.npy"), x_data)
np.save(os.path.join('vgg_features',"y_" + os.path.basename(os.path.normpath(path)) + "_label_vgg.npy"), y_label)

svm.py (new file)

@@ -0,0 +1,137 @@
from sklearn.svm import LinearSVC
from sklearn.base import clone
from sklearn.pipeline import Pipeline
from sklearn.model_selection import PredefinedSplit, GridSearchCV
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, recall_score, make_scorer
import pandas as pd
import numpy as np
RANDOM_SEED = 42
GRID = [
{'scaler': [StandardScaler(), None],
'estimator': [LinearSVC(random_state=RANDOM_SEED)],
'estimator__loss': ['squared_hinge'],
'estimator__C': np.logspace(-1, -5, num=5),
'estimator__class_weight': ['balanced', None],
'estimator__max_iter': [100000]
}
]
PIPELINE = Pipeline([('scaler', None), ('estimator', LinearSVC())])
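# the 'scaler' step in the pipeline is a placeholder; GRID switches it between StandardScaler() and no scaling (None)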
if __name__=='__main__':
# load features and labels
devel_X_vgg = np.load(
"vgg_features\\x_devel_data_vgg.npy", allow_pickle=True
)
test_X_vgg = np.load(
"vgg_features\\x_test_data_vgg.npy", allow_pickle=True
)
train_X_vgg = np.load(
"vgg_features\\x_train_data_vgg.npy", allow_pickle=True
)
devel_X_hand = np.load(
"hand_features\\x_devel_data.npy", allow_pickle=True
)
test_X_hand = np.load(
"hand_features\\x_test_data.npy", allow_pickle=True
)
train_X_hand = np.load(
"hand_features\\x_train_data.npy", allow_pickle=True
)
devel_y = np.load(
"vgg_features\\y_devel_label_vgg.npy", allow_pickle=True
)
test_y = np.load(
"vgg_features\\y_test_label_vgg.npy", allow_pickle=True
)
train_y = np.load(
"vgg_features\\y_train_label_vgg.npy", allow_pickle=True
)
num_train = train_X_vgg.shape[0]
num_devel = devel_X_vgg.shape[0]
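# PredefinedSplit: -1 marks train samples (never used for validation), 0 marks the devel samples forming the single validation fold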
split_indices = np.repeat([-1, 0], [num_train, num_devel])
split = PredefinedSplit(split_indices)
train_X_vgg = np.squeeze(train_X_vgg)
devel_X_vgg = np.squeeze(devel_X_vgg)
test_X_vgg = np.squeeze(test_X_vgg)
devel_X = np.concatenate(
(
devel_X_hand,
devel_X_vgg
),
axis=1,
)
test_X = np.concatenate(
(
test_X_hand,
test_X_vgg
),
axis=1,
)
train_X = np.concatenate(
(
train_X_hand,
train_X_vgg
),
axis=1,
)
X = np.append(train_X, devel_X, axis=0)
y = np.append(train_y, devel_y, axis=0)
grid_search = GridSearchCV(estimator=PIPELINE, param_grid=GRID,
scoring=make_scorer(recall_score, average='macro'),
n_jobs=-1, cv=split, refit=True, verbose=1,
return_train_score=False)
# grid search: every configuration is trained on train and scored on devel via the predefined split; refit=True then retrains the best configuration on train+devel
grid_search.fit(X, y)
best_estimator = grid_search.best_estimator_
# fit clone of best estimator on train again for devel predictions
estimator = clone(best_estimator, safe=False)
estimator.fit(train_X, train_y)
preds = estimator.predict(devel_X)
metrics = {'dev': {}, 'test': {}}
# devel metrics
print('DEVEL')
uar = recall_score(devel_y, preds, average='macro')
cm = confusion_matrix(devel_y, preds)
print(f'UAR: {uar}\n{classification_report(devel_y, preds)}\n\nConfusion Matrix:\n\n{cm}')
pd.DataFrame(grid_search.cv_results_).to_csv('grid_search.csv', index=False)
# test metrics
print('TEST')
preds = best_estimator.predict(test_X)
uar = recall_score(test_y, preds, average='macro')
cm = confusion_matrix(test_y, preds)
print(f'UAR: {uar}\n{classification_report(test_y, preds)}\n\nConfusion Matrix:\n\n{cm}')