feat: reset emission counter for test isolation

This commit is contained in:
Greyson LaLonde
2026-01-20 01:12:52 -05:00
parent ae253b4156
commit 161f9bd063
3 changed files with 34 additions and 12 deletions

View File

@@ -31,6 +31,16 @@ def cleanup_event_handlers() -> Generator[None, Any, None]:
pass pass
@pytest.fixture(autouse=True, scope="function")
def reset_event_state() -> None:
"""Reset event system state before each test for isolation."""
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_context import _event_id_stack
reset_emission_counter()
_event_id_stack.set(())
@pytest.fixture(autouse=True, scope="function") @pytest.fixture(autouse=True, scope="function")
def setup_test_environment() -> Generator[None, Any, None]: def setup_test_environment() -> Generator[None, Any, None]:
"""Setup test environment for crewAI workspace.""" """Setup test environment for crewAI workspace."""

View File

@@ -189,9 +189,14 @@ def prepare_kickoff(crew: Crew, inputs: dict[str, Any] | None) -> dict[str, Any]
Returns: Returns:
The potentially modified inputs dictionary after before callbacks. The potentially modified inputs dictionary after before callbacks.
""" """
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_bus import crewai_event_bus from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import get_current_parent_id
from crewai.events.types.crew_events import CrewKickoffStartedEvent from crewai.events.types.crew_events import CrewKickoffStartedEvent
if get_current_parent_id() is None:
reset_emission_counter()
for before_callback in crew.before_kickoff_callbacks: for before_callback in crew.before_kickoff_callbacks:
if inputs is None: if inputs is None:
inputs = {} inputs = {}

View File

@@ -30,7 +30,9 @@ from pydantic import BaseModel, Field, ValidationError
from rich.console import Console from rich.console import Console
from rich.panel import Panel from rich.panel import Panel
from crewai.events.base_events import reset_emission_counter
from crewai.events.event_bus import crewai_event_bus from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_context import get_current_parent_id
from crewai.events.listeners.tracing.trace_listener import ( from crewai.events.listeners.tracing.trace_listener import (
TraceCollectionListener, TraceCollectionListener,
) )
@@ -73,6 +75,7 @@ from crewai.flow.utils import (
is_simple_flow_condition, is_simple_flow_condition,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
from crewai.flow.async_feedback.types import PendingFeedbackContext from crewai.flow.async_feedback.types import PendingFeedbackContext
from crewai.flow.human_feedback import HumanFeedbackResult from crewai.flow.human_feedback import HumanFeedbackResult
@@ -570,7 +573,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
flow_id: str, flow_id: str,
persistence: FlowPersistence | None = None, persistence: FlowPersistence | None = None,
**kwargs: Any, **kwargs: Any,
) -> "Flow[Any]": ) -> Flow[Any]:
"""Create a Flow instance from a pending feedback state. """Create a Flow instance from a pending feedback state.
This classmethod is used to restore a flow that was paused waiting This classmethod is used to restore a flow that was paused waiting
@@ -631,7 +634,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
return instance return instance
@property @property
def pending_feedback(self) -> "PendingFeedbackContext | None": def pending_feedback(self) -> PendingFeedbackContext | None:
"""Get the pending feedback context if this flow is waiting for feedback. """Get the pending feedback context if this flow is waiting for feedback.
Returns: Returns:
@@ -716,9 +719,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
Raises: Raises:
ValueError: If no pending feedback context exists ValueError: If no pending feedback context exists
""" """
from crewai.flow.human_feedback import HumanFeedbackResult
from datetime import datetime from datetime import datetime
from crewai.flow.human_feedback import HumanFeedbackResult
if self._pending_feedback_context is None: if self._pending_feedback_context is None:
raise ValueError( raise ValueError(
"No pending feedback context. Use from_pending() to restore a paused flow." "No pending feedback context. Use from_pending() to restore a paused flow."
@@ -744,7 +748,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
collapsed_outcome = self._collapse_to_outcome( collapsed_outcome = self._collapse_to_outcome(
feedback=feedback, feedback=feedback,
outcomes=emit, outcomes=emit,
llm=llm, llm=llm, # type: ignore[arg-type]
) )
# Create result # Create result
@@ -792,13 +796,13 @@ class Flow(Generic[T], metaclass=FlowMeta):
self._method_outputs.append(collapsed_outcome) self._method_outputs.append(collapsed_outcome)
# Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved")) # Then trigger listeners for the outcome (e.g., "approved" triggers @listen("approved"))
final_result = await self._execute_listeners( final_result = await self._execute_listeners( # type: ignore[func-returns-value]
FlowMethodName(collapsed_outcome), # Use outcome as trigger FlowMethodName(collapsed_outcome), # Use outcome as trigger
result, # Pass HumanFeedbackResult to listeners result, # Pass HumanFeedbackResult to listeners
) )
else: else:
# Normal behavior - pass the HumanFeedbackResult # Normal behavior - pass the HumanFeedbackResult
final_result = await self._execute_listeners( final_result = await self._execute_listeners( # type: ignore[func-returns-value]
FlowMethodName(context.method_name), FlowMethodName(context.method_name),
result, result,
) )
@@ -901,11 +905,11 @@ class Flow(Generic[T], metaclass=FlowMeta):
model_fields = getattr(self.initial_state, "model_fields", None) model_fields = getattr(self.initial_state, "model_fields", None)
if not model_fields or "id" not in model_fields: if not model_fields or "id" not in model_fields:
raise ValueError("Flow state model must have an 'id' field") raise ValueError("Flow state model must have an 'id' field")
instance = self.initial_state() instance = self.initial_state() # type: ignore[assignment]
# Ensure id is set - generate UUID if empty # Ensure id is set - generate UUID if empty
if not getattr(instance, "id", None): if not getattr(instance, "id", None):
object.__setattr__(instance, "id", str(uuid4())) object.__setattr__(instance, "id", str(uuid4()))
return instance return instance # type: ignore[return-value]
if self.initial_state is dict: if self.initial_state is dict:
return cast(T, {"id": str(uuid4())}) return cast(T, {"id": str(uuid4())})
@@ -1326,6 +1330,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
if filtered_inputs: if filtered_inputs:
self._initialize_state(filtered_inputs) self._initialize_state(filtered_inputs)
if get_current_parent_id() is None:
reset_emission_counter()
# Emit FlowStartedEvent and log the start of the flow. # Emit FlowStartedEvent and log the start of the flow.
if not self.suppress_flow_events: if not self.suppress_flow_events:
future = crewai_event_bus.emit( future = crewai_event_bus.emit(
@@ -2053,7 +2060,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
if isinstance(llm, str): if isinstance(llm, str):
llm_instance = LLM(model=llm) llm_instance = LLM(model=llm)
elif isinstance(llm, BaseLLMClass): elif isinstance(llm, BaseLLMClass):
llm_instance = llm llm_instance = llm # type: ignore[assignment]
else: else:
raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.") raise ValueError(f"Invalid llm type: {type(llm)}. Expected str or BaseLLM.")
@@ -2090,7 +2097,7 @@ class Flow(Generic[T], metaclass=FlowMeta):
try: try:
parsed = json.loads(response) parsed = json.loads(response)
return parsed.get("outcome", outcomes[0]) return parsed.get("outcome", outcomes[0]) # type: ignore[no-any-return]
except json.JSONDecodeError: except json.JSONDecodeError:
# Not valid JSON, might be raw outcome string # Not valid JSON, might be raw outcome string
response_clean = response.strip() response_clean = response.strip()
@@ -2099,9 +2106,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
return outcome return outcome
return outcomes[0] return outcomes[0]
elif isinstance(response, FeedbackOutcome): elif isinstance(response, FeedbackOutcome):
return response.outcome return response.outcome # type: ignore[no-any-return]
elif hasattr(response, "outcome"): elif hasattr(response, "outcome"):
return response.outcome return response.outcome # type: ignore[no-any-return]
else: else:
# Unexpected type, fall back to first outcome # Unexpected type, fall back to first outcome
logger.warning(f"Unexpected response type: {type(response)}") logger.warning(f"Unexpected response type: {type(response)}")