From 06c991d8c32261c397ad718df7c4e9979c7d9ef9 Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 17:38:40 -0700 Subject: [PATCH 1/2] Fix telemetry singleton pattern to respect dynamic environment variables (#2946) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix telemetry singleton pattern to respect dynamic environment variables - Modified Telemetry.__init__ to prevent re-initialization with _initialized flag - Updated _safe_telemetry_operation to check _is_telemetry_disabled() dynamically - Added comprehensive tests for environment variables set after singleton creation - Fixed singleton contamination in existing tests by adding proper reset - Resolves issue #2945 where CREWAI_DISABLE_TELEMETRY=true was ignored when set after import Co-Authored-By: João * Implement code review improvements - Move _initialized flag to __new__ method for better encapsulation - Add type hints to _safe_telemetry_operation method - Consolidate telemetry execution checks into _should_execute_telemetry helper - Add pytest fixtures to reduce test setup redundancy - Enhanced documentation for singleton behavior Co-Authored-By: João * Fix mypy type-checker errors - Add explicit bool type annotation to _initialized field - Fix return value in task_started method to not return _safe_telemetry_operation result - Simplify initialization logic to set _initialized once in __init__ Co-Authored-By: João --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: João Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com> --- src/crewai/telemetry/telemetry.py | 22 ++++++-- tests/telemetry/test_telemetry.py | 8 +++ tests/telemetry/test_telemetry_disable.py | 66 ++++++++++++++++++++++- 3 files changed, 90 insertions(+), 6 deletions(-) diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index ffd78d28e..f75809a02 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -8,7 +8,7 @@ import platform import warnings from contextlib import contextmanager from importlib.metadata import version -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Callable, Optional import threading from opentelemetry import trace @@ -73,11 +73,16 @@ class Telemetry: with cls._lock: if cls._instance is None: cls._instance = super(Telemetry, cls).__new__(cls) + cls._instance._initialized = False return cls._instance def __init__(self) -> None: + if hasattr(self, '_initialized') and self._initialized: + return + self.ready: bool = False self.trace_set: bool = False + self._initialized: bool = True if self._is_telemetry_disabled(): return @@ -113,6 +118,10 @@ class Telemetry: or os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true" ) + def _should_execute_telemetry(self) -> bool: + """Check if telemetry operations should be executed.""" + return self.ready and not self._is_telemetry_disabled() + def set_tracer(self): if self.ready and not self.trace_set: try: @@ -123,8 +132,9 @@ class Telemetry: self.ready = False self.trace_set = False - def _safe_telemetry_operation(self, operation): - if not self.ready: + def _safe_telemetry_operation(self, operation: Callable[[], None]) -> None: + """Execute telemetry operation safely, checking both readiness and environment variables.""" + if not self._should_execute_telemetry(): return try: operation() @@ -423,7 +433,8 @@ class Telemetry: return span - return self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(operation) + return None def task_ended(self, span: Span, task: Task, crew: Crew): """Records the completion of a task execution in a crew. @@ -773,7 +784,8 @@ class Telemetry: return span if crew.share_crew: - return self._safe_telemetry_operation(operation) + self._safe_telemetry_operation(operation) + return operation() return None def end_crew(self, crew, final_string_output): diff --git a/tests/telemetry/test_telemetry.py b/tests/telemetry/test_telemetry.py index 51c5a79f1..277578327 100644 --- a/tests/telemetry/test_telemetry.py +++ b/tests/telemetry/test_telemetry.py @@ -9,6 +9,14 @@ from crewai.telemetry import Telemetry from opentelemetry import trace +@pytest.fixture(autouse=True) +def cleanup_telemetry(): + """Automatically clean up Telemetry singleton between tests.""" + Telemetry._instance = None + yield + Telemetry._instance = None + + @pytest.mark.parametrize( "env_var,value,expected_ready", [ diff --git a/tests/telemetry/test_telemetry_disable.py b/tests/telemetry/test_telemetry_disable.py index 16c02acaa..96738ad5f 100644 --- a/tests/telemetry/test_telemetry_disable.py +++ b/tests/telemetry/test_telemetry_disable.py @@ -1,11 +1,19 @@ import os -from unittest.mock import patch +from unittest.mock import patch, MagicMock import pytest from crewai.telemetry import Telemetry +@pytest.fixture(autouse=True) +def cleanup_telemetry(): + """Automatically clean up Telemetry singleton between tests.""" + Telemetry._instance = None + yield + Telemetry._instance = None + + @pytest.mark.parametrize("env_var,value,expected_ready", [ ("OTEL_SDK_DISABLED", "true", False), ("OTEL_SDK_DISABLED", "TRUE", False), @@ -28,3 +36,59 @@ def test_telemetry_enabled_by_default(): with patch("crewai.telemetry.telemetry.TracerProvider"): telemetry = Telemetry() assert telemetry.ready is True + + +def test_telemetry_disable_after_singleton_creation(): + """Test that telemetry operations are disabled when env var is set after singleton creation.""" + with patch.dict(os.environ, {}, clear=True): + with patch("crewai.telemetry.telemetry.TracerProvider"): + telemetry = Telemetry() + assert telemetry.ready is True + + mock_operation = MagicMock() + telemetry._safe_telemetry_operation(mock_operation) + mock_operation.assert_called_once() + + mock_operation.reset_mock() + + os.environ['CREWAI_DISABLE_TELEMETRY'] = 'true' + + telemetry._safe_telemetry_operation(mock_operation) + mock_operation.assert_not_called() + + +def test_telemetry_disable_with_multiple_instances(): + """Test that multiple telemetry instances respect dynamically changed env vars.""" + with patch.dict(os.environ, {}, clear=True): + with patch("crewai.telemetry.telemetry.TracerProvider"): + telemetry1 = Telemetry() + assert telemetry1.ready is True + + os.environ['CREWAI_DISABLE_TELEMETRY'] = 'true' + + telemetry2 = Telemetry() + assert telemetry2 is telemetry1 + assert telemetry2.ready is True + + mock_operation = MagicMock() + telemetry2._safe_telemetry_operation(mock_operation) + mock_operation.assert_not_called() + + +def test_telemetry_otel_sdk_disabled_after_creation(): + """Test that OTEL_SDK_DISABLED also works when set after singleton creation.""" + with patch.dict(os.environ, {}, clear=True): + with patch("crewai.telemetry.telemetry.TracerProvider"): + telemetry = Telemetry() + assert telemetry.ready is True + + mock_operation = MagicMock() + telemetry._safe_telemetry_operation(mock_operation) + mock_operation.assert_called_once() + + mock_operation.reset_mock() + + os.environ['OTEL_SDK_DISABLED'] = 'true' + + telemetry._safe_telemetry_operation(mock_operation) + mock_operation.assert_not_called() From 970a63c13c7ac94b5e016e529cf80693f9b53b05 Mon Sep 17 00:00:00 2001 From: "devin-ai-integration[bot]" <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 11 Jun 2025 12:08:00 -0400 Subject: [PATCH 2/2] Fix issue 2993: Prevent Flow status logs from hiding human input (#2994) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix issue 2993: Prevent Flow status logs from hiding human input - Add pause_live_updates() and resume_live_updates() methods to ConsoleFormatter - Modify _ask_human_input() to pause Flow status updates during human input - Add comprehensive tests for pause/resume functionality and integration - Ensure Live session is properly managed during human input prompts - Fix prevents Flow status logs from overwriting user input prompts Fixes #2993 Co-Authored-By: João * Fix lint: Remove unused pytest import - Remove unused pytest import from test_console_formatter_pause_resume.py - Fixes F401 lint error identified in CI Co-Authored-By: João --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: João --- .../base_agent_executor_mixin.py | 62 ++++--- .../events/utils/console_formatter.py | 14 ++ tests/test_flow_human_input_integration.py | 167 ++++++++++++++++++ .../test_console_formatter_pause_resume.py | 116 ++++++++++++ 4 files changed, 331 insertions(+), 28 deletions(-) create mode 100644 tests/test_flow_human_input_integration.py create mode 100644 tests/utilities/test_console_formatter_pause_resume.py diff --git a/src/crewai/agents/agent_builder/base_agent_executor_mixin.py b/src/crewai/agents/agent_builder/base_agent_executor_mixin.py index c46c46844..bd126799a 100644 --- a/src/crewai/agents/agent_builder/base_agent_executor_mixin.py +++ b/src/crewai/agents/agent_builder/base_agent_executor_mixin.py @@ -7,6 +7,7 @@ from crewai.utilities import I18N from crewai.utilities.converter import ConverterError from crewai.utilities.evaluators.task_evaluator import TaskEvaluator from crewai.utilities.printer import Printer +from crewai.utilities.events.event_listener import event_listener if TYPE_CHECKING: from crewai.agents.agent_builder.base_agent import BaseAgent @@ -125,33 +126,38 @@ class CrewAgentExecutorMixin: def _ask_human_input(self, final_answer: str) -> str: """Prompt human input with mode-appropriate messaging.""" - self._printer.print( - content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m" - ) - - # Training mode prompt (single iteration) - if self.crew and getattr(self.crew, "_train", False): - prompt = ( - "\n\n=====\n" - "## TRAINING MODE: Provide feedback to improve the agent's performance.\n" - "This will be used to train better versions of the agent.\n" - "Please provide detailed feedback about the result quality and reasoning process.\n" - "=====\n" - ) - # Regular human-in-the-loop prompt (multiple iterations) - else: - prompt = ( - "\n\n=====\n" - "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n" - "Please follow these guidelines:\n" - " - If you are happy with the result, simply hit Enter without typing anything.\n" - " - Otherwise, provide specific improvement requests.\n" - " - You can provide multiple rounds of feedback until satisfied.\n" - "=====\n" + event_listener.formatter.pause_live_updates() + + try: + self._printer.print( + content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m" ) - self._printer.print(content=prompt, color="bold_yellow") - response = input() - if response.strip() != "": - self._printer.print(content="\nProcessing your feedback...", color="cyan") - return response + # Training mode prompt (single iteration) + if self.crew and getattr(self.crew, "_train", False): + prompt = ( + "\n\n=====\n" + "## TRAINING MODE: Provide feedback to improve the agent's performance.\n" + "This will be used to train better versions of the agent.\n" + "Please provide detailed feedback about the result quality and reasoning process.\n" + "=====\n" + ) + # Regular human-in-the-loop prompt (multiple iterations) + else: + prompt = ( + "\n\n=====\n" + "## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n" + "Please follow these guidelines:\n" + " - If you are happy with the result, simply hit Enter without typing anything.\n" + " - Otherwise, provide specific improvement requests.\n" + " - You can provide multiple rounds of feedback until satisfied.\n" + "=====\n" + ) + + self._printer.print(content=prompt, color="bold_yellow") + response = input() + if response.strip() != "": + self._printer.print(content="\nProcessing your feedback...", color="cyan") + return response + finally: + event_listener.formatter.resume_live_updates() diff --git a/src/crewai/utilities/events/utils/console_formatter.py b/src/crewai/utilities/events/utils/console_formatter.py index bcf12caaf..721bea6d8 100644 --- a/src/crewai/utilities/events/utils/console_formatter.py +++ b/src/crewai/utilities/events/utils/console_formatter.py @@ -17,6 +17,7 @@ class ConsoleFormatter: current_lite_agent_branch: Optional[Tree] = None tool_usage_counts: Dict[str, int] = {} current_reasoning_branch: Optional[Tree] = None # Track reasoning status + _live_paused: bool = False current_llm_tool_tree: Optional[Tree] = None def __init__(self, verbose: bool = False): @@ -119,6 +120,19 @@ class ConsoleFormatter: # Finally, pass through to the regular Console.print implementation self.console.print(*args, **kwargs) + def pause_live_updates(self) -> None: + """Pause Live session updates to allow for human input without interference.""" + if not self._live_paused: + if self._live: + self._live.stop() + self._live = None + self._live_paused = True + + def resume_live_updates(self) -> None: + """Resume Live session updates after human input is complete.""" + if self._live_paused: + self._live_paused = False + def print_panel( self, content: Text, title: str, style: str = "blue", is_flow: bool = False ) -> None: diff --git a/tests/test_flow_human_input_integration.py b/tests/test_flow_human_input_integration.py new file mode 100644 index 000000000..847a7d779 --- /dev/null +++ b/tests/test_flow_human_input_integration.py @@ -0,0 +1,167 @@ +import pytest +from unittest.mock import patch, MagicMock +from crewai.utilities.events.event_listener import event_listener + + +class TestFlowHumanInputIntegration: + """Test integration between Flow execution and human input functionality.""" + + def test_console_formatter_pause_resume_methods(self): + """Test that ConsoleFormatter pause/resume methods work correctly.""" + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + formatter._live_paused = False + + formatter.pause_live_updates() + assert formatter._live_paused + + formatter.resume_live_updates() + assert not formatter._live_paused + finally: + formatter._live_paused = original_paused_state + + @patch('builtins.input', return_value='') + def test_human_input_pauses_flow_updates(self, mock_input): + """Test that human input pauses Flow status updates.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = False + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + formatter._live_paused = False + + with patch.object(formatter, 'pause_live_updates') as mock_pause, \ + patch.object(formatter, 'resume_live_updates') as mock_resume: + + result = executor._ask_human_input("Test result") + + mock_pause.assert_called_once() + mock_resume.assert_called_once() + mock_input.assert_called_once() + assert result == '' + finally: + formatter._live_paused = original_paused_state + + @patch('builtins.input', side_effect=['feedback', '']) + def test_multiple_human_input_rounds(self, mock_input): + """Test multiple rounds of human input with Flow status management.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = False + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + pause_calls = [] + resume_calls = [] + + def track_pause(): + pause_calls.append(True) + + def track_resume(): + resume_calls.append(True) + + with patch.object(formatter, 'pause_live_updates', side_effect=track_pause), \ + patch.object(formatter, 'resume_live_updates', side_effect=track_resume): + + result1 = executor._ask_human_input("Test result 1") + assert result1 == 'feedback' + + result2 = executor._ask_human_input("Test result 2") + assert result2 == '' + + assert len(pause_calls) == 2 + assert len(resume_calls) == 2 + finally: + formatter._live_paused = original_paused_state + + def test_pause_resume_with_no_live_session(self): + """Test pause/resume methods handle case when no Live session exists.""" + formatter = event_listener.formatter + + original_live = formatter._live + original_paused_state = formatter._live_paused + + try: + formatter._live = None + formatter._live_paused = False + + formatter.pause_live_updates() + formatter.resume_live_updates() + + assert not formatter._live_paused + finally: + formatter._live = original_live + formatter._live_paused = original_paused_state + + def test_pause_resume_exception_handling(self): + """Test that resume is called even if exception occurs during human input.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = False + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + with patch.object(formatter, 'pause_live_updates') as mock_pause, \ + patch.object(formatter, 'resume_live_updates') as mock_resume, \ + patch('builtins.input', side_effect=KeyboardInterrupt("Test exception")): + + with pytest.raises(KeyboardInterrupt): + executor._ask_human_input("Test result") + + mock_pause.assert_called_once() + mock_resume.assert_called_once() + finally: + formatter._live_paused = original_paused_state + + def test_training_mode_human_input(self): + """Test human input in training mode.""" + from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin + + executor = CrewAgentExecutorMixin() + executor.crew = MagicMock() + executor.crew._train = True + executor._printer = MagicMock() + + formatter = event_listener.formatter + + original_paused_state = formatter._live_paused + + try: + with patch.object(formatter, 'pause_live_updates') as mock_pause, \ + patch.object(formatter, 'resume_live_updates') as mock_resume, \ + patch('builtins.input', return_value='training feedback'): + + result = executor._ask_human_input("Test result") + + mock_pause.assert_called_once() + mock_resume.assert_called_once() + assert result == 'training feedback' + + executor._printer.print.assert_called() + call_args = [call[1]['content'] for call in executor._printer.print.call_args_list] + training_prompt_found = any('TRAINING MODE' in content for content in call_args) + assert training_prompt_found + finally: + formatter._live_paused = original_paused_state diff --git a/tests/utilities/test_console_formatter_pause_resume.py b/tests/utilities/test_console_formatter_pause_resume.py new file mode 100644 index 000000000..e150671ac --- /dev/null +++ b/tests/utilities/test_console_formatter_pause_resume.py @@ -0,0 +1,116 @@ +from unittest.mock import MagicMock, patch +from rich.tree import Tree +from rich.live import Live +from crewai.utilities.events.utils.console_formatter import ConsoleFormatter + + +class TestConsoleFormatterPauseResume: + """Test ConsoleFormatter pause/resume functionality.""" + + def test_pause_live_updates_with_active_session(self): + """Test pausing when Live session is active.""" + formatter = ConsoleFormatter() + + mock_live = MagicMock(spec=Live) + formatter._live = mock_live + formatter._live_paused = False + + formatter.pause_live_updates() + + mock_live.stop.assert_called_once() + assert formatter._live_paused + + def test_pause_live_updates_when_already_paused(self): + """Test pausing when already paused does nothing.""" + formatter = ConsoleFormatter() + + mock_live = MagicMock(spec=Live) + formatter._live = mock_live + formatter._live_paused = True + + formatter.pause_live_updates() + + mock_live.stop.assert_not_called() + assert formatter._live_paused + + def test_pause_live_updates_with_no_session(self): + """Test pausing when no Live session exists.""" + formatter = ConsoleFormatter() + + formatter._live = None + formatter._live_paused = False + + formatter.pause_live_updates() + + assert formatter._live_paused + + def test_resume_live_updates_when_paused(self): + """Test resuming when paused.""" + formatter = ConsoleFormatter() + + formatter._live_paused = True + + formatter.resume_live_updates() + + assert not formatter._live_paused + + def test_resume_live_updates_when_not_paused(self): + """Test resuming when not paused does nothing.""" + formatter = ConsoleFormatter() + + formatter._live_paused = False + + formatter.resume_live_updates() + + assert not formatter._live_paused + + def test_print_after_resume_restarts_live_session(self): + """Test that printing a Tree after resume creates new Live session.""" + formatter = ConsoleFormatter() + + formatter._live_paused = True + formatter._live = None + + formatter.resume_live_updates() + assert not formatter._live_paused + + tree = Tree("Test") + + with patch('crewai.utilities.events.utils.console_formatter.Live') as mock_live_class: + mock_live_instance = MagicMock() + mock_live_class.return_value = mock_live_instance + + formatter.print(tree) + + mock_live_class.assert_called_once() + mock_live_instance.start.assert_called_once() + assert formatter._live == mock_live_instance + + def test_multiple_pause_resume_cycles(self): + """Test multiple pause/resume cycles work correctly.""" + formatter = ConsoleFormatter() + + mock_live = MagicMock(spec=Live) + formatter._live = mock_live + formatter._live_paused = False + + formatter.pause_live_updates() + assert formatter._live_paused + mock_live.stop.assert_called_once() + assert formatter._live is None # Live session should be cleared + + formatter.resume_live_updates() + assert not formatter._live_paused + + formatter.pause_live_updates() + assert formatter._live_paused + + formatter.resume_live_updates() + assert not formatter._live_paused + + def test_pause_resume_state_initialization(self): + """Test that _live_paused is properly initialized.""" + formatter = ConsoleFormatter() + + assert hasattr(formatter, '_live_paused') + assert not formatter._live_paused