diff --git a/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py b/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py index 08497ab17..3ede5ef64 100644 --- a/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py +++ b/lib/crewai/src/crewai/experimental/crew_agent_executor_flow.py @@ -6,6 +6,8 @@ from uuid import uuid4 from pydantic import BaseModel, Field, GetCoreSchemaHandler from pydantic_core import CoreSchema, core_schema +from rich.console import Console +from rich.text import Text from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin from crewai.agents.parser import ( @@ -146,6 +148,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): self.request_within_rpm_limit = request_within_rpm_limit self.response_model = response_model self.log_error_after = 3 + self._console: Console = Console() # Error context storage for recovery self._last_parser_error: OutputParserError | None = None @@ -220,7 +223,7 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return self._state.iterations @start() - def initialize_reasoning(self) -> str: + def initialize_reasoning(self) -> Literal["initialized"]: """Initialize the reasoning flow and emit agent start logs.""" self._show_start_logs() return "initialized" @@ -248,10 +251,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): Returns routing decision based on parsing result. """ - self._printer.print( - content=f"🤖 call_llm_and_parse: About to call LLM (iteration {self.state.iterations})", - color="blue", - ) try: enforce_rpm_limit(self.request_within_rpm_limit) @@ -270,16 +269,18 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): formatted_answer = process_llm_response(answer, self.use_stop_words) self.state.current_answer = formatted_answer - # Debug: Check what we parsed - if "Final Answer:" in answer: - self._printer.print( - content=f"⚠️ LLM returned Final Answer but parsed as: {type(formatted_answer).__name__}", - color="yellow", + if "Final Answer:" in answer and isinstance(formatted_answer, AgentAction): + warning_text = Text() + warning_text.append("⚠️ ", style="yellow bold") + warning_text.append( + f"LLM returned 'Final Answer:' but parsed as AgentAction (tool: {formatted_answer.tool})", + style="yellow", ) - if isinstance(formatted_answer, AgentAction): - self._printer.print( - content=f"Answer preview: {answer[:200]}...", color="yellow" - ) + self._console.print(warning_text) + preview_text = Text() + preview_text.append("Answer preview: ", style="yellow") + preview_text.append(f"{answer[:200]}...", style="yellow dim") + self._console.print(preview_text) return "parsed" @@ -300,11 +301,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @router(call_llm_and_parse) def route_by_answer_type(self) -> str: """Route based on whether answer is AgentAction or AgentFinish.""" - answer_type = type(self.state.current_answer).__name__ - self._printer.print( - content=f"🚦 route_by_answer_type: Got {answer_type}", - color="yellow", - ) if isinstance(self.state.current_answer, AgentAction): return "execute_tool" return "agent_finished" @@ -360,7 +356,10 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return "tool_completed" except Exception as e: - self._printer.print(content=f"Error in tool execution: {e}", color="red") + error_text = Text() + error_text.append("❌ Error in tool execution: ", style="red bold") + error_text.append(str(e), style="red") + self._console.print(error_text) raise @listen("initialized") @@ -371,10 +370,6 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): @router(or_(initialize_reasoning, continue_iteration)) def check_max_iterations(self) -> str: """Check if max iterations reached before proceeding with reasoning.""" - self._printer.print( - content=f"🔄 check_max_iterations: iteration {self.state.iterations}/{self.max_iter}", - color="cyan", - ) if has_reached_max_iterations(self.state.iterations, self.max_iter): return "force_final_answer" return "continue_reasoning" @@ -383,27 +378,35 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): def increment_and_continue(self) -> str: """Increment iteration counter and loop back for next iteration.""" self.state.iterations += 1 - self._printer.print( - content=f"+ increment_and_continue: Incremented to iteration {self.state.iterations}, looping back", - color="magenta", + inc_text = Text() + inc_text.append("+ increment_and_continue: ", style="magenta bold") + inc_text.append( + f"Incremented to iteration {self.state.iterations}, looping back", + style="magenta", ) + self._console.print(inc_text) return "initialized" @listen(or_("agent_finished", "tool_result_is_final")) def finalize(self) -> str: """Finalize execution and emit completion logs.""" if self.state.current_answer is None: - self._printer.print( - content="⚠️ Finalize called but no answer in state - skipping", - color="yellow", + skip_text = Text() + skip_text.append("⚠️ ", style="yellow bold") + skip_text.append( + "Finalize called but no answer in state - skipping", style="yellow" ) + self._console.print(skip_text) return "skipped" if not isinstance(self.state.current_answer, AgentFinish): - self._printer.print( - content=f"⚠️ Finalize called with {type(self.state.current_answer).__name__} instead of AgentFinish - skipping", - color="yellow", + skip_text = Text() + skip_text.append("⚠️ ", style="yellow bold") + skip_text.append( + f"Finalize called with {type(self.state.current_answer).__name__} instead of AgentFinish - skipping", + style="yellow", ) + self._console.print(skip_text) return "skipped" self.state.is_finished = True @@ -511,10 +514,13 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): return {"output": formatted_answer.output} except AssertionError: - self._printer.print( - content="Agent failed to reach a final answer. This is likely a bug - please report it.", - color="red", + fail_text = Text() + fail_text.append("❌ ", style="red bold") + fail_text.append( + "Agent failed to reach a final answer. This is likely a bug - please report it.", + style="red", ) + self._console.print(fail_text) raise except Exception as e: handle_unknown_error(self._printer, e) @@ -624,10 +630,13 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): ) if train_iteration is None or not isinstance(train_iteration, int): - self._printer.print( - content="Invalid or missing train iteration. Cannot save training data.", - color="red", + train_error = Text() + train_error.append("❌ ", style="red bold") + train_error.append( + "Invalid or missing train iteration. Cannot save training data.", + style="red", ) + self._console.print(train_error) return training_handler = CrewTrainingHandler(TRAINING_DATA_FILE) @@ -647,13 +656,14 @@ class CrewAgentExecutorFlow(Flow[AgentReActState], CrewAgentExecutorMixin): if train_iteration in agent_training_data: agent_training_data[train_iteration]["improved_output"] = result.output else: - self._printer.print( - content=( - f"No existing training data for agent {agent_id} and iteration " - f"{train_iteration}. Cannot save improved output." - ), - color="red", + train_error = Text() + train_error.append("❌ ", style="red bold") + train_error.append( + f"No existing training data for agent {agent_id} and iteration " + f"{train_iteration}. Cannot save improved output.", + style="red", ) + self._console.print(train_error) return # Update the training data and save