Files
crewAI/tests/test_sys_stream_hijacking.py
Devin AI bab03b2be1 Address review feedback: Add logger caching, improve error handling, expand tests
- Cache litellm logger instance globally for performance optimization
- Implement more specific warning pattern filtering instead of broad matching
- Add robust error handling with graceful degradation in suppression
- Enhance streaming error handling with better logging and continue logic
- Add 3 new comprehensive tests:
  - test_concurrent_llm_calls: Verify thread safety with concurrent LLM calls
  - test_logger_caching_performance: Confirm logger instance caching works
  - test_suppression_error_handling: Test graceful degradation on logger errors
- Fix all lint errors (unused imports) in test file

Co-Authored-By: João <joao@crewai.com>
2025-06-11 23:46:58 +00:00

171 lines
6.1 KiB
Python

"""Test to reproduce and verify fix for issue #3000: sys.stdout/stderr hijacking."""
import sys
import io
from unittest.mock import patch, MagicMock
import pytest
def test_crewai_hijacks_sys_streams():
    """Fail if importing ``crewai.llm`` still hijacks ``sys.stdout``/``sys.stderr``.

    A stream is treated as hijacked when the import replaced it with a
    different object, or when the current object exposes an
    ``_original_stream`` attribute (presumably set by crewai's
    ``FilteredStream`` wrapper -- confirm against ``crewai.llm``).
    """
    original_stdout = sys.stdout
    original_stderr = sys.stderr

    import crewai.llm  # noqa: F401

    def _is_hijacked(current, before):
        """Return True when *current* was swapped out or looks wrapped."""
        return current is not before or hasattr(current, "_original_stream")

    # Assert directly rather than inside ``try/except AssertionError: pass``:
    # that pattern also swallowed the sentinel failure, so the test could
    # never fail regardless of what the import did to the streams.
    hijacked = (
        _is_hijacked(sys.stdout, original_stdout)
        or _is_hijacked(sys.stderr, original_stderr)
    )
    assert not hijacked, "The fix didn't work - streams are still being hijacked"
def test_litellm_output_is_filtered():
    """Check that litellm-looking text is not dropped after importing ``crewai.llm``.

    Each sample is printed while ``sys.stdout`` temporarily points at an
    in-memory buffer; the buffer must then hold the sample verbatim.
    """
    import crewai.llm  # noqa: F401

    samples = (
        "litellm.info: some message",
        "give feedback / get help",
        "Consider using a smaller input or implementing a text splitting strategy",
        "some message with litellm in it",
    )

    for sample in samples:
        # A fresh buffer per sample keeps every check isolated from the others.
        sink = io.StringIO()
        saved_stdout, sys.stdout = sys.stdout, sink
        try:
            print(sample, end="")
            written = sink.getvalue()
            assert written == sample, f"String '{sample}' should appear in output after fix"
        finally:
            # Always hand the real stream back, even when the assertion fails.
            sys.stdout = saved_stdout
def test_normal_output_passes_through():
    """Verify ordinary printed text reaches ``sys.stdout`` unchanged with ``crewai.llm`` imported."""
    import crewai.llm  # noqa: F401

    expected = "This is normal output that should pass through"
    sink = io.StringIO()

    # Swap stdout for the buffer and remember the real stream for restoration.
    previous_stdout, sys.stdout = sys.stdout, sink
    try:
        print(expected, end="")
        assert sink.getvalue() == expected, "Normal output should appear in output"
    finally:
        sys.stdout = previous_stdout
def test_crewai_does_not_hijack_sys_streams_after_fix():
    """Test that a fresh import of ``crewai.llm`` leaves ``sys.stdout``/``sys.stderr`` alone.

    The cached ``crewai`` and ``crewai.llm`` entries are removed from
    ``sys.modules`` so the import statement re-executes them, and the stream
    objects are compared before and after. The evicted modules are put back
    afterwards so later tests keep using the same module objects as the rest
    of the already-imported package.
    """
    original_stdout = sys.stdout
    original_stderr = sys.stderr

    # Evict the cached modules but keep references so they can be restored.
    evicted = {
        name: sys.modules.pop(name)
        for name in ("crewai.llm", "crewai")
        if name in sys.modules
    }
    try:
        import crewai.llm  # noqa: F401

        assert sys.stdout is original_stdout, "sys.stdout should NOT be hijacked after fix"
        assert sys.stderr is original_stderr, "sys.stderr should NOT be hijacked after fix"
        assert not hasattr(sys.stdout, '_original_stream'), "sys.stdout should not be wrapped after fix"
        assert not hasattr(sys.stderr, '_original_stream'), "sys.stderr should not be wrapped after fix"
    finally:
        # Without this, the re-imported duplicates would stay in sys.modules
        # and leak into every test that runs after this one.
        sys.modules.update(evicted)
def test_litellm_output_still_suppressed_during_llm_calls():
    """Ensure nothing mentioning litellm reaches stdout/stderr during ``LLM.call``.

    ``litellm.completion`` is replaced by a mock returning a minimal response
    object, and both standard streams are redirected to in-memory buffers
    that are inspected once the call has finished.
    """
    from crewai.llm import LLM

    out_buffer = io.StringIO()
    err_buffer = io.StringIO()

    # Smallest object graph exposing response.choices[0].message.content.
    fake_message = type('MockMessage', (), {'content': 'test response'})()
    fake_choice = type('MockChoice', (), {'message': fake_message})()
    fake_response = type('MockResponse', (), {'choices': [fake_choice]})()

    # Streams are patched first so the litellm patch itself runs under capture.
    with patch('sys.stdout', out_buffer), patch('sys.stderr', err_buffer), \
            patch('litellm.completion') as mock_completion:
        mock_completion.return_value = fake_response
        LLM(model="gpt-4").call([{"role": "user", "content": "test"}])

    combined = out_buffer.getvalue() + err_buffer.getvalue()
    assert "litellm" not in combined.lower(), "litellm output should still be suppressed during calls"
def test_concurrent_llm_calls():
    """Test that contextual suppression works correctly with concurrent calls.

    Three threads each build an ``LLM`` and call it against a mocked
    ``litellm.completion``; every thread must return the mocked content.
    """
    import threading
    from crewai.llm import LLM

    results = []

    def make_llm_call():
        """Run one LLM call and record its result for the main thread."""
        llm = LLM(model="gpt-4")
        results.append(llm.call([{"role": "user", "content": "test"}]))

    mock_response = type('MockResponse', (), {
        'choices': [type('MockChoice', (), {
            'message': type('MockMessage', (), {'content': 'test response'})()
        })()]
    })()

    # Patch once from the main thread. Entering ``patch`` separately in each
    # worker is racy: with interleaved enter/exit, a late-exiting thread
    # restores another thread's mock as the "original", which can leave
    # ``litellm.completion`` mocked after the test has finished.
    with patch('litellm.completion') as mock_completion:
        mock_completion.return_value = mock_response
        threads = [threading.Thread(target=make_llm_call) for _ in range(3)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    # A worker that raised never appends, so the length check also catches
    # exceptions that were lost inside the threads.
    assert len(results) == 3
    assert all("test response" in result for result in results)
def test_logger_caching_performance():
    """Confirm ``logging.getLogger("litellm")`` is looked up once across two suppressions."""
    from crewai.llm import suppress_litellm_output

    # NOTE(review): presumably this relies on crewai.llm not having cached the
    # logger before the patch is active -- confirm test-ordering independence.
    with patch('logging.getLogger', return_value=MagicMock()) as mock_get_logger:
        # Enter and leave the suppression context twice in a row.
        for _ in range(2):
            with suppress_litellm_output():
                pass
        mock_get_logger.assert_called_once_with("litellm")
def test_suppression_error_handling():
    """Check the suppression context still runs its body when the logger misbehaves.

    ``logging.getLogger`` is patched to return a logger whose ``setLevel``
    raises; any exception escaping the ``with`` block fails the test.
    """
    from crewai.llm import suppress_litellm_output

    failing_logger = MagicMock()
    failing_logger.setLevel.side_effect = Exception("Logger error")

    with patch('logging.getLogger', return_value=failing_logger):
        try:
            with suppress_litellm_output():
                outcome = "operation completed"
            assert outcome == "operation completed"
        except Exception:
            pytest.fail("Suppression should not fail even if logger operations fail")