Enhance Agent and Flow Execution Logic

- Updated the Agent class to automatically detect the event loop and return a coroutine when called within a Flow, simplifying async handling for users.
- Modified Flow class to execute listeners sequentially, preventing race conditions on shared state during listener execution.
- Improved handling of coroutine results from synchronous methods, ensuring proper execution flow and state management.

These changes enhance the overall execution logic and user experience when working with agents and flows in CrewAI.
This commit is contained in:
lorenzejay
2026-01-15 15:51:39 -08:00
parent ad83e8a2bf
commit 7f7b5094cc
3 changed files with 51 additions and 47 deletions

View File

@@ -8,6 +8,7 @@ import time
from typing import (
TYPE_CHECKING,
Any,
Coroutine,
Final,
Literal,
cast,
@@ -69,6 +70,7 @@ from crewai.security.fingerprint import Fingerprint
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.utilities.agent_utils import (
get_tool_names,
is_inside_event_loop,
load_agent_from_repository,
parse_tools,
render_text_description_and_args,
@@ -1699,16 +1701,16 @@ class Agent(BaseAgent):
self,
messages: str | list[LLMMessage],
response_format: type[Any] | None = None,
) -> LiteAgentOutput:
) -> LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput]:
"""
Execute the agent with the given messages using the AgentExecutor.
This method provides standalone agent execution without requiring a Crew.
It supports tools, response formatting, and guardrails.
When called from within a sync Flow method, the Flow framework automatically
runs the method in a thread pool, so this works seamlessly. For async Flow
methods, use kickoff_async() instead.
When called from within a Flow (sync or async method), this automatically
detects the event loop and returns a coroutine that the Flow framework
awaits. Users don't need to handle async explicitly.
Args:
messages: Either a string query or a list of message dictionaries.
@@ -1718,10 +1720,15 @@ class Agent(BaseAgent):
Returns:
LiteAgentOutput: The result of the agent execution.
When inside a Flow, returns a coroutine that resolves to LiteAgentOutput.
Note:
If called from an async context (not through Flow), use kickoff_async().
For explicit async usage outside of Flow, use kickoff_async() directly.
"""
# Magic auto-async: if inside event loop (e.g., inside a Flow),
# return coroutine for Flow to await
if is_inside_event_loop():
return self.kickoff_async(messages, response_format)
executor, inputs, agent_info, parsed_tools = self._prepare_kickoff(
messages, response_format

View File

@@ -1522,11 +1522,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
# Execute listeners sequentially to prevent race conditions on shared state
for listener_name in listeners_for_result:
await self._execute_single_listener(listener_name, listener_result)
else:
await self._execute_listeners(start_method_name, result)
@@ -1595,12 +1593,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
import contextvars
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
# Auto-await coroutines from sync methods (still useful for explicit coroutine returns)
# Auto-await coroutines returned from sync methods (enables AgentExecutor pattern)
if asyncio.iscoroutine(result):
result = await result
@@ -1749,11 +1749,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
listener_result = router_result_to_feedback.get(
str(current_trigger), result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
# Execute listeners sequentially to prevent race conditions on shared state
for listener_name in listeners_triggered:
await self._execute_single_listener(
listener_name, listener_result
)
if current_trigger in router_results:
# Find start methods triggered by this router result
@@ -1969,11 +1969,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else listener_result
)
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
# Execute listeners sequentially to prevent race conditions on shared state
for name in listeners_for_result:
await self._execute_single_listener(name, feedback_result)
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow

View File

@@ -47,16 +47,15 @@ interactions:
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7awLGYnYfpKGEeRhKlU90FltH7L\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444910,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
string: "{\n \"id\": \"chatcmpl-CyRKzgODZ9yn3F9OkaXsscLk2Ln3N\",\n \"object\":
\"chat.completion\",\n \"created\": 1768520801,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: Hello and welcome! It's wonderful to see you here. I hope you're having
a fantastic day. If there's anything you need or if you have any questions,
feel free to ask. I'm here to help and make your experience enjoyable!\",\n
\ \"refusal\": null,\n \"annotations\": []\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
127,\n \"completion_tokens\": 57,\n \"total_tokens\": 184,\n \"prompt_tokens_details\":
Answer: Hello! Welcome! I'm so glad to see you here. If you need any assistance
or have any questions, feel free to ask. Have a wonderful day!\",\n \"refusal\":
null,\n \"annotations\": []\n },\n \"logprobs\": null,\n
\ \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
127,\n \"completion_tokens\": 43,\n \"total_tokens\": 170,\n \"prompt_tokens_details\":
{\n \"cached_tokens\": 0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
@@ -69,7 +68,7 @@ interactions:
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:51 GMT
- Thu, 15 Jan 2026 23:46:42 GMT
Server:
- cloudflare
Set-Cookie:
@@ -87,17 +86,17 @@ interactions:
cf-cache-status:
- DYNAMIC
content-length:
- '1074'
- '990'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1019'
- '880'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1242'
- '1160'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests:
@@ -165,18 +164,18 @@ interactions:
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-Cy7ayZre6crr19UyujJE9YbNxDndk\",\n \"object\":
\"chat.completion\",\n \"created\": 1768444912,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
string: "{\n \"id\": \"chatcmpl-CyRL1Ua2PkK5xXPp3KeF0AnGAk3JP\",\n \"object\":
\"chat.completion\",\n \"created\": 1768520803,\n \"model\": \"gpt-4o-mini-2024-07-18\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"I now can give a great answer \\nFinal
Answer: As we conclude our conversation, I just want to take a moment to express
my heartfelt gratitude for your time and engagement. It has been a pleasure
interacting with you. I wish you all the best in your future endeavors. May
your path ahead be filled with success and happiness. Farewell, and until
we meet again!\",\n \"refusal\": null,\n \"annotations\": []\n
\ },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n }\n
\ ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\":
75,\n \"total_tokens\": 201,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
Answer: As we reach the end of our conversation, I want to express my gratitude
for the time we've shared. It's been a pleasure assisting you, and I hope
you found our interaction helpful and enjoyable. Remember, whenever you need
assistance, I'm just a message away. Wishing you all the best in your future
endeavors. Goodbye and take care!\",\n \"refusal\": null,\n \"annotations\":
[]\n },\n \"logprobs\": null,\n \"finish_reason\": \"stop\"\n
\ }\n ],\n \"usage\": {\n \"prompt_tokens\": 126,\n \"completion_tokens\":
79,\n \"total_tokens\": 205,\n \"prompt_tokens_details\": {\n \"cached_tokens\":
0,\n \"audio_tokens\": 0\n },\n \"completion_tokens_details\":
{\n \"reasoning_tokens\": 0,\n \"audio_tokens\": 0,\n \"accepted_prediction_tokens\":
0,\n \"rejected_prediction_tokens\": 0\n }\n },\n \"service_tier\":
@@ -189,7 +188,7 @@ interactions:
Content-Type:
- application/json
Date:
- Thu, 15 Jan 2026 02:41:53 GMT
- Thu, 15 Jan 2026 23:46:44 GMT
Server:
- cloudflare
Set-Cookie:
@@ -207,17 +206,17 @@ interactions:
cf-cache-status:
- DYNAMIC
content-length:
- '1169'
- '1189'
openai-organization:
- OPENAI-ORG-XXX
openai-processing-ms:
- '1298'
- '1363'
openai-project:
- OPENAI-PROJECT-XXX
openai-version:
- '2020-10-01'
x-envoy-upstream-service-time:
- '1550'
- '1605'
x-openai-proxy-wasm:
- v0.1
x-ratelimit-limit-requests: