Compare commits

...

2 Commits

Author SHA1 Message Date
Devin AI
19023a5fcc Fix lint: Remove unused pytest import
- Remove unused pytest import from test_console_formatter_pause_resume.py
- Fixes F401 lint error identified in CI

Co-Authored-By: João <joao@crewai.com>
2025-06-11 04:15:32 +00:00
Devin AI
417b1cd8a9 Fix issue 2993: Prevent Flow status logs from hiding human input
- Add pause_live_updates() and resume_live_updates() methods to ConsoleFormatter
- Modify _ask_human_input() to pause Flow status updates during human input
- Add comprehensive tests for pause/resume functionality and integration
- Ensure Live session is properly managed during human input prompts
- Fix prevents Flow status logs from overwriting user input prompts

Fixes #2993

Co-Authored-By: João <joao@crewai.com>
2025-06-11 04:08:27 +00:00
4 changed files with 331 additions and 28 deletions

View File

@@ -7,6 +7,7 @@ from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.utilities.events.event_listener import event_listener
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -125,33 +126,38 @@ class CrewAgentExecutorMixin:
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging."""
self._printer.print(
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
# Training mode prompt (single iteration)
if self.crew and getattr(self.crew, "_train", False):
prompt = (
"\n\n=====\n"
"## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process.\n"
"=====\n"
)
# Regular human-in-the-loop prompt (multiple iterations)
else:
prompt = (
"\n\n=====\n"
"## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
"Please follow these guidelines:\n"
" - If you are happy with the result, simply hit Enter without typing anything.\n"
" - Otherwise, provide specific improvement requests.\n"
" - You can provide multiple rounds of feedback until satisfied.\n"
"=====\n"
event_listener.formatter.pause_live_updates()
try:
self._printer.print(
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
self._printer.print(content=prompt, color="bold_yellow")
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
return response
# Training mode prompt (single iteration)
if self.crew and getattr(self.crew, "_train", False):
prompt = (
"\n\n=====\n"
"## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process.\n"
"=====\n"
)
# Regular human-in-the-loop prompt (multiple iterations)
else:
prompt = (
"\n\n=====\n"
"## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
"Please follow these guidelines:\n"
" - If you are happy with the result, simply hit Enter without typing anything.\n"
" - Otherwise, provide specific improvement requests.\n"
" - You can provide multiple rounds of feedback until satisfied.\n"
"=====\n"
)
self._printer.print(content=prompt, color="bold_yellow")
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
return response
finally:
event_listener.formatter.resume_live_updates()

View File

@@ -17,6 +17,7 @@ class ConsoleFormatter:
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
current_reasoning_branch: Optional[Tree] = None # Track reasoning status
_live_paused: bool = False
current_llm_tool_tree: Optional[Tree] = None
def __init__(self, verbose: bool = False):
@@ -119,6 +120,19 @@ class ConsoleFormatter:
# Finally, pass through to the regular Console.print implementation
self.console.print(*args, **kwargs)
def pause_live_updates(self) -> None:
"""Pause Live session updates to allow for human input without interference."""
if not self._live_paused:
if self._live:
self._live.stop()
self._live = None
self._live_paused = True
def resume_live_updates(self) -> None:
"""Resume Live session updates after human input is complete."""
if self._live_paused:
self._live_paused = False
def print_panel(
self, content: Text, title: str, style: str = "blue", is_flow: bool = False
) -> None:

View File

@@ -0,0 +1,167 @@
import pytest
from unittest.mock import patch, MagicMock
from crewai.utilities.events.event_listener import event_listener
class TestFlowHumanInputIntegration:
"""Test integration between Flow execution and human input functionality."""
def test_console_formatter_pause_resume_methods(self):
"""Test that ConsoleFormatter pause/resume methods work correctly."""
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
formatter._live_paused = False
formatter.pause_live_updates()
assert formatter._live_paused
formatter.resume_live_updates()
assert not formatter._live_paused
finally:
formatter._live_paused = original_paused_state
@patch('builtins.input', return_value='')
def test_human_input_pauses_flow_updates(self, mock_input):
"""Test that human input pauses Flow status updates."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
formatter._live_paused = False
with patch.object(formatter, 'pause_live_updates') as mock_pause, \
patch.object(formatter, 'resume_live_updates') as mock_resume:
result = executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
mock_input.assert_called_once()
assert result == ''
finally:
formatter._live_paused = original_paused_state
@patch('builtins.input', side_effect=['feedback', ''])
def test_multiple_human_input_rounds(self, mock_input):
"""Test multiple rounds of human input with Flow status management."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
pause_calls = []
resume_calls = []
def track_pause():
pause_calls.append(True)
def track_resume():
resume_calls.append(True)
with patch.object(formatter, 'pause_live_updates', side_effect=track_pause), \
patch.object(formatter, 'resume_live_updates', side_effect=track_resume):
result1 = executor._ask_human_input("Test result 1")
assert result1 == 'feedback'
result2 = executor._ask_human_input("Test result 2")
assert result2 == ''
assert len(pause_calls) == 2
assert len(resume_calls) == 2
finally:
formatter._live_paused = original_paused_state
def test_pause_resume_with_no_live_session(self):
"""Test pause/resume methods handle case when no Live session exists."""
formatter = event_listener.formatter
original_live = formatter._live
original_paused_state = formatter._live_paused
try:
formatter._live = None
formatter._live_paused = False
formatter.pause_live_updates()
formatter.resume_live_updates()
assert not formatter._live_paused
finally:
formatter._live = original_live
formatter._live_paused = original_paused_state
def test_pause_resume_exception_handling(self):
"""Test that resume is called even if exception occurs during human input."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
with patch.object(formatter, 'pause_live_updates') as mock_pause, \
patch.object(formatter, 'resume_live_updates') as mock_resume, \
patch('builtins.input', side_effect=KeyboardInterrupt("Test exception")):
with pytest.raises(KeyboardInterrupt):
executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
finally:
formatter._live_paused = original_paused_state
def test_training_mode_human_input(self):
"""Test human input in training mode."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = True
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
with patch.object(formatter, 'pause_live_updates') as mock_pause, \
patch.object(formatter, 'resume_live_updates') as mock_resume, \
patch('builtins.input', return_value='training feedback'):
result = executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
assert result == 'training feedback'
executor._printer.print.assert_called()
call_args = [call[1]['content'] for call in executor._printer.print.call_args_list]
training_prompt_found = any('TRAINING MODE' in content for content in call_args)
assert training_prompt_found
finally:
formatter._live_paused = original_paused_state

View File

@@ -0,0 +1,116 @@
from unittest.mock import MagicMock, patch
from rich.tree import Tree
from rich.live import Live
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
class TestConsoleFormatterPauseResume:
"""Test ConsoleFormatter pause/resume functionality."""
def test_pause_live_updates_with_active_session(self):
"""Test pausing when Live session is active."""
formatter = ConsoleFormatter()
mock_live = MagicMock(spec=Live)
formatter._live = mock_live
formatter._live_paused = False
formatter.pause_live_updates()
mock_live.stop.assert_called_once()
assert formatter._live_paused
def test_pause_live_updates_when_already_paused(self):
"""Test pausing when already paused does nothing."""
formatter = ConsoleFormatter()
mock_live = MagicMock(spec=Live)
formatter._live = mock_live
formatter._live_paused = True
formatter.pause_live_updates()
mock_live.stop.assert_not_called()
assert formatter._live_paused
def test_pause_live_updates_with_no_session(self):
"""Test pausing when no Live session exists."""
formatter = ConsoleFormatter()
formatter._live = None
formatter._live_paused = False
formatter.pause_live_updates()
assert formatter._live_paused
def test_resume_live_updates_when_paused(self):
"""Test resuming when paused."""
formatter = ConsoleFormatter()
formatter._live_paused = True
formatter.resume_live_updates()
assert not formatter._live_paused
def test_resume_live_updates_when_not_paused(self):
"""Test resuming when not paused does nothing."""
formatter = ConsoleFormatter()
formatter._live_paused = False
formatter.resume_live_updates()
assert not formatter._live_paused
def test_print_after_resume_restarts_live_session(self):
"""Test that printing a Tree after resume creates new Live session."""
formatter = ConsoleFormatter()
formatter._live_paused = True
formatter._live = None
formatter.resume_live_updates()
assert not formatter._live_paused
tree = Tree("Test")
with patch('crewai.utilities.events.utils.console_formatter.Live') as mock_live_class:
mock_live_instance = MagicMock()
mock_live_class.return_value = mock_live_instance
formatter.print(tree)
mock_live_class.assert_called_once()
mock_live_instance.start.assert_called_once()
assert formatter._live == mock_live_instance
def test_multiple_pause_resume_cycles(self):
"""Test multiple pause/resume cycles work correctly."""
formatter = ConsoleFormatter()
mock_live = MagicMock(spec=Live)
formatter._live = mock_live
formatter._live_paused = False
formatter.pause_live_updates()
assert formatter._live_paused
mock_live.stop.assert_called_once()
assert formatter._live is None # Live session should be cleared
formatter.resume_live_updates()
assert not formatter._live_paused
formatter.pause_live_updates()
assert formatter._live_paused
formatter.resume_live_updates()
assert not formatter._live_paused
def test_pause_resume_state_initialization(self):
"""Test that _live_paused is properly initialized."""
formatter = ConsoleFormatter()
assert hasattr(formatter, '_live_paused')
assert not formatter._live_paused