Compare commits

..

2 Commits

Author SHA1 Message Date
Devin AI
1a36cbbb40 Update uv.lock after regeneration
Co-Authored-By: João <joao@crewai.com>
2025-10-21 14:08:22 +00:00
Devin AI
71ca89f535 Fix telemetry task_started double call causing app freeze (issue #3757)
The task_started method was calling _operation() twice when telemetry was enabled:
1. Once inside _safe_telemetry_operation()
2. Once again when returning the span

This caused duplicate span creation and could lead to the app freezing on 'thinking'
when using CrewAI Tracing, especially with slow network connections or telemetry
endpoint issues.

The fix removes the call to _safe_telemetry_operation() and directly calls
_operation() once, returning its result. This ensures spans are only created
once per task start.

Added comprehensive tests to verify:
- task_started only creates spans once when telemetry is enabled
- task_started returns None when telemetry is disabled

Co-Authored-By: João <joao@crewai.com>
2025-10-21 14:07:56 +00:00
6 changed files with 4059 additions and 4139 deletions

View File

@@ -1632,20 +1632,8 @@ class Crew(FlowTrackable, BaseModel):
def reset_knowledge(self, knowledges: list[Knowledge]) -> None:
"""Reset crew and agent knowledge storage."""
def _reset_single_knowledge(ks: Knowledge) -> None:
try:
ks.reset()
except Exception as e:
if "attempt to write a readonly database" in str(
e
) or "does not exist" in str(e):
# Ignore readonly database and collection not found errors (already reset)
pass
else:
raise
for ks in knowledges:
_reset_single_knowledge(ks)
ks.reset()
def _set_allow_crewai_trigger_context_for_first_task(self):
crewai_trigger_payload = self._inputs and self._inputs.get(

View File

@@ -94,16 +94,9 @@ class KnowledgeStorage(BaseKnowledgeStorage):
)
client.delete_collection(collection_name=collection_name)
except Exception as e:
if "attempt to write a readonly database" in str(
e
) or "does not exist" in str(e):
# Ignore readonly database and collection not found errors (already reset)
pass
else:
logging.error(
f"Error during knowledge reset: {e!s}\n{traceback.format_exc()}"
)
raise
logging.error(
f"Error during knowledge reset: {e!s}\n{traceback.format_exc()}"
)
def save(self, documents: list[str]) -> None:
try:

View File

@@ -448,7 +448,6 @@ class Telemetry:
if not self._should_execute_telemetry():
return None
self._safe_telemetry_operation(_operation)
return _operation()
def task_ended(self, span: Span, task: Task, crew: Crew) -> None:

View File

@@ -0,0 +1,102 @@
"""Test for issue #3757 - task_started method calls _operation twice causing freezing."""
import threading
from unittest.mock import MagicMock, patch
import pytest
from crewai import Agent, Crew, Task
from crewai.telemetry import Telemetry
@pytest.fixture(autouse=True)
def cleanup_telemetry():
    """Give every test a pristine ``Telemetry`` singleton.

    The cached instance is dropped, and the class-level lock (when the class
    defines one) is swapped for a fresh ``threading.Lock``. This happens once
    before the test body runs and once more after it finishes.
    """

    def _reset_singleton():
        # Forget the cached instance so the next Telemetry() builds a new one.
        Telemetry._instance = None
        if hasattr(Telemetry, "_lock"):
            Telemetry._lock = threading.Lock()

    _reset_singleton()
    yield
    _reset_singleton()
@pytest.mark.telemetry
def test_task_started_does_not_call_operation_twice():
    """Verify that ``task_started`` runs its span-creating operation only once.

    Regression test for issue #3757, where ``task_started`` invoked its inner
    ``_operation`` twice and could freeze the app when using CrewAI Tracing.

    Every ``start_span`` call on the mocked tracer is counted. One run of the
    operation is expected to open exactly two spans (Task Created and Task
    Execution), so any other count fails the test.
    """
    with patch("crewai.telemetry.telemetry.TracerProvider"):
        telemetry = Telemetry()
        # NOTE(review): presumably forces the instance into its "enabled"
        # state regardless of how initialisation went -- confirm against
        # Telemetry.
        telemetry.ready = True

        agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        task = Task(
            description="Test task",
            expected_output="Test output",
            agent=agent,
        )
        crew = Crew(
            agents=[agent],
            tasks=[task],
            name="TestCrew",
        )

        call_count = 0

        def mock_start_span(name):
            """Stand in for ``tracer.start_span`` and count each invocation."""
            nonlocal call_count
            call_count += 1
            span = MagicMock()
            span.end = MagicMock()
            return span

        with patch("opentelemetry.trace.get_tracer") as mock_get_tracer:
            mock_tracer = MagicMock()
            mock_tracer.start_span = mock_start_span
            mock_get_tracer.return_value = mock_tracer

            span = telemetry.task_started(crew, task)

            assert span is not None, "task_started should return a span when telemetry is enabled"
            assert call_count == 2, f"Expected 2 spans (Task Created + Task Execution), but got {call_count}"
@pytest.mark.telemetry
def test_task_started_returns_none_when_disabled():
    """Check that ``task_started`` yields ``None`` with telemetry disabled.

    Telemetry is switched off through the ``CREWAI_DISABLE_TELEMETRY``
    environment variable for the duration of the test.
    """
    disabled_env = {"CREWAI_DISABLE_TELEMETRY": "true"}
    with patch.dict("os.environ", disabled_env):
        tracker = Telemetry()

        test_agent = Agent(
            role="Test Agent",
            goal="Test goal",
            backstory="Test backstory",
            llm="gpt-4o-mini",
        )
        test_task = Task(
            description="Test task",
            expected_output="Test output",
            agent=test_agent,
        )
        test_crew = Crew(agents=[test_agent], tasks=[test_task], name="TestCrew")

        result = tracker.task_started(test_crew, test_task)

        assert result is None, "task_started should return None when telemetry is disabled"

View File

@@ -1,199 +0,0 @@
"""Test for crew reset_memories with readonly database error."""
from unittest.mock import MagicMock, patch
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.knowledge.knowledge import Knowledge
from crewai.process import Process
from crewai.task import Task
@pytest.fixture
def researcher():
    """Provide a "Researcher" agent with delegation turned off."""
    agent_settings = {
        "role": "Researcher",
        "goal": "Make the best research and analysis on content about AI and AI agents",
        "backstory": "You're an expert researcher, specialized in technology",
        "allow_delegation": False,
    }
    return Agent(**agent_settings)
@pytest.fixture
def writer():
    """Provide a "Senior Writer" agent with delegation turned off."""
    agent_settings = {
        "role": "Senior Writer",
        "goal": "Write the best content about AI and AI agents.",
        "backstory": "You're a senior writer, specialized in technology",
        "allow_delegation": False,
    }
    return Agent(**agent_settings)
def test_reset_all_memories_with_readonly_database_error_on_agent_knowledge(
    researcher, writer
):
    """Ensure ``reset_memories('all')`` tolerates readonly-database failures.

    Reproduces the scenario from issue #3753: each agent's knowledge raises
    "attempt to write a readonly database" on ``reset()``, and
    ``crew.reset_memories(command_type='all')`` must still complete.
    """
    agents = [researcher, writer]

    # Give every agent a Knowledge mock whose reset() always fails.
    knowledge_mocks = []
    for agent in agents:
        failing_knowledge = MagicMock(spec=Knowledge)
        failing_knowledge.reset.side_effect = Exception(
            "attempt to write a readonly database"
        )
        agent.knowledge = failing_knowledge
        knowledge_mocks.append(failing_knowledge)

    tasks = [
        Task(description="Task 1", expected_output="output", agent=researcher),
        Task(description="Task 2", expected_output="output", agent=writer),
    ]
    crew = Crew(agents=agents, process=Process.sequential, tasks=tasks)

    # Must not raise: the readonly database error is expected to be absorbed.
    crew.reset_memories(command_type="all")

    # A reset was attempted on each agent's knowledge at least once.
    for failing_knowledge in knowledge_mocks:
        assert failing_knowledge.reset.call_count >= 1
def test_reset_all_memories_with_collection_not_found_error_on_agent_knowledge(
    researcher, writer
):
    """Ensure ``reset_memories('all')`` tolerates missing-collection failures.

    A "Collection does not exist" error is treated like the readonly-database
    case: it signals the collection is already reset, so the call must not
    raise.
    """
    agents = [researcher, writer]

    # Give every agent a Knowledge mock whose reset() reports a missing collection.
    knowledge_mocks = []
    for agent in agents:
        missing_collection = MagicMock(spec=Knowledge)
        missing_collection.reset.side_effect = Exception("Collection does not exist")
        agent.knowledge = missing_collection
        knowledge_mocks.append(missing_collection)

    tasks = [
        Task(description="Task 1", expected_output="output", agent=researcher),
        Task(description="Task 2", expected_output="output", agent=writer),
    ]
    crew = Crew(agents=agents, process=Process.sequential, tasks=tasks)

    # Must not raise: the collection-not-found error is expected to be absorbed.
    crew.reset_memories(command_type="all")

    # A reset was attempted on each agent's knowledge at least once.
    for missing_collection in knowledge_mocks:
        assert missing_collection.reset.call_count >= 1
def test_reset_agent_knowledge_with_readonly_database_error(researcher, writer):
    """Ensure ``reset_memories('agent_knowledge')`` tolerates readonly-database failures."""
    agents = [researcher, writer]

    # Give every agent a Knowledge mock whose reset() always fails.
    knowledge_mocks = []
    for agent in agents:
        failing_knowledge = MagicMock(spec=Knowledge)
        failing_knowledge.reset.side_effect = Exception(
            "attempt to write a readonly database"
        )
        agent.knowledge = failing_knowledge
        knowledge_mocks.append(failing_knowledge)

    tasks = [
        Task(description="Task 1", expected_output="output", agent=researcher),
        Task(description="Task 2", expected_output="output", agent=writer),
    ]
    crew = Crew(agents=agents, process=Process.sequential, tasks=tasks)

    # Must not raise: the readonly database error is expected to be absorbed.
    crew.reset_memories(command_type="agent_knowledge")

    # Each agent's knowledge saw exactly one reset attempt.
    for failing_knowledge in knowledge_mocks:
        failing_knowledge.reset.assert_called_once()
def test_reset_knowledge_with_readonly_database_error(researcher, writer):
    """Ensure ``reset_memories('knowledge')`` tolerates readonly-database failures.

    Covers the crew-level knowledge as well as both agents' knowledge.
    """

    def _failing_knowledge():
        # Knowledge mock whose reset() always raises the readonly error.
        mocked = MagicMock(spec=Knowledge)
        mocked.reset.side_effect = Exception("attempt to write a readonly database")
        return mocked

    crew_knowledge = _failing_knowledge()
    researcher_knowledge = _failing_knowledge()
    writer_knowledge = _failing_knowledge()

    researcher.knowledge = researcher_knowledge
    writer.knowledge = writer_knowledge

    tasks = [
        Task(description="Task 1", expected_output="output", agent=researcher),
        Task(description="Task 2", expected_output="output", agent=writer),
    ]
    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=tasks,
        knowledge=crew_knowledge,
    )

    # Must not raise: the readonly database error is expected to be absorbed.
    crew.reset_memories(command_type="knowledge")

    # Every knowledge object saw exactly one reset attempt.
    crew_knowledge.reset.assert_called_once()
    researcher_knowledge.reset.assert_called_once()
    writer_knowledge.reset.assert_called_once()
def test_reset_all_memories_with_unexpected_error_on_agent_knowledge(researcher, writer):
    """Ensure ``reset_memories('all')`` surfaces errors it does not recognise.

    Only readonly-database and collection-not-found errors are meant to be
    ignored. Any other failure must propagate to the caller.
    """
    broken_knowledge = MagicMock(spec=Knowledge)
    broken_knowledge.reset.side_effect = Exception("Unexpected database error")
    researcher.knowledge = broken_knowledge

    tasks = [
        Task(description="Task 1", expected_output="output", agent=researcher),
        Task(description="Task 2", expected_output="output", agent=writer),
    ]
    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=tasks,
    )

    # Neither a readonly-database nor a collection-not-found error, so it must raise.
    with pytest.raises(RuntimeError) as excinfo:
        crew.reset_memories(command_type="all")

    message = str(excinfo.value)
    assert "Failed to reset" in message
    assert "Unexpected database error" in message

7869
uv.lock generated

File diff suppressed because it is too large Load Diff