CovidSpeechChallenge_2021/transformer.py
2021-08-24 10:44:14 +02:00

258 lines
6.6 KiB
Python

import numpy as np
from keras import backend as K
from sklearn.metrics import classification_report, confusion_matrix, recall_score, make_scorer
import tensorflow as tf
def non_nan_average(x):
# Computes the average of all elements that are not NaN in a rank 1 tensor
nan_mask = tf.math.is_nan(x)
x = tf.boolean_mask(x, tf.logical_not(nan_mask))
return K.mean(x)
def uar_accuracy(y_true, y_pred):
# Calculate the label from one-hot encoding
pred_class_label = K.argmax(y_pred, axis=-1)
true_class_label = K.argmax(y_true, axis=-1)
cf_mat = tf.math.confusion_matrix(true_class_label, pred_class_label )
diag = tf.linalg.tensor_diag_part(cf_mat)
# Calculate the total number of data examples for each class
total_per_class = tf.reduce_sum(cf_mat, axis=1)
acc_per_class = diag / tf.maximum(1, total_per_class)
uar = non_nan_average(acc_per_class)
return uar
# load features and labels
devel_X_vgg = np.load(
"vgg_features\\x_devel_data_vgg.npy", allow_pickle=True
)
test_X_vgg = np.load(
"vgg_features\\x_test_data_vgg.npy", allow_pickle=True
)
train_X_vgg = np.load(
"vgg_features\\x_train_data_vgg.npy", allow_pickle=True
)
devel_X_hand = np.load(
"hand_features\\x_devel_data.npy", allow_pickle=True
)
test_X_hand = np.load(
"hand_features\\x_test_data.npy", allow_pickle=True
)
train_X_hand = np.load(
"hand_features\\x_train_data.npy", allow_pickle=True
)
devel_y = np.load(
"vgg_features\\y_devel_label_vgg.npy", allow_pickle=True
)
test_y = np.load(
"vgg_features\\y_test_label_vgg.npy", allow_pickle=True
)
train_y = np.load(
"vgg_features\\y_train_label_vgg.npy", allow_pickle=True
)
train_X_vgg = np.squeeze(train_X_vgg)
devel_X_vgg = np.squeeze(devel_X_vgg)
test_X_vgg = np.squeeze(test_X_vgg)
devel_X = np.concatenate(
(
devel_X_hand,
devel_X_vgg
),
axis=1,
)
test_X = np.concatenate(
(
test_X_hand,
test_X_vgg
),
axis=1,
)
train_X = np.concatenate(
(
train_X_hand,
train_X_vgg
),
axis=1,
)
X = np.append(train_X, devel_X, axis=0)
y = np.append(train_y, devel_y, axis=0)
print(X.shape)
x = X.reshape((X.shape[0], X.shape[1], 1))
x_train = train_X.reshape((train_X.shape[0], train_X.shape[1], 1))
x_test = test_X.reshape((test_X.shape[0], test_X.shape[1], 1))
devel_X = devel_X.reshape((devel_X.shape[0], devel_X.shape[1], 1))
print(x_train.shape)
n_classes = len(np.unique(y))
train_y[train_y == "positive"] = 1
train_y[train_y == "negative"] = 0
y[y == "positive"] = 1
y[y == "negative"] = 0
devel_y[devel_y == "positive"] = 1
devel_y[devel_y == "negative"] = 0
test_y[test_y == "positive"] = 1
test_y[test_y == "negative"] = 0
"""
## Build the model
Our model processes a tensor of shape `(batch size, sequence length, features)`,
where `sequence length` is the number of time steps and `features` is each input
timeseries.
You can replace your classification RNN layers with this one: the
inputs are fully compatible!
"""
from tensorflow import keras
from tensorflow.keras import layers
"""
We include residual connections, layer normalization, and dropout.
The resulting layer can be stacked multiple times.
The projection layers are implemented through `keras.layers.Conv1D`.
"""
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
# Attention and Normalization
x = layers.MultiHeadAttention(
key_dim=head_size, num_heads=num_heads, dropout=dropout
)(inputs, inputs)
x = layers.Dropout(dropout)(x)
x = layers.LayerNormalization(epsilon=1e-6)(x)
res = x + inputs
# Feed Forward Part
x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
x = layers.Dropout(dropout)(x)
x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
x = layers.LayerNormalization(epsilon=1e-6)(x)
return x + res
"""
The main part of our model is now complete. We can stack multiple of those
`transformer_encoder` blocks and we can also proceed to add the final
Multi-Layer Perceptron classification head. Apart from a stack of `Dense`
layers, we need to reduce the output tensor of the `TransformerEncoder` part of
our model down to a vector of features for each data point in the current
batch. A common way to achieve this is to use a pooling layer. For
this example, a `GlobalAveragePooling1D` layer is sufficient.
"""
def build_model(
input_shape,
head_size,
num_heads,
ff_dim,
num_transformer_blocks,
mlp_units,
dropout=0,
mlp_dropout=0,
):
inputs = keras.Input(shape=input_shape)
x = inputs
for _ in range(num_transformer_blocks):
x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)
x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
for dim in mlp_units:
x = layers.Dense(dim, activation="relu")(x)
x = layers.Dropout(mlp_dropout)(x)
outputs = layers.Dense(n_classes, activation="softmax")(x)
return keras.Model(inputs, outputs)
"""
## Train and evaluate
"""
input_shape = x_train.shape[1:]
model = build_model(
input_shape,
head_size=256,
num_heads=4,
ff_dim=4,
num_transformer_blocks=4,
mlp_units=[128],
mlp_dropout=0.4,
dropout=0.25,
)
model.compile(
loss="sparse_categorical_crossentropy",
optimizer=keras.optimizers.Adam(learning_rate=1e-4),
metrics=[uar_accuracy],
)
model.summary()
callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]
model.fit(
np.asarray(x_train).astype(np.float32),
np.asarray(train_y).astype(np.float32),
validation_split=0.2,
epochs=20,
batch_size=64,
callbacks=callbacks,
)
devel_y_pred = model.predict(np.asarray(devel_X).astype(np.float32), verbose=1)
devel_y_pred = devel_y_pred.argmax(axis=-1)
devel_y_pred = devel_y_pred.astype('bool')
devel_y = devel_y.astype('bool')
model.fit(
np.asarray(x).astype(np.float32),
np.asarray(y).astype(np.float32),
validation_split=0.2,
epochs=20,
batch_size=64,
callbacks=callbacks,
)
test_y_pred = model.predict(np.asarray(test_X).astype(np.float32), verbose=1)
test_y_pred = test_y_pred.argmax(axis=-1)
test_y_pred = test_y_pred.astype('bool')
test_y = test_y.astype('bool')
# devel metrics
print('DEVEL')
uar = recall_score(devel_y, devel_y_pred, average='macro')
cm = confusion_matrix(devel_y, devel_y_pred)
print(f'UAR: {uar}\n{classification_report(devel_y, devel_y_pred)}\n\nConfusion Matrix:\n\n{cm}')
# test metrics
print('TEST')
uar = recall_score(test_y, test_y_pred, average='macro')
cm = confusion_matrix(test_y, test_y_pred)
print(f'UAR: {uar}\n{classification_report(test_y, test_y_pred)}\n\nConfusion Matrix:\n\n{cm}')