Lorenze/enh decouple executor from crew (#4209)
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

* wip restrcuturing agent executor and liteagent

* fix: handle None task in AgentExecutor to prevent errors

Added a check to ensure that if the task is None, the method returns early without attempting to access task properties. This change improves the robustness of the AgentExecutor by preventing potential errors when the task is not set.

* refactor: streamline AgentExecutor initialization by removing redundant parameters

Updated the Agent class to simplify the initialization of the AgentExecutor by removing unnecessary task and crew parameters in standalone mode. This change enhances code clarity and maintains backward compatibility by ensuring that the executor is correctly configured without redundant assignments.

* ensure executors work inside a flow due to flow in flow async structure

* refactor: enhance agent kickoff preparation by separating common logic

Updated the Agent class to introduce a new private method  that consolidates the common setup logic for both synchronous and asynchronous kickoff executions. This change improves code clarity and maintainability by reducing redundancy in the kickoff process, while ensuring that the agent can still execute effectively within both standalone and flow contexts.

* linting and tests

* fix test

* refactor: improve test for Agent kickoff parameters

Updated the test for the Agent class to ensure that the kickoff method correctly preserves parameters. The test now verifies the configuration of the agent after kickoff, enhancing clarity and maintainability. Additionally, the test for asynchronous kickoff within a flow context has been updated to reflect the Agent class instead of LiteAgent.

* refactor: update test task guardrail process output for improved validation

Refactored the test for task guardrail process output to enhance the validation of the output against the OpenAPI schema. The changes include a more structured request body and updated response handling to ensure compliance with the guardrail requirements. This update aims to improve the clarity and reliability of the test cases, ensuring that task outputs are correctly validated and feedback is appropriately provided.

* test fix cassette

* test fix cassette

* working

* working cassette

* refactor: streamline agent execution and enhance flow compatibility

Refactored the Agent class to simplify the execution method by removing the event loop check and clarifying the behavior when called from synchronous and asynchronous contexts. The changes ensure that the method operates seamlessly within flow methods, improving clarity in the documentation. Additionally, updated the AgentExecutor to set the response model to None, enhancing flexibility. New test cassettes were added to validate the functionality of agents within flow contexts, ensuring robust testing for both synchronous and asynchronous operations.

* fixed cassette

* Enhance Flow Execution Logic

- Introduced conditional execution for start methods in the Flow class.
- Unconditional start methods are prioritized during kickoff, while conditional starts are executed only if no unconditional starts are present.
- Improved handling of cyclic flows by allowing re-execution of conditional start methods triggered by routers.
- Added checks to continue execution chains for completed conditional starts.

These changes improve the flexibility and control of flow execution, ensuring that the correct methods are triggered based on the defined conditions.

* Enhance Agent and Flow Execution Logic

- Updated the Agent class to automatically detect the event loop and return a coroutine when called within a Flow, simplifying async handling for users.
- Modified Flow class to execute listeners sequentially, preventing race conditions on shared state during listener execution.
- Improved handling of coroutine results from synchronous methods, ensuring proper execution flow and state management.

These changes enhance the overall execution logic and user experience when working with agents and flows in CrewAI.

* Enhance Flow Listener Logic and Agent Imports

- Updated the Flow class to track fired OR listeners, ensuring that multi-source OR listeners only trigger once during execution. This prevents redundant executions and improves flow efficiency.
- Cleared fired OR listeners during cyclic flow resets to allow re-execution in new cycles.
- Modified the Agent class imports to include Coroutine from collections.abc, enhancing type handling for asynchronous operations.

These changes improve the control and performance of flow execution in CrewAI, ensuring more predictable behavior in complex scenarios.

* adjusted test due to new cassette

* ensure we dont finalize batch on just a liteagent finishing

* feat: cancellable parallelized flow methods

* feat: allow methods to be cancelled & run parallelized

* feat: ensure state is thread safe through proxy

* fix: check for proxy state

* fix: mimic BaseModel method

* chore: update final attr checks; test

* better description

* fix test

* chore: update test assumptions

* extra

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
This commit is contained in:
Lorenze Jay
2026-01-20 21:44:45 -08:00
committed by GitHub
parent b267bb4054
commit 741bf12bf4
21 changed files with 3145 additions and 1376 deletions

View File

@@ -12,6 +12,7 @@ from concurrent.futures import Future
import copy
import inspect
import logging
import threading
from typing import (
TYPE_CHECKING,
Any,
@@ -64,6 +65,7 @@ from crewai.flow.persistence.base import FlowPersistence
from crewai.flow.types import FlowExecutionData, FlowMethodName, PendingListenerKey
from crewai.flow.utils import (
_extract_all_methods,
_extract_all_methods_recursive,
_normalize_condition,
get_possible_return_constants,
is_flow_condition_dict,
@@ -73,6 +75,7 @@ from crewai.flow.utils import (
is_simple_flow_condition,
)
if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult
@@ -396,6 +399,62 @@ def and_(*conditions: str | FlowCondition | Callable[..., Any]) -> FlowCondition
return {"type": AND_CONDITION, "conditions": processed_conditions}
class StateProxy(Generic[T]):
"""Proxy that provides thread-safe access to flow state.
Wraps state objects (dict or BaseModel) and uses a lock for all write
operations to prevent race conditions when parallel listeners modify state.
"""
__slots__ = ("_proxy_lock", "_proxy_state")
def __init__(self, state: T, lock: threading.Lock) -> None:
object.__setattr__(self, "_proxy_state", state)
object.__setattr__(self, "_proxy_lock", lock)
def __getattr__(self, name: str) -> Any:
return getattr(object.__getattribute__(self, "_proxy_state"), name)
def __setattr__(self, name: str, value: Any) -> None:
if name in ("_proxy_state", "_proxy_lock"):
object.__setattr__(self, name, value)
else:
with object.__getattribute__(self, "_proxy_lock"):
setattr(object.__getattribute__(self, "_proxy_state"), name, value)
def __getitem__(self, key: str) -> Any:
return object.__getattribute__(self, "_proxy_state")[key]
def __setitem__(self, key: str, value: Any) -> None:
with object.__getattribute__(self, "_proxy_lock"):
object.__getattribute__(self, "_proxy_state")[key] = value
def __delitem__(self, key: str) -> None:
with object.__getattribute__(self, "_proxy_lock"):
del object.__getattribute__(self, "_proxy_state")[key]
def __contains__(self, key: str) -> bool:
return key in object.__getattribute__(self, "_proxy_state")
def __repr__(self) -> str:
return repr(object.__getattribute__(self, "_proxy_state"))
def _unwrap(self) -> T:
"""Return the underlying state object."""
return cast(T, object.__getattribute__(self, "_proxy_state"))
def model_dump(self) -> dict[str, Any]:
"""Return state as a dictionary.
Works for both dict and BaseModel underlying states.
"""
state = object.__getattribute__(self, "_proxy_state")
if isinstance(state, dict):
return state
result: dict[str, Any] = state.model_dump()
return result
class FlowMeta(type):
def __new__(
mcs,
@@ -519,7 +578,12 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._methods: dict[FlowMethodName, FlowMethod[Any, Any]] = {}
self._method_execution_counts: dict[FlowMethodName, int] = {}
self._pending_and_listeners: dict[PendingListenerKey, set[FlowMethodName]] = {}
self._fired_or_listeners: set[FlowMethodName] = (
set()
) # Track OR listeners that already fired
self._method_outputs: list[Any] = [] # list to store all method outputs
self._state_lock = threading.Lock()
self._or_listeners_lock = threading.Lock()
self._completed_methods: set[FlowMethodName] = (
set()
) # Track completed methods for reload
@@ -564,13 +628,182 @@ class Flow(Generic[T], metaclass=FlowMeta):
method = method.__get__(self, self.__class__)
self._methods[method.__name__] = method
def _mark_or_listener_fired(self, listener_name: FlowMethodName) -> bool:
"""Mark an OR listener as fired atomically.
Args:
listener_name: The name of the OR listener to mark.
Returns:
True if this call was the first to fire the listener.
False if the listener was already fired.
"""
with self._or_listeners_lock:
if listener_name in self._fired_or_listeners:
return False
self._fired_or_listeners.add(listener_name)
return True
def _clear_or_listeners(self) -> None:
"""Clear fired OR listeners for cyclic flows."""
with self._or_listeners_lock:
self._fired_or_listeners.clear()
def _discard_or_listener(self, listener_name: FlowMethodName) -> None:
"""Discard a single OR listener from the fired set."""
with self._or_listeners_lock:
self._fired_or_listeners.discard(listener_name)
def _build_racing_groups(self) -> dict[frozenset[FlowMethodName], FlowMethodName]:
"""Identify groups of methods that race for the same OR listener.
Analyzes the flow graph to find listeners with OR conditions that have
multiple trigger methods. These trigger methods form a "racing group"
where only the first to complete should trigger the OR listener.
Only methods that are EXCLUSIVELY sources for the OR listener are included
in the racing group. Methods that are also triggers for other listeners
(e.g., AND conditions) are not cancelled when another racing source wins.
Returns:
Dictionary mapping frozensets of racing method names to their
shared OR listener name.
Example:
If we have `@listen(or_(method_a, method_b))` on `handler`,
and method_a/method_b aren't used elsewhere,
this returns: {frozenset({'method_a', 'method_b'}): 'handler'}
"""
racing_groups: dict[frozenset[FlowMethodName], FlowMethodName] = {}
method_to_listeners: dict[FlowMethodName, set[FlowMethodName]] = {}
for listener_name, condition_data in self._listeners.items():
if is_simple_flow_condition(condition_data):
_, methods = condition_data
for m in methods:
method_to_listeners.setdefault(m, set()).add(listener_name)
elif is_flow_condition_dict(condition_data):
all_methods = _extract_all_methods_recursive(condition_data)
for m in all_methods:
method_name = FlowMethodName(m) if isinstance(m, str) else m
method_to_listeners.setdefault(method_name, set()).add(
listener_name
)
for listener_name, condition_data in self._listeners.items():
if listener_name in self._routers:
continue
trigger_methods: set[FlowMethodName] = set()
if is_simple_flow_condition(condition_data):
condition_type, methods = condition_data
if condition_type == OR_CONDITION and len(methods) > 1:
trigger_methods = set(methods)
elif is_flow_condition_dict(condition_data):
top_level_type = condition_data.get("type", OR_CONDITION)
if top_level_type == OR_CONDITION:
all_methods = _extract_all_methods_recursive(condition_data)
if len(all_methods) > 1:
trigger_methods = set(
FlowMethodName(m) if isinstance(m, str) else m
for m in all_methods
)
if trigger_methods:
exclusive_methods = {
m
for m in trigger_methods
if method_to_listeners.get(m, set()) == {listener_name}
}
if len(exclusive_methods) > 1:
racing_groups[frozenset(exclusive_methods)] = listener_name
return racing_groups
def _get_racing_group_for_listeners(
self,
listener_names: list[FlowMethodName],
) -> tuple[frozenset[FlowMethodName], FlowMethodName] | None:
"""Check if the given listeners form a racing group.
Args:
listener_names: List of listener method names being executed.
Returns:
Tuple of (racing_members, or_listener_name) if these listeners race,
None otherwise.
"""
if not hasattr(self, "_racing_groups_cache"):
self._racing_groups_cache = self._build_racing_groups()
listener_set = set(listener_names)
for racing_members, or_listener in self._racing_groups_cache.items():
if racing_members & listener_set:
racing_subset = racing_members & listener_set
if len(racing_subset) > 1:
return (frozenset(racing_subset), or_listener)
return None
async def _execute_racing_listeners(
self,
racing_listeners: frozenset[FlowMethodName],
other_listeners: list[FlowMethodName],
result: Any,
) -> None:
"""Execute racing listeners with first-wins semantics.
Racing listeners are executed in parallel, but once the first one
completes, the others are cancelled. Non-racing listeners in the
same batch are executed normally in parallel.
Args:
racing_listeners: Set of listener names that race for an OR condition.
other_listeners: Other listeners to execute in parallel (not racing).
result: The result from the triggering method.
"""
racing_tasks = [
asyncio.create_task(
self._execute_single_listener(name, result),
name=str(name),
)
for name in racing_listeners
]
other_tasks = [
asyncio.create_task(
self._execute_single_listener(name, result),
name=str(name),
)
for name in other_listeners
]
if racing_tasks:
for coro in asyncio.as_completed(racing_tasks):
try:
await coro
except Exception as e:
logger.debug(f"Racing listener failed: {e}")
continue
break
for task in racing_tasks:
if not task.done():
task.cancel()
if other_tasks:
await asyncio.gather(*other_tasks, return_exceptions=True)
@classmethod
def from_pending(
cls,
flow_id: str,
persistence: FlowPersistence | None = None,
**kwargs: Any,
) -> "Flow[Any]":
) -> Flow[Any]:
"""Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting
@@ -631,7 +864,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return instance
@property
def pending_feedback(self) -> "PendingFeedbackContext | None":
def pending_feedback(self) -> PendingFeedbackContext | None:
"""Get the pending feedback context if this flow is waiting for feedback.
Returns:
@@ -716,9 +949,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises:
ValueError: If no pending feedback context exists
"""
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None:
raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow."
@@ -740,12 +974,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
# No default and no feedback - use first outcome
collapsed_outcome = emit[0]
elif emit:
# Collapse feedback to outcome using LLM
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
if llm is not None:
collapsed_outcome = self._collapse_to_outcome(
feedback=feedback,
outcomes=emit,
llm=llm,
)
else:
collapsed_outcome = emit[0]
# Create result
result = HumanFeedbackResult(
@@ -784,21 +1020,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
# This allows methods to re-execute in loops (e.g., implement_changes → suggest_changes → implement_changes)
self._is_execution_resuming = False
# Determine what to pass to listeners
final_result: Any = result
try:
if emit and collapsed_outcome:
# Router behavior - the outcome itself triggers listeners
# First, add the outcome to method outputs as a router would
self._method_outputs.append(collapsed_outcome)
# Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved"))
final_result = await self._execute_listeners(
FlowMethodName(collapsed_outcome), # Use outcome as trigger
result, # Pass HumanFeedbackResult to listeners
await self._execute_listeners(
FlowMethodName(collapsed_outcome),
result,
)
else:
# Normal behavior - pass the HumanFeedbackResult
final_result = await self._execute_listeners(
await self._execute_listeners(
FlowMethodName(context.method_name),
result,
)
@@ -894,18 +1125,17 @@ class Flow(Generic[T], metaclass=FlowMeta):
# Handle case where initial_state is a type (class)
if isinstance(self.initial_state, type):
if issubclass(self.initial_state, FlowState):
return self.initial_state() # Uses model defaults
if issubclass(self.initial_state, BaseModel):
# Validate that the model has an id field
model_fields = getattr(self.initial_state, "model_fields", None)
state_class: type[T] = self.initial_state
if issubclass(state_class, FlowState):
return state_class()
if issubclass(state_class, BaseModel):
model_fields = getattr(state_class, "model_fields", None)
if not model_fields or "id" not in model_fields:
raise ValueError("Flow state model must have an 'id' field")
instance = self.initial_state()
# Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4()))
return instance
model_instance = state_class()
if not getattr(model_instance, "id", None):
object.__setattr__(model_instance, "id", str(uuid4()))
return model_instance
if self.initial_state is dict:
return cast(T, {"id": str(uuid4())})
@@ -970,7 +1200,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
@property
def state(self) -> T:
return self._state
return StateProxy(self._state, self._state_lock) # type: ignore[return-value]
@property
def method_outputs(self) -> list[Any]:
@@ -1295,6 +1525,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._completed_methods.clear()
self._method_outputs.clear()
self._pending_and_listeners.clear()
self._clear_or_listeners()
else:
# We're restoring from persistence, set the flag
self._is_execution_resuming = True
@@ -1346,9 +1577,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._initialize_state(inputs)
try:
# Determine which start methods to execute at kickoff
# Conditional start methods (with __trigger_methods__) are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
unconditional_starts = [
start_method
for start_method in self._start_methods
if not getattr(
self._methods.get(start_method), "__trigger_methods__", None
)
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts
if unconditional_starts
else self._start_methods
)
tasks = [
self._execute_start_method(start_method)
for start_method in self._start_methods
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
@@ -1431,13 +1679,14 @@ class Flow(Generic[T], metaclass=FlowMeta):
)
self._event_futures.clear()
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
if not self.suppress_flow_events:
trace_listener = TraceCollectionListener()
if trace_listener.batch_manager.batch_owner_type == "flow":
if trace_listener.first_time_handler.is_first_time:
trace_listener.first_time_handler.mark_events_collected()
trace_listener.first_time_handler.handle_execution_completion()
else:
trace_listener.batch_manager.finalize_batch()
return final_output
finally:
@@ -1481,6 +1730,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(start_method_name)
# Also clear fired OR listeners to allow them to fire again in new cycle
self._clear_or_listeners()
method = self._methods[start_method_name]
enhanced_method = self._inject_trigger_payload_for_start_method(method)
@@ -1503,11 +1754,25 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
racing_group = self._get_racing_group_for_listeners(
listeners_for_result
)
if racing_group:
racing_members, _ = racing_group
other_listeners = [
name
for name in listeners_for_result
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, listener_result
)
else:
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_for_result
]
await asyncio.gather(*tasks)
else:
await self._execute_listeners(start_method_name, result)
@@ -1573,11 +1838,19 @@ class Flow(Generic[T], metaclass=FlowMeta):
if future:
self._event_futures.append(future)
result = (
await method(*args, **kwargs)
if asyncio.iscoroutinefunction(method)
else method(*args, **kwargs)
)
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
import contextvars
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
# Auto-await coroutines returned from sync methods (enables AgentExecutor pattern)
if asyncio.iscoroutine(result):
result = await result
self._method_outputs.append(result)
self._method_execution_counts[method_name] = (
@@ -1724,11 +1997,27 @@ class Flow(Generic[T], metaclass=FlowMeta):
listener_result = router_result_to_feedback.get(
str(current_trigger), result
)
tasks = [
self._execute_single_listener(listener_name, listener_result)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
racing_group = self._get_racing_group_for_listeners(
listeners_triggered
)
if racing_group:
racing_members, _ = racing_group
other_listeners = [
name
for name in listeners_triggered
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, listener_result
)
else:
tasks = [
self._execute_single_listener(
listener_name, listener_result
)
for listener_name in listeners_triggered
]
await asyncio.gather(*tasks)
if current_trigger in router_results:
# Find start methods triggered by this router result
@@ -1745,14 +2034,16 @@ class Flow(Generic[T], metaclass=FlowMeta):
should_trigger = current_trigger in all_methods
if should_trigger:
# Only execute if this is a cycle (method was already completed)
# Execute conditional start method triggered by router result
if method_name in self._completed_methods:
# For router-triggered start methods in cycles, temporarily clear resumption flag
# to allow cyclic execution
# For cyclic re-execution, temporarily clear resumption flag
was_resuming = self._is_execution_resuming
self._is_execution_resuming = False
await self._execute_start_method(method_name)
self._is_execution_resuming = was_resuming
else:
# First-time execution of conditional start
await self._execute_start_method(method_name)
def _evaluate_condition(
self,
@@ -1850,8 +2141,21 @@ class Flow(Generic[T], metaclass=FlowMeta):
condition_type, methods = condition_data
if condition_type == OR_CONDITION:
if trigger_method in methods:
triggered.append(listener_name)
# Only trigger multi-source OR listeners (or_(A, B, C)) once - skip if already fired
# Simple single-method listeners fire every time their trigger occurs
# Routers also fire every time - they're decision points
has_multiple_triggers = len(methods) > 1
should_check_fired = has_multiple_triggers and not is_router
if (
not should_check_fired
or listener_name not in self._fired_or_listeners
):
if trigger_method in methods:
triggered.append(listener_name)
# Only track multi-source OR listeners (not single-method or routers)
if should_check_fired:
self._fired_or_listeners.add(listener_name)
elif condition_type == AND_CONDITION:
pending_key = PendingListenerKey(listener_name)
if pending_key not in self._pending_and_listeners:
@@ -1864,10 +2168,26 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._pending_and_listeners.pop(pending_key, None)
elif is_flow_condition_dict(condition_data):
# For complex conditions, check if top-level is OR and track accordingly
top_level_type = condition_data.get("type", OR_CONDITION)
is_or_based = top_level_type == OR_CONDITION
# Only track multi-source OR conditions (multiple sub-conditions), not routers
sub_conditions = condition_data.get("conditions", [])
has_multiple_triggers = is_or_based and len(sub_conditions) > 1
should_check_fired = has_multiple_triggers and not is_router
# Skip compound OR-based listeners that have already fired
if should_check_fired and listener_name in self._fired_or_listeners:
continue
if self._evaluate_condition(
condition_data, trigger_method, listener_name
):
triggered.append(listener_name)
# Track compound OR-based listeners so they only fire once
if should_check_fired:
self._fired_or_listeners.add(listener_name)
return triggered
@@ -1896,9 +2216,22 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self._is_execution_resuming:
# During resumption, skip execution but continue listeners
await self._execute_listeners(listener_name, None)
# For routers, also check if any conditional starts they triggered are completed
# If so, continue their chains
if listener_name in self._routers:
for start_method_name in self._start_methods:
if (
start_method_name in self._listeners
and start_method_name in self._completed_methods
):
# This conditional start was executed, continue its chain
await self._execute_start_method(start_method_name)
return
# For cyclic flows, clear from completed to allow re-execution
self._completed_methods.discard(listener_name)
# Also clear from fired OR listeners for cyclic flows
self._discard_or_listener(listener_name)
try:
method = self._methods[listener_name]
@@ -1931,11 +2264,25 @@ class Flow(Generic[T], metaclass=FlowMeta):
if self.last_human_feedback is not None
else listener_result
)
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
racing_group = self._get_racing_group_for_listeners(
listeners_for_result
)
if racing_group:
racing_members, _ = racing_group
other_listeners = [
name
for name in listeners_for_result
if name not in racing_members
]
await self._execute_racing_listeners(
racing_members, other_listeners, feedback_result
)
else:
tasks = [
self._execute_single_listener(name, feedback_result)
for name in listeners_for_result
]
await asyncio.gather(*tasks)
except Exception as e:
# Don't log HumanFeedbackPending as an error - it's expected control flow
@@ -2049,7 +2396,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
from crewai.llms.base_llm import BaseLLM as BaseLLMClass
from crewai.utilities.i18n import get_i18n
# Get or create LLM instance
llm_instance: BaseLLMClass
if isinstance(llm, str):
llm_instance = LLM(model=llm)
elif isinstance(llm, BaseLLMClass):
@@ -2084,26 +2431,23 @@ class Flow(Generic[T], metaclass=FlowMeta):
response_model=FeedbackOutcome,
)
# Parse the response - LLM returns JSON string when using response_model
if isinstance(response, str):
import json
try:
parsed = json.loads(response)
return parsed.get("outcome", outcomes[0])
return str(parsed.get("outcome", outcomes[0]))
except json.JSONDecodeError:
# Not valid JSON, might be raw outcome string
response_clean = response.strip()
for outcome in outcomes:
if outcome.lower() == response_clean.lower():
return outcome
return outcomes[0]
elif isinstance(response, FeedbackOutcome):
return response.outcome
return str(response.outcome)
elif hasattr(response, "outcome"):
return response.outcome
return str(response.outcome)
else:
# Unexpected type, fall back to first outcome
logger.warning(f"Unexpected response type: {type(response)}")
return outcomes[0]