"""Train and evaluate a Transformer classifier on fused VGG + hand-crafted features.

The script loads pre-extracted feature matrices, concatenates them along the
feature axis, trains a small Transformer-encoder model first on the train
split and then on train+devel, and prints UAR (macro-averaged recall),
a classification report and a confusion matrix for the devel and test splits.
"""

import os

import numpy as np
import tensorflow as tf
from keras import backend as K
from sklearn.metrics import classification_report, confusion_matrix, recall_score, make_scorer
from tensorflow import keras
from tensorflow.keras import layers


def non_nan_average(x):
    """Return the mean of all elements of ``x`` that are not NaN.

    Args:
        x: Rank-1 floating-point tensor.

    Returns:
        Scalar tensor holding the mean of the non-NaN entries.
    """
    keep = tf.logical_not(tf.math.is_nan(x))
    return tf.reduce_mean(tf.boolean_mask(x, keep))


def uar_accuracy(y_true, y_pred):
    """Unweighted average recall (mean per-class recall) as a Keras metric.

    Args:
        y_true: Ground-truth labels, either one-hot encoded (same rank and
            last dimension as ``y_pred``) or sparse integer class ids
            (any other shape, e.g. ``(batch,)`` or ``(batch, 1)``).
        y_pred: Per-class scores of shape ``(batch, n_classes)``.

    Returns:
        Scalar tensor with the mean of the per-class recalls.
    """
    y_true = tf.convert_to_tensor(y_true)
    y_pred = tf.convert_to_tensor(y_pred)

    pred_class_label = tf.argmax(y_pred, axis=-1)

    # The model in this script is compiled with sparse_categorical_crossentropy,
    # i.e. targets are integer class ids. Taking argmax over such targets would
    # turn every label into class 0, so only use argmax when the targets
    # really look one-hot (statically the same layout as the predictions).
    n_outputs = y_pred.shape[-1]
    looks_one_hot = (
        n_outputs is not None
        and n_outputs != 1
        and y_true.shape.rank is not None
        and y_true.shape.rank == y_pred.shape.rank
        and y_true.shape[-1] == n_outputs
    )
    if looks_one_hot:
        true_class_label = tf.argmax(y_true, axis=-1)
    else:
        true_class_label = tf.cast(tf.reshape(y_true, [-1]), tf.int64)

    cf_mat = tf.math.confusion_matrix(true_class_label, pred_class_label)
    diag = tf.linalg.tensor_diag_part(cf_mat)

    # Row sums = number of true examples per class.
    total_per_class = tf.reduce_sum(cf_mat, axis=1)
    # tf.maximum guards against division by zero; as a consequence a class
    # without any true example contributes a recall of 0 to the average.
    acc_per_class = diag / tf.maximum(1, total_per_class)
    return non_nan_average(acc_per_class)


def _load_array(folder, filename):
    """Load one ``.npy`` file from ``folder`` using an OS-independent path."""
    # NOTE(security): allow_pickle=True can execute arbitrary code while
    # loading; only use this with feature files from a trusted source.
    return np.load(os.path.join(folder, filename), allow_pickle=True)


def _add_channel_axis(features):
    """Reshape a ``(samples, features)`` matrix to ``(samples, features, 1)``."""
    return features.reshape((features.shape[0], features.shape[1], 1))


def _encode_labels_inplace(labels):
    """Replace "positive"/"negative" entries of ``labels`` by 1/0 in place."""
    labels[labels == "positive"] = 1
    labels[labels == "negative"] = 0


def _report(split_name, labels, predictions):
    """Print UAR, classification report and confusion matrix for one split.

    Returns:
        Tuple ``(uar, cm)`` with the macro recall and the confusion matrix.
    """
    print(split_name)
    uar = recall_score(labels, predictions, average='macro')
    cm = confusion_matrix(labels, predictions)
    print(f'UAR: {uar}\n{classification_report(labels, predictions)}\n\nConfusion Matrix:\n\n{cm}')
    return uar, cm


# load features and labels
devel_X_vgg = _load_array("vgg_features", "x_devel_data_vgg.npy")
test_X_vgg = _load_array("vgg_features", "x_test_data_vgg.npy")
train_X_vgg = _load_array("vgg_features", "x_train_data_vgg.npy")

devel_X_hand = _load_array("hand_features", "x_devel_data.npy")
test_X_hand = _load_array("hand_features", "x_test_data.npy")
train_X_hand = _load_array("hand_features", "x_train_data.npy")

devel_y = _load_array("vgg_features", "y_devel_label_vgg.npy")
test_y = _load_array("vgg_features", "y_test_label_vgg.npy")
train_y = _load_array("vgg_features", "y_train_label_vgg.npy")

# Drop singleton axes from the VGG features so they can be concatenated
# with the hand-crafted features along axis 1.
train_X_vgg = np.squeeze(train_X_vgg)
devel_X_vgg = np.squeeze(devel_X_vgg)
test_X_vgg = np.squeeze(test_X_vgg)

# Feature-level fusion: hand-crafted features first, then VGG features.
devel_X = np.concatenate((devel_X_hand, devel_X_vgg), axis=1)
test_X = np.concatenate((test_X_hand, test_X_vgg), axis=1)
train_X = np.concatenate((train_X_hand, train_X_vgg), axis=1)

# Train + devel pool used for the second training stage.
X = np.append(train_X, devel_X, axis=0)
y = np.append(train_y, devel_y, axis=0)

print(X.shape)

# Every feature becomes one step of a length-`features` sequence with a
# single channel.
x = _add_channel_axis(X)
x_train = _add_channel_axis(train_X)
x_test = _add_channel_axis(test_X)
devel_X = _add_channel_axis(devel_X)

print(x_train.shape)

# Counted before the labels are re-encoded below.
n_classes = len(np.unique(y))

# `y` is a copy produced by np.append, so it has to be encoded separately.
for _labels in (train_y, y, devel_y, test_y):
    _encode_labels_inplace(_labels)


# ## Build the model
#
# Our model processes a tensor of shape `(batch size, sequence length, features)`,
# where `sequence length` is the number of time steps and `features` is each input
# timeseries.
#
# You can replace your classification RNN layers with this one: the
# inputs are fully compatible!
#
# We include residual connections, layer normalization, and dropout.
# The resulting layer can be stacked multiple times.
#
# The projection layers are implemented through `keras.layers.Conv1D`.


def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one Transformer encoder block to ``inputs``.

    Args:
        inputs: Input tensor; its last dimension is preserved by the block.
        head_size: ``key_dim`` of the multi-head attention layer.
        num_heads: Number of attention heads.
        ff_dim: Number of filters of the hidden feed-forward ``Conv1D`` layer.
        dropout: Dropout rate used inside attention and after each sub-layer.

    Returns:
        Tensor with the same last dimension as ``inputs``.
    """
    # Attention and Normalization
    attended = layers.MultiHeadAttention(
        key_dim=head_size, num_heads=num_heads, dropout=dropout
    )(inputs, inputs)
    attended = layers.Dropout(dropout)(attended)
    attended = layers.LayerNormalization(epsilon=1e-6)(attended)
    res = attended + inputs

    # Feed Forward Part
    hidden = layers.Conv1D(filters=ff_dim, kernel_size=1, activation="relu")(res)
    hidden = layers.Dropout(dropout)(hidden)
    # Project back to the input width so the residual addition is valid.
    hidden = layers.Conv1D(filters=inputs.shape[-1], kernel_size=1)(hidden)
    hidden = layers.LayerNormalization(epsilon=1e-6)(hidden)
    return hidden + res


# The main part of our model is now complete. We can stack multiple of those
# `transformer_encoder` blocks and we can also proceed to add the final
# Multi-Layer Perceptron classification head. Apart from a stack of `Dense`
# layers, we need to reduce the output tensor of the `TransformerEncoder` part of
# our model down to a vector of features for each data point in the current
# batch. A common way to achieve this is to use a pooling layer. For
# this example, a `GlobalAveragePooling1D` layer is sufficient.


def build_model(
    input_shape,
    head_size,
    num_heads,
    ff_dim,
    num_transformer_blocks,
    mlp_units,
    dropout=0,
    mlp_dropout=0,
    num_classes=None,
):
    """Build the Transformer-encoder classifier.

    Args:
        input_shape: Shape of one sample (without the batch dimension).
        head_size: ``key_dim`` of every attention layer.
        num_heads: Number of attention heads per block.
        ff_dim: Hidden feed-forward width of every encoder block.
        num_transformer_blocks: Number of stacked encoder blocks.
        mlp_units: Iterable with the width of each ``Dense`` layer of the head.
        dropout: Dropout rate inside the encoder blocks.
        mlp_dropout: Dropout rate after each ``Dense`` layer of the head.
        num_classes: Size of the softmax output. Defaults to the module-level
            ``n_classes`` when omitted.

    Returns:
        An uncompiled ``keras.Model``.
    """
    if num_classes is None:
        num_classes = n_classes

    inputs = keras.Input(shape=input_shape)
    x = inputs
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(x, head_size, num_heads, ff_dim, dropout)

    x = layers.GlobalAveragePooling1D(data_format="channels_first")(x)
    for dim in mlp_units:
        x = layers.Dense(dim, activation="relu")(x)
        x = layers.Dropout(mlp_dropout)(x)
    outputs = layers.Dense(num_classes, activation="softmax")(x)
    return keras.Model(inputs, outputs)


# ## Train and evaluate

input_shape = x_train.shape[1:]

model = build_model(
    input_shape,
    head_size=256,
    num_heads=4,
    ff_dim=4,
    num_transformer_blocks=4,
    mlp_units=[128],
    mlp_dropout=0.4,
    dropout=0.25,
)

model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=keras.optimizers.Adam(learning_rate=1e-4),
    metrics=[uar_accuracy],
)
model.summary()

callbacks = [keras.callbacks.EarlyStopping(patience=10, restore_best_weights=True)]

# Stage 1: train on the train split only, then predict the devel split.
model.fit(
    np.asarray(x_train).astype(np.float32),
    np.asarray(train_y).astype(np.float32),
    validation_split=0.2,
    epochs=20,
    batch_size=64,
    callbacks=callbacks,
)

devel_y_pred = model.predict(np.asarray(devel_X).astype(np.float32), verbose=1)
devel_y_pred = devel_y_pred.argmax(axis=-1)
devel_y_pred = devel_y_pred.astype('bool')
devel_y = devel_y.astype('bool')

# Stage 2: continue training the same model on train + devel, then predict
# the test split.
model.fit(
    np.asarray(x).astype(np.float32),
    np.asarray(y).astype(np.float32),
    validation_split=0.2,
    epochs=20,
    batch_size=64,
    callbacks=callbacks,
)

# Use the reshaped test array so the test input has the same
# (samples, features, 1) layout as the train and devel inputs.
test_y_pred = model.predict(np.asarray(x_test).astype(np.float32), verbose=1)
test_y_pred = test_y_pred.argmax(axis=-1)
test_y_pred = test_y_pred.astype('bool')
test_y = test_y.astype('bool')

# devel metrics
uar, cm = _report('DEVEL', devel_y, devel_y_pred)

# test metrics
uar, cm = _report('TEST', test_y, test_y_pred)