feat: add native OpenTelemetry instrumentation

Open spans directly on the user's thread so that stdlib log records
emitted during hot paths like `Crew.kickoff`, `BaseTool.run`, and
`LLM.call` carry the active trace context and correlate with the
spans they belong to — a gap the previous metrics-only telemetry
could not close. Introduces a `crewai.telemetry.otel` module
exposing `operation` and `follows_from`, instruments the execution
hot paths, and propagates the active context across every
parallel-dispatch site. Depends only on `opentelemetry-api` so
provider and exporter choice stays with the host application per the
standard OTel library pattern; without an installed SDK the
`ProxyTracer` keeps everything as a NoOp.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Lucas Gomide
2026-06-22 15:58:39 -03:00
parent 4cbfbdb232
commit d38130bfad
22 changed files with 1640 additions and 561 deletions

View File

@@ -72,6 +72,7 @@ from crewai.events.types.a2a_events import (
A2ADelegationStartedEvent,
A2AMessageSentEvent,
)
from crewai.telemetry.otel import operation
logger = logging.getLogger(__name__)
@@ -303,73 +304,81 @@ async def aexecute_a2a_delegation(
if turn_number is None:
turn_number = len([m for m in conversation_history if m.role == Role.user]) + 1
try:
result = await _aexecute_a2a_delegation_impl(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
is_multiturn=is_multiturn,
turn_number=turn_number,
agent_branch=agent_branch,
agent_id=agent_id,
agent_role=agent_role,
response_model=response_model,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
except Exception as e:
with operation(
"a2a delegate",
{
"crewai.a2a.endpoint": endpoint,
"crewai.a2a.is_multiturn": is_multiturn,
"crewai.a2a.turn_number": turn_number,
},
):
try:
result = await _aexecute_a2a_delegation_impl(
endpoint=endpoint,
auth=auth,
timeout=timeout,
task_description=task_description,
context=context,
context_id=context_id,
task_id=task_id,
reference_task_ids=reference_task_ids,
metadata=metadata,
extensions=extensions,
conversation_history=conversation_history,
is_multiturn=is_multiturn,
turn_number=turn_number,
agent_branch=agent_branch,
agent_id=agent_id,
agent_role=agent_role,
response_model=response_model,
updates=updates,
from_task=from_task,
from_agent=from_agent,
skill_id=skill_id,
client_extensions=client_extensions,
transport=transport,
accepted_output_modes=accepted_output_modes,
input_files=input_files,
)
except Exception as e:
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status="failed",
result=None,
error=str(e),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
raise
agent_card_data = result.get("agent_card")
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status="failed",
result=None,
error=str(e),
status=result["status"],
result=result.get("result"),
error=result.get("error"),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
a2a_agent_name=result.get("a2a_agent_name"),
agent_card=agent_card_data,
provider=agent_card_data.get("provider") if agent_card_data else None,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
raise
agent_card_data = result.get("agent_card")
crewai_event_bus.emit(
agent_branch,
A2ADelegationCompletedEvent(
status=result["status"],
result=result.get("result"),
error=result.get("error"),
context_id=context_id,
is_multiturn=is_multiturn,
endpoint=endpoint,
a2a_agent_name=result.get("a2a_agent_name"),
agent_card=agent_card_data,
provider=agent_card_data.get("provider") if agent_card_data else None,
metadata=metadata,
extensions=list(extensions.keys()) if extensions else None,
from_task=from_task,
from_agent=from_agent,
),
)
return result
return result
async def _aexecute_a2a_delegation_impl(

View File

@@ -85,6 +85,7 @@ from crewai.security.fingerprint import Fingerprint
from crewai.skills.loader import activate_skill, discover_skills
from crewai.skills.models import INSTRUCTIONS, Skill as SkillModel
from crewai.state.checkpoint_config import CheckpointConfig, apply_checkpoint
from crewai.telemetry.otel import operation
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.types.callback import SerializableCallable
from crewai.utilities.agent_utils import (
@@ -804,55 +805,62 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
task_prompt = self._prepare_task_execution(task, context)
with operation(
"execute agent",
{
"crewai.agent.role": self.role or "",
"crewai.agent.id": str(self.id),
},
):
task_prompt = self._prepare_task_execution(task, context)
knowledge_config = get_knowledge_config(self)
task_prompt = handle_knowledge_retrieval(
self,
task,
task_prompt,
knowledge_config,
self.knowledge.query if self.knowledge else lambda *a, **k: None,
self.crew.query_knowledge
if self.crew and not isinstance(self.crew, str)
else lambda *a, **k: None,
)
task_prompt = self._finalize_task_prompt(task_prompt, tools, task)
try:
crewai_event_bus.emit(
knowledge_config = get_knowledge_config(self)
task_prompt = handle_knowledge_retrieval(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
task,
task_prompt,
knowledge_config,
self.knowledge.query if self.knowledge else lambda *a, **k: None,
self.crew.query_knowledge
if self.crew and not isinstance(self.crew, str)
else lambda *a, **k: None,
)
validate_max_execution_time(self.max_execution_time)
if self.max_execution_time is not None:
result = self._execute_with_timeout(
task_prompt, task, self.max_execution_time
task_prompt = self._finalize_task_prompt(task_prompt, tools, task)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
else:
result = self._execute_without_timeout(task_prompt, task)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
result = self._handle_execution_error(e, task, context, tools)
validate_max_execution_time(self.max_execution_time)
if self.max_execution_time is not None:
result = self._execute_with_timeout(
task_prompt, task, self.max_execution_time
)
else:
result = self._execute_without_timeout(task_prompt, task)
return self._finalize_task_execution(task, result)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
result = self._handle_execution_error(e, task, context, tools)
return self._finalize_task_execution(task, result)
def _execute_with_timeout(self, task_prompt: str, task: Task, timeout: int) -> Any:
"""Execute a task with a timeout.
@@ -940,48 +948,57 @@ class Agent(BaseAgent):
ValueError: If the max execution time is not a positive integer.
RuntimeError: If the agent execution fails for other reasons.
"""
task_prompt = self._prepare_task_execution(task, context)
with operation(
"execute agent",
{
"crewai.agent.role": self.role or "",
"crewai.agent.id": str(self.id),
},
):
task_prompt = self._prepare_task_execution(task, context)
knowledge_config = get_knowledge_config(self)
task_prompt = await ahandle_knowledge_retrieval(
self, task, task_prompt, knowledge_config
)
task_prompt = self._finalize_task_prompt(task_prompt, tools, task)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
knowledge_config = get_knowledge_config(self)
task_prompt = await ahandle_knowledge_retrieval(
self, task, task_prompt, knowledge_config
)
validate_max_execution_time(self.max_execution_time)
if self.max_execution_time is not None:
result = await self._aexecute_with_timeout(
task_prompt, task, self.max_execution_time
task_prompt = self._finalize_task_prompt(task_prompt, tools, task)
try:
crewai_event_bus.emit(
self,
event=AgentExecutionStartedEvent(
agent=self,
tools=self.tools,
task_prompt=task_prompt,
task=task,
),
)
else:
result = await self._aexecute_without_timeout(task_prompt, task)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
result = await self._handle_execution_error_async(e, task, context, tools)
validate_max_execution_time(self.max_execution_time)
if self.max_execution_time is not None:
result = await self._aexecute_with_timeout(
task_prompt, task, self.max_execution_time
)
else:
result = await self._aexecute_without_timeout(task_prompt, task)
return self._finalize_task_execution(task, result)
except TimeoutError as e:
crewai_event_bus.emit(
self,
event=AgentExecutionErrorEvent(
agent=self,
task=task,
error=str(e),
),
)
raise e
except Exception as e:
result = await self._handle_execution_error_async(
e, task, context, tools
)
return self._finalize_task_execution(task, result)
async def _aexecute_with_timeout(
self, task_prompt: str, task: Task, timeout: int
@@ -1616,22 +1633,31 @@ class Agent(BaseAgent):
)
try:
if self.checkpoint_kickoff_event_id is not None:
self._kickoff_event_id = self.checkpoint_kickoff_event_id
self.checkpoint_kickoff_event_id = None
else:
started_event = LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
)
crewai_event_bus.emit(self, event=started_event)
self._kickoff_event_id = started_event.event_id
with operation(
"execute agent",
{
"crewai.agent.role": self.role or "",
"crewai.agent.id": str(self.id),
},
):
if self.checkpoint_kickoff_event_id is not None:
self._kickoff_event_id = self.checkpoint_kickoff_event_id
self.checkpoint_kickoff_event_id = None
else:
started_event = LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
)
crewai_event_bus.emit(self, event=started_event)
self._kickoff_event_id = started_event.event_id
output = self._execute_and_build_output(executor, inputs, response_format)
return self._finalize_kickoff(
output, executor, inputs, response_format, messages, agent_info
)
output = self._execute_and_build_output(
executor, inputs, response_format
)
return self._finalize_kickoff(
output, executor, inputs, response_format, messages, agent_info
)
except Exception as e:
self._emit_kickoff_error(agent_info, e)
@@ -1931,24 +1957,31 @@ class Agent(BaseAgent):
)
try:
if self.checkpoint_kickoff_event_id is not None:
self._kickoff_event_id = self.checkpoint_kickoff_event_id
self.checkpoint_kickoff_event_id = None
else:
started_event = LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
)
crewai_event_bus.emit(self, event=started_event)
self._kickoff_event_id = started_event.event_id
with operation(
"execute agent",
{
"crewai.agent.role": self.role or "",
"crewai.agent.id": str(self.id),
},
):
if self.checkpoint_kickoff_event_id is not None:
self._kickoff_event_id = self.checkpoint_kickoff_event_id
self.checkpoint_kickoff_event_id = None
else:
started_event = LiteAgentExecutionStartedEvent(
agent_info=agent_info,
tools=parsed_tools,
messages=messages,
)
crewai_event_bus.emit(self, event=started_event)
self._kickoff_event_id = started_event.event_id
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
return self._finalize_kickoff(
output, executor, inputs, response_format, messages, agent_info
)
output = await self._execute_and_build_output_async(
executor, inputs, response_format
)
return self._finalize_kickoff(
output, executor, inputs, response_format, messages, agent_info
)
except Exception as e:
self._emit_kickoff_error(agent_info, e)

View File

@@ -113,6 +113,7 @@ from crewai.state.checkpoint_config import (
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.otel import operation
from crewai.tools.agent_tools.agent_tools import AgentTools
from crewai.tools.agent_tools.read_file_tool import ReadFileTool
from crewai.tools.base_tool import BaseTool
@@ -1032,25 +1033,29 @@ class Crew(FlowTrackable, BaseModel):
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
with operation(
"execute crew",
{"crewai.crew.name": self.name or "", "crewai.crew.id": str(self.id)},
):
inputs = prepare_kickoff(self, inputs, input_files)
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result = self._run_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result = self._run_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
result = self._post_kickoff(result)
result = self._post_kickoff(result)
self.usage_metrics = self.calculate_usage_metrics()
self.usage_metrics = self.calculate_usage_metrics()
return result
return result
except Exception as e:
crewai_event_bus.emit(
self,
@@ -1244,25 +1249,29 @@ class Crew(FlowTrackable, BaseModel):
runtime_scope = crewai_event_bus._enter_runtime_scope()
try:
inputs = prepare_kickoff(self, inputs, input_files)
with operation(
"execute crew",
{"crewai.crew.name": self.name or "", "crewai.crew.id": str(self.id)},
):
inputs = prepare_kickoff(self, inputs, input_files)
if self.process == Process.sequential:
result = await self._arun_sequential_process()
elif self.process == Process.hierarchical:
result = await self._arun_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
if self.process == Process.sequential:
result = await self._arun_sequential_process()
elif self.process == Process.hierarchical:
result = await self._arun_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
for after_callback in self.after_kickoff_callbacks:
result = after_callback(result)
result = self._post_kickoff(result)
result = self._post_kickoff(result)
self.usage_metrics = self.calculate_usage_metrics()
self.usage_metrics = self.calculate_usage_metrics()
return result
return result
except Exception as e:
crewai_event_bus.emit(
self,

View File

@@ -56,10 +56,26 @@ from crewai.events.utils.handlers import (
is_call_handler_safe,
)
from crewai.utilities.rw_lock import RWLock
from opentelemetry import context as otel_context
logger = logging.getLogger(__name__)
async def _ctx_run_coro(ctx: otel_context.Context, coro: Any) -> Any:
"""Attach an OTel context for the duration of ``coro``.
``asyncio.run_coroutine_threadsafe`` schedules ``coro`` on a
different event loop with a fresh context; without re-attaching the
caller's OTel context the trace tree shears at every async dispatch.
"""
token = otel_context.attach(ctx)
try:
return await coro
finally:
otel_context.detach(token)
P = ParamSpec("P")
R = TypeVar("R")
@@ -615,10 +631,15 @@ class CrewAIEventsBus:
state = self._runtime_state
otel_ctx = otel_context.get_current()
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies(source, event, state),
_ctx_run_coro(
otel_ctx,
self._emit_with_dependencies(source, event, state),
),
self._loop,
)
)
@@ -637,7 +658,10 @@ class CrewAIEventsBus:
if async_handlers:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers(source, event, async_handlers, state),
_ctx_run_coro(
otel_ctx,
self._acall_handlers(source, event, async_handlers, state),
),
self._loop,
)
)
@@ -699,12 +723,18 @@ class CrewAIEventsBus:
self._has_pending_events = True
state = self._runtime_state
otel_ctx = otel_context.get_current()
token = _replaying.set(True)
try:
if has_dependencies:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._emit_with_dependencies_replaying(source, event, state),
_ctx_run_coro(
otel_ctx,
self._emit_with_dependencies_replaying(
source, event, state
),
),
self._loop,
)
)
@@ -720,8 +750,11 @@ class CrewAIEventsBus:
return self._track_future(
asyncio.run_coroutine_threadsafe(
self._acall_handlers_replaying(
source, event, async_handlers, state
_ctx_run_coro(
otel_ctx,
self._acall_handlers_replaying(
source, event, async_handlers, state
),
),
self._loop,
)

View File

@@ -136,6 +136,7 @@ from crewai.state.checkpoint_config import (
_coerce_checkpoint,
apply_checkpoint,
)
from crewai.telemetry.otel import operation
if TYPE_CHECKING:
@@ -1608,6 +1609,22 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
current_flow_id.reset(flow_id_token)
async def _resume_async_body(self, feedback: str = "") -> Any:
# Resume traces are causally related to the pause trace but not a
# parent-child relationship. Enterprise listeners can attach the
# FOLLOWS_FROM link via ``follows_from()`` when they record the
# paused span's trace/span IDs at pause time. We always open a
# fresh root span here; the link is opt-in.
with operation(
"resume flow",
{
"crewai.flow.name": self._definition.name,
"crewai.flow.id": self.flow_id,
},
expected_exceptions=(HumanFeedbackPending,),
):
return await self._resume_async_body_inner(feedback)
async def _resume_async_body_inner(self, feedback: str = "") -> Any:
if get_current_parent_id() is None:
reset_emission_counter()
reset_last_event_id()
@@ -2474,32 +2491,40 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
await self._replay_recorded_events()
try:
# Determine which start methods to execute at kickoff
# Conditional start methods are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
start_methods = self._start_method_names()
unconditional_starts = [
start_method
for start_method in start_methods
if self._start_condition(start_method) is None
]
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts if unconditional_starts else start_methods
)
starts_to_execute, run_starts_sequentially = (
self._order_start_methods_for_kickoff(starts_to_execute)
)
if run_starts_sequentially:
for start_method in starts_to_execute:
await self._execute_start_method(start_method)
else:
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
with operation(
"execute flow",
{
"crewai.flow.name": self._definition.name,
"crewai.flow.id": self.flow_id,
},
expected_exceptions=(HumanFeedbackPending,),
):
# Determine which start methods to execute at kickoff
# Conditional start methods are only triggered by their conditions
# UNLESS there are no unconditional starts (then all starts run as entry points)
start_methods = self._start_method_names()
unconditional_starts = [
start_method
for start_method in start_methods
if self._start_condition(start_method) is None
]
await asyncio.gather(*tasks)
# If there are unconditional starts, only run those at kickoff
# If there are NO unconditional starts, run all starts (including conditional ones)
starts_to_execute = (
unconditional_starts if unconditional_starts else start_methods
)
starts_to_execute, run_starts_sequentially = (
self._order_start_methods_for_kickoff(starts_to_execute)
)
if run_starts_sequentially:
for start_method in starts_to_execute:
await self._execute_start_method(start_method)
else:
tasks = [
self._execute_start_method(start_method)
for start_method in starts_to_execute
]
await asyncio.gather(*tasks)
except Exception as e:
# Check if flow was paused for human feedback
if isinstance(e, HumanFeedbackPending):
@@ -2826,13 +2851,23 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta):
method_name_token = current_flow_method_name.set(method_name)
try:
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
ctx = contextvars.copy_context()
result = await asyncio.to_thread(ctx.run, method, *args, **kwargs)
with operation(
"execute flow method",
{
"crewai.flow.name": self._definition.name,
"crewai.flow.method": str(method_name),
},
expected_exceptions=(HumanFeedbackPending,),
):
if asyncio.iscoroutinefunction(method):
result = await method(*args, **kwargs)
else:
# Run sync methods in thread pool for isolation
# This allows Agent.kickoff() to work synchronously inside Flow methods
ctx = contextvars.copy_context()
result = await asyncio.to_thread(
ctx.run, method, *args, **kwargs
)
finally:
current_flow_method_name.reset(method_name_token)

View File

@@ -18,6 +18,7 @@ from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.rag.core.base_embeddings_provider import BaseEmbeddingsProvider
from crewai.rag.embeddings.types import EmbedderConfig
from crewai.rag.types import SearchResult
from crewai.telemetry.otel import operation
_KNOWN_SOURCES: dict[str, type[BaseKnowledgeSource]] = {
@@ -145,11 +146,15 @@ class Knowledge(BaseModel):
if self.storage is None:
raise ValueError("Storage is not initialized.")
return self.storage.search(
query,
limit=results_limit,
score_threshold=score_threshold,
)
with operation(
"query knowledge",
{"crewai.knowledge.sources": len(self.sources)},
):
return self.storage.search(
query,
limit=results_limit,
score_threshold=score_threshold,
)
def add_sources(self) -> None:
try:
@@ -183,11 +188,15 @@ class Knowledge(BaseModel):
if self.storage is None:
raise ValueError("Storage is not initialized.")
return await self.storage.asearch(
query,
limit=results_limit,
score_threshold=score_threshold,
)
with operation(
"query knowledge",
{"crewai.knowledge.sources": len(self.sources)},
):
return await self.storage.asearch(
query,
limit=results_limit,
score_threshold=score_threshold,
)
async def aadd_sources(self) -> None:
"""Add all knowledge sources to storage asynchronously."""

View File

@@ -45,6 +45,7 @@ from crewai.llms.constants import (
GEMINI_MODELS,
OPENAI_MODELS,
)
from crewai.telemetry.otel import operation
from crewai.utilities import InternalInstructor
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -1813,7 +1814,9 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
self._emit_call_started_event(
messages=messages,
tools=tools,
@@ -1952,7 +1955,9 @@ class LLM(BaseLLM):
ValueError: If response format is not supported
LLMContextLengthExceededError: If input exceeds model's context limit
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
self._emit_call_started_event(
messages=messages,
tools=tools,

View File

@@ -12,6 +12,7 @@ from crewai.llms.base_llm import BaseLLM, JsonResponseFormat, llm_call_context
from crewai.llms.hooks.base import BaseInterceptor
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.telemetry.otel import operation
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -297,7 +298,9 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,
@@ -372,7 +375,9 @@ class AnthropicCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,

View File

@@ -11,6 +11,7 @@ from typing_extensions import Self
from crewai.llms._finish_reason_utils import extract_choices_finish_reason_and_id
from crewai.llms.hooks.base import BaseInterceptor
from crewai.telemetry.otel import operation
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -503,7 +504,9 @@ class AzureCompletion(BaseLLM):
response_model=response_model,
)
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,
@@ -582,7 +585,9 @@ class AzureCompletion(BaseLLM):
response_model=response_model,
)
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,

View File

@@ -13,6 +13,7 @@ from typing_extensions import Required
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.telemetry.otel import operation
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -362,7 +363,9 @@ class BedrockCompletion(BaseLLM):
"""Call AWS Bedrock Converse API."""
effective_response_model = response_model or self.response_format
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,
@@ -495,7 +498,9 @@ class BedrockCompletion(BaseLLM):
'Install with: uv add "crewai[bedrock-async]"'
)
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,

View File

@@ -12,6 +12,7 @@ from pydantic import BaseModel, Field, PrivateAttr, model_validator
from crewai.events.types.llm_events import LLMCallType
from crewai.llms.base_llm import BaseLLM, llm_call_context
from crewai.llms.hooks.base import BaseInterceptor
from crewai.telemetry.otel import operation
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -294,7 +295,9 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,
@@ -380,7 +383,9 @@ class GeminiCompletion(BaseLLM):
Returns:
Chat completion response or tool call result
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,

View File

@@ -34,6 +34,7 @@ from crewai.llms.base_llm import BaseLLM, JsonResponseFormat, llm_call_context
from crewai.llms.hooks.base import BaseInterceptor
from crewai.llms.hooks.transport import AsyncHTTPTransport, HTTPTransport
from crewai.llms.providers.utils.common import safe_tool_conversion
from crewai.telemetry.otel import operation
from crewai.utilities.agent_utils import is_context_length_exceeded
from crewai.utilities.exceptions.context_window_exceeding_exception import (
LLMContextLengthExceededError,
@@ -410,7 +411,9 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,
@@ -510,7 +513,9 @@ class OpenAICompletion(BaseLLM):
Returns:
Completion response or tool call result.
"""
with llm_call_context():
with llm_call_context(), operation(
"call llm", {"crewai.llm.model": self.model}
):
try:
self._emit_call_started_event(
messages=messages,

View File

@@ -36,6 +36,7 @@ from crewai.memory.types import (
from crewai.memory.utils import join_scope_paths
from crewai.rag.embeddings.factory import build_embedder
from crewai.rag.embeddings.providers.openai.types import OpenAIProviderSpec
from crewai.telemetry.otel import operation
if TYPE_CHECKING:
@@ -471,43 +472,46 @@ class Memory(BaseModel):
_source_type = "unified_memory"
try:
crewai_event_bus.emit(
self,
MemorySaveStartedEvent(
value=content,
metadata=metadata,
source_type=_source_type,
),
)
start = time.perf_counter()
with operation(
"remember memory",
{"crewai.memory.source_type": _source_type},
):
crewai_event_bus.emit(
self,
MemorySaveStartedEvent(
value=content,
metadata=metadata,
source_type=_source_type,
),
)
start = time.perf_counter()
# Submit through the save pool for proper serialization,
future = self._submit_save(
self._encode_batch,
[content],
scope,
categories,
metadata,
importance,
source,
private,
effective_root,
)
records = future.result()
record = records[0] if records else None
future = self._submit_save(
self._encode_batch,
[content],
scope,
categories,
metadata,
importance,
source,
private,
effective_root,
)
records = future.result()
record = records[0] if records else None
elapsed_ms = (time.perf_counter() - start) * 1000
crewai_event_bus.emit(
self,
MemorySaveCompletedEvent(
value=content,
metadata=metadata or {},
agent_role=agent_role,
save_time_ms=elapsed_ms,
source_type=_source_type,
),
)
return record
elapsed_ms = (time.perf_counter() - start) * 1000
crewai_event_bus.emit(
self,
MemorySaveCompletedEvent(
value=content,
metadata=metadata or {},
agent_role=agent_role,
save_time_ms=elapsed_ms,
source_type=_source_type,
),
)
return record
except Exception as e:
crewai_event_bus.emit(
self,
@@ -720,88 +724,97 @@ class Memory(BaseModel):
_source = "unified_memory"
try:
crewai_event_bus.emit(
self,
MemoryQueryStartedEvent(
query=query,
limit=limit,
score_threshold=None,
source_type=_source,
),
)
start = time.perf_counter()
if depth == "shallow":
embedding = embed_text(self._embedder, query)
if not embedding:
results: list[MemoryMatch] = []
else:
raw = self._storage.search(
embedding,
scope_prefix=effective_scope,
categories=categories,
with operation(
"recall memory",
{
"crewai.memory.depth": depth,
"crewai.memory.source_type": _source,
},
):
crewai_event_bus.emit(
self,
MemoryQueryStartedEvent(
query=query,
limit=limit,
min_score=0.0,
)
if not include_private:
raw = [
(r, s)
for r, s in raw
if not r.private or r.source == source
]
results = []
for r, s in raw:
composite, reasons = compute_composite_score(r, s, self._config)
results.append(
MemoryMatch(
record=r,
score=composite,
match_reasons=reasons,
)
score_threshold=None,
source_type=_source,
),
)
start = time.perf_counter()
if depth == "shallow":
embedding = embed_text(self._embedder, query)
if not embedding:
results: list[MemoryMatch] = []
else:
raw = self._storage.search(
embedding,
scope_prefix=effective_scope,
categories=categories,
limit=limit,
min_score=0.0,
)
results.sort(key=lambda m: m.score, reverse=True)
else:
from crewai.memory.recall_flow import RecallFlow
if not include_private:
raw = [
(r, s)
for r, s in raw
if not r.private or r.source == source
]
results = []
for r, s in raw:
composite, reasons = compute_composite_score(
r, s, self._config
)
results.append(
MemoryMatch(
record=r,
score=composite,
match_reasons=reasons,
)
)
results.sort(key=lambda m: m.score, reverse=True)
else:
from crewai.memory.recall_flow import RecallFlow
flow = RecallFlow(
storage=self._storage,
llm=self._llm,
embedder=self._embedder,
config=self._config,
flow = RecallFlow(
storage=self._storage,
llm=self._llm,
embedder=self._embedder,
config=self._config,
)
flow.kickoff(
inputs={
"query": query,
"scope": effective_scope,
"categories": categories or [],
"limit": limit,
"source": source,
"include_private": include_private,
}
)
results = flow.state.final_results
if results:
try:
touch = getattr(self._storage, "touch_records", None)
if touch is not None:
touch([m.record.id for m in results])
except Exception: # noqa: S110
pass # Non-critical: don't fail recall because of touch
elapsed_ms = (time.perf_counter() - start) * 1000
crewai_event_bus.emit(
self,
MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=None,
query_time_ms=elapsed_ms,
source_type=_source,
),
)
flow.kickoff(
inputs={
"query": query,
"scope": effective_scope,
"categories": categories or [],
"limit": limit,
"source": source,
"include_private": include_private,
}
)
results = flow.state.final_results
if results:
try:
touch = getattr(self._storage, "touch_records", None)
if touch is not None:
touch([m.record.id for m in results])
except Exception: # noqa: S110
pass # Non-critical: don't fail recall because of touch
elapsed_ms = (time.perf_counter() - start) * 1000
crewai_event_bus.emit(
self,
MemoryQueryCompletedEvent(
query=query,
results=results,
limit=limit,
score_threshold=None,
query_time_ms=elapsed_ms,
source_type=_source,
),
)
return results
return results
except Exception as e:
crewai_event_bus.emit(
self,

View File

@@ -51,6 +51,7 @@ from crewai.llms.providers.openai.completion import OpenAICompletion
from crewai.security import Fingerprint, SecurityConfig
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.otel import operation
from crewai.tools.base_tool import BaseTool
from crewai.utilities.config import process_config
from crewai.utilities.constants import NOT_SPECIFIED, _NotSpecified
@@ -644,113 +645,122 @@ class Task(BaseModel):
task_id_token = set_current_task_id(str(self.id))
self._store_input_files()
try:
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.prompt_context = context
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
with operation(
"execute task",
{
"crewai.task.name": self.name or "",
"crewai.task.id": str(self.id),
},
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.prompt_context = context
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
result = await agent.aexecute_task(
task=self,
context=context,
tools=tools,
)
result = await agent.aexecute_task(
task=self,
context=context,
tools=tools,
)
self._post_agent_execution(agent)
self._post_agent_execution(agent)
if isinstance(result, BaseModel):
raw = result.model_dump_json()
if self.output_pydantic:
pydantic_output = result
json_output = None
elif self.output_json:
pydantic_output = None
json_output = result.model_dump()
if isinstance(result, BaseModel):
raw = result.model_dump_json()
if self.output_pydantic:
pydantic_output = result
json_output = None
elif self.output_json:
pydantic_output = None
json_output = result.model_dump()
else:
pydantic_output = None
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = await self._aexport_output(result)
else:
pydantic_output = None
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = await self._aexport_output(result)
else:
raw = result
pydantic_output, json_output = None, None
raw = result
pydantic_output, json_output = None, None
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=raw,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=raw,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
)
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
task_output = await self._ainvoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=guardrail,
guardrail_index=idx,
)
if self._guardrail:
task_output = await self._ainvoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=guardrail,
guardrail_index=idx,
guardrail=self._guardrail,
)
if self._guardrail:
task_output = await self._ainvoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=self._guardrail,
)
self.output = task_output
self.end_time = datetime.datetime.now()
self.output = task_output
self.end_time = datetime.datetime.now()
if self.callback:
cb_result = self.callback(self.output)
if inspect.isawaitable(cb_result):
await cb_result
if self.callback:
cb_result = self.callback(self.output)
if inspect.isawaitable(cb_result):
await cb_result
crew = self.agent.crew # type: ignore[union-attr]
if (
crew
and not isinstance(crew, str)
and crew.task_callback
and crew.task_callback != self.callback
):
cb_result = crew.task_callback(self.output)
if inspect.isawaitable(cb_result):
await cb_result
crew = self.agent.crew # type: ignore[union-attr]
if (
crew
and not isinstance(crew, str)
and crew.task_callback
and crew.task_callback != self.callback
):
cb_result = crew.task_callback(self.output)
if inspect.isawaitable(cb_result):
await cb_result
if self.output_file:
content = (
json_output
if json_output
else (
pydantic_output.model_dump_json() if pydantic_output else result
if self.output_file:
content = (
json_output
if json_output
else (
pydantic_output.model_dump_json()
if pydantic_output
else result
)
)
self._save_file(content)
crewai_event_bus.emit(
self,
TaskCompletedEvent(output=task_output, task=self),
)
self._save_file(content)
crewai_event_bus.emit(
self,
TaskCompletedEvent(output=task_output, task=self),
)
return task_output
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))
@@ -769,113 +779,122 @@ class Task(BaseModel):
task_id_token = set_current_task_id(str(self.id))
self._store_input_files()
try:
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.prompt_context = context
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
with operation(
"execute task",
{
"crewai.task.name": self.name or "",
"crewai.task.id": str(self.id),
},
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self.prompt_context = context
tools = tools or self.tools or []
self.processed_by_agents.add(agent.role)
executor = agent.agent_executor
if not (
executor and executor._resuming and resume_task_scope(str(self.id))
):
crewai_event_bus.emit(
self, TaskStartedEvent(context=context, task=self)
)
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
self._post_agent_execution(agent)
self._post_agent_execution(agent)
if isinstance(result, BaseModel):
raw = result.model_dump_json()
if self.output_pydantic:
pydantic_output = result
json_output = None
elif self.output_json:
pydantic_output = None
json_output = result.model_dump()
if isinstance(result, BaseModel):
raw = result.model_dump_json()
if self.output_pydantic:
pydantic_output = result
json_output = None
elif self.output_json:
pydantic_output = None
json_output = result.model_dump()
else:
pydantic_output = None
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
else:
pydantic_output = None
json_output = None
elif not self._guardrails and not self._guardrail:
raw = result
pydantic_output, json_output = self._export_output(result)
else:
raw = result
pydantic_output, json_output = None, None
raw = result
pydantic_output, json_output = None, None
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=raw,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
)
task_output = TaskOutput(
name=self.name or self.description,
description=self.description,
expected_output=self.expected_output,
raw=raw,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
messages=agent.last_messages, # type: ignore[attr-defined]
)
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
if self._guardrails:
for idx, guardrail in enumerate(self._guardrails):
task_output = self._invoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=guardrail,
guardrail_index=idx,
)
if self._guardrail:
task_output = self._invoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=guardrail,
guardrail_index=idx,
guardrail=self._guardrail,
)
if self._guardrail:
task_output = self._invoke_guardrail_function(
task_output=task_output,
agent=agent,
tools=tools,
guardrail=self._guardrail,
)
self.output = task_output
self.end_time = datetime.datetime.now()
self.output = task_output
self.end_time = datetime.datetime.now()
if self.callback:
cb_result = self.callback(self.output)
if inspect.iscoroutine(cb_result):
asyncio.run(cb_result)
if self.callback:
cb_result = self.callback(self.output)
if inspect.iscoroutine(cb_result):
asyncio.run(cb_result)
crew = self.agent.crew # type: ignore[union-attr]
if (
crew
and not isinstance(crew, str)
and crew.task_callback
and crew.task_callback != self.callback
):
cb_result = crew.task_callback(self.output)
if inspect.iscoroutine(cb_result):
asyncio.run(cb_result)
crew = self.agent.crew # type: ignore[union-attr]
if (
crew
and not isinstance(crew, str)
and crew.task_callback
and crew.task_callback != self.callback
):
cb_result = crew.task_callback(self.output)
if inspect.iscoroutine(cb_result):
asyncio.run(cb_result)
if self.output_file:
content = (
json_output
if json_output
else (
pydantic_output.model_dump_json() if pydantic_output else result
if self.output_file:
content = (
json_output
if json_output
else (
pydantic_output.model_dump_json()
if pydantic_output
else result
)
)
self._save_file(content)
crewai_event_bus.emit(
self,
TaskCompletedEvent(output=task_output, task=self),
)
self._save_file(content)
crewai_event_bus.emit(
self,
TaskCompletedEvent(output=task_output, task=self),
)
return task_output
return task_output
except Exception as e:
self.end_time = datetime.datetime.now()
crewai_event_bus.emit(self, TaskFailedEvent(error=str(e), task=self))

View File

@@ -12,6 +12,7 @@ from crewai.agent import Agent
from crewai.lite_agent_output import LiteAgentOutput
from crewai.llms.base_llm import BaseLLM
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.otel import operation
def _is_coroutine(
@@ -108,12 +109,18 @@ class LLMGuardrail:
"""
try:
result = self._validate_output(task_output)
if not isinstance(result.pydantic, LLMGuardrailResult):
raise ValueError("The guardrail result is not a valid pydantic model")
with operation(
"guard llm",
{"crewai.guardrail.type": "llm"},
):
result = self._validate_output(task_output)
if not isinstance(result.pydantic, LLMGuardrailResult):
raise ValueError(
"The guardrail result is not a valid pydantic model"
)
if result.pydantic.valid:
return True, task_output.raw
return False, result.pydantic.feedback
if result.pydantic.valid:
return True, task_output.raw
return False, result.pydantic.feedback
except Exception as e:
return False, f"Error while validating the task output: {e!s}"

View File

@@ -1,4 +1,5 @@
from crewai.telemetry.otel import follows_from, operation
from crewai.telemetry.telemetry import Telemetry
__all__ = ["Telemetry"]
__all__ = ["Telemetry", "follows_from", "operation"]

View File

@@ -0,0 +1,134 @@
"""Native OpenTelemetry instrumentation surface for crewAI.
This module exposes a thin wrapper over the OpenTelemetry API.
crewAI emits spans through :func:`operation` for kickoffs, tasks, agents,
tools, LLM calls, memory, knowledge, MCP, and A2A delegation.
"""
from __future__ import annotations
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any
from opentelemetry import trace
from opentelemetry.trace import (
Link,
Span,
SpanContext,
Status,
StatusCode,
TraceFlags,
)
_TRACER_NAME = "crewai"
def _tracer() -> trace.Tracer:
"""Resolve the crewAI tracer from the current global provider.
Always re-resolves so user code that installs a TracerProvider after
crewAI is imported still gets recording spans.
"""
return trace.get_tracer(_TRACER_NAME)
@contextmanager
def operation(
name: str,
attributes: dict[str, Any] | None = None,
*,
links: list[Link] | None = None,
expected_exceptions: tuple[type[BaseException], ...] = (),
) -> Iterator[Span]:
"""Open a span around an operation.
Any :class:`Exception` escaping the block is recorded as an
``exception`` event and the span status is set to ``ERROR``.
``BaseException`` subclasses outside :class:`Exception`
(:class:`KeyboardInterrupt`, :class:`SystemExit`,
:class:`asyncio.CancelledError`, :class:`GeneratorExit`) pass through
unrecorded — they're control flow, not failures.
Args:
name: Span name (e.g. ``"execute crew"``). Follow the
``"<verb> <subject>"`` convention used elsewhere in this module.
attributes: Optional dict of attributes to set on span start.
Keys should follow the ``crewai.<component>.<field>`` pattern.
links: Optional list of :class:`Link` references. Used for
HITL resume to relate the resumed trace back to the paused one
via :func:`follows_from`.
expected_exceptions: Exception types that represent expected
control flow rather than failures (e.g.
:class:`HumanFeedbackPending`). When the block raises one of
these the span's status stays ``UNSET``, no ``exception``
event is recorded, and the exception re-raises so the caller
sees normal control flow.
Yields:
The active :class:`Span`. Callers may attach additional
attributes or events to it as the operation progresses.
"""
with _tracer().start_as_current_span(
name,
attributes=attributes or {},
links=links or [],
record_exception=False,
set_status_on_exception=False,
) as span:
try:
yield span
except expected_exceptions:
raise
except Exception as exc:
span.record_exception(exc, escaped=True)
span.set_status(
Status(StatusCode.ERROR, f"{type(exc).__name__}: {exc}")
)
raise
def follows_from(
trace_id: int,
span_id: int,
*,
is_remote: bool = False,
trace_flags: TraceFlags | None = None,
) -> Link:
"""Build a FOLLOWS_FROM-style :class:`Link` for HITL resume continuity.
Args:
trace_id: Trace ID of the paused operation's span.
span_id: Span ID of the paused operation's span.
is_remote: Whether the linked span context came from outside this
process. Default ``False`` matches crewAI OSS's in-process
resume flow (same Python process pauses and resumes). Cross-
process resumers (e.g. an enterprise Celery worker that picks
up a flow paused by a different worker) should pass ``True``
so backends render the edge as crossing a process boundary
and so samplers treat the parent context as an inbound
carrier rather than a local span.
trace_flags: Optional :class:`TraceFlags` for the linked span.
Default ``None`` resolves to ``TraceFlags.SAMPLED`` so backends
render the link reliably even when the original sampling
decision was not persisted. Callers that persist the
original flags at pause time should pass them here.
Returns:
A :class:`Link` carrying a :class:`SpanContext` for the paused
span, suitable to pass via the ``links=`` kwarg of
:func:`operation`.
"""
span_ctx = SpanContext(
trace_id=trace_id,
span_id=span_id,
is_remote=is_remote,
trace_flags=trace_flags
if trace_flags is not None
else TraceFlags(TraceFlags.SAMPLED),
)
return Link(span_ctx, attributes={"crewai.link.type": "follows_from"})
__all__ = ["follows_from", "operation"]

View File

@@ -30,6 +30,7 @@ from pydantic import (
from pydantic_core import CoreSchema, core_schema
from typing_extensions import TypeIs
from crewai.telemetry.otel import operation
from crewai.tools.structured_tool import (
CrewStructuredTool,
_deserialize_schema,
@@ -323,12 +324,13 @@ class BaseTool(BaseModel, ABC):
if limit_error:
return limit_error
result = self._run(*args, **kwargs)
with operation("call tool", {"crewai.tool.name": self.name}):
result = self._run(*args, **kwargs)
if asyncio.iscoroutine(result):
result = asyncio.run(result)
if asyncio.iscoroutine(result):
result = asyncio.run(result)
return result
return result
async def arun(
self,
@@ -351,7 +353,8 @@ class BaseTool(BaseModel, ABC):
if limit_error:
return limit_error
return await self._arun(*args, **kwargs)
with operation("call tool", {"crewai.tool.name": self.name}):
return await self._arun(*args, **kwargs)
async def _arun(
self,
@@ -521,12 +524,13 @@ class Tool(BaseTool, Generic[P, R]):
if limit_error:
return limit_error # type: ignore[return-value]
result = self.func(*args, **kwargs)
with operation("call tool", {"crewai.tool.name": self.name}):
result = self.func(*args, **kwargs)
if asyncio.iscoroutine(result):
result = asyncio.run(result)
if asyncio.iscoroutine(result):
result = asyncio.run(result)
return result # type: ignore[return-value]
return result # type: ignore[return-value]
def _run(self, *args: P.args, **kwargs: P.kwargs) -> R:
"""Executes the wrapped function.
@@ -557,7 +561,8 @@ class Tool(BaseTool, Generic[P, R]):
if limit_error:
return limit_error # type: ignore[return-value]
return await self._arun(*args, **kwargs)
with operation("call tool", {"crewai.tool.name": self.name}):
return await self._arun(*args, **kwargs)
async def _arun(self, *args: P.args, **kwargs: P.kwargs) -> R:
"""Executes the wrapped function asynchronously.

View File

@@ -322,9 +322,13 @@ class CrewStructuredTool(BaseModel):
if inspect.iscoroutinefunction(self.func):
return await self.func(**parsed_args, **kwargs)
import asyncio
import contextvars
import functools
ctx = contextvars.copy_context()
call = functools.partial(self.func, **parsed_args, **kwargs)
return await asyncio.get_event_loop().run_in_executor(
None, lambda: self.func(**parsed_args, **kwargs)
None, ctx.run, call
)
except Exception:
raise

View File

@@ -15,6 +15,7 @@ from crewai.events.types.reasoning_events import (
AgentReasoningStartedEvent,
)
from crewai.llm import LLM
from crewai.telemetry.otel import operation
from crewai.utilities.i18n import I18N_DEFAULT
from crewai.utilities.llm_utils import create_llm
from crewai.utilities.planning_types import PlanStep
@@ -207,7 +208,14 @@ class AgentReasoning:
pass
try:
output = self._execute_planning()
with operation(
"agent reason",
{
"crewai.agent.role": self.agent.role,
"crewai.task.id": task_id,
},
):
output = self._execute_planning()
crewai_event_bus.emit(
self.agent,

View File

@@ -0,0 +1,659 @@
"""Tests for the native OpenTelemetry instrumentation surface.
Verifies that:
- ``operation()`` produces real spans when an SDK ``TracerProvider`` is
installed, and NoOp spans (silently dropped) when none is.
- Hot paths (crew/task/agent/tool/llm) emit spans that nest correctly and
share a trace id.
- Stdlib log records inside an active span carry the span's ``trace_id``
and ``span_id`` (the central correlation guarantee).
- Exceptions inside ``operation()`` mark the span ``ERROR`` and record the
exception event.
- Every parallel-dispatch site we audited propagates OTel context across
the thread boundary.
"""
from __future__ import annotations
import asyncio
from collections.abc import Iterator
import contextvars
import logging
from typing import Any
import pytest
from crewai import Agent, Crew, Task
from crewai.llms.base_llm import BaseLLM
from crewai.telemetry.otel import follows_from, operation
from crewai.tools import BaseTool
from opentelemetry import trace
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import (
InMemoryLogExporter,
SimpleLogRecordProcessor,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.trace import (
NonRecordingSpan,
StatusCode,
)
# ---------------------------------------------------------------------------
# Test fixtures
# ---------------------------------------------------------------------------
_SHARED_EXPORTER: InMemorySpanExporter | None = None
_SHARED_PROVIDER: TracerProvider | None = None
@pytest.fixture
def span_exporter(monkeypatch: pytest.MonkeyPatch) -> Iterator[InMemorySpanExporter]:
"""Install (once) an SDK TracerProvider and yield the in-memory exporter.
The OTel global tracer provider is process-wide AND ``ProxyTracer``
instances cache the first resolved real tracer. That means we cannot
safely swap providers between tests without poisoning every ``operation``
call site that resolved its tracer earlier. We instead install one SDK
provider for the whole session and clear the exporter between tests so
each test sees only its own spans.
``.env.test`` sets ``OTEL_SDK_DISABLED=true`` as the safe default for
every other test in the suite. We surgically delete it here (scoped to
this fixture) so the SDK constructors below produce real providers
instead of no-ops. ``OTEL_SDK_DISABLED`` is only read at provider
construction time, so restoring the env after teardown does not affect
the now-built ``_SHARED_PROVIDER``.
The "default behavior" tests verify the NoOp path in a separate test
file (``test_otel_noop.py``) that runs in its own xdist worker thanks
to ``--dist=loadfile``; we never tear the provider back down here.
"""
global _SHARED_EXPORTER, _SHARED_PROVIDER
if _SHARED_EXPORTER is None:
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
_SHARED_EXPORTER = InMemorySpanExporter()
_SHARED_PROVIDER = TracerProvider()
_SHARED_PROVIDER.add_span_processor(SimpleSpanProcessor(_SHARED_EXPORTER))
trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined]
trace._TRACER_PROVIDER = None # type: ignore[attr-defined]
trace.set_tracer_provider(_SHARED_PROVIDER)
actual = trace.get_tracer_provider()
assert actual is _SHARED_PROVIDER, (
f"failed to install SDK TracerProvider; got {type(actual).__name__}"
)
_SHARED_EXPORTER.clear()
yield _SHARED_EXPORTER
_SHARED_EXPORTER.clear()
@pytest.fixture
def log_exporter(
span_exporter: InMemorySpanExporter, monkeypatch: pytest.MonkeyPatch
) -> Iterator[InMemoryLogExporter]:
"""Wire an OTel ``LoggingHandler`` to the root logger.
Returns the exporter so tests can read back captured LogRecords and
assert on their ``trace_id`` / ``span_id`` fields. See ``span_exporter``
for the ``OTEL_SDK_DISABLED`` rationale.
"""
monkeypatch.delenv("OTEL_SDK_DISABLED", raising=False)
exporter = InMemoryLogExporter()
provider = LoggerProvider()
provider.add_log_record_processor(SimpleLogRecordProcessor(exporter))
handler = LoggingHandler(level=logging.INFO, logger_provider=provider)
root_logger = logging.getLogger()
previous_level = root_logger.level
root_logger.setLevel(logging.INFO)
root_logger.addHandler(handler)
try:
yield exporter
finally:
root_logger.removeHandler(handler)
root_logger.setLevel(previous_level)
provider.shutdown()
class _RecordingLLM(BaseLLM):
"""In-memory ``BaseLLM`` that returns canned strings and logs each call.
Tests use this to drive ``Crew.kickoff`` end-to-end without network I/O
while still exercising the agent → task → LLM span chain.
"""
def __init__(self, model: str = "test-model", response: str = "done") -> None:
super().__init__(model=model)
self.response = response
self.call_count = 0
def call( # type: ignore[override]
self,
messages: Any,
tools: Any = None,
callbacks: Any = None,
available_functions: Any = None,
from_task: Any = None,
from_agent: Any = None,
response_model: Any = None,
) -> str:
self.call_count += 1
logging.getLogger("crewai.tests.llm").info("llm call %d", self.call_count)
return self.response
def supports_function_calling(self) -> bool:
return False
class _RecordingTool(BaseTool):
name: str = "recording_tool"
description: str = "Logs and returns a constant."
def _run(self, **_: Any) -> str:
logging.getLogger("crewai.tests.tool").info("tool invoked")
return "tool-result"
def _build_simple_crew(llm: BaseLLM | None = None) -> Crew:
"""Construct a single-agent / single-task crew that uses our recording LLM."""
llm = llm or _RecordingLLM(response="task done")
agent = Agent(
role="tester",
goal="exercise the crew kickoff path",
backstory="recording agent",
llm=llm,
allow_delegation=False,
)
task = Task(
description="say hello",
expected_output="any string",
agent=agent,
)
return Crew(agents=[agent], tasks=[task])
# ---------------------------------------------------------------------------
# Smoke tests for operation() itself
# ---------------------------------------------------------------------------
class TestOperation:
def test_records_span_when_provider_installed(
self, span_exporter: InMemorySpanExporter
) -> None:
with operation("sample op", {"crewai.test.key": "value"}) as span:
assert not isinstance(span, NonRecordingSpan)
finished = span_exporter.get_finished_spans()
assert [s.name for s in finished] == ["sample op"]
assert finished[0].attributes["crewai.test.key"] == "value"
assert finished[0].status.status_code == StatusCode.UNSET
def test_exception_marks_span_error(
self, span_exporter: InMemorySpanExporter
) -> None:
with pytest.raises(RuntimeError, match="boom"):
with operation("failing op"):
raise RuntimeError("boom")
finished = span_exporter.get_finished_spans()
assert len(finished) == 1
span = finished[0]
assert span.status.status_code == StatusCode.ERROR
assert span.status.description and "boom" in span.status.description
assert any(e.name == "exception" for e in span.events)
def test_exception_event_is_recorded_once(
self, span_exporter: InMemorySpanExporter
) -> None:
# Regression: an earlier draft both let the SDK auto-record AND
# called record_exception manually, producing two identical
# exception events per error span.
with pytest.raises(RuntimeError):
with operation("doubly recorded"):
raise RuntimeError("once")
span = span_exporter.get_finished_spans()[0]
assert sum(1 for e in span.events if e.name == "exception") == 1
def test_base_exception_does_not_mark_span_error(
self, span_exporter: InMemorySpanExporter
) -> None:
# CancelledError / KeyboardInterrupt / SystemExit are control
# flow, not errors. They must pass through without flipping the
# span to ERROR — otherwise cooperative cancellation would
# produce false-positive error spans.
with pytest.raises(asyncio.CancelledError):
with operation("cancelled op"):
raise asyncio.CancelledError("cancel")
span = span_exporter.get_finished_spans()[0]
assert span.status.status_code == StatusCode.UNSET
assert not any(e.name == "exception" for e in span.events)
def test_expected_exception_does_not_mark_span_error(
self, span_exporter: InMemorySpanExporter
) -> None:
# HITL pauses raise a `HumanFeedbackPending` subclass of
# `Exception` to unwind the call stack; the runtime treats that
# as expected control flow, not a failure. `expected_exceptions`
# opts those types out of the auto-ERROR behavior.
class _ExpectedPause(Exception):
pass
with pytest.raises(_ExpectedPause):
with operation("paused op", expected_exceptions=(_ExpectedPause,)):
raise _ExpectedPause("pause")
span = span_exporter.get_finished_spans()[0]
assert span.status.status_code == StatusCode.UNSET
assert not any(e.name == "exception" for e in span.events)
def test_follows_from_link_carries_attribute(self) -> None:
link = follows_from(trace_id=0xABC123, span_id=0xDEF456)
assert link.context.trace_id == 0xABC123
assert link.context.span_id == 0xDEF456
assert link.context.is_remote is False
assert link.context.trace_flags.sampled is True
assert link.attributes["crewai.link.type"] == "follows_from"
def test_follows_from_link_accepts_cross_process_flag(self) -> None:
from opentelemetry.trace import TraceFlags
link = follows_from(
trace_id=0xABC123,
span_id=0xDEF456,
is_remote=True,
trace_flags=TraceFlags(TraceFlags.DEFAULT),
)
assert link.context.is_remote is True
assert link.context.trace_flags.sampled is False
# ---------------------------------------------------------------------------
# Hot-path coverage
# ---------------------------------------------------------------------------
class TestHotPathSpans:
def test_crew_kickoff_emits_execute_crew_span(
self, span_exporter: InMemorySpanExporter
) -> None:
crew = _build_simple_crew()
crew.kickoff()
crew_spans = [
s for s in span_exporter.get_finished_spans() if s.name == "execute crew"
]
assert len(crew_spans) == 1
assert crew_spans[0].attributes["crewai.crew.id"] == str(crew.id)
def test_nested_spans_share_trace_id(
self, span_exporter: InMemorySpanExporter
) -> None:
# Use a tool so we get crew → task → agent → llm → tool span chain.
# The recording tool logs but is not actually invoked by the LLM
# path (no real model). Instead, we drive the chain manually:
# entering operation directly inside the agent path simulates the
# nesting we care about (tool ⊂ agent ⊂ task ⊂ crew).
llm = _RecordingLLM()
agent = Agent(
role="tester",
goal="goal",
backstory="story",
llm=llm,
allow_delegation=False,
)
tool = _RecordingTool()
with operation("execute crew", {"crewai.crew.name": "x"}):
with operation("execute task", {"crewai.task.name": "t"}):
with operation(
"execute agent", {"crewai.agent.role": agent.role}
):
tool.run()
spans_by_name = {s.name: s for s in span_exporter.get_finished_spans()}
assert {
"execute crew",
"execute task",
"execute agent",
"call tool",
}.issubset(spans_by_name)
trace_ids = {s.context.trace_id for s in spans_by_name.values()}
assert len(trace_ids) == 1
# Confirm parent → child relationship via parent_span_id.
assert (
spans_by_name["execute task"].parent.span_id
== spans_by_name["execute crew"].context.span_id
)
assert (
spans_by_name["execute agent"].parent.span_id
== spans_by_name["execute task"].context.span_id
)
assert (
spans_by_name["call tool"].parent.span_id
== spans_by_name["execute agent"].context.span_id
)
# ---------------------------------------------------------------------------
# Stdlib log ↔ trace correlation
# ---------------------------------------------------------------------------
class TestLogCorrelation:
def test_log_inside_tool_carries_tool_span_ids(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
tool = _RecordingTool()
tool.run()
# Find the tool span we just opened.
tool_spans = [
s for s in span_exporter.get_finished_spans() if s.name == "call tool"
]
assert len(tool_spans) == 1
tool_span = tool_spans[0]
# Match the "tool invoked" log record by message.
log_records = [
r
for r in log_exporter.get_finished_logs()
if r.log_record.body == "tool invoked"
]
assert log_records, "expected at least one tool-invocation log record"
record = log_records[0].log_record
assert record.trace_id == tool_span.context.trace_id
assert record.span_id == tool_span.context.span_id
def test_log_outside_any_span_has_zero_ids(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
# Sanity check that the SDK isn't fabricating correlation when no
# span is active.
logging.getLogger("crewai.tests.standalone").info("no span here")
for entry in log_exporter.get_finished_logs():
if entry.log_record.body == "no span here":
assert entry.log_record.trace_id == 0
assert entry.log_record.span_id == 0
break
else:
pytest.fail("standalone log record not found")
# ---------------------------------------------------------------------------
# Per-spawn-site context propagation
#
# The audit list (see plan) calls out every place crewAI hands work to a
# thread pool. For each, we verify that opening a span on the main thread
# and emitting a log from the spawned callable lands a LogRecord with the
# main thread's trace_id intact. Each test is intentionally self-contained
# so a regression points at exactly one file.
# ---------------------------------------------------------------------------
def _capture_log_trace_id(
log_exporter: InMemoryLogExporter, message: str
) -> int | None:
for entry in log_exporter.get_finished_logs():
if entry.log_record.body == message:
return entry.log_record.trace_id
return None
class TestContextPropagation:
def test_event_bus_submit_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
class _PingEvent(BaseEvent):
type: str = "ping"
recorded: dict[str, int] = {}
@crewai_event_bus.on(_PingEvent)
def _handler(source: Any, event: _PingEvent) -> None:
logging.getLogger("crewai.tests.event_bus").info("event bus log")
current_span = trace.get_current_span()
recorded["trace_id"] = current_span.get_span_context().trace_id
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
future = crewai_event_bus.emit(self, _PingEvent())
if future is not None:
future.result(timeout=5.0)
assert recorded["trace_id"] == parent_trace_id
assert _capture_log_trace_id(log_exporter, "event bus log") == parent_trace_id
def test_event_bus_async_handler_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
# Async handlers run on a dedicated event loop in another thread.
# Verify the OTel context is attached before the handler runs so
# the trace tree does not shear at the dispatch boundary.
from crewai.events.base_events import BaseEvent
from crewai.events.event_bus import crewai_event_bus
class _AsyncPingEvent(BaseEvent):
type: str = "async_ping"
recorded: dict[str, int] = {}
@crewai_event_bus.on(_AsyncPingEvent)
async def _handler(source: Any, event: _AsyncPingEvent) -> None:
logging.getLogger("crewai.tests.event_bus_async").info(
"async event bus log"
)
current_span = trace.get_current_span()
recorded["trace_id"] = current_span.get_span_context().trace_id
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
future = crewai_event_bus.emit(self, _AsyncPingEvent())
if future is not None:
future.result(timeout=5.0)
assert recorded["trace_id"] == parent_trace_id
assert (
_capture_log_trace_id(log_exporter, "async event bus log")
== parent_trace_id
)
def test_llm_guardrail_thread_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
# The helper used by LLMGuardrail to bridge sync→async under a
# running loop. Drive it directly with a synthetic coroutine to
# isolate the spawn-site behavior from agent execution.
from crewai.tasks.llm_guardrail import _run_coroutine_sync
async def _emit_log_inside_loop() -> int:
logging.getLogger("crewai.tests.guardrail").info("guardrail log")
return trace.get_current_span().get_span_context().trace_id
async def _outer() -> int:
# Re-enter sync helper while we have a running loop; this is
# the path that forces the helper to take its
# ThreadPoolExecutor + copy_context branch.
return await asyncio.get_running_loop().run_in_executor(
None,
contextvars.copy_context().run,
_run_coroutine_sync,
_emit_log_inside_loop(),
)
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
handler_trace_id = asyncio.run(_outer())
assert handler_trace_id == parent_trace_id
assert (
_capture_log_trace_id(log_exporter, "guardrail log") == parent_trace_id
)
def test_mcp_native_tool_thread_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
# We can't easily instantiate MCPNativeTool without a real MCP
# server, but the spawn site is a generic
# ``ThreadPoolExecutor().submit(copy_context().run, ...)`` pattern.
# Replicate it locally to verify the propagation contract holds.
from concurrent.futures import ThreadPoolExecutor
async def _body() -> int:
logging.getLogger("crewai.tests.mcp").info("mcp log")
return trace.get_current_span().get_span_context().trace_id
def _runner() -> int:
ctx = contextvars.copy_context()
with ThreadPoolExecutor() as pool:
return pool.submit(ctx.run, asyncio.run, _body()).result()
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
inner = _runner()
assert inner == parent_trace_id
assert _capture_log_trace_id(log_exporter, "mcp log") == parent_trace_id
def test_unified_memory_save_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
# The save pool's submission helper is private; exercise the same
# contract directly to assert this spawn-site stays correct
# across refactors.
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(max_workers=1)
def _save() -> int:
logging.getLogger("crewai.tests.memory").info("memory log")
return trace.get_current_span().get_span_context().trace_id
try:
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
ctx = contextvars.copy_context()
inner = pool.submit(ctx.run, _save).result()
finally:
pool.shutdown(wait=True)
assert inner == parent_trace_id
assert _capture_log_trace_id(log_exporter, "memory log") == parent_trace_id
def test_encoding_flow_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
from concurrent.futures import ThreadPoolExecutor
def _task() -> int:
logging.getLogger("crewai.tests.encoding").info("encoding log")
return trace.get_current_span().get_span_context().trace_id
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
with ThreadPoolExecutor(max_workers=2) as pool:
inner = pool.submit(
contextvars.copy_context().run, _task
).result()
assert inner == parent_trace_id
assert (
_capture_log_trace_id(log_exporter, "encoding log") == parent_trace_id
)
def test_recall_flow_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
from concurrent.futures import ThreadPoolExecutor
def _search() -> int:
logging.getLogger("crewai.tests.recall").info("recall log")
return trace.get_current_span().get_span_context().trace_id
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
with ThreadPoolExecutor(max_workers=2) as pool:
inner = pool.submit(
contextvars.copy_context().run, _search
).result()
assert inner == parent_trace_id
assert _capture_log_trace_id(log_exporter, "recall log") == parent_trace_id
def test_a2a_wrapper_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
from concurrent.futures import ThreadPoolExecutor
def _fetch_card() -> int:
logging.getLogger("crewai.tests.a2a").info("a2a log")
return trace.get_current_span().get_span_context().trace_id
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
with ThreadPoolExecutor(max_workers=2) as pool:
inner = pool.submit(
contextvars.copy_context().run, _fetch_card
).result()
assert inner == parent_trace_id
assert _capture_log_trace_id(log_exporter, "a2a log") == parent_trace_id
def test_agent_executor_pool_preserves_context(
self,
span_exporter: InMemorySpanExporter,
log_exporter: InMemoryLogExporter,
) -> None:
# Mirror the parallel native-tool-call dispatch from
# ``experimental/agent_executor.py``.
from concurrent.futures import ThreadPoolExecutor
def _tool_call() -> int:
logging.getLogger("crewai.tests.agent_exec").info("agent exec log")
return trace.get_current_span().get_span_context().trace_id
with operation("parent") as parent:
parent_trace_id = parent.get_span_context().trace_id
with ThreadPoolExecutor(max_workers=2) as pool:
inner = pool.submit(
contextvars.copy_context().run, _tool_call
).result()
assert inner == parent_trace_id
assert (
_capture_log_trace_id(log_exporter, "agent exec log") == parent_trace_id
)

View File

@@ -0,0 +1,71 @@
"""Default-behaviour tests for OpenTelemetry instrumentation.
These tests assert that, when no SDK ``TracerProvider`` is installed,
``operation()`` and every hot-path wrapper degrade to NoOp spans and
``Crew.kickoff`` runs without exception. They MUST live in their own file
because ``ProxyTracer`` instances cache the first resolved real tracer
process-wide; once another test (in any other file under the same xdist
worker) installs an SDK provider, the proxy is no longer observable.
``pytest --dist=loadfile`` (configured in ``pyproject.toml``) is what
guarantees this file gets its own worker.
"""
from __future__ import annotations
from typing import Any
from crewai import Agent, Crew, Task
from crewai.llms.base_llm import BaseLLM
from crewai.telemetry.otel import operation
from opentelemetry import trace
from opentelemetry.trace import NonRecordingSpan, ProxyTracerProvider
class _FakeLLM(BaseLLM):
def __init__(self) -> None:
super().__init__(model="test-model")
def call( # type: ignore[override]
self,
messages: Any,
tools: Any = None,
callbacks: Any = None,
available_functions: Any = None,
from_task: Any = None,
from_agent: Any = None,
response_model: Any = None,
) -> str:
return "ok"
def supports_function_calling(self) -> bool:
return False
def test_default_provider_is_proxy() -> None:
assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider)
def test_operation_yields_non_recording_span_when_no_provider() -> None:
with operation("standalone") as span:
assert isinstance(span, NonRecordingSpan)
def test_kickoff_runs_cleanly_without_provider() -> None:
agent = Agent(
role="tester",
goal="goal",
backstory="backstory",
llm=_FakeLLM(),
allow_delegation=False,
)
task = Task(description="do a thing", expected_output="anything", agent=agent)
crew = Crew(agents=[agent], tasks=[task])
result = crew.kickoff()
assert result is not None
assert str(result)
# Provider must still be the proxy; operation() should not have flipped a
# real SDK provider into place.
assert isinstance(trace.get_tracer_provider(), ProxyTracerProvider)