diff --git a/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/src/crewai/agents/agent_builder/base_agent_executor_mixin.py index c46c46844..bd126799a 100644 --- a/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/src/crewai/agents/agent_builder/base_agent_executor_mixin.py @@ -7,6 +7,7 @@ from crewai.utilities import I18N from crewai.utilities.converter import ConverterError from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.printer import Printer +from crewai.utilities.events.event_listener import event_listener if TYPE_CHECKING: from crewai.agents.agent_builder.base_agent import BaseAgent @@ -125,33 +126,38 @@ class CrewAgentExecutorMixin: def _ask_human_input(self, final_answer: str) -> str: """Prompt human input with mode-appropriate messaging.""" - self._printer.print( - content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m" - ) - - # Training mode prompt (single iteration) - if self.crew and getattr(self.crew, "_train", False): - prompt = ( - "\n\n=====\n" - "## TRAINING MODE: Provide feedback to improve the agent's performance.\n" - "This will be used to train better versions of the agent.\n" - "Please provide detailed feedback about the result quality and reasoning process.\n" - "=====\n" - ) - # Regular human-in-the-loop prompt (multiple iterations) - else: - prompt = ( - "\n\n=====\n" - "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n" - "Please follow these guidelines:\n" - " - If you are happy with the result, simply hit Enter without typing anything.\n" - " - Otherwise, provide specific improvement requests.\n" - " - You can provide multiple rounds of feedback until satisfied.\n" - "=====\n" + event_listener.formatter.pause_live_updates() + + try: + self._printer.print( + content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m" ) - self._printer.print(content=prompt, color="bold_yellow") - response = input() - if response.strip() != "": - self._printer.print(content="\nProcessing your feedback...", color="cyan") - return response + # Training mode prompt (single iteration) + if self.crew and getattr(self.crew, "_train", False): + prompt = ( + "\n\n=====\n" + "## TRAINING MODE: Provide feedback to improve the agent's performance.\n" + "This will be used to train better versions of the agent.\n" + "Please provide detailed feedback about the result quality and reasoning process.\n" + "=====\n" + ) + # Regular human-in-the-loop prompt (multiple iterations) + else: + prompt = ( + "\n\n=====\n" + "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n" + "Please follow these guidelines:\n" + " - If you are happy with the result, simply hit Enter without typing anything.\n" + " - Otherwise, provide specific improvement requests.\n" + " - You can provide multiple rounds of feedback until satisfied.\n" + "=====\n" + ) + + self._printer.print(content=prompt, color="bold_yellow") + response = input() + if response.strip() != "": + self._printer.print(content="\nProcessing your feedback...", color="cyan") + return response + finally: + event_listener.formatter.resume_live_updates() diff --git a/src/crewai/utilities/events/utils/console_formatter.py b/src/crewai/utilities/events/utils/console_formatter.py index bcf12caaf..721bea6d8 100644 --- a/src/crewai/utilities/events/utils/console_formatter.py +++ b/src/crewai/utilities/events/utils/console_formatter.py @@ -17,6 +17,7 @@ class ConsoleFormatter: current_lite_agent_branch: Optional[Tree] = None tool_usage_counts: Dict[str, int] = {} current_reasoning_branch: Optional[Tree] = None # Track reasoning status + _live_paused: bool = False current_llm_tool_tree: Optional[Tree] = None def __init__(self, verbose: bool = False): @@ -119,6 +120,19 @@ class ConsoleFormatter: # Finally, pass through to the regular Console.print implementation self.console.print(*args, **kwargs) + def pause_live_updates(self) -> None: + """Pause Live session updates to allow for human input without interference.""" + if not self._live_paused: + if self._live: + self._live.stop() + self._live = None + self._live_paused = True + + def resume_live_updates(self) -> None: + """Resume Live session updates after human input is complete.""" + if self._live_paused: + self._live_paused = False + def print_panel( self, content: Text, title: str, style: str = "blue", is_flow: bool = False ) -> None: diff --git a/tests/test_flow_human_input_integration.py b/tests/test_flow_human_input_integration.py new file mode 100644 index 000000000..847a7d779 --- /dev/null +++ b/tests/test_flow_human_input_integration.py @@ -0,0 +1,167 @@ +import pytest +from unittest.mock import patch, MagicMock +from crewai.utilities.events.event_listener import event_listener + + +class TestFlowHumanInputIntegration: + """Test integration between Flow execution and human input functionality.""" + + def test_console_formatter_pause_resume_methods(self): + """Test that ConsoleFormatter pause/resume methods work correctly.""" + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + formatter._live_paused = False + + formatter.pause_live_updates() + assert formatter._live_paused + + formatter.resume_live_updates() + assert not formatter._live_paused + finally: + formatter._live_paused = original_paused_state + + @patch('builtins.input', return_value='') + def test_human_input_pauses_flow_updates(self, mock_input): + """Test that human input pauses Flow status updates.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = False + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + formatter._live_paused = False + + with patch.object(formatter, 'pause_live_updates') as mock_pause, \ + patch.object(formatter, 'resume_live_updates') as mock_resume: + + result = executor._ask_human_input("Test result") + + mock_pause.assert_called_once() + mock_resume.assert_called_once() + mock_input.assert_called_once() + assert result == '' + finally: + formatter._live_paused = original_paused_state + + @patch('builtins.input', side_effect=['feedback', '']) + def test_multiple_human_input_rounds(self, mock_input): + """Test multiple rounds of human input with Flow status management.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = False + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + pause_calls = [] + resume_calls = [] + + def track_pause(): + pause_calls.append(True) + + def track_resume(): + resume_calls.append(True) + + with patch.object(formatter, 'pause_live_updates', side_effect=track_pause), \ + patch.object(formatter, 'resume_live_updates', side_effect=track_resume): + + result1 = executor._ask_human_input("Test result 1") + assert result1 == 'feedback' + + result2 = executor._ask_human_input("Test result 2") + assert result2 == '' + + assert len(pause_calls) == 2 + assert len(resume_calls) == 2 + finally: + formatter._live_paused = original_paused_state + + def test_pause_resume_with_no_live_session(self): + """Test pause/resume methods handle case when no Live session exists.""" + formatter = event_listener.formatter + + original_live = formatter._live + original_paused_state = formatter._live_paused + + try: + formatter._live = None + formatter._live_paused = False + + formatter.pause_live_updates() + formatter.resume_live_updates() + + assert not formatter._live_paused + finally: + formatter._live = original_live + formatter._live_paused = original_paused_state + + def test_pause_resume_exception_handling(self): + """Test that resume is called even if exception occurs during human input.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = False + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + with patch.object(formatter, 'pause_live_updates') as mock_pause, \ + patch.object(formatter, 'resume_live_updates') as mock_resume, \ + patch('builtins.input', side_effect=KeyboardInterrupt("Test exception")): + + with pytest.raises(KeyboardInterrupt): + executor._ask_human_input("Test result") + + mock_pause.assert_called_once() + mock_resume.assert_called_once() + finally: + formatter._live_paused = original_paused_state + + def test_training_mode_human_input(self): + """Test human input in training mode.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = True + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + with patch.object(formatter, 'pause_live_updates') as mock_pause, \ + patch.object(formatter, 'resume_live_updates') as mock_resume, \ + patch('builtins.input', return_value='training feedback'): + + result = executor._ask_human_input("Test result") + + mock_pause.assert_called_once() + mock_resume.assert_called_once() + assert result == 'training feedback' + + executor._printer.print.assert_called() + call_args = [call[1]['content'] for call in executor._printer.print.call_args_list] + training_prompt_found = any('TRAINING MODE' in content for content in call_args) + assert training_prompt_found + finally: + formatter._live_paused = original_paused_state diff --git a/tests/utilities/test_console_formatter_pause_resume.py b/tests/utilities/test_console_formatter_pause_resume.py new file mode 100644 index 000000000..a00e38e6d --- /dev/null +++ b/tests/utilities/test_console_formatter_pause_resume.py @@ -0,0 +1,117 @@ +import pytest +from unittest.mock import MagicMock, patch +from rich.tree import Tree +from rich.live import Live +from crewai.utilities.events.utils.console_formatter import ConsoleFormatter + + +class TestConsoleFormatterPauseResume: + """Test ConsoleFormatter pause/resume functionality.""" + + def test_pause_live_updates_with_active_session(self): + """Test pausing when Live session is active.""" + formatter = ConsoleFormatter() + + mock_live = MagicMock(spec=Live) + formatter._live = mock_live + formatter._live_paused = False + + formatter.pause_live_updates() + + mock_live.stop.assert_called_once() + assert formatter._live_paused + + def test_pause_live_updates_when_already_paused(self): + """Test pausing when already paused does nothing.""" + formatter = ConsoleFormatter() + + mock_live = MagicMock(spec=Live) + formatter._live = mock_live + formatter._live_paused = True + + formatter.pause_live_updates() + + mock_live.stop.assert_not_called() + assert formatter._live_paused + + def test_pause_live_updates_with_no_session(self): + """Test pausing when no Live session exists.""" + formatter = ConsoleFormatter() + + formatter._live = None + formatter._live_paused = False + + formatter.pause_live_updates() + + assert formatter._live_paused + + def test_resume_live_updates_when_paused(self): + """Test resuming when paused.""" + formatter = ConsoleFormatter() + + formatter._live_paused = True + + formatter.resume_live_updates() + + assert not formatter._live_paused + + def test_resume_live_updates_when_not_paused(self): + """Test resuming when not paused does nothing.""" + formatter = ConsoleFormatter() + + formatter._live_paused = False + + formatter.resume_live_updates() + + assert not formatter._live_paused + + def test_print_after_resume_restarts_live_session(self): + """Test that printing a Tree after resume creates new Live session.""" + formatter = ConsoleFormatter() + + formatter._live_paused = True + formatter._live = None + + formatter.resume_live_updates() + assert not formatter._live_paused + + tree = Tree("Test") + + with patch('crewai.utilities.events.utils.console_formatter.Live') as mock_live_class: + mock_live_instance = MagicMock() + mock_live_class.return_value = mock_live_instance + + formatter.print(tree) + + mock_live_class.assert_called_once() + mock_live_instance.start.assert_called_once() + assert formatter._live == mock_live_instance + + def test_multiple_pause_resume_cycles(self): + """Test multiple pause/resume cycles work correctly.""" + formatter = ConsoleFormatter() + + mock_live = MagicMock(spec=Live) + formatter._live = mock_live + formatter._live_paused = False + + formatter.pause_live_updates() + assert formatter._live_paused + mock_live.stop.assert_called_once() + assert formatter._live is None # Live session should be cleared + + formatter.resume_live_updates() + assert not formatter._live_paused + + formatter.pause_live_updates() + assert formatter._live_paused + + formatter.resume_live_updates() + assert not formatter._live_paused + + def test_pause_resume_state_initialization(self): + """Test that _live_paused is properly initialized.""" + formatter = ConsoleFormatter() + + assert hasattr(formatter, '_live_paused') + assert not formatter._live_paused