diff --git a/lib/crewai/src/crewai/experimental/agent_executor.py b/lib/crewai/src/crewai/experimental/agent_executor.py index 98e8b5bdb..87f9e83d4 100644 --- a/lib/crewai/src/crewai/experimental/agent_executor.py +++ b/lib/crewai/src/crewai/experimental/agent_executor.py @@ -18,6 +18,7 @@ from crewai.agents.parser import ( AgentFinish, OutputParserError, ) +from crewai.core.providers.human_input import get_provider from crewai.events.event_bus import crewai_event_bus from crewai.events.listeners.tracing.utils import ( is_tracing_enabled_in_context, @@ -41,7 +42,12 @@ from crewai.hooks.tool_hooks import ( get_after_tool_call_hooks, get_before_tool_call_hooks, ) -from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType +from crewai.hooks.types import ( + AfterLLMCallHookCallable, + AfterLLMCallHookType, + BeforeLLMCallHookCallable, + BeforeLLMCallHookType, +) from crewai.utilities.agent_utils import ( convert_tools_to_openai_schema, enforce_rpm_limit, @@ -191,8 +197,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): self._instance_id = str(uuid4())[:8] - self.before_llm_call_hooks: list[BeforeLLMCallHookType] = [] - self.after_llm_call_hooks: list[AfterLLMCallHookType] = [] + self.before_llm_call_hooks: list[ + BeforeLLMCallHookType | BeforeLLMCallHookCallable + ] = [] + self.after_llm_call_hooks: list[ + AfterLLMCallHookType | AfterLLMCallHookCallable + ] = [] self.before_llm_call_hooks.extend(get_before_llm_call_hooks()) self.after_llm_call_hooks.extend(get_after_llm_call_hooks()) @@ -207,6 +217,51 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): ) self._state = AgentReActState() + @property + def messages(self) -> list[LLMMessage]: + """Delegate to state for ExecutorContext conformance.""" + return self._state.messages + + @messages.setter + def messages(self, value: list[LLMMessage]) -> None: + """Delegate to state for ExecutorContext conformance.""" + self._state.messages = value + + @property + def 
ask_for_human_input(self) -> bool: + """Delegate to state for ExecutorContext conformance.""" + return self._state.ask_for_human_input + + @ask_for_human_input.setter + def ask_for_human_input(self, value: bool) -> None: + """Delegate to state for ExecutorContext conformance.""" + self._state.ask_for_human_input = value + + def _invoke_loop(self) -> AgentFinish: + """Invoke the agent loop and return the result. + + Required by ExecutorContext protocol. + """ + self._state.iterations = 0 + self._state.is_finished = False + self._state.current_answer = None + + self.kickoff() + + answer = self._state.current_answer + if not isinstance(answer, AgentFinish): + raise RuntimeError("Agent loop did not produce a final answer") + return answer + + def _format_feedback_message(self, feedback: str) -> LLMMessage: + """Format feedback as a message for the LLM. + + Required by ExecutorContext protocol. + """ + return format_message_for_llm( + self._i18n.slice("feedback_instructions").format(feedback=feedback) + ) + def _ensure_flow_initialized(self) -> None: """Ensure Flow.__init__() has been called. @@ -300,16 +355,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): """ return self._state - @property - def messages(self) -> list[LLMMessage]: - """Compatibility property for mixin - returns state messages.""" - return self._state.messages - - @messages.setter - def messages(self, value: list[LLMMessage]) -> None: - """Set state messages.""" - self._state.messages = value - @property def iterations(self) -> int: """Compatibility property for mixin - returns state iterations.""" @@ -1321,17 +1366,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): Returns: Final answer after feedback. 
""" - output_str = ( - str(formatted_answer.output) - if isinstance(formatted_answer.output, BaseModel) - else formatted_answer.output - ) - human_feedback = self._ask_human_input(output_str) - - if self._is_training_mode(): - return self._handle_training_feedback(formatted_answer, human_feedback) - - return self._handle_regular_feedback(formatted_answer, human_feedback) + provider = get_provider() + return provider.handle_feedback(formatted_answer, self) def _is_training_mode(self) -> bool: """Check if training mode is active. @@ -1341,101 +1377,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin): """ return bool(self.crew and self.crew._train) - def _handle_training_feedback( - self, initial_answer: AgentFinish, feedback: str - ) -> AgentFinish: - """Process training feedback and generate improved answer. - - Args: - initial_answer: Initial agent output. - feedback: Training feedback. - - Returns: - Improved answer. - """ - self._handle_crew_training_output(initial_answer, feedback) - self.state.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - - # Re-run flow for improved answer - self.state.iterations = 0 - self.state.is_finished = False - self.state.current_answer = None - - self.kickoff() - - # Get improved answer from state - improved_answer = self.state.current_answer - if not isinstance(improved_answer, AgentFinish): - raise RuntimeError( - "Training feedback iteration did not produce final answer" - ) - - self._handle_crew_training_output(improved_answer) - self.state.ask_for_human_input = False - return improved_answer - - def _handle_regular_feedback( - self, current_answer: AgentFinish, initial_feedback: str - ) -> AgentFinish: - """Process regular feedback iteratively until user is satisfied. - - Args: - current_answer: Current agent output. - initial_feedback: Initial user feedback. - - Returns: - Final answer after iterations. 
- """ - feedback = initial_feedback - answer = current_answer - - while self.state.ask_for_human_input: - if feedback.strip() == "": - self.state.ask_for_human_input = False - else: - answer = self._process_feedback_iteration(feedback) - output_str = ( - str(answer.output) - if isinstance(answer.output, BaseModel) - else answer.output - ) - feedback = self._ask_human_input(output_str) - - return answer - - def _process_feedback_iteration(self, feedback: str) -> AgentFinish: - """Process a single feedback iteration and generate updated response. - - Args: - feedback: User feedback. - - Returns: - Updated agent response. - """ - self.state.messages.append( - format_message_for_llm( - self._i18n.slice("feedback_instructions").format(feedback=feedback) - ) - ) - - # Re-run flow - self.state.iterations = 0 - self.state.is_finished = False - self.state.current_answer = None - - self.kickoff() - - # Get answer from state - answer = self.state.current_answer - if not isinstance(answer, AgentFinish): - raise RuntimeError("Feedback iteration did not produce final answer") - - return answer - @classmethod def __get_pydantic_core_schema__( cls, _source_type: Any, _handler: GetCoreSchemaHandler diff --git a/lib/crewai/src/crewai/tasks/llm_guardrail.py b/lib/crewai/src/crewai/tasks/llm_guardrail.py index 803b2d749..3729e8084 100644 --- a/lib/crewai/src/crewai/tasks/llm_guardrail.py +++ b/lib/crewai/src/crewai/tasks/llm_guardrail.py @@ -1,6 +1,10 @@ +import asyncio +from collections.abc import Coroutine +import inspect from typing import Any from pydantic import BaseModel, Field +from typing_extensions import TypeIs from crewai.agent import Agent from crewai.lite_agent_output import LiteAgentOutput @@ -8,6 +12,13 @@ from crewai.llms.base_llm import BaseLLM from crewai.tasks.task_output import TaskOutput +def _is_coroutine( + obj: LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput], +) -> TypeIs[Coroutine[Any, Any, LiteAgentOutput]]: + """Check if obj is a coroutine for type 
narrowing.""" + return inspect.iscoroutine(obj) + + class LLMGuardrailResult(BaseModel): valid: bool = Field( description="Whether the task output complies with the guardrail" @@ -62,7 +73,10 @@ class LLMGuardrail: - If the Task result complies with the guardrail, saying that is valid """ - return agent.kickoff(query, response_format=LLMGuardrailResult) + kickoff_result = agent.kickoff(query, response_format=LLMGuardrailResult) + if _is_coroutine(kickoff_result): + return asyncio.run(kickoff_result) + return kickoff_result def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]: """Validates the output of a task based on specified criteria. diff --git a/lib/crewai/src/crewai/telemetry/telemetry.py b/lib/crewai/src/crewai/telemetry/telemetry.py index 27d5d6090..04303fc3d 100644 --- a/lib/crewai/src/crewai/telemetry/telemetry.py +++ b/lib/crewai/src/crewai/telemetry/telemetry.py @@ -903,7 +903,7 @@ class Telemetry: { "id": str(task.id), "description": task.description, - "output": task.output.raw_output, + "output": task.output.raw if task.output else "", } for task in crew.tasks ] @@ -923,6 +923,9 @@ class Telemetry: value: The attribute value. """ + if span is None: + return + def _operation() -> None: return span.set_attribute(key, value) diff --git a/lib/crewai/src/crewai/utilities/serialization.py b/lib/crewai/src/crewai/utilities/serialization.py index a6b871d3d..0207e80ab 100644 --- a/lib/crewai/src/crewai/utilities/serialization.py +++ b/lib/crewai/src/crewai/utilities/serialization.py @@ -19,6 +19,7 @@ def to_serializable( exclude: set[str] | None = None, max_depth: int = 5, _current_depth: int = 0, + _ancestors: set[int] | None = None, ) -> Serializable: """Converts a Python object into a JSON-compatible representation. @@ -31,6 +32,7 @@ def to_serializable( exclude: Set of keys to exclude from the result. max_depth: Maximum recursion depth. Defaults to 5. _current_depth: Current recursion depth (for internal use). 
+ _ancestors: Set of ancestor object ids for cycle detection (for internal use). Returns: Serializable: A JSON-compatible structure. @@ -41,16 +43,29 @@ if exclude is None: exclude = set() + if _ancestors is None: + _ancestors = set() + if isinstance(obj, (str, int, float, bool, type(None))): return obj if isinstance(obj, uuid.UUID): return str(obj) if isinstance(obj, (date, datetime)): return obj.isoformat() + + object_id = id(obj) + if object_id in _ancestors: + return f"<circular reference: {type(obj).__name__}>" + new_ancestors = _ancestors | {object_id} + if isinstance(obj, (list, tuple, set)): return [ to_serializable( - item, max_depth=max_depth, _current_depth=_current_depth + 1 + item, + exclude=exclude, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for item in obj ] @@ -61,6 +76,7 @@ exclude=exclude, max_depth=max_depth, _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for key, value in obj.items() if key not in exclude @@ -71,12 +87,16 @@ obj=obj.model_dump(exclude=exclude), max_depth=max_depth, _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) except Exception: try: return { _to_serializable_key(k): to_serializable( - v, max_depth=max_depth, _current_depth=_current_depth + 1 + v, + max_depth=max_depth, + _current_depth=_current_depth + 1, + _ancestors=new_ancestors, ) for k, v in obj.__dict__.items() if k not in (exclude or set())