fix: ensure stream is closed on exit

This commit is contained in:
Greyson LaLonde
2026-01-06 18:52:01 -05:00
parent f53b8755da
commit c639455730
2 changed files with 55 additions and 44 deletions

View File

@@ -264,51 +264,57 @@ async def send_message_and_get_task_id(
Returns:
Task ID string if agent needs polling/waiting, or TaskStateResult if done.
"""
async for event in event_stream:
if isinstance(event, Message):
new_messages.append(event)
result_parts = [
part.root.text for part in event.parts if part.root.kind == "text"
]
response_text = " ".join(result_parts) if result_parts else ""
try:
async for event in event_stream:
if isinstance(event, Message):
new_messages.append(event)
result_parts = [
part.root.text for part in event.parts if part.root.kind == "text"
]
response_text = " ".join(result_parts) if result_parts else ""
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="completed",
agent_role=agent_role,
),
)
return TaskStateResult(
status=TaskState.completed,
result=response_text,
history=new_messages,
agent_card=agent_card,
)
if isinstance(event, tuple):
a2a_task, _ = event
if a2a_task.status.state in TERMINAL_STATES | ACTIONABLE_STATES:
result = process_task_state(
a2a_task=a2a_task,
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
crewai_event_bus.emit(
None,
A2AResponseReceivedEvent(
response=response_text,
turn_number=turn_number,
is_multiturn=is_multiturn,
status="completed",
agent_role=agent_role,
),
)
if result:
return result
return a2a_task.id
return TaskStateResult(
status=TaskState.completed,
result=response_text,
history=new_messages,
agent_card=agent_card,
)
return TaskStateResult(
status=TaskState.failed,
error="No task ID received from initial message",
history=new_messages,
)
if isinstance(event, tuple):
a2a_task, _ = event
if a2a_task.status.state in TERMINAL_STATES | ACTIONABLE_STATES:
result = process_task_state(
a2a_task=a2a_task,
new_messages=new_messages,
agent_card=agent_card,
turn_number=turn_number,
is_multiturn=is_multiturn,
agent_role=agent_role,
)
if result:
return result
return a2a_task.id
return TaskStateResult(
status=TaskState.failed,
error="No task ID received from initial message",
history=new_messages,
)
finally:
aclose = getattr(event_stream, "aclose", None)
if aclose:
await aclose()

View File

@@ -133,6 +133,11 @@ class StreamingHandler:
history=new_messages,
)
finally:
aclose = getattr(event_stream, "aclose", None)
if aclose:
await aclose()
if final_result:
return final_result