BC_praca/Backend/isolation_forest_custom.py

175 lines
5.2 KiB
Python

# isolation_forest_custom.py
import json
import pandas as pd
import numpy as np
import psutil
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import os
import gc
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEMP_DIR = os.path.join(BASE_DIR, "temp")
def _force_memory_cleanup():
gc.collect()
try:
import ctypes
ctypes.CDLL("libc.so.6").malloc_trim(0)
except Exception:
pass
def update_progress(value, progress_path="progress.json"):
with open(progress_path, "w") as f:
json.dump({"progress": value}, f)
def run_isolation_forest_custom(
csv_path=None,
config_path=None,
progress_path=None
):
if csv_path is None:
csv_path = os.path.join(TEMP_DIR, "upload.csv")
if config_path is None:
config_path = os.path.join(TEMP_DIR, "config.json")
if progress_path is None:
progress_path = os.path.join(BASE_DIR, "progress.json")
update_progress(1, progress_path)
process = psutil.Process()
ram_before = process.memory_info().rss
ram_peak = ram_before
update_progress(5, progress_path)
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
label_col = config["labeling"]["label_column"]
normal_value = config["labeling"]["normal_value"]
features = config["features"]["selected_columns"]
params = config.get("algorithm", {}).get("parameters", {})
needed_cols = list(dict.fromkeys(features + [label_col]))
df = pd.read_csv(csv_path, usecols=needed_cols)
update_progress(15, progress_path)
current_ram = process.memory_info().rss
ram_peak = max(ram_peak, current_ram)
col = df[label_col]
df["__label"] = col.apply(lambda x: 0 if x == normal_value else 1)
update_progress(25, progress_path)
current_ram = process.memory_info().rss
ram_peak = max(ram_peak, current_ram)
normal_count = int((df["__label"] == 0).sum())
attack_count = int((df["__label"] == 1).sum())
X = df[features].to_numpy(dtype=np.float32, copy=True)
y = df["__label"].to_numpy(dtype=np.int8, copy=True)
scaler = RobustScaler()
X_scaled = scaler.fit_transform(X).astype(np.float32, copy=False)
contamination = min(y.mean() * 2.5, 0.49)
update_progress(45, progress_path)
current_ram = process.memory_info().rss
ram_peak = max(ram_peak, current_ram)
model = IsolationForest(
n_estimators=params.get("n_estimators", 600),
max_samples=params.get("max_samples", 0.3),
max_features=params.get("max_features", 0.7),
contamination=contamination,
random_state=params.get("random_state", 42),
n_jobs=1
)
model.fit(X_scaled)
update_progress(75, progress_path)
current_ram = process.memory_info().rss
ram_peak = max(ram_peak, current_ram)
preds = model.predict(X_scaled)
y_pred = np.where(preds == 1, 0, 1)
accuracy = accuracy_score(y, y_pred)
precision, recall, f1, _ = precision_recall_fscore_support(
y, y_pred, average=None, labels=[0, 1]
)
update_progress(85, progress_path)
results = {
"normal_count": float(normal_count),
"attack_count": float(attack_count),
"contamination": float(contamination),
"accuracy": float(accuracy),
"precision_normal": float(precision[0]),
"recall_normal": float(recall[0]),
"f1_normal": float(f1[0]),
"precision_attack": float(precision[1]),
"recall_attack": float(recall[1]),
"f1_attack": float(f1[1]),
}
anomaly_scores = -model.decision_function(X_scaled)
display_cols = list(dict.fromkeys(features + [label_col]))
top_df = df[display_cols].copy()
top_df["anomaly_score"] = anomaly_scores
top_df["predicted"] = y_pred
top_df["true_label"] = y
real_attacks = top_df[top_df["true_label"] == 1]
real_attacks_sorted = real_attacks.sort_values("anomaly_score", ascending=False)
top_real_attacks = real_attacks_sorted.head(5)
if len(top_real_attacks) > 0:
cols = [c for c in display_cols if c in top_real_attacks.columns] + ["anomaly_score"]
top_anomalies = (
top_real_attacks[cols]
.rename(columns={"anomaly_score": "score"})
.round(4)
.to_dict(orient="records")
)
else:
top_anomalies = []
results["top_anomalies"] = top_anomalies
del X, X_scaled, preds, y_pred, anomaly_scores
del top_df, real_attacks, real_attacks_sorted, top_real_attacks
del model, scaler, df, y, col
_force_memory_cleanup()
ram_after = process.memory_info().rss
results["ram_before"] = round(ram_before / (1024 ** 2), 2)
results["ram_peak"] = round(ram_peak / (1024 ** 2), 2)
results["ram_after"] = round(ram_after / (1024 ** 2), 2)
results["ram_increase"] = round((ram_peak - ram_before) / (1024 ** 2), 2)
return results
if __name__ == "__main__":
res = run_isolation_forest_custom()
print(json.dumps(res, indent=2))