diff --git a/lib/crewai/src/crewai/a2a/task_helpers.py b/lib/crewai/src/crewai/a2a/task_helpers.py index 1083f3fea..9e24c7e32 100644 --- a/lib/crewai/src/crewai/a2a/task_helpers.py +++ b/lib/crewai/src/crewai/a2a/task_helpers.py @@ -264,51 +264,57 @@ async def send_message_and_get_task_id( Returns: Task ID string if agent needs polling/waiting, or TaskStateResult if done. """ - async for event in event_stream: - if isinstance(event, Message): - new_messages.append(event) - result_parts = [ - part.root.text for part in event.parts if part.root.kind == "text" - ] - response_text = " ".join(result_parts) if result_parts else "" + try: + async for event in event_stream: + if isinstance(event, Message): + new_messages.append(event) + result_parts = [ + part.root.text for part in event.parts if part.root.kind == "text" + ] + response_text = " ".join(result_parts) if result_parts else "" - crewai_event_bus.emit( - None, - A2AResponseReceivedEvent( - response=response_text, - turn_number=turn_number, - is_multiturn=is_multiturn, - status="completed", - agent_role=agent_role, - ), - ) - - return TaskStateResult( - status=TaskState.completed, - result=response_text, - history=new_messages, - agent_card=agent_card, - ) - - if isinstance(event, tuple): - a2a_task, _ = event - - if a2a_task.status.state in TERMINAL_STATES | ACTIONABLE_STATES: - result = process_task_state( - a2a_task=a2a_task, - new_messages=new_messages, - agent_card=agent_card, - turn_number=turn_number, - is_multiturn=is_multiturn, - agent_role=agent_role, + crewai_event_bus.emit( + None, + A2AResponseReceivedEvent( + response=response_text, + turn_number=turn_number, + is_multiturn=is_multiturn, + status="completed", + agent_role=agent_role, + ), ) - if result: - return result - return a2a_task.id + return TaskStateResult( + status=TaskState.completed, + result=response_text, + history=new_messages, + agent_card=agent_card, + ) - return TaskStateResult( - status=TaskState.failed, - error="No task ID received from initial message", - history=new_messages, - ) + if isinstance(event, tuple): + a2a_task, _ = event + + if a2a_task.status.state in TERMINAL_STATES | ACTIONABLE_STATES: + result = process_task_state( + a2a_task=a2a_task, + new_messages=new_messages, + agent_card=agent_card, + turn_number=turn_number, + is_multiturn=is_multiturn, + agent_role=agent_role, + ) + if result: + return result + + return a2a_task.id + + return TaskStateResult( + status=TaskState.failed, + error="No task ID received from initial message", + history=new_messages, + ) + + finally: + aclose = getattr(event_stream, "aclose", None) + if aclose: + await aclose() diff --git a/lib/crewai/src/crewai/a2a/updates/streaming/handler.py b/lib/crewai/src/crewai/a2a/updates/streaming/handler.py index eb05724e7..556374edf 100644 --- a/lib/crewai/src/crewai/a2a/updates/streaming/handler.py +++ b/lib/crewai/src/crewai/a2a/updates/streaming/handler.py @@ -133,6 +133,11 @@ class StreamingHandler: history=new_messages, ) + finally: + aclose = getattr(event_stream, "aclose", None) + if aclose: + await aclose() + if final_result: return final_result