258 lines
6.6 KiB
Python
258 lines
6.6 KiB
Python
import numpy as np
|
|
from keras import backend as K
|
|
from sklearn.metrics import classification_report, confusion_matrix, recall_score, make_scorer
|
|
import tensorflow as tf
|
|
|
|
def non_nan_average(x):
|
|
# Computes the average of all elements that are not NaN in a rank 1 tensor
|
|
nan_mask = tf.math.is_nan(x)
|
|
x = tf.boolean_mask(x, tf.logical_not(nan_mask))
|
|
return K.mean(x)
|
|
|
|
|
|
def uar_accuracy(y_true, y_pred):
|
|
# Calculate the label from one-hot encoding
|
|
pred_class_label = K.argmax(y_pred, axis=-1)
|
|
true_class_label = K.argmax(y_true, axis=-1)
|
|
|
|
cf_mat = tf.math.confusion_matrix(true_class_label, pred_class_label )
|
|
|
|
diag = tf.linalg.tensor_diag_part(cf_mat)
|
|
|
|
# Calculate the total number of data examples for each class
|
|
total_per_class = tf.reduce_sum(cf_mat, axis=1)
|
|
|
|
acc_per_class = diag / tf.maximum(1, total_per_class)
|
|
uar = non_nan_average(acc_per_class)
|
|
|
|
return uar
|
|
|
|
# load features and labels
|
|
devel_X_vgg = np.load(
|
|
"vgg_features\\x_devel_data_vgg.npy", allow_pickle=True
|
|
)
|
|
|
|
test_X_vgg = np.load(
|
|
"vgg_features\\x_test_data_vgg.npy", allow_pickle=True
|
|
)
|
|
|
|
train_X_vgg = np.load(
|
|
"vgg_features\\x_train_data_vgg.npy", allow_pickle=True
|
|
)
|
|
|
|
devel_X_hand = np.load(
|
|
"hand_features\\x_devel_data.npy", allow_pickle=True
|
|
)
|
|
|
|
test_X_hand = np.load(
|
|
"hand_features\\x_test_data.npy", allow_pickle=True
|
|
)
|
|
|
|
train_X_hand = np.load(
|
|
"hand_features\\x_train_data.npy", allow_pickle=True
|
|
)
|
|
|
|
devel_y = np.load(
|
|
"vgg_features\\y_devel_label_vgg.npy", allow_pickle=True
|
|
)
|
|
|
|
test_y = np.load(
|
|
"vgg_features\\y_test_label_vgg.npy", allow_pickle=True
|
|
)
|
|
|
|
train_y = np.load(
|
|
"vgg_features\\y_train_label_vgg.npy", allow_pickle=True
|
|
)
|
|
|
|
train_X_vgg = np.squeeze(train_X_vgg)
|
|
devel_X_vgg = np.squeeze(devel_X_vgg)
|
|
test_X_vgg = np.squeeze(test_X_vgg)
|
|
|
|
devel_X = np.concatenate(
|
|
(
|
|
devel_X_hand,
|
|
devel_X_vgg
|
|
),
|
|
axis=1,
|
|
)
|
|
|
|
test_X = np.concatenate(
|
|
(
|
|
test_X_hand,
|
|
test_X_vgg
|
|
),
|
|
axis=1,
|
|
)
|
|
|
|
train_X = np.concatenate(
|
|
(
|
|
train_X_hand,
|
|
train_X_vgg
|
|
),
|
|
axis=1,
|
|
)
|
|
|
|
X = np.append(train_X, devel_X, axis=0)
|
|
y = np.append(train_y, devel_y, axis=0)
|
|
|
|
print(X.shape)
|
|
|
|
x = X.reshape((X.shape[0], X.shape[1], 1))
|
|
x_train = train_X.reshape((train_X.shape[0], train_X.shape[1], 1))
|
|
x_test = test_X.reshape((test_X.shape[0], test_X.shape[1], 1))
|
|
devel_X = devel_X.reshape((devel_X.shape[0], devel_X.shape[1], 1))
|
|
|
|
print(x_train.shape)
|
|
|
|
n_classes = len(np.unique(y))
|
|
|
|
train_y[train_y == "positive"] = 1
|
|
train_y[train_y == "negative"] = 0
|
|
|
|
y[y == "positive"] = 1
|
|
y[y == "negative"] = 0
|
|
|
|
devel_y[devel_y == "positive"] = 1
|
|
devel_y[devel_y == "negative"] = 0
|
|
|
|
test_y[test_y == "positive"] = 1
|
|
test_y[test_y == "negative"] = 0
|
|
|
|
"""
|
|
## Build the model
|
|
Our model processes a tensor of shape `(batch size, sequence length, features)`,
|
|
where `sequence length` is the number of time steps and `features` is each input
|
|
timeseries.
|
|
You can replace your classification RNN layers with this one: the
|
|
inputs are fully compatible!
|
|
"""
|
|
|
|
from tensorflow import keras
|
|
from tensorflow.keras import layers
|
|
|
|
"""
|
|
We include residual connections, layer normalization, and dropout.
|
|
The resulting layer can be stacked multiple times.
|
|
The projection layers are implemented through `keras.layers.Conv1D`.
|
|
"""
|
|
|
|
|
|
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
|
|
# Attention and Normalization
|
|
x = layers.MultiHeadAttention(
|
|
key_dim=head_size, num_heads=num_heads, dropout=dropout
|
|
)(inputs, inputs)
|
|
x = layers.Dropout(dropout)(x)
|
|
x = layers.LayerNormalization(epsilon=1e-6)(x)
|
|
res = x + inputs
|
|
|
|
# Feed Forward Part
|
|
x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
|
|
x = layers.Dropout(dropout)(x)
|
|
x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
|
|
x = layers.LayerNormalization(epsilon=1e-6)(x)
|
|
return x + res
|
|
|
|
|
|
"""
|
|
The main part of our model is now complete. We can stack multiple of those
|
|
`transformer_encoder` blocks and we can also proceed to add the final
|
|
Multi-Layer Perceptron classification head. Apart from a stack of `Dense`
|
|
layers, we need to reduce the output tensor of the `TransformerEncoder` part of
|
|
our model down to a vector of features for each data point in the current
|
|
batch. A common way to achieve this is to use a pooling layer. For
|
|
this example, a `GlobalAveragePooling1D` layer is sufficient.
|
|
"""
|
|
|
|
|
|
def build_model(
|
|
input_shape,
|
|
head_size,
|
|
num_heads,
|
|
ff_dim,
|
|
num_transformer_blocks,
|
|
mlp_units,
|
|
dropout=0,
|
|
mlp_dropout=0,
|
|
):
|
|
inputs = keras.Input(shape=input_shape)
|
|
x = inputs
|
|
for _ in range(num_transformer_blocks):
|
|
x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)
|
|
|
|
x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
|
|
for dim in mlp_units:
|
|
x = layers.Dense(dim, activation="relu")(x)
|
|
x = layers.Dropout(mlp_dropout)(x)
|
|
outputs = layers.Dense(n_classes, activation="softmax")(x)
|
|
return keras.Model(inputs, outputs)
|
|
|
|
|
|
"""
|
|
## Train and evaluate
|
|
"""
|
|
|
|
input_shape = x_train.shape[1:]
|
|
|
|
model = build_model(
|
|
input_shape,
|
|
head_size=256,
|
|
num_heads=4,
|
|
ff_dim=4,
|
|
num_transformer_blocks=4,
|
|
mlp_units=[128],
|
|
mlp_dropout=0.4,
|
|
dropout=0.25,
|
|
)
|
|
|
|
model.compile(
|
|
loss="sparse_categorical_crossentropy",
|
|
optimizer=keras.optimizers.Adam(learning_rate=1e-4),
|
|
metrics=[uar_accuracy],
|
|
)
|
|
|
|
model.summary()
|
|
|
|
callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]
|
|
|
|
model.fit(
|
|
np.asarray(x_train).astype(np.float32),
|
|
np.asarray(train_y).astype(np.float32),
|
|
validation_split=0.2,
|
|
epochs=20,
|
|
batch_size=64,
|
|
callbacks=callbacks,
|
|
)
|
|
|
|
devel_y_pred = model.predict(np.asarray(devel_X).astype(np.float32), verbose=1)
|
|
devel_y_pred = devel_y_pred.argmax(axis=-1)
|
|
|
|
devel_y_pred = devel_y_pred.astype('bool')
|
|
devel_y = devel_y.astype('bool')
|
|
|
|
model.fit(
|
|
np.asarray(x).astype(np.float32),
|
|
np.asarray(y).astype(np.float32),
|
|
validation_split=0.2,
|
|
epochs=20,
|
|
batch_size=64,
|
|
callbacks=callbacks,
|
|
)
|
|
|
|
test_y_pred = model.predict(np.asarray(test_X).astype(np.float32), verbose=1)
|
|
test_y_pred = test_y_pred.argmax(axis=-1)
|
|
|
|
test_y_pred = test_y_pred.astype('bool')
|
|
test_y = test_y.astype('bool')
|
|
|
|
# devel metrics
|
|
print('DEVEL')
|
|
uar = recall_score(devel_y, devel_y_pred, average='macro')
|
|
cm = confusion_matrix(devel_y, devel_y_pred)
|
|
print(f'UAR: {uar}\n{classification_report(devel_y, devel_y_pred)}\n\nConfusion Matrix:\n\n{cm}')
|
|
|
|
# test metrics
|
|
print('TEST')
|
|
uar = recall_score(test_y, test_y_pred, average='macro')
|
|
cm = confusion_matrix(test_y, test_y_pred)
|
|
print(f'UAR: {uar}\n{classification_report(test_y, test_y_pred)}\n\nConfusion Matrix:\n\n{cm}') |