Compare commits


1 Commit

Author: Eduardo Chiarotti
SHA1: 554ea8fcee
Message: fix: possible fix for Thinking stuck
Date: 2025-06-11 13:58:28 -03:00
5 changed files with 202 additions and 290 deletions

View File

@@ -1,20 +1,16 @@
import warnings
from crewai.agent import Agent
from crewai import agent
from crewai import cli
from crewai.crew import Crew
from crewai.crews.crew_output import CrewOutput
from crewai.flow.flow import Flow
from crewai.knowledge.knowledge import Knowledge
from crewai import knowledge
from crewai.llm import LLM
from crewai.llms.base_llm import BaseLLM
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.llm_guardrail import LLMGuardrail
from crewai.tasks.task_output import TaskOutput
from crewai import utilities
warnings.filterwarnings(
"ignore",
@@ -25,8 +21,6 @@ warnings.filterwarnings(
__version__ = "0.126.0"
__all__ = [
"Agent",
"agent",
"cli",
"Crew",
"CrewOutput",
"Process",
@@ -35,8 +29,6 @@ __all__ = [
"BaseLLM",
"Flow",
"Knowledge",
"knowledge",
"TaskOutput",
"LLMGuardrail",
"utilities",
]
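
Context for the hunk above: __all__ only governs star-imports, so dropping the submodule names narrows what "from crewai import *" binds without making those submodules unimportable. A minimal illustration, assuming an installed crewai:

from crewai import *            # binds Agent, Crew, LLM, ... per __all__
from crewai import utilities    # explicit submodule imports still work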

View File

@@ -1,6 +1,8 @@
import json
import logging
import os
import sys
import threading
import warnings
from collections import defaultdict
from contextlib import contextmanager
@@ -46,7 +48,8 @@ with warnings.catch_warnings():
from litellm.utils import supports_response_schema
import io
from typing import TextIO
from crewai.llms.base_llm import BaseLLM
from crewai.utilities.events import crewai_event_bus
@@ -57,7 +60,69 @@ from crewai.utilities.exceptions.context_window_exceeding_exception import (
load_dotenv()
class FilteredStream(io.TextIOBase):
_lock = None
def __init__(self, original_stream: TextIO):
self._original_stream = original_stream
self._lock = threading.Lock()
def write(self, s: str) -> int:
if not self._lock:
self._lock = threading.Lock()
with self._lock:
lower_s = s.lower()
# Skip common noisy LiteLLM banners and any other lines that contain "litellm"
if (
"give feedback / get help" in lower_s
or "litellm.info:" in lower_s
or "litellm" in lower_s
or "Consider using a smaller input or implementing a text splitting strategy" in lower_s
):
return 0
return self._original_stream.write(s)
def flush(self):
with self._lock:
return self._original_stream.flush()
def __getattr__(self, name):
"""Delegate attribute access to the wrapped original stream.
This ensures compatibility with libraries (e.g., Rich) that rely on
attributes such as `encoding`, `isatty`, `buffer`, etc., which may not
be explicitly defined on this proxy class.
"""
return getattr(self._original_stream, name)
# Delegate common properties/methods explicitly so they aren't shadowed by
# the TextIOBase defaults (e.g., .encoding returns None by default, which
# confuses Rich). These explicit pass-throughs ensure the wrapped Console
# still sees a fully-featured stream.
@property
def encoding(self):
return getattr(self._original_stream, "encoding", "utf-8")
def isatty(self):
return self._original_stream.isatty()
def fileno(self):
return self._original_stream.fileno()
def writable(self):
return True
# Apply the filtered stream globally so that any subsequent writes containing the filtered
# keywords (e.g., "litellm") are hidden from terminal output. We guard against double
# wrapping to ensure idempotency in environments where this module might be reloaded.
if not isinstance(sys.stdout, FilteredStream):
sys.stdout = FilteredStream(sys.stdout)
if not isinstance(sys.stderr, FilteredStream):
sys.stderr = FilteredStream(sys.stderr)
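
The hunk above restores the global FilteredStream proxy. Its core pattern, a TextIOBase wrapper that drops matching writes and delegates everything else, can be exercised in isolation; a minimal sketch assuming only the standard library (_DropLines and its keyword are illustrative, not part of the diff):

import io
import sys
import threading
from typing import TextIO

class _DropLines(io.TextIOBase):
    """Toy stand-in for FilteredStream: swallow writes containing a keyword."""
    def __init__(self, original: TextIO, keyword: str):
        self._original_stream = original
        self._keyword = keyword
        self._lock = threading.Lock()

    def write(self, s: str) -> int:
        with self._lock:
            if self._keyword in s.lower():
                return 0  # report success but emit nothing
            return self._original_stream.write(s)

    @property
    def encoding(self):
        # TextIOBase defines encoding itself (as None), so __getattr__ below
        # never fires for it; delegate explicitly, as FilteredStream does.
        return getattr(self._original_stream, "encoding", "utf-8")

    def __getattr__(self, name):
        # Covers attributes TextIOBase does not define, e.g. buffer.
        return getattr(self._original_stream, name)

# Idempotent installation, mirroring the isinstance guard above:
if not isinstance(sys.stdout, _DropLines):
    sys.stdout = _DropLines(sys.stdout, "litellm")

print("litellm banner")  # body is swallowed; only the trailing newline gets through
print("normal output")   # passes through unchanged
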
LLM_CONTEXT_WINDOW_SIZES = {
@@ -201,40 +266,6 @@ def suppress_warnings():
yield
@contextmanager
def suppress_litellm_output():
"""Contextually suppress litellm-related logging output during LLM calls."""
litellm_logger = logging.getLogger("litellm")
original_level = litellm_logger.level
warning_patterns = [
".*give feedback.*",
".*Consider using a smaller input.*",
".*litellm\\.info:.*",
".*text splitting strategy.*"
]
try:
with warnings.catch_warnings():
for pattern in warning_patterns:
warnings.filterwarnings("ignore", message=pattern)
try:
litellm_logger.setLevel(logging.WARNING)
except Exception as e:
logging.debug(f"Error setting logger level: {e}")
yield
except Exception as e:
logging.debug(f"Error in litellm output suppression: {e}")
raise
finally:
try:
litellm_logger.setLevel(original_level)
except Exception as e:
logging.debug(f"Error restoring logger level: {e}")
class Delta(TypedDict):
content: Optional[str]
role: Optional[str]
@@ -419,61 +450,60 @@ class LLM(BaseLLM):
try:
# --- 3) Process each chunk in the stream
with suppress_litellm_output():
for chunk in litellm.completion(**params):
chunk_count += 1
last_chunk = chunk
for chunk in litellm.completion(**params):
chunk_count += 1
last_chunk = chunk
# Extract content from the chunk
chunk_content = None
# Extract content from the chunk
chunk_content = None
# Safely extract content from various chunk formats
try:
# Try to access choices safely
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
choices = chunk["choices"]
elif hasattr(chunk, "choices"):
# Check if choices is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "choices"), type):
choices = getattr(chunk, "choices")
# Safely extract content from various chunk formats
try:
# Try to access choices safely
choices = None
if isinstance(chunk, dict) and "choices" in chunk:
choices = chunk["choices"]
elif hasattr(chunk, "choices"):
# Check if choices is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "choices"), type):
choices = getattr(chunk, "choices")
# Try to extract usage information if available
if isinstance(chunk, dict) and "usage" in chunk:
usage_info = chunk["usage"]
elif hasattr(chunk, "usage"):
# Check if usage is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "usage"), type):
usage_info = getattr(chunk, "usage")
# Try to extract usage information if available
if isinstance(chunk, dict) and "usage" in chunk:
usage_info = chunk["usage"]
elif hasattr(chunk, "usage"):
# Check if usage is not a type but an actual attribute with value
if not isinstance(getattr(chunk, "usage"), type):
usage_info = getattr(chunk, "usage")
if choices and len(choices) > 0:
choice = choices[0]
if choices and len(choices) > 0:
choice = choices[0]
# Handle different delta formats
delta = None
if isinstance(choice, dict) and "delta" in choice:
delta = choice["delta"]
elif hasattr(choice, "delta"):
delta = getattr(choice, "delta")
# Handle different delta formats
delta = None
if isinstance(choice, dict) and "delta" in choice:
delta = choice["delta"]
elif hasattr(choice, "delta"):
delta = getattr(choice, "delta")
# Extract content from delta
if delta:
# Handle dict format
if isinstance(delta, dict):
if "content" in delta and delta["content"] is not None:
chunk_content = delta["content"]
# Handle object format
elif hasattr(delta, "content"):
chunk_content = getattr(delta, "content")
# Extract content from delta
if delta:
# Handle dict format
if isinstance(delta, dict):
if "content" in delta and delta["content"] is not None:
chunk_content = delta["content"]
# Handle object format
elif hasattr(delta, "content"):
chunk_content = getattr(delta, "content")
# Handle case where content might be None or empty
if chunk_content is None and isinstance(delta, dict):
# Some models might send empty content chunks
chunk_content = ""
# Handle case where content might be None or empty
if chunk_content is None and isinstance(delta, dict):
# Some models might send empty content chunks
chunk_content = ""
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
# Enable tool calls using streaming
if "tool_calls" in delta:
tool_calls = delta["tool_calls"]
if tool_calls:
result = self._handle_streaming_tool_calls(
@@ -484,22 +514,21 @@ class LLM(BaseLLM):
if result is not None:
chunk_content = result
except Exception as e:
logging.error(f"Error extracting content from chunk: {e}", exc_info=True)
logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
continue
except Exception as e:
logging.debug(f"Error extracting content from chunk: {e}")
logging.debug(f"Chunk format: {type(chunk)}, content: {chunk}")
# Only add non-None content to the response
if chunk_content is not None:
# Add the chunk content to the full response
full_response += chunk_content
# Only add non-None content to the response
if chunk_content is not None:
# Add the chunk content to the full response
full_response += chunk_content
# Emit the chunk event
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content),
)
# Emit the chunk event
assert hasattr(crewai_event_bus, "emit")
crewai_event_bus.emit(
self,
event=LLMStreamChunkEvent(chunk=chunk_content),
)
# --- 4) Fallback to non-streaming if no content received
if not full_response.strip() and chunk_count == 0:
logging.warning(
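
The dict-or-attribute probing repeated through this hunk can be summarized as one helper; a minimal sketch of the pattern (both function names are illustrative, not in the diff):

from typing import Any, Optional

def _get_field(obj: Any, name: str) -> Optional[Any]:
    """Fetch a field from a dict or an attribute, tolerating either chunk shape."""
    if isinstance(obj, dict):
        return obj.get(name)
    value = getattr(obj, name, None)
    # Guard against class-level placeholders, as the diff does with isinstance(..., type)
    return None if isinstance(value, type) else value

def extract_delta_content(chunk: Any) -> Optional[str]:
    choices = _get_field(chunk, "choices")
    if not choices:
        return None
    delta = _get_field(choices[0], "delta")
    if delta is None:
        return None
    return _get_field(delta, "content")

# Works for the dict-shaped chunks litellm may emit:
assert extract_delta_content({"choices": [{"delta": {"content": "hi"}}]}) == "hi"
assert extract_delta_content({"choices": []}) is None
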
@@ -736,8 +765,7 @@ class LLM(BaseLLM):
# and convert them to our own exception type for consistent handling
# across the codebase. This allows CrewAgentExecutor to handle context
# length issues appropriately.
with suppress_litellm_output():
response = litellm.completion(**params)
response = litellm.completion(**params)
except ContextWindowExceededError as e:
# Convert litellm's context window error to our own exception type
# for consistent handling in the rest of the codebase
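
The conversion body is truncated here; its shape, sketched with stand-in exception types (both class names are assumptions, not the diff's real identifiers):

class UpstreamContextError(Exception):
    """Stands in for litellm's ContextWindowExceededError."""

class ContextLengthExceeded(Exception):
    """Stands in for the project exception imported near the top of llm.py."""

def completion_with_conversion(completion, **params):
    try:
        return completion(**params)
    except UpstreamContextError as e:
        # One project-level type lets CrewAgentExecutor catch a single error
        # for every provider's context-window failure.
        raise ContextLengthExceeded(str(e)) from e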

View File

@@ -326,10 +326,14 @@ class EventListener(BaseEventListener):
@crewai_event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event: LLMCallStartedEvent):
self.formatter.handle_llm_call_started(
# Capture the returned tool branch and update the current_tool_branch reference
thinking_branch = self.formatter.handle_llm_call_started(
self.formatter.current_agent_branch,
self.formatter.current_crew_tree,
)
# Update the formatter's current_tool_branch to ensure proper cleanup
if thinking_branch is not None:
self.formatter.current_tool_branch = thinking_branch
@crewai_event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event: LLMCallCompletedEvent):

View File

@@ -625,14 +625,22 @@ class ConsoleFormatter:
return None
# Only add thinking status if we don't have a current tool branch
if self.current_tool_branch is None:
# or if the current tool branch is not a thinking node
should_add_thinking = (
self.current_tool_branch is None or
"Thinking" not in str(self.current_tool_branch.label)
)
if should_add_thinking:
tool_branch = branch_to_use.add("")
self.update_tree_label(tool_branch, "🧠", "Thinking...", "blue")
self.current_tool_branch = tool_branch
self.print(tree_to_use)
self.print()
return tool_branch
return None
# Return the existing tool branch if it's already a thinking node
return self.current_tool_branch
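
The guard above prevents stacked "Thinking" nodes when LLM calls overlap; a minimal sketch of the same dedup check against a plain Rich tree (add_thinking is illustrative):

from typing import Optional
from rich.tree import Tree

def add_thinking(branch: Tree, current: Optional[Tree]) -> Tree:
    """Reuse an existing Thinking node instead of adding one per LLM call."""
    if current is not None and "Thinking" in str(current.label):
        return current  # already showing a thinking node; no duplicate
    return branch.add("🧠 Thinking...")

tree = Tree("Crew")
first = add_thinking(tree, None)
second = add_thinking(tree, first)
assert first is second and len(tree.children) == 1
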
def handle_llm_call_completed(
self,
@@ -641,7 +649,7 @@ class ConsoleFormatter:
crew_tree: Optional[Tree],
) -> None:
"""Handle LLM call completed event."""
if not self.verbose or tool_branch is None:
if not self.verbose:
return
# Decide which tree to render: prefer full crew tree, else parent branch
@@ -649,23 +657,47 @@ class ConsoleFormatter:
if tree_to_use is None:
return
# Remove the thinking status node when complete
if "Thinking" in str(tool_branch.label):
# Try to remove the thinking status node - first try the provided tool_branch
thinking_branch_to_remove = None
removed = False
# Method 1: Use the provided tool_branch if it's a thinking node
if tool_branch is not None and "Thinking" in str(tool_branch.label):
thinking_branch_to_remove = tool_branch
# Method 2: Fallback - search for any thinking node if tool_branch is None or not thinking
if thinking_branch_to_remove is None:
parents = [
self.current_lite_agent_branch,
self.current_agent_branch,
self.current_task_branch,
tree_to_use,
]
removed = False
for parent in parents:
if isinstance(parent, Tree) and tool_branch in parent.children:
parent.children.remove(tool_branch)
if isinstance(parent, Tree):
for child in parent.children:
if "Thinking" in str(child.label):
thinking_branch_to_remove = child
break
if thinking_branch_to_remove:
break
# Remove the thinking node if found
if thinking_branch_to_remove:
parents = [
self.current_lite_agent_branch,
self.current_agent_branch,
self.current_task_branch,
tree_to_use,
]
for parent in parents:
if isinstance(parent, Tree) and thinking_branch_to_remove in parent.children:
parent.children.remove(thinking_branch_to_remove)
removed = True
break
# Clear pointer if we just removed the current_tool_branch
if self.current_tool_branch is tool_branch:
if self.current_tool_branch is thinking_branch_to_remove:
self.current_tool_branch = None
if removed:
@@ -682,9 +714,36 @@ class ConsoleFormatter:
# Decide which tree to render: prefer full crew tree, else parent branch
tree_to_use = self.current_crew_tree or crew_tree or self.current_task_branch
# Update tool branch if it exists
if tool_branch:
tool_branch.label = Text("❌ LLM Failed", style="red bold")
# Find the thinking branch to update (similar to completion logic)
thinking_branch_to_update = None
# Method 1: Use the provided tool_branch if it's a thinking node
if tool_branch is not None and "Thinking" in str(tool_branch.label):
thinking_branch_to_update = tool_branch
# Method 2: Fallback - search for any thinking node if tool_branch is None or not thinking
if thinking_branch_to_update is None:
parents = [
self.current_lite_agent_branch,
self.current_agent_branch,
self.current_task_branch,
tree_to_use,
]
for parent in parents:
if isinstance(parent, Tree):
for child in parent.children:
if "Thinking" in str(child.label):
thinking_branch_to_update = child
break
if thinking_branch_to_update:
break
# Update the thinking branch to show failure
if thinking_branch_to_update:
thinking_branch_to_update.label = Text("❌ LLM Failed", style="red bold")
# Clear the current_tool_branch reference
if self.current_tool_branch is thinking_branch_to_update:
self.current_tool_branch = None
if tree_to_use:
self.print(tree_to_use)
self.print()
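
Both the completed and failed handlers above fall back to scanning candidate parents for a node labelled "Thinking" when the tracked branch is missing; a minimal sketch of that search (find_thinking_child is illustrative):

from typing import List, Optional
from rich.tree import Tree

def find_thinking_child(parents: List[Optional[Tree]]) -> Optional[Tree]:
    """Return the first child labelled 'Thinking' among the candidate parents."""
    for parent in parents:
        if isinstance(parent, Tree):
            for child in parent.children:
                if "Thinking" in str(child.label):
                    return child
    return None

tree = Tree("Crew")
agent = tree.add("Agent")
thinking = agent.add("🧠 Thinking...")
found = find_thinking_child([None, agent, tree])
assert found is thinking
agent.children.remove(found)  # the completed handler removes the node;
                              # the failed handler relabels it instead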

View File

@@ -1,171 +0,0 @@
"""Test to reproduce and verify fix for issue #3000: sys.stdout/stderr hijacking."""
import sys
import io
from unittest.mock import patch, MagicMock
import pytest
def test_crewai_hijacks_sys_streams():
"""Test that importing crewai.llm currently hijacks sys.stdout and sys.stderr (before fix)."""
original_stdout = sys.stdout
original_stderr = sys.stderr
import crewai.llm # noqa: F401
try:
assert sys.stdout is not original_stdout, "sys.stdout should be hijacked by FilteredStream"
assert sys.stderr is not original_stderr, "sys.stderr should be hijacked by FilteredStream"
assert hasattr(sys.stdout, '_original_stream'), "sys.stdout should be wrapped by FilteredStream"
assert hasattr(sys.stderr, '_original_stream'), "sys.stderr should be wrapped by FilteredStream"
assert False, "The fix didn't work - streams are still being hijacked"
except AssertionError:
pass
def test_litellm_output_is_filtered():
"""Test that litellm-related output is currently filtered (before fix)."""
import crewai.llm # noqa: F401
captured_output = io.StringIO()
test_strings = [
"litellm.info: some message",
"give feedback / get help",
"Consider using a smaller input or implementing a text splitting strategy",
"some message with litellm in it"
]
for test_string in test_strings:
captured_output.seek(0)
captured_output.truncate(0)
original_stdout = sys.stdout
sys.stdout = captured_output
try:
print(test_string, end='')
assert captured_output.getvalue() == test_string, f"String '{test_string}' should appear in output after fix"
finally:
sys.stdout = original_stdout
def test_normal_output_passes_through():
"""Test that normal output passes through correctly after the fix."""
import crewai.llm # noqa: F401
captured_output = io.StringIO()
original_stdout = sys.stdout
sys.stdout = captured_output
try:
test_string = "This is normal output that should pass through"
print(test_string, end='')
assert captured_output.getvalue() == test_string, "Normal output should appear in output"
finally:
sys.stdout = original_stdout
def test_crewai_does_not_hijack_sys_streams_after_fix():
"""Test that after the fix, importing crewai.llm does NOT hijack sys.stdout and sys.stderr."""
original_stdout = sys.stdout
original_stderr = sys.stderr
if 'crewai.llm' in sys.modules:
del sys.modules['crewai.llm']
if 'crewai' in sys.modules:
del sys.modules['crewai']
import crewai.llm # noqa: F401
assert sys.stdout is original_stdout, "sys.stdout should NOT be hijacked after fix"
assert sys.stderr is original_stderr, "sys.stderr should NOT be hijacked after fix"
assert not hasattr(sys.stdout, '_original_stream'), "sys.stdout should not be wrapped after fix"
assert not hasattr(sys.stderr, '_original_stream'), "sys.stderr should not be wrapped after fix"
def test_litellm_output_still_suppressed_during_llm_calls():
"""Test that litellm output is still suppressed during actual LLM calls after the fix."""
from crewai.llm import LLM
captured_stdout = io.StringIO()
captured_stderr = io.StringIO()
with patch('sys.stdout', captured_stdout), patch('sys.stderr', captured_stderr):
with patch('litellm.completion') as mock_completion:
mock_completion.return_value = type('MockResponse', (), {
'choices': [type('MockChoice', (), {
'message': type('MockMessage', (), {'content': 'test response'})()
})()]
})()
llm = LLM(model="gpt-4")
llm.call([{"role": "user", "content": "test"}])
output = captured_stdout.getvalue() + captured_stderr.getvalue()
assert "litellm" not in output.lower(), "litellm output should still be suppressed during calls"
def test_concurrent_llm_calls():
"""Test that contextual suppression works correctly with concurrent calls."""
import threading
from crewai.llm import LLM
results = []
def make_llm_call():
with patch('litellm.completion') as mock_completion:
mock_completion.return_value = type('MockResponse', (), {
'choices': [type('MockChoice', (), {
'message': type('MockMessage', (), {'content': 'test response'})()
})()]
})()
llm = LLM(model="gpt-4")
result = llm.call([{"role": "user", "content": "test"}])
results.append(result)
threads = [threading.Thread(target=make_llm_call) for _ in range(3)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
assert len(results) == 3
assert all("test response" in result for result in results)
def test_logger_performance():
"""Test that logger operations work correctly without global caching."""
from crewai.llm import suppress_litellm_output
with patch('logging.getLogger') as mock_get_logger:
mock_logger = MagicMock()
mock_get_logger.return_value = mock_logger
with suppress_litellm_output():
pass
with suppress_litellm_output():
pass
assert mock_get_logger.call_count == 2
mock_get_logger.assert_called_with("litellm")
def test_suppression_error_handling():
"""Test that suppression continues even if logger operations fail."""
from crewai.llm import suppress_litellm_output
with patch('logging.getLogger') as mock_get_logger:
mock_logger = MagicMock()
mock_logger.setLevel.side_effect = Exception("Logger error")
mock_get_logger.return_value = mock_logger
try:
with suppress_litellm_output():
result = "operation completed"
assert result == "operation completed"
except Exception:
pytest.fail("Suppression should not fail even if logger operations fail")