diff --git a/transformer.py b/transformer.py
new file mode 100644
index 0000000..dafd469
--- /dev/null
+++ b/transformer.py
@@ -0,0 +1,258 @@
+import numpy as np
+from tensorflow.keras import backend as K
+from sklearn.metrics import classification_report, confusion_matrix, recall_score
+import tensorflow as tf
+
+
+def non_nan_average(x):
+    # Compute the mean of all elements of a rank-1 tensor that are not NaN.
+    nan_mask = tf.math.is_nan(x)
+    x = tf.boolean_mask(x, tf.logical_not(nan_mask))
+    return K.mean(x)
+
+
+def uar_accuracy(y_true, y_pred):
+    # Unweighted average recall (UAR): the mean of the per-class recalls.
+    # The model is trained with sparse categorical crossentropy, so y_true
+    # holds integer class indices (not one-hot vectors); only y_pred needs
+    # an argmax.
+    pred_class_label = K.argmax(y_pred, axis=-1)
+    true_class_label = tf.cast(tf.reshape(y_true, [-1]), tf.int64)
+
+    cf_mat = tf.math.confusion_matrix(true_class_label, pred_class_label)
+
+    diag = tf.linalg.tensor_diag_part(cf_mat)
+
+    # Total number of data examples for each class.
+    total_per_class = tf.reduce_sum(cf_mat, axis=1)
+
+    # Classes absent from the batch yield 0/0 = NaN and are excluded from
+    # the average by non_nan_average.
+    acc_per_class = tf.cast(diag, tf.float32) / tf.cast(total_per_class, tf.float32)
+    return non_nan_average(acc_per_class)
+
+
+# Load features and labels.
+devel_X_vgg = np.load("vgg_features\\x_devel_data_vgg.npy", allow_pickle=True)
+test_X_vgg = np.load("vgg_features\\x_test_data_vgg.npy", allow_pickle=True)
+train_X_vgg = np.load("vgg_features\\x_train_data_vgg.npy", allow_pickle=True)
+
+devel_X_hand = np.load("hand_features\\x_devel_data.npy", allow_pickle=True)
+test_X_hand = np.load("hand_features\\x_test_data.npy", allow_pickle=True)
+train_X_hand = np.load("hand_features\\x_train_data.npy", allow_pickle=True)
+
+devel_y = np.load("vgg_features\\y_devel_label_vgg.npy", allow_pickle=True)
+test_y = np.load("vgg_features\\y_test_label_vgg.npy", allow_pickle=True)
+train_y = np.load("vgg_features\\y_train_label_vgg.npy", allow_pickle=True)
+
+train_X_vgg = np.squeeze(train_X_vgg)
+devel_X_vgg = np.squeeze(devel_X_vgg)
+test_X_vgg = np.squeeze(test_X_vgg)
+
+# Fuse the hand-crafted and VGG feature sets along the feature axis.
+devel_X = np.concatenate((devel_X_hand, devel_X_vgg), axis=1)
+test_X = np.concatenate((test_X_hand, test_X_vgg), axis=1)
+train_X = np.concatenate((train_X_hand, train_X_vgg), axis=1)
+
+# Merge train and devel for the final training run.
+X = np.append(train_X, devel_X, axis=0)
+y = np.append(train_y, devel_y, axis=0)
+
+print(X.shape)
+
+# Reshape to (examples, features, 1): each feature becomes one time step
+# with a single channel.
+x = X.reshape((X.shape[0], X.shape[1], 1))
+x_train = train_X.reshape((train_X.shape[0], train_X.shape[1], 1))
+x_test = test_X.reshape((test_X.shape[0], test_X.shape[1], 1))
+devel_X = devel_X.reshape((devel_X.shape[0], devel_X.shape[1], 1))
+
+print(x_train.shape)
+
+n_classes = len(np.unique(y))
+
+# Map the string labels to integer class indices. (Assigning integers into
+# a string array would silently store the strings "1"/"0" instead.)
+train_y = (train_y == "positive").astype(int)
+y = (y == "positive").astype(int)
+devel_y = (devel_y == "positive").astype(int)
+test_y = (test_y == "positive").astype(int)
+
+"""
+## Build the model
+Our model processes a tensor of shape `(batch size, sequence length, features)`,
+where `sequence length` is the number of time steps and `features` is each input
+timeseries. Here, each fused feature vector is treated as a sequence of length
+`n_features` with a single channel. You can replace your classification RNN
+layers with this one: the inputs are fully compatible!
+"""
+
+from tensorflow import keras
+from tensorflow.keras import layers
+
+"""
+We include residual connections, layer normalization, and dropout.
+The resulting layer can be stacked multiple times.
+The projection layers are implemented through `keras.layers.Conv1D`.
+"""
+
+
+def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
+    # Attention and normalization
+    x = layers.MultiHeadAttention(
+        key_dim=head_size, num_heads=num_heads, dropout=dropout
+    )(inputs, inputs)
+    x = layers.Dropout(dropout)(x)
+    x = layers.LayerNormalization(epsilon=1e-6)(x)
+    res = x + inputs
+
+    # Feed-forward part
+    x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
+    x = layers.Dropout(dropout)(x)
+    x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x)
+    x = layers.LayerNormalization(epsilon=1e-6)(x)
+    return x + res
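+
+
+# Illustrative sanity check (not in the original script; the sizes below are
+# arbitrary assumptions): the encoder block is shape-preserving, which is
+# what makes it safe to stack in build_model below.
+_demo = tf.random.normal((2, 16, 1))
+assert transformer_encoder(_demo, head_size=8, num_heads=2, ff_dim=4).shape == _demo.shape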
+""" + + +def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0): + # Attention and Normalization + x = layers.MultiHeadAttention( + key_dim=head_size, num_heads=num_heads, dropout=dropout + )(inputs, inputs) + x = layers.Dropout(dropout)(x) + x = layers.LayerNormalization(epsilon=1e-6)(x) + res = x + inputs + + # Feed Forward Part + x = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res) + x = layers.Dropout(dropout)(x) + x = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(x) + x = layers.LayerNormalization(epsilon=1e-6)(x) + return x + res + + +""" +The main part of our model is now complete. We can stack multiple of those +`transformer_encoder` blocks and we can also proceed to add the final +Multi-Layer Perceptron classification head. Apart from a stack of `Dense` +layers, we need to reduce the output tensor of the `TransformerEncoder` part of +our model down to a vector of features for each data point in the current +batch. A common way to achieve this is to use a pooling layer. For +this example, a `GlobalAveragePooling1D` layer is sufficient. +""" + + +def build_model( + input_shape, + head_size, + num_heads, + ff_dim, + num_transformer_blocks, + mlp_units, + dropout=0, + mlp_dropout=0, +): + inputs = keras.Input(shape=input_shape) + x = inputs + for _ in range(num_transformer_blocks): + x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout) + + x = layers.GlobalAveragePooling1D(data_format="channels_first")(x) + for dim in mlp_units: + x = layers.Dense(dim, activation="relu")(x) + x = layers.Dropout(mlp_dropout)(x) + outputs = layers.Dense(n_classes, activation="softmax")(x) + return keras.Model(inputs, outputs) + + +""" +## Train and evaluate +""" + +input_shape = x_train.shape[1:] + +model = build_model( + input_shape, + head_size=256, + num_heads=4, + ff_dim=4, + num_transformer_blocks=4, + mlp_units=[128], + mlp_dropout=0.4, + dropout=0.25, +) + +model.compile( + loss="sparse_categorical_crossentropy", + optimizer=keras.optimizers.Adam(learning_rate=1e-4), + metrics=[uar_accuracy], +) + +model.summary() + +callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)] + +model.fit( + np.asarray(x_train).astype(np.float32), + np.asarray(train_y).astype(np.float32), + validation_split=0.2, + epochs=20, + batch_size=64, + callbacks=callbacks, +) + +devel_y_pred = model.predict(np.asarray(devel_X).astype(np.float32), verbose=1) +devel_y_pred = devel_y_pred.argmax(axis=-1) + +devel_y_pred = devel_y_pred.astype('bool') +devel_y = devel_y.astype('bool') + +model.fit( + np.asarray(x).astype(np.float32), + np.asarray(y).astype(np.float32), + validation_split=0.2, + epochs=20, + batch_size=64, + callbacks=callbacks, +) + +test_y_pred = model.predict(np.asarray(test_X).astype(np.float32), verbose=1) +test_y_pred = test_y_pred.argmax(axis=-1) + +test_y_pred = test_y_pred.astype('bool') +test_y = test_y.astype('bool') + +# devel metrics +print('DEVEL') +uar = recall_score(devel_y, devel_y_pred, average='macro') +cm = confusion_matrix(devel_y, devel_y_pred) +print(f'UAR: {uar}\n{classification_report(devel_y, devel_y_pred)}\n\nConfusion Matrix:\n\n{cm}') + +# test metrics +print('TEST') +uar = recall_score(test_y, test_y_pred, average='macro') +cm = confusion_matrix(test_y, test_y_pred) +print(f'UAR: {uar}\n{classification_report(test_y, test_y_pred)}\n\nConfusion Matrix:\n\n{cm}') \ No newline at end of file