chore: bug fixes and more refactor
Some checks failed
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled

Refactor agent executor to delegate human interactions to a provider: add messages and ask_for_human_input properties, implement _invoke_loop and _format_feedback_message, and replace the internal iterative/training feedback logic with a call to get_provider().handle_feedback.

Make LLMGuardrail kickoff coroutine-aware by detecting coroutines and running them via asyncio.run so both sync and async agents are supported.

Make telemetry more robust by safely handling missing task.output (use empty string) and returning early if span is None before setting attributes.

Improve serialization to detect circular references via an _ancestors set, propagate it through recursive calls, and pass exclude/max_depth/_current_depth consistently to prevent infinite recursion and produce stable serializable output.
This commit is contained in:
Greyson LaLonde
2026-02-04 21:21:54 -05:00
committed by GitHub
parent 711e7171e1
commit fe2a4b4e40
4 changed files with 101 additions and 123 deletions

View File

@@ -18,6 +18,7 @@ from crewai.agents.parser import (
AgentFinish,
OutputParserError,
)
from crewai.core.providers.human_input import get_provider
from crewai.events.event_bus import crewai_event_bus
from crewai.events.listeners.tracing.utils import (
is_tracing_enabled_in_context,
@@ -41,7 +42,12 @@ from crewai.hooks.tool_hooks import (
get_after_tool_call_hooks,
get_before_tool_call_hooks,
)
from crewai.hooks.types import AfterLLMCallHookType, BeforeLLMCallHookType
from crewai.hooks.types import (
AfterLLMCallHookCallable,
AfterLLMCallHookType,
BeforeLLMCallHookCallable,
BeforeLLMCallHookType,
)
from crewai.utilities.agent_utils import (
convert_tools_to_openai_schema,
enforce_rpm_limit,
@@ -191,8 +197,12 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
self._instance_id = str(uuid4())[:8]
self.before_llm_call_hooks: list[BeforeLLMCallHookType] = []
self.after_llm_call_hooks: list[AfterLLMCallHookType] = []
self.before_llm_call_hooks: list[
BeforeLLMCallHookType | BeforeLLMCallHookCallable
] = []
self.after_llm_call_hooks: list[
AfterLLMCallHookType | AfterLLMCallHookCallable
] = []
self.before_llm_call_hooks.extend(get_before_llm_call_hooks())
self.after_llm_call_hooks.extend(get_after_llm_call_hooks())
@@ -207,6 +217,51 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
)
self._state = AgentReActState()
@property
def messages(self) -> list[LLMMessage]:
"""Delegate to state for ExecutorContext conformance."""
return self._state.messages
@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
"""Delegate to state for ExecutorContext conformance."""
self._state.messages = value
@property
def ask_for_human_input(self) -> bool:
"""Delegate to state for ExecutorContext conformance."""
return self._state.ask_for_human_input
@ask_for_human_input.setter
def ask_for_human_input(self, value: bool) -> None:
"""Delegate to state for ExecutorContext conformance."""
self._state.ask_for_human_input = value
def _invoke_loop(self) -> AgentFinish:
"""Invoke the agent loop and return the result.
Required by ExecutorContext protocol.
"""
self._state.iterations = 0
self._state.is_finished = False
self._state.current_answer = None
self.kickoff()
answer = self._state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Agent loop did not produce a final answer")
return answer
def _format_feedback_message(self, feedback: str) -> LLMMessage:
"""Format feedback as a message for the LLM.
Required by ExecutorContext protocol.
"""
return format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
def _ensure_flow_initialized(self) -> None:
"""Ensure Flow.__init__() has been called.
@@ -300,16 +355,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
return self._state
@property
def messages(self) -> list[LLMMessage]:
"""Compatibility property for mixin - returns state messages."""
return self._state.messages
@messages.setter
def messages(self, value: list[LLMMessage]) -> None:
"""Set state messages."""
self._state.messages = value
@property
def iterations(self) -> int:
"""Compatibility property for mixin - returns state iterations."""
@@ -1321,17 +1366,8 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
Returns:
Final answer after feedback.
"""
output_str = (
str(formatted_answer.output)
if isinstance(formatted_answer.output, BaseModel)
else formatted_answer.output
)
human_feedback = self._ask_human_input(output_str)
if self._is_training_mode():
return self._handle_training_feedback(formatted_answer, human_feedback)
return self._handle_regular_feedback(formatted_answer, human_feedback)
provider = get_provider()
return provider.handle_feedback(formatted_answer, self)
def _is_training_mode(self) -> bool:
"""Check if training mode is active.
@@ -1341,101 +1377,6 @@ class AgentExecutor(Flow[AgentReActState], CrewAgentExecutorMixin):
"""
return bool(self.crew and self.crew._train)
def _handle_training_feedback(
self, initial_answer: AgentFinish, feedback: str
) -> AgentFinish:
"""Process training feedback and generate improved answer.
Args:
initial_answer: Initial agent output.
feedback: Training feedback.
Returns:
Improved answer.
"""
self._handle_crew_training_output(initial_answer, feedback)
self.state.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
# Re-run flow for improved answer
self.state.iterations = 0
self.state.is_finished = False
self.state.current_answer = None
self.kickoff()
# Get improved answer from state
improved_answer = self.state.current_answer
if not isinstance(improved_answer, AgentFinish):
raise RuntimeError(
"Training feedback iteration did not produce final answer"
)
self._handle_crew_training_output(improved_answer)
self.state.ask_for_human_input = False
return improved_answer
def _handle_regular_feedback(
self, current_answer: AgentFinish, initial_feedback: str
) -> AgentFinish:
"""Process regular feedback iteratively until user is satisfied.
Args:
current_answer: Current agent output.
initial_feedback: Initial user feedback.
Returns:
Final answer after iterations.
"""
feedback = initial_feedback
answer = current_answer
while self.state.ask_for_human_input:
if feedback.strip() == "":
self.state.ask_for_human_input = False
else:
answer = self._process_feedback_iteration(feedback)
output_str = (
str(answer.output)
if isinstance(answer.output, BaseModel)
else answer.output
)
feedback = self._ask_human_input(output_str)
return answer
def _process_feedback_iteration(self, feedback: str) -> AgentFinish:
"""Process a single feedback iteration and generate updated response.
Args:
feedback: User feedback.
Returns:
Updated agent response.
"""
self.state.messages.append(
format_message_for_llm(
self._i18n.slice("feedback_instructions").format(feedback=feedback)
)
)
# Re-run flow
self.state.iterations = 0
self.state.is_finished = False
self.state.current_answer = None
self.kickoff()
# Get answer from state
answer = self.state.current_answer
if not isinstance(answer, AgentFinish):
raise RuntimeError("Feedback iteration did not produce final answer")
return answer
@classmethod
def __get_pydantic_core_schema__(
cls, _source_type: Any, _handler: GetCoreSchemaHandler

View File

@@ -1,6 +1,10 @@
import asyncio
from collections.abc import Coroutine
import inspect
from typing import Any
from pydantic import BaseModel, Field
from typing_extensions import TypeIs
from crewai.agent import Agent
from crewai.lite_agent_output import LiteAgentOutput
@@ -8,6 +12,13 @@ from crewai.llms.base_llm import BaseLLM
from crewai.tasks.task_output import TaskOutput
def _is_coroutine(
obj: LiteAgentOutput | Coroutine[Any, Any, LiteAgentOutput],
) -> TypeIs[Coroutine[Any, Any, LiteAgentOutput]]:
"""Check if obj is a coroutine for type narrowing."""
return inspect.iscoroutine(obj)
class LLMGuardrailResult(BaseModel):
valid: bool = Field(
description="Whether the task output complies with the guardrail"
@@ -62,7 +73,10 @@ class LLMGuardrail:
- If the Task result complies with the guardrail, saying that is valid
"""
return agent.kickoff(query, response_format=LLMGuardrailResult)
kickoff_result = agent.kickoff(query, response_format=LLMGuardrailResult)
if _is_coroutine(kickoff_result):
return asyncio.run(kickoff_result)
return kickoff_result
def __call__(self, task_output: TaskOutput) -> tuple[bool, Any]:
"""Validates the output of a task based on specified criteria.

View File

@@ -903,7 +903,7 @@ class Telemetry:
{
"id": str(task.id),
"description": task.description,
"output": task.output.raw_output,
"output": task.output.raw if task.output else "",
}
for task in crew.tasks
]
@@ -923,6 +923,9 @@ class Telemetry:
value: The attribute value.
"""
if span is None:
return
def _operation() -> None:
return span.set_attribute(key, value)

View File

@@ -19,6 +19,7 @@ def to_serializable(
exclude: set[str] | None = None,
max_depth: int = 5,
_current_depth: int = 0,
_ancestors: set[int] | None = None,
) -> Serializable:
"""Converts a Python object into a JSON-compatible representation.
@@ -31,6 +32,7 @@ def to_serializable(
exclude: Set of keys to exclude from the result.
max_depth: Maximum recursion depth. Defaults to 5.
_current_depth: Current recursion depth (for internal use).
_ancestors: Set of ancestor object ids for cycle detection (for internal use).
Returns:
Serializable: A JSON-compatible structure.
@@ -41,16 +43,29 @@ def to_serializable(
if exclude is None:
exclude = set()
if _ancestors is None:
_ancestors = set()
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
if isinstance(obj, uuid.UUID):
return str(obj)
if isinstance(obj, (date, datetime)):
return obj.isoformat()
object_id = id(obj)
if object_id in _ancestors:
return f"<circular_ref:{type(obj).__name__}>"
new_ancestors = _ancestors | {object_id}
if isinstance(obj, (list, tuple, set)):
return [
to_serializable(
item, max_depth=max_depth, _current_depth=_current_depth + 1
item,
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for item in obj
]
@@ -61,6 +76,7 @@ def to_serializable(
exclude=exclude,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for key, value in obj.items()
if key not in exclude
@@ -71,12 +87,16 @@ def to_serializable(
obj=obj.model_dump(exclude=exclude),
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
except Exception:
try:
return {
_to_serializable_key(k): to_serializable(
v, max_depth=max_depth, _current_depth=_current_depth + 1
v,
max_depth=max_depth,
_current_depth=_current_depth + 1,
_ancestors=new_ancestors,
)
for k, v in obj.__dict__.items()
if k not in (exclude or set())