diff --git a/src/crewai/events/utils/console_formatter.py b/src/crewai/events/utils/console_formatter.py index 6278e92a2..fa1f26de3 100644 --- a/src/crewai/events/utils/console_formatter.py +++ b/src/crewai/events/utils/console_formatter.py @@ -1,25 +1,25 @@ -from typing import Any, Dict, Optional +from typing import Any from rich.console import Console +from rich.live import Live from rich.panel import Panel +from rich.syntax import Syntax from rich.text import Text from rich.tree import Tree -from rich.live import Live -from rich.syntax import Syntax class ConsoleFormatter: - current_crew_tree: Optional[Tree] = None - current_task_branch: Optional[Tree] = None - current_agent_branch: Optional[Tree] = None - current_tool_branch: Optional[Tree] = None - current_flow_tree: Optional[Tree] = None - current_method_branch: Optional[Tree] = None - current_lite_agent_branch: Optional[Tree] = None - tool_usage_counts: Dict[str, int] = {} - current_reasoning_branch: Optional[Tree] = None # Track reasoning status + current_crew_tree: Tree | None = None + current_task_branch: Tree | None = None + current_agent_branch: Tree | None = None + current_tool_branch: Tree | None = None + current_flow_tree: Tree | None = None + current_method_branch: Tree | None = None + current_lite_agent_branch: Tree | None = None + tool_usage_counts: dict[str, int] = {} + current_reasoning_branch: Tree | None = None # Track reasoning status _live_paused: bool = False - current_llm_tool_tree: Optional[Tree] = None + current_llm_tool_tree: Tree | None = None def __init__(self, verbose: bool = False): self.console = Console(width=None) @@ -29,7 +29,7 @@ class ConsoleFormatter: # instance so the previous render is replaced instead of writing a new one. # Once any non-Tree renderable is printed we stop the Live session so the # final Tree persists on the terminal. - self._live: Optional[Live] = None + self._live: Live | None = None def create_panel(self, content: Text, title: str, style: str = "blue") -> Panel: """Create a standardized panel with consistent styling.""" @@ -45,7 +45,7 @@ class ConsoleFormatter: title: str, name: str, status_style: str = "blue", - tool_args: Dict[str, Any] | str = "", + tool_args: dict[str, Any] | str = "", **fields, ) -> Text: """Create standardized status content with consistent formatting.""" @@ -70,7 +70,7 @@ class ConsoleFormatter: prefix: str, name: str, style: str = "blue", - status: Optional[str] = None, + status: str | None = None, ) -> None: """Update tree label with consistent formatting.""" label = Text() @@ -156,7 +156,7 @@ class ConsoleFormatter: def update_crew_tree( self, - tree: Optional[Tree], + tree: Tree | None, crew_name: str, source_id: str, status: str = "completed", @@ -196,7 +196,7 @@ class ConsoleFormatter: self.print_panel(content, title, style) - def create_crew_tree(self, crew_name: str, source_id: str) -> Optional[Tree]: + def create_crew_tree(self, crew_name: str, source_id: str) -> Tree | None: """Create and initialize a new crew tree with initial status.""" if not self.verbose: return None @@ -220,8 +220,8 @@ class ConsoleFormatter: return tree def create_task_branch( - self, crew_tree: Optional[Tree], task_id: str, task_name: Optional[str] = None - ) -> Optional[Tree]: + self, crew_tree: Tree | None, task_id: str, task_name: str | None = None + ) -> Tree | None: """Create and initialize a task branch.""" if not self.verbose: return None @@ -255,11 +255,11 @@ class ConsoleFormatter: def update_task_status( self, - crew_tree: Optional[Tree], + crew_tree: Tree | None, task_id: str, agent_role: str, status: str = "completed", - task_name: Optional[str] = None, + task_name: str | None = None, ) -> None: """Update task status in the tree.""" if not self.verbose or crew_tree is None: @@ -306,8 +306,8 @@ class ConsoleFormatter: self.print_panel(content, panel_title, style) def create_agent_branch( - self, task_branch: Optional[Tree], agent_role: str, crew_tree: Optional[Tree] - ) -> Optional[Tree]: + self, task_branch: Tree | None, agent_role: str, crew_tree: Tree | None + ) -> Tree | None: """Create and initialize an agent branch.""" if not self.verbose or not task_branch or not crew_tree: return None @@ -325,9 +325,9 @@ class ConsoleFormatter: def update_agent_status( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, agent_role: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, status: str = "completed", ) -> None: """Update agent status in the tree.""" @@ -336,7 +336,7 @@ class ConsoleFormatter: # altering the tree. Keeping it a no-op avoids duplicate status lines. return - def create_flow_tree(self, flow_name: str, flow_id: str) -> Optional[Tree]: + def create_flow_tree(self, flow_name: str, flow_id: str) -> Tree | None: """Create and initialize a flow tree.""" content = self.create_status_content( "Starting Flow Execution", flow_name, "blue", ID=flow_id @@ -356,7 +356,7 @@ class ConsoleFormatter: return flow_tree - def start_flow(self, flow_name: str, flow_id: str) -> Optional[Tree]: + def start_flow(self, flow_name: str, flow_id: str) -> Tree | None: """Initialize a flow execution tree.""" flow_tree = Tree("") flow_label = Text() @@ -376,7 +376,7 @@ class ConsoleFormatter: def update_flow_status( self, - flow_tree: Optional[Tree], + flow_tree: Tree | None, flow_name: str, flow_id: str, status: str = "completed", @@ -423,11 +423,11 @@ class ConsoleFormatter: def update_method_status( self, - method_branch: Optional[Tree], - flow_tree: Optional[Tree], + method_branch: Tree | None, + flow_tree: Tree | None, method_name: str, status: str = "running", - ) -> Optional[Tree]: + ) -> Tree | None: """Update method status in the flow tree.""" if not flow_tree: return None @@ -480,7 +480,7 @@ class ConsoleFormatter: def handle_llm_tool_usage_started( self, tool_name: str, - tool_args: Dict[str, Any] | str, + tool_args: dict[str, Any] | str, ): # Create status content for the tool usage content = self.create_status_content( @@ -520,11 +520,11 @@ class ConsoleFormatter: def handle_tool_usage_started( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, tool_name: str, - crew_tree: Optional[Tree], - tool_args: Dict[str, Any] | str = "", - ) -> Optional[Tree]: + crew_tree: Tree | None, + tool_args: dict[str, Any] | str = "", + ) -> Tree | None: """Handle tool usage started event.""" if not self.verbose: return None @@ -569,9 +569,9 @@ class ConsoleFormatter: def handle_tool_usage_finished( self, - tool_branch: Optional[Tree], + tool_branch: Tree | None, tool_name: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle tool usage finished event.""" if not self.verbose or tool_branch is None: @@ -600,10 +600,10 @@ class ConsoleFormatter: def handle_tool_usage_error( self, - tool_branch: Optional[Tree], + tool_branch: Tree | None, tool_name: str, error: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle tool usage error event.""" if not self.verbose: @@ -631,9 +631,9 @@ class ConsoleFormatter: def handle_llm_call_started( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], - ) -> Optional[Tree]: + agent_branch: Tree | None, + crew_tree: Tree | None, + ) -> Tree | None: """Handle LLM call started event.""" if not self.verbose: return None @@ -672,9 +672,9 @@ class ConsoleFormatter: def handle_llm_call_completed( self, - tool_branch: Optional[Tree], - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], + tool_branch: Tree | None, + agent_branch: Tree | None, + crew_tree: Tree | None, ) -> None: """Handle LLM call completed event.""" if not self.verbose: @@ -736,7 +736,7 @@ class ConsoleFormatter: self.print() def handle_llm_call_failed( - self, tool_branch: Optional[Tree], error: str, crew_tree: Optional[Tree] + self, tool_branch: Tree | None, error: str, crew_tree: Tree | None ) -> None: """Handle LLM call failed event.""" if not self.verbose: @@ -789,7 +789,7 @@ class ConsoleFormatter: def handle_crew_test_started( self, crew_name: str, source_id: str, n_iterations: int - ) -> Optional[Tree]: + ) -> Tree | None: """Handle crew test started event.""" if not self.verbose: return None @@ -823,7 +823,7 @@ class ConsoleFormatter: return test_tree def handle_crew_test_completed( - self, flow_tree: Optional[Tree], crew_name: str + self, flow_tree: Tree | None, crew_name: str ) -> None: """Handle crew test completed event.""" if not self.verbose: @@ -913,7 +913,7 @@ class ConsoleFormatter: self.print_panel(failure_content, "Test Failure", "red") self.print() - def create_lite_agent_branch(self, lite_agent_role: str) -> Optional[Tree]: + def create_lite_agent_branch(self, lite_agent_role: str) -> Tree | None: """Create and initialize a lite agent branch.""" if not self.verbose: return None @@ -935,10 +935,10 @@ class ConsoleFormatter: def update_lite_agent_status( self, - lite_agent_branch: Optional[Tree], + lite_agent_branch: Tree | None, lite_agent_role: str, status: str = "completed", - **fields: Dict[str, Any], + **fields: dict[str, Any], ) -> None: """Update lite agent status in the tree.""" if not self.verbose or lite_agent_branch is None: @@ -981,7 +981,7 @@ class ConsoleFormatter: lite_agent_role: str, status: str = "started", error: Any = None, - **fields: Dict[str, Any], + **fields: dict[str, Any], ) -> None: """Handle lite agent execution events with consistent formatting.""" if not self.verbose: @@ -1006,9 +1006,9 @@ class ConsoleFormatter: def handle_knowledge_retrieval_started( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], - ) -> Optional[Tree]: + agent_branch: Tree | None, + crew_tree: Tree | None, + ) -> Tree | None: """Handle knowledge retrieval started event.""" if not self.verbose: return None @@ -1034,13 +1034,13 @@ class ConsoleFormatter: def handle_knowledge_retrieval_completed( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], + agent_branch: Tree | None, + crew_tree: Tree | None, retrieved_knowledge: Any, ) -> None: """Handle knowledge retrieval completed event.""" if not self.verbose: - return None + return branch_to_use = self.current_lite_agent_branch or agent_branch tree_to_use = branch_to_use or crew_tree @@ -1062,7 +1062,7 @@ class ConsoleFormatter: ) self.print(knowledge_panel) self.print() - return None + return knowledge_branch_found = False for child in branch_to_use.children: @@ -1111,18 +1111,18 @@ class ConsoleFormatter: def handle_knowledge_query_started( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, task_prompt: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle knowledge query generated event.""" if not self.verbose: - return None + return branch_to_use = self.current_lite_agent_branch or agent_branch tree_to_use = branch_to_use or crew_tree if branch_to_use is None or tree_to_use is None: - return None + return query_branch = branch_to_use.add("") self.update_tree_label( @@ -1134,9 +1134,9 @@ class ConsoleFormatter: def handle_knowledge_query_failed( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, error: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle knowledge query failed event.""" if not self.verbose: @@ -1159,18 +1159,18 @@ class ConsoleFormatter: def handle_knowledge_query_completed( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], + agent_branch: Tree | None, + crew_tree: Tree | None, ) -> None: """Handle knowledge query completed event.""" if not self.verbose: - return None + return branch_to_use = self.current_lite_agent_branch or agent_branch tree_to_use = branch_to_use or crew_tree if branch_to_use is None or tree_to_use is None: - return None + return query_branch = branch_to_use.add("") self.update_tree_label(query_branch, "✅", "Knowledge Query Completed", "green") @@ -1180,9 +1180,9 @@ class ConsoleFormatter: def handle_knowledge_search_query_failed( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, error: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle knowledge search query failed event.""" if not self.verbose: @@ -1207,10 +1207,10 @@ class ConsoleFormatter: def handle_reasoning_started( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, attempt: int, - crew_tree: Optional[Tree], - ) -> Optional[Tree]: + crew_tree: Tree | None, + ) -> Tree | None: """Handle agent reasoning started (or refinement) event.""" if not self.verbose: return None @@ -1249,7 +1249,7 @@ class ConsoleFormatter: self, plan: str, ready: bool, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle agent reasoning completed event.""" if not self.verbose: @@ -1292,7 +1292,7 @@ class ConsoleFormatter: def handle_reasoning_failed( self, error: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: """Handle agent reasoning failure event.""" if not self.verbose: @@ -1329,7 +1329,7 @@ class ConsoleFormatter: def handle_agent_logs_started( self, agent_role: str, - task_description: Optional[str] = None, + task_description: str | None = None, verbose: bool = False, ) -> None: """Handle agent logs started event.""" @@ -1367,19 +1367,29 @@ class ConsoleFormatter: if not verbose: return - from crewai.agents.parser import AgentAction, AgentFinish import json import re + from crewai.agents.parser import AgentAction, AgentFinish + agent_role = agent_role.partition("\n")[0] if isinstance(formatted_answer, AgentAction): thought = re.sub(r"\n+", "\n", formatted_answer.thought) - formatted_json = json.dumps( - formatted_answer.tool_input, - indent=2, - ensure_ascii=False, - ) + + try: + parsed_input = json.loads(formatted_answer.tool_input) + formatted_json = json.dumps( + parsed_input, + indent=2, + ensure_ascii=False, + ) + except (json.JSONDecodeError, TypeError): + formatted_json = json.dumps( + formatted_answer.tool_input, + indent=2, + ensure_ascii=False, + ) # Create content for the action panel content = Text() @@ -1473,9 +1483,9 @@ class ConsoleFormatter: def handle_memory_retrieval_started( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], - ) -> Optional[Tree]: + agent_branch: Tree | None, + crew_tree: Tree | None, + ) -> Tree | None: if not self.verbose: return None @@ -1497,13 +1507,13 @@ class ConsoleFormatter: def handle_memory_retrieval_completed( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], + agent_branch: Tree | None, + crew_tree: Tree | None, memory_content: str, retrieval_time_ms: float, ) -> None: if not self.verbose: - return None + return branch_to_use = self.current_lite_agent_branch or agent_branch tree_to_use = branch_to_use or crew_tree @@ -1528,7 +1538,7 @@ class ConsoleFormatter: if branch_to_use is None or tree_to_use is None: add_panel() - return None + return memory_branch_found = False for child in branch_to_use.children: @@ -1565,13 +1575,13 @@ class ConsoleFormatter: def handle_memory_query_completed( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, source_type: str, query_time_ms: float, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: if not self.verbose: - return None + return branch_to_use = self.current_lite_agent_branch or agent_branch tree_to_use = branch_to_use or crew_tree @@ -1580,7 +1590,7 @@ class ConsoleFormatter: branch_to_use = tree_to_use if branch_to_use is None: - return None + return memory_type = source_type.replace("_", " ").title() @@ -1598,13 +1608,13 @@ class ConsoleFormatter: def handle_memory_query_failed( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], + agent_branch: Tree | None, + crew_tree: Tree | None, error: str, source_type: str, ) -> None: if not self.verbose: - return None + return branch_to_use = self.current_lite_agent_branch or agent_branch tree_to_use = branch_to_use or crew_tree @@ -1613,7 +1623,7 @@ class ConsoleFormatter: branch_to_use = tree_to_use if branch_to_use is None: - return None + return memory_type = source_type.replace("_", " ").title() @@ -1630,16 +1640,16 @@ class ConsoleFormatter: break def handle_memory_save_started( - self, agent_branch: Optional[Tree], crew_tree: Optional[Tree] + self, agent_branch: Tree | None, crew_tree: Tree | None ) -> None: if not self.verbose: - return None + return branch_to_use = agent_branch or self.current_lite_agent_branch tree_to_use = branch_to_use or crew_tree if tree_to_use is None: - return None + return for child in tree_to_use.children: if "Memory Update" in str(child.label): @@ -1655,19 +1665,19 @@ class ConsoleFormatter: def handle_memory_save_completed( self, - agent_branch: Optional[Tree], - crew_tree: Optional[Tree], + agent_branch: Tree | None, + crew_tree: Tree | None, save_time_ms: float, source_type: str, ) -> None: if not self.verbose: - return None + return branch_to_use = agent_branch or self.current_lite_agent_branch tree_to_use = branch_to_use or crew_tree if tree_to_use is None: - return None + return memory_type = source_type.replace("_", " ").title() content = f"✅ {memory_type} Memory Saved ({save_time_ms:.2f}ms)" @@ -1685,19 +1695,19 @@ class ConsoleFormatter: def handle_memory_save_failed( self, - agent_branch: Optional[Tree], + agent_branch: Tree | None, error: str, source_type: str, - crew_tree: Optional[Tree], + crew_tree: Tree | None, ) -> None: if not self.verbose: - return None + return branch_to_use = agent_branch or self.current_lite_agent_branch tree_to_use = branch_to_use or crew_tree if branch_to_use is None or tree_to_use is None: - return None + return memory_type = source_type.replace("_", " ").title() content = f"❌ {memory_type} Memory Save Failed" @@ -1738,7 +1748,7 @@ class ConsoleFormatter: def handle_guardrail_completed( self, success: bool, - error: Optional[str], + error: str | None, retry_count: int, ) -> None: """Display guardrail evaluation result. diff --git a/tests/utilities/test_console_formatter_json.py b/tests/utilities/test_console_formatter_json.py new file mode 100644 index 000000000..f8ad2c8a5 --- /dev/null +++ b/tests/utilities/test_console_formatter_json.py @@ -0,0 +1,174 @@ +import json +from unittest.mock import patch + +from crewai.agents.parser import AgentAction, AgentFinish +from crewai.events.utils.console_formatter import ConsoleFormatter + + +class TestConsoleFormatterJSON: + """Test ConsoleFormatter JSON formatting functionality.""" + + def test_handle_agent_logs_execution_with_json_tool_input(self): + """Test that JSON tool inputs are properly formatted.""" + formatter = ConsoleFormatter() + + json_input = ( + '{"task": "Research AI", "context": "Machine Learning", ' + '"priority": "high"}' + ) + agent_action = AgentAction( + thought="I need to research this topic", + tool="research_tool", + tool_input=json_input, + text="Full agent text", + result="Research completed" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4 + + def test_handle_agent_logs_execution_with_malformed_json(self): + """Test that malformed JSON falls back to string formatting.""" + formatter = ConsoleFormatter() + + malformed_json = ( + '{"task": "Research AI", "context": "Machine Learning"' + ) + agent_action = AgentAction( + thought="I need to research this topic", + tool="research_tool", + tool_input=malformed_json, + text="Full agent text", + result="Research completed" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4 + + def test_handle_agent_logs_execution_with_non_json_string(self): + """Test that non-JSON strings are handled properly.""" + formatter = ConsoleFormatter() + + plain_string = "search for weather in San Francisco" + agent_action = AgentAction( + thought="I need to search for weather", + tool="search_tool", + tool_input=plain_string, + text="Full agent text", + result="Weather found" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4 + + def test_handle_agent_logs_execution_with_complex_json(self): + """Test with complex nested JSON structures.""" + formatter = ConsoleFormatter() + + complex_json = json.dumps({ + "query": { + "type": "research", + "parameters": { + "topic": "AI in healthcare", + "depth": "comprehensive", + "sources": ["academic", "industry", "news"] + } + }, + "filters": ["recent", "peer-reviewed"], + "limit": 50 + }) + + agent_action = AgentAction( + thought="Complex research query", + tool="advanced_search", + tool_input=complex_json, + text="Full agent text", + result="Complex search completed" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4 + + def test_handle_agent_logs_execution_with_agent_finish(self): + """Test that AgentFinish objects are handled correctly.""" + formatter = ConsoleFormatter() + + agent_finish = AgentFinish( + thought="Task completed", + output="Final result of the task", + text="Full agent text" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_finish, "Test Agent") + + assert mock_print.call_count == 2 + + def test_json_parsing_preserves_structure(self): + """Test that JSON parsing preserves the original structure.""" + formatter = ConsoleFormatter() + + original_data = { + "nested": { + "array": [1, 2, 3], + "string": "test", + "boolean": True, + "null": None + } + } + json_string = json.dumps(original_data) + + agent_action = AgentAction( + thought="Testing structure preservation", + tool="test_tool", + tool_input=json_string, + text="Full agent text", + result="Test completed" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4 + + def test_empty_tool_input_handling(self): + """Test handling of empty tool input.""" + formatter = ConsoleFormatter() + + agent_action = AgentAction( + thought="Empty input test", + tool="test_tool", + tool_input="", + text="Full agent text", + result="Test completed" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4 + + def test_numeric_tool_input_handling(self): + """Test handling of numeric tool input.""" + formatter = ConsoleFormatter() + + agent_action = AgentAction( + thought="Numeric input test", + tool="test_tool", + tool_input="42", + text="Full agent text", + result="Test completed" + ) + + with patch.object(formatter, 'print') as mock_print: + formatter.handle_agent_logs_execution(agent_action, "Test Agent") + + assert mock_print.call_count == 4