Upload files to "few_shot"

Tetiana Mohorian 2025-01-30 21:00:20 +00:00
parent 79a188b193
commit c991f2d5b9
8 changed files with 1646 additions and 180 deletions


@@ -0,0 +1,212 @@
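# Few-shot fine-tuning of hlillemark/mt5-1B-flores200-baseline on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).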
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
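# Note: train, val, and test are each sampled from the same label pools with
# different seeds, so the same example can land in more than one split; this
# few-shot setup does not guarantee disjoint splits.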
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("hlillemark/mt5-1B-flores200-baseline", force_download=True)
model = AutoModelForSequenceClassification.from_pretrained("hlillemark/mt5-1B-flores200-baseline", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=256
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
print(dataset[0])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
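    # F1 = 2 * P * R / (P + R); the 1e-8 term below guards against division by zero when P = R = 0.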
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
#trainer.save_model("./hate_speech_model/best_model")
del model
torch.cuda.empty_cache()


@@ -0,0 +1,208 @@
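# Few-shot fine-tuning of unicamp-dl/mt5-3B-mmarco-en-pt on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).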
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("unicamp-dl/mt5-3B-mmarco-en-pt", force_download=True)
model = AutoModelForSequenceClassification.from_pretrained("unicamp-dl/mt5-3B-mmarco-en-pt", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=256
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
print(dataset[0])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
#trainer.save_model("./hate_speech_model/best_model")


@@ -0,0 +1,210 @@
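# Few-shot fine-tuning of google/mt5-base on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).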
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("google/mt5-base", force_download=True)
model = AutoModelForSequenceClassification.from_pretrained("google/mt5-base", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=128
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
print(dataset[0])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
trainer.save_model("./hate_speech_model/best_model")


@@ -0,0 +1,208 @@
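# Few-shot fine-tuning of google/mt5-large on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).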
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("google/mt5-large", force_download=True)
model = AutoModelForSequenceClassification.from_pretrained("google/mt5-large", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=128
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
print(dataset[0])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
trainer.save_model("./hate_speech_model/best_model")


@@ -0,0 +1,214 @@
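# Few-shot fine-tuning of gerulata/slovakbert on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).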
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# Load the tokenizer and model from "gerulata/slovakbert"
tokenizer = AutoTokenizer.from_pretrained("gerulata/slovakbert", force_download=True)
model = AutoModelForSequenceClassification.from_pretrained("gerulata/slovakbert", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=128
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
print(dataset[0])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
# Ensure predictions are properly extracted
logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
preds = logits.argmax(axis=-1) # Ensure you get the correct predictions (argmax for logits)
labels = pred.label_ids
# Debugging: Check if preds and labels are arrays and have the same length
print(f"Predictions: {preds[:10]}")
print(f"Labels: {labels[:10]}")
# Calculate precision, recall, and F1 score
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
trainer.save_model("./hate_speech_model/best_model_slovakbert")


@@ -0,0 +1,205 @@
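# Few-shot fine-tuning of TUKE-KEMT/slovak-t5-base on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).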
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("TUKE-KEMT/slovak-t5-base", use_fast=False, legacy=False)
model = AutoModelForSequenceClassification.from_pretrained("TUKE-KEMT/slovak-t5-base", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=128
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
trainer.save_model("./hate_speech_model/best_model")


@@ -1,180 +1,180 @@
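# Few-shot fine-tuning of ApoTro/slovak-t5-small on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).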
import numpy as np
import torch
from datasets import load_dataset, concatenate_datasets
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
Trainer,
TrainingArguments,
set_seed
)
# 🔹 1. Set seed for reproducibility
set_seed(42)
# 🔹 2. Load dataset
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Function to create stratified splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train, validation, and test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# 🔹 3. Load tokenizer and model
model_name = "ApoTro/slovak-t5-small"
tokenizer = AutoTokenizer.from_pretrained(model_name, force_download=True)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=2)
# Tokenization function
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=128
)
# Function to prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
return dataset.rename_column("label", "labels")
# Apply preparation to datasets
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device (GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# 🔹 4. Define training arguments
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5,
num_train_epochs=7,
evaluation_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100,
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2,
lr_scheduler_type="cosine",
logging_dir='./logs',
)
# 🔹 5. Define evaluation metrics
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels, preds, average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# 🔹 6. Create Trainer instance
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# 🔹 7. Train the model
trainer.train()
# 🔹 8. Save the trained model
trainer.save_model("./hate_speech_model/best_model")
# 🔹 9. Load the trained model before testing
model = AutoModelForSequenceClassification.from_pretrained("./hate_speech_model/best_model")
trainer = Trainer(
model=model,
args=training_args,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# 🔹 10. Function to find the optimal threshold
def find_optimal_threshold(trainer, dataset):
predictions = trainer.predict(dataset)
logits = predictions.predictions
# Ensure logits are properly shaped
if isinstance(logits, tuple):
logits = logits[0]
logits = torch.tensor(logits)
# Apply softmax
probs = torch.nn.functional.softmax(logits, dim=-1)
positive_probs = probs[:, 1].numpy()
true_labels = predictions.label_ids
# Compute Precision-Recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the best threshold based on F1-score
optimal_idx = np.argmax(f1_scores[:-1])
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
# 🔹 11. Function to evaluate the model using a custom threshold
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0]
logits = torch.tensor(logits)
# Apply softmax
probs = torch.nn.functional.softmax(logits, dim=-1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Compute Precision, Recall, F1-score
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels, predicted_labels, average='binary', zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# 🔹 12. Find the optimal threshold using validation set
print("\n🔍 Finding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"✅ Optimal threshold: {optimal_threshold:.4f}")
# 🔹 13. Final evaluation on the test set using the best threshold
print("\n📊 Evaluating on the test set with the optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"🎯 Precision: {optimized_results['precision']:.4f}")
print(f"🎯 Recall: {optimized_results['recall']:.4f}")
print(f"🎯 F1-score: {optimized_results['f1']:.4f}")


@@ -0,0 +1,209 @@
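# Few-shot fine-tuning of google/mt5-small on TUKE-KEMT/hate_speech_slovak (binary hate-speech classification).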
from datasets import load_dataset, concatenate_datasets
import numpy as np
import torch
from sklearn.metrics import precision_recall_fscore_support, precision_recall_curve
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    Trainer,
    TrainingArguments,
    set_seed,
)
# Set seed for reproducibility
set_seed(42)
# Load and preprocess data
ds = load_dataset("TUKE-KEMT/hate_speech_slovak")
label_0 = ds['train'].filter(lambda example: example['label'] == 0)
label_1 = ds['train'].filter(lambda example: example['label'] == 1)
# Create stratified few-shot splits
def create_stratified_split(label_0, label_1, n_samples, seed=42):
few_shot_0 = label_0.shuffle(seed=seed).select(range(n_samples))
few_shot_1 = label_1.shuffle(seed=seed).select(range(n_samples))
return concatenate_datasets([few_shot_0, few_shot_1]).shuffle(seed=seed)
# Create train/val/test splits
train_dataset = create_stratified_split(label_0, label_1, n_samples=40)
val_dataset = create_stratified_split(label_0, label_1, n_samples=10, seed=43)
test_dataset = create_stratified_split(label_0, label_1, n_samples=50, seed=44)
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained("google/mt5-small", force_download=True)
model = AutoModelForSequenceClassification.from_pretrained("google/mt5-small", num_labels=2)
# Tokenization function with padding
def tokenize(batch):
return tokenizer(
batch["text"],
padding="max_length",
truncation=True,
max_length=128
)
# Prepare datasets
def prepare_dataset(dataset):
dataset = dataset.map(tokenize, batched=True, remove_columns=["text"])
print(dataset[0])
return dataset.rename_column("label", "labels")
train_dataset = prepare_dataset(train_dataset)
val_dataset = prepare_dataset(val_dataset)
test_dataset = prepare_dataset(test_dataset)
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# Training arguments with improved settings
training_args = TrainingArguments(
output_dir="./hate_speech_model",
per_device_train_batch_size=8,
per_device_eval_batch_size=16,
learning_rate=3e-5, # Adjust as needed
num_train_epochs=7, # Increased epochs for better training
eval_strategy="epoch", # Use "epoch" for both strategies
save_strategy="epoch", # Matching the evaluation strategy
load_best_model_at_end=True,
metric_for_best_model="f1",
greater_is_better=True,
warmup_steps=100, # Increased warmup steps
weight_decay=0.01,
report_to="none",
seed=42,
logging_steps=10,
gradient_accumulation_steps=2, # For more effective training on larger datasets
lr_scheduler_type="cosine", # Using cosine scheduler for learning rate
logging_dir='./logs',
)
# Custom metrics computation
def compute_metrics(pred):
    # predictions may be a plain logits array or a tuple whose first element is the logits
    logits = pred.predictions[0] if isinstance(pred.predictions, tuple) else pred.predictions
    preds = logits.argmax(-1)
    labels = pred.label_ids
precision, recall, f1, _ = precision_recall_fscore_support(
labels,
preds,
average='binary'
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Initialize trainer with validation data and metrics
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
compute_metrics=compute_metrics,
)
# Train model
trainer.train()
# Helpers for threshold tuning and test-set evaluation
def find_optimal_threshold(trainer, dataset):
# Get predictions
predictions = trainer.predict(dataset)
# Extract logits (the first element in the predictions tuple)
logits = predictions.predictions # This is likely a tuple (logits, other_info)
# If logits is a tuple, extract only the logits
if isinstance(logits, tuple):
logits = logits[0] # Extract the logits from the tuple
# Check the shape of logits to debug the issue
print(f"Logits shape: {logits.shape}")
# Ensure logits has the shape (batch_size, 2) for binary classification
if logits.shape[-1] != 2:
logits = logits[:, :2] # Take only the first two columns (logits for the two classes)
print(f"Logits shape after slicing: {logits.shape}")
# Convert logits to tensor if necessary
if not isinstance(logits, torch.Tensor):
logits = torch.tensor(logits) # Convert logits to a tensor if needed
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get probabilities for the positive class (label=1)
positive_probs = probs[:, 1].numpy() # The probabilities for the positive class (label=1)
# Get true labels from predictions
true_labels = predictions.label_ids
# Calculate precision-recall curve
precisions, recalls, thresholds = precision_recall_curve(true_labels, positive_probs)
f1_scores = 2 * (precisions * recalls) / (precisions + recalls + 1e-8)
# Find the optimal threshold based on F1-score
    optimal_idx = np.argmax(f1_scores[:-1])  # Exclude the final PR point, which has no corresponding threshold
optimal_threshold = thresholds[optimal_idx]
return optimal_threshold, precisions[optimal_idx], recalls[optimal_idx], f1_scores[optimal_idx]
def evaluate_with_threshold(trainer, dataset, threshold=0.5):
predictions = trainer.predict(dataset)
# Ensure that logits are properly reshaped to (batch_size, 2) before applying softmax
logits = predictions.predictions
if isinstance(logits, tuple):
logits = logits[0] # Extract logits if it's a tuple
logits = torch.tensor(logits) if not isinstance(logits, torch.Tensor) else logits
# Apply softmax to get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
# Get predicted labels based on the threshold for the positive class (label=1)
predicted_labels = (probs[:, 1] > threshold).numpy().astype(int)
true_labels = predictions.label_ids
# Calculate precision-recall-fscore
precision, recall, f1, _ = precision_recall_fscore_support(
true_labels,
predicted_labels,
average='binary',
zero_division=0
)
return {
'precision': precision,
'recall': recall,
'f1': f1
}
# Example usage:
# Find the optimal threshold using validation data
print("\nFinding optimal threshold...")
optimal_threshold, best_precision, best_recall, best_f1 = find_optimal_threshold(trainer, val_dataset)
print(f"Optimal threshold: {optimal_threshold:.4f}")
# Evaluate with optimal threshold
print("\nEvaluating with optimal threshold:")
optimized_results = evaluate_with_threshold(trainer, test_dataset, threshold=optimal_threshold)
print(f"Precision: {optimized_results['precision']:.4f}")
print(f"Recall: {optimized_results['recall']:.4f}")
print(f"F1: {optimized_results['f1']:.4f}")
# Save the model
trainer.save_model("./hate_speech_model/best_model")
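
Note: the scripts above tune a decision threshold but never demonstrate inference with a saved checkpoint. The sketch below is illustrative and not part of the commit: it assumes the last script's checkpoint at ./hate_speech_model/best_model, reloads the tokenizer from the base google/mt5-small checkpoint (the Trainer was never given a tokenizer, so none is written to the output directory), and uses a placeholder threshold in place of the value returned by find_optimal_threshold.

import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification

MODEL_DIR = "./hate_speech_model/best_model"  # illustrative path, matching the script above
THRESHOLD = 0.5  # placeholder; substitute the tuned value from find_optimal_threshold

# The tokenizer is not saved with the model, so reload it from the base checkpoint.
tokenizer = AutoTokenizer.from_pretrained("google/mt5-small")
model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR)
model.eval()

def classify(texts, threshold=THRESHOLD):
    inputs = tokenizer(texts, padding=True, truncation=True, max_length=128, return_tensors="pt")
    with torch.no_grad():
        logits = model(**inputs).logits
    probs = torch.nn.functional.softmax(logits, dim=-1)
    # Same decision rule as evaluate_with_threshold: threshold the positive-class probability.
    return (probs[:, 1] > threshold).long().tolist()

print(classify(["Ukážkový text."]))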