crewAI/lib/crewai/tests/telemetry/test_flow_crew_span_integration.py
fix: ensure otel span is closed

"""Test that crew execution spans work correctly when crews run inside flows.
Note: These tests use mocked LLM responses instead of VCR cassettes because
VCR's httpx async stubs have a known incompatibility with the OpenAI client
when running inside asyncio.run() (which Flow.kickoff() uses). The VCR
assertion `assert not hasattr(resp, "_decoder")` fails silently when the
OpenAI client reads responses before VCR can serialize them.
"""
import os
import threading
from unittest.mock import Mock

import pytest
from pydantic import BaseModel

from crewai import Agent, Crew, Task, LLM
from crewai.events.event_listener import EventListener
from crewai.flow.flow import Flow, listen, start
from crewai.telemetry import Telemetry
from crewai.types.usage_metrics import UsageMetrics


class SimpleState(BaseModel):
"""Simple state for flow testing."""
result: str = ""
def create_mock_llm() -> Mock:
"""Create a mock LLM that returns a simple response.
The mock includes all attributes required by the telemetry system,
particularly the 'model' attribute which is accessed during span creation.
"""
mock_llm = Mock(spec=LLM)
mock_llm.call.return_value = "Hello! This is a test response."
mock_llm.stop = []
mock_llm.model = "gpt-4o-mini" # Required by telemetry
mock_llm.supports_stop_words.return_value = True
mock_llm.get_token_usage_summary.return_value = UsageMetrics(
total_tokens=100,
prompt_tokens=50,
completion_tokens=50,
cached_prompt_tokens=0,
successful_requests=1,
)
return mock_llm
@pytest.fixture(autouse=True)
def enable_telemetry_for_tests():
"""Enable telemetry for these tests and reset singletons."""
from crewai.events.event_bus import crewai_event_bus
original_telemetry = os.environ.get("CREWAI_DISABLE_TELEMETRY")
original_otel = os.environ.get("OTEL_SDK_DISABLED")
os.environ["CREWAI_DISABLE_TELEMETRY"] = "false"
os.environ["OTEL_SDK_DISABLED"] = "false"
    with crewai_event_bus._rwlock.w_locked():
        crewai_event_bus._sync_handlers.clear()
        crewai_event_bus._async_handlers.clear()
    Telemetry._instance = None
    EventListener._instance = None
    if hasattr(Telemetry, "_lock"):
        Telemetry._lock = threading.Lock()

    yield
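
    # Teardown: clear handlers and singletons again so nothing leaks into other
    # tests, then restore the original environment variables.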
    with crewai_event_bus._rwlock.w_locked():
        crewai_event_bus._sync_handlers.clear()
        crewai_event_bus._async_handlers.clear()
    Telemetry._instance = None
    EventListener._instance = None
    if hasattr(Telemetry, "_lock"):
        Telemetry._lock = threading.Lock()

    if original_telemetry is not None:
        os.environ["CREWAI_DISABLE_TELEMETRY"] = original_telemetry
    else:
        os.environ.pop("CREWAI_DISABLE_TELEMETRY", None)
    if original_otel is not None:
        os.environ["OTEL_SDK_DISABLED"] = original_otel
    else:
        os.environ.pop("OTEL_SDK_DISABLED", None)


def test_crew_execution_span_in_flow_with_share_crew():
"""Test that crew._execution_span is properly set when crew runs inside a flow.
This verifies that when a crew is kicked off inside a flow method with
share_crew=True, the execution span is properly assigned and closed without
errors.
"""
mock_llm = create_mock_llm()
class SampleFlow(Flow[SimpleState]):
@start()
def run_crew(self):
"""Run a crew inside the flow."""
agent = Agent(
role="test agent",
goal="say hello",
backstory="a friendly agent",
llm=mock_llm,
)
task = Task(
description="Say hello",
expected_output="hello",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
share_crew=True,
)
result = crew.kickoff()
assert crew._execution_span is not None, (
"crew._execution_span should be set after kickoff even when "
"crew runs inside a flow method"
)
self.state.result = str(result.raw)
return self.state.result
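    # Flow.kickoff() drives the flow synchronously (internally via asyncio.run();
    # see the module docstring).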
    flow = SampleFlow()
    flow.kickoff()

    assert flow.state.result != ""
    mock_llm.call.assert_called()


def test_crew_execution_span_not_set_in_flow_without_share_crew():
"""Test that crew._execution_span is None when share_crew=False in flow.
Verifies that when a crew runs inside a flow with share_crew=False,
no execution span is created.
"""
mock_llm = create_mock_llm()
class SampleTestFlowNotSet(Flow[SimpleState]):
@start()
def run_crew(self):
"""Run a crew inside the flow without sharing."""
agent = Agent(
role="test agent",
goal="say hello",
backstory="a friendly agent",
llm=mock_llm,
)
task = Task(
description="Say hello",
expected_output="hello",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
share_crew=False,
)
result = crew.kickoff()
assert (
not hasattr(crew, "_execution_span") or crew._execution_span is None
), "crew._execution_span should be None when share_crew=False"
self.state.result = str(result.raw)
return self.state.result
flow = SampleTestFlowNotSet()
flow.kickoff()
assert flow.state.result != ""
mock_llm.call.assert_called()
def test_multiple_crews_in_flow_span_lifecycle():
"""Test that multiple crews in a flow each get proper execution spans.
This ensures that when multiple crews are executed sequentially in different
flow methods, each crew gets its own execution span properly assigned and closed.
"""
    mock_llm_1 = create_mock_llm()
    mock_llm_1.call.return_value = "First crew result"
    mock_llm_2 = create_mock_llm()
    mock_llm_2.call.return_value = "Second crew result"

    class SampleMultiCrewFlow(Flow[SimpleState]):
        @start()
        def first_crew(self):
            """Run first crew."""
            agent = Agent(
                role="first agent",
                goal="first task",
                backstory="first agent",
                llm=mock_llm_1,
            )
            task = Task(
                description="First task",
                expected_output="first result",
                agent=agent,
            )
            crew = Crew(
                agents=[agent],
                tasks=[task],
                share_crew=True,
            )
            result = crew.kickoff()
            assert crew._execution_span is not None
            return str(result.raw)

        @listen(first_crew)
        def second_crew(self, first_result: str):
            """Run second crew."""
            agent = Agent(
                role="second agent",
                goal="second task",
                backstory="second agent",
                llm=mock_llm_2,
            )
            task = Task(
                description="Second task",
                expected_output="second result",
                agent=agent,
            )
            crew = Crew(
                agents=[agent],
                tasks=[task],
                share_crew=True,
            )
            result = crew.kickoff()
            assert crew._execution_span is not None
            self.state.result = f"{first_result} + {result.raw}"
            return self.state.result

    flow = SampleMultiCrewFlow()
    flow.kickoff()

    assert flow.state.result != ""
    assert "+" in flow.state.result
    mock_llm_1.call.assert_called()
    mock_llm_2.call.assert_called()


@pytest.mark.asyncio
async def test_crew_execution_span_in_async_flow():
"""Test that crew execution spans work in async flow methods.
Verifies that crews executed within async flow methods still properly
assign and close execution spans.
"""
mock_llm = create_mock_llm()
class AsyncTestFlow(Flow[SimpleState]):
@start()
async def run_crew_async(self):
"""Run a crew inside an async flow method."""
agent = Agent(
role="test agent",
goal="say hello",
backstory="a friendly agent",
llm=mock_llm,
)
task = Task(
description="Say hello",
expected_output="hello",
agent=agent,
)
crew = Crew(
agents=[agent],
tasks=[task],
share_crew=True,
)
result = crew.kickoff()
assert crew._execution_span is not None, (
"crew._execution_span should be set in async flow method"
)
self.state.result = str(result.raw)
return self.state.result
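    # kickoff_async() is awaited directly here; span behaviour is expected to
    # match the synchronous kickoff() path exercised above.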
    flow = AsyncTestFlow()
    await flow.kickoff_async()

    assert flow.state.result != ""
    mock_llm.call.assert_called()