legal-ai-assistant/frontend/services/tool_steps.py

43 lines
1.8 KiB
Python

import json
import chainlit as cl
async def handle_tool_start(parsed: dict, steps: cl.Step, active_steps: dict) -> None:
"""Open a new step for the tool and store it in active_steps"""
step = cl.Step(name=f"tool 🔧: {parsed['tool']}", type="tool", parent_id=steps.id)
await step.__aenter__()
step.input = parsed.get("input", "")
active_steps[parsed["tool"]] = step
async def handle_tool_result(parsed: dict, active_steps: dict) -> None:
"""Close the tool step and extract url from nested JSON."""
step = active_steps.pop(parsed["tool"], None)
if step:
output = json.loads(parsed["output"])
inner = json.loads(output.get("text", "{}"))
step.output = inner.get("url", output.get("url", parsed["output"]))
await step.__aexit__(None, None, None)
async def handle_reasoning(parsed: dict, steps: cl.Step) -> None:
"""Create a reasoning Step with the thinking text."""
reasoning_step = cl.Step(name="reasoning 🤔:", type="tool", parent_id=steps.id)
await reasoning_step.__aenter__()
reasoning_step.output = parsed["data"]
await reasoning_step.__aexit__(None, None, None)
async def handle_text(parsed: dict, msg: cl.Message) -> None:
"""Stream the agent response token into the message."""
await msg.stream_token(parsed["data"])
async def dispatch_events(parsed: dict, msg: cl.Message, steps: cl.Step, active_steps: dict) -> None:
"""Processes agent SSE events and updates the UI based on type."""
event_type = parsed["type"]
match event_type:
case "tool_start":
await handle_tool_start(parsed, steps, active_steps)
case "tool_result":
await handle_tool_result(parsed, active_steps)
case "reasoning":
await handle_reasoning(parsed, steps)
case "text":
await handle_text(parsed, msg)