From 4d6ff2cb7089cdc43ceae2f10bf4164aa03cf646 Mon Sep 17 00:00:00 2001 From: Lucas Gomide Date: Tue, 23 Jun 2026 11:53:22 -0300 Subject: [PATCH] fix: keep coroutine results inside the execute flow method span Address review feedback on the native OpenTelemetry instrumentation --- .../src/crewai/flow/runtime/__init__.py | 9 ++++--- lib/crewai/src/crewai/llm.py | 10 ++++--- .../llms/providers/anthropic/completion.py | 10 ++++--- .../crewai/llms/providers/azure/completion.py | 10 ++++--- .../llms/providers/bedrock/completion.py | 10 ++++--- .../llms/providers/gemini/completion.py | 10 ++++--- .../llms/providers/openai/completion.py | 10 ++++--- lib/crewai/src/crewai/telemetry/otel.py | 4 +-- .../src/crewai/tools/structured_tool.py | 4 +-- lib/crewai/tests/telemetry/test_otel.py | 27 +++++++++++++++++-- 10 files changed, 68 insertions(+), 36 deletions(-) diff --git a/lib/crewai/src/crewai/flow/runtime/__init__.py b/lib/crewai/src/crewai/flow/runtime/__init__.py index c22473a01..dbb547e19 100644 --- a/lib/crewai/src/crewai/flow/runtime/__init__.py +++ b/lib/crewai/src/crewai/flow/runtime/__init__.py @@ -2868,13 +2868,14 @@ class Flow(BaseModel, Generic[T], metaclass=FlowMeta): result = await asyncio.to_thread( ctx.run, method, *args, **kwargs ) + # Auto-await coroutines returned from sync methods so the + # whole call stays inside the "execute flow method" span + # (enables AgentExecutor pattern). + if asyncio.iscoroutine(result): + result = await result finally: current_flow_method_name.reset(method_name_token) - # Auto-await coroutines returned from sync methods (enables AgentExecutor pattern) - if asyncio.iscoroutine(result): - result = await result - method_definition = self._definition.methods[str(method_name)] if method_definition.human_feedback is not None: result = await self._run_human_feedback_step( diff --git a/lib/crewai/src/crewai/llm.py b/lib/crewai/src/crewai/llm.py index 7e944f27d..9606b8068 100644 --- a/lib/crewai/src/crewai/llm.py +++ b/lib/crewai/src/crewai/llm.py @@ -1814,8 +1814,9 @@ class LLM(BaseLLM): ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): self._emit_call_started_event( messages=messages, @@ -1955,8 +1956,9 @@ class LLM(BaseLLM): ValueError: If response format is not supported LLMContextLengthExceededError: If input exceeds model's context limit """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): self._emit_call_started_event( messages=messages, diff --git a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py index d96d4bf46..edc84113f 100644 --- a/lib/crewai/src/crewai/llms/providers/anthropic/completion.py +++ b/lib/crewai/src/crewai/llms/providers/anthropic/completion.py @@ -298,8 +298,9 @@ class AnthropicCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( @@ -375,8 +376,9 @@ class AnthropicCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( diff --git a/lib/crewai/src/crewai/llms/providers/azure/completion.py b/lib/crewai/src/crewai/llms/providers/azure/completion.py index fe3cba709..b9b478168 100644 --- a/lib/crewai/src/crewai/llms/providers/azure/completion.py +++ b/lib/crewai/src/crewai/llms/providers/azure/completion.py @@ -504,8 +504,9 @@ class AzureCompletion(BaseLLM): response_model=response_model, ) - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( @@ -585,8 +586,9 @@ class AzureCompletion(BaseLLM): response_model=response_model, ) - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( diff --git a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py index afb322733..c91c1eab5 100644 --- a/lib/crewai/src/crewai/llms/providers/bedrock/completion.py +++ b/lib/crewai/src/crewai/llms/providers/bedrock/completion.py @@ -363,8 +363,9 @@ class BedrockCompletion(BaseLLM): """Call AWS Bedrock Converse API.""" effective_response_model = response_model or self.response_format - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( @@ -498,8 +499,9 @@ class BedrockCompletion(BaseLLM): 'Install with: uv add "crewai[bedrock-async]"' ) - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( diff --git a/lib/crewai/src/crewai/llms/providers/gemini/completion.py b/lib/crewai/src/crewai/llms/providers/gemini/completion.py index 057e0a7b4..2b7ecbbd5 100644 --- a/lib/crewai/src/crewai/llms/providers/gemini/completion.py +++ b/lib/crewai/src/crewai/llms/providers/gemini/completion.py @@ -295,8 +295,9 @@ class GeminiCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( @@ -383,8 +384,9 @@ class GeminiCompletion(BaseLLM): Returns: Chat completion response or tool call result """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( diff --git a/lib/crewai/src/crewai/llms/providers/openai/completion.py b/lib/crewai/src/crewai/llms/providers/openai/completion.py index 77a1ae14a..a411922f5 100644 --- a/lib/crewai/src/crewai/llms/providers/openai/completion.py +++ b/lib/crewai/src/crewai/llms/providers/openai/completion.py @@ -411,8 +411,9 @@ class OpenAICompletion(BaseLLM): Returns: Completion response or tool call result. """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( @@ -513,8 +514,9 @@ class OpenAICompletion(BaseLLM): Returns: Completion response or tool call result. """ - with llm_call_context(), operation( - "call llm", {"crewai.llm.model": self.model} + with ( + llm_call_context(), + operation("call llm", {"crewai.llm.model": self.model}), ): try: self._emit_call_started_event( diff --git a/lib/crewai/src/crewai/telemetry/otel.py b/lib/crewai/src/crewai/telemetry/otel.py index 4b4d9f714..7ff086f26 100644 --- a/lib/crewai/src/crewai/telemetry/otel.py +++ b/lib/crewai/src/crewai/telemetry/otel.py @@ -83,9 +83,7 @@ def operation( raise except Exception as exc: span.record_exception(exc, escaped=True) - span.set_status( - Status(StatusCode.ERROR, f"{type(exc).__name__}: {exc}") - ) + span.set_status(Status(StatusCode.ERROR, f"{type(exc).__name__}: {exc}")) raise diff --git a/lib/crewai/src/crewai/tools/structured_tool.py b/lib/crewai/src/crewai/tools/structured_tool.py index 0b7c12bc8..d76024299 100644 --- a/lib/crewai/src/crewai/tools/structured_tool.py +++ b/lib/crewai/src/crewai/tools/structured_tool.py @@ -327,9 +327,7 @@ class CrewStructuredTool(BaseModel): ctx = contextvars.copy_context() call = functools.partial(self.func, **parsed_args, **kwargs) - return await asyncio.get_event_loop().run_in_executor( - None, ctx.run, call - ) + return await asyncio.get_event_loop().run_in_executor(None, ctx.run, call) except Exception: raise diff --git a/lib/crewai/tests/telemetry/test_otel.py b/lib/crewai/tests/telemetry/test_otel.py index 678bf3fd7..b76f8111b 100644 --- a/lib/crewai/tests/telemetry/test_otel.py +++ b/lib/crewai/tests/telemetry/test_otel.py @@ -52,6 +52,30 @@ _SHARED_EXPORTER: InMemorySpanExporter | None = None _SHARED_PROVIDER: TracerProvider | None = None +def _reset_global_tracer_provider() -> None: + """Reset OTel's process-global tracer provider slot. + + OTel's ``set_tracer_provider`` is a one-shot install: once called, the + private ``_TRACER_PROVIDER_SET_ONCE`` latch silently no-ops every + subsequent call. Tests that need to install their own SDK provider + have to undo that latch, but OTel exposes no public API for it, so we + poke the private symbols directly. + + This helper is pinned to ``opentelemetry-api~=1.34.0`` (see the + project's ``pyproject.toml``). If a future bump renames or removes + either of these private attributes, the ``assert`` below will fail + loudly and a maintainer can adjust the shim. + """ + assert hasattr(trace, "_TRACER_PROVIDER_SET_ONCE"), ( + "opentelemetry-api dropped _TRACER_PROVIDER_SET_ONCE; update _reset_global_tracer_provider" + ) + assert hasattr(trace, "_TRACER_PROVIDER"), ( + "opentelemetry-api dropped _TRACER_PROVIDER; update _reset_global_tracer_provider" + ) + trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined] + trace._TRACER_PROVIDER = None # type: ignore[attr-defined] + + @pytest.fixture def span_exporter(monkeypatch: pytest.MonkeyPatch) -> Iterator[InMemorySpanExporter]: """Install (once) an SDK TracerProvider and yield the in-memory exporter. @@ -81,8 +105,7 @@ def span_exporter(monkeypatch: pytest.MonkeyPatch) -> Iterator[InMemorySpanExpor _SHARED_EXPORTER = InMemorySpanExporter() _SHARED_PROVIDER = TracerProvider() _SHARED_PROVIDER.add_span_processor(SimpleSpanProcessor(_SHARED_EXPORTER)) - trace._TRACER_PROVIDER_SET_ONCE._done = False # type: ignore[attr-defined] - trace._TRACER_PROVIDER = None # type: ignore[attr-defined] + _reset_global_tracer_provider() trace.set_tracer_provider(_SHARED_PROVIDER) actual = trace.get_tracer_provider() assert actual is _SHARED_PROVIDER, (