Compare commits

..

10 Commits

Author SHA1 Message Date
Devin AI
0938968a20 Fix Python 3.11 validation test failures: Remove global logger state and improve error handling
- Remove global _litellm_logger variable that could interfere with exception propagation
- Add proper error handling around logger.setLevel() operations
- Update tests to reflect removal of global caching
- Ensure context manager is fully isolated and doesn't affect validation methods

Co-Authored-By: João <joao@crewai.com>
2025-06-12 00:37:20 +00:00
Devin AI
8538646f47 Fix Python 3.11 CI failures: Add module exports for test compatibility
- Add 'from crewai import agent' and 'from crewai import knowledge' imports
- Add 'agent' and 'knowledge' to __all__ list
- Enables test mocking of crewai.knowledge and crewai.agent modules
- Fixes AttributeError failures in agent_test.py and test_lite_agent.py
- All previously failing tests now pass locally on Python 3.12

Co-Authored-By: João <joao@crewai.com>
2025-06-12 00:19:18 +00:00
Devin AI
7e490f73df Fix logger caching test: Reset global state for consistent testing
Co-Authored-By: João <joao@crewai.com>
2025-06-12 00:08:23 +00:00
Devin AI
d1a0a4e258 Fix Python 3.10 CI failures: Add CLI/utilities exports, improve context manager exception handling
Co-Authored-By: João <joao@crewai.com>
2025-06-12 00:01:27 +00:00
Devin AI
bab03b2be1 Address review feedback: Add logger caching, improve error handling, expand tests
- Cache litellm logger instance globally for performance optimization
- Implement more specific warning pattern filtering instead of broad matching
- Add robust error handling with graceful degradation in suppression
- Enhance streaming error handling with better logging and continue logic
- Add 3 new comprehensive tests:
  - test_concurrent_llm_calls: Verify thread safety with concurrent LLM calls
  - test_logger_caching_performance: Confirm logger instance caching works
  - test_suppression_error_handling: Test graceful degradation on logger errors
- Fix all lint errors (unused imports) in test file

Co-Authored-By: João <joao@crewai.com>
2025-06-11 23:46:58 +00:00
Devin AI
5fcfba82dc Fix lint errors: Remove unused imports
- Remove unused contextlib, sys, threading, io, TextIO imports from llm.py
- Remove unused importlib import from test_sys_stream_hijacking.py
- Address Ruff lint failures from CI

Co-Authored-By: João <joao@crewai.com>
2025-06-11 23:38:37 +00:00
Devin AI
519af74cf7 Fix issue #3000: Replace global sys.stdout/stderr hijacking with contextual suppression
- Remove FilteredStream class that globally hijacked sys.stdout and sys.stderr
- Replace with logging-based suppression using litellm logger level control
- Add contextual suppression around litellm.completion calls only
- Add comprehensive tests to verify fix and prevent regression
- Ensure streaming responses work correctly without interference
- Maintain litellm output filtering during LLM calls only

Co-Authored-By: João <joao@crewai.com>
2025-06-11 23:34:20 +00:00
devin-ai-integration[bot]
970a63c13c Fix issue 2993: Prevent Flow status logs from hiding human input (#2994)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
* Fix issue 2993: Prevent Flow status logs from hiding human input

- Add pause_live_updates() and resume_live_updates() methods to ConsoleFormatter
- Modify _ask_human_input() to pause Flow status updates during human input
- Add comprehensive tests for pause/resume functionality and integration
- Ensure Live session is properly managed during human input prompts
- Fix prevents Flow status logs from overwriting user input prompts

Fixes #2993

Co-Authored-By: João <joao@crewai.com>

* Fix lint: Remove unused pytest import

- Remove unused pytest import from test_console_formatter_pause_resume.py
- Fixes F401 lint error identified in CI

Co-Authored-By: João <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: João <joao@crewai.com>
2025-06-11 12:08:00 -04:00
devin-ai-integration[bot]
06c991d8c3 Fix telemetry singleton pattern to respect dynamic environment variables (#2946)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
Mark stale issues and pull requests / stale (push) Has been cancelled
* Fix telemetry singleton pattern to respect dynamic environment variables

- Modified Telemetry.__init__ to prevent re-initialization with _initialized flag
- Updated _safe_telemetry_operation to check _is_telemetry_disabled() dynamically
- Added comprehensive tests for environment variables set after singleton creation
- Fixed singleton contamination in existing tests by adding proper reset
- Resolves issue #2945 where CREWAI_DISABLE_TELEMETRY=true was ignored when set after import

Co-Authored-By: João <joao@crewai.com>

* Implement code review improvements

- Move _initialized flag to __new__ method for better encapsulation
- Add type hints to _safe_telemetry_operation method
- Consolidate telemetry execution checks into _should_execute_telemetry helper
- Add pytest fixtures to reduce test setup redundancy
- Enhanced documentation for singleton behavior

Co-Authored-By: João <joao@crewai.com>

* Fix mypy type-checker errors

- Add explicit bool type annotation to _initialized field
- Fix return value in task_started method to not return _safe_telemetry_operation result
- Simplify initialization logic to set _initialized once in __init__

Co-Authored-By: João <joao@crewai.com>

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
Co-authored-by: João <joao@crewai.com>
Co-authored-by: Lorenze Jay <63378463+lorenzejay@users.noreply.github.com>
2025-06-10 17:38:40 -07:00
Lucas Gomide
739eb72fd0 LiteAgent w/ Guardrail (#2982)
Some checks failed
Notify Downstream / notify-downstream (push) Has been cancelled
* feat: add guardrail support for Agents when using direct kickoff calls

* refactor: expose guardrail func in a proper utils file

* fix: resolve Self import on python 3.10
2025-06-10 13:32:32 -04:00
10 changed files with 698 additions and 160 deletions

View File

@@ -1,16 +1,20 @@
import warnings
from crewai.agent import Agent
from crewai import agent
from crewai import cli
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.flow.flow import Flow
from crewai.knowledge.knowledge import Knowledge
from crewai import knowledge
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
from crewai import utilities
warnings.filterwarnings(
"ignore",
@@ -21,6 +25,8 @@ warnings.filterwarnings(
__version__ = "0.126.0"
__all__ = [
"Agent",
"agent",
"cli",
"Crew",
"CrewOutput",
"Process",
@@ -29,6 +35,8 @@ __all__ = [
"BaseLLM",
"Flow",
"Knowledge",
"knowledge",
"TaskOutput",
"LLMGuardrail",
"utilities",
]

View File

@@ -7,6 +7,7 @@ from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.printer import Printer
from crewai.utilities.events.event_listener import event_listener
if TYPE_CHECKING:
from crewai.agents.agent_builder.base_agent import BaseAgent
@@ -125,33 +126,38 @@ class CrewAgentExecutorMixin:
def _ask_human_input(self, final_answer: str) -> str:
"""Prompt human input with mode-appropriate messaging."""
self._printer.print(
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
# Training mode prompt (single iteration)
if self.crew and getattr(self.crew, "_train", False):
prompt = (
"\n\n=====\n"
"## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process.\n"
"=====\n"
)
# Regular human-in-the-loop prompt (multiple iterations)
else:
prompt = (
"\n\n=====\n"
"## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
"Please follow these guidelines:\n"
" - If you are happy with the result, simply hit Enter without typing anything.\n"
" - Otherwise, provide specific improvement requests.\n"
" - You can provide multiple rounds of feedback until satisfied.\n"
"=====\n"
event_listener.formatter.pause_live_updates()
try:
self._printer.print(
content=f"\033[1m\033[95m ## Final Result:\033[00m \033[92m{final_answer}\033[00m"
)
self._printer.print(content=prompt, color="bold_yellow")
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
return response
# Training mode prompt (single iteration)
if self.crew and getattr(self.crew, "_train", False):
prompt = (
"\n\n=====\n"
"## TRAINING MODE: Provide feedback to improve the agent's performance.\n"
"This will be used to train better versions of the agent.\n"
"Please provide detailed feedback about the result quality and reasoning process.\n"
"=====\n"
)
# Regular human-in-the-loop prompt (multiple iterations)
else:
prompt = (
"\n\n=====\n"
"## HUMAN FEEDBACK: Provide feedback on the Final Result and Agent's actions.\n"
"Please follow these guidelines:\n"
" - If you are happy with the result, simply hit Enter without typing anything.\n"
" - Otherwise, provide specific improvement requests.\n"
" - You can provide multiple rounds of feedback until satisfied.\n"
"=====\n"
)
self._printer.print(content=prompt, color="bold_yellow")
response = input()
if response.strip() != "":
self._printer.print(content="\nProcessing your feedback...", color="cyan")
return response
finally:
event_listener.formatter.resume_live_updates()

View File

@@ -1,8 +1,6 @@
import json
import logging
import os
import sys
import threading
import warnings
from collections import defaultdict
from contextlib import contextmanager
@@ -48,8 +46,7 @@ with warnings.catch_warnings():
from litellm.utils import supports_response_schema
import io
from typing import TextIO
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.events import crewai_event_bus
@@ -60,69 +57,7 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
class FilteredStream(io.TextIOBase):
_lock = None
def __init__(self, original_stream: TextIO):
self._original_stream = original_stream
self._lock = threading.Lock()
def write(self, s: str) -> int:
if not self._lock:
self._lock = threading.Lock()
with self._lock:
lower_s = s.lower()
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
):
return 0
return self._original_stream.write(s)
def flush(self):
with self._lock:
return self._original_stream.flush()
def __getattr__(self, name):
"""Delegate attribute access to the wrapped original stream.
This ensures compatibility with libraries (e.g., Rich) that rely on
attributes such as `encoding`, `isatty`, `buffer`, etc., which may not
be explicitly defined on this proxy class.
"""
return getattr(self._original_stream, name)
# Delegate common properties/methods explicitly so they aren't shadowed by
# the TextIOBase defaults (e.g., .encoding returns None by default, which
# confuses Rich). These explicit pass-throughs ensure the wrapped Console
# still sees a fully-featured stream.
@property
def encoding(self):
return getattr(self._original_stream, "encoding", "utf-8")
def isatty(self):
return self._original_stream.isatty()
def fileno(self):
return self._original_stream.fileno()
def writable(self):
return True
# Apply the filtered stream globally so that any subsequent writes containing the filtered
# keywords (e.g., "litellm") are hidden from terminal output. We guard against double
# wrapping to ensure idempotency in environments where this module might be reloaded.
if not isinstance(sys.stdout, FilteredStream):
sys.stdout = FilteredStream(sys.stdout)
if not isinstance(sys.stderr, FilteredStream):
sys.stderr = FilteredStream(sys.stderr)
LLM_CONTEXT_WINDOW_SIZES = {
@@ -266,6 +201,40 @@ def suppress_warnings():
yield
@contextmanager
def suppress_litellm_output():
"""Contextually suppress litellm-related logging output during LLM calls."""
litellm_logger = logging.getLogger("litellm")
original_level = litellm_logger.level
warning_patterns = [
".*give feedback.*",
".*Consider using a smaller input.*",
".*litellm\\.info:.*",
".*text splitting strategy.*"
]
try:
with warnings.catch_warnings():
for pattern in warning_patterns:
warnings.filterwarnings("ignore", message=pattern)
try:
litellm_logger.setLevel(logging.WARNING)
except Exception as e:
logging.debug(f"Error setting logger level: {e}")
yield
except Exception as e:
logging.debug(f"Error in litellm output suppression: {e}")
raise
finally:
try:
litellm_logger.setLevel(original_level)
except Exception as e:
logging.debug(f"Error restoring logger level: {e}")
class Delta(TypedDict):
content: Optional[str]
role: Optional[str]
@@ -450,60 +419,61 @@ class LLM(BaseLLM):
try:
# --- 3) Process each chunk in the stream
for chunk in litellm.completion(**params):
chunk_count += 1
last_chunk = chunk
with suppress_litellm_output():
for chunk in litellm.completion(**params):
chunk_count += 1
last_chunk = chunk
# Extract content from the chunk
chunk_content = None
# Extract content from the chunk
chunk_content = None
# Safely extract content from various chunk formats
try:
# Try to access choices safely
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
choices = chunk["choices"]
elif hasattr(chunk, "choices"):
# Check if choices is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "choices"), type):
choices = getattr(chunk, "choices")
# Safely extract content from various chunk formats
try:
# Try to access choices safely
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
choices = chunk["choices"]
elif hasattr(chunk, "choices"):
# Check if choices is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "choices"), type):
choices = getattr(chunk, "choices")
# Try to extract usage information if available
if isinstance(chunk, dict) and "usage" in chunk:
usage_info = chunk["usage"]
elif hasattr(chunk, "usage"):
# Check if usage is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "usage"), type):
usage_info = getattr(chunk, "usage")
# Try to extract usage information if available
if isinstance(chunk, dict) and "usage" in chunk:
usage_info = chunk["usage"]
elif hasattr(chunk, "usage"):
# Check if usage is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "usage"), type):
usage_info = getattr(chunk, "usage")
if choices and len(choices) > 0:
choice = choices[0]
if choices and len(choices) > 0:
choice = choices[0]
# Handle different delta formats
delta = None
if isinstance(choice, dict) and "delta" in choice:
delta = choice["delta"]
elif hasattr(choice, "delta"):
delta = getattr(choice, "delta")
# Handle different delta formats
delta = None
if isinstance(choice, dict) and "delta" in choice:
delta = choice["delta"]
elif hasattr(choice, "delta"):
delta = getattr(choice, "delta")
# Extract content from delta
if delta:
# Handle dict format
if isinstance(delta, dict):
if "content" in delta and delta["content"] is not None:
chunk_content = delta["content"]
# Handle object format
elif hasattr(delta, "content"):
chunk_content = getattr(delta, "content")
# Extract content from delta
if delta:
# Handle dict format
if isinstance(delta, dict):
if "content" in delta and delta["content"] is not None:
chunk_content = delta["content"]
# Handle object format
elif hasattr(delta, "content"):
chunk_content = getattr(delta, "content")
# Handle case where content might be None or empty
if chunk_content is None and isinstance(delta, dict):
# Some models might send empty content chunks
chunk_content = ""
# Handle case where content might be None or empty
if chunk_content is None and isinstance(delta, dict):
# Some models might send empty content chunks
chunk_content = ""
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
if tool_calls:
result = self._handle_streaming_tool_calls(
@@ -514,21 +484,22 @@ class LLM(BaseLLM):
if result is not None:
chunk_content = result
except Exception as e:
logging.debug(f"Error extracting content from chunk: {e}")
logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
except Exception as e:
logging.error(f"Error extracting content from chunk: {e}", exc_info=True)
logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
continue
# Only add non-None content to the response
if chunk_content is not None:
# Add the chunk content to the full response
full_response += chunk_content
# Only add non-None content to the response
if chunk_content is not None:
# Add the chunk content to the full response
full_response += chunk_content
# Emit the chunk event
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content),
)
# Emit the chunk event
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content),
)
# --- 4) Fallback to non-streaming if no content received
if not full_response.strip() and chunk_count == 0:
logging.warning(
@@ -765,7 +736,8 @@ class LLM(BaseLLM):
# and convert them to our own exception type for consistent handling
# across the codebase. This allows CrewAgentExecutor to handle context
# length issues appropriately.
response = litellm.completion(**params)
with suppress_litellm_output():
response = litellm.completion(**params)
except ContextWindowExceededError as e:
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase

View File

@@ -8,7 +8,7 @@ import platform
import warnings
from contextlib import contextmanager
from importlib.metadata import version
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, Any, Callable, Optional
import threading
from opentelemetry import trace
@@ -73,11 +73,16 @@ class Telemetry:
with cls._lock:
if cls._instance is None:
cls._instance = super(Telemetry, cls).__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self) -> None:
if hasattr(self, '_initialized') and self._initialized:
return
self.ready: bool = False
self.trace_set: bool = False
self._initialized: bool = True
if self._is_telemetry_disabled():
return
@@ -113,6 +118,10 @@ class Telemetry:
or os.getenv("CREWAI_DISABLE_TELEMETRY", "false").lower() == "true"
)
def _should_execute_telemetry(self) -> bool:
"""Check if telemetry operations should be executed."""
return self.ready and not self._is_telemetry_disabled()
def set_tracer(self):
if self.ready and not self.trace_set:
try:
@@ -123,8 +132,9 @@ class Telemetry:
self.ready = False
self.trace_set = False
def _safe_telemetry_operation(self, operation):
if not self.ready:
def _safe_telemetry_operation(self, operation: Callable[[], None]) -> None:
"""Execute telemetry operation safely, checking both readiness and environment variables."""
if not self._should_execute_telemetry():
return
try:
operation()
@@ -423,7 +433,8 @@ class Telemetry:
return span
return self._safe_telemetry_operation(operation)
self._safe_telemetry_operation(operation)
return None
def task_ended(self, span: Span, task: Task, crew: Crew):
"""Records the completion of a task execution in a crew.
@@ -773,7 +784,8 @@ class Telemetry:
return span
if crew.share_crew:
return self._safe_telemetry_operation(operation)
self._safe_telemetry_operation(operation)
return operation()
return None
def end_crew(self, crew, final_string_output):

View File

@@ -17,6 +17,7 @@ class ConsoleFormatter:
current_lite_agent_branch: Optional[Tree] = None
tool_usage_counts: Dict[str, int] = {}
current_reasoning_branch: Optional[Tree] = None # Track reasoning status
_live_paused: bool = False
current_llm_tool_tree: Optional[Tree] = None
def __init__(self, verbose: bool = False):
@@ -119,6 +120,19 @@ class ConsoleFormatter:
# Finally, pass through to the regular Console.print implementation
self.console.print(*args, **kwargs)
def pause_live_updates(self) -> None:
"""Pause Live session updates to allow for human input without interference."""
if not self._live_paused:
if self._live:
self._live.stop()
self._live = None
self._live_paused = True
def resume_live_updates(self) -> None:
"""Resume Live session updates after human input is complete."""
if self._live_paused:
self._live_paused = False
def print_panel(
self, content: Text, title: str, style: str = "blue", is_flow: bool = False
) -> None:

View File

@@ -9,6 +9,14 @@ from crewai.telemetry import Telemetry
from opentelemetry import trace
@pytest.fixture(autouse=True)
def cleanup_telemetry():
"""Automatically clean up Telemetry singleton between tests."""
Telemetry._instance = None
yield
Telemetry._instance = None
@pytest.mark.parametrize(
"env_var,value,expected_ready",
[

View File

@@ -1,11 +1,19 @@
import os
from unittest.mock import patch
from unittest.mock import patch, MagicMock
import pytest
from crewai.telemetry import Telemetry
@pytest.fixture(autouse=True)
def cleanup_telemetry():
"""Automatically clean up Telemetry singleton between tests."""
Telemetry._instance = None
yield
Telemetry._instance = None
@pytest.mark.parametrize("env_var,value,expected_ready", [
("OTEL_SDK_DISABLED", "true", False),
("OTEL_SDK_DISABLED", "TRUE", False),
@@ -28,3 +36,59 @@ def test_telemetry_enabled_by_default():
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry = Telemetry()
assert telemetry.ready is True
def test_telemetry_disable_after_singleton_creation():
"""Test that telemetry operations are disabled when env var is set after singleton creation."""
with patch.dict(os.environ, {}, clear=True):
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry = Telemetry()
assert telemetry.ready is True
mock_operation = MagicMock()
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_called_once()
mock_operation.reset_mock()
os.environ['CREWAI_DISABLE_TELEMETRY'] = 'true'
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_not_called()
def test_telemetry_disable_with_multiple_instances():
"""Test that multiple telemetry instances respect dynamically changed env vars."""
with patch.dict(os.environ, {}, clear=True):
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry1 = Telemetry()
assert telemetry1.ready is True
os.environ['CREWAI_DISABLE_TELEMETRY'] = 'true'
telemetry2 = Telemetry()
assert telemetry2 is telemetry1
assert telemetry2.ready is True
mock_operation = MagicMock()
telemetry2._safe_telemetry_operation(mock_operation)
mock_operation.assert_not_called()
def test_telemetry_otel_sdk_disabled_after_creation():
"""Test that OTEL_SDK_DISABLED also works when set after singleton creation."""
with patch.dict(os.environ, {}, clear=True):
with patch("crewai.telemetry.telemetry.TracerProvider"):
telemetry = Telemetry()
assert telemetry.ready is True
mock_operation = MagicMock()
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_called_once()
mock_operation.reset_mock()
os.environ['OTEL_SDK_DISABLED'] = 'true'
telemetry._safe_telemetry_operation(mock_operation)
mock_operation.assert_not_called()

View File

@@ -0,0 +1,167 @@
import pytest
from unittest.mock import patch, MagicMock
from crewai.utilities.events.event_listener import event_listener
class TestFlowHumanInputIntegration:
"""Test integration between Flow execution and human input functionality."""
def test_console_formatter_pause_resume_methods(self):
"""Test that ConsoleFormatter pause/resume methods work correctly."""
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
formatter._live_paused = False
formatter.pause_live_updates()
assert formatter._live_paused
formatter.resume_live_updates()
assert not formatter._live_paused
finally:
formatter._live_paused = original_paused_state
@patch('builtins.input', return_value='')
def test_human_input_pauses_flow_updates(self, mock_input):
"""Test that human input pauses Flow status updates."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
formatter._live_paused = False
with patch.object(formatter, 'pause_live_updates') as mock_pause, \
patch.object(formatter, 'resume_live_updates') as mock_resume:
result = executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
mock_input.assert_called_once()
assert result == ''
finally:
formatter._live_paused = original_paused_state
@patch('builtins.input', side_effect=['feedback', ''])
def test_multiple_human_input_rounds(self, mock_input):
"""Test multiple rounds of human input with Flow status management."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
pause_calls = []
resume_calls = []
def track_pause():
pause_calls.append(True)
def track_resume():
resume_calls.append(True)
with patch.object(formatter, 'pause_live_updates', side_effect=track_pause), \
patch.object(formatter, 'resume_live_updates', side_effect=track_resume):
result1 = executor._ask_human_input("Test result 1")
assert result1 == 'feedback'
result2 = executor._ask_human_input("Test result 2")
assert result2 == ''
assert len(pause_calls) == 2
assert len(resume_calls) == 2
finally:
formatter._live_paused = original_paused_state
def test_pause_resume_with_no_live_session(self):
"""Test pause/resume methods handle case when no Live session exists."""
formatter = event_listener.formatter
original_live = formatter._live
original_paused_state = formatter._live_paused
try:
formatter._live = None
formatter._live_paused = False
formatter.pause_live_updates()
formatter.resume_live_updates()
assert not formatter._live_paused
finally:
formatter._live = original_live
formatter._live_paused = original_paused_state
def test_pause_resume_exception_handling(self):
"""Test that resume is called even if exception occurs during human input."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = False
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
with patch.object(formatter, 'pause_live_updates') as mock_pause, \
patch.object(formatter, 'resume_live_updates') as mock_resume, \
patch('builtins.input', side_effect=KeyboardInterrupt("Test exception")):
with pytest.raises(KeyboardInterrupt):
executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
finally:
formatter._live_paused = original_paused_state
def test_training_mode_human_input(self):
"""Test human input in training mode."""
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
executor = CrewAgentExecutorMixin()
executor.crew = MagicMock()
executor.crew._train = True
executor._printer = MagicMock()
formatter = event_listener.formatter
original_paused_state = formatter._live_paused
try:
with patch.object(formatter, 'pause_live_updates') as mock_pause, \
patch.object(formatter, 'resume_live_updates') as mock_resume, \
patch('builtins.input', return_value='training feedback'):
result = executor._ask_human_input("Test result")
mock_pause.assert_called_once()
mock_resume.assert_called_once()
assert result == 'training feedback'
executor._printer.print.assert_called()
call_args = [call[1]['content'] for call in executor._printer.print.call_args_list]
training_prompt_found = any('TRAINING MODE' in content for content in call_args)
assert training_prompt_found
finally:
formatter._live_paused = original_paused_state

View File

@@ -0,0 +1,171 @@
"""Test to reproduce and verify fix for issue #3000: sys.stdout/stderr hijacking."""
import sys
import io
from unittest.mock import patch, MagicMock
import pytest
def test_crewai_hijacks_sys_streams():
"""Test that importing crewai.llm currently hijacks sys.stdout and sys.stderr (before fix)."""
original_stdout = sys.stdout
original_stderr = sys.stderr
import crewai.llm # noqa: F401
try:
assert sys.stdout is not original_stdout, "sys.stdout should be hijacked by FilteredStream"
assert sys.stderr is not original_stderr, "sys.stderr should be hijacked by FilteredStream"
assert hasattr(sys.stdout, '_original_stream'), "sys.stdout should be wrapped by FilteredStream"
assert hasattr(sys.stderr, '_original_stream'), "sys.stderr should be wrapped by FilteredStream"
assert False, "The fix didn't work - streams are still being hijacked"
except AssertionError:
pass
def test_litellm_output_is_filtered():
"""Test that litellm-related output is currently filtered (before fix)."""
import crewai.llm # noqa: F401
captured_output = io.StringIO()
test_strings = [
"litellm.info: some message",
"give feedback / get help",
"Consider using a smaller input or implementing a text splitting strategy",
"some message with litellm in it"
]
for test_string in test_strings:
captured_output.seek(0)
captured_output.truncate(0)
original_stdout = sys.stdout
sys.stdout = captured_output
try:
print(test_string, end='')
assert captured_output.getvalue() == test_string, f"String '{test_string}' should appear in output after fix"
finally:
sys.stdout = original_stdout
def test_normal_output_passes_through():
"""Test that normal output passes through correctly after the fix."""
import crewai.llm # noqa: F401
captured_output = io.StringIO()
original_stdout = sys.stdout
sys.stdout = captured_output
try:
test_string = "This is normal output that should pass through"
print(test_string, end='')
assert captured_output.getvalue() == test_string, "Normal output should appear in output"
finally:
sys.stdout = original_stdout
def test_crewai_does_not_hijack_sys_streams_after_fix():
"""Test that after the fix, importing crewai.llm does NOT hijack sys.stdout and sys.stderr."""
original_stdout = sys.stdout
original_stderr = sys.stderr
if 'crewai.llm' in sys.modules:
del sys.modules['crewai.llm']
if 'crewai' in sys.modules:
del sys.modules['crewai']
import crewai.llm # noqa: F401
assert sys.stdout is original_stdout, "sys.stdout should NOT be hijacked after fix"
assert sys.stderr is original_stderr, "sys.stderr should NOT be hijacked after fix"
assert not hasattr(sys.stdout, '_original_stream'), "sys.stdout should not be wrapped after fix"
assert not hasattr(sys.stderr, '_original_stream'), "sys.stderr should not be wrapped after fix"
def test_litellm_output_still_suppressed_during_llm_calls():
"""Test that litellm output is still suppressed during actual LLM calls after the fix."""
from crewai.llm import LLM
captured_stdout = io.StringIO()
captured_stderr = io.StringIO()
with patch('sys.stdout', captured_stdout), patch('sys.stderr', captured_stderr):
with patch('litellm.completion') as mock_completion:
mock_completion.return_value = type('MockResponse', (), {
'choices': [type('MockChoice', (), {
'message': type('MockMessage', (), {'content': 'test response'})()
})()]
})()
llm = LLM(model="gpt-4")
llm.call([{"role": "user", "content": "test"}])
output = captured_stdout.getvalue() + captured_stderr.getvalue()
assert "litellm" not in output.lower(), "litellm output should still be suppressed during calls"
def test_concurrent_llm_calls():
"""Test that contextual suppression works correctly with concurrent calls."""
import threading
from crewai.llm import LLM
results = []
def make_llm_call():
with patch('litellm.completion') as mock_completion:
mock_completion.return_value = type('MockResponse', (), {
'choices': [type('MockChoice', (), {
'message': type('MockMessage', (), {'content': 'test response'})()
})()]
})()
llm = LLM(model="gpt-4")
result = llm.call([{"role": "user", "content": "test"}])
results.append(result)
threads = [threading.Thread(target=make_llm_call) for _ in range(3)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert len(results) == 3
assert all("test response" in result for result in results)
def test_logger_performance():
"""Test that logger operations work correctly without global caching."""
from crewai.llm import suppress_litellm_output
with patch('logging.getLogger') as mock_get_logger:
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
with suppress_litellm_output():
pass
with suppress_litellm_output():
pass
assert mock_get_logger.call_count == 2
mock_get_logger.assert_called_with("litellm")
def test_suppression_error_handling():
"""Test that suppression continues even if logger operations fail."""
from crewai.llm import suppress_litellm_output
with patch('logging.getLogger') as mock_get_logger:
mock_logger = MagicMock()
mock_logger.setLevel.side_effect = Exception("Logger error")
mock_get_logger.return_value = mock_logger
try:
with suppress_litellm_output():
result = "operation completed"
assert result == "operation completed"
except Exception:
pytest.fail("Suppression should not fail even if logger operations fail")

View File

@@ -0,0 +1,116 @@
from unittest.mock import MagicMock, patch
from rich.tree import Tree
from rich.live import Live
from crewai.utilities.events.utils.console_formatter import ConsoleFormatter
class TestConsoleFormatterPauseResume:
"""Test ConsoleFormatter pause/resume functionality."""
def test_pause_live_updates_with_active_session(self):
"""Test pausing when Live session is active."""
formatter = ConsoleFormatter()
mock_live = MagicMock(spec=Live)
formatter._live = mock_live
formatter._live_paused = False
formatter.pause_live_updates()
mock_live.stop.assert_called_once()
assert formatter._live_paused
def test_pause_live_updates_when_already_paused(self):
"""Test pausing when already paused does nothing."""
formatter = ConsoleFormatter()
mock_live = MagicMock(spec=Live)
formatter._live = mock_live
formatter._live_paused = True
formatter.pause_live_updates()
mock_live.stop.assert_not_called()
assert formatter._live_paused
def test_pause_live_updates_with_no_session(self):
"""Test pausing when no Live session exists."""
formatter = ConsoleFormatter()
formatter._live = None
formatter._live_paused = False
formatter.pause_live_updates()
assert formatter._live_paused
def test_resume_live_updates_when_paused(self):
"""Test resuming when paused."""
formatter = ConsoleFormatter()
formatter._live_paused = True
formatter.resume_live_updates()
assert not formatter._live_paused
def test_resume_live_updates_when_not_paused(self):
"""Test resuming when not paused does nothing."""
formatter = ConsoleFormatter()
formatter._live_paused = False
formatter.resume_live_updates()
assert not formatter._live_paused
def test_print_after_resume_restarts_live_session(self):
"""Test that printing a Tree after resume creates new Live session."""
formatter = ConsoleFormatter()
formatter._live_paused = True
formatter._live = None
formatter.resume_live_updates()
assert not formatter._live_paused
tree = Tree("Test")
with patch('crewai.utilities.events.utils.console_formatter.Live') as mock_live_class:
mock_live_instance = MagicMock()
mock_live_class.return_value = mock_live_instance
formatter.print(tree)
mock_live_class.assert_called_once()
mock_live_instance.start.assert_called_once()
assert formatter._live == mock_live_instance
def test_multiple_pause_resume_cycles(self):
"""Test multiple pause/resume cycles work correctly."""
formatter = ConsoleFormatter()
mock_live = MagicMock(spec=Live)
formatter._live = mock_live
formatter._live_paused = False
formatter.pause_live_updates()
assert formatter._live_paused
mock_live.stop.assert_called_once()
assert formatter._live is None # Live session should be cleared
formatter.resume_live_updates()
assert not formatter._live_paused
formatter.pause_live_updates()
assert formatter._live_paused
formatter.resume_live_updates()
assert not formatter._live_paused
def test_pause_resume_state_initialization(self):
"""Test that _live_paused is properly initialized."""
formatter = ConsoleFormatter()
assert hasattr(formatter, '_live_paused')
assert not formatter._live_paused