from typing import Any, AsyncGenerator

from agents import Agent, Runner
from agents import RunHooks, ModelResponse
from openai.types.responses import ResponseTextDeltaEvent


class MyRunHooks(RunHooks):
    """RunHooks for logging the lifecycle of an agent.

    Each callback prints a single marker line to stdout containing the
    agent's name; neither callback inspects or modifies the run context
    or the agent output.
    """

    async def on_agent_start(self, context, agent: Agent) -> None:
        """Print a marker line when ``agent`` starts."""
        print(f"\n 🐾[RunHooks] {agent.name} started.\n")

    async def on_agent_end(self, context, agent: Agent, output: Any) -> None:
        """Print a marker line when ``agent`` ends; ``output`` is ignored."""
        print(f"\n 🚪[RunHooks] {agent.name} ended.\n")


async def stream_response(agent: Agent, prompt: str) -> AsyncGenerator[str, None]:
    """Stream the agent's response text for ``prompt``, piece by piece.

    Starts a streamed run of ``agent`` with ``MyRunHooks`` attached and
    yields the ``delta`` of every raw response event whose payload is a
    ``ResponseTextDeltaEvent``. All other events are skipped.

    Args:
        agent: The agent to run.
        prompt: The input passed to the run.

    Yields:
        Successive text fragments of the response. If an exception is
        raised while starting or consuming the stream, a single
        human-readable error string is yielded instead and the generator
        then finishes.
    """
    try:
        result = Runner.run_streamed(agent, input=prompt, hooks=MyRunHooks())
        async for event in result.stream_events():
            is_text_delta = event.type == "raw_response_event" and isinstance(
                event.data, ResponseTextDeltaEvent
            )
            if is_text_delta:
                # Send the next piece of response text to the consumer.
                yield event.data.delta
    except Exception as e:
        # Deliberately broad: this is the streaming boundary, so any failure
        # is surfaced to the consumer as text rather than propagated.
        yield f"⚠️🖨️ Error: {e}"