diff --git a/lib/crewai/src/crewai/agent/core.py b/lib/crewai/src/crewai/agent/core.py index 6c2626a28..47eb841b4 100644 --- a/lib/crewai/src/crewai/agent/core.py +++ b/lib/crewai/src/crewai/agent/core.py @@ -118,6 +118,8 @@ MCP_TOOL_EXECUTION_TIMEOUT: Final[int] = 30 MCP_DISCOVERY_TIMEOUT: Final[int] = 15 MCP_MAX_RETRIES: Final[int] = 3 +_passthrough_exceptions: tuple[type[Exception], ...] = () + # Simple in-memory cache for MCP tool schemas (duration: 5 minutes) _mcp_schema_cache: dict[str, Any] = {} _cache_ttl: Final[int] = 300 # 5 minutes @@ -479,6 +481,8 @@ class Agent(BaseAgent): ), ) raise e + if isinstance(e, _passthrough_exceptions): + raise self._times_executed += 1 if self._times_executed > self.max_retry_limit: crewai_event_bus.emit( @@ -711,6 +715,8 @@ class Agent(BaseAgent): ), ) raise e + if isinstance(e, _passthrough_exceptions): + raise self._times_executed += 1 if self._times_executed > self.max_retry_limit: crewai_event_bus.emit( diff --git a/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py b/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py index 8001628ed..12dfcf43c 100644 --- a/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py +++ b/lib/crewai/src/crewai/agents/agent_adapters/base_agent_adapter.py @@ -37,9 +37,10 @@ class BaseAgentAdapter(BaseAgent, ABC): tools: Optional list of BaseTool instances to be configured """ - def configure_structured_output(self, structured_output: Any) -> None: + @abstractmethod + def configure_structured_output(self, task: Any) -> None: """Configure the structured output for the specific agent implementation. Args: - structured_output: The structured output to be configured + task: The task object containing output format specifications. 
""" diff --git a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py index c9dceaa84..03787c802 100644 --- a/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/lib/crewai/src/crewai/agents/agent_builder/base_agent_executor_mixin.py @@ -4,7 +4,6 @@ import time from typing import TYPE_CHECKING from crewai.agents.parser import AgentFinish -from crewai.events.event_listener import event_listener from crewai.memory.entity.entity_memory_item import EntityMemoryItem from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem from crewai.utilities.converter import ConverterError @@ -138,52 +137,3 @@ class CrewAgentExecutorMixin: content="Long term memory is enabled, but entity memory is not enabled. Please configure entity memory or set memory=True to automatically enable it.", color="bold_yellow", ) - - def _ask_human_input(self, final_answer: str) -> str: - """Prompt human input with mode-appropriate messaging. - - Note: The final answer is already displayed via the AgentLogsExecutionEvent - panel, so we only show the feedback prompt here. - """ - from rich.panel import Panel - from rich.text import Text - - formatter = event_listener.formatter - formatter.pause_live_updates() - - try: - # Training mode prompt (single iteration) - if self.crew and getattr(self.crew, "_train", False): - prompt_text = ( - "TRAINING MODE: Provide feedback to improve the agent's performance.\n\n" - "This will be used to train better versions of the agent.\n" - "Please provide detailed feedback about the result quality and reasoning process." 
- ) - title = "🎓 Training Feedback Required" - # Regular human-in-the-loop prompt (multiple iterations) - else: - prompt_text = ( - "Provide feedback on the Final Result above.\n\n" - "• If you are happy with the result, simply hit Enter without typing anything.\n" - "• Otherwise, provide specific improvement requests.\n" - "• You can provide multiple rounds of feedback until satisfied." - ) - title = "💬 Human Feedback Required" - - content = Text() - content.append(prompt_text, style="yellow") - - prompt_panel = Panel( - content, - title=title, - border_style="yellow", - padding=(1, 2), - ) - formatter.console.print(prompt_panel) - - response = input() - if response.strip() != "": - formatter.console.print("\n[cyan]Processing your feedback...[/cyan]") - return response - finally: - formatter.resume_live_updates() diff --git a/lib/crewai/src/crewai/agents/crew_agent_executor.py b/lib/crewai/src/crewai/agents/crew_agent_executor.py index 1218ceae8..647596f2a 100644 --- a/lib/crewai/src/crewai/agents/crew_agent_executor.py +++ b/lib/crewai/src/crewai/agents/crew_agent_executor.py @@ -19,6 +19,7 @@ from crewai.agents.parser import ( AgentFinish, OutputParserError, ) +from crewai.core.providers.human_input import ExecutorContext, get_provider from crewai.events.event_bus import crewai_event_bus from crewai.events.types.logging_events import ( AgentLogsExecutionEvent, @@ -175,15 +176,16 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ return self.llm.supports_stop_words() if self.llm else False - def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: - """Execute the agent with given inputs. + def _setup_messages(self, inputs: dict[str, Any]) -> None: + """Set up messages for the agent execution. Args: inputs: Input dictionary containing prompt variables. - - Returns: - Dictionary with agent output. 
""" + provider = get_provider() + if provider.setup_messages(cast(ExecutorContext, cast(object, self))): + return + if "system" in self.prompt: system_prompt = self._format_prompt( cast(str, self.prompt.get("system", "")), inputs @@ -197,6 +199,19 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) self.messages.append(format_message_for_llm(user_prompt)) + provider.post_setup_messages(cast(ExecutorContext, cast(object, self))) + + def invoke(self, inputs: dict[str, Any]) -> dict[str, Any]: + """Execute the agent with given inputs. + + Args: + inputs: Input dictionary containing prompt variables. + + Returns: + Dictionary with agent output. + """ + self._setup_messages(inputs) + self._inject_multimodal_files(inputs) self._show_start_logs() @@ -799,6 +814,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): agent_key=agent_key, ), ) + error_event_emitted = False track_delegation_if_needed(func_name, args_dict, self.task) @@ -881,6 +897,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): error=e, ), ) + error_event_emitted = True elif max_usage_reached and original_tool: # Return error message when max usage limit is reached result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." 
@@ -908,20 +925,20 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): color="red", ) - # Emit tool usage finished event - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - started_at=started_at, - finished_at=datetime.now(), - ), - ) + if not error_event_emitted: + crewai_event_bus.emit( + self, + event=ToolUsageFinishedEvent( + output=result, + tool_name=func_name, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + started_at=started_at, + finished_at=datetime.now(), + ), + ) # Append tool result message tool_message: LLMMessage = { @@ -970,18 +987,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: Dictionary with agent output. """ - if "system" in self.prompt: - system_prompt = self._format_prompt( - cast(str, self.prompt.get("system", "")), inputs - ) - user_prompt = self._format_prompt( - cast(str, self.prompt.get("user", "")), inputs - ) - self.messages.append(format_message_for_llm(system_prompt, role="system")) - self.messages.append(format_message_for_llm(user_prompt)) - else: - user_prompt = self._format_prompt(self.prompt.get("prompt", ""), inputs) - self.messages.append(format_message_for_llm(user_prompt)) + self._setup_messages(inputs) await self._ainject_multimodal_files(inputs) @@ -1491,7 +1497,7 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): return prompt.replace("{tools}", inputs["tools"]) def _handle_human_feedback(self, formatted_answer: AgentFinish) -> AgentFinish: - """Process human feedback. + """Process human feedback via the configured provider. Args: formatted_answer: Initial agent result. @@ -1499,17 +1505,8 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): Returns: Final answer after feedback. 
""" - output_str = ( - formatted_answer.output - if isinstance(formatted_answer.output, str) - else formatted_answer.output.model_dump_json() - ) - human_feedback = self._ask_human_input(output_str) - - if self._is_training_mode(): - return self._handle_training_feedback(formatted_answer, human_feedback) - - return self._handle_regular_feedback(formatted_answer, human_feedback) + provider = get_provider() + return provider.handle_feedback(formatted_answer, self) def _is_training_mode(self) -> bool: """Check if training mode is active. @@ -1519,74 +1516,18 @@ class CrewAgentExecutor(CrewAgentExecutorMixin): """ return bool(self.crew and self.crew._train) - def _handle_training_feedback( - self, initial_answer: AgentFinish, feedback: str - ) -> AgentFinish: - """Process training feedback. + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message for the LLM. Args: - initial_answer: Initial agent output. - feedback: Training feedback. + feedback: User feedback string. Returns: - Improved answer. + Formatted message dict. """ - self._handle_crew_training_output(initial_answer, feedback) - self.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) + return format_message_for_llm( + self._i18n.slice("feedback_instructions").format(feedback=feedback) ) - improved_answer = self._invoke_loop() - self._handle_crew_training_output(improved_answer) - self.ask_for_human_input = False - return improved_answer - - def _handle_regular_feedback( - self, current_answer: AgentFinish, initial_feedback: str - ) -> AgentFinish: - """Process regular feedback iteratively. - - Args: - current_answer: Current agent output. - initial_feedback: Initial user feedback. - - Returns: - Final answer after iterations. 
- """ - feedback = initial_feedback - answer = current_answer - - while self.ask_for_human_input: - # If the user provides a blank response, assume they are happy with the result - if feedback.strip() == "": - self.ask_for_human_input = False - else: - answer = self._process_feedback_iteration(feedback) - output_str = ( - answer.output - if isinstance(answer.output, str) - else answer.output.model_dump_json() - ) - feedback = self._ask_human_input(output_str) - - return answer - - def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process single feedback iteration. - - Args: - feedback: User feedback. - - Returns: - Updated agent response. - """ - self.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - return self._invoke_loop() @classmethod def __get_pydantic_core_schema__( diff --git a/lib/crewai/src/crewai/core/__init__.py b/lib/crewai/src/crewai/core/__init__.py new file mode 100644 index 000000000..714ed9161 --- /dev/null +++ b/lib/crewai/src/crewai/core/__init__.py @@ -0,0 +1 @@ +"""Core crewAI components and interfaces.""" diff --git a/lib/crewai/src/crewai/core/providers/__init__.py b/lib/crewai/src/crewai/core/providers/__init__.py new file mode 100644 index 000000000..fc0a9ed4b --- /dev/null +++ b/lib/crewai/src/crewai/core/providers/__init__.py @@ -0,0 +1 @@ +"""Provider interfaces for extensible crewAI components.""" diff --git a/lib/crewai/src/crewai/core/providers/content_processor.py b/lib/crewai/src/crewai/core/providers/content_processor.py new file mode 100644 index 000000000..828a7e311 --- /dev/null +++ b/lib/crewai/src/crewai/core/providers/content_processor.py @@ -0,0 +1,78 @@ +"""Content processor provider for extensible content processing.""" + +from contextvars import ContextVar +from typing import Any, Protocol, runtime_checkable + + +@runtime_checkable +class ContentProcessorProvider(Protocol): + """Protocol for content processing during task 
execution.""" + + def process(self, content: str, context: dict[str, Any] | None = None) -> str: + """Process content before use. + + Args: + content: The content to process. + context: Optional context information. + + Returns: + The processed content. + """ + ... + + +class NoOpContentProcessor: + """Default processor that returns content unchanged.""" + + def process(self, content: str, context: dict[str, Any] | None = None) -> str: + """Return content unchanged. + + Args: + content: The content to process. + context: Optional context information (unused). + + Returns: + The original content unchanged. + """ + return content + + +_content_processor: ContextVar[ContentProcessorProvider | None] = ContextVar( + "_content_processor", default=None +) + +_default_processor = NoOpContentProcessor() + + +def get_processor() -> ContentProcessorProvider: + """Get the current content processor. + + Returns: + The registered content processor or the default no-op processor. + """ + processor = _content_processor.get() + if processor is not None: + return processor + return _default_processor + + +def set_processor(processor: ContentProcessorProvider) -> None: + """Set the content processor for the current context. + + Args: + processor: The content processor to use. + """ + _content_processor.set(processor) + + +def process_content(content: str, context: dict[str, Any] | None = None) -> str: + """Process content using the registered processor. + + Args: + content: The content to process. + context: Optional context information. + + Returns: + The processed content. 
+ """ + return get_processor().process(content, context) diff --git a/lib/crewai/src/crewai/core/providers/human_input.py b/lib/crewai/src/crewai/core/providers/human_input.py new file mode 100644 index 000000000..4062e6bb9 --- /dev/null +++ b/lib/crewai/src/crewai/core/providers/human_input.py @@ -0,0 +1,304 @@ +"""Human input provider for HITL (Human-in-the-Loop) flows.""" + +from __future__ import annotations + +from contextvars import ContextVar, Token +from typing import TYPE_CHECKING, Protocol, runtime_checkable + + +if TYPE_CHECKING: + from crewai.agent.core import Agent + from crewai.agents.parser import AgentFinish + from crewai.crew import Crew + from crewai.llms.base_llm import BaseLLM + from crewai.task import Task + from crewai.utilities.types import LLMMessage + + +class ExecutorContext(Protocol): + """Context interface for human input providers to interact with executor.""" + + task: Task | None + crew: Crew | None + messages: list[LLMMessage] + ask_for_human_input: bool + llm: BaseLLM + agent: Agent + + def _invoke_loop(self) -> AgentFinish: + """Invoke the agent loop and return the result.""" + ... + + def _is_training_mode(self) -> bool: + """Check if training mode is active.""" + ... + + def _handle_crew_training_output( + self, + result: AgentFinish, + human_feedback: str | None = None, + ) -> None: + """Handle training output.""" + ... + + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message.""" + ... + + +@runtime_checkable +class HumanInputProvider(Protocol): + """Protocol for human input handling. + + Implementations handle the full feedback flow: + - Sync: prompt user, loop until satisfied + - Async: raise exception for external handling + """ + + def setup_messages(self, context: ExecutorContext) -> bool: + """Set up messages for execution. + + Called before standard message setup. Allows providers to handle + conversation resumption or other custom message initialization. 
+ + Args: + context: Executor context with messages list to modify. + + Returns: + True if messages were set up (skip standard setup), + False to use standard setup. + """ + ... + + def post_setup_messages(self, context: ExecutorContext) -> None: + """Called after standard message setup. + + Allows providers to modify messages after standard setup completes. + Only called when setup_messages returned False. + + Args: + context: Executor context with messages list to modify. + """ + ... + + def handle_feedback( + self, + formatted_answer: AgentFinish, + context: ExecutorContext, + ) -> AgentFinish: + """Handle the full human feedback flow. + + Args: + formatted_answer: The agent's current answer. + context: Executor context for callbacks. + + Returns: + The final answer after feedback processing. + + Raises: + Exception: Async implementations may raise to signal external handling. + """ + ... + + @staticmethod + def _get_output_string(answer: AgentFinish) -> str: + """Extract output string from answer. + + Args: + answer: The agent's finished answer. + + Returns: + String representation of the output. + """ + if isinstance(answer.output, str): + return answer.output + return answer.output.model_dump_json() + + +class SyncHumanInputProvider(HumanInputProvider): + """Default synchronous human input via terminal.""" + + def setup_messages(self, context: ExecutorContext) -> bool: + """Use standard message setup. + + Args: + context: Executor context (unused). + + Returns: + False to use standard setup. + """ + return False + + def post_setup_messages(self, context: ExecutorContext) -> None: + """No-op for sync provider. + + Args: + context: Executor context (unused). + """ + + def handle_feedback( + self, + formatted_answer: AgentFinish, + context: ExecutorContext, + ) -> AgentFinish: + """Handle feedback synchronously with terminal prompts. + + Args: + formatted_answer: The agent's current answer. + context: Executor context for callbacks. 
+ + Returns: + The final answer after feedback processing. + """ + feedback = self._prompt_input(context.crew) + + if context._is_training_mode(): + return self._handle_training_feedback(formatted_answer, feedback, context) + + return self._handle_regular_feedback(formatted_answer, feedback, context) + + @staticmethod + def _handle_training_feedback( + initial_answer: AgentFinish, + feedback: str, + context: ExecutorContext, + ) -> AgentFinish: + """Process training feedback (single iteration). + + Args: + initial_answer: The agent's initial answer. + feedback: Human feedback string. + context: Executor context for callbacks. + + Returns: + Improved answer after processing feedback. + """ + context._handle_crew_training_output(initial_answer, feedback) + context.messages.append(context._format_feedback_message(feedback)) + improved_answer = context._invoke_loop() + context._handle_crew_training_output(improved_answer) + context.ask_for_human_input = False + return improved_answer + + def _handle_regular_feedback( + self, + current_answer: AgentFinish, + initial_feedback: str, + context: ExecutorContext, + ) -> AgentFinish: + """Process regular feedback with iteration loop. + + Args: + current_answer: The agent's current answer. + initial_feedback: Initial human feedback string. + context: Executor context for callbacks. + + Returns: + Final answer after all feedback iterations. + """ + feedback = initial_feedback + answer = current_answer + + while context.ask_for_human_input: + if feedback.strip() == "": + context.ask_for_human_input = False + else: + context.messages.append(context._format_feedback_message(feedback)) + answer = context._invoke_loop() + feedback = self._prompt_input(context.crew) + + return answer + + @staticmethod + def _prompt_input(crew: Crew | None) -> str: + """Show rich panel and prompt for input. + + Args: + crew: The crew instance for context. + + Returns: + User input string from terminal. 
+ """ + from rich.panel import Panel + from rich.text import Text + + from crewai.events.event_listener import event_listener + + formatter = event_listener.formatter + formatter.pause_live_updates() + + try: + if crew and getattr(crew, "_train", False): + prompt_text = ( + "TRAINING MODE: Provide feedback to improve the agent's performance.\n\n" + "This will be used to train better versions of the agent.\n" + "Please provide detailed feedback about the result quality and reasoning process." + ) + title = "🎓 Training Feedback Required" + else: + prompt_text = ( + "Provide feedback on the Final Result above.\n\n" + "• If you are happy with the result, simply hit Enter without typing anything.\n" + "• Otherwise, provide specific improvement requests.\n" + "• You can provide multiple rounds of feedback until satisfied." + ) + title = "💬 Human Feedback Required" + + content = Text() + content.append(prompt_text, style="yellow") + + prompt_panel = Panel( + content, + title=title, + border_style="yellow", + padding=(1, 2), + ) + formatter.console.print(prompt_panel) + + response = input() + if response.strip() != "": + formatter.console.print("\n[cyan]Processing your feedback...[/cyan]") + return response + finally: + formatter.resume_live_updates() + + +_provider: ContextVar[HumanInputProvider | None] = ContextVar( + "human_input_provider", + default=None, +) + + +def get_provider() -> HumanInputProvider: + """Get the current human input provider. + + Returns: + The current provider, or a new SyncHumanInputProvider if none set. + """ + provider = _provider.get() + if provider is None: + initialized_provider = SyncHumanInputProvider() + set_provider(initialized_provider) + return initialized_provider + return provider + + +def set_provider(provider: HumanInputProvider) -> Token[HumanInputProvider | None]: + """Set the human input provider for the current context. + + Args: + provider: The provider to use. + + Returns: + Token that can be used to reset to previous value. 
+ """ + return _provider.set(provider) + + +def reset_provider(token: Token[HumanInputProvider | None]) -> None: + """Reset the provider to its previous value. + + Args: + token: Token returned from set_provider. + """ + _provider.reset(token) diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index f22dd4b72..09a103eba 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -751,6 +751,8 @@ class Crew(FlowTrackable, BaseModel): for after_callback in self.after_kickoff_callbacks: result = after_callback(result) + result = self._post_kickoff(result) + self.usage_metrics = self.calculate_usage_metrics() return result @@ -764,6 +766,9 @@ class Crew(FlowTrackable, BaseModel): clear_files(self.id) detach(token) + def _post_kickoff(self, result: CrewOutput) -> CrewOutput: + return result + def kickoff_for_each( self, inputs: list[dict[str, Any]], @@ -936,6 +941,8 @@ class Crew(FlowTrackable, BaseModel): for after_callback in self.after_kickoff_callbacks: result = after_callback(result) + result = self._post_kickoff(result) + self.usage_metrics = self.calculate_usage_metrics() return result @@ -1181,6 +1188,9 @@ class Crew(FlowTrackable, BaseModel): self.manager_agent = manager manager.crew = self + def _get_execution_start_index(self, tasks: list[Task]) -> int | None: + return None + def _execute_tasks( self, tasks: list[Task], @@ -1197,6 +1207,9 @@ class Crew(FlowTrackable, BaseModel): Returns: CrewOutput: Final output of the crew """ + custom_start = self._get_execution_start_index(tasks) + if custom_start is not None: + start_index = custom_start task_outputs: list[TaskOutput] = [] futures: list[tuple[Task, Future[TaskOutput], int]] = [] @@ -1305,8 +1318,10 @@ class Crew(FlowTrackable, BaseModel): if files: supported_types: list[str] = [] if agent and agent.llm and agent.llm.supports_multimodal(): - provider = getattr(agent.llm, "provider", None) or getattr( - agent.llm, "model", "openai" + provider = ( + 
getattr(agent.llm, "provider", None) + or getattr(agent.llm, "model", None) + or "openai" ) api = getattr(agent.llm, "api", None) supported_types = get_supported_content_types(provider, api) @@ -2011,7 +2026,13 @@ class Crew(FlowTrackable, BaseModel): @staticmethod def _show_tracing_disabled_message() -> None: """Show a message when tracing is disabled.""" - from crewai.events.listeners.tracing.utils import has_user_declined_tracing + from crewai.events.listeners.tracing.utils import ( + has_user_declined_tracing, + should_suppress_tracing_messages, + ) + + if should_suppress_tracing_messages(): + return console = Console() diff --git a/lib/crewai/src/crewai/events/__init__.py b/lib/crewai/src/crewai/events/__init__.py index 61c0ec380..a6f213a54 100644 --- a/lib/crewai/src/crewai/events/__init__.py +++ b/lib/crewai/src/crewai/events/__init__.py @@ -195,6 +195,7 @@ __all__ = [ "ToolUsageFinishedEvent", "ToolUsageStartedEvent", "ToolValidateInputErrorEvent", + "_extension_exports", "crewai_event_bus", ] @@ -210,14 +211,29 @@ _AGENT_EVENT_MAPPING = { "LiteAgentExecutionStartedEvent": "crewai.events.types.agent_events", } +_extension_exports: dict[str, Any] = {} + def __getattr__(name: str) -> Any: - """Lazy import for agent events to avoid circular imports.""" + """Lazy import for agent events and registered extensions.""" if name in _AGENT_EVENT_MAPPING: import importlib module_path = _AGENT_EVENT_MAPPING[name] module = importlib.import_module(module_path) return getattr(module, name) + + if name in _extension_exports: + import importlib + + value = _extension_exports[name] + if isinstance(value, str): + module_path, _, attr_name = value.rpartition(".") + if module_path: + module = importlib.import_module(module_path) + return getattr(module, attr_name) + return importlib.import_module(value) + return value + msg = f"module {__name__!r} has no attribute {name!r}" raise AttributeError(msg) diff --git a/lib/crewai/src/crewai/events/event_bus.py 
b/lib/crewai/src/crewai/events/event_bus.py index 5c4bec58f..d0aaa4455 100644 --- a/lib/crewai/src/crewai/events/event_bus.py +++ b/lib/crewai/src/crewai/events/event_bus.py @@ -227,6 +227,39 @@ class CrewAIEventsBus: return decorator + def off( + self, + event_type: type[BaseEvent], + handler: Callable[..., Any], + ) -> None: + """Unregister an event handler for a specific event type. + + Args: + event_type: The event class to stop listening for + handler: The handler function to unregister + """ + with self._rwlock.w_locked(): + if event_type in self._sync_handlers: + existing_sync = self._sync_handlers[event_type] + if handler in existing_sync: + self._sync_handlers[event_type] = existing_sync - {handler} + if not self._sync_handlers[event_type]: + del self._sync_handlers[event_type] + + if event_type in self._async_handlers: + existing_async = self._async_handlers[event_type] + if handler in existing_async: + self._async_handlers[event_type] = existing_async - {handler} + if not self._async_handlers[event_type]: + del self._async_handlers[event_type] + + if event_type in self._handler_dependencies: + self._handler_dependencies[event_type].pop(handler, None) + if not self._handler_dependencies[event_type]: + del self._handler_dependencies[event_type] + + self._execution_plan_cache.pop(event_type, None) + def _call_handlers( self, source: Any, diff --git a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py index ee337d7fd..0e4d7d8a2 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/trace_listener.py @@ -797,7 +797,13 @@ class TraceCollectionListener(BaseEventListener): from rich.console import Console from rich.panel import Panel - from crewai.events.listeners.tracing.utils import has_user_declined_tracing + from crewai.events.listeners.tracing.utils import ( + has_user_declined_tracing, + 
should_suppress_tracing_messages, + ) + + if should_suppress_tracing_messages(): + return console = Console() diff --git a/lib/crewai/src/crewai/events/listeners/tracing/utils.py b/lib/crewai/src/crewai/events/listeners/tracing/utils.py index 13e26dacb..a98142619 100644 --- a/lib/crewai/src/crewai/events/listeners/tracing/utils.py +++ b/lib/crewai/src/crewai/events/listeners/tracing/utils.py @@ -1,3 +1,4 @@ +from collections.abc import Callable from contextvars import ContextVar, Token from datetime import datetime import getpass @@ -26,6 +27,35 @@ logger = logging.getLogger(__name__) _tracing_enabled: ContextVar[bool | None] = ContextVar("_tracing_enabled", default=None) +_first_time_trace_hook: ContextVar[Callable[[], bool] | None] = ContextVar( + "_first_time_trace_hook", default=None +) + +_suppress_tracing_messages: ContextVar[bool] = ContextVar( + "_suppress_tracing_messages", default=False +) + + +def set_suppress_tracing_messages(suppress: bool) -> object: + """Set whether to suppress tracing-related console messages. + + Args: + suppress: True to suppress messages, False to show them. + + Returns: + A token that can be used to restore the previous value. + """ + return _suppress_tracing_messages.set(suppress) + + +def should_suppress_tracing_messages() -> bool: + """Check if tracing messages should be suppressed. + + Returns: + True if messages should be suppressed, False otherwise. + """ + return _suppress_tracing_messages.get() + def should_enable_tracing(*, override: bool | None = None) -> bool: """Determine if tracing should be enabled. @@ -407,10 +437,13 @@ def truncate_messages( def should_auto_collect_first_time_traces() -> bool: """True if we should auto-collect traces for first-time user. - Returns: True if first-time user AND telemetry not disabled AND tracing not explicitly enabled, False otherwise. 
""" + hook = _first_time_trace_hook.get() + if hook is not None: + return hook() + if _is_test_environment(): return False @@ -432,6 +465,9 @@ def prompt_user_for_trace_viewing(timeout_seconds: int = 20) -> bool: if _is_test_environment(): return False + if should_suppress_tracing_messages(): + return False + try: import threading diff --git a/lib/crewai/src/crewai/events/types/tool_usage_events.py b/lib/crewai/src/crewai/events/types/tool_usage_events.py index 7fe9b897f..c4e681546 100644 --- a/lib/crewai/src/crewai/events/types/tool_usage_events.py +++ b/lib/crewai/src/crewai/events/types/tool_usage_events.py @@ -16,7 +16,7 @@ class ToolUsageEvent(BaseEvent): tool_name: str tool_args: dict[str, Any] | str tool_class: str | None = None - run_attempts: int | None = None + run_attempts: int = 0 delegations: int | None = None agent: Any | None = None task_name: str | None = None @@ -26,7 +26,7 @@ class ToolUsageEvent(BaseEvent): model_config = ConfigDict(arbitrary_types_allowed=True) - def __init__(self, **data): + def __init__(self, **data: Any) -> None: if data.get("from_task"): task = data["from_task"] data["task_id"] = str(task.id) @@ -96,10 +96,10 @@ class ToolExecutionErrorEvent(BaseEvent): type: str = "tool_execution_error" tool_name: str tool_args: dict[str, Any] - tool_class: Callable + tool_class: Callable[..., Any] agent: Any | None = None - def __init__(self, **data): + def __init__(self, **data: Any) -> None: super().__init__(**data) # Set fingerprint data from the agent if self.agent and hasattr(self.agent, "fingerprint") and self.agent.fingerprint: diff --git a/lib/crewai/src/crewai/events/utils/console_formatter.py b/lib/crewai/src/crewai/events/utils/console_formatter.py index ac6caabcf..ee466c344 100644 --- a/lib/crewai/src/crewai/events/utils/console_formatter.py +++ b/lib/crewai/src/crewai/events/utils/console_formatter.py @@ -1,3 +1,4 @@ +from contextvars import ContextVar import os import threading from typing import Any, ClassVar, cast @@ -10,6 
+11,36 @@ from rich.text import Text from crewai.cli.version import is_newer_version_available +_disable_version_check: ContextVar[bool] = ContextVar( + "_disable_version_check", default=False +) + +_suppress_console_output: ContextVar[bool] = ContextVar( + "_suppress_console_output", default=False +) + + +def set_suppress_console_output(suppress: bool) -> object: + """Set whether to suppress all console output. + + Args: + suppress: True to suppress output, False to show it. + + Returns: + A token that can be used to restore the previous value. + """ + return _suppress_console_output.set(suppress) + + +def should_suppress_console_output() -> bool: + """Check if console output should be suppressed. + + Returns: + True if output should be suppressed, False otherwise. + """ + return _suppress_console_output.get() + + class ConsoleFormatter: tool_usage_counts: ClassVar[dict[str, int]] = {} @@ -46,9 +77,15 @@ class ConsoleFormatter: if not self.verbose: return + if _disable_version_check.get(): + return + if os.getenv("CI", "").lower() in ("true", "1"): return + if os.getenv("CREWAI_DISABLE_VERSION_CHECK", "").lower() in ("true", "1"): + return + try: is_newer, current, latest = is_newer_version_available() if is_newer and latest: @@ -76,8 +113,12 @@ To update, run: uv sync --upgrade-package crewai""" from crewai.events.listeners.tracing.utils import ( has_user_declined_tracing, is_tracing_enabled_in_context, + should_suppress_tracing_messages, ) + if should_suppress_tracing_messages(): + return + if not is_tracing_enabled_in_context(): if has_user_declined_tracing(): message = """Info: Tracing is disabled. @@ -129,6 +170,8 @@ To enable tracing, do any one of these: def print(self, *args: Any, **kwargs: Any) -> None: """Print to console. 
Simplified to only handle panel-based output.""" + if should_suppress_console_output(): + return # Skip blank lines during streaming if len(args) == 0 and self._is_streaming: return @@ -485,6 +528,9 @@ To enable tracing, do any one of these: if not self.verbose: return + if should_suppress_console_output(): + return + self._is_streaming = True self._last_stream_call_type = call_type diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index b9d8adccc..87f9e83d4 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -18,6 +18,7 @@ from crewai.agents.parser import ( AgentFinish, OutputParserError, ) +from crewai.core.providers.human_input import get_provider from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.utils import ( is_tracing_enabled_in_context, @@ -41,7 +42,12 @@ from crewai.hooks.tool_hooks import ( get_after_tool_call_hooks, get_before_tool_call_hooks, ) -from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType +from crewai.hooks.types import ( + AfterLLMCallHookCallable, + AfterLLMCallHookType, + BeforeLLMCallHookCallable, + BeforeLLMCallHookType, +) from crewai.utilities.agent_utils import ( convert_tools_to_openai_schema, enforce_rpm_limit, @@ -191,8 +197,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._instance_id = str(uuid4())[:8] - self.before_llm_call_hooks: list[BeforeLLMCallHookType] = [] - self.after_llm_call_hooks: list[AfterLLMCallHookType] = [] + self.before_llm_call_hooks: list[ + BeforeLLMCallHookType | BeforeLLMCallHookCallable + ] = [] + self.after_llm_call_hooks: list[ + AfterLLMCallHookType | AfterLLMCallHookCallable + ] = [] self.before_llm_call_hooks.extend(get_before_llm_call_hooks()) self.after_llm_call_hooks.extend(get_after_llm_call_hooks()) @@ -207,6 +217,51 @@ class AgentExecutor(Flow[AgentReActState], 
CrewAgentExecutorMixin): ) self._state = AgentReActState() + @property + def messages(self) -> list[LLMMessage]: + """Delegate to state for ExecutorContext conformance.""" + return self._state.messages + + @messages.setter + def messages(self, value: list[LLMMessage]) -> None: + """Delegate to state for ExecutorContext conformance.""" + self._state.messages = value + + @property + def ask_for_human_input(self) -> bool: + """Delegate to state for ExecutorContext conformance.""" + return self._state.ask_for_human_input + + @ask_for_human_input.setter + def ask_for_human_input(self, value: bool) -> None: + """Delegate to state for ExecutorContext conformance.""" + self._state.ask_for_human_input = value + + def _invoke_loop(self) -> AgentFinish: + """Invoke the agent loop and return the result. + + Required by ExecutorContext protocol. + """ + self._state.iterations = 0 + self._state.is_finished = False + self._state.current_answer = None + + self.kickoff() + + answer = self._state.current_answer + if not isinstance(answer, AgentFinish): + raise RuntimeError("Agent loop did not produce a final answer") + return answer + + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message for the LLM. + + Required by ExecutorContext protocol. + """ + return format_message_for_llm( + self._i18n.slice("feedback_instructions").format(feedback=feedback) + ) + def _ensure_flow_initialized(self) -> None: """Ensure Flow.__init__() has been called. 
@@ -300,16 +355,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): """ return self._state - @property - def messages(self) -> list[LLMMessage]: - """Compatibility property for mixin - returns state messages.""" - return self._state.messages - - @messages.setter - def messages(self, value: list[LLMMessage]) -> None: - """Set state messages.""" - self._state.messages = value - @property def iterations(self) -> int: """Compatibility property for mixin - returns state iterations.""" @@ -689,6 +734,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): agent_key=agent_key, ), ) + error_event_emitted = False track_delegation_if_needed(func_name, args_dict, self.task) @@ -764,6 +810,7 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): error=e, ), ) + error_event_emitted = True elif max_usage_reached and original_tool: # Return error message when max usage limit is reached result = f"Tool '{func_name}' has reached its usage limit of {original_tool.max_usage_count} times and cannot be used anymore." @@ -792,20 +839,20 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): color="red", ) - # Emit tool usage finished event - crewai_event_bus.emit( - self, - event=ToolUsageFinishedEvent( - output=result, - tool_name=func_name, - tool_args=args_dict, - from_agent=self.agent, - from_task=self.task, - agent_key=agent_key, - started_at=started_at, - finished_at=datetime.now(), - ), - ) + if not error_event_emitted: + crewai_event_bus.emit( + self, + event=ToolUsageFinishedEvent( + output=result, + tool_name=func_name, + tool_args=args_dict, + from_agent=self.agent, + from_task=self.task, + agent_key=agent_key, + started_at=started_at, + finished_at=datetime.now(), + ), + ) # Append tool result message tool_message: LLMMessage = { @@ -1319,17 +1366,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): Returns: Final answer after feedback. 
""" - output_str = ( - str(formatted_answer.output) - if isinstance(formatted_answer.output, BaseModel) - else formatted_answer.output - ) - human_feedback = self._ask_human_input(output_str) - - if self._is_training_mode(): - return self._handle_training_feedback(formatted_answer, human_feedback) - - return self._handle_regular_feedback(formatted_answer, human_feedback) + provider = get_provider() + return provider.handle_feedback(formatted_answer, self) def _is_training_mode(self) -> bool: """Check if training mode is active. @@ -1339,101 +1377,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): """ return bool(self.crew and self.crew._train) - def _handle_training_feedback( - self, initial_answer: AgentFinish, feedback: str - ) -> AgentFinish: - """Process training feedback and generate improved answer. - - Args: - initial_answer: Initial agent output. - feedback: Training feedback. - - Returns: - Improved answer. - """ - self._handle_crew_training_output(initial_answer, feedback) - self.state.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - - # Re-run flow for improved answer - self.state.iterations = 0 - self.state.is_finished = False - self.state.current_answer = None - - self.kickoff() - - # Get improved answer from state - improved_answer = self.state.current_answer - if not isinstance(improved_answer, AgentFinish): - raise RuntimeError( - "Training feedback iteration did not produce final answer" - ) - - self._handle_crew_training_output(improved_answer) - self.state.ask_for_human_input = False - return improved_answer - - def _handle_regular_feedback( - self, current_answer: AgentFinish, initial_feedback: str - ) -> AgentFinish: - """Process regular feedback iteratively until user is satisfied. - - Args: - current_answer: Current agent output. - initial_feedback: Initial user feedback. - - Returns: - Final answer after iterations. 
- """ - feedback = initial_feedback - answer = current_answer - - while self.state.ask_for_human_input: - if feedback.strip() == "": - self.state.ask_for_human_input = False - else: - answer = self._process_feedback_iteration(feedback) - output_str = ( - str(answer.output) - if isinstance(answer.output, BaseModel) - else answer.output - ) - feedback = self._ask_human_input(output_str) - - return answer - - def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process a single feedback iteration and generate updated response. - - Args: - feedback: User feedback. - - Returns: - Updated agent response. - """ - self.state.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - - # Re-run flow - self.state.iterations = 0 - self.state.is_finished = False - self.state.current_answer = None - - self.kickoff() - - # Get answer from state - answer = self.state.current_answer - if not isinstance(answer, AgentFinish): - raise RuntimeError("Feedback iteration did not produce final answer") - - return answer - @classmethod def __get_pydantic_core_schema__( cls, _source_type: Any, _handler: GetCoreSchemaHandler diff --git a/lib/crewai/src/crewai/flow/async_feedback/__init__.py b/lib/crewai/src/crewai/flow/async_feedback/__init__.py index 286fdaa8d..02590a785 100644 --- a/lib/crewai/src/crewai/flow/async_feedback/__init__.py +++ b/lib/crewai/src/crewai/flow/async_feedback/__init__.py @@ -28,6 +28,8 @@ Example: ``` """ +from typing import Any + from crewai.flow.async_feedback.providers import ConsoleProvider from crewai.flow.async_feedback.types import ( HumanFeedbackPending, @@ -41,4 +43,15 @@ __all__ = [ "HumanFeedbackPending", "HumanFeedbackProvider", "PendingFeedbackContext", + "_extension_exports", ] + +_extension_exports: dict[str, Any] = {} + + +def __getattr__(name: str) -> Any: + """Support extensions via dynamic attribute lookup.""" + if name in _extension_exports: + return 
_extension_exports[name] + msg = f"module {__name__!r} has no attribute {name!r}" + raise AttributeError(msg) diff --git a/lib/crewai/src/crewai/flow/flow.py b/lib/crewai/src/crewai/flow/flow.py index 1b4665620..64bd86d53 100644 --- a/lib/crewai/src/crewai/flow/flow.py +++ b/lib/crewai/src/crewai/flow/flow.py @@ -45,6 +45,7 @@ from crewai.events.listeners.tracing.utils import ( has_user_declined_tracing, set_tracing_enabled, should_enable_tracing, + should_suppress_tracing_messages, ) from crewai.events.types.flow_events import ( FlowCreatedEvent, @@ -2074,12 +2075,14 @@ class Flow(Generic[T], metaclass=FlowMeta): racing_members, other_listeners, listener_result, - triggering_event_id, + current_triggering_event_id, ) else: tasks = [ self._execute_single_listener( - listener_name, listener_result, triggering_event_id + listener_name, + listener_result, + current_triggering_event_id, ) for listener_name in listeners_triggered ] @@ -2626,6 +2629,8 @@ class Flow(Generic[T], metaclass=FlowMeta): @staticmethod def _show_tracing_disabled_message() -> None: """Show a message when tracing is disabled.""" + if should_suppress_tracing_messages(): + return console = Console() diff --git a/lib/crewai/src/crewai/hooks/llm_hooks.py b/lib/crewai/src/crewai/hooks/llm_hooks.py index 2f5462fe0..3a6abbedf 100644 --- a/lib/crewai/src/crewai/hooks/llm_hooks.py +++ b/lib/crewai/src/crewai/hooks/llm_hooks.py @@ -3,7 +3,12 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any, cast from crewai.events.event_listener import event_listener -from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType +from crewai.hooks.types import ( + AfterLLMCallHookCallable, + AfterLLMCallHookType, + BeforeLLMCallHookCallable, + BeforeLLMCallHookType, +) from crewai.utilities.printer import Printer @@ -149,12 +154,12 @@ class LLMCallHookContext: event_listener.formatter.resume_live_updates() -_before_llm_call_hooks: list[BeforeLLMCallHookType] = [] 
-_after_llm_call_hooks: list[AfterLLMCallHookType] = [] +_before_llm_call_hooks: list[BeforeLLMCallHookType | BeforeLLMCallHookCallable] = [] +_after_llm_call_hooks: list[AfterLLMCallHookType | AfterLLMCallHookCallable] = [] def register_before_llm_call_hook( - hook: BeforeLLMCallHookType, + hook: BeforeLLMCallHookType | BeforeLLMCallHookCallable, ) -> None: """Register a global before_llm_call hook. @@ -190,7 +195,7 @@ def register_before_llm_call_hook( def register_after_llm_call_hook( - hook: AfterLLMCallHookType, + hook: AfterLLMCallHookType | AfterLLMCallHookCallable, ) -> None: """Register a global after_llm_call hook. @@ -217,7 +222,9 @@ def register_after_llm_call_hook( _after_llm_call_hooks.append(hook) -def get_before_llm_call_hooks() -> list[BeforeLLMCallHookType]: +def get_before_llm_call_hooks() -> list[ + BeforeLLMCallHookType | BeforeLLMCallHookCallable +]: """Get all registered global before_llm_call hooks. Returns: @@ -226,7 +233,7 @@ def get_before_llm_call_hooks() -> list[BeforeLLMCallHookType]: return _before_llm_call_hooks.copy() -def get_after_llm_call_hooks() -> list[AfterLLMCallHookType]: +def get_after_llm_call_hooks() -> list[AfterLLMCallHookType | AfterLLMCallHookCallable]: """Get all registered global after_llm_call hooks. Returns: @@ -236,7 +243,7 @@ def get_after_llm_call_hooks() -> list[AfterLLMCallHookType]: def unregister_before_llm_call_hook( - hook: BeforeLLMCallHookType, + hook: BeforeLLMCallHookType | BeforeLLMCallHookCallable, ) -> bool: """Unregister a specific global before_llm_call hook. @@ -262,7 +269,7 @@ def unregister_before_llm_call_hook( def unregister_after_llm_call_hook( - hook: AfterLLMCallHookType, + hook: AfterLLMCallHookType | AfterLLMCallHookCallable, ) -> bool: """Unregister a specific global after_llm_call hook. 
diff --git a/lib/crewai/src/crewai/hooks/tool_hooks.py b/lib/crewai/src/crewai/hooks/tool_hooks.py index 6ee0ab033..ac7f5c362 100644 --- a/lib/crewai/src/crewai/hooks/tool_hooks.py +++ b/lib/crewai/src/crewai/hooks/tool_hooks.py @@ -3,7 +3,12 @@ from __future__ import annotations from typing import TYPE_CHECKING, Any from crewai.events.event_listener import event_listener -from crewai.hooks.types import AfterToolCallHookType, BeforeToolCallHookType +from crewai.hooks.types import ( + AfterToolCallHookCallable, + AfterToolCallHookType, + BeforeToolCallHookCallable, + BeforeToolCallHookType, +) from crewai.utilities.printer import Printer @@ -112,12 +117,12 @@ class ToolCallHookContext: # Global hook registries -_before_tool_call_hooks: list[BeforeToolCallHookType] = [] -_after_tool_call_hooks: list[AfterToolCallHookType] = [] +_before_tool_call_hooks: list[BeforeToolCallHookType | BeforeToolCallHookCallable] = [] +_after_tool_call_hooks: list[AfterToolCallHookType | AfterToolCallHookCallable] = [] def register_before_tool_call_hook( - hook: BeforeToolCallHookType, + hook: BeforeToolCallHookType | BeforeToolCallHookCallable, ) -> None: """Register a global before_tool_call hook. @@ -154,7 +159,7 @@ def register_before_tool_call_hook( def register_after_tool_call_hook( - hook: AfterToolCallHookType, + hook: AfterToolCallHookType | AfterToolCallHookCallable, ) -> None: """Register a global after_tool_call hook. @@ -184,7 +189,9 @@ def register_after_tool_call_hook( _after_tool_call_hooks.append(hook) -def get_before_tool_call_hooks() -> list[BeforeToolCallHookType]: +def get_before_tool_call_hooks() -> list[ + BeforeToolCallHookType | BeforeToolCallHookCallable +]: """Get all registered global before_tool_call hooks. 
Returns: @@ -193,7 +200,9 @@ def get_before_tool_call_hooks() -> list[BeforeToolCallHookType]: return _before_tool_call_hooks.copy() -def get_after_tool_call_hooks() -> list[AfterToolCallHookType]: +def get_after_tool_call_hooks() -> list[ + AfterToolCallHookType | AfterToolCallHookCallable +]: """Get all registered global after_tool_call hooks. Returns: @@ -203,7 +212,7 @@ def get_after_tool_call_hooks() -> list[AfterToolCallHookType]: def unregister_before_tool_call_hook( - hook: BeforeToolCallHookType, + hook: BeforeToolCallHookType | BeforeToolCallHookCallable, ) -> bool: """Unregister a specific global before_tool_call hook. @@ -229,7 +238,7 @@ def unregister_before_tool_call_hook( def unregister_after_tool_call_hook( - hook: AfterToolCallHookType, + hook: AfterToolCallHookType | AfterToolCallHookCallable, ) -> bool: """Unregister a specific global after_tool_call hook. diff --git a/lib/crewai/src/crewai/knowledge/source/utils/__init__.py b/lib/crewai/src/crewai/knowledge/source/utils/__init__.py new file mode 100644 index 000000000..0f7c5142c --- /dev/null +++ b/lib/crewai/src/crewai/knowledge/source/utils/__init__.py @@ -0,0 +1 @@ +"""Knowledge source utilities.""" diff --git a/lib/crewai/src/crewai/knowledge/source/utils/source_helper.py b/lib/crewai/src/crewai/knowledge/source/utils/source_helper.py new file mode 100644 index 000000000..9ab41cd30 --- /dev/null +++ b/lib/crewai/src/crewai/knowledge/source/utils/source_helper.py @@ -0,0 +1,70 @@ +"""Helper utilities for knowledge sources.""" + +from typing import Any, ClassVar + +from crewai.knowledge.source.base_knowledge_source import BaseKnowledgeSource +from crewai.knowledge.source.csv_knowledge_source import CSVKnowledgeSource +from crewai.knowledge.source.excel_knowledge_source import ExcelKnowledgeSource +from crewai.knowledge.source.json_knowledge_source import JSONKnowledgeSource +from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource +from 
crewai.knowledge.source.text_file_knowledge_source import TextFileKnowledgeSource + + +class SourceHelper: + """Helper class for creating and managing knowledge sources.""" + + SUPPORTED_FILE_TYPES: ClassVar[list[str]] = [ + ".csv", + ".pdf", + ".json", + ".txt", + ".xlsx", + ".xls", + ] + + _FILE_TYPE_MAP: ClassVar[dict[str, type[BaseKnowledgeSource]]] = { + ".csv": CSVKnowledgeSource, + ".pdf": PDFKnowledgeSource, + ".json": JSONKnowledgeSource, + ".txt": TextFileKnowledgeSource, + ".xlsx": ExcelKnowledgeSource, + ".xls": ExcelKnowledgeSource, + } + + @classmethod + def is_supported_file(cls, file_path: str) -> bool: + """Check if a file type is supported. + + Args: + file_path: Path to the file. + + Returns: + True if the file type is supported. + """ + return file_path.lower().endswith(tuple(cls.SUPPORTED_FILE_TYPES)) + + @classmethod + def get_source( + cls, file_path: str, metadata: dict[str, Any] | None = None + ) -> BaseKnowledgeSource: + """Create appropriate KnowledgeSource based on file extension. + + Args: + file_path: Path to the file. + metadata: Optional metadata to attach to the source. + + Returns: + The appropriate KnowledgeSource instance. + + Raises: + ValueError: If the file type is not supported. 
+ """ + if not cls.is_supported_file(file_path): + raise ValueError(f"Unsupported file type: {file_path}") + + lower_path = file_path.lower() + for ext, source_cls in cls._FILE_TYPE_MAP.items(): + if lower_path.endswith(ext): + return source_cls(file_path=[file_path], metadata=metadata) + + raise ValueError(f"Unsupported file type: {file_path}") diff --git a/lib/crewai/src/crewai/project/crew_base.py b/lib/crewai/src/crewai/project/crew_base.py index 202d98898..323450b13 100644 --- a/lib/crewai/src/crewai/project/crew_base.py +++ b/lib/crewai/src/crewai/project/crew_base.py @@ -27,6 +27,8 @@ if TYPE_CHECKING: from crewai import Agent, Task from crewai.agents.cache.cache_handler import CacheHandler from crewai.crews.crew_output import CrewOutput + from crewai.hooks.llm_hooks import LLMCallHookContext + from crewai.hooks.tool_hooks import ToolCallHookContext from crewai.project.wrappers import ( CrewInstance, OutputJsonClass, @@ -34,6 +36,8 @@ if TYPE_CHECKING: ) from crewai.tasks.task_output import TaskOutput +_post_initialize_crew_hooks: list[Callable[[Any], None]] = [] + class AgentConfig(TypedDict, total=False): """Type definition for agent configuration dictionary. 
@@ -266,6 +270,9 @@ class CrewBaseMeta(type): instance.map_all_agent_variables() instance.map_all_task_variables() + for hook in _post_initialize_crew_hooks: + hook(instance) + original_methods = { name: method for name, method in cls.__dict__.items() @@ -485,47 +492,61 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None: if has_agent_filter: agents_filter = hook_method._filter_agents - def make_filtered_before_llm(bound_fn, agents_list): - def filtered(context): + def make_filtered_before_llm( + bound_fn: Callable[[LLMCallHookContext], bool | None], + agents_list: list[str], + ) -> Callable[[LLMCallHookContext], bool | None]: + def filtered(context: LLMCallHookContext) -> bool | None: if context.agent and context.agent.role not in agents_list: return None return bound_fn(context) return filtered - final_hook = make_filtered_before_llm(bound_hook, agents_filter) + before_llm_hook = make_filtered_before_llm(bound_hook, agents_filter) else: - final_hook = bound_hook + before_llm_hook = bound_hook - register_before_llm_call_hook(final_hook) - instance._registered_hook_functions.append(("before_llm_call", final_hook)) + register_before_llm_call_hook(before_llm_hook) + instance._registered_hook_functions.append( + ("before_llm_call", before_llm_hook) + ) if hasattr(hook_method, "is_after_llm_call_hook"): if has_agent_filter: agents_filter = hook_method._filter_agents - def make_filtered_after_llm(bound_fn, agents_list): - def filtered(context): + def make_filtered_after_llm( + bound_fn: Callable[[LLMCallHookContext], str | None], + agents_list: list[str], + ) -> Callable[[LLMCallHookContext], str | None]: + def filtered(context: LLMCallHookContext) -> str | None: if context.agent and context.agent.role not in agents_list: return None return bound_fn(context) return filtered - final_hook = make_filtered_after_llm(bound_hook, agents_filter) + after_llm_hook = make_filtered_after_llm(bound_hook, agents_filter) else: - final_hook = bound_hook + 
after_llm_hook = bound_hook - register_after_llm_call_hook(final_hook) - instance._registered_hook_functions.append(("after_llm_call", final_hook)) + register_after_llm_call_hook(after_llm_hook) + instance._registered_hook_functions.append( + ("after_llm_call", after_llm_hook) + ) if hasattr(hook_method, "is_before_tool_call_hook"): if has_tool_filter or has_agent_filter: tools_filter = getattr(hook_method, "_filter_tools", None) agents_filter = getattr(hook_method, "_filter_agents", None) - def make_filtered_before_tool(bound_fn, tools_list, agents_list): - def filtered(context): + def make_filtered_before_tool( + bound_fn: Callable[[ToolCallHookContext], bool | None], + tools_list: list[str] | None, + agents_list: list[str] | None, + ) -> Callable[[ToolCallHookContext], bool | None]: + def filtered(context: ToolCallHookContext) -> bool | None: if tools_list and context.tool_name not in tools_list: return None if ( @@ -538,22 +559,28 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None: return filtered - final_hook = make_filtered_before_tool( + before_tool_hook = make_filtered_before_tool( bound_hook, tools_filter, agents_filter ) else: - final_hook = bound_hook + before_tool_hook = bound_hook - register_before_tool_call_hook(final_hook) - instance._registered_hook_functions.append(("before_tool_call", final_hook)) + register_before_tool_call_hook(before_tool_hook) + instance._registered_hook_functions.append( + ("before_tool_call", before_tool_hook) + ) if hasattr(hook_method, "is_after_tool_call_hook"): if has_tool_filter or has_agent_filter: tools_filter = getattr(hook_method, "_filter_tools", None) agents_filter = getattr(hook_method, "_filter_agents", None) - def make_filtered_after_tool(bound_fn, tools_list, agents_list): - def filtered(context): + def make_filtered_after_tool( + bound_fn: Callable[[ToolCallHookContext], str | None], + tools_list: list[str] | None, + agents_list: list[str] | None, + ) -> Callable[[ToolCallHookContext], str 
| None]: + def filtered(context: ToolCallHookContext) -> str | None: if tools_list and context.tool_name not in tools_list: return None if ( @@ -566,14 +593,16 @@ def _register_crew_hooks(instance: CrewInstance, cls: type) -> None: return filtered - final_hook = make_filtered_after_tool( + after_tool_hook = make_filtered_after_tool( bound_hook, tools_filter, agents_filter ) else: - final_hook = bound_hook + after_tool_hook = bound_hook - register_after_tool_call_hook(final_hook) - instance._registered_hook_functions.append(("after_tool_call", final_hook)) + register_after_tool_call_hook(after_tool_hook) + instance._registered_hook_functions.append( + ("after_tool_call", after_tool_hook) + ) instance._hooks_being_registered = False diff --git a/lib/crewai/src/crewai/project/wrappers.py b/lib/crewai/src/crewai/project/wrappers.py index 28cd39525..3d570b6f0 100644 --- a/lib/crewai/src/crewai/project/wrappers.py +++ b/lib/crewai/src/crewai/project/wrappers.py @@ -72,6 +72,8 @@ class CrewInstance(Protocol): __crew_metadata__: CrewMetadata _mcp_server_adapter: Any _all_methods: dict[str, Callable[..., Any]] + _registered_hook_functions: list[tuple[str, Callable[..., Any]]] + _hooks_being_registered: bool agents: list[Agent] tasks: list[Task] base_directory: Path diff --git a/lib/crewai/src/crewai/task.py b/lib/crewai/src/crewai/task.py index 77056f0ca..d73c3d919 100644 --- a/lib/crewai/src/crewai/task.py +++ b/lib/crewai/src/crewai/task.py @@ -31,6 +31,7 @@ from pydantic_core import PydanticCustomError from typing_extensions import Self from crewai.agents.agent_builder.base_agent import BaseAgent +from crewai.core.providers.content_processor import process_content from crewai.events.event_bus import crewai_event_bus from crewai.events.types.task_events import ( TaskCompletedEvent, @@ -496,6 +497,7 @@ class Task(BaseModel): tools: list[BaseTool] | None = None, ) -> TaskOutput: """Execute the task synchronously.""" + self.start_time = datetime.datetime.now() return 
self._execute_core(agent, context, tools) @property @@ -536,6 +538,7 @@ class Task(BaseModel): ) -> None: """Execute the task asynchronously with context handling.""" try: + self.start_time = datetime.datetime.now() result = self._execute_core(agent, context, tools) future.set_result(result) except Exception as e: @@ -548,6 +551,7 @@ class Task(BaseModel): tools: list[BaseTool] | None = None, ) -> TaskOutput: """Execute the task asynchronously using native async/await.""" + self.start_time = datetime.datetime.now() return await self._aexecute_core(agent, context, tools) async def _aexecute_core( @@ -566,8 +570,6 @@ class Task(BaseModel): f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." ) - self.start_time = datetime.datetime.now() - self.prompt_context = context tools = tools or self.tools or [] @@ -579,6 +581,8 @@ class Task(BaseModel): tools=tools, ) + self._post_agent_execution(agent) + if not self._guardrails and not self._guardrail: pydantic_output, json_output = self._export_output(result) else: @@ -661,8 +665,6 @@ class Task(BaseModel): f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." ) - self.start_time = datetime.datetime.now() - self.prompt_context = context tools = tools or self.tools or [] @@ -674,6 +676,8 @@ class Task(BaseModel): tools=tools, ) + self._post_agent_execution(agent) + if not self._guardrails and not self._guardrail: pydantic_output, json_output = self._export_output(result) else: @@ -741,6 +745,9 @@ class Task(BaseModel): finally: clear_task_files(self.id) + def _post_agent_execution(self, agent: BaseAgent) -> None: + pass + def prompt(self) -> str: """Generates the task prompt with optional markdown formatting. 
@@ -863,6 +870,11 @@ Follow these guidelines: except ValueError as e: raise ValueError(f"Error interpolating description: {e!s}") from e + self.description = process_content(self.description, {"task": self}) + self._original_expected_output = process_content( + self._original_expected_output, {"task": self} + ) + try: self.expected_output = interpolate_only( input_string=self._original_expected_output, inputs=inputs diff --git a/lib/crewai/src/crewai/tasks/hallucination_guardrail.py b/lib/crewai/src/crewai/tasks/hallucination_guardrail.py index dd000a83c..c48b890a8 100644 --- a/lib/crewai/src/crewai/tasks/hallucination_guardrail.py +++ b/lib/crewai/src/crewai/tasks/hallucination_guardrail.py @@ -6,6 +6,7 @@ Classes: HallucinationGuardrail: Placeholder guardrail that validates task outputs. """ +from collections.abc import Callable from typing import Any from crewai.llm import LLM @@ -13,32 +14,36 @@ from crewai.tasks.task_output import TaskOutput from crewai.utilities.logger import Logger +_validate_output_hook: Callable[..., tuple[bool, Any]] | None = None + + class HallucinationGuardrail: """Placeholder for the HallucinationGuardrail feature. Attributes: - context: The reference context that outputs would be checked against. + context: Optional reference context that outputs would be checked against. llm: The language model that would be used for evaluation. threshold: Optional minimum faithfulness score that would be required to pass. tool_response: Optional tool response information that would be used in evaluation. Examples: - >>> # Basic usage with default verdict logic + >>> # Basic usage without context (uses task expected_output as context) + >>> guardrail = HallucinationGuardrail(llm=agent.llm) + + >>> # With context for reference >>> guardrail = HallucinationGuardrail( - ... context="AI helps with various tasks including analysis and generation.", ... llm=agent.llm, + ... context="AI helps with various tasks including analysis and generation.", ... 
) >>> # With custom threshold for stricter validation >>> strict_guardrail = HallucinationGuardrail( - ... context="Quantum computing uses qubits in superposition.", ... llm=agent.llm, - ... threshold=8.0, # Would require score >= 8 to pass in enterprise version + ... threshold=8.0, # Require score >= 8 to pass ... ) >>> # With tool response for additional context >>> guardrail_with_tools = HallucinationGuardrail( - ... context="The current weather data", ... llm=agent.llm, ... tool_response="Weather API returned: Temperature 22°C, Humidity 65%", ... ) @@ -46,16 +51,17 @@ class HallucinationGuardrail: def __init__( self, - context: str, llm: LLM, + context: str | None = None, threshold: float | None = None, tool_response: str = "", ): """Initialize the HallucinationGuardrail placeholder. Args: - context: The reference context that outputs would be checked against. llm: The language model that would be used for evaluation. + context: Optional reference context that outputs would be checked against. + If not provided, the task's expected_output will be used as context. threshold: Optional minimum faithfulness score that would be required to pass. tool_response: Optional tool response information that would be used in evaluation. """ @@ -78,16 +84,17 @@ class HallucinationGuardrail: def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]: """Validate a task output against hallucination criteria. - In the open source, this method always returns that the output is valid. - Args: task_output: The output to be validated. 
Returns: A tuple containing: - - True - - The raw task output + - True if validation passed, False otherwise + - The raw task output if valid, or error feedback if invalid """ + if callable(_validate_output_hook): + return _validate_output_hook(self, task_output) + self._logger.log( "warning", "Premium hallucination detection skipped (use for free at https://app.crewai.com)\n", diff --git a/lib/crewai/src/crewai/tasks/llm_guardrail.py b/lib/crewai/src/crewai/tasks/llm_guardrail.py index 803b2d749..3729e8084 100644 --- a/lib/crewai/src/crewai/tasks/llm_guardrail.py +++ b/lib/crewai/src/crewai/tasks/llm_guardrail.py @@ -1,6 +1,10 @@ +import asyncio +from collections.abc import Coroutine +import inspect from typing import Any from pydantic import BaseModel, Field +from typing_extensions import TypeIs from crewai.agent import Agent from crewai.lite_agent_output import LiteAgentOutput @@ -8,6 +12,13 @@ from crewai.llms.base_llm import BaseLLM from crewai.tasks.task_output import TaskOutput +def _is_coroutine( + obj: LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput], +) -> TypeIs[Coroutine[Any, Any, LiteAgentOutput]]: + """Check if obj is a coroutine for type narrowing.""" + return inspect.iscoroutine(obj) + + class LLMGuardrailResult(BaseModel): valid: bool = Field( description="Whether the task output complies with the guardrail" @@ -62,7 +73,10 @@ class LLMGuardrail: - If the Task result complies with the guardrail, saying that is valid """ - return agent.kickoff(query, response_format=LLMGuardrailResult) + kickoff_result = agent.kickoff(query, response_format=LLMGuardrailResult) + if _is_coroutine(kickoff_result): + return asyncio.run(kickoff_result) + return kickoff_result def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]: """Validates the output of a task based on specified criteria. 
diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index 27d5d6090..04303fc3d 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -903,7 +903,7 @@ class Telemetry: { "id": str(task.id), "description": task.description, - "output": task.output.raw_output, + "output": task.output.raw if task.output else "", } for task in crew.tasks ] @@ -923,6 +923,9 @@ class Telemetry: value: The attribute value. """ + if span is None: + return + def _operation() -> None: return span.set_attribute(key, value) diff --git a/lib/crewai/src/crewai/tools/tool_usage.py b/lib/crewai/src/crewai/tools/tool_usage.py index e5a9e6154..b6ce5adb6 100644 --- a/lib/crewai/src/crewai/tools/tool_usage.py +++ b/lib/crewai/src/crewai/tools/tool_usage.py @@ -270,6 +270,7 @@ class ToolUsage: result = None # type: ignore should_retry = False available_tool = None + error_event_emitted = False try: if self.tools_handler and self.tools_handler.cache: @@ -408,6 +409,7 @@ class ToolUsage: except Exception as e: self.on_tool_error(tool=tool, tool_calling=calling, e=e) + error_event_emitted = True self._run_attempts += 1 if self._run_attempts > self._max_parsing_attempts: self._telemetry.tool_usage_error(llm=self.function_calling_llm) @@ -435,7 +437,7 @@ class ToolUsage: result = self._format_result(result=result) finally: - if started_event_emitted: + if started_event_emitted and not error_event_emitted: self.on_tool_use_finished( tool=tool, tool_calling=calling, @@ -500,6 +502,7 @@ class ToolUsage: result = None # type: ignore should_retry = False available_tool = None + error_event_emitted = False try: if self.tools_handler and self.tools_handler.cache: @@ -638,6 +641,7 @@ class ToolUsage: except Exception as e: self.on_tool_error(tool=tool, tool_calling=calling, e=e) + error_event_emitted = True self._run_attempts += 1 if self._run_attempts > self._max_parsing_attempts: 
self._telemetry.tool_usage_error(llm=self.function_calling_llm) @@ -665,7 +669,7 @@ class ToolUsage: result = self._format_result(result=result) finally: - if started_event_emitted: + if started_event_emitted and not error_event_emitted: self.on_tool_use_finished( tool=tool, tool_calling=calling, diff --git a/lib/crewai/src/crewai/utilities/agent_utils.py b/lib/crewai/src/crewai/utilities/agent_utils.py index 1ede7f07d..ee76dc53f 100644 --- a/lib/crewai/src/crewai/utilities/agent_utils.py +++ b/lib/crewai/src/crewai/utilities/agent_utils.py @@ -42,6 +42,8 @@ if TYPE_CHECKING: from crewai.llm import LLM from crewai.task import Task +_create_plus_client_hook: Callable[[], Any] | None = None + class SummaryContent(TypedDict): """Structure for summary content entries. @@ -91,7 +93,11 @@ def parse_tools(tools: list[BaseTool]) -> list[CrewStructuredTool]: for tool in tools: if isinstance(tool, CrewAITool): - tools_list.append(tool.to_structured_tool()) + structured_tool = tool.to_structured_tool() + structured_tool.current_usage_count = 0 + if structured_tool._original_tool: + structured_tool._original_tool.current_usage_count = 0 + tools_list.append(structured_tool) else: raise ValueError("Tool is not a CrewStructuredTool or BaseTool") @@ -818,10 +824,13 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]: if from_repository: import importlib - from crewai.cli.authentication.token import get_auth_token - from crewai.cli.plus_api import PlusAPI + if callable(_create_plus_client_hook): + client = _create_plus_client_hook() + else: + from crewai.cli.authentication.token import get_auth_token + from crewai.cli.plus_api import PlusAPI - client = PlusAPI(api_key=get_auth_token()) + client = PlusAPI(api_key=get_auth_token()) _print_current_organization() response = client.get_agent(from_repository) if response.status_code == 404: diff --git a/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py 
b/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py index 9c9cac0c6..32b847d73 100644 --- a/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py +++ b/lib/crewai/src/crewai/utilities/evaluators/crew_evaluator_handler.py @@ -1,7 +1,7 @@ from __future__ import annotations from collections import defaultdict -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from pydantic import BaseModel, Field, InstanceOf from rich.box import HEAVY_EDGE @@ -36,7 +36,13 @@ class CrewEvaluator: iteration: The current iteration of the evaluation. """ - def __init__(self, crew: Crew, eval_llm: InstanceOf[BaseLLM]) -> None: + def __init__( + self, + crew: Crew, + eval_llm: InstanceOf[BaseLLM] | str | None = None, + openai_model_name: str | None = None, + llm: InstanceOf[BaseLLM] | str | None = None, + ) -> None: self.crew = crew self.llm = eval_llm self.tasks_scores: defaultdict[int, list[float]] = defaultdict(list) @@ -86,7 +92,9 @@ class CrewEvaluator: """ self.iteration = iteration - def print_crew_evaluation_result(self) -> None: + def print_crew_evaluation_result( + self, token_usage: list[dict[str, Any]] | None = None + ) -> None: """ Prints the evaluation result of the crew in a table. 
A Crew with 2 tasks using the command crewai test -n 3 @@ -204,7 +212,7 @@ class CrewEvaluator: CrewTestResultEvent( quality=quality_score, execution_duration=current_task.execution_duration, - model=self.llm.model, + model=getattr(self.llm, "model", str(self.llm)), crew_name=self.crew.name, crew=self.crew, ), diff --git a/lib/crewai/src/crewai/utilities/printer.py b/lib/crewai/src/crewai/utilities/printer.py index c40de684e..949da543a 100644 --- a/lib/crewai/src/crewai/utilities/printer.py +++ b/lib/crewai/src/crewai/utilities/printer.py @@ -4,6 +4,8 @@ from __future__ import annotations from typing import TYPE_CHECKING, Final, Literal, NamedTuple +from crewai.events.utils.console_formatter import should_suppress_console_output + if TYPE_CHECKING: from _typeshed import SupportsWrite @@ -77,6 +79,8 @@ class Printer: file: A file-like object (stream); defaults to the current sys.stdout. flush: Whether to forcibly flush the stream. """ + if should_suppress_console_output(): + return if isinstance(content, str): content = [ColoredText(content, color)] print( diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index a6b871d3d..0207e80ab 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -19,6 +19,7 @@ def to_serializable( exclude: set[str] | None = None, max_depth: int = 5, _current_depth: int = 0, + _ancestors: set[int] | None = None, ) -> Serializable: """Converts a Python object into a JSON-compatible representation. @@ -31,6 +32,7 @@ def to_serializable( exclude: Set of keys to exclude from the result. max_depth: Maximum recursion depth. Defaults to 5. _current_depth: Current recursion depth (for internal use). + _ancestors: Set of ancestor object ids for cycle detection (for internal use). Returns: Serializable: A JSON-compatible structure. 
@@ -41,16 +43,29 @@ def to_serializable( if exclude is None: exclude = set() + if _ancestors is None: + _ancestors = set() + if isinstance(obj, (str, int, float, bool, type(None))): return obj if isinstance(obj, uuid.UUID): return str(obj) if isinstance(obj, (date, datetime)): return obj.isoformat() + + object_id = id(obj) + if object_id in _ancestors: + return f"<Circular reference: {type(obj).__name__}>" + new_ancestors = _ancestors | {object_id} + if isinstance(obj, (list, tuple, set)): return [ to_serializable( - item, max_depth=max_depth, _current_depth=_current_depth + 1 + item, + exclude=exclude, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for item in obj ] @@ -61,6 +76,7 @@ def to_serializable( exclude=exclude, max_depth=max_depth, _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for key, value in obj.items() if key not in exclude @@ -71,12 +87,16 @@ def to_serializable( obj=obj.model_dump(exclude=exclude), max_depth=max_depth, _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) except Exception: try: return { _to_serializable_key(k): to_serializable( - v, max_depth=max_depth, _current_depth=_current_depth + 1 + v, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for k, v in obj.__dict__.items() if k not in (exclude or set()) diff --git a/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py b/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py index 6ed42b5d1..78320d187 100644 --- a/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py +++ b/lib/crewai/tests/agents/agent_adapters/test_base_agent_adapter.py @@ -51,6 +51,10 @@ class ConcreteAgentAdapter(BaseAgentAdapter): # Dummy implementation for MCP tools return [] + def configure_structured_output(self, task: Any) -> None: + # Dummy implementation for structured output + pass + async def aexecute_task( self, task: Any, diff --git a/lib/crewai/tests/agents/test_agent.py 
b/lib/crewai/tests/agents/test_agent.py index 32130f900..025bfd334 100644 --- a/lib/crewai/tests/agents/test_agent.py +++ b/lib/crewai/tests/agents/test_agent.py @@ -703,6 +703,8 @@ def test_agent_definition_based_on_dict(): # test for human input @pytest.mark.vcr() def test_agent_human_input(): + from crewai.core.providers.human_input import SyncHumanInputProvider + # Agent configuration config = { "role": "test role", @@ -720,7 +722,7 @@ def test_agent_human_input(): human_input=True, ) - # Side effect function for _ask_human_input to simulate multiple feedback iterations + # Side effect function for _prompt_input to simulate multiple feedback iterations feedback_responses = iter( [ "Don't say hi, say Hello instead!", # First feedback: instruct change @@ -728,16 +730,16 @@ def test_agent_human_input(): ] ) - def ask_human_input_side_effect(*args, **kwargs): + def prompt_input_side_effect(*args, **kwargs): return next(feedback_responses) - # Patch both _ask_human_input and _invoke_loop to avoid real API/network calls. + # Patch both _prompt_input on provider and _invoke_loop to avoid real API/network calls. with ( patch.object( - CrewAgentExecutor, - "_ask_human_input", - side_effect=ask_human_input_side_effect, - ) as mock_human_input, + SyncHumanInputProvider, + "_prompt_input", + side_effect=prompt_input_side_effect, + ) as mock_prompt_input, patch.object( CrewAgentExecutor, "_invoke_loop", @@ -749,7 +751,7 @@ def test_agent_human_input(): # Assertions to ensure the agent behaves correctly. # It should have requested feedback twice. 
- assert mock_human_input.call_count == 2 + assert mock_prompt_input.call_count == 2 # The final result should be processed to "Hello" assert output.strip().lower() == "hello" diff --git a/lib/crewai/tests/events/test_event_context.py b/lib/crewai/tests/events/test_event_context.py index 071e1a34d..2a69ca1ee 100644 --- a/lib/crewai/tests/events/test_event_context.py +++ b/lib/crewai/tests/events/test_event_context.py @@ -177,4 +177,40 @@ class TestTriggeredByScope: raise ValueError("test error") except ValueError: pass - assert get_triggering_event_id() is None \ No newline at end of file + assert get_triggering_event_id() is None + + +def test_agent_scope_preserved_after_tool_error_event() -> None: + from crewai.events import crewai_event_bus + from crewai.events.types.tool_usage_events import ( + ToolUsageErrorEvent, + ToolUsageStartedEvent, + ) + + push_event_scope("crew-1", "crew_kickoff_started") + push_event_scope("task-1", "task_started") + push_event_scope("agent-1", "agent_execution_started") + + crewai_event_bus.emit( + None, + ToolUsageStartedEvent( + tool_name="test_tool", + tool_args={}, + agent_key="test_agent", + ) + ) + + crewai_event_bus.emit( + None, + ToolUsageErrorEvent( + tool_name="test_tool", + tool_args={}, + agent_key="test_agent", + error=ValueError("test error"), + ) + ) + + crewai_event_bus.flush() + + assert get_current_parent_id() == "agent-1" + diff --git a/lib/crewai/tests/test_flow_human_input_integration.py b/lib/crewai/tests/test_flow_human_input_integration.py index e60cfe514..3ce4ebbd7 100644 --- a/lib/crewai/tests/test_flow_human_input_integration.py +++ b/lib/crewai/tests/test_flow_human_input_integration.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch import pytest from crewai.events.event_listener import event_listener +from crewai.core.providers.human_input import SyncHumanInputProvider class TestFlowHumanInputIntegration: @@ -24,14 +25,9 @@ class TestFlowHumanInputIntegration: @patch("builtins.input", 
return_value="") def test_human_input_pauses_flow_updates(self, mock_input): """Test that human input pauses Flow status updates.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = False - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = False formatter = event_listener.formatter @@ -39,7 +35,7 @@ class TestFlowHumanInputIntegration: patch.object(formatter, "pause_live_updates") as mock_pause, patch.object(formatter, "resume_live_updates") as mock_resume, ): - result = executor._ask_human_input("Test result") + result = provider._prompt_input(crew) mock_pause.assert_called_once() mock_resume.assert_called_once() @@ -49,14 +45,9 @@ class TestFlowHumanInputIntegration: @patch("builtins.input", side_effect=["feedback", ""]) def test_multiple_human_input_rounds(self, mock_input): """Test multiple rounds of human input with Flow status management.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = False - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = False formatter = event_listener.formatter @@ -75,10 +66,10 @@ class TestFlowHumanInputIntegration: formatter, "resume_live_updates", side_effect=track_resume ), ): - result1 = executor._ask_human_input("Test result 1") + result1 = provider._prompt_input(crew) assert result1 == "feedback" - result2 = executor._ask_human_input("Test result 2") + result2 = provider._prompt_input(crew) assert result2 == "" assert len(pause_calls) == 2 @@ -103,14 +94,9 @@ class TestFlowHumanInputIntegration: def test_pause_resume_exception_handling(self): """Test that resume is called even if exception occurs during human input.""" - from 
crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = False - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = False formatter = event_listener.formatter @@ -122,21 +108,16 @@ class TestFlowHumanInputIntegration: ), ): with pytest.raises(KeyboardInterrupt): - executor._ask_human_input("Test result") + provider._prompt_input(crew) mock_pause.assert_called_once() mock_resume.assert_called_once() def test_training_mode_human_input(self): """Test human input in training mode.""" - from crewai.agents.agent_builder.base_agent_executor_mixin import ( - CrewAgentExecutorMixin, - ) - - executor = CrewAgentExecutorMixin() - executor.crew = MagicMock() - executor.crew._train = True - executor._printer = MagicMock() + provider = SyncHumanInputProvider() + crew = MagicMock() + crew._train = True formatter = event_listener.formatter @@ -146,7 +127,7 @@ class TestFlowHumanInputIntegration: patch.object(formatter.console, "print") as mock_console_print, patch("builtins.input", return_value="training feedback"), ): - result = executor._ask_human_input("Test result") + result = provider._prompt_input(crew) mock_pause.assert_called_once() mock_resume.assert_called_once() @@ -161,4 +142,4 @@ class TestFlowHumanInputIntegration: for call in call_args if call[0] ) - assert training_panel_found + assert training_panel_found \ No newline at end of file diff --git a/lib/crewai/tests/tools/test_tool_usage.py b/lib/crewai/tests/tools/test_tool_usage.py index c6dfad58b..b68a41666 100644 --- a/lib/crewai/tests/tools/test_tool_usage.py +++ b/lib/crewai/tests/tools/test_tool_usage.py @@ -10,7 +10,9 @@ from crewai import Agent, Task from crewai.events.event_bus import crewai_event_bus from crewai.events.types.tool_usage_events import ( ToolSelectionErrorEvent, + ToolUsageErrorEvent, ToolUsageFinishedEvent, + 
ToolUsageStartedEvent, ToolValidateInputErrorEvent, ) from crewai.tools import BaseTool @@ -744,3 +746,78 @@ def test_tool_usage_finished_event_with_cached_result(): assert isinstance(event.started_at, datetime.datetime) assert isinstance(event.finished_at, datetime.datetime) assert event.type == "tool_usage_finished" + + +def test_tool_error_does_not_emit_finished_event(): + from crewai.tools.tool_calling import ToolCalling + + class FailingTool(BaseTool): + name: str = "Failing Tool" + description: str = "A tool that always fails" + + def _run(self, **kwargs) -> str: + raise ValueError("Intentional failure") + + failing_tool = FailingTool().to_structured_tool() + + mock_agent = MagicMock() + mock_agent.key = "test_agent_key" + mock_agent.role = "test_agent_role" + mock_agent._original_role = "test_agent_role" + mock_agent.verbose = False + mock_agent.fingerprint = None + mock_agent.i18n.tools.return_value = {"name": "Add Image"} + mock_agent.i18n.errors.return_value = "Error: {error}" + mock_agent.i18n.slice.return_value = "Available tools: {tool_names}" + + mock_task = MagicMock() + mock_task.delegations = 0 + mock_task.name = "Test Task" + mock_task.description = "A test task" + mock_task.id = "test-task-id" + + mock_action = MagicMock() + mock_action.tool = "failing_tool" + mock_action.tool_input = "{}" + + tool_usage = ToolUsage( + tools_handler=MagicMock(cache=None, last_used_tool=None), + tools=[failing_tool], + task=mock_task, + function_calling_llm=None, + agent=mock_agent, + action=mock_action, + ) + + started_events = [] + error_events = [] + finished_events = [] + error_received = threading.Event() + + @crewai_event_bus.on(ToolUsageStartedEvent) + def on_started(source, event): + if event.tool_name == "failing_tool": + started_events.append(event) + + @crewai_event_bus.on(ToolUsageErrorEvent) + def on_error(source, event): + if event.tool_name == "failing_tool": + error_events.append(event) + error_received.set() + + 
@crewai_event_bus.on(ToolUsageFinishedEvent) + def on_finished(source, event): + if event.tool_name == "failing_tool": + finished_events.append(event) + + tool_calling = ToolCalling(tool_name="failing_tool", arguments={}) + tool_usage.use(calling=tool_calling, tool_string="Action: failing_tool") + + assert error_received.wait(timeout=5), "Timeout waiting for error event" + crewai_event_bus.flush() + + assert len(started_events) >= 1, "Expected at least one ToolUsageStartedEvent" + assert len(error_events) >= 1, "Expected at least one ToolUsageErrorEvent" + assert len(finished_events) == 0, ( + "ToolUsageFinishedEvent should NOT be emitted after ToolUsageErrorEvent" + )