Compare commits

..

4 Commits

Author SHA1 Message Date
Devin AI
d9dad68f5c Fix type-checker: use PrinterColor type for color parameter
Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:23:42 +00:00
Devin AI
8abbae57af Fix lint: use underscore for unused condition_type variable
Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:18:06 +00:00
Devin AI
425bfeaf9f Fix lint: remove trailing whitespace from docstrings
Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:16:00 +00:00
Devin AI
b0f6c66c36 Fix nested or_() conditions triggering listeners multiple times (issue #3719)
- Add tracking of triggered OR listeners to prevent multiple executions
- OR listeners now execute only once when any condition in the OR is met
- Clear OR listener tracking when entering new cycles or restarting flows
- Add comprehensive test suite for nested OR conditions
- All existing tests pass with no regressions

Fixes #3719

Co-Authored-By: João <joao@crewai.com>
2025-10-16 22:12:30 +00:00
9 changed files with 3829 additions and 3830 deletions

View File

@@ -105,7 +105,7 @@ def start(condition: str | dict | Callable | None = None) -> Callable:
condition : Optional[Union[str, dict, Callable]], optional
Defines when the start method should execute. Can be:
- str: Name of a method that triggers this start
- dict: Result from or_() or and_(), including nested conditions
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
- Callable: A method reference that triggers this start
Default is None, meaning unconditional start.
@@ -140,18 +140,13 @@ def start(condition: str | dict | Callable | None = None) -> Callable:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif isinstance(condition, dict) and "type" in condition:
if "conditions" in condition:
func.__trigger_condition__ = condition
func.__trigger_methods__ = _extract_all_methods(condition)
func.__condition_type__ = condition["type"]
elif "methods" in condition:
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
@@ -177,7 +172,7 @@ def listen(condition: str | dict | Callable) -> Callable:
condition : Union[str, dict, Callable]
Specifies when the listener should execute. Can be:
- str: Name of a method that triggers this listener
- dict: Result from or_() or and_(), including nested conditions
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
- Callable: A method reference that triggers this listener
Returns
@@ -205,18 +200,13 @@ def listen(condition: str | dict | Callable) -> Callable:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif isinstance(condition, dict) and "type" in condition:
if "conditions" in condition:
func.__trigger_condition__ = condition
func.__trigger_methods__ = _extract_all_methods(condition)
func.__condition_type__ = condition["type"]
elif "methods" in condition:
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
@@ -243,7 +233,7 @@ def router(condition: str | dict | Callable) -> Callable:
condition : Union[str, dict, Callable]
Specifies when the router should execute. Can be:
- str: Name of a method that triggers this router
- dict: Result from or_() or and_(), including nested conditions
- dict: Contains "type" ("AND"/"OR") and "methods" (list of triggers)
- Callable: A method reference that triggers this router
Returns
@@ -276,18 +266,13 @@ def router(condition: str | dict | Callable) -> Callable:
if isinstance(condition, str):
func.__trigger_methods__ = [condition]
func.__condition_type__ = "OR"
elif isinstance(condition, dict) and "type" in condition:
if "conditions" in condition:
func.__trigger_condition__ = condition
func.__trigger_methods__ = _extract_all_methods(condition)
func.__condition_type__ = condition["type"]
elif "methods" in condition:
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
else:
raise ValueError(
"Condition dict must contain 'conditions' or 'methods'"
)
elif (
isinstance(condition, dict)
and "type" in condition
and "methods" in condition
):
func.__trigger_methods__ = condition["methods"]
func.__condition_type__ = condition["type"]
elif callable(condition) and hasattr(condition, "__name__"):
func.__trigger_methods__ = [condition.__name__]
func.__condition_type__ = "OR"
@@ -313,15 +298,14 @@ def or_(*conditions: str | dict | Callable) -> dict:
*conditions : Union[str, dict, Callable]
Variable number of conditions that can be:
- str: Method names
- dict: Existing condition dictionaries (nested conditions)
- dict: Existing condition dictionaries
- Callable: Method references
Returns
-------
dict
A condition dictionary with format:
{"type": "OR", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
{"type": "OR", "methods": list_of_method_names}
Raises
------
@@ -333,22 +317,18 @@ def or_(*conditions: str | dict | Callable) -> dict:
>>> @listen(or_("success", "timeout"))
>>> def handle_completion(self):
... pass
>>> @listen(or_(and_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_conditions: list[str | dict[str, Any]] = []
methods = []
for condition in conditions:
if isinstance(condition, dict):
processed_conditions.append(condition)
if isinstance(condition, dict) and "methods" in condition:
methods.extend(condition["methods"])
elif isinstance(condition, str):
processed_conditions.append(condition)
methods.append(condition)
elif callable(condition):
processed_conditions.append(getattr(condition, "__name__", repr(condition)))
methods.append(getattr(condition, "__name__", repr(condition)))
else:
raise ValueError("Invalid condition in or_()")
return {"type": "OR", "conditions": processed_conditions}
return {"type": "OR", "methods": methods}
def and_(*conditions: str | dict | Callable) -> dict:
@@ -364,15 +344,14 @@ def and_(*conditions: str | dict | Callable) -> dict:
*conditions : Union[str, dict, Callable]
Variable number of conditions that can be:
- str: Method names
- dict: Existing condition dictionaries (nested conditions)
- dict: Existing condition dictionaries
- Callable: Method references
Returns
-------
dict
A condition dictionary with format:
{"type": "AND", "conditions": list_of_conditions}
where each condition can be a string (method name) or a nested dict
{"type": "AND", "methods": list_of_method_names}
Raises
------
@@ -384,69 +363,18 @@ def and_(*conditions: str | dict | Callable) -> dict:
>>> @listen(and_("validated", "processed"))
>>> def handle_complete_data(self):
... pass
>>> @listen(and_(or_("step1", "step2"), "step3"))
>>> def handle_nested(self):
... pass
"""
processed_conditions: list[str | dict[str, Any]] = []
methods = []
for condition in conditions:
if isinstance(condition, dict):
processed_conditions.append(condition)
if isinstance(condition, dict) and "methods" in condition:
methods.extend(condition["methods"])
elif isinstance(condition, str):
processed_conditions.append(condition)
methods.append(condition)
elif callable(condition):
processed_conditions.append(getattr(condition, "__name__", repr(condition)))
methods.append(getattr(condition, "__name__", repr(condition)))
else:
raise ValueError("Invalid condition in and_()")
return {"type": "AND", "conditions": processed_conditions}
def _normalize_condition(condition: str | dict | list) -> dict:
"""Normalize a condition to standard format with 'conditions' key.
Args:
condition: Can be a string (method name), dict (condition), or list
Returns:
Normalized dict with 'type' and 'conditions' keys
"""
if isinstance(condition, str):
return {"type": "OR", "conditions": [condition]}
if isinstance(condition, dict):
if "conditions" in condition:
return condition
if "methods" in condition:
return {"type": condition["type"], "conditions": condition["methods"]}
return condition
if isinstance(condition, list):
return {"type": "OR", "conditions": condition}
return {"type": "OR", "conditions": [condition]}
def _extract_all_methods(condition: str | dict | list) -> list[str]:
"""Extract all method names from a condition (including nested).
Args:
condition: Can be a string, dict, or list
Returns:
List of all method names in the condition tree
"""
if isinstance(condition, str):
return [condition]
if isinstance(condition, dict):
normalized = _normalize_condition(condition)
methods = []
for sub_cond in normalized.get("conditions", []):
methods.extend(_extract_all_methods(sub_cond))
return methods
if isinstance(condition, list):
methods = []
for item in condition:
methods.extend(_extract_all_methods(item))
return methods
return []
return {"type": "AND", "methods": methods}
class FlowMeta(type):
@@ -474,10 +402,7 @@ class FlowMeta(type):
if hasattr(attr_value, "__trigger_methods__"):
methods = attr_value.__trigger_methods__
condition_type = getattr(attr_value, "__condition_type__", "OR")
if hasattr(attr_value, "__trigger_condition__"):
listeners[attr_name] = attr_value.__trigger_condition__
else:
listeners[attr_name] = (condition_type, methods)
listeners[attr_name] = (condition_type, methods)
if (
hasattr(attr_value, "__is_router__")
@@ -534,6 +459,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods: dict[str, Callable] = {}
self._method_execution_counts: dict[str, int] = {}
self._pending_and_listeners: dict[str, set[str]] = {}
self._triggered_or_listeners: set[str] = set() # Track OR listeners that have already triggered
self._method_outputs: list[Any] = [] # list to store all method outputs
self._completed_methods: set[str] = set() # Track completed methods for reload
self._persistence: FlowPersistence | None = persistence
@@ -897,7 +823,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Clear completed methods and outputs for a fresh start
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._triggered_or_listeners.clear()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
@@ -998,6 +924,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(start_method_name)
self._triggered_or_listeners.clear()
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
@@ -1162,16 +1089,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
for method_name in self._start_methods:
# Check if this start method is triggered by the current trigger
if method_name in self._listeners:
condition_data = self._listeners[method_name]
should_trigger = False
if isinstance(condition_data, tuple):
_, trigger_methods = condition_data
should_trigger = current_trigger in trigger_methods
elif isinstance(condition_data, dict):
all_methods = _extract_all_methods(condition_data)
should_trigger = current_trigger in all_methods
if should_trigger:
_, trigger_methods = self._listeners[
method_name
]
if current_trigger in trigger_methods:
# Only execute if this is a cycle (method was already completed)
if method_name in self._completed_methods:
# For router-triggered start methods in cycles, temporarily clear resumption flag
@@ -1181,51 +1102,6 @@ class Flow(Generic[T], metaclass=FlowMeta):
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
def _evaluate_condition(
self, condition: str | dict, trigger_method: str, listener_name: str
) -> bool:
"""Recursively evaluate a condition (simple or nested).
Args:
condition: Can be a string (method name) or dict (nested condition)
trigger_method: The method that just completed
listener_name: Name of the listener being evaluated
Returns:
True if the condition is satisfied, False otherwise
"""
if isinstance(condition, str):
return condition == trigger_method
if isinstance(condition, dict):
normalized = _normalize_condition(condition)
cond_type = normalized.get("type", "OR")
sub_conditions = normalized.get("conditions", [])
if cond_type == "OR":
return any(
self._evaluate_condition(sub_cond, trigger_method, listener_name)
for sub_cond in sub_conditions
)
if cond_type == "AND":
pending_key = f"{listener_name}:{id(condition)}"
if pending_key not in self._pending_and_listeners:
all_methods = set(_extract_all_methods(condition))
self._pending_and_listeners[pending_key] = all_methods
if trigger_method in self._pending_and_listeners[pending_key]:
self._pending_and_listeners[pending_key].discard(trigger_method)
if not self._pending_and_listeners[pending_key]:
self._pending_and_listeners.pop(pending_key, None)
return True
return False
return False
def _find_triggered_methods(
self, trigger_method: str, router_only: bool
) -> list[str]:
@@ -1233,7 +1109,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
Finds all methods that should be triggered based on conditions.
This internal method evaluates both OR and AND conditions to determine
which methods should be executed next in the flow. Supports nested conditions.
which methods should be executed next in the flow.
Parameters
----------
@@ -1250,13 +1126,15 @@ class Flow(Generic[T], metaclass=FlowMeta):
Notes
-----
- Handles both OR and AND conditions, including nested combinations
- Handles both OR and AND conditions:
* OR: Triggers if any condition is met (but only once per listener)
* AND: Triggers only when all conditions are met
- Maintains state for AND conditions using _pending_and_listeners
- Tracks OR listeners to prevent multiple triggers from the same condition
- Separates router and normal listener evaluation
"""
triggered = []
for listener_name, condition_data in self._listeners.items():
for listener_name, (condition_type, methods) in self._listeners.items():
is_router = listener_name in self._routers
if router_only != is_router:
@@ -1265,29 +1143,29 @@ class Flow(Generic[T], metaclass=FlowMeta):
if not router_only and listener_name in self._start_methods:
continue
if isinstance(condition_data, tuple):
condition_type, methods = condition_data
if condition_type == "OR":
# Check if this OR listener has already been triggered
if listener_name in self._triggered_or_listeners:
# Skip this listener as it has already been triggered by another method in the OR condition
continue
if condition_type == "OR":
if trigger_method in methods:
triggered.append(listener_name)
elif condition_type == "AND":
if listener_name not in self._pending_and_listeners:
self._pending_and_listeners[listener_name] = set(methods)
if trigger_method in self._pending_and_listeners[listener_name]:
self._pending_and_listeners[listener_name].discard(
trigger_method
)
if not self._pending_and_listeners[listener_name]:
triggered.append(listener_name)
self._pending_and_listeners.pop(listener_name, None)
elif isinstance(condition_data, dict):
if self._evaluate_condition(
condition_data, trigger_method, listener_name
):
# If the trigger_method matches any in methods, run this
if trigger_method in methods:
triggered.append(listener_name)
self._triggered_or_listeners.add(listener_name)
elif condition_type == "AND":
# Initialize pending methods for this listener if not already done
if listener_name not in self._pending_and_listeners:
self._pending_and_listeners[listener_name] = set(methods)
# Remove the trigger method from pending methods
if trigger_method in self._pending_and_listeners[listener_name]:
self._pending_and_listeners[listener_name].discard(trigger_method)
if not self._pending_and_listeners[listener_name]:
# All required methods have been executed
triggered.append(listener_name)
# Reset pending methods for this listener
self._pending_and_listeners.pop(listener_name, None)
return triggered
@@ -1327,6 +1205,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
self._triggered_or_listeners.discard(listener_name)
try:
method = self._methods[listener_name]
@@ -1350,7 +1229,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
raise
def _log_flow_event(
self, message: str, color: PrinterColor | None = "yellow", level: str = "info"
self, message: str, color: PrinterColor = "yellow", level: str = "info"
) -> None:
"""Centralized logging method for flow events.

View File

@@ -14,7 +14,6 @@ from pydantic import (
from pydantic import BaseModel as PydanticBaseModel
from crewai.tools.structured_tool import CrewStructuredTool
from crewai.utilities.asyncio_utils import run_coroutine_sync
class EnvVar(BaseModel):
@@ -91,7 +90,7 @@ class BaseTool(BaseModel, ABC):
# If _run is async, we safely run it
if asyncio.iscoroutine(result):
result = run_coroutine_sync(result)
result = asyncio.run(result)
self.current_usage_count += 1

View File

@@ -8,7 +8,6 @@ from typing import TYPE_CHECKING, Any, get_type_hints
from pydantic import BaseModel, Field, create_model
from crewai.utilities.asyncio_utils import run_coroutine_sync
from crewai.utilities.logger import Logger
if TYPE_CHECKING:
@@ -270,12 +269,12 @@ class CrewStructuredTool:
self._increment_usage_count()
if inspect.iscoroutinefunction(self.func):
return run_coroutine_sync(self.func(**parsed_args, **kwargs))
return asyncio.run(self.func(**parsed_args, **kwargs))
result = self.func(**parsed_args, **kwargs)
if asyncio.iscoroutine(result):
return run_coroutine_sync(result)
return asyncio.run(result)
return result

View File

@@ -1,53 +0,0 @@
"""Utilities for handling asyncio operations safely across different contexts."""
import asyncio
from collections.abc import Coroutine
from typing import Any
def run_coroutine_sync(coro: Coroutine) -> Any:
"""
Run a coroutine synchronously, handling both cases where an event loop
is already running and where it's not.
This is useful when you need to run async code from sync code, but you're
not sure if you're already in an async context (e.g., when using asyncio.to_thread).
Args:
coro: The coroutine to run
Returns:
The result of the coroutine
Raises:
Any exception raised by the coroutine
"""
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(coro)
else:
import threading
result = None
exception = None
def run_in_new_loop():
nonlocal result, exception
try:
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
result = new_loop.run_until_complete(coro)
finally:
new_loop.close()
except Exception as e:
exception = e
thread = threading.Thread(target=run_in_new_loop)
thread.start()
thread.join()
if exception:
raise exception
return result

View File

@@ -1,11 +1,6 @@
"""Utility for colored console output."""
from __future__ import annotations
from typing import TYPE_CHECKING, Final, Literal, NamedTuple
if TYPE_CHECKING:
from _typeshed import SupportsWrite
from typing import Final, Literal, NamedTuple
PrinterColor = Literal[
"purple",
@@ -59,22 +54,13 @@ class Printer:
@staticmethod
def print(
content: str | list[ColoredText],
color: PrinterColor | None = None,
sep: str | None = " ",
end: str | None = "\n",
file: SupportsWrite[str] | None = None,
flush: Literal[False] = False,
content: str | list[ColoredText], color: PrinterColor | None = None
) -> None:
"""Prints content to the console with optional color formatting.
Args:
content: Either a string or a list of ColoredText objects for multicolor output.
color: Optional color for the text when content is a string. Ignored when content is a list.
sep: Separator to use between the text and color.
end: String appended after the last value.
file: A file-like object (stream); defaults to the current sys.stdout.
flush: Whether to forcibly flush the stream.
"""
if isinstance(content, str):
content = [ColoredText(content, color)]
@@ -82,9 +68,5 @@ class Printer:
"".join(
f"{_COLOR_CODES[c.color] if c.color else ''}{c.text}{RESET}"
for c in content
),
sep=sep,
end=end,
file=file,
flush=flush,
)
)

View File

@@ -1,142 +0,0 @@
"""Tests for asyncio tool execution in different contexts."""
import asyncio
from unittest.mock import patch
import pytest
from crewai import Agent, Crew, Task
from crewai.tools import tool
@tool
async def async_test_tool(message: str) -> str:
"""An async tool that returns a message."""
await asyncio.sleep(0.01)
return f"Processed: {message}"
@tool
def sync_test_tool(message: str) -> str:
"""A sync tool that returns a message."""
return f"Sync: {message}"
class TestAsyncioToolExecution:
"""Test that tools work correctly in different asyncio contexts."""
@patch("crewai.Agent.execute_task")
def test_async_tool_with_asyncio_to_thread(self, mock_execute_task):
"""Test that async tools work when crew is run with asyncio.to_thread."""
mock_execute_task.return_value = "Task completed"
agent = Agent(
role="Test Agent",
goal="Test async tool execution",
backstory="A test agent",
)
task = Task(
description="Test task",
expected_output="A result",
agent=agent,
tools=[async_test_tool],
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
async def run_with_to_thread():
"""Run crew with asyncio.to_thread - this should not hang."""
result = await asyncio.to_thread(crew.kickoff)
return result
result = asyncio.run(run_with_to_thread())
assert result is not None
@patch("crewai.Agent.execute_task")
def test_sync_tool_with_asyncio_to_thread(self, mock_execute_task):
"""Test that sync tools work when crew is run with asyncio.to_thread."""
mock_execute_task.return_value = "Task completed"
agent = Agent(
role="Test Agent",
goal="Test sync tool execution",
backstory="A test agent",
)
task = Task(
description="Test task",
expected_output="A result",
agent=agent,
tools=[sync_test_tool],
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
async def run_with_to_thread():
"""Run crew with asyncio.to_thread."""
result = await asyncio.to_thread(crew.kickoff)
return result
result = asyncio.run(run_with_to_thread())
assert result is not None
@pytest.mark.asyncio
@patch("crewai.Agent.execute_task")
async def test_async_tool_with_kickoff_async(self, mock_execute_task):
"""Test that async tools work with kickoff_async."""
mock_execute_task.return_value = "Task completed"
agent = Agent(
role="Test Agent",
goal="Test async tool execution",
backstory="A test agent",
)
task = Task(
description="Test task",
expected_output="A result",
agent=agent,
tools=[async_test_tool],
)
crew = Crew(agents=[agent], tasks=[task], verbose=False)
result = await crew.kickoff_async()
assert result is not None
def test_async_tool_direct_invocation(self):
"""Test that async tools can be invoked directly from sync context."""
structured_tool = async_test_tool.to_structured_tool()
result = structured_tool.invoke({"message": "test"})
assert result == "Processed: test"
def test_async_tool_invocation_from_thread(self):
"""Test that async tools work when invoked from a thread pool."""
structured_tool = async_test_tool.to_structured_tool()
def invoke_tool():
return structured_tool.invoke({"message": "test"})
async def run_in_thread():
result = await asyncio.to_thread(invoke_tool)
return result
result = asyncio.run(run_in_thread())
assert result == "Processed: test"
@pytest.mark.asyncio
async def test_multiple_async_tools_concurrent(self):
"""Test multiple async tool invocations concurrently."""
structured_tool = async_test_tool.to_structured_tool()
async def invoke_async():
return await structured_tool.ainvoke({"message": "test"})
results = await asyncio.gather(
invoke_async(), invoke_async(), invoke_async()
)
assert len(results) == 3
for r in results:
assert "test" in str(r)

View File

@@ -6,15 +6,15 @@ from datetime import datetime
import pytest
from pydantic import BaseModel
from crewai.flow.flow import Flow, and_, listen, or_, router, start
from crewai.events.event_bus import crewai_event_bus
from crewai.events.types.flow_events import (
FlowFinishedEvent,
FlowPlotEvent,
FlowStartedEvent,
FlowPlotEvent,
MethodExecutionFinishedEvent,
MethodExecutionStartedEvent,
)
from crewai.flow.flow import Flow, and_, listen, or_, router, start
def test_simple_sequential_flow():
@@ -679,11 +679,11 @@ def test_structured_flow_event_emission():
assert isinstance(received_events[3], MethodExecutionStartedEvent)
assert received_events[3].method_name == "send_welcome_message"
assert received_events[3].params == {}
assert received_events[3].state.sent is False
assert getattr(received_events[3].state, "sent") is False
assert isinstance(received_events[4], MethodExecutionFinishedEvent)
assert received_events[4].method_name == "send_welcome_message"
assert received_events[4].state.sent is True
assert getattr(received_events[4].state, "sent") is True
assert received_events[4].result == "Welcome, Anakin!"
assert isinstance(received_events[5], FlowFinishedEvent)
@@ -894,75 +894,3 @@ def test_flow_name():
flow = MyFlow()
assert flow.name == "MyFlow"
def test_nested_and_or_conditions():
"""Test nested conditions like or_(and_(A, B), and_(C, D)).
Reproduces bug from issue #3719 where nested conditions are flattened,
causing premature execution.
"""
execution_order = []
class NestedConditionFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen(method_1)
def method_2(self):
execution_order.append("method_2")
@router(method_2)
def method_3(self):
execution_order.append("method_3")
# Choose b_condition path
return "b_condition"
@listen("b_condition")
def method_5(self):
execution_order.append("method_5")
@listen(method_5)
async def method_4(self):
execution_order.append("method_4")
@listen(or_("a_condition", "b_condition"))
async def method_6(self):
execution_order.append("method_6")
@listen(
or_(
and_("a_condition", method_6),
and_(method_6, method_4),
)
)
def method_7(self):
execution_order.append("method_7")
@listen(method_7)
async def method_8(self):
execution_order.append("method_8")
flow = NestedConditionFlow()
flow.kickoff()
# Verify execution happened
assert "method_1" in execution_order
assert "method_2" in execution_order
assert "method_3" in execution_order
assert "method_5" in execution_order
assert "method_4" in execution_order
assert "method_6" in execution_order
assert "method_7" in execution_order
assert "method_8" in execution_order
# Critical assertion: method_7 should only execute AFTER both method_6 AND method_4
# Since b_condition was returned, method_6 triggers on b_condition
# method_7 requires: (a_condition AND method_6) OR (method_6 AND method_4)
# The second condition (method_6 AND method_4) should be the one that triggers
assert execution_order.index("method_7") > execution_order.index("method_6")
assert execution_order.index("method_7") > execution_order.index("method_4")
# method_8 should execute after method_7
assert execution_order.index("method_8") > execution_order.index("method_7")

View File

@@ -0,0 +1,296 @@
"""Test nested or_() conditions in Flow execution (Issue #3719)."""
from crewai.flow.flow import Flow, listen, or_, start
def test_nested_or_condition_triggers_once():
"""Test that nested or_() conditions only trigger listeners once.
This test addresses issue #3719 where nested or_() conditions would
cause listeners to execute multiple times instead of once.
Setup:
method_5 listens to or_(method_1, or_(method_2, method_3))
method_7 listens to or_(method_5, method_6)
Expected behavior:
- method_5 should execute exactly once (triggered by first matching condition)
- method_7 should execute exactly once (triggered by first matching condition)
Bug behavior (before fix):
- method_5 executed 3 times (once for method_1, method_2, and method_3)
- method_7 executed 4 times (once for each method_5 execution + method_6)
"""
execution_order = []
class NestedOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
return "method_1_done"
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
return "method_2_done"
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
return "method_3_done"
@listen(or_("method_1", or_("method_2", "method_3")))
def method_5(self):
execution_order.append("method_5")
return "method_5_done"
@listen("method_1")
def method_6(self):
execution_order.append("method_6")
return "method_6_done"
@listen(or_("method_5", "method_6"))
def method_7(self):
execution_order.append("method_7")
return "method_7_done"
flow = NestedOrFlow()
flow.kickoff()
assert execution_order.count("method_5") == 1, (
f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times"
)
assert execution_order.count("method_7") == 1, (
f"method_7 should execute exactly once, but executed {execution_order.count('method_7')} times"
)
assert "method_1" in execution_order
assert "method_2" in execution_order
assert "method_3" in execution_order
assert "method_5" in execution_order
assert "method_6" in execution_order
assert "method_7" in execution_order
def test_simple_or_condition_triggers_once():
"""Test that simple or_() conditions only trigger once.
Even without nesting, an OR condition should only trigger a listener once,
not multiple times for each method in the OR list.
"""
execution_order = []
class SimpleOrFlow(Flow):
@start()
def method_a(self):
execution_order.append("method_a")
@listen("method_a")
def method_b(self):
execution_order.append("method_b")
@listen("method_a")
def method_c(self):
execution_order.append("method_c")
@listen(or_("method_b", "method_c"))
def method_d(self):
execution_order.append("method_d")
flow = SimpleOrFlow()
flow.kickoff()
assert execution_order.count("method_d") == 1, (
f"method_d should execute exactly once, but executed {execution_order.count('method_d')} times"
)
def test_or_condition_with_three_methods():
"""Test OR condition with three methods triggers only once."""
execution_order = []
class ThreeMethodOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
@listen("method_1")
def method_4(self):
execution_order.append("method_4")
@listen(or_("method_2", "method_3", "method_4"))
def method_5(self):
execution_order.append("method_5")
flow = ThreeMethodOrFlow()
flow.kickoff()
assert execution_order.count("method_5") == 1, (
f"method_5 should execute exactly once, but executed {execution_order.count('method_5')} times"
)
def test_multiple_or_listeners_independent():
"""Test that multiple OR listeners are independent of each other."""
execution_order = []
class MultipleOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
@listen(or_("method_2", "method_3"))
def method_a(self):
execution_order.append("method_a")
@listen(or_("method_2", "method_3"))
def method_b(self):
execution_order.append("method_b")
flow = MultipleOrFlow()
flow.kickoff()
assert execution_order.count("method_a") == 1
assert execution_order.count("method_b") == 1
def test_deeply_nested_or_conditions():
"""Test deeply nested or_() conditions."""
execution_order = []
class DeeplyNestedOrFlow(Flow):
@start()
def start_method(self):
execution_order.append("start_method")
@listen("start_method")
def method_a(self):
execution_order.append("method_a")
@listen("start_method")
def method_b(self):
execution_order.append("method_b")
@listen("start_method")
def method_c(self):
execution_order.append("method_c")
@listen("start_method")
def method_d(self):
execution_order.append("method_d")
@listen(or_(or_("method_a", "method_b"), or_("method_c", "method_d")))
def final_method(self):
execution_order.append("final_method")
flow = DeeplyNestedOrFlow()
flow.kickoff()
assert execution_order.count("final_method") == 1, (
f"final_method should execute exactly once, but executed {execution_order.count('final_method')} times"
)
def test_or_condition_execution_order():
"""Test that OR listener executes after first matching condition.
The listener should trigger as soon as any one of the OR conditions is met,
not wait for all of them.
"""
execution_order = []
class ExecutionOrderFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen("method_1")
def method_2(self):
execution_order.append("method_2")
@listen("method_1")
def method_3(self):
execution_order.append("method_3")
@listen(or_("method_2", "method_3"))
def method_4(self):
execution_order.append("method_4")
flow = ExecutionOrderFlow()
flow.kickoff()
method_4_index = execution_order.index("method_4")
assert "method_2" in execution_order[:method_4_index] or "method_3" in execution_order[:method_4_index], (
"method_4 should execute after at least one of method_2 or method_3"
)
def test_or_condition_with_single_method():
"""Test OR condition with a single method (edge case)."""
execution_order = []
class SingleMethodOrFlow(Flow):
@start()
def method_1(self):
execution_order.append("method_1")
@listen(or_("method_1"))
def method_2(self):
execution_order.append("method_2")
flow = SingleMethodOrFlow()
flow.kickoff()
assert execution_order == ["method_1", "method_2"]
assert execution_order.count("method_2") == 1
def test_cyclic_flow_with_or_condition():
"""Test that OR conditions work correctly in cyclic flows.
Within a single flow execution, an OR listener should only trigger once
even if multiple methods in the OR condition complete.
"""
execution_order = []
class CyclicOrFlow(Flow):
@start()
def step_1(self):
execution_order.append("step_1")
@listen("step_1")
def step_2(self):
execution_order.append("step_2")
@listen("step_1")
def step_3(self):
execution_order.append("step_3")
@listen(or_("step_2", "step_3"))
def step_4(self):
execution_order.append("step_4")
flow = CyclicOrFlow()
flow.kickoff()
assert execution_order.count("step_4") == 1, (
f"step_4 should execute once (not once for each OR condition), but executed {execution_order.count('step_4')} times"
)

6781
uv.lock generated

File diff suppressed because it is too large Load Diff