diff --git a/conftest.py b/conftest.py index d63e7c885..12e3d9d19 100644 --- a/conftest.py +++ b/conftest.py @@ -31,6 +31,16 @@ def cleanup_event_handlers() -> Generator[None, Any, None]: pass +@pytest.fixture(autouse=True, scope="function") +def reset_event_state() -> None: + """Reset event system state before each test for isolation.""" + from crewai.events.base_events import reset_emission_counter + from crewai.events.event_context import _event_id_stack + + reset_emission_counter() + _event_id_stack.set(()) + + @pytest.fixture(autouse=True, scope="function") def setup_test_environment() -> Generator[None, Any, None]: """Setup test environment for crewAI workspace.""" diff --git a/lib/crewai/src/crewai/crews/utils.py b/lib/crewai/src/crewai/crews/utils.py index 5694dcda1..70e8828f0 100644 --- a/lib/crewai/src/crewai/crews/utils.py +++ b/lib/crewai/src/crewai/crews/utils.py @@ -189,9 +189,14 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any] Returns: The potentially modified inputs dictionary after before callbacks. """ + from crewai.events.base_events import reset_emission_counter from crewai.events.event_bus import crewai_event_bus + from crewai.events.event_context import get_current_parent_id from crewai.events.types.crew_events import CrewKickoffStartedEvent + if get_current_parent_id() is None: + reset_emission_counter() + for before_callback in crew.before_kickoff_callbacks: if inputs is None: inputs = {} diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index dcc0c78bc..715fd44fd 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -30,7 +30,9 @@ from pydantic import BaseModel, Field, ValidationError from rich.console import Console from rich.panel import Panel +from crewai.events.base_events import reset_emission_counter from crewai.events.event_bus import crewai_event_bus +from crewai.events.event_context import get_current_parent_id from crewai.events.listeners.tracing.trace_listener import ( TraceCollectionListener, ) @@ -73,6 +75,7 @@ from crewai.flow.utils import ( is_simple_flow_condition, ) + if TYPE_CHECKING: from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.human_feedback import HumanFeedbackResult @@ -570,7 +573,7 @@ class Flow(Generic[T], metaclass=FlowMeta): flow_id: str, persistence: FlowPersistence | None = None, **kwargs: Any, - ) -> "Flow[Any]": + ) -> Flow[Any]: """Create a Flow instance from a pending feedback state. This classmethod is used to restore a flow that was paused waiting @@ -631,7 +634,7 @@ class Flow(Generic[T], metaclass=FlowMeta): return instance @property - def pending_feedback(self) -> "PendingFeedbackContext | None": + def pending_feedback(self) -> PendingFeedbackContext | None: """Get the pending feedback context if this flow is waiting for feedback. Returns: @@ -716,9 +719,10 @@ class Flow(Generic[T], metaclass=FlowMeta): Raises: ValueError: If no pending feedback context exists """ - from crewai.flow.human_feedback import HumanFeedbackResult from datetime import datetime + from crewai.flow.human_feedback import HumanFeedbackResult + if self._pending_feedback_context is None: raise ValueError( "No pending feedback context. Use from_pending() to restore a paused flow." @@ -744,7 +748,7 @@ class Flow(Generic[T], metaclass=FlowMeta): collapsed_outcome = self._collapse_to_outcome( feedback=feedback, outcomes=emit, - llm=llm, + llm=llm, # type: ignore[arg-type] ) # Create result @@ -792,13 +796,13 @@ class Flow(Generic[T], metaclass=FlowMeta): self._method_outputs.append(collapsed_outcome) # Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved")) - final_result = await self._execute_listeners( + final_result = await self._execute_listeners( # type: ignore[func-returns-value] FlowMethodName(collapsed_outcome), # Use outcome as trigger result, # Pass HumanFeedbackResult to listeners ) else: # Normal behavior - pass the HumanFeedbackResult - final_result = await self._execute_listeners( + final_result = await self._execute_listeners( # type: ignore[func-returns-value] FlowMethodName(context.method_name), result, ) @@ -901,11 +905,11 @@ class Flow(Generic[T], metaclass=FlowMeta): model_fields = getattr(self.initial_state, "model_fields", None) if not model_fields or "id" not in model_fields: raise ValueError("Flow state model must have an 'id' field") - instance = self.initial_state() + instance = self.initial_state() # type: ignore[assignment] # Ensure id is set - generate UUID if empty if not getattr(instance, "id", None): object.__setattr__(instance, "id", str(uuid4())) - return instance + return instance # type: ignore[return-value] if self.initial_state is dict: return cast(T, {"id": str(uuid4())}) @@ -1326,6 +1330,9 @@ class Flow(Generic[T], metaclass=FlowMeta): if filtered_inputs: self._initialize_state(filtered_inputs) + if get_current_parent_id() is None: + reset_emission_counter() + # Emit FlowStartedEvent and log the start of the flow. if not self.suppress_flow_events: future = crewai_event_bus.emit( @@ -2053,7 +2060,7 @@ class Flow(Generic[T], metaclass=FlowMeta): if isinstance(llm, str): llm_instance = LLM(model=llm) elif isinstance(llm, BaseLLMClass): - llm_instance = llm + llm_instance = llm # type: ignore[assignment] else: raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.") @@ -2090,7 +2097,7 @@ class Flow(Generic[T], metaclass=FlowMeta): try: parsed = json.loads(response) - return parsed.get("outcome", outcomes[0]) + return parsed.get("outcome", outcomes[0]) # type: ignore[no-any-return] except json.JSONDecodeError: # Not valid JSON, might be raw outcome string response_clean = response.strip() @@ -2099,9 +2106,9 @@ class Flow(Generic[T], metaclass=FlowMeta): return outcome return outcomes[0] elif isinstance(response, FeedbackOutcome): - return response.outcome + return response.outcome # type: ignore[no-any-return] elif hasattr(response, "outcome"): - return response.outcome + return response.outcome # type: ignore[no-any-return] else: # Unexpected type, fall back to first outcome logger.warning(f"Unexpected response type: {type(response)}")