From 6bfc98e960fd6bd78978d9131005d2eb2b5cb7e5 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 4 Feb 2026 15:40:22 -0500 Subject: [PATCH 1/6] refactor: extract hitl to provider pattern * refactor: extract hitl to provider pattern - add humaninputprovider protocol with setup_messages and handle_feedback - move sync hitl logic from executor to synchuman inputprovider - add _passthrough_exceptions extension point in agent/core.py - create crewai.core.providers module for extensible components - remove _ask_human_input from base_agent_executor_mixin --- lib/crewai/src/crewai/agent/core.py | 6 + .../base_agent_executor_mixin.py | 50 --- .../src/crewai/agents/crew_agent_executor.py | 121 ++----- lib/crewai/src/crewai/core/__init__.py | 1 + .../src/crewai/core/providers/__init__.py | 1 + .../core/providers/content_processor.py | 78 +++++ .../src/crewai/core/providers/human_input.py | 304 ++++++++++++++++++ lib/crewai/src/crewai/task.py | 20 +- lib/crewai/tests/agents/test_agent.py | 18 +- .../test_flow_human_input_integration.py | 57 ++-- 10 files changed, 465 insertions(+), 191 deletions(-) create mode 100644 lib/crewai/src/crewai/core/__init__.py create mode 100644 lib/crewai/src/crewai/core/providers/__init__.py create mode 100644 lib/crewai/src/crewai/core/providers/content_processor.py create mode 100644 lib/crewai/src/crewai/core/providers/human_input.py diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 6c2626a28..47eb841b4 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -118,6 +118,8 @@ MCP_TOOL_EXECUTION_TIMEOUT: Final[int] = 30 MCP_DISCOVERY_TIMEOUT: Final[int] = 15 MCP_MAX_RETRIES: Final[int] = 3 +_passthrough_exceptions: tuple[type[Exception], ...] = () + # Simple in-memory cache for MCP tool schemas (duration: 5 minutes) _mcp_schema_cache: dict[str, Any] = {} _cache_ttl: Final[int] = 300 # 5 minutes @@ -479,6 +481,8 @@ class Agent(BaseAgent): ), ) raise e + if isinstance(e, _passthrough_exceptions): + raise self._times_executed += 1 if self._times_executed > self.max_retry_limit: crewai_event_bus.emit( @@ -711,6 +715,8 @@ class Agent(BaseAgent): ), ) raise e + if isinstance(e, _passthrough_exceptions): + raise self._times_executed += 1 if self._times_executed > self.max_retry_limit: crewai_event_bus.emit( diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py index c9dceaa84..03787c802 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py @@ -4,7 +4,6 @@ import time from typing import TYPE_CHECKING from crewai.agents.parser import AgentFinish -from crewai.events.event_listener import event_listener from crewai.memory.entity.entity_memory_item import EntityMemoryItem from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem from crewai.utilities.converter import ConverterError @@ -138,52 +137,3 @@ class CrewAgentExecutorMixin: content="Long term memory is enabled, but entity memory is not enabled. Please configure entity memory or set memory=True to automatically enable it.", color="bold_yellow", ) - - def _ask_human_input(self, final_answer: str) -> str: - """Prompt human input with mode-appropriate messaging. - - Note: The final answer is already displayed via the AgentLogsExecutionEvent - panel, so we only show the feedback prompt here. - """ - from rich.panel import Panel - from rich.text import Text - - formatter = event_listener.formatter - formatter.pause_live_updates() - - try: - # Training mode prompt (single iteration) - if self.crew and getattr(self.crew, "_train", False): - prompt_text = ( - "TRAINING MODE: Provide feedback to improve the agent's performance.\n\n" - "This will be used to train better versions of the agent.\n" - "Please provide detailed feedback about the result quality and reasoning process." - ) - title = "🎓 Training Feedback Required" - # Regular human-in-the-loop prompt (multiple iterations) - else: - prompt_text = ( - "Provide feedback on the Final Result above.\n\n" - "• If you are happy with the result, simply hit Enter without typing anything.\n" - "• Otherwise, provide specific improvement requests.\n" - "• You can provide multiple rounds of feedback until satisfied." - ) - title = "💬 Human Feedback Required" - - content = Text() - content.append(prompt_text, style="yellow") - - prompt_panel = Panel( - content, - title=title, - border_style="yellow", - padding=(1, 2), - ) - formatter.console.print(prompt_panel) - - response = input() - if response.strip() != "": - formatter.console.print("\n[cyan]Processing your feedback...[/cyan]") - return response - finally: - formatter.resume_live_updates() diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index 1218ceae8..c0f24516a 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -19,6 +19,7 @@ from crewai.agents.parser import ( AgentFinish, OutputParserError, ) +from crewai.core.providers.human_input import ExecutorContext, get_provider from crewai.events.event_bus import crewai_event_bus from crewai.events.types.logging_events import ( AgentLogsExecutionEvent, @@ -175,15 +176,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ return self.llm.supports_stop_words() if self.llm else False - def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: - """Execute the agent with given inputs. + def _setup_messages(self, inputs: dict[str, Any]) -> None: + """Set up messages for the agent execution. Args: inputs: Input dictionary containing prompt variables. - - Returns: - Dictionary with agent output. """ + provider = get_provider() + if provider.setup_messages(cast(ExecutorContext, cast(object, self))): + return + if "system" in self.prompt: system_prompt = self._format_prompt( cast(str, self.prompt.get("system", "")), inputs @@ -197,6 +199,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) self.messages.append(format_message_for_llm(user_prompt)) + provider.post_setup_messages(cast(ExecutorContext, cast(object, self))) + + def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: + """Execute the agent with given inputs. + + Args: + inputs: Input dictionary containing prompt variables. + + Returns: + Dictionary with agent output. + """ + self._setup_messages(inputs) + self._inject_multimodal_files(inputs) self._show_start_logs() @@ -970,18 +985,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: Dictionary with agent output. """ - if "system" in self.prompt: - system_prompt = self._format_prompt( - cast(str, self.prompt.get("system", "")), inputs - ) - user_prompt = self._format_prompt( - cast(str, self.prompt.get("user", "")), inputs - ) - self.messages.append(format_message_for_llm(system_prompt, role="system")) - self.messages.append(format_message_for_llm(user_prompt)) - else: - user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) - self.messages.append(format_message_for_llm(user_prompt)) + self._setup_messages(inputs) await self._ainject_multimodal_files(inputs) @@ -1491,7 +1495,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): return prompt.replace("{tools}", inputs["tools"]) def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish: - """Process human feedback. + """Process human feedback via the configured provider. Args: formatted_answer: Initial agent result. @@ -1499,17 +1503,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: Final answer after feedback. """ - output_str = ( - formatted_answer.output - if isinstance(formatted_answer.output, str) - else formatted_answer.output.model_dump_json() - ) - human_feedback = self._ask_human_input(output_str) - - if self._is_training_mode(): - return self._handle_training_feedback(formatted_answer, human_feedback) - - return self._handle_regular_feedback(formatted_answer, human_feedback) + provider = get_provider() + return provider.handle_feedback(formatted_answer, self) def _is_training_mode(self) -> bool: """Check if training mode is active. @@ -1519,74 +1514,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ return bool(self.crew and self.crew._train) - def _handle_training_feedback( - self, initial_answer: AgentFinish, feedback: str - ) -> AgentFinish: - """Process training feedback. + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message for the LLM. Args: - initial_answer: Initial agent output. - feedback: Training feedback. + feedback: User feedback string. Returns: - Improved answer. + Formatted message dict. """ - self._handle_crew_training_output(initial_answer, feedback) - self.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) + return format_message_for_llm( + self._i18n.slice("feedback_instructions").format(feedback=feedback) ) - improved_answer = self._invoke_loop() - self._handle_crew_training_output(improved_answer) - self.ask_for_human_input = False - return improved_answer - - def _handle_regular_feedback( - self, current_answer: AgentFinish, initial_feedback: str - ) -> AgentFinish: - """Process regular feedback iteratively. - - Args: - current_answer: Current agent output. - initial_feedback: Initial user feedback. - - Returns: - Final answer after iterations. - """ - feedback = initial_feedback - answer = current_answer - - while self.ask_for_human_input: - # If the user provides a blank response, assume they are happy with the result - if feedback.strip() == "": - self.ask_for_human_input = False - else: - answer = self._process_feedback_iteration(feedback) - output_str = ( - answer.output - if isinstance(answer.output, str) - else answer.output.model_dump_json() - ) - feedback = self._ask_human_input(output_str) - - return answer - - def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process single feedback iteration. - - Args: - feedback: User feedback. - - Returns: - Updated agent response. - """ - self.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - return self._invoke_loop() @classmethod def __get_pydantic_core_schema__( diff --git a/lib/crewai/src/crewai/core/__init__.py b/lib/crewai/src/crewai/core/__init__.py new file mode 100644 index 000000000..714ed9161 --- /dev/null +++ b/lib/crewai/src/crewai/core/__init__.py @@ -0,0 +1 @@ +"""Core crewAI components and interfaces.""" diff --git a/lib/crewai/src/crewai/core/providers/__init__.py b/lib/crewai/src/crewai/core/providers/__init__.py new file mode 100644 index 000000000..fc0a9ed4b --- /dev/null +++ b/lib/crewai/src/crewai/core/providers/__init__.py @@ -0,0 +1 @@ +"""Provider interfaces for extensible crewAI components.""" diff --git a/lib/crewai/src/crewai/core/providers/content_processor.py b/lib/crewai/src/crewai/core/providers/content_processor.py new file mode 100644 index 000000000..828a7e311 --- /dev/null +++ b/lib/crewai/src/crewai/core/providers/content_processor.py @@ -0,0 +1,78 @@ +"""Content processor provider for extensible content processing.""" + +from contextvars import ContextVar +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class ContentProcessorProvider(Protocol): + """Protocol for content processing during task execution.""" + + def process(self, content: str, context: dict[str, Any] | None = None) -> str: + """Process content before use. + + Args: + content: The content to process. + context: Optional context information. + + Returns: + The processed content. + """ + ... + + +class NoOpContentProcessor: + """Default processor that returns content unchanged.""" + + def process(self, content: str, context: dict[str, Any] | None = None) -> str: + """Return content unchanged. + + Args: + content: The content to process. + context: Optional context information (unused). + + Returns: + The original content unchanged. + """ + return content + + +_content_processor: ContextVar[ContentProcessorProvider | None] = ContextVar( + "_content_processor", default=None +) + +_default_processor = NoOpContentProcessor() + + +def get_processor() -> ContentProcessorProvider: + """Get the current content processor. + + Returns: + The registered content processor or the default no-op processor. + """ + processor = _content_processor.get() + if processor is not None: + return processor + return _default_processor + + +def set_processor(processor: ContentProcessorProvider) -> None: + """Set the content processor for the current context. + + Args: + processor: The content processor to use. + """ + _content_processor.set(processor) + + +def process_content(content: str, context: dict[str, Any] | None = None) -> str: + """Process content using the registered processor. + + Args: + content: The content to process. + context: Optional context information. + + Returns: + The processed content. + """ + return get_processor().process(content, context) diff --git a/lib/crewai/src/crewai/core/providers/human_input.py b/lib/crewai/src/crewai/core/providers/human_input.py new file mode 100644 index 000000000..4062e6bb9 --- /dev/null +++ b/lib/crewai/src/crewai/core/providers/human_input.py @@ -0,0 +1,304 @@ +"""Human input provider for HITL (Human-in-the-Loop) flows.""" + +from __future__ import annotations + +from contextvars import ContextVar, Token +from typing import TYPE_CHECKING, Protocol, runtime_checkable + + +if TYPE_CHECKING: + from crewai.agent.core import Agent + from crewai.agents.parser import AgentFinish + from crewai.crew import Crew + from crewai.llms.base_llm import BaseLLM + from crewai.task import Task + from crewai.utilities.types import LLMMessage + + +class ExecutorContext(Protocol): + """Context interface for human input providers to interact with executor.""" + + task: Task | None + crew: Crew | None + messages: list[LLMMessage] + ask_for_human_input: bool + llm: BaseLLM + agent: Agent + + def _invoke_loop(self) -> AgentFinish: + """Invoke the agent loop and return the result.""" + ... + + def _is_training_mode(self) -> bool: + """Check if training mode is active.""" + ... + + def _handle_crew_training_output( + self, + result: AgentFinish, + human_feedback: str | None = None, + ) -> None: + """Handle training output.""" + ... + + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message.""" + ... + + +@runtime_checkable +class HumanInputProvider(Protocol): + """Protocol for human input handling. + + Implementations handle the full feedback flow: + - Sync: prompt user, loop until satisfied + - Async: raise exception for external handling + """ + + def setup_messages(self, context: ExecutorContext) -> bool: + """Set up messages for execution. + + Called before standard message setup. Allows providers to handle + conversation resumption or other custom message initialization. + + Args: + context: Executor context with messages list to modify. + + Returns: + True if messages were set up (skip standard setup), + False to use standard setup. + """ + ... + + def post_setup_messages(self, context: ExecutorContext) -> None: + """Called after standard message setup. + + Allows providers to modify messages after standard setup completes. + Only called when setup_messages returned False. + + Args: + context: Executor context with messages list to modify. + """ + ... + + def handle_feedback( + self, + formatted_answer: AgentFinish, + context: ExecutorContext, + ) -> AgentFinish: + """Handle the full human feedback flow. + + Args: + formatted_answer: The agent's current answer. + context: Executor context for callbacks. + + Returns: + The final answer after feedback processing. + + Raises: + Exception: Async implementations may raise to signal external handling. + """ + ... + + @staticmethod + def _get_output_string(answer: AgentFinish) -> str: + """Extract output string from answer. + + Args: + answer: The agent's finished answer. + + Returns: + String representation of the output. + """ + if isinstance(answer.output, str): + return answer.output + return answer.output.model_dump_json() + + +class SyncHumanInputProvider(HumanInputProvider): + """Default synchronous human input via terminal.""" + + def setup_messages(self, context: ExecutorContext) -> bool: + """Use standard message setup. + + Args: + context: Executor context (unused). + + Returns: + False to use standard setup. + """ + return False + + def post_setup_messages(self, context: ExecutorContext) -> None: + """No-op for sync provider. + + Args: + context: Executor context (unused). + """ + + def handle_feedback( + self, + formatted_answer: AgentFinish, + context: ExecutorContext, + ) -> AgentFinish: + """Handle feedback synchronously with terminal prompts. + + Args: + formatted_answer: The agent's current answer. + context: Executor context for callbacks. + + Returns: + The final answer after feedback processing. + """ + feedback = self._prompt_input(context.crew) + + if context._is_training_mode(): + return self._handle_training_feedback(formatted_answer, feedback, context) + + return self._handle_regular_feedback(formatted_answer, feedback, context) + + @staticmethod + def _handle_training_feedback( + initial_answer: AgentFinish, + feedback: str, + context: ExecutorContext, + ) -> AgentFinish: + """Process training feedback (single iteration). + + Args: + initial_answer: The agent's initial answer. + feedback: Human feedback string. + context: Executor context for callbacks. + + Returns: + Improved answer after processing feedback. + """ + context._handle_crew_training_output(initial_answer, feedback) + context.messages.append(context._format_feedback_message(feedback)) + improved_answer = context._invoke_loop() + context._handle_crew_training_output(improved_answer) + context.ask_for_human_input = False + return improved_answer + + def _handle_regular_feedback( + self, + current_answer: AgentFinish, + initial_feedback: str, + context: ExecutorContext, + ) -> AgentFinish: + """Process regular feedback with iteration loop. + + Args: + current_answer: The agent's current answer. + initial_feedback: Initial human feedback string. + context: Executor context for callbacks. + + Returns: + Final answer after all feedback iterations. + """ + feedback = initial_feedback + answer = current_answer + + while context.ask_for_human_input: + if feedback.strip() == "": + context.ask_for_human_input = False + else: + context.messages.append(context._format_feedback_message(feedback)) + answer = context._invoke_loop() + feedback = self._prompt_input(context.crew) + + return answer + + @staticmethod + def _prompt_input(crew: Crew | None) -> str: + """Show rich panel and prompt for input. + + Args: + crew: The crew instance for context. + + Returns: + User input string from terminal. + """ + from rich.panel import Panel + from rich.text import Text + + from crewai.events.event_listener import event_listener + + formatter = event_listener.formatter + formatter.pause_live_updates() + + try: + if crew and getattr(crew, "_train", False): + prompt_text = ( + "TRAINING MODE: Provide feedback to improve the agent's performance.\n\n" + "This will be used to train better versions of the agent.\n" + "Please provide detailed feedback about the result quality and reasoning process." + ) + title = "🎓 Training Feedback Required" + else: + prompt_text = ( + "Provide feedback on the Final Result above.\n\n" + "• If you are happy with the result, simply hit Enter without typing anything.\n" + "• Otherwise, provide specific improvement requests.\n" + "• You can provide multiple rounds of feedback until satisfied." + ) + title = "💬 Human Feedback Required" + + content = Text() + content.append(prompt_text, style="yellow") + + prompt_panel = Panel( + content, + title=title, + border_style="yellow", + padding=(1, 2), + ) + formatter.console.print(prompt_panel) + + response = input() + if response.strip() != "": + formatter.console.print("\n[cyan]Processing your feedback...[/cyan]") + return response + finally: + formatter.resume_live_updates() + + +_provider: ContextVar[HumanInputProvider | None] = ContextVar( + "human_input_provider", + default=None, +) + + +def get_provider() -> HumanInputProvider: + """Get the current human input provider. + + Returns: + The current provider, or a new SyncHumanInputProvider if none set. + """ + provider = _provider.get() + if provider is None: + initialized_provider = SyncHumanInputProvider() + set_provider(initialized_provider) + return initialized_provider + return provider + + +def set_provider(provider: HumanInputProvider) -> Token[HumanInputProvider | None]: + """Set the human input provider for the current context. + + Args: + provider: The provider to use. + + Returns: + Token that can be used to reset to previous value. + """ + return _provider.set(provider) + + +def reset_provider(token: Token[HumanInputProvider | None]) -> None: + """Reset the provider to its previous value. + + Args: + token: Token returned from set_provider. + """ + _provider.reset(token) diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 77056f0ca..d73c3d919 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -31,6 +31,7 @@ from pydantic_core import PydanticCustomError from typing_extensions import Self from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.core.providers.content_processor import process_content from crewai.events.event_bus import crewai_event_bus from crewai.events.types.task_events import ( TaskCompletedEvent, @@ -496,6 +497,7 @@ class Task(BaseModel): tools: list[BaseTool] | None = None, ) -> TaskOutput: """Execute the task synchronously.""" + self.start_time = datetime.datetime.now() return self._execute_core(agent, context, tools) @property @@ -536,6 +538,7 @@ class Task(BaseModel): ) -> None: """Execute the task asynchronously with context handling.""" try: + self.start_time = datetime.datetime.now() result = self._execute_core(agent, context, tools) future.set_result(result) except Exception as e: @@ -548,6 +551,7 @@ class Task(BaseModel): tools: list[BaseTool] | None = None, ) -> TaskOutput: """Execute the task asynchronously using native async/await.""" + self.start_time = datetime.datetime.now() return await self._aexecute_core(agent, context, tools) async def _aexecute_core( @@ -566,8 +570,6 @@ class Task(BaseModel): f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." ) - self.start_time = datetime.datetime.now() - self.prompt_context = context tools = tools or self.tools or [] @@ -579,6 +581,8 @@ class Task(BaseModel): tools=tools, ) + self._post_agent_execution(agent) + if not self._guardrails and not self._guardrail: pydantic_output, json_output = self._export_output(result) else: @@ -661,8 +665,6 @@ class Task(BaseModel): f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." ) - self.start_time = datetime.datetime.now() - self.prompt_context = context tools = tools or self.tools or [] @@ -674,6 +676,8 @@ class Task(BaseModel): tools=tools, ) + self._post_agent_execution(agent) + if not self._guardrails and not self._guardrail: pydantic_output, json_output = self._export_output(result) else: @@ -741,6 +745,9 @@ class Task(BaseModel): finally: clear_task_files(self.id) + def _post_agent_execution(self, agent: BaseAgent) -> None: + pass + def prompt(self) -> str: """Generates the task prompt with optional markdown formatting. @@ -863,6 +870,11 @@ Follow these guidelines: except ValueError as e: raise ValueError(f"Error interpolating description: {e!s}") from e + self.description = process_content(self.description, {"task": self}) + self._original_expected_output = process_content( + self._original_expected_output, {"task": self} + ) + try: self.expected_output = interpolate_only( input_string=self._original_expected_output, inputs=inputs diff --git a/lib/crewai/tests/agents/test_agent.py b/lib/crewai/tests/agents/test_agent.py index 32130f900..025bfd334 100644 --- a/lib/crewai/tests/agents/test_agent.py +++ b/lib/crewai/tests/agents/test_agent.py @@ -703,6 +703,8 @@ def test_agent_definition_based_on_dict(): # test for human input @pytest.mark.vcr() def test_agent_human_input(): + from crewai.core.providers.human_input import SyncHumanInputProvider + # Agent configuration config = { "role": "test role", @@ -720,7 +722,7 @@ def test_agent_human_input(): human_input=True, ) - # Side effect function for _ask_human_input to simulate multiple feedback iterations + # Side effect function for _prompt_input to simulate multiple feedback iterations feedback_responses = iter( [ "Don't say hi, say Hello instead!", # First feedback: instruct change @@ -728,16 +730,16 @@ def test_agent_human_input(): ] ) - def ask_human_input_side_effect(*args, **kwargs): + def prompt_input_side_effect(*args, **kwargs): return next(feedback_responses) - # Patch both _ask_human_input and _invoke_loop to avoid real API/network calls. + # Patch both _prompt_input on provider and _invoke_loop to avoid real API/network calls. with ( patch.object( - CrewAgentExecutor, - "_ask_human_input", - side_effect=ask_human_input_side_effect, - ) as mock_human_input, + SyncHumanInputProvider, + "_prompt_input", + side_effect=prompt_input_side_effect, + ) as mock_prompt_input, patch.object( CrewAgentExecutor, "_invoke_loop", @@ -749,7 +751,7 @@ def test_agent_human_input(): # Assertions to ensure the agent behaves correctly. # It should have requested feedback twice. - assert mock_human_input.call_count == 2 + assert mock_prompt_input.call_count == 2 # The final result should be processed to "Hello" assert output.strip().lower() == "hello" diff --git a/lib/crewai/tests/test_flow_human_input_integration.py b/lib/crewai/tests/test_flow_human_input_integration.py index e60cfe514..3ce4ebbd7 100644 --- a/lib/crewai/tests/test_flow_human_input_integration.py +++ b/lib/crewai/tests/test_flow_human_input_integration.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch import pytest from crewai.events.event_listener import event_listener +from crewai.core.providers.human_input import SyncHumanInputProvider class TestFlowHumanInputIntegration: @@ -24,14 +25,9 @@ class TestFlowHumanInputIntegration: @patch("builtins.input", return_value="") def test_human_input_pauses_flow_updates(self, mock_input): """Test that human input pauses Flow status updates.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = False - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = False formatter = event_listener.formatter @@ -39,7 +35,7 @@ class TestFlowHumanInputIntegration: patch.object(formatter, "pause_live_updates") as mock_pause, patch.object(formatter, "resume_live_updates") as mock_resume, ): - result = executor._ask_human_input("Test result") + result = provider._prompt_input(crew) mock_pause.assert_called_once() mock_resume.assert_called_once() @@ -49,14 +45,9 @@ class TestFlowHumanInputIntegration: @patch("builtins.input", side_effect=["feedback", ""]) def test_multiple_human_input_rounds(self, mock_input): """Test multiple rounds of human input with Flow status management.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = False - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = False formatter = event_listener.formatter @@ -75,10 +66,10 @@ class TestFlowHumanInputIntegration: formatter, "resume_live_updates", side_effect=track_resume ), ): - result1 = executor._ask_human_input("Test result 1") + result1 = provider._prompt_input(crew) assert result1 == "feedback" - result2 = executor._ask_human_input("Test result 2") + result2 = provider._prompt_input(crew) assert result2 == "" assert len(pause_calls) == 2 @@ -103,14 +94,9 @@ class TestFlowHumanInputIntegration: def test_pause_resume_exception_handling(self): """Test that resume is called even if exception occurs during human input.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = False - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = False formatter = event_listener.formatter @@ -122,21 +108,16 @@ class TestFlowHumanInputIntegration: ), ): with pytest.raises(KeyboardInterrupt): - executor._ask_human_input("Test result") + provider._prompt_input(crew) mock_pause.assert_called_once() mock_resume.assert_called_once() def test_training_mode_human_input(self): """Test human input in training mode.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = True - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = True formatter = event_listener.formatter @@ -146,7 +127,7 @@ class TestFlowHumanInputIntegration: patch.object(formatter.console, "print") as mock_console_print, patch("builtins.input", return_value="training feedback"), ): - result = executor._ask_human_input("Test result") + result = provider._prompt_input(crew) mock_pause.assert_called_once() mock_resume.assert_called_once() @@ -161,4 +142,4 @@ class TestFlowHumanInputIntegration: for call in call_args if call[0] ) - assert training_panel_found + assert training_panel_found \ No newline at end of file From d86d43d3e0918fe2c17001edd920c438487813b8 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 4 Feb 2026 16:05:21 -0500 Subject: [PATCH 2/6] chore: refactor crew to provider Enable dynamic extension exports and small behavior fixes across events and flow modules: - events/__init__.py: Added _extension_exports and extended __getattr__ to lazily resolve registered extension values or import paths. - events/event_bus.py: Implemented off() to unregister sync/async handlers, clean handler dependencies, and invalidate execution plan cache. - events/listeners/tracing/utils.py: Added Callable import and _first_time_trace_hook to allow overriding first-time trace auto-collection behavior. - events/types/tool_usage_events.py: Changed ToolUsageEvent.run_attempts default from None to 0 to avoid nullable handling. - events/utils/console_formatter.py: Respect CREWAI_DISABLE_VERSION_CHECK env var to skip version checks in CI-like flows. - flow/async_feedback/__init__.py: Added typing.Any import, _extension_exports and __getattr__ to support extensions via attribute lookup. These changes add extension points and safer defaults, and provide a way to unregister event handlers. --- lib/crewai/src/crewai/crew.py | 19 ++++- lib/crewai/src/crewai/events/__init__.py | 18 ++++- lib/crewai/src/crewai/events/event_bus.py | 33 +++++++++ .../crewai/events/listeners/tracing/utils.py | 7 +- .../crewai/events/types/tool_usage_events.py | 8 +-- .../crewai/events/utils/console_formatter.py | 3 + .../crewai/flow/async_feedback/__init__.py | 13 ++++ .../crewai/knowledge/source/utils/__init__.py | 1 + .../knowledge/source/utils/source_helper.py | 70 +++++++++++++++++++ .../evaluators/crew_evaluator_handler.py | 16 +++-- 10 files changed, 176 insertions(+), 12 deletions(-) create mode 100644 lib/crewai/src/crewai/knowledge/source/utils/__init__.py create mode 100644 lib/crewai/src/crewai/knowledge/source/utils/source_helper.py diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index f22dd4b72..baabce190 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -751,6 +751,8 @@ class Crew(FlowTrackable, BaseModel): for after_callback in self.after_kickoff_callbacks: result = after_callback(result) + result = self._post_kickoff(result) + self.usage_metrics = self.calculate_usage_metrics() return result @@ -764,6 +766,9 @@ class Crew(FlowTrackable, BaseModel): clear_files(self.id) detach(token) + def _post_kickoff(self, result: CrewOutput) -> CrewOutput: + return result + def kickoff_for_each( self, inputs: list[dict[str, Any]], @@ -936,6 +941,8 @@ class Crew(FlowTrackable, BaseModel): for after_callback in self.after_kickoff_callbacks: result = after_callback(result) + result = self._post_kickoff(result) + self.usage_metrics = self.calculate_usage_metrics() return result @@ -1181,6 +1188,9 @@ class Crew(FlowTrackable, BaseModel): self.manager_agent = manager manager.crew = self + def _get_execution_start_index(self, tasks: list[Task]) -> int | None: + return None + def _execute_tasks( self, tasks: list[Task], @@ -1197,6 +1207,9 @@ class Crew(FlowTrackable, BaseModel): Returns: CrewOutput: Final output of the crew """ + custom_start = self._get_execution_start_index(tasks) + if custom_start is not None: + start_index = custom_start task_outputs: list[TaskOutput] = [] futures: list[tuple[Task, Future[TaskOutput], int]] = [] @@ -1305,8 +1318,10 @@ class Crew(FlowTrackable, BaseModel): if files: supported_types: list[str] = [] if agent and agent.llm and agent.llm.supports_multimodal(): - provider = getattr(agent.llm, "provider", None) or getattr( - agent.llm, "model", "openai" + provider = ( + getattr(agent.llm, "provider", None) + or getattr(agent.llm, "model", None) + or "openai" ) api = getattr(agent.llm, "api", None) supported_types = get_supported_content_types(provider, api) diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 61c0ec380..a6f213a54 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -195,6 +195,7 @@ __all__ = [ "ToolUsageFinishedEvent", "ToolUsageStartedEvent", "ToolValidateInputErrorEvent", + "_extension_exports", "crewai_event_bus", ] @@ -210,14 +211,29 @@ _AGENT_EVENT_MAPPING = { "LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events", } +_extension_exports: dict[str, Any] = {} + def __getattr__(name: str) -> Any: - """Lazy import for agent events to avoid circular imports.""" + """Lazy import for agent events and registered extensions.""" if name in _AGENT_EVENT_MAPPING: import importlib module_path = _AGENT_EVENT_MAPPING[name] module = importlib.import_module(module_path) return getattr(module, name) + + if name in _extension_exports: + import importlib + + value = _extension_exports[name] + if isinstance(value, str): + module_path, _, attr_name = value.rpartition(".") + if module_path: + module = importlib.import_module(module_path) + return getattr(module, attr_name) + return importlib.import_module(value) + return value + msg = f"module {__name__!r} has no attribute {name!r}" raise AttributeError(msg) diff --git a/lib/crewai/src/crewai/events/event_bus.py b/lib/crewai/src/crewai/events/event_bus.py index 5c4bec58f..d0aaa4455 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -227,6 +227,39 @@ class CrewAIEventsBus: return decorator + def off( + self, + event_type: type[BaseEvent], + handler: Callable[..., Any], + ) -> None: + """Unregister an event handler for a specific event type. + + Args: + event_type: The event class to stop listening for + handler: The handler function to unregister + """ + with self._rwlock.w_locked(): + if event_type in self._sync_handlers: + existing_sync = self._sync_handlers[event_type] + if handler in existing_sync: + self._sync_handlers[event_type] = existing_sync - {handler} + if not self._sync_handlers[event_type]: + del self._sync_handlers[event_type] + + if event_type in self._async_handlers: + existing_async = self._async_handlers[event_type] + if handler in existing_async: + self._async_handlers[event_type] = existing_async - {handler} + if not self._async_handlers[event_type]: + del self._async_handlers[event_type] + + if event_type in self._handler_dependencies: + self._handler_dependencies[event_type].pop(handler, None) + if not self._handler_dependencies[event_type]: + del self._handler_dependencies[event_type] + + self._execution_plan_cache.pop(event_type, None) + def _call_handlers( self, source: Any, diff --git a/lib/crewai/src/crewai/events/listeners/tracing/utils.py b/lib/crewai/src/crewai/events/listeners/tracing/utils.py index 13e26dacb..b6bf1026b 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/utils.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/utils.py @@ -1,3 +1,4 @@ +from collections.abc import Callable from contextvars import ContextVar, Token from datetime import datetime import getpass @@ -26,6 +27,8 @@ logger = logging.getLogger(__name__) _tracing_enabled: ContextVar[bool | None] = ContextVar("_tracing_enabled", default=None) +_first_time_trace_hook: Callable[[], bool] | None = None + def should_enable_tracing(*, override: bool | None = None) -> bool: """Determine if tracing should be enabled. @@ -407,10 +410,12 @@ def truncate_messages( def should_auto_collect_first_time_traces() -> bool: """True if we should auto-collect traces for first-time user. - Returns: True if first-time user AND telemetry not disabled AND tracing not explicitly enabled, False otherwise. """ + if _first_time_trace_hook is not None: + return _first_time_trace_hook() + if _is_test_environment(): return False diff --git a/lib/crewai/src/crewai/events/types/tool_usage_events.py b/lib/crewai/src/crewai/events/types/tool_usage_events.py index 7fe9b897f..c4e681546 100644 --- a/lib/crewai/src/crewai/events/types/tool_usage_events.py +++ b/lib/crewai/src/crewai/events/types/tool_usage_events.py @@ -16,7 +16,7 @@ class ToolUsageEvent(BaseEvent): tool_name: str tool_args: dict[str, Any] | str tool_class: str | None = None - run_attempts: int | None = None + run_attempts: int = 0 delegations: int | None = None agent: Any | None = None task_name: str | None = None @@ -26,7 +26,7 @@ class ToolUsageEvent(BaseEvent): model_config = ConfigDict(arbitrary_types_allowed=True) - def __init__(self, **data): + def __init__(self, **data: Any) -> None: if data.get("from_task"): task = data["from_task"] data["task_id"] = str(task.id) @@ -96,10 +96,10 @@ class ToolExecutionErrorEvent(BaseEvent): type: str = "tool_execution_error" tool_name: str tool_args: dict[str, Any] - tool_class: Callable + tool_class: Callable[..., Any] agent: Any | None = None - def __init__(self, **data): + def __init__(self, **data: Any) -> None: super().__init__(**data) # Set fingerprint data from the agent if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint: diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py index ac6caabcf..eaecc0e74 100644 --- a/lib/crewai/src/crewai/events/utils/console_formatter.py +++ b/lib/crewai/src/crewai/events/utils/console_formatter.py @@ -49,6 +49,9 @@ class ConsoleFormatter: if os.getenv("CI", "").lower() in ("true", "1"): return + if os.getenv("CREWAI_DISABLE_VERSION_CHECK", "").lower() in ("true", "1"): + return + try: is_newer, current, latest = is_newer_version_available() if is_newer and latest: diff --git a/lib/crewai/src/crewai/flow/async_feedback/__init__.py b/lib/crewai/src/crewai/flow/async_feedback/__init__.py index 286fdaa8d..02590a785 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/__init__.py +++ b/lib/crewai/src/crewai/flow/async_feedback/__init__.py @@ -28,6 +28,8 @@ Example: ``` """ +from typing import Any + from crewai.flow.async_feedback.providers import ConsoleProvider from crewai.flow.async_feedback.types import ( HumanFeedbackPending, @@ -41,4 +43,15 @@ __all__ = [ "HumanFeedbackPending", "HumanFeedbackProvider", "PendingFeedbackContext", + "_extension_exports", ] + +_extension_exports: dict[str, Any] = {} + + +def __getattr__(name: str) -> Any: + """Support extensions via dynamic attribute lookup.""" + if name in _extension_exports: + return _extension_exports[name] + msg = f"module {__name__!r} has no attribute {name!r}" + raise AttributeError(msg) diff --git a/lib/crewai/src/crewai/knowledge/source/utils/__init__.py b/lib/crewai/src/crewai/knowledge/source/utils/__init__.py new file mode 100644 index 000000000..0f7c5142c --- /dev/null +++ b/lib/crewai/src/crewai/knowledge/source/utils/__init__.py @@ -0,0 +1 @@ +"""Knowledge source utilities.""" diff --git a/lib/crewai/src/crewai/knowledge/source/utils/source_helper.py b/lib/crewai/src/crewai/knowledge/source/utils/source_helper.py new file mode 100644 index 000000000..9ab41cd30 --- /dev/null +++ b/lib/crewai/src/crewai/knowledge/source/utils/source_helper.py @@ -0,0 +1,70 @@ +"""Helper utilities for knowledge sources.""" + +from typing import Any, ClassVar + +from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource +from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource +from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource +from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource +from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource +from crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource + + +class SourceHelper: + """Helper class for creating and managing knowledge sources.""" + + SUPPORTED_FILE_TYPES: ClassVar[list[str]] = [ + ".csv", + ".pdf", + ".json", + ".txt", + ".xlsx", + ".xls", + ] + + _FILE_TYPE_MAP: ClassVar[dict[str, type[BaseKnowledgeSource]]] = { + ".csv": CSVKnowledgeSource, + ".pdf": PDFKnowledgeSource, + ".json": JSONKnowledgeSource, + ".txt": TextFileKnowledgeSource, + ".xlsx": ExcelKnowledgeSource, + ".xls": ExcelKnowledgeSource, + } + + @classmethod + def is_supported_file(cls, file_path: str) -> bool: + """Check if a file type is supported. + + Args: + file_path: Path to the file. + + Returns: + True if the file type is supported. + """ + return file_path.lower().endswith(tuple(cls.SUPPORTED_FILE_TYPES)) + + @classmethod + def get_source( + cls, file_path: str, metadata: dict[str, Any] | None = None + ) -> BaseKnowledgeSource: + """Create appropriate KnowledgeSource based on file extension. + + Args: + file_path: Path to the file. + metadata: Optional metadata to attach to the source. + + Returns: + The appropriate KnowledgeSource instance. + + Raises: + ValueError: If the file type is not supported. + """ + if not cls.is_supported_file(file_path): + raise ValueError(f"Unsupported file type: {file_path}") + + lower_path = file_path.lower() + for ext, source_cls in cls._FILE_TYPE_MAP.items(): + if lower_path.endswith(ext): + return source_cls(file_path=[file_path], metadata=metadata) + + raise ValueError(f"Unsupported file type: {file_path}") diff --git a/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py b/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py index 9c9cac0c6..32b847d73 100644 --- a/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py +++ b/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import defaultdict -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from pydantic import BaseModel, Field, InstanceOf from rich.box import HEAVY_EDGE @@ -36,7 +36,13 @@ class CrewEvaluator: iteration: The current iteration of the evaluation. """ - def __init__(self, crew: Crew, eval_llm: InstanceOf[BaseLLM]) -> None: + def __init__( + self, + crew: Crew, + eval_llm: InstanceOf[BaseLLM] | str | None = None, + openai_model_name: str | None = None, + llm: InstanceOf[BaseLLM] | str | None = None, + ) -> None: self.crew = crew self.llm = eval_llm self.tasks_scores: defaultdict[int, list[float]] = defaultdict(list) @@ -86,7 +92,9 @@ class CrewEvaluator: """ self.iteration = iteration - def print_crew_evaluation_result(self) -> None: + def print_crew_evaluation_result( + self, token_usage: list[dict[str, Any]] | None = None + ) -> None: """ Prints the evaluation result of the crew in a table. A Crew with 2 tasks using the command crewai test -n 3 @@ -204,7 +212,7 @@ class CrewEvaluator: CrewTestResultEvent( quality=quality_score, execution_duration=current_task.execution_duration, - model=self.llm.model, + model=getattr(self.llm, "model", str(self.llm)), crew_name=self.crew.name, crew=self.crew, ), From 76b5f72e81844e7ee592db87317a96f2e05636ca Mon Sep 17 00:00:00 2001 From: Vini Brasil Date: Wed, 4 Feb 2026 20:34:08 -0300 Subject: [PATCH 3/6] Fix tool error causing double event scope pop (#4373) When a tool raises an error, both ToolUsageErrorEvent and ToolUsageFinishedEvent were being emitted. Since both events pop the event scope stack, this caused the agent scope to be incorrectly popped along with the tool scope. --- .../src/crewai/agents/crew_agent_executor.py | 30 ++++---- .../src/crewai/experimental/agent_executor.py | 30 ++++---- lib/crewai/src/crewai/tools/tool_usage.py | 8 +- lib/crewai/tests/events/test_event_context.py | 38 ++++++++- lib/crewai/tests/tools/test_tool_usage.py | 77 +++++++++++++++++++ 5 files changed, 152 insertions(+), 31 deletions(-) diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index c0f24516a..647596f2a 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -814,6 +814,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): agent_key=agent_key, ), ) + error_event_emitted = False track_delegation_if_needed(func_name, args_dict, self.task) @@ -896,6 +897,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): error=e, ), ) + error_event_emitted = True elif max_usage_reached and original_tool: # Return error message when max usage limit is reached result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." @@ -923,20 +925,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): color="red", ) - # Emit tool usage finished event - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - started_at=started_at, - finished_at=datetime.now(), - ), - ) + if not error_event_emitted: + crewai_event_bus.emit( + self, + event=ToolUsageFinishedEvent( + output=result, + tool_name=func_name, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + started_at=started_at, + finished_at=datetime.now(), + ), + ) # Append tool result message tool_message: LLMMessage = { diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index b9d8adccc..98e8b5bdb 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -689,6 +689,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): agent_key=agent_key, ), ) + error_event_emitted = False track_delegation_if_needed(func_name, args_dict, self.task) @@ -764,6 +765,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): error=e, ), ) + error_event_emitted = True elif max_usage_reached and original_tool: # Return error message when max usage limit is reached result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." @@ -792,20 +794,20 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): color="red", ) - # Emit tool usage finished event - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - started_at=started_at, - finished_at=datetime.now(), - ), - ) + if not error_event_emitted: + crewai_event_bus.emit( + self, + event=ToolUsageFinishedEvent( + output=result, + tool_name=func_name, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + started_at=started_at, + finished_at=datetime.now(), + ), + ) # Append tool result message tool_message: LLMMessage = { diff --git a/lib/crewai/src/crewai/tools/tool_usage.py b/lib/crewai/src/crewai/tools/tool_usage.py index e5a9e6154..b6ce5adb6 100644 --- a/lib/crewai/src/crewai/tools/tool_usage.py +++ b/lib/crewai/src/crewai/tools/tool_usage.py @@ -270,6 +270,7 @@ class ToolUsage: result = None # type: ignore should_retry = False available_tool = None + error_event_emitted = False try: if self.tools_handler and self.tools_handler.cache: @@ -408,6 +409,7 @@ class ToolUsage: except Exception as e: self.on_tool_error(tool=tool, tool_calling=calling, e=e) + error_event_emitted = True self._run_attempts += 1 if self._run_attempts > self._max_parsing_attempts: self._telemetry.tool_usage_error(llm=self.function_calling_llm) @@ -435,7 +437,7 @@ class ToolUsage: result = self._format_result(result=result) finally: - if started_event_emitted: + if started_event_emitted and not error_event_emitted: self.on_tool_use_finished( tool=tool, tool_calling=calling, @@ -500,6 +502,7 @@ class ToolUsage: result = None # type: ignore should_retry = False available_tool = None + error_event_emitted = False try: if self.tools_handler and self.tools_handler.cache: @@ -638,6 +641,7 @@ class ToolUsage: except Exception as e: self.on_tool_error(tool=tool, tool_calling=calling, e=e) + error_event_emitted = True self._run_attempts += 1 if self._run_attempts > self._max_parsing_attempts: self._telemetry.tool_usage_error(llm=self.function_calling_llm) @@ -665,7 +669,7 @@ class ToolUsage: result = self._format_result(result=result) finally: - if started_event_emitted: + if started_event_emitted and not error_event_emitted: self.on_tool_use_finished( tool=tool, tool_calling=calling, diff --git a/lib/crewai/tests/events/test_event_context.py b/lib/crewai/tests/events/test_event_context.py index 071e1a34d..2a69ca1ee 100644 --- a/lib/crewai/tests/events/test_event_context.py +++ b/lib/crewai/tests/events/test_event_context.py @@ -177,4 +177,40 @@ class TestTriggeredByScope: raise ValueError("test error") except ValueError: pass - assert get_triggering_event_id() is None \ No newline at end of file + assert get_triggering_event_id() is None + + +def test_agent_scope_preserved_after_tool_error_event() -> None: + from crewai.events import crewai_event_bus + from crewai.events.types.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageStartedEvent, + ) + + push_event_scope("crew-1", "crew_kickoff_started") + push_event_scope("task-1", "task_started") + push_event_scope("agent-1", "agent_execution_started") + + crewai_event_bus.emit( + None, + ToolUsageStartedEvent( + tool_name="test_tool", + tool_args={}, + agent_key="test_agent", + ) + ) + + crewai_event_bus.emit( + None, + ToolUsageErrorEvent( + tool_name="test_tool", + tool_args={}, + agent_key="test_agent", + error=ValueError("test error"), + ) + ) + + crewai_event_bus.flush() + + assert get_current_parent_id() == "agent-1" + diff --git a/lib/crewai/tests/tools/test_tool_usage.py b/lib/crewai/tests/tools/test_tool_usage.py index c6dfad58b..b68a41666 100644 --- a/lib/crewai/tests/tools/test_tool_usage.py +++ b/lib/crewai/tests/tools/test_tool_usage.py @@ -10,7 +10,9 @@ from crewai import Agent, Task from crewai.events.event_bus import crewai_event_bus from crewai.events.types.tool_usage_events import ( ToolSelectionErrorEvent, + ToolUsageErrorEvent, ToolUsageFinishedEvent, + ToolUsageStartedEvent, ToolValidateInputErrorEvent, ) from crewai.tools import BaseTool @@ -744,3 +746,78 @@ def test_tool_usage_finished_event_with_cached_result(): assert isinstance(event.started_at, datetime.datetime) assert isinstance(event.finished_at, datetime.datetime) assert event.type == "tool_usage_finished" + + +def test_tool_error_does_not_emit_finished_event(): + from crewai.tools.tool_calling import ToolCalling + + class FailingTool(BaseTool): + name: str = "Failing Tool" + description: str = "A tool that always fails" + + def _run(self, **kwargs) -> str: + raise ValueError("Intentional failure") + + failing_tool = FailingTool().to_structured_tool() + + mock_agent = MagicMock() + mock_agent.key = "test_agent_key" + mock_agent.role = "test_agent_role" + mock_agent._original_role = "test_agent_role" + mock_agent.verbose = False + mock_agent.fingerprint = None + mock_agent.i18n.tools.return_value = {"name": "Add Image"} + mock_agent.i18n.errors.return_value = "Error: {error}" + mock_agent.i18n.slice.return_value = "Available tools: {tool_names}" + + mock_task = MagicMock() + mock_task.delegations = 0 + mock_task.name = "Test Task" + mock_task.description = "A test task" + mock_task.id = "test-task-id" + + mock_action = MagicMock() + mock_action.tool = "failing_tool" + mock_action.tool_input = "{}" + + tool_usage = ToolUsage( + tools_handler=MagicMock(cache=None, last_used_tool=None), + tools=[failing_tool], + task=mock_task, + function_calling_llm=None, + agent=mock_agent, + action=mock_action, + ) + + started_events = [] + error_events = [] + finished_events = [] + error_received = threading.Event() + + @crewai_event_bus.on(ToolUsageStartedEvent) + def on_started(source, event): + if event.tool_name == "failing_tool": + started_events.append(event) + + @crewai_event_bus.on(ToolUsageErrorEvent) + def on_error(source, event): + if event.tool_name == "failing_tool": + error_events.append(event) + error_received.set() + + @crewai_event_bus.on(ToolUsageFinishedEvent) + def on_finished(source, event): + if event.tool_name == "failing_tool": + finished_events.append(event) + + tool_calling = ToolCalling(tool_name="failing_tool", arguments={}) + tool_usage.use(calling=tool_calling, tool_string="Action: failing_tool") + + assert error_received.wait(timeout=5), "Timeout waiting for error event" + crewai_event_bus.flush() + + assert len(started_events) >= 1, "Expected at least one ToolUsageStartedEvent" + assert len(error_events) >= 1, "Expected at least one ToolUsageErrorEvent" + assert len(finished_events) == 0, ( + "ToolUsageFinishedEvent should NOT be emitted after ToolUsageErrorEvent" + ) From 711e7171e1d01ef5a00c5f4f863463c670650e92 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 4 Feb 2026 21:16:20 -0500 Subject: [PATCH 4/6] chore: improve hook typing and registration Allow hook registration to accept both typed hook types and plain callables by importing and using After*/Before*CallHookCallable types; add explicit LLMCallHookContext and ToolCallHookContext typing in crew_base. Introduce a post-initialize crew hook list and invoke hooks after Crew instance initialization. Refactor filtered hook factory functions to include precise typing and clearer local names (before_llm_hook/after_llm_hook/before_tool_hook/after_tool_hook) and register those with the instance. Update CrewInstance protocol to include _registered_hook_functions and _hooks_being_registered fields. --- lib/crewai/src/crewai/hooks/llm_hooks.py | 25 ++++--- lib/crewai/src/crewai/hooks/tool_hooks.py | 27 +++++--- lib/crewai/src/crewai/project/crew_base.py | 77 +++++++++++++++------- lib/crewai/src/crewai/project/wrappers.py | 2 + 4 files changed, 89 insertions(+), 42 deletions(-) diff --git a/lib/crewai/src/crewai/hooks/llm_hooks.py b/lib/crewai/src/crewai/hooks/llm_hooks.py index 2f5462fe0..3a6abbedf 100644 --- a/lib/crewai/src/crewai/hooks/llm_hooks.py +++ b/lib/crewai/src/crewai/hooks/llm_hooks.py @@ -3,7 +3,12 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any, cast from crewai.events.event_listener import event_listener -from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType +from crewai.hooks.types import ( + AfterLLMCallHookCallable, + AfterLLMCallHookType, + BeforeLLMCallHookCallable, + BeforeLLMCallHookType, +) from crewai.utilities.printer import Printer @@ -149,12 +154,12 @@ class LLMCallHookContext: event_listener.formatter.resume_live_updates() -_before_llm_call_hooks: list[BeforeLLMCallHookType] = [] -_after_llm_call_hooks: list[AfterLLMCallHookType] = [] +_before_llm_call_hooks: list[BeforeLLMCallHookType | BeforeLLMCallHookCallable] = [] +_after_llm_call_hooks: list[AfterLLMCallHookType | AfterLLMCallHookCallable] = [] def register_before_llm_call_hook( - hook: BeforeLLMCallHookType, + hook: BeforeLLMCallHookType | BeforeLLMCallHookCallable, ) -> None: """Register a global before_llm_call hook. @@ -190,7 +195,7 @@ def register_before_llm_call_hook( def register_after_llm_call_hook( - hook: AfterLLMCallHookType, + hook: AfterLLMCallHookType | AfterLLMCallHookCallable, ) -> None: """Register a global after_llm_call hook. @@ -217,7 +222,9 @@ def register_after_llm_call_hook( _after_llm_call_hooks.append(hook) -def get_before_llm_call_hooks() -> list[BeforeLLMCallHookType]: +def get_before_llm_call_hooks() -> list[ + BeforeLLMCallHookType | BeforeLLMCallHookCallable +]: """Get all registered global before_llm_call hooks. Returns: @@ -226,7 +233,7 @@ def get_before_llm_call_hooks() -> list[BeforeLLMCallHookType]: return _before_llm_call_hooks.copy() -def get_after_llm_call_hooks() -> list[AfterLLMCallHookType]: +def get_after_llm_call_hooks() -> list[AfterLLMCallHookType | AfterLLMCallHookCallable]: """Get all registered global after_llm_call hooks. Returns: @@ -236,7 +243,7 @@ def get_after_llm_call_hooks() -> list[AfterLLMCallHookType]: def unregister_before_llm_call_hook( - hook: BeforeLLMCallHookType, + hook: BeforeLLMCallHookType | BeforeLLMCallHookCallable, ) -> bool: """Unregister a specific global before_llm_call hook. @@ -262,7 +269,7 @@ def unregister_before_llm_call_hook( def unregister_after_llm_call_hook( - hook: AfterLLMCallHookType, + hook: AfterLLMCallHookType | AfterLLMCallHookCallable, ) -> bool: """Unregister a specific global after_llm_call hook. diff --git a/lib/crewai/src/crewai/hooks/tool_hooks.py b/lib/crewai/src/crewai/hooks/tool_hooks.py index 6ee0ab033..ac7f5c362 100644 --- a/lib/crewai/src/crewai/hooks/tool_hooks.py +++ b/lib/crewai/src/crewai/hooks/tool_hooks.py @@ -3,7 +3,12 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any from crewai.events.event_listener import event_listener -from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType +from crewai.hooks.types import ( + AfterToolCallHookCallable, + AfterToolCallHookType, + BeforeToolCallHookCallable, + BeforeToolCallHookType, +) from crewai.utilities.printer import Printer @@ -112,12 +117,12 @@ class ToolCallHookContext: # Global hook registries -_before_tool_call_hooks: list[BeforeToolCallHookType] = [] -_after_tool_call_hooks: list[AfterToolCallHookType] = [] +_before_tool_call_hooks: list[BeforeToolCallHookType | BeforeToolCallHookCallable] = [] +_after_tool_call_hooks: list[AfterToolCallHookType | AfterToolCallHookCallable] = [] def register_before_tool_call_hook( - hook: BeforeToolCallHookType, + hook: BeforeToolCallHookType | BeforeToolCallHookCallable, ) -> None: """Register a global before_tool_call hook. @@ -154,7 +159,7 @@ def register_before_tool_call_hook( def register_after_tool_call_hook( - hook: AfterToolCallHookType, + hook: AfterToolCallHookType | AfterToolCallHookCallable, ) -> None: """Register a global after_tool_call hook. @@ -184,7 +189,9 @@ def register_after_tool_call_hook( _after_tool_call_hooks.append(hook) -def get_before_tool_call_hooks() -> list[BeforeToolCallHookType]: +def get_before_tool_call_hooks() -> list[ + BeforeToolCallHookType | BeforeToolCallHookCallable +]: """Get all registered global before_tool_call hooks. Returns: @@ -193,7 +200,9 @@ def get_before_tool_call_hooks() -> list[BeforeToolCallHookType]: return _before_tool_call_hooks.copy() -def get_after_tool_call_hooks() -> list[AfterToolCallHookType]: +def get_after_tool_call_hooks() -> list[ + AfterToolCallHookType | AfterToolCallHookCallable +]: """Get all registered global after_tool_call hooks. Returns: @@ -203,7 +212,7 @@ def get_after_tool_call_hooks() -> list[AfterToolCallHookType]: def unregister_before_tool_call_hook( - hook: BeforeToolCallHookType, + hook: BeforeToolCallHookType | BeforeToolCallHookCallable, ) -> bool: """Unregister a specific global before_tool_call hook. @@ -229,7 +238,7 @@ def unregister_before_tool_call_hook( def unregister_after_tool_call_hook( - hook: AfterToolCallHookType, + hook: AfterToolCallHookType | AfterToolCallHookCallable, ) -> bool: """Unregister a specific global after_tool_call hook. diff --git a/lib/crewai/src/crewai/project/crew_base.py b/lib/crewai/src/crewai/project/crew_base.py index 202d98898..323450b13 100644 --- a/lib/crewai/src/crewai/project/crew_base.py +++ b/lib/crewai/src/crewai/project/crew_base.py @@ -27,6 +27,8 @@ if TYPE_CHECKING: from crewai import Agent, Task from crewai.agents.cache.cache_handler import CacheHandler from crewai.crews.crew_output import CrewOutput + from crewai.hooks.llm_hooks import LLMCallHookContext + from crewai.hooks.tool_hooks import ToolCallHookContext from crewai.project.wrappers import ( CrewInstance, OutputJsonClass, @@ -34,6 +36,8 @@ if TYPE_CHECKING: ) from crewai.tasks.task_output import TaskOutput +_post_initialize_crew_hooks: list[Callable[[Any], None]] = [] + class AgentConfig(TypedDict, total=False): """Type definition for agent configuration dictionary. @@ -266,6 +270,9 @@ class CrewBaseMeta(type): instance.map_all_agent_variables() instance.map_all_task_variables() + for hook in _post_initialize_crew_hooks: + hook(instance) + original_methods = { name: method for name, method in cls.__dict__.items() @@ -485,47 +492,61 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None: if has_agent_filter: agents_filter = hook_method._filter_agents - def make_filtered_before_llm(bound_fn, agents_list): - def filtered(context): + def make_filtered_before_llm( + bound_fn: Callable[[LLMCallHookContext], bool | None], + agents_list: list[str], + ) -> Callable[[LLMCallHookContext], bool | None]: + def filtered(context: LLMCallHookContext) -> bool | None: if context.agent and context.agent.role not in agents_list: return None return bound_fn(context) return filtered - final_hook = make_filtered_before_llm(bound_hook, agents_filter) + before_llm_hook = make_filtered_before_llm(bound_hook, agents_filter) else: - final_hook = bound_hook + before_llm_hook = bound_hook - register_before_llm_call_hook(final_hook) - instance._registered_hook_functions.append(("before_llm_call", final_hook)) + register_before_llm_call_hook(before_llm_hook) + instance._registered_hook_functions.append( + ("before_llm_call", before_llm_hook) + ) if hasattr(hook_method, "is_after_llm_call_hook"): if has_agent_filter: agents_filter = hook_method._filter_agents - def make_filtered_after_llm(bound_fn, agents_list): - def filtered(context): + def make_filtered_after_llm( + bound_fn: Callable[[LLMCallHookContext], str | None], + agents_list: list[str], + ) -> Callable[[LLMCallHookContext], str | None]: + def filtered(context: LLMCallHookContext) -> str | None: if context.agent and context.agent.role not in agents_list: return None return bound_fn(context) return filtered - final_hook = make_filtered_after_llm(bound_hook, agents_filter) + after_llm_hook = make_filtered_after_llm(bound_hook, agents_filter) else: - final_hook = bound_hook + after_llm_hook = bound_hook - register_after_llm_call_hook(final_hook) - instance._registered_hook_functions.append(("after_llm_call", final_hook)) + register_after_llm_call_hook(after_llm_hook) + instance._registered_hook_functions.append( + ("after_llm_call", after_llm_hook) + ) if hasattr(hook_method, "is_before_tool_call_hook"): if has_tool_filter or has_agent_filter: tools_filter = getattr(hook_method, "_filter_tools", None) agents_filter = getattr(hook_method, "_filter_agents", None) - def make_filtered_before_tool(bound_fn, tools_list, agents_list): - def filtered(context): + def make_filtered_before_tool( + bound_fn: Callable[[ToolCallHookContext], bool | None], + tools_list: list[str] | None, + agents_list: list[str] | None, + ) -> Callable[[ToolCallHookContext], bool | None]: + def filtered(context: ToolCallHookContext) -> bool | None: if tools_list and context.tool_name not in tools_list: return None if ( @@ -538,22 +559,28 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None: return filtered - final_hook = make_filtered_before_tool( + before_tool_hook = make_filtered_before_tool( bound_hook, tools_filter, agents_filter ) else: - final_hook = bound_hook + before_tool_hook = bound_hook - register_before_tool_call_hook(final_hook) - instance._registered_hook_functions.append(("before_tool_call", final_hook)) + register_before_tool_call_hook(before_tool_hook) + instance._registered_hook_functions.append( + ("before_tool_call", before_tool_hook) + ) if hasattr(hook_method, "is_after_tool_call_hook"): if has_tool_filter or has_agent_filter: tools_filter = getattr(hook_method, "_filter_tools", None) agents_filter = getattr(hook_method, "_filter_agents", None) - def make_filtered_after_tool(bound_fn, tools_list, agents_list): - def filtered(context): + def make_filtered_after_tool( + bound_fn: Callable[[ToolCallHookContext], str | None], + tools_list: list[str] | None, + agents_list: list[str] | None, + ) -> Callable[[ToolCallHookContext], str | None]: + def filtered(context: ToolCallHookContext) -> str | None: if tools_list and context.tool_name not in tools_list: return None if ( @@ -566,14 +593,16 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None: return filtered - final_hook = make_filtered_after_tool( + after_tool_hook = make_filtered_after_tool( bound_hook, tools_filter, agents_filter ) else: - final_hook = bound_hook + after_tool_hook = bound_hook - register_after_tool_call_hook(final_hook) - instance._registered_hook_functions.append(("after_tool_call", final_hook)) + register_after_tool_call_hook(after_tool_hook) + instance._registered_hook_functions.append( + ("after_tool_call", after_tool_hook) + ) instance._hooks_being_registered = False diff --git a/lib/crewai/src/crewai/project/wrappers.py b/lib/crewai/src/crewai/project/wrappers.py index 28cd39525..3d570b6f0 100644 --- a/lib/crewai/src/crewai/project/wrappers.py +++ b/lib/crewai/src/crewai/project/wrappers.py @@ -72,6 +72,8 @@ class CrewInstance(Protocol): __crew_metadata__: CrewMetadata _mcp_server_adapter: Any _all_methods: dict[str, Callable[..., Any]] + _registered_hook_functions: list[tuple[str, Callable[..., Any]]] + _hooks_being_registered: bool agents: list[Agent] tasks: list[Task] base_directory: Path From fe2a4b4e40dac46db8ceec45dfc58dbefb8c39c3 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Wed, 4 Feb 2026 21:21:54 -0500 Subject: [PATCH 5/6] chore: bug fixes and more refactor Refactor agent executor to delegate human interactions to a provider: add messages and ask_for_human_input properties, implement _invoke_loop and _format_feedback_message, and replace the internal iterative/training feedback logic with a call to get_provider().handle_feedback. Make LLMGuardrail kickoff coroutine-aware by detecting coroutines and running them via asyncio.run so both sync and async agents are supported. Make telemetry more robust by safely handling missing task.output (use empty string) and returning early if span is None before setting attributes. Improve serialization to detect circular references via an _ancestors set, propagate it through recursive calls, and pass exclude/max_depth/_current_depth consistently to prevent infinite recursion and produce stable serializable output. --- .../src/crewai/experimental/agent_executor.py | 179 ++++++------------ lib/crewai/src/crewai/tasks/llm_guardrail.py | 16 +- lib/crewai/src/crewai/telemetry/telemetry.py | 5 +- .../src/crewai/utilities/serialization.py | 24 ++- 4 files changed, 101 insertions(+), 123 deletions(-) diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 98e8b5bdb..87f9e83d4 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -18,6 +18,7 @@ from crewai.agents.parser import ( AgentFinish, OutputParserError, ) +from crewai.core.providers.human_input import get_provider from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.utils import ( is_tracing_enabled_in_context, @@ -41,7 +42,12 @@ from crewai.hooks.tool_hooks import ( get_after_tool_call_hooks, get_before_tool_call_hooks, ) -from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType +from crewai.hooks.types import ( + AfterLLMCallHookCallable, + AfterLLMCallHookType, + BeforeLLMCallHookCallable, + BeforeLLMCallHookType, +) from crewai.utilities.agent_utils import ( convert_tools_to_openai_schema, enforce_rpm_limit, @@ -191,8 +197,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._instance_id = str(uuid4())[:8] - self.before_llm_call_hooks: list[BeforeLLMCallHookType] = [] - self.after_llm_call_hooks: list[AfterLLMCallHookType] = [] + self.before_llm_call_hooks: list[ + BeforeLLMCallHookType | BeforeLLMCallHookCallable + ] = [] + self.after_llm_call_hooks: list[ + AfterLLMCallHookType | AfterLLMCallHookCallable + ] = [] self.before_llm_call_hooks.extend(get_before_llm_call_hooks()) self.after_llm_call_hooks.extend(get_after_llm_call_hooks()) @@ -207,6 +217,51 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): ) self._state = AgentReActState() + @property + def messages(self) -> list[LLMMessage]: + """Delegate to state for ExecutorContext conformance.""" + return self._state.messages + + @messages.setter + def messages(self, value: list[LLMMessage]) -> None: + """Delegate to state for ExecutorContext conformance.""" + self._state.messages = value + + @property + def ask_for_human_input(self) -> bool: + """Delegate to state for ExecutorContext conformance.""" + return self._state.ask_for_human_input + + @ask_for_human_input.setter + def ask_for_human_input(self, value: bool) -> None: + """Delegate to state for ExecutorContext conformance.""" + self._state.ask_for_human_input = value + + def _invoke_loop(self) -> AgentFinish: + """Invoke the agent loop and return the result. + + Required by ExecutorContext protocol. + """ + self._state.iterations = 0 + self._state.is_finished = False + self._state.current_answer = None + + self.kickoff() + + answer = self._state.current_answer + if not isinstance(answer, AgentFinish): + raise RuntimeError("Agent loop did not produce a final answer") + return answer + + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message for the LLM. + + Required by ExecutorContext protocol. + """ + return format_message_for_llm( + self._i18n.slice("feedback_instructions").format(feedback=feedback) + ) + def _ensure_flow_initialized(self) -> None: """Ensure Flow.__init__() has been called. @@ -300,16 +355,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): """ return self._state - @property - def messages(self) -> list[LLMMessage]: - """Compatibility property for mixin - returns state messages.""" - return self._state.messages - - @messages.setter - def messages(self, value: list[LLMMessage]) -> None: - """Set state messages.""" - self._state.messages = value - @property def iterations(self) -> int: """Compatibility property for mixin - returns state iterations.""" @@ -1321,17 +1366,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): Returns: Final answer after feedback. """ - output_str = ( - str(formatted_answer.output) - if isinstance(formatted_answer.output, BaseModel) - else formatted_answer.output - ) - human_feedback = self._ask_human_input(output_str) - - if self._is_training_mode(): - return self._handle_training_feedback(formatted_answer, human_feedback) - - return self._handle_regular_feedback(formatted_answer, human_feedback) + provider = get_provider() + return provider.handle_feedback(formatted_answer, self) def _is_training_mode(self) -> bool: """Check if training mode is active. @@ -1341,101 +1377,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): """ return bool(self.crew and self.crew._train) - def _handle_training_feedback( - self, initial_answer: AgentFinish, feedback: str - ) -> AgentFinish: - """Process training feedback and generate improved answer. - - Args: - initial_answer: Initial agent output. - feedback: Training feedback. - - Returns: - Improved answer. - """ - self._handle_crew_training_output(initial_answer, feedback) - self.state.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - - # Re-run flow for improved answer - self.state.iterations = 0 - self.state.is_finished = False - self.state.current_answer = None - - self.kickoff() - - # Get improved answer from state - improved_answer = self.state.current_answer - if not isinstance(improved_answer, AgentFinish): - raise RuntimeError( - "Training feedback iteration did not produce final answer" - ) - - self._handle_crew_training_output(improved_answer) - self.state.ask_for_human_input = False - return improved_answer - - def _handle_regular_feedback( - self, current_answer: AgentFinish, initial_feedback: str - ) -> AgentFinish: - """Process regular feedback iteratively until user is satisfied. - - Args: - current_answer: Current agent output. - initial_feedback: Initial user feedback. - - Returns: - Final answer after iterations. - """ - feedback = initial_feedback - answer = current_answer - - while self.state.ask_for_human_input: - if feedback.strip() == "": - self.state.ask_for_human_input = False - else: - answer = self._process_feedback_iteration(feedback) - output_str = ( - str(answer.output) - if isinstance(answer.output, BaseModel) - else answer.output - ) - feedback = self._ask_human_input(output_str) - - return answer - - def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process a single feedback iteration and generate updated response. - - Args: - feedback: User feedback. - - Returns: - Updated agent response. - """ - self.state.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - - # Re-run flow - self.state.iterations = 0 - self.state.is_finished = False - self.state.current_answer = None - - self.kickoff() - - # Get answer from state - answer = self.state.current_answer - if not isinstance(answer, AgentFinish): - raise RuntimeError("Feedback iteration did not produce final answer") - - return answer - @classmethod def __get_pydantic_core_schema__( cls, _source_type: Any, _handler: GetCoreSchemaHandler diff --git a/lib/crewai/src/crewai/tasks/llm_guardrail.py b/lib/crewai/src/crewai/tasks/llm_guardrail.py index 803b2d749..3729e8084 100644 --- a/lib/crewai/src/crewai/tasks/llm_guardrail.py +++ b/lib/crewai/src/crewai/tasks/llm_guardrail.py @@ -1,6 +1,10 @@ +import asyncio +from collections.abc import Coroutine +import inspect from typing import Any from pydantic import BaseModel, Field +from typing_extensions import TypeIs from crewai.agent import Agent from crewai.lite_agent_output import LiteAgentOutput @@ -8,6 +12,13 @@ from crewai.llms.base_llm import BaseLLM from crewai.tasks.task_output import TaskOutput +def _is_coroutine( + obj: LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput], +) -> TypeIs[Coroutine[Any, Any, LiteAgentOutput]]: + """Check if obj is a coroutine for type narrowing.""" + return inspect.iscoroutine(obj) + + class LLMGuardrailResult(BaseModel): valid: bool = Field( description="Whether the task output complies with the guardrail" @@ -62,7 +73,10 @@ class LLMGuardrail: - If the Task result complies with the guardrail, saying that is valid """ - return agent.kickoff(query, response_format=LLMGuardrailResult) + kickoff_result = agent.kickoff(query, response_format=LLMGuardrailResult) + if _is_coroutine(kickoff_result): + return asyncio.run(kickoff_result) + return kickoff_result def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]: """Validates the output of a task based on specified criteria. diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index 27d5d6090..04303fc3d 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -903,7 +903,7 @@ class Telemetry: { "id": str(task.id), "description": task.description, - "output": task.output.raw_output, + "output": task.output.raw if task.output else "", } for task in crew.tasks ] @@ -923,6 +923,9 @@ class Telemetry: value: The attribute value. """ + if span is None: + return + def _operation() -> None: return span.set_attribute(key, value) diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index a6b871d3d..0207e80ab 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -19,6 +19,7 @@ def to_serializable( exclude: set[str] | None = None, max_depth: int = 5, _current_depth: int = 0, + _ancestors: set[int] | None = None, ) -> Serializable: """Converts a Python object into a JSON-compatible representation. @@ -31,6 +32,7 @@ def to_serializable( exclude: Set of keys to exclude from the result. max_depth: Maximum recursion depth. Defaults to 5. _current_depth: Current recursion depth (for internal use). + _ancestors: Set of ancestor object ids for cycle detection (for internal use). Returns: Serializable: A JSON-compatible structure. @@ -41,16 +43,29 @@ def to_serializable( if exclude is None: exclude = set() + if _ancestors is None: + _ancestors = set() + if isinstance(obj, (str, int, float, bool, type(None))): return obj if isinstance(obj, uuid.UUID): return str(obj) if isinstance(obj, (date, datetime)): return obj.isoformat() + + object_id = id(obj) + if object_id in _ancestors: + return f"" + new_ancestors = _ancestors | {object_id} + if isinstance(obj, (list, tuple, set)): return [ to_serializable( - item, max_depth=max_depth, _current_depth=_current_depth + 1 + item, + exclude=exclude, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for item in obj ] @@ -61,6 +76,7 @@ def to_serializable( exclude=exclude, max_depth=max_depth, _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for key, value in obj.items() if key not in exclude @@ -71,12 +87,16 @@ def to_serializable( obj=obj.model_dump(exclude=exclude), max_depth=max_depth, _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) except Exception: try: return { _to_serializable_key(k): to_serializable( - v, max_depth=max_depth, _current_depth=_current_depth + 1 + v, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for k, v in obj.__dict__.items() if k not in (exclude or set()) From 6bb1b178a10139160575cccc4ce1be62800b28c0 Mon Sep 17 00:00:00 2001 From: Greyson LaLonde Date: Thu, 5 Feb 2026 12:49:54 -0500 Subject: [PATCH 6/6] chore: extension points Introduce ContextVar-backed hooks and small API/behavior changes to improve extensibility and testability. Changes include: - agents: mark configure_structured_output as abstract and change its parameter to task to reflect use of task metadata. - tracing: convert _first_time_trace_hook to a ContextVar and call .get() to safely retrieve the hook. - console formatter: add _disable_version_check ContextVar and skip version checks when set (avoids noisy checks in certain contexts). - flow: use current_triggering_event_id variable when scheduling listener tasks to keep naming consistent. - hallucination guardrail: make context optional, add _validate_output_hook to allow custom validation hooks, update examples and return contract to allow hooks to override behavior. - agent utilities: add _create_plus_client_hook for injecting a Plus client (used in tests/alternate flows), ensure structured tools have current_usage_count initialized and propagate to original tool, and fall back to creating PlusAPI client when no hook is provided. --- .../agent_adapters/base_agent_adapter.py | 5 ++- lib/crewai/src/crewai/crew.py | 8 +++- .../listeners/tracing/trace_listener.py | 8 +++- .../crewai/events/listeners/tracing/utils.py | 37 ++++++++++++++-- .../crewai/events/utils/console_formatter.py | 43 +++++++++++++++++++ lib/crewai/src/crewai/flow/flow.py | 9 +++- .../crewai/tasks/hallucination_guardrail.py | 31 +++++++------ .../src/crewai/utilities/agent_utils.py | 17 ++++++-- lib/crewai/src/crewai/utilities/printer.py | 4 ++ .../agent_adapters/test_base_agent_adapter.py | 4 ++ 10 files changed, 141 insertions(+), 25 deletions(-) diff --git a/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py b/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py index 8001628ed..12dfcf43c 100644 --- a/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py +++ b/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py @@ -37,9 +37,10 @@ class BaseAgentAdapter(BaseAgent, ABC): tools: Optional list of BaseTool instances to be configured """ - def configure_structured_output(self, structured_output: Any) -> None: + @abstractmethod + def configure_structured_output(self, task: Any) -> None: """Configure the structured output for the specific agent implementation. Args: - structured_output: The structured output to be configured + task: The task object containing output format specifications. """ diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index baabce190..09a103eba 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -2026,7 +2026,13 @@ class Crew(FlowTrackable, BaseModel): @staticmethod def _show_tracing_disabled_message() -> None: """Show a message when tracing is disabled.""" - from crewai.events.listeners.tracing.utils import has_user_declined_tracing + from crewai.events.listeners.tracing.utils import ( + has_user_declined_tracing, + should_suppress_tracing_messages, + ) + + if should_suppress_tracing_messages(): + return console = Console() diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index ee337d7fd..0e4d7d8a2 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -797,7 +797,13 @@ class TraceCollectionListener(BaseEventListener): from rich.console import Console from rich.panel import Panel - from crewai.events.listeners.tracing.utils import has_user_declined_tracing + from crewai.events.listeners.tracing.utils import ( + has_user_declined_tracing, + should_suppress_tracing_messages, + ) + + if should_suppress_tracing_messages(): + return console = Console() diff --git a/lib/crewai/src/crewai/events/listeners/tracing/utils.py b/lib/crewai/src/crewai/events/listeners/tracing/utils.py index b6bf1026b..a98142619 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/utils.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/utils.py @@ -27,7 +27,34 @@ logger = logging.getLogger(__name__) _tracing_enabled: ContextVar[bool | None] = ContextVar("_tracing_enabled", default=None) -_first_time_trace_hook: Callable[[], bool] | None = None +_first_time_trace_hook: ContextVar[Callable[[], bool] | None] = ContextVar( + "_first_time_trace_hook", default=None +) + +_suppress_tracing_messages: ContextVar[bool] = ContextVar( + "_suppress_tracing_messages", default=False +) + + +def set_suppress_tracing_messages(suppress: bool) -> object: + """Set whether to suppress tracing-related console messages. + + Args: + suppress: True to suppress messages, False to show them. + + Returns: + A token that can be used to restore the previous value. + """ + return _suppress_tracing_messages.set(suppress) + + +def should_suppress_tracing_messages() -> bool: + """Check if tracing messages should be suppressed. + + Returns: + True if messages should be suppressed, False otherwise. + """ + return _suppress_tracing_messages.get() def should_enable_tracing(*, override: bool | None = None) -> bool: @@ -413,8 +440,9 @@ def should_auto_collect_first_time_traces() -> bool: Returns: True if first-time user AND telemetry not disabled AND tracing not explicitly enabled, False otherwise. """ - if _first_time_trace_hook is not None: - return _first_time_trace_hook() + hook = _first_time_trace_hook.get() + if hook is not None: + return hook() if _is_test_environment(): return False @@ -437,6 +465,9 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool: if _is_test_environment(): return False + if should_suppress_tracing_messages(): + return False + try: import threading diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py index eaecc0e74..ee466c344 100644 --- a/lib/crewai/src/crewai/events/utils/console_formatter.py +++ b/lib/crewai/src/crewai/events/utils/console_formatter.py @@ -1,3 +1,4 @@ +from contextvars import ContextVar import os import threading from typing import Any, ClassVar, cast @@ -10,6 +11,36 @@ from rich.text import Text from crewai.cli.version import is_newer_version_available +_disable_version_check: ContextVar[bool] = ContextVar( + "_disable_version_check", default=False +) + +_suppress_console_output: ContextVar[bool] = ContextVar( + "_suppress_console_output", default=False +) + + +def set_suppress_console_output(suppress: bool) -> object: + """Set whether to suppress all console output. + + Args: + suppress: True to suppress output, False to show it. + + Returns: + A token that can be used to restore the previous value. + """ + return _suppress_console_output.set(suppress) + + +def should_suppress_console_output() -> bool: + """Check if console output should be suppressed. + + Returns: + True if output should be suppressed, False otherwise. + """ + return _suppress_console_output.get() + + class ConsoleFormatter: tool_usage_counts: ClassVar[dict[str, int]] = {} @@ -46,6 +77,9 @@ class ConsoleFormatter: if not self.verbose: return + if _disable_version_check.get(): + return + if os.getenv("CI", "").lower() in ("true", "1"): return @@ -79,8 +113,12 @@ To update, run: uv sync --upgrade-package crewai""" from crewai.events.listeners.tracing.utils import ( has_user_declined_tracing, is_tracing_enabled_in_context, + should_suppress_tracing_messages, ) + if should_suppress_tracing_messages(): + return + if not is_tracing_enabled_in_context(): if has_user_declined_tracing(): message = """Info: Tracing is disabled. @@ -132,6 +170,8 @@ To enable tracing, do any one of these: def print(self, *args: Any, **kwargs: Any) -> None: """Print to console. Simplified to only handle panel-based output.""" + if should_suppress_console_output(): + return # Skip blank lines during streaming if len(args) == 0 and self._is_streaming: return @@ -488,6 +528,9 @@ To enable tracing, do any one of these: if not self.verbose: return + if should_suppress_console_output(): + return + self._is_streaming = True self._last_stream_call_type = call_type diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 1b4665620..64bd86d53 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -45,6 +45,7 @@ from crewai.events.listeners.tracing.utils import ( has_user_declined_tracing, set_tracing_enabled, should_enable_tracing, + should_suppress_tracing_messages, ) from crewai.events.types.flow_events import ( FlowCreatedEvent, @@ -2074,12 +2075,14 @@ class Flow(Generic[T], metaclass=FlowMeta): racing_members, other_listeners, listener_result, - triggering_event_id, + current_triggering_event_id, ) else: tasks = [ self._execute_single_listener( - listener_name, listener_result, triggering_event_id + listener_name, + listener_result, + current_triggering_event_id, ) for listener_name in listeners_triggered ] @@ -2626,6 +2629,8 @@ class Flow(Generic[T], metaclass=FlowMeta): @staticmethod def _show_tracing_disabled_message() -> None: """Show a message when tracing is disabled.""" + if should_suppress_tracing_messages(): + return console = Console() diff --git a/lib/crewai/src/crewai/tasks/hallucination_guardrail.py b/lib/crewai/src/crewai/tasks/hallucination_guardrail.py index dd000a83c..c48b890a8 100644 --- a/lib/crewai/src/crewai/tasks/hallucination_guardrail.py +++ b/lib/crewai/src/crewai/tasks/hallucination_guardrail.py @@ -6,6 +6,7 @@ Classes: HallucinationGuardrail: Placeholder guardrail that validates task outputs. """ +from collections.abc import Callable from typing import Any from crewai.llm import LLM @@ -13,32 +14,36 @@ from crewai.tasks.task_output import TaskOutput from crewai.utilities.logger import Logger +_validate_output_hook: Callable[..., tuple[bool, Any]] | None = None + + class HallucinationGuardrail: """Placeholder for the HallucinationGuardrail feature. Attributes: - context: The reference context that outputs would be checked against. + context: Optional reference context that outputs would be checked against. llm: The language model that would be used for evaluation. threshold: Optional minimum faithfulness score that would be required to pass. tool_response: Optional tool response information that would be used in evaluation. Examples: - >>> # Basic usage with default verdict logic + >>> # Basic usage without context (uses task expected_output as context) + >>> guardrail = HallucinationGuardrail(llm=agent.llm) + + >>> # With context for reference >>> guardrail = HallucinationGuardrail( - ... context="AI helps with various tasks including analysis and generation.", ... llm=agent.llm, + ... context="AI helps with various tasks including analysis and generation.", ... ) >>> # With custom threshold for stricter validation >>> strict_guardrail = HallucinationGuardrail( - ... context="Quantum computing uses qubits in superposition.", ... llm=agent.llm, - ... threshold=8.0, # Would require score >= 8 to pass in enterprise version + ... threshold=8.0, # Require score >= 8 to pass ... ) >>> # With tool response for additional context >>> guardrail_with_tools = HallucinationGuardrail( - ... context="The current weather data", ... llm=agent.llm, ... tool_response="Weather API returned: Temperature 22°C, Humidity 65%", ... ) @@ -46,16 +51,17 @@ class HallucinationGuardrail: def __init__( self, - context: str, llm: LLM, + context: str | None = None, threshold: float | None = None, tool_response: str = "", ): """Initialize the HallucinationGuardrail placeholder. Args: - context: The reference context that outputs would be checked against. llm: The language model that would be used for evaluation. + context: Optional reference context that outputs would be checked against. + If not provided, the task's expected_output will be used as context. threshold: Optional minimum faithfulness score that would be required to pass. tool_response: Optional tool response information that would be used in evaluation. """ @@ -78,16 +84,17 @@ class HallucinationGuardrail: def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]: """Validate a task output against hallucination criteria. - In the open source, this method always returns that the output is valid. - Args: task_output: The output to be validated. Returns: A tuple containing: - - True - - The raw task output + - True if validation passed, False otherwise + - The raw task output if valid, or error feedback if invalid """ + if callable(_validate_output_hook): + return _validate_output_hook(self, task_output) + self._logger.log( "warning", "Premium hallucination detection skipped (use for free at https://app.crewai.com)\n", diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py index 1ede7f07d..ee76dc53f 100644 --- a/lib/crewai/src/crewai/utilities/agent_utils.py +++ b/lib/crewai/src/crewai/utilities/agent_utils.py @@ -42,6 +42,8 @@ if TYPE_CHECKING: from crewai.llm import LLM from crewai.task import Task +_create_plus_client_hook: Callable[[], Any] | None = None + class SummaryContent(TypedDict): """Structure for summary content entries. @@ -91,7 +93,11 @@ def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]: for tool in tools: if isinstance(tool, CrewAITool): - tools_list.append(tool.to_structured_tool()) + structured_tool = tool.to_structured_tool() + structured_tool.current_usage_count = 0 + if structured_tool._original_tool: + structured_tool._original_tool.current_usage_count = 0 + tools_list.append(structured_tool) else: raise ValueError("Tool is not a CrewStructuredTool or BaseTool") @@ -818,10 +824,13 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]: if from_repository: import importlib - from crewai.cli.authentication.token import get_auth_token - from crewai.cli.plus_api import PlusAPI + if callable(_create_plus_client_hook): + client = _create_plus_client_hook() + else: + from crewai.cli.authentication.token import get_auth_token + from crewai.cli.plus_api import PlusAPI - client = PlusAPI(api_key=get_auth_token()) + client = PlusAPI(api_key=get_auth_token()) _print_current_organization() response = client.get_agent(from_repository) if response.status_code == 404: diff --git a/lib/crewai/src/crewai/utilities/printer.py b/lib/crewai/src/crewai/utilities/printer.py index c40de684e..949da543a 100644 --- a/lib/crewai/src/crewai/utilities/printer.py +++ b/lib/crewai/src/crewai/utilities/printer.py @@ -4,6 +4,8 @@ from __future__ import annotations from typing import TYPE_CHECKING, Final, Literal, NamedTuple +from crewai.events.utils.console_formatter import should_suppress_console_output + if TYPE_CHECKING: from _typeshed import SupportsWrite @@ -77,6 +79,8 @@ class Printer: file: A file-like object (stream); defaults to the current sys.stdout. flush: Whether to forcibly flush the stream. """ + if should_suppress_console_output(): + return if isinstance(content, str): content = [ColoredText(content, color)] print( diff --git a/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py b/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py index 6ed42b5d1..78320d187 100644 --- a/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py +++ b/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py @@ -51,6 +51,10 @@ class ConcreteAgentAdapter(BaseAgentAdapter): # Dummy implementation for MCP tools return [] + def configure_structured_output(self, task: Any) -> None: + # Dummy implementation for structured output + pass + async def aexecute_task( self, task: Any,