diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 07b080fd3..12b6733bd 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -8,6 +8,7 @@ import time from typing import ( TYPE_CHECKING, Any, + Coroutine, Final, Literal, cast, @@ -69,6 +70,7 @@ from crewai.security.fingerprint import Fingerprint from crewai.tools.agent_tools.agent_tools import AgentTools from crewai.utilities.agent_utils import ( get_tool_names, + is_inside_event_loop, load_agent_from_repository, parse_tools, render_text_description_and_args, @@ -1699,16 +1701,16 @@ class Agent(BaseAgent): self, messages: str | list[LLMMessage], response_format: type[Any] | None = None, - ) -> LiteAgentOutput: + ) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]: """ Execute the agent with the given messages using the AgentExecutor. This method provides standalone agent execution without requiring a Crew. It supports tools, response formatting, and guardrails. - When called from within a sync Flow method, the Flow framework automatically - runs the method in a thread pool, so this works seamlessly. For async Flow - methods, use kickoff_async() instead. + When called from within a Flow (sync or async method), this automatically + detects the event loop and returns a coroutine that the Flow framework + awaits. Users don't need to handle async explicitly. Args: messages: Either a string query or a list of message dictionaries. @@ -1718,10 +1720,15 @@ class Agent(BaseAgent): Returns: LiteAgentOutput: The result of the agent execution. + When inside a Flow, returns a coroutine that resolves to LiteAgentOutput. Note: - If called from an async context (not through Flow), use kickoff_async(). + For explicit async usage outside of Flow, use kickoff_async() directly. """ + # Magic auto-async: if inside event loop (e.g., inside a Flow), + # return coroutine for Flow to await + if is_inside_event_loop(): + return self.kickoff_async(messages, response_format) executor, inputs, agent_info, parsed_tools = self._prepare_kickoff( messages, response_format diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 0bec95a96..6c26dbb67 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -1522,11 +1522,9 @@ class Flow(Generic[T], metaclass=FlowMeta): if self.last_human_feedback is not None else result ) - tasks = [ - self._execute_single_listener(listener_name, listener_result) - for listener_name in listeners_for_result - ] - await asyncio.gather(*tasks) + # Execute listeners sequentially to prevent race conditions on shared state + for listener_name in listeners_for_result: + await self._execute_single_listener(listener_name, listener_result) else: await self._execute_listeners(start_method_name, result) @@ -1595,12 +1593,14 @@ class Flow(Generic[T], metaclass=FlowMeta): if asyncio.iscoroutinefunction(method): result = await method(*args, **kwargs) else: + # Run sync methods in thread pool for isolation + # This allows Agent.kickoff() to work synchronously inside Flow methods import contextvars ctx = contextvars.copy_context() result = await asyncio.to_thread(ctx.run, method, *args, **kwargs) - # Auto-await coroutines from sync methods (still useful for explicit coroutine returns) + # Auto-await coroutines returned from sync methods (enables AgentExecutor pattern) if asyncio.iscoroutine(result): result = await result @@ -1749,11 +1749,11 @@ class Flow(Generic[T], metaclass=FlowMeta): listener_result = router_result_to_feedback.get( str(current_trigger), result ) - tasks = [ - self._execute_single_listener(listener_name, listener_result) - for listener_name in listeners_triggered - ] - await asyncio.gather(*tasks) + # Execute listeners sequentially to prevent race conditions on shared state + for listener_name in listeners_triggered: + await self._execute_single_listener( + listener_name, listener_result + ) if current_trigger in router_results: # Find start methods triggered by this router result @@ -1969,11 +1969,9 @@ class Flow(Generic[T], metaclass=FlowMeta): if self.last_human_feedback is not None else listener_result ) - tasks = [ - self._execute_single_listener(name, feedback_result) - for name in listeners_for_result - ] - await asyncio.gather(*tasks) + # Execute listeners sequentially to prevent race conditions on shared state + for name in listeners_for_result: + await self._execute_single_listener(name, feedback_result) except Exception as e: # Don't log HumanFeedbackPending as an error - it's expected control flow diff --git a/lib/crewai/tests/cassettes/agents/test_multiple_agents_in_same_flow.yaml b/lib/crewai/tests/cassettes/agents/test_multiple_agents_in_same_flow.yaml index 3a9fdda96..46ba712c1 100644 --- a/lib/crewai/tests/cassettes/agents/test_multiple_agents_in_same_flow.yaml +++ b/lib/crewai/tests/cassettes/agents/test_multiple_agents_in_same_flow.yaml @@ -47,16 +47,15 @@ interactions: uri: https://api.openai.com/v1/chat/completions response: body: - string: "{\n \"id\": \"chatcmpl-Cy7awLGYnYfpKGEeRhKlU90FltH7L\",\n \"object\": - \"chat.completion\",\n \"created\": 1768444910,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + string: "{\n \"id\": \"chatcmpl-CyRKzgODZ9yn3F9OkaXsscLk2Ln3N\",\n \"object\": + \"chat.completion\",\n \"created\": 1768520801,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"I now can give a great answer \\nFinal - Answer: Hello and welcome! It's wonderful to see you here. I hope you're having - a fantastic day. If there's anything you need or if you have any questions, - feel free to ask. I'm here to help and make your experience enjoyable!\",\n - \ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\": - null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": - 127,\n \"completion_tokens\": 57,\n \"total_tokens\": 184,\n \"prompt_tokens_details\": + Answer: Hello! Welcome! I'm so glad to see you here. If you need any assistance + or have any questions, feel free to ask. Have a wonderful day!\",\n \"refusal\": + null,\n \"annotations\": []\n },\n \"logprobs\": null,\n + \ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\": + 127,\n \"completion_tokens\": 43,\n \"total_tokens\": 170,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": @@ -69,7 +68,7 @@ interactions: Content-Type: - application/json Date: - - Thu, 15 Jan 2026 02:41:51 GMT + - Thu, 15 Jan 2026 23:46:42 GMT Server: - cloudflare Set-Cookie: @@ -87,17 +86,17 @@ interactions: cf-cache-status: - DYNAMIC content-length: - - '1074' + - '990' openai-organization: - OPENAI-ORG-XXX openai-processing-ms: - - '1019' + - '880' openai-project: - OPENAI-PROJECT-XXX openai-version: - '2020-10-01' x-envoy-upstream-service-time: - - '1242' + - '1160' x-openai-proxy-wasm: - v0.1 x-ratelimit-limit-requests: @@ -165,18 +164,18 @@ interactions: uri: https://api.openai.com/v1/chat/completions response: body: - string: "{\n \"id\": \"chatcmpl-Cy7ayZre6crr19UyujJE9YbNxDndk\",\n \"object\": - \"chat.completion\",\n \"created\": 1768444912,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n + string: "{\n \"id\": \"chatcmpl-CyRL1Ua2PkK5xXPp3KeF0AnGAk3JP\",\n \"object\": + \"chat.completion\",\n \"created\": 1768520803,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n \ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\": \"assistant\",\n \"content\": \"I now can give a great answer \\nFinal - Answer: As we conclude our conversation, I just want to take a moment to express - my heartfelt gratitude for your time and engagement. It has been a pleasure - interacting with you. I wish you all the best in your future endeavors. May - your path ahead be filled with success and happiness. Farewell, and until - we meet again!\",\n \"refusal\": null,\n \"annotations\": []\n - \ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n - \ ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\": - 75,\n \"total_tokens\": 201,\n \"prompt_tokens_details\": {\n \"cached_tokens\": + Answer: As we reach the end of our conversation, I want to express my gratitude + for the time we've shared. It's been a pleasure assisting you, and I hope + you found our interaction helpful and enjoyable. Remember, whenever you need + assistance, I'm just a message away. Wishing you all the best in your future + endeavors. Goodbye and take care!\",\n \"refusal\": null,\n \"annotations\": + []\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n + \ }\n ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\": + 79,\n \"total_tokens\": 205,\n \"prompt_tokens_details\": {\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\": {\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\": 0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\": @@ -189,7 +188,7 @@ interactions: Content-Type: - application/json Date: - - Thu, 15 Jan 2026 02:41:53 GMT + - Thu, 15 Jan 2026 23:46:44 GMT Server: - cloudflare Set-Cookie: @@ -207,17 +206,17 @@ interactions: cf-cache-status: - DYNAMIC content-length: - - '1169' + - '1189' openai-organization: - OPENAI-ORG-XXX openai-processing-ms: - - '1298' + - '1363' openai-project: - OPENAI-PROJECT-XXX openai-version: - '2020-10-01' x-envoy-upstream-service-time: - - '1550' + - '1605' x-openai-proxy-wasm: - v0.1 x-ratelimit-limit-requests: