# legal-ai-assistant/tests/evals/test_performance.py
# 2026-05-29 14:04:54 +02:00
#
# 149 lines
# 5.2 KiB
# Python

from dotenv import load_dotenv
load_dotenv(".env.test", override=True)
import os
import json
import pytest
from pathlib import Path
from backend.agent.agent import build_agent, make_mcp_server
from backend.agent.response import stream_response
# Model identifier used for every scenario; None when the variable is unset.
TEST_MODEL = os.environ.get("TEST_MODEL")

# The golden dataset sits in the same directory as this test module.
DATASET_PATH = Path(__file__).with_name("golden_datasets.json")

# Parsed once at import time so the scenarios can feed pytest parametrization.
DATASET = json.loads(DATASET_PATH.read_text(encoding="utf-8"))
SCENARIOS = DATASET["scenarios"]
class ScenarioStats:
    """Mutable record of what a single scenario run produced.

    Attributes:
        actual_tools: Tool names collected during the run, in arrival order.
        response_text: Concatenation of the streamed text chunks.
        input_tokens: Sum of the reported input token counts.
        output_tokens: Sum of the reported output token counts.
        error: Error description, or None when the run had no error.
    """

    def __init__(self):
        # A fresh list per instance so runs never share tool history.
        self.actual_tools: list[str] = []
        self.response_text: str = ""
        self.input_tokens: int = 0
        self.output_tokens: int = 0
        # Stays None unless an error is recorded.
        self.error = None
class TotalStats:
    """Class-level accumulator for totals across all executed scenarios.

    The class is never instantiated in this module; the attributes are
    read and updated directly on the class, so they act as shared counters.
    """

    # Number of scenarios folded into the totals so far.
    scenarios_count = 0

    # Token totals, kept separately for input and output.
    total_input_tokens = 0
    total_output_tokens = 0

    # Summed elapsed time (seconds) and summed cost.
    total_time = 0.0
    total_cost = 0.0
def compute_tool_sets(expected: list[str],
                      actual: list[str]) -> tuple[set[str], set[str], set[str], float, float, float]:
    """Compare expected and actual tool calls as sets and score the match.

    Duplicates and ordering are ignored because both inputs are converted
    to sets before comparison.

    Returns:
        A tuple ``(tp, fp, fn, precision, recall, f1)`` where ``tp``, ``fp``
        and ``fn`` are sets of tool names and the three scores are floats
        rounded to three decimals.
    """
    wanted = set(expected)
    called = set(actual)

    hits = wanted.intersection(called)
    spurious = called.difference(wanted)
    missed = wanted.difference(called)

    # With no tools called, precision is perfect only if none were expected.
    if called:
        precision = len(hits) / (len(hits) + len(spurious))
    elif wanted:
        precision = 0.0
    else:
        precision = 1.0

    # With no tools expected, recall is perfect only if none were called.
    if wanted:
        recall = len(hits) / (len(hits) + len(missed))
    elif called:
        recall = 0.0
    else:
        recall = 1.0

    # Guard the harmonic mean against a zero denominator.
    score_sum = precision + recall
    if score_sum:
        f1 = 2 * precision * recall / score_sum
    else:
        f1 = 0.0

    return hits, spurious, missed, round(precision, 3), round(recall, 3), round(f1, 3)
def print_report(scenario: dict, stats: ScenarioStats, cost: float,
                 elapsed: float, tp: set, fp: set, fn: set,
                 precision: float, recall: float, f1: float) -> None:
    """Print the report for one scenario followed by the running totals.

    Args:
        scenario: Scenario entry; ``resource``, ``level`` and ``query`` are
            required keys, ``expected_tools`` is optional.
        stats: Collected results of the scenario run.
        cost: Cost of this scenario, printed with six decimals.
        elapsed: Elapsed time of this scenario, printed in seconds.
        tp: Tool names both expected and called.
        fp: Tool names called but not expected.
        fn: Tool names expected but not called.
        precision: Precision score for the tool match.
        recall: Recall score for the tool match.
        f1: F1 score for the tool match.

    The totals are read from ``TotalStats``, so the caller is expected to
    have updated them before calling this function.
    """
    # Visible separator line; an empty string repeated 60 times prints nothing.
    rule = "-" * 60

    sum_token = stats.input_tokens + stats.output_tokens

    # Avoid ZeroDivisionError when no scenario has been counted yet.
    count = TotalStats.scenarios_count
    avg_time = TotalStats.total_time / count if count else 0.0

    # Newlines are flattened so the response preview stays on one line.
    preview = stats.response_text[:100].replace("\n", " ")

    lines = [
        f"\n{rule}",
        "\t[INFO]",
        f"\tResource : {scenario['resource']} / {scenario['level']}",
        f"\tQuery : {scenario['query'][:100]}...",
        f"\tExpected : {scenario.get('expected_tools', [])}",
        f"\tActual : {stats.actual_tools}",
        f"\tResponse : {preview}...",
        "\t[EFFECTIVITY]",
        f"\tElapsed : {elapsed}s",
        f"\tTokens : input={stats.input_tokens} output={stats.output_tokens} sum={sum_token}",
        f"\tCost : ${cost:.6f}",
        f"\t[F1 SCORE | Scenarios: {count}]",
        f"\tTP : {sorted(tp)}",
        f"\tFP : {sorted(fp)}",
        f"\tFN : {sorted(fn)}",
        f"\tPrecision: {precision:.3f}",
        f"\tRecall : {recall:.3f}",
        f"\tF1 : {f1:.3f}",
        f"\t[TOTAL PROGRESS | Scenarios: {count}]",
        f"\tAccumulated Time : {TotalStats.total_time:.2f}s (avg: {avg_time:.2f}s/req)",
        f"\tAccumulated Cost : ${TotalStats.total_cost:.6f}",
        f"\tAccumulated Tokens: In={TotalStats.total_input_tokens} Out={TotalStats.total_output_tokens}",
    ]
    if stats.error:
        lines.append(f"\tERROR : {stats.error}")
    lines.append(rule)

    # One print call emits the same line sequence as printing each line.
    print("\n".join(lines))
@pytest.mark.evals
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "scenario",
    SCENARIOS,
    ids=[f"{s['resource']}-{s['level']}" for s in SCENARIOS],
)
async def test_scenarios(scenario: dict, calculate_cost) -> None:
    """Run one golden-dataset scenario through the agent and report metrics.

    The scenario query is streamed through the agent, the emitted events are
    folded into a ``ScenarioStats``, the totals in ``TotalStats`` are updated
    and a report is printed. The test fails only when an error was recorded;
    the tool-match scores are reported but not asserted.

    Args:
        scenario: Scenario entry; ``query``, ``resource`` and ``level`` are
            required keys, ``expected_tools`` is optional.
        calculate_cost: Callable invoked as
            ``calculate_cost(model, input_tokens, output_tokens)``.
            NOTE(review): presumably a pytest fixture defined outside this
            file; confirm.
    """
    stats = ScenarioStats()
    query = scenario["query"]
    expected_tools = scenario.get("expected_tools", [])
    mcp_server = make_mcp_server()
    pure_agent_time = 0.0

    try:
        async with mcp_server:
            agent = build_agent(mcp_server=mcp_server, model_name=TEST_MODEL)
            messages = [{"role": "user", "content": query}]
            async for event in stream_response(agent, messages):
                kind = event["type"]
                if kind == "text":
                    stats.response_text += event["data"]
                elif kind == "tool_start":
                    stats.actual_tools.append(event["tool"])
                elif kind == "usage":
                    stats.input_tokens += event["input_tokens"]
                    stats.output_tokens += event["output_tokens"]
                    # The latest usage event's duration replaces the previous
                    # value; it is not summed like the token counts.
                    pure_agent_time = event.get("pure_duration", 0.0)
                elif kind == "error":
                    stats.error = event["data"]
    except Exception as e:
        # Record the failure instead of raising so the report is still
        # printed. The type name is included because str(e) is empty for
        # exceptions raised without a message.
        stats.error = f"{type(e).__name__}: {e}"

    elapsed = round(pure_agent_time, 2)
    cost = calculate_cost(TEST_MODEL, stats.input_tokens, stats.output_tokens)
    tp, fp, fn, precision, recall, f1 = compute_tool_sets(expected_tools, stats.actual_tools)

    # Update the running totals first; the report reads them.
    TotalStats.total_time += elapsed
    TotalStats.total_input_tokens += stats.input_tokens
    TotalStats.total_output_tokens += stats.output_tokens
    TotalStats.total_cost += cost
    TotalStats.scenarios_count += 1

    print_report(
        scenario=scenario,
        stats=stats,
        cost=cost,
        elapsed=elapsed,
        tp=tp,
        fp=fp,
        fn=fn,
        precision=precision,
        recall=recall,
        f1=f1,
    )

    assert stats.error is None, (
        f"Agent error [{scenario['resource']} {scenario['level']}]: {stats.error}"
    )