crewAI/lib/crewai/tests/utilities/test_events.py
Lorenze Jay 32d7b4a8d4 Lorenze/feat/plan execute pattern (#4817)
* feat: introduce PlanningConfig for enhanced agent planning capabilities (#4344)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* Lorenze/feat planning pt 2 todo list gen (#4449)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* feat: enhance agent planning with structured todo management

This commit introduces a new planning system within the AgentExecutor class, allowing for the creation of structured todo items from planning steps. The TodoList and TodoItem models have been added to facilitate tracking of plan execution. The reasoning plan now includes a list of steps, improving the clarity and organization of agent tasks. Additionally, tests have been added to validate the new planning functionality and ensure proper integration with existing workflows.

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* improve handler

* linted

* linted

* Lorenze/feat/planning pt 3 todo list execution (#4450)

* feat: introduce PlanningConfig for enhanced agent planning capabilities

This update adds a new PlanningConfig class to manage agent planning configurations, allowing for customizable planning behavior before task execution. The existing reasoning parameter is deprecated in favor of this new configuration, ensuring backward compatibility while enhancing the planning process. Additionally, the Agent class has been updated to utilize this new configuration, and relevant utility functions have been adjusted accordingly. Tests have been added to validate the new planning functionality and ensure proper integration with existing agent workflows.

* dropping redundancy

* fix test

* revert handle_reasoning here

* refactor: update reasoning handling in Agent class

This commit modifies the Agent class to conditionally call the handle_reasoning function based on the executor class being used. The legacy CrewAgentExecutor will continue to utilize handle_reasoning, while the new AgentExecutor will manage planning internally. Additionally, the PlanningConfig class has been referenced in the documentation to clarify its role in enabling or disabling planning. Tests have been updated to reflect these changes and ensure proper functionality.

* improve planning prompts

* matching

* refactor: remove default enabled flag from PlanningConfig in Agent class

* more cassettes

* fix test

* feat: enhance agent planning with structured todo management

This commit introduces a new planning system within the AgentExecutor class, allowing for the creation of structured todo items from planning steps. The TodoList and TodoItem models have been added to facilitate tracking of plan execution. The reasoning plan now includes a list of steps, improving the clarity and organization of agent tasks. Additionally, tests have been added to validate the new planning functionality and ensure proper integration with existing workflows.

* refactor: update planning prompt and remove deprecated methods in reasoning handler

* improve planning prompt

* improve handler

* execute todos and be able to track them

* feat: introduce PlannerObserver and StepExecutor for enhanced plan execution

This commit adds the PlannerObserver and StepExecutor classes to the CrewAI framework, implementing the observation phase of the Plan-and-Execute architecture. The PlannerObserver analyzes step execution results, determines plan validity, and suggests refinements, while the StepExecutor executes individual todo items in isolation. These additions improve the overall planning and execution process, allowing for more dynamic and responsive agent behavior. Additionally, new observation events have been defined to facilitate monitoring and logging of the planning process.

* refactor: enhance final answer synthesis in AgentExecutor

This commit improves the synthesis of final answers in the AgentExecutor class by implementing a more coherent approach to combining results from multiple todo items. The method now utilizes a single LLM call to generate a polished response, falling back to concatenation if the synthesis fails. Additionally, the test cases have been updated to reflect the changes in planning and execution, ensuring that the results are properly validated and that the plan-and-execute architecture is functioning as intended.

* refactor: implement structured output handling in final answer synthesis

This commit enhances the final answer synthesis process in the AgentExecutor class by introducing support for structured outputs when a response model is specified. The synthesis method now utilizes the response model to produce outputs that conform to the expected schema, while still falling back to concatenation in case of synthesis failures. This change ensures that intermediate steps yield free-text results, but the final output can be structured, improving the overall coherence and usability of the synthesized answers.
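
The fallback flow described above can be sketched as follows; `synthesize_final_answer` and `llm_call` are illustrative names for this sketch, not the actual AgentExecutor API:

```python
# Hypothetical sketch of the synthesis fallback: try one LLM call to merge
# step results into a polished answer; if that call fails, fall back to
# plain concatenation so execution still produces a usable final output.
from collections.abc import Callable


def synthesize_final_answer(
    step_results: list[str],
    llm_call: Callable[[str], str],
) -> str:
    """Combine todo-item results into a single final answer."""
    prompt = "Combine these step results into one coherent answer:\n" + "\n".join(
        f"- {result}" for result in step_results
    )
    try:
        # Single LLM call produces the polished, merged response.
        return llm_call(prompt)
    except Exception:
        # Fallback: simple concatenation keeps the raw results usable.
        return "\n\n".join(step_results)
```

The same shape extends to structured outputs: when a response model is set, the happy path parses the LLM response into that schema, while the except branch still returns the concatenated free text.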

* regen tests

* linted

* fix

* Enhance PlanningConfig and AgentExecutor with Reasoning Effort Levels

This update introduces a reasoning effort attribute in the PlanningConfig class, allowing users to customize the observation and replanning behavior during task execution. The AgentExecutor class has been modified to utilize this new attribute, routing step observations based on the specified reasoning effort level: low, medium, or high.

Additionally, tests have been added to validate the functionality of the reasoning effort levels, ensuring that the agent behaves as expected under different configurations. This enhancement improves the adaptability and efficiency of the planning process in agent execution.
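
A minimal sketch of effort-based routing; the handler logic and observation field names here are assumptions for illustration, not the real executor's signatures:

```python
# Route a step observation to a handler chosen by the configured
# reasoning effort level. Low effort replans only on hard failures,
# medium on an explicit full-replan flag, and high additionally treats
# an unsuccessful step as a reason to replan.

def route_observation(effort: str, observation: dict) -> str:
    """Dispatch an observation by effort level; unknown levels fail fast."""
    handlers = {
        "low": lambda obs: "replan" if obs.get("hard_failure") else "continue",
        "medium": lambda obs: "replan" if obs.get("needs_full_replan") else "continue",
        "high": lambda obs: (
            "replan"
            if obs.get("needs_full_replan")
            or not obs.get("step_completed_successfully", True)
            else "continue"
        ),
    }
    if effort not in handlers:
        raise ValueError(f"unknown reasoning effort: {effort!r}")
    return handlers[effort](observation)
```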

* regen cassettes for test and fix test

* cassette regen

* fixing tests

* dry

* Refactor PlannerObserver and StepExecutor to Utilize I18N for Prompts

This update enhances the PlannerObserver and StepExecutor classes by integrating the I18N utility for managing prompts and messages. The system and user prompts are now retrieved from the I18N module, allowing for better localization and maintainability. Additionally, the code has been cleaned up to remove hardcoded strings, improving readability and consistency across the planning and execution processes.

* consolidate agent logic

* fix datetime

* improving step executor

* refactor: streamline observation and refinement process in PlannerObserver

- Updated the PlannerObserver to apply structured refinements directly from observations without requiring a second LLM call.
- Renamed a method for clarity.
- Enhanced documentation to reflect changes in how refinements are handled.
- Removed unnecessary LLM message building and parsing logic, simplifying the refinement process.
- Updated event emissions to include summaries of refinements instead of raw data.

* enhance step executor with tool usage events and validation

- Added event emissions for tool usage, including started and finished events, to track tool execution.
- Implemented validation to ensure expected tools are called during step execution, raising errors when not.
- Refactored the tool execution method to handle tool execution with event logging.
- Introduced a new method for parsing tool input into a structured format.
- Updated tests to cover new functionality and ensure correct behavior of tool usage events.

* refactor: enhance final answer synthesis logic in AgentExecutor

- Updated the finalization process to conditionally skip synthesis when the last todo result is sufficient as a complete answer.
- Introduced a new method to determine if the last todo result can be used directly, improving efficiency.
- Added tests to verify the new behavior, ensuring synthesis is skipped when appropriate and maintained when a response model is set.

* fix: update observation handling in PlannerObserver for LLM errors

- Modified the error handling in the PlannerObserver to default to a conservative replan when an LLM call fails.
- Updated the return values to indicate that the step was not completed successfully and that a full replan is needed.
- Added a new test to verify the behavior of the observer when an LLM error occurs, ensuring the correct replan logic is triggered.

* refactor: enhance planning and execution flow in agents

- Updated the PlannerObserver to accept a kickoff input for standalone task execution, improving flexibility in task handling.
- Refined the step execution process in StepExecutor to support multi-turn action loops, allowing for iterative tool execution and observation.
- Introduced a method to extract relevant task sections from descriptions, ensuring clarity in task requirements.
- Enhanced the AgentExecutor to manage step failures more effectively, triggering replans only when necessary and preserving completed task history.
- Updated translations to reflect changes in planning principles and execution prompts, emphasizing concrete and executable steps.

* refactor: update setup_native_tools to include tool_name_mapping

- Modified the setup_native_tools function to return an additional mapping of tool names.
- Updated StepExecutor and AgentExecutor classes to accommodate the new return value from setup_native_tools.

* fix tests

* linted

* linted

* feat: enhance image block handling in Anthropic provider and update AgentExecutor logic

- Added a method to convert OpenAI-style image_url blocks to Anthropic's required format.
- Updated AgentExecutor to handle cases where no todos are ready, introducing a needs_replan return state.
- Improved fallback answer generation in AgentExecutor to prevent RuntimeErrors when no final output is produced.

* lint

* lint

* 1. Added failed to TodoStatus (planning_types.py)

  - TodoStatus now includes failed as a valid state: Literal["pending", "running", "completed", "failed"]
  - Added mark_failed(step_number, result) method to TodoList
  - Added get_failed_todos() method to TodoList
  - Updated is_complete to treat both completed and failed as terminal states
  - Updated replace_pending_todos docstring to mention failed items are preserved

  2. Mark running todos as failed before replan (agent_executor.py)

  All three effort-level handlers now call mark_failed() on the current todo before routing to replan_now:

  - Low effort (handle_step_observed_low): hard-failure branch
  - Medium effort (handle_step_observed_medium): needs_full_replan branch
  - High effort (decide_next_action): both needs_full_replan and step_completed_successfully=False branches

  3. Updated _should_replan to use get_failed_todos()

  Previously this filtered on todo.status == "failed", which was dead code. It now uses the proper accessor method that will actually find failed items.

  What this fixes: Before these changes, a step that triggered a replan would stay in running status permanently, causing is_complete to never
  return True and next_pending to skip it — leading to stuck execution states. Now failed steps are properly tracked, replanning context correctly
  reports them, and LiteAgentOutput.failed_todos will actually return results.

* fix test

* imp on failed states

* adjusted the var name from AgentReActState to AgentExecutorState

* addressed p0 bugs

* more improvements

* linted

* regen cassette

* addressing critical comments

* ensure configurable timeouts, max_replans and max step iterations

* adjusted tools

* dropping debug statements

* addressed comment

* fix linter

* lints and test fixes

* fix: default observation parse fallback to failure and clean up plan-execute types

When _parse_observation_response fails all parse attempts, default to
step_completed_successfully=False instead of True to avoid silently
masking failures. Extract duplicate _extract_task_section into a shared
utility in agent_utils. Type PlanningConfig.llm as str | BaseLLM | None
instead of str | Any | None. Make StepResult a frozen dataclass for
immutability consistency with StepExecutionContext.
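
The conservative parse fallback can be sketched like this; the JSON field names and helper name are assumptions for illustration:

```python
# When the observation response cannot be parsed, report the step as NOT
# completed (and request a replan) rather than silently passing, so a
# garbled LLM response never masks a broken step. StepResult is a frozen
# dataclass, making results immutable once produced.
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class StepResult:
    step_completed_successfully: bool
    needs_full_replan: bool


def parse_observation_response(raw: str) -> StepResult:
    try:
        data = json.loads(raw)
        return StepResult(
            step_completed_successfully=bool(data["step_completed_successfully"]),
            needs_full_replan=bool(data.get("needs_full_replan", False)),
        )
    except (json.JSONDecodeError, KeyError, TypeError):
        # Default to failure so an unparseable response triggers a replan
        # instead of being treated as success.
        return StepResult(step_completed_successfully=False, needs_full_replan=True)
```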

* fix: remove Any from function_calling_llm union type in step_executor

* fix: make BaseTool usage count thread-safe for parallel step execution

Add _usage_lock and _claim_usage() to BaseTool for atomic
check-and-increment of current_usage_count. This prevents race
conditions when parallel plan steps invoke the same tool concurrently
via execute_todos_parallel. Remove the racy pre-check from
execute_single_native_tool_call since the limit is now enforced
atomically inside tool.run().
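
The atomic check-and-increment pattern can be sketched as below; `UsageLimitedTool` is a stand-in for BaseTool, not the real class:

```python
# Hold a lock across the limit check AND the increment, so two parallel
# plan steps can never both pass a nearly-exhausted usage limit. A
# separate pre-check followed by an increment would be the racy pattern
# this replaces.
import threading


class UsageLimitedTool:
    def __init__(self, max_usage_count: int) -> None:
        self.max_usage_count = max_usage_count
        self.current_usage_count = 0
        self._usage_lock = threading.Lock()

    def _claim_usage(self) -> bool:
        """Atomically reserve one usage slot; False when the limit is hit."""
        with self._usage_lock:
            if self.current_usage_count >= self.max_usage_count:
                return False
            self.current_usage_count += 1
            return True
```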

---------

Co-authored-by: Greyson LaLonde <greyson.r.lalonde@gmail.com>
Co-authored-by: Greyson LaLonde <greyson@crewai.com>
2026-03-15 18:33:17 -07:00

1513 lines
48 KiB
Python

import os
import threading
from datetime import datetime
from unittest.mock import Mock, patch

import pytest
from pydantic import BaseModel, Field

from crewai.agent import Agent
from crewai.agents.crew_agent_executor import CrewAgentExecutor
from crewai.crew import Crew
from crewai.events.event_bus import crewai_event_bus
from crewai.events.event_listener import EventListener
from crewai.events.types.agent_events import (
    AgentExecutionCompletedEvent,
    AgentExecutionErrorEvent,
    AgentExecutionStartedEvent,
)
from crewai.events.types.crew_events import (
    CrewKickoffCompletedEvent,
    CrewKickoffFailedEvent,
    CrewKickoffStartedEvent,
    CrewTestCompletedEvent,
    CrewTestResultEvent,
    CrewTestStartedEvent,
)
from crewai.events.types.flow_events import (
    FlowCreatedEvent,
    FlowFinishedEvent,
    FlowStartedEvent,
    HumanFeedbackReceivedEvent,
    HumanFeedbackRequestedEvent,
    MethodExecutionFailedEvent,
    MethodExecutionFinishedEvent,
    MethodExecutionStartedEvent,
)
from crewai.events.types.llm_events import (
    LLMCallCompletedEvent,
    LLMCallFailedEvent,
    LLMCallStartedEvent,
    LLMStreamChunkEvent,
)
from crewai.events.types.task_events import (
    TaskCompletedEvent,
    TaskFailedEvent,
    TaskStartedEvent,
)
from crewai.events.types.tool_usage_events import (
    ToolUsageErrorEvent,
    ToolUsageFinishedEvent,
)
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback
from crewai.llm import LLM
from crewai.task import Task
from crewai.tools.base_tool import BaseTool

from ..utils import wait_for_event_handlers

@pytest.fixture(scope="module")
def base_agent():
    return Agent(
        role="base_agent",
        llm="gpt-4o-mini",
        goal="Just say hi",
        backstory="You are a helpful assistant that just says hi",
    )


@pytest.fixture(scope="module")
def base_task(base_agent):
    return Task(
        description="Just say hi",
        expected_output="hi",
        agent=base_agent,
    )


@pytest.fixture
def reset_event_listener_singleton():
    """Reset EventListener singleton for clean test state."""
    original_instance = EventListener._instance
    original_initialized = (
        getattr(EventListener._instance, "_initialized", False)
        if EventListener._instance
        else False
    )
    EventListener._instance = None
    yield
    EventListener._instance = original_instance
    if original_instance and original_initialized:
        EventListener._instance._initialized = original_initialized


@pytest.mark.vcr()
def test_crew_emits_start_kickoff_event(
    base_agent, base_task, reset_event_listener_singleton
):
    received_events = []
    mock_span = Mock()

    @crewai_event_bus.on(CrewKickoffStartedEvent)
    def handle_crew_start(source, event):
        received_events.append(event)

    mock_telemetry = Mock()
    mock_telemetry.crew_execution_span = Mock(return_value=mock_span)
    mock_telemetry.end_crew = Mock(return_value=mock_span)
    mock_telemetry.set_tracer = Mock()
    mock_telemetry.task_started = Mock(return_value=mock_span)
    mock_telemetry.task_ended = Mock(return_value=mock_span)

    # Patch the Telemetry class to return our mock
    with patch("crewai.events.event_listener.Telemetry", return_value=mock_telemetry):
        # Now when Crew creates EventListener, it will use our mocked telemetry
        crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
        crew.kickoff()
        wait_for_event_handlers()
        mock_telemetry.crew_execution_span.assert_called_once_with(crew, None)
        mock_telemetry.end_crew.assert_called_once_with(crew, "hi")

    assert len(received_events) == 1
    assert received_events[0].crew_name == "TestCrew"
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "crew_kickoff_started"


@pytest.mark.vcr()
def test_crew_emits_end_kickoff_event(base_agent, base_task):
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(CrewKickoffCompletedEvent)
    def handle_crew_end(source, event):
        received_events.append(event)
        event_received.set()

    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    crew.kickoff()

    assert event_received.wait(timeout=5), (
        "Timeout waiting for crew kickoff completed event"
    )
    assert len(received_events) == 1
    assert received_events[0].crew_name == "TestCrew"
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "crew_kickoff_completed"


@pytest.mark.vcr()
def test_crew_emits_test_kickoff_type_event(base_agent, base_task):
    received_events = []

    @crewai_event_bus.on(CrewTestStartedEvent)
    def handle_crew_end(source, event):
        received_events.append(event)

    @crewai_event_bus.on(CrewTestCompletedEvent)
    def handle_crew_test_end(source, event):
        received_events.append(event)

    @crewai_event_bus.on(CrewTestResultEvent)
    def handle_crew_test_result(source, event):
        received_events.append(event)

    eval_llm = LLM(model="gpt-4o-mini")
    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    crew.test(n_iterations=1, eval_llm=eval_llm)
    wait_for_event_handlers()

    assert len(received_events) == 3
    assert received_events[0].crew_name == "TestCrew"
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "crew_test_started"
    assert received_events[1].crew_name == "TestCrew"
    assert isinstance(received_events[1].timestamp, datetime)
    assert received_events[1].type == "crew_test_result"
    assert received_events[2].crew_name == "TestCrew"
    assert isinstance(received_events[2].timestamp, datetime)
    assert received_events[2].type == "crew_test_completed"


@pytest.mark.vcr()
def test_crew_emits_kickoff_failed_event(base_agent, base_task):
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(CrewKickoffFailedEvent)
    def handle_crew_failed(source, event):
        received_events.append(event)
        event_received.set()

    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")

    with patch.object(Crew, "_execute_tasks") as mock_execute:
        error_message = "Simulated crew kickoff failure"
        mock_execute.side_effect = Exception(error_message)
        with pytest.raises(Exception):  # noqa: B017
            crew.kickoff()

        assert event_received.wait(timeout=5), "Timeout waiting for failed event"
        assert len(received_events) == 1
        assert received_events[0].error == error_message
        assert isinstance(received_events[0].timestamp, datetime)
        assert received_events[0].type == "crew_kickoff_failed"


@pytest.mark.vcr()
def test_crew_emits_start_task_event(base_agent, base_task):
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(TaskStartedEvent)
    def handle_task_start(source, event):
        received_events.append(event)
        event_received.set()

    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    crew.kickoff()

    assert event_received.wait(timeout=5), "Timeout waiting for task started event"
    assert len(received_events) == 1
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "task_started"


@pytest.mark.vcr()
def test_crew_emits_end_task_event(base_agent, base_task):
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(TaskCompletedEvent)
    def handle_task_end(source, event):
        received_events.append(event)
        event_received.set()

    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    crew.kickoff()

    assert event_received.wait(timeout=5), "Timeout waiting for task completed event"
    assert len(received_events) == 1
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "task_completed"


@pytest.mark.vcr()
def test_task_emits_failed_event_on_execution_error(base_agent, base_task):
    received_events = []
    received_sources = []
    event_received = threading.Event()

    @crewai_event_bus.on(TaskFailedEvent)
    def handle_task_failed(source, event):
        received_events.append(event)
        received_sources.append(source)
        event_received.set()

    with patch.object(
        Task,
        "_execute_core",
    ) as mock_execute:
        error_message = "Simulated task failure"
        mock_execute.side_effect = Exception(error_message)
        agent = Agent(
            role="base_agent",
            goal="Just say hi",
            backstory="You are a helpful assistant that just says hi",
        )
        task = Task(
            description="Just say hi",
            expected_output="hi",
            agent=agent,
        )
        with pytest.raises(Exception):  # noqa: B017
            agent.execute_task(task=task)

        assert event_received.wait(timeout=5), (
            "Timeout waiting for task failed event"
        )
        assert len(received_events) == 1
        assert received_sources[0] == task
        assert received_events[0].error == error_message
        assert isinstance(received_events[0].timestamp, datetime)
        assert received_events[0].type == "task_failed"


@pytest.mark.vcr()
def test_agent_emits_execution_started_and_completed_events(base_agent, base_task):
    started_events: list[AgentExecutionStartedEvent] = []
    completed_events: list[AgentExecutionCompletedEvent] = []
    condition = threading.Condition()

    @crewai_event_bus.on(AgentExecutionStartedEvent)
    def handle_agent_start(source, event):
        with condition:
            started_events.append(event)
            condition.notify()

    @crewai_event_bus.on(AgentExecutionCompletedEvent)
    def handle_agent_completed(source, event):
        with condition:
            completed_events.append(event)
            condition.notify()

    crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
    crew.kickoff()

    with condition:
        success = condition.wait_for(
            lambda: len(started_events) >= 1 and len(completed_events) >= 1,
            timeout=10,
        )
        assert success, "Timeout waiting for agent execution events"

    assert len(started_events) == 1
    assert len(completed_events) == 1
    assert started_events[0].agent == base_agent
    assert started_events[0].task == base_task
    assert started_events[0].tools == []
    assert isinstance(started_events[0].task_prompt, str)
    assert (
        started_events[0].task_prompt
        == "Just say hi\n\nThis is the expected criteria for your final answer: hi\nyou MUST return the actual complete content as the final answer, not a summary."
    )
    assert isinstance(started_events[0].timestamp, datetime)
    assert started_events[0].type == "agent_execution_started"
    assert isinstance(completed_events[0].timestamp, datetime)
    assert completed_events[0].type == "agent_execution_completed"


@pytest.mark.vcr()
def test_agent_emits_execution_error_event(base_agent, base_task):
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(AgentExecutionErrorEvent)
    def handle_agent_start(source, event):
        received_events.append(event)
        event_received.set()

    error_message = "Error happening while sending prompt to model."
    base_agent.max_retry_limit = 0

    # Patch at the class level since agent_executor is created lazily
    with patch.object(
        CrewAgentExecutor, "invoke", side_effect=Exception(error_message)
    ):
        with pytest.raises(Exception):  # noqa: B017
            base_agent.execute_task(
                task=base_task,
            )

    assert event_received.wait(timeout=5), (
        "Timeout waiting for agent execution error event"
    )
    assert len(received_events) == 1
    assert received_events[0].agent == base_agent
    assert received_events[0].task == base_task
    assert received_events[0].error == error_message
    assert isinstance(received_events[0].timestamp, datetime)
    assert received_events[0].type == "agent_execution_error"


class SayHiTool(BaseTool):
    name: str = Field(default="say_hi", description="The name of the tool")
    description: str = Field(
        default="Say hi", description="The description of the tool"
    )

    def _run(self) -> str:
        return "hi"


@pytest.mark.vcr()
def test_tools_emits_finished_events():
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(ToolUsageFinishedEvent)
    def handle_tool_end(source, event):
        received_events.append(event)
        event_received.set()

    agent = Agent(
        role="base_agent",
        goal="Just say hi",
        backstory="You are a helpful assistant that just says hi",
        tools=[SayHiTool()],
    )
    task = Task(
        description="Just say hi",
        expected_output="hi",
        agent=agent,
    )
    crew = Crew(agents=[agent], tasks=[task], name="TestCrew")
    crew.kickoff()

    assert event_received.wait(timeout=5), (
        "Timeout waiting for tool usage finished event"
    )
    assert len(received_events) == 1
    assert received_events[0].agent_key == agent.key
    assert received_events[0].agent_role == agent.role
    assert received_events[0].tool_name == SayHiTool().name
    assert received_events[0].tool_args == "{}" or received_events[0].tool_args == {}
    assert received_events[0].type == "tool_usage_finished"
    assert isinstance(received_events[0].timestamp, datetime)


@pytest.mark.vcr()
def test_tools_emits_error_events():
    received_events = []
    lock = threading.Lock()
    all_events_received = threading.Event()

    @crewai_event_bus.on(ToolUsageErrorEvent)
    def handle_tool_end(source, event):
        with lock:
            received_events.append(event)
            # Set event when we receive at least 1 error event
            if len(received_events) >= 1:
                all_events_received.set()

    class ErrorTool(BaseTool):
        name: str = Field(
            default="error_tool", description="A tool that raises an error"
        )
        description: str = Field(
            default="This tool always raises an error",
            description="The description of the tool",
        )

        def _run(self) -> str:
            raise Exception("Simulated tool error")

    agent = Agent(
        role="base_agent",
        goal="Try to use the error tool",
        backstory="You are an assistant that tests error handling",
        tools=[ErrorTool()],
        llm=LLM(model="gpt-4o-mini"),
    )
    task = Task(
        description="Use the error tool",
        expected_output="This should error",
        agent=agent,
    )
    crew = Crew(agents=[agent], tasks=[task], name="TestCrew")
    crew.kickoff()

    assert all_events_received.wait(timeout=10), (
        "Timeout waiting for tool usage error events"
    )
    # At least one error event should be received (number varies by execution path)
    assert len(received_events) >= 1
    assert received_events[0].agent_key == agent.key
    assert received_events[0].agent_role == agent.role
    assert received_events[0].tool_name == "error_tool"
    assert received_events[0].tool_args == "{}" or received_events[0].tool_args == {}
    assert str(received_events[0].error) == "Simulated tool error"
    assert received_events[0].type == "tool_usage_error"
    assert isinstance(received_events[0].timestamp, datetime)


def test_flow_emits_start_event(reset_event_listener_singleton):
    received_events = []
    event_received = threading.Event()
    mock_span = Mock()

    @crewai_event_bus.on(FlowStartedEvent)
    def handle_flow_start(source, event):
        received_events.append(event)
        event_received.set()

    class TestFlow(Flow[dict]):
        @start()
        def begin(self):
            return "started"

    mock_telemetry = Mock()
    mock_telemetry.flow_execution_span = Mock(return_value=mock_span)
    mock_telemetry.flow_creation_span = Mock()
    mock_telemetry.set_tracer = Mock()

    with patch("crewai.events.event_listener.Telemetry", return_value=mock_telemetry):
        # Force creation of EventListener singleton with mocked telemetry
        _ = EventListener()
        flow = TestFlow()
        flow.kickoff()

    assert event_received.wait(timeout=5), "Timeout waiting for flow started event"
    mock_telemetry.flow_execution_span.assert_called_once_with("TestFlow", ["begin"])
    assert len(received_events) == 1
    assert received_events[0].flow_name == "TestFlow"
    assert received_events[0].type == "flow_started"


def test_flow_name_emitted_to_event_bus():
    received_events = []
    event_received = threading.Event()

    class MyFlowClass(Flow):
        name = "PRODUCTION_FLOW"

        @start()
        def start(self):
            return "Hello, world!"

    @crewai_event_bus.on(FlowStartedEvent)
    def handle_flow_start(source, event):
        received_events.append(event)
        event_received.set()

    flow = MyFlowClass()
    flow.kickoff()

    assert event_received.wait(timeout=5), "Timeout waiting for flow started event"
    assert len(received_events) == 1
    assert received_events[0].flow_name == "PRODUCTION_FLOW"


def test_flow_emits_finish_event():
    received_events = []
    event_received = threading.Event()

    @crewai_event_bus.on(FlowFinishedEvent)
    def handle_flow_finish(source, event):
        received_events.append(event)
        event_received.set()

    class TestFlow(Flow[dict]):
        @start()
        def begin(self):
            return "completed"

    flow = TestFlow()
    result = flow.kickoff()

    assert event_received.wait(timeout=5), "Timeout waiting for finish event"
    assert len(received_events) == 1
    assert received_events[0].flow_name == "TestFlow"
    assert received_events[0].type == "flow_finished"
    assert received_events[0].result == "completed"
    assert result == "completed"


def test_flow_emits_method_execution_started_event():
    received_events = []
    lock = threading.Lock()
    second_event_received = threading.Event()

    @crewai_event_bus.on(MethodExecutionStartedEvent)
    async def handle_method_start(source, event):
        with lock:
            received_events.append(event)
            if event.method_name == "second_method":
                second_event_received.set()

    class TestFlow(Flow[dict]):
        @start()
        def begin(self):
            return "started"

        @listen("begin")
        def second_method(self):
            return "executed"

    flow = TestFlow()
    flow.kickoff()

    assert second_event_received.wait(timeout=5), (
        "Timeout waiting for second_method event"
    )
    assert len(received_events) == 2
    # Events may arrive in any order due to async handlers, so check both are present
    method_names = {event.method_name for event in received_events}
    assert method_names == {"begin", "second_method"}
    for event in received_events:
        assert event.flow_name == "TestFlow"
        assert event.type == "method_execution_started"


@pytest.mark.vcr()
def test_register_handler_adds_new_handler(base_agent, base_task):
received_events = []
event_received = threading.Event()
def custom_handler(source, event):
received_events.append(event)
event_received.set()
crewai_event_bus.register_handler(CrewKickoffStartedEvent, custom_handler)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff()
assert event_received.wait(timeout=5), "Timeout waiting for handler event"
assert len(received_events) == 1
assert isinstance(received_events[0].timestamp, datetime)
assert received_events[0].type == "crew_kickoff_started"
@pytest.mark.vcr()
def test_multiple_handlers_for_same_event(base_agent, base_task):
received_events_1 = []
received_events_2 = []
event_received = threading.Event()
def handler_1(source, event):
received_events_1.append(event)
def handler_2(source, event):
received_events_2.append(event)
event_received.set()
crewai_event_bus.register_handler(CrewKickoffStartedEvent, handler_1)
crewai_event_bus.register_handler(CrewKickoffStartedEvent, handler_2)
crew = Crew(agents=[base_agent], tasks=[base_task], name="TestCrew")
crew.kickoff()
assert event_received.wait(timeout=5), "Timeout waiting for handler events"
assert len(received_events_1) == 1
assert len(received_events_2) == 1
assert received_events_1[0].type == "crew_kickoff_started"
assert received_events_2[0].type == "crew_kickoff_started"
def test_flow_emits_created_event():
received_events = []
event_received = threading.Event()
@crewai_event_bus.on(FlowCreatedEvent)
def handle_flow_created(source, event):
received_events.append(event)
event_received.set()
class TestFlow(Flow[dict]):
@start()
def begin(self):
return "started"
flow = TestFlow()
flow.kickoff()
assert event_received.wait(timeout=5), "Timeout waiting for flow created event"
assert len(received_events) == 1
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "flow_created"
def test_flow_emits_method_execution_failed_event():
received_events = []
event_received = threading.Event()
error = Exception("Simulated method failure")
@crewai_event_bus.on(MethodExecutionFailedEvent)
def handle_method_failed(source, event):
received_events.append(event)
event_received.set()
class TestFlow(Flow[dict]):
@start()
def begin(self):
raise error
flow = TestFlow()
with pytest.raises(Exception): # noqa: B017
flow.kickoff()
assert event_received.wait(timeout=5), (
"Timeout waiting for method execution failed event"
)
assert len(received_events) == 1
assert received_events[0].method_name == "begin"
assert received_events[0].flow_name == "TestFlow"
assert received_events[0].type == "method_execution_failed"
assert received_events[0].error == error
def test_flow_method_execution_started_includes_unstructured_state():
"""Test that MethodExecutionStartedEvent includes unstructured (dict) state."""
received_events = []
event_received = threading.Event()
@crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_started(source, event):
received_events.append(event)
if event.method_name == "process":
event_received.set()
class TestFlow(Flow[dict]):
@start()
def begin(self):
self.state["counter"] = 1
self.state["message"] = "test"
return "started"
@listen("begin")
def process(self):
self.state["counter"] = 2
return "processed"
flow = TestFlow()
flow.kickoff()
assert event_received.wait(timeout=5), (
"Timeout waiting for method execution started event"
)
# Find the events for each method
begin_event = next(e for e in received_events if e.method_name == "begin")
process_event = next(e for e in received_events if e.method_name == "process")
# Verify state is included and is a dict
assert begin_event.state is not None
assert isinstance(begin_event.state, dict)
assert "id" in begin_event.state # Auto-generated ID
# Verify state from begin method is captured in process event
assert process_event.state is not None
assert isinstance(process_event.state, dict)
assert process_event.state["counter"] == 1
assert process_event.state["message"] == "test"
def test_flow_method_execution_started_includes_structured_state():
"""Test that MethodExecutionStartedEvent includes structured (BaseModel) state and serializes it properly."""
received_events = []
event_received = threading.Event()
class FlowState(BaseModel):
counter: int = 0
message: str = ""
items: list[str] = []
@crewai_event_bus.on(MethodExecutionStartedEvent)
def handle_method_started(source, event):
received_events.append(event)
if event.method_name == "process":
event_received.set()
class TestFlow(Flow[FlowState]):
@start()
def begin(self):
self.state.counter = 1
self.state.message = "initial"
self.state.items = ["a", "b"]
return "started"
@listen("begin")
def process(self):
self.state.counter += 1
return "processed"
flow = TestFlow()
flow.kickoff()
assert event_received.wait(timeout=5), (
"Timeout waiting for method execution started event"
)
begin_event = next(e for e in received_events if e.method_name == "begin")
process_event = next(e for e in received_events if e.method_name == "process")
assert begin_event.state is not None
assert isinstance(begin_event.state, dict)
assert begin_event.state["counter"] == 0 # Initial state
assert begin_event.state["message"] == ""
assert begin_event.state["items"] == []
assert process_event.state is not None
assert isinstance(process_event.state, dict)
assert process_event.state["counter"] == 1
assert process_event.state["message"] == "initial"
assert process_event.state["items"] == ["a", "b"]
def test_flow_method_execution_finished_includes_serialized_state():
"""Test that MethodExecutionFinishedEvent includes properly serialized state."""
received_events = []
event_received = threading.Event()
class FlowState(BaseModel):
result: str = ""
completed: bool = False
@crewai_event_bus.on(MethodExecutionFinishedEvent)
def handle_method_finished(source, event):
received_events.append(event)
if event.method_name == "process":
event_received.set()
class TestFlow(Flow[FlowState]):
@start()
def begin(self):
self.state.result = "begin done"
return "started"
@listen("begin")
def process(self):
self.state.result = "process done"
self.state.completed = True
return "final_result"
flow = TestFlow()
final_output = flow.kickoff()
assert event_received.wait(timeout=5), (
"Timeout waiting for method execution finished event"
)
begin_finished = next(e for e in received_events if e.method_name == "begin")
process_finished = next(e for e in received_events if e.method_name == "process")
assert begin_finished.state is not None
assert isinstance(begin_finished.state, dict)
assert begin_finished.state["result"] == "begin done"
assert begin_finished.state["completed"] is False
assert begin_finished.result == "started"
# Verify process finished event has final state and result
assert process_finished.state is not None
assert isinstance(process_finished.state, dict)
assert process_finished.state["result"] == "process done"
assert process_finished.state["completed"] is True
assert process_finished.result == "final_result"
assert final_output == "final_result"
@pytest.mark.vcr()
def test_llm_emits_call_started_event():
started_events: list[LLMCallStartedEvent] = []
completed_events: list[LLMCallCompletedEvent] = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def handle_llm_call_started(source, event):
with condition:
started_events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def handle_llm_call_completed(source, event):
with condition:
completed_events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Hello, how are you?")
with condition:
success = condition.wait_for(
lambda: len(started_events) >= 1 and len(completed_events) >= 1,
timeout=10,
)
assert success, "Timeout waiting for LLM events"
assert started_events[0].type == "llm_call_started"
assert completed_events[0].type == "llm_call_completed"
assert started_events[0].task_name is None
assert started_events[0].agent_role is None
assert started_events[0].agent_id is None
assert started_events[0].task_id is None
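# The Condition-based wait used above (append under the lock, notify, then
# block on a predicate) is worth isolating. A minimal, crewai-independent
# sketch of that pattern; `append_and_notify` and `wait_for_count` are
# illustrative helpers, not part of the crewai API.
def append_and_notify(items: list, condition: threading.Condition, value) -> None:
    """Producer side: record a value under the lock and wake any waiter."""
    with condition:
        items.append(value)
        condition.notify()


def wait_for_count(
    items: list, n: int, condition: threading.Condition, timeout: float = 10
) -> bool:
    """Consumer side: block until `items` holds at least `n` entries."""
    with condition:
        return condition.wait_for(lambda: len(items) >= n, timeout=timeout)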
@pytest.mark.vcr()
def test_llm_emits_call_failed_event():
received_events = []
event_received = threading.Event()
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_call_failed(source, event):
received_events.append(event)
event_received.set()
error_message = "OpenAI API call failed: Simulated API failure"
with patch(
"crewai.llms.providers.openai.completion.OpenAICompletion._handle_completion"
) as mock_handle_completion:
mock_handle_completion.side_effect = Exception("Simulated API failure")
llm = LLM(model="gpt-4o-mini")
with pytest.raises(Exception) as exc_info:
llm.call("Hello, how are you?")
assert str(exc_info.value) == "Simulated API failure"
assert event_received.wait(timeout=5), "Timeout waiting for failed event"
assert len(received_events) == 1
assert received_events[0].type == "llm_call_failed"
assert received_events[0].error == error_message
assert received_events[0].task_name is None
assert received_events[0].agent_role is None
assert received_events[0].agent_id is None
assert received_events[0].task_id is None
@pytest.mark.vcr()
def test_llm_emits_stream_chunk_events():
"""Test that LLM emits stream chunk events when streaming is enabled."""
received_chunks = []
event_received = threading.Event()
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
if len(received_chunks) >= 1:
event_received.set()
# Create an LLM with streaming enabled
llm = LLM(model="gpt-4o", stream=True)
# Call the LLM with a simple message
response = llm.call("Tell me a short joke")
# Wait for at least one chunk
assert event_received.wait(timeout=5), "Timeout waiting for stream chunks"
# Verify that we received chunks
assert len(received_chunks) > 0
# Verify that concatenating all chunks equals the final response
assert "".join(received_chunks) == response
@pytest.mark.vcr()
def test_llm_no_stream_chunks_when_streaming_disabled():
"""Test that LLM doesn't emit stream chunk events when streaming is disabled."""
received_chunks = []
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
# Create an LLM with streaming disabled
llm = LLM(model="gpt-4o", stream=False)
# Call the LLM with a simple message
response = llm.call("Tell me a short joke")
# Verify that we didn't receive any chunks
assert len(received_chunks) == 0
# Verify we got a response
assert response and isinstance(response, str)
@pytest.mark.vcr()
def test_streaming_fallback_to_non_streaming():
"""Test that streaming falls back to non-streaming when there's an error."""
received_chunks = []
fallback_called = False
event_received = threading.Event()
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
if len(received_chunks) >= 2:
event_received.set()
# Create an LLM with streaming enabled
llm = LLM(model="gpt-4o", stream=True)
# Store original methods
original_call = llm.call
# Create a mock call method that handles the streaming error
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
nonlocal fallback_called
# Emit a couple of chunks to simulate partial streaming
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 1", response_id="Id", call_id="test-call-id"))
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="Test chunk 2", response_id="Id", call_id="test-call-id"))
# Mark that fallback would be called
fallback_called = True
# Return a response as if fallback succeeded
return "Fallback response after streaming error"
# Replace the call method with our mock
llm.call = mock_call
try:
# Call the LLM
response = llm.call("Tell me a short joke")
wait_for_event_handlers()
assert event_received.wait(timeout=5), "Timeout waiting for stream chunks"
# Verify that we received some chunks
assert len(received_chunks) == 2
assert received_chunks[0] == "Test chunk 1"
assert received_chunks[1] == "Test chunk 2"
# Verify fallback was triggered
assert fallback_called
# Verify we got the fallback response
assert response == "Fallback response after streaming error"
finally:
# Restore the original method
llm.call = original_call
@pytest.mark.vcr()
def test_streaming_empty_response_handling():
"""Test that streaming handles empty responses correctly."""
received_chunks = []
event_received = threading.Event()
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_stream_chunk(source, event):
received_chunks.append(event.chunk)
if len(received_chunks) >= 3:
event_received.set()
# Create an LLM with streaming enabled
llm = LLM(model="gpt-3.5-turbo", stream=True)
# Store original methods
original_call = llm.call
# Create a mock call method that simulates empty chunks
def mock_call(messages, tools=None, callbacks=None, available_functions=None):
# Emit a few empty chunks
for _ in range(3):
crewai_event_bus.emit(llm, event=LLMStreamChunkEvent(chunk="", response_id="id", call_id="test-call-id"))
# Return the default message for empty responses
return "I apologize, but I couldn't generate a proper response. Please try again or rephrase your request."
# Replace the call method with our mock
llm.call = mock_call
try:
# Call the LLM - this should handle empty response
response = llm.call("Tell me a short joke")
assert event_received.wait(timeout=5), "Timeout waiting for empty chunks"
# Verify that we received empty chunks
assert len(received_chunks) == 3
assert all(chunk == "" for chunk in received_chunks)
# Verify the response is the default message for empty responses
assert "I apologize" in response and "couldn't generate" in response
finally:
# Restore the original method
llm.call = original_call
@pytest.mark.vcr()
def test_stream_llm_emits_event_with_task_and_agent_info():
completed_event = []
failed_event = []
started_event = []
stream_event = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
with condition:
failed_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallStartedEvent)
def handle_llm_started(source, event):
with condition:
started_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def handle_llm_completed(source, event):
with condition:
completed_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_llm_stream_chunk(source, event):
with condition:
stream_event.append(event)
condition.notify()
agent = Agent(
role="TestAgent",
llm=LLM(model="gpt-4o-mini", stream=True),
goal="Just say hi",
backstory="You are a helpful assistant that just says hi",
)
task = Task(
description="Just say hi",
expected_output="hi",
llm=LLM(model="gpt-4o-mini", stream=True),
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task])
crew.kickoff()
with condition:
success = condition.wait_for(
lambda: len(completed_event) >= 1
and len(started_event) >= 1
and len(stream_event) >= 12,
timeout=10,
)
assert success, "Timeout waiting for LLM events"
assert len(completed_event) == 1
assert len(failed_event) == 0
assert len(started_event) == 1
assert len(stream_event) == 12
all_events = completed_event + failed_event + started_event + stream_event
all_agent_roles = [event.agent_role for event in all_events]
all_agent_id = [event.agent_id for event in all_events]
all_task_id = [event.task_id for event in all_events]
all_task_name = [event.task_name for event in all_events]
# ensure all events have the agent + task props set
assert len(all_agent_roles) == 14
assert len(all_agent_id) == 14
assert len(all_task_id) == 14
assert len(all_task_name) == 14
assert set(all_agent_roles) == {agent.role}
assert set(all_agent_id) == {str(agent.id)}
assert set(all_task_id) == {str(task.id)}
assert set(all_task_name) == {task.name or task.description}
@pytest.mark.vcr()
def test_llm_emits_event_with_task_and_agent_info(base_agent, base_task):
completed_event: list[LLMCallCompletedEvent] = []
failed_event: list[LLMCallFailedEvent] = []
started_event: list[LLMCallStartedEvent] = []
stream_event: list[LLMStreamChunkEvent] = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
with condition:
failed_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallStartedEvent)
def handle_llm_started(source, event):
with condition:
started_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def handle_llm_completed(source, event):
with condition:
completed_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_llm_stream_chunk(source, event):
with condition:
stream_event.append(event)
condition.notify()
crew = Crew(agents=[base_agent], tasks=[base_task])
crew.kickoff()
with condition:
success = condition.wait_for(
lambda: len(completed_event) >= 1 and len(started_event) >= 1,
timeout=10,
)
assert success, "Timeout waiting for LLM events"
assert len(completed_event) == 1
assert len(failed_event) == 0
assert len(started_event) == 1
assert len(stream_event) == 0
all_events = completed_event + failed_event + started_event + stream_event
all_agent_roles = [event.agent_role for event in all_events]
all_agent_id = [event.agent_id for event in all_events]
all_task_id = [event.task_id for event in all_events]
all_task_name = [event.task_name for event in all_events]
# ensure all events have the agent + task props set
assert len(all_agent_roles) == 2
assert len(all_agent_id) == 2
assert len(all_task_id) == 2
assert len(all_task_name) == 2
assert set(all_agent_roles) == {base_agent.role}
assert set(all_agent_id) == {str(base_agent.id)}
assert set(all_task_id) == {str(base_task.id)}
assert set(all_task_name) == {base_task.name or base_task.description}
@pytest.mark.vcr()
def test_llm_emits_event_with_lite_agent():
completed_event = []
failed_event = []
started_event = []
stream_event = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallFailedEvent)
def handle_llm_failed(source, event):
with condition:
failed_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallStartedEvent)
def handle_llm_started(source, event):
with condition:
started_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def handle_llm_completed(source, event):
with condition:
completed_event.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def handle_llm_stream_chunk(source, event):
with condition:
stream_event.append(event)
condition.notify()
agent = Agent(
role="Speaker",
llm=LLM(model="gpt-4o-mini", stream=True),
goal="Just say hi",
backstory="You are a helpful assistant that just says hi",
)
agent.kickoff(messages=[{"role": "user", "content": "say hi!"}])
with condition:
success = condition.wait_for(
lambda: len(completed_event) >= 1
and len(started_event) >= 1
and len(stream_event) >= 1,
timeout=10,
)
assert success, "Timeout waiting for all events"
assert len(completed_event) == 1
assert len(failed_event) == 0
assert len(started_event) == 1
assert len(stream_event) >= 1
all_events = completed_event + failed_event + started_event + stream_event
all_agent_roles = [event.agent_role for event in all_events]
all_agent_id = [event.agent_id for event in all_events]
all_task_id = [event.task_id for event in all_events if event.task_id]
all_task_name = [event.task_name for event in all_events if event.task_name]
    # ensure all events carry the agent props; task props stay unset (no Task in a lite-agent run)
expected_total = 1 + 1 + len(stream_event) # completed + started + stream
assert len(all_agent_roles) == expected_total
assert len(all_agent_id) == expected_total
assert len(all_task_id) == 0
assert len(all_task_name) == 0
assert set(all_agent_roles) == {agent.role}
assert set(all_agent_id) == {str(agent.id)}
# ----------- CALL_ID CORRELATION TESTS -----------
@pytest.mark.vcr()
def test_llm_call_events_share_call_id():
"""All events from a single LLM call should share the same call_id."""
import uuid
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
with condition:
success = condition.wait_for(lambda: len(events) >= 2, timeout=10)
assert success, "Timeout waiting for LLM events"
# Behavior: all events from the call share the same call_id
assert len(events) == 2
assert events[0].call_id == events[1].call_id
# call_id should be a valid UUID
uuid.UUID(events[0].call_id)
@pytest.mark.vcr()
def test_streaming_chunks_share_call_id_with_call():
"""Streaming chunks should share call_id with started/completed events."""
events = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMStreamChunkEvent)
def on_chunk(source, event):
with condition:
events.append(event)
condition.notify()
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_complete(source, event):
with condition:
events.append(event)
condition.notify()
llm = LLM(model="gpt-4o-mini", stream=True)
llm.call("Say hi")
with condition:
# Wait for at least started, some chunks, and completed
success = condition.wait_for(lambda: len(events) >= 3, timeout=10)
assert success, "Timeout waiting for streaming events"
# Behavior: all events (started, chunks, completed) share the same call_id
call_ids = {e.call_id for e in events}
assert len(call_ids) == 1
@pytest.mark.vcr()
def test_separate_llm_calls_have_different_call_ids():
"""Different LLM calls should have different call_ids."""
call_ids = []
condition = threading.Condition()
@crewai_event_bus.on(LLMCallStartedEvent)
def on_start(source, event):
with condition:
call_ids.append(event.call_id)
condition.notify()
llm = LLM(model="gpt-4o-mini")
llm.call("Say hi")
llm.call("Say bye")
with condition:
success = condition.wait_for(lambda: len(call_ids) >= 2, timeout=10)
assert success, "Timeout waiting for LLM call events"
# Behavior: each call has its own call_id
assert len(call_ids) == 2
assert call_ids[0] != call_ids[1]
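# The call_id assertions above generalize to a grouping pattern: bucket events
# by call_id to reconstruct each call's lifecycle. A standalone sketch; events
# are modeled as plain dicts here, whereas real crewai events expose `call_id`
# and `type` as attributes.
def group_events_by_call_id(events: list[dict]) -> dict[str, list[str]]:
    """Map each call_id to the ordered list of event types it produced."""
    calls: dict[str, list[str]] = {}
    for event in events:
        calls.setdefault(event["call_id"], []).append(event["type"])
    return calls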
# ----------- HUMAN FEEDBACK EVENTS -----------
@patch("builtins.input", return_value="looks good")
@patch("builtins.print")
def test_human_feedback_emits_requested_and_received_events(mock_print, mock_input):
"""Test that @human_feedback decorator emits HumanFeedbackRequested and Received events."""
requested_events = []
received_events = []
events_received = threading.Event()
@crewai_event_bus.on(HumanFeedbackRequestedEvent)
def handle_requested(source, event):
requested_events.append(event)
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def handle_received(source, event):
received_events.append(event)
events_received.set()
class TestFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
)
def review(self):
return "test content"
flow = TestFlow()
with patch.object(flow, "_collapse_to_outcome", return_value="approved"):
flow.kickoff()
assert events_received.wait(timeout=5), (
"Timeout waiting for human feedback events"
)
assert len(requested_events) == 1
assert requested_events[0].type == "human_feedback_requested"
assert requested_events[0].emit == ["approved", "rejected"]
assert requested_events[0].message == "Review:"
assert requested_events[0].output == "test content"
assert len(received_events) == 1
assert received_events[0].type == "human_feedback_received"
assert received_events[0].feedback == "looks good"
assert received_events[0].outcome is None
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "approved"
@patch("builtins.input", return_value="feedback text")
@patch("builtins.print")
def test_human_feedback_without_routing_emits_events(mock_print, mock_input):
"""Test that @human_feedback without emit still emits events."""
requested_events = []
received_events = []
events_received = threading.Event()
@crewai_event_bus.on(HumanFeedbackRequestedEvent)
def handle_requested(source, event):
requested_events.append(event)
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def handle_received(source, event):
received_events.append(event)
events_received.set()
class SimpleFlow(Flow):
@start()
@human_feedback(message="Please review:")
def review(self):
return "content to review"
flow = SimpleFlow()
flow.kickoff()
assert events_received.wait(timeout=5), (
"Timeout waiting for human feedback events"
)
assert len(requested_events) == 1
assert requested_events[0].emit is None
assert len(received_events) == 1
assert received_events[0].feedback == "feedback text"
assert received_events[0].outcome is None
@patch("builtins.input", return_value="")
@patch("builtins.print")
def test_human_feedback_empty_feedback_emits_events(mock_print, mock_input):
"""Test that empty feedback (skipped) still emits events correctly."""
received_events = []
events_received = threading.Event()
@crewai_event_bus.on(HumanFeedbackReceivedEvent)
def handle_received(source, event):
received_events.append(event)
events_received.set()
class SkipFlow(Flow):
@start()
@human_feedback(
message="Review:",
emit=["approved", "rejected"],
llm="gpt-4o-mini",
default_outcome="rejected",
)
def review(self):
return "content"
flow = SkipFlow()
flow.kickoff()
assert events_received.wait(timeout=5), (
"Timeout waiting for human feedback events"
)
assert len(received_events) == 1
assert received_events[0].feedback == ""
assert received_events[0].outcome is None
assert flow.last_human_feedback is not None
assert flow.last_human_feedback.outcome == "rejected"
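# Most tests in this module hand-roll the same collector: a list appended to by
# a bus handler plus a threading.Event gate for the assertion phase. A reusable
# sketch of that fixture; `make_collector` is illustrative only and not an
# existing test utility in this repo.
def make_collector():
    """Return (handler, received, done): handler records events and signals done."""
    received = []
    done = threading.Event()

    def handler(source, event):
        received.append(event)
        done.set()

    return handler, received, done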