43 lines
1.8 KiB
Python
43 lines
1.8 KiB
Python
import json
|
|
import chainlit as cl
|
|
|
|
async def handle_tool_start(parsed: dict, steps: cl.Step, active_steps: dict) -> None:
    """Open a child step for a tool invocation and remember it by tool name.

    The step is entered here but deliberately left open; it is stored in
    ``active_steps`` under ``parsed["tool"]`` so that it can be looked up
    and closed later. Storing under the tool name means a later start event
    for the same tool name replaces the previously stored step.

    Args:
        parsed: Event payload; ``"tool"`` is required, ``"input"`` is optional.
        steps: Step whose ``id`` becomes the new step's ``parent_id``.
        active_steps: Mutable mapping of tool name to its open step.
    """
    tool_name = parsed["tool"]
    tool_step = cl.Step(
        name=f"tool 🔧: {tool_name}",
        type="tool",
        parent_id=steps.id,
    )
    # Entered manually (no ``async with``) because the step outlives this call.
    await tool_step.__aenter__()
    tool_step.input = parsed.get("input", "")
    active_steps[tool_name] = tool_step
|
|
|
|
def _as_json_object(value):
    """Return ``value`` as a dict, decoding it from JSON text if necessary.

    Returns ``None`` when ``value`` is neither a dict nor JSON text that
    decodes to an object, so callers can fall back instead of raising.
    """
    if isinstance(value, dict):
        return value
    if not isinstance(value, (str, bytes, bytearray)):
        return None
    try:
        decoded = json.loads(value)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    return decoded if isinstance(decoded, dict) else None


def _extract_tool_output(raw_output):
    """Pick the value to display for a tool result.

    Precedence: ``url`` inside the JSON-encoded ``text`` field, then a
    top-level ``url``, then ``raw_output`` unchanged.
    """
    outer = _as_json_object(raw_output)
    if outer is None:
        # Not a JSON object at all (e.g. plain text): show it verbatim.
        return raw_output
    inner = _as_json_object(outer.get("text", "{}")) or {}
    return inner.get("url", outer.get("url", raw_output))


async def handle_tool_result(parsed: dict, active_steps: dict) -> None:
    """Close the open step for a tool and set its output.

    The step stored under ``parsed["tool"]`` is removed from ``active_steps``.
    Its output is the ``url`` found in the nested JSON of the result when
    present, otherwise the raw output. Output that is not valid JSON (or not
    a JSON object) no longer raises: the raw value is used and the step is
    still closed, so it cannot be left dangling.

    Args:
        parsed: Event payload; ``"tool"`` is required, ``"output"`` holds
            the tool result (typically JSON text).
        active_steps: Mutable mapping of tool name to its open step.

    Raises:
        KeyError: If ``parsed`` has no ``"tool"`` entry.
    """
    step = active_steps.pop(parsed["tool"], None)
    if step is None:
        # No matching start event was recorded; nothing to close.
        return
    step.output = _extract_tool_output(parsed.get("output", ""))
    await step.__aexit__(None, None, None)
|
|
|
|
async def handle_reasoning(parsed: dict, steps: cl.Step) -> None:
    """Show the agent's reasoning text as a child step of ``steps``.

    The payload is read before the step is created, so an event without
    ``"data"`` fails without opening a step. The step is then managed by
    ``async with`` so that it is always exited, even if assigning the
    output raises.

    Args:
        parsed: Event payload; ``"data"`` holds the reasoning text.
        steps: Step whose ``id`` becomes the new step's ``parent_id``.

    Raises:
        KeyError: If ``parsed`` has no ``"data"`` entry.
    """
    thinking = parsed["data"]
    reasoning_step = cl.Step(name="reasoning 🤔:", type="tool", parent_id=steps.id)
    async with reasoning_step:
        reasoning_step.output = thinking
|
|
|
|
async def handle_text(parsed: dict, msg: cl.Message) -> None:
    """Append one chunk of the agent's answer to the message being streamed.

    Args:
        parsed: Event payload; ``"data"`` holds the token to stream.
        msg: Message that receives the token via ``stream_token``.
    """
    token = parsed["data"]
    await msg.stream_token(token)
|
|
|
|
async def dispatch_events(parsed: dict, msg: cl.Message, steps: cl.Step, active_steps: dict) -> None:
    """Route one parsed agent event to the handler for its ``"type"``.

    Recognised types are ``"tool_start"``, ``"tool_result"``, ``"reasoning"``
    and ``"text"``; any other type is ignored.

    Args:
        parsed: Event payload; ``"type"`` selects the handler.
        msg: Message passed to the text handler.
        steps: Parent step passed to the tool-start and reasoning handlers.
        active_steps: Mapping of open tool steps shared by the tool handlers.
    """
    kind = parsed["type"]

    if kind == "tool_start":
        await handle_tool_start(parsed, steps, active_steps)
    elif kind == "tool_result":
        await handle_tool_result(parsed, active_steps)
    elif kind == "reasoning":
        await handle_reasoning(parsed, steps)
    elif kind == "text":
        await handle_text(parsed, msg)