fix: resolve additional mypy type annotation issues

- Fixed rag_storage.py embedder type compatibility and query response handling
- Fixed knowledge_storage.py dict type parameters and return types
- Added comprehensive type annotations to telemetry.py methods
- Added type annotations to trace_listener.py event handlers and methods
- Fixed ChromaDB response indexing safety checks
This commit is contained in:
Greyson LaLonde
2025-09-04 13:22:36 -04:00
parent 8dd3493e9c
commit 23c60befd8
4 changed files with 125 additions and 102 deletions

View File

@@ -2,6 +2,8 @@ import os
import uuid
from typing import Any, Optional
from typing_extensions import Self
from crewai.cli.authentication.token import AuthError, get_auth_token
from crewai.cli.version import get_crewai_version
from crewai.events.base_event_listener import BaseEventListener
@@ -84,7 +86,7 @@ class TraceCollectionListener(BaseEventListener):
_initialized = False
_listeners_setup = False
def __new__(cls, batch_manager=None):
def __new__(cls, batch_manager: Optional[Any] = None) -> "TraceCollectionListener":
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@@ -129,169 +131,169 @@ class TraceCollectionListener(BaseEventListener):
self._listeners_setup = True
def _register_flow_event_handlers(self, event_bus):
def _register_flow_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for flow events"""
@event_bus.on(FlowCreatedEvent)
def on_flow_created(source, event):
def on_flow_created(source: Any, event: Any) -> None:
pass
@event_bus.on(FlowStartedEvent)
def on_flow_started(source, event):
def on_flow_started(source: Any, event: Any) -> None:
if not self.batch_manager.is_batch_initialized():
self._initialize_flow_batch(source, event)
self._handle_trace_event("flow_started", source, event)
@event_bus.on(MethodExecutionStartedEvent)
def on_method_started(source, event):
def on_method_started(source: Any, event: Any) -> None:
self._handle_trace_event("method_execution_started", source, event)
@event_bus.on(MethodExecutionFinishedEvent)
def on_method_finished(source, event):
def on_method_finished(source: Any, event: Any) -> None:
self._handle_trace_event("method_execution_finished", source, event)
@event_bus.on(MethodExecutionFailedEvent)
def on_method_failed(source, event):
def on_method_failed(source: Any, event: Any) -> None:
self._handle_trace_event("method_execution_failed", source, event)
@event_bus.on(FlowFinishedEvent)
def on_flow_finished(source, event):
def on_flow_finished(source: Any, event: Any) -> None:
self._handle_trace_event("flow_finished", source, event)
if self.batch_manager.batch_owner_type == "flow":
self.batch_manager.finalize_batch()
@event_bus.on(FlowPlotEvent)
def on_flow_plot(source, event):
def on_flow_plot(source: Any, event: Any) -> None:
self._handle_action_event("flow_plot", source, event)
def _register_context_event_handlers(self, event_bus):
def _register_context_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for context events (start/end)"""
@event_bus.on(CrewKickoffStartedEvent)
def on_crew_started(source, event):
def on_crew_started(source: Any, event: Any) -> None:
if not self.batch_manager.is_batch_initialized():
self._initialize_crew_batch(source, event)
self._handle_trace_event("crew_kickoff_started", source, event)
@event_bus.on(CrewKickoffCompletedEvent)
def on_crew_completed(source, event):
def on_crew_completed(source: Any, event: Any) -> None:
self._handle_trace_event("crew_kickoff_completed", source, event)
if self.batch_manager.batch_owner_type == "crew":
self.batch_manager.finalize_batch()
@event_bus.on(CrewKickoffFailedEvent)
def on_crew_failed(source, event):
def on_crew_failed(source: Any, event: Any) -> None:
self._handle_trace_event("crew_kickoff_failed", source, event)
self.batch_manager.finalize_batch()
@event_bus.on(TaskStartedEvent)
def on_task_started(source, event):
def on_task_started(source: Any, event: Any) -> None:
self._handle_trace_event("task_started", source, event)
@event_bus.on(TaskCompletedEvent)
def on_task_completed(source, event):
def on_task_completed(source: Any, event: Any) -> None:
self._handle_trace_event("task_completed", source, event)
@event_bus.on(TaskFailedEvent)
def on_task_failed(source, event):
def on_task_failed(source: Any, event: Any) -> None:
self._handle_trace_event("task_failed", source, event)
@event_bus.on(AgentExecutionStartedEvent)
def on_agent_started(source, event):
def on_agent_started(source: Any, event: Any) -> None:
self._handle_trace_event("agent_execution_started", source, event)
@event_bus.on(AgentExecutionCompletedEvent)
def on_agent_completed(source, event):
def on_agent_completed(source: Any, event: Any) -> None:
self._handle_trace_event("agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionStartedEvent)
def on_lite_agent_started(source, event):
def on_lite_agent_started(source: Any, event: Any) -> None:
self._handle_trace_event("lite_agent_execution_started", source, event)
@event_bus.on(LiteAgentExecutionCompletedEvent)
def on_lite_agent_completed(source, event):
def on_lite_agent_completed(source: Any, event: Any) -> None:
self._handle_trace_event("lite_agent_execution_completed", source, event)
@event_bus.on(LiteAgentExecutionErrorEvent)
def on_lite_agent_error(source, event):
def on_lite_agent_error(source: Any, event: Any) -> None:
self._handle_trace_event("lite_agent_execution_error", source, event)
@event_bus.on(AgentExecutionErrorEvent)
def on_agent_error(source, event):
def on_agent_error(source: Any, event: Any) -> None:
self._handle_trace_event("agent_execution_error", source, event)
@event_bus.on(LLMGuardrailStartedEvent)
def on_guardrail_started(source, event):
def on_guardrail_started(source: Any, event: Any) -> None:
self._handle_trace_event("llm_guardrail_started", source, event)
@event_bus.on(LLMGuardrailCompletedEvent)
def on_guardrail_completed(source, event):
def on_guardrail_completed(source: Any, event: Any) -> None:
self._handle_trace_event("llm_guardrail_completed", source, event)
def _register_action_event_handlers(self, event_bus):
def _register_action_event_handlers(self, event_bus: CrewAIEventsBus) -> None:
"""Register handlers for action events (LLM calls, tool usage)"""
@event_bus.on(LLMCallStartedEvent)
def on_llm_call_started(source, event):
def on_llm_call_started(source: Any, event: Any) -> None:
self._handle_action_event("llm_call_started", source, event)
@event_bus.on(LLMCallCompletedEvent)
def on_llm_call_completed(source, event):
def on_llm_call_completed(source: Any, event: Any) -> None:
self._handle_action_event("llm_call_completed", source, event)
@event_bus.on(LLMCallFailedEvent)
def on_llm_call_failed(source, event):
def on_llm_call_failed(source: Any, event: Any) -> None:
self._handle_action_event("llm_call_failed", source, event)
@event_bus.on(ToolUsageStartedEvent)
def on_tool_started(source, event):
def on_tool_started(source: Any, event: Any) -> None:
self._handle_action_event("tool_usage_started", source, event)
@event_bus.on(ToolUsageFinishedEvent)
def on_tool_finished(source, event):
def on_tool_finished(source: Any, event: Any) -> None:
self._handle_action_event("tool_usage_finished", source, event)
@event_bus.on(ToolUsageErrorEvent)
def on_tool_error(source, event):
def on_tool_error(source: Any, event: Any) -> None:
self._handle_action_event("tool_usage_error", source, event)
@event_bus.on(MemoryQueryStartedEvent)
def on_memory_query_started(source, event):
def on_memory_query_started(source: Any, event: Any) -> None:
self._handle_action_event("memory_query_started", source, event)
@event_bus.on(MemoryQueryCompletedEvent)
def on_memory_query_completed(source, event):
def on_memory_query_completed(source: Any, event: Any) -> None:
self._handle_action_event("memory_query_completed", source, event)
@event_bus.on(MemoryQueryFailedEvent)
def on_memory_query_failed(source, event):
def on_memory_query_failed(source: Any, event: Any) -> None:
self._handle_action_event("memory_query_failed", source, event)
@event_bus.on(MemorySaveStartedEvent)
def on_memory_save_started(source, event):
def on_memory_save_started(source: Any, event: Any) -> None:
self._handle_action_event("memory_save_started", source, event)
@event_bus.on(MemorySaveCompletedEvent)
def on_memory_save_completed(source, event):
def on_memory_save_completed(source: Any, event: Any) -> None:
self._handle_action_event("memory_save_completed", source, event)
@event_bus.on(MemorySaveFailedEvent)
def on_memory_save_failed(source, event):
def on_memory_save_failed(source: Any, event: Any) -> None:
self._handle_action_event("memory_save_failed", source, event)
@event_bus.on(AgentReasoningStartedEvent)
def on_agent_reasoning_started(source, event):
def on_agent_reasoning_started(source: Any, event: Any) -> None:
self._handle_action_event("agent_reasoning_started", source, event)
@event_bus.on(AgentReasoningCompletedEvent)
def on_agent_reasoning_completed(source, event):
def on_agent_reasoning_completed(source: Any, event: Any) -> None:
self._handle_action_event("agent_reasoning_completed", source, event)
@event_bus.on(AgentReasoningFailedEvent)
def on_agent_reasoning_failed(source, event):
def on_agent_reasoning_failed(source: Any, event: Any) -> None:
self._handle_action_event("agent_reasoning_failed", source, event)
def _initialize_crew_batch(self, source: Any, event: Any):
def _initialize_crew_batch(self, source: Any, event: Any) -> None:
"""Initialize trace batch"""
user_context = self._get_user_context()
execution_metadata = {
@@ -305,7 +307,7 @@ class TraceCollectionListener(BaseEventListener):
self._initialize_batch(user_context, execution_metadata)
def _initialize_flow_batch(self, source: Any, event: Any):
def _initialize_flow_batch(self, source: Any, event: Any) -> None:
"""Initialize trace batch for Flow execution"""
user_context = self._get_user_context()
execution_metadata = {
@@ -333,14 +335,14 @@ class TraceCollectionListener(BaseEventListener):
user_context, execution_metadata, use_ephemeral=False
)
def _handle_trace_event(self, event_type: str, source: Any, event: Any):
def _handle_trace_event(self, event_type: str, source: Any, event: Any) -> None:
"""Generic handler for context end events"""
trace_event = self._create_trace_event(event_type, source, event)
self.batch_manager.add_event(trace_event)
def _handle_action_event(self, event_type: str, source: Any, event: Any):
def _handle_action_event(self, event_type: str, source: Any, event: Any) -> None:
"""Generic handler for action events (LLM calls, tool usage)"""
if not self.batch_manager.is_batch_initialized():
@@ -437,7 +439,9 @@ class TraceCollectionListener(BaseEventListener):
return {"serialization_error": str(e), "object_type": type(obj).__name__}
# TODO: move to utils
def _truncate_messages(self, messages, max_content_length=500, max_messages=5):
def _truncate_messages(
self, messages: Any, max_content_length: int = 500, max_messages: int = 5
) -> Any:
"""Truncate message content and limit number of messages"""
if not messages or not isinstance(messages, list):
return messages

View File

@@ -7,6 +7,7 @@ from typing import Any, Optional, Union
import chromadb
import chromadb.errors
from chromadb import EmbeddingFunction
from chromadb.api import ClientAPI
from chromadb.api.types import OneOrMany
from chromadb.config import Settings
@@ -29,6 +30,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
collection: Optional[chromadb.Collection] = None
collection_name: Optional[str] = "knowledge"
app: Optional[ClientAPI] = None
embedder: Optional[EmbeddingFunction] = None
def __init__(
self,
@@ -42,7 +44,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
self,
query: list[str],
limit: int = 3,
filter: Optional[dict] = None,
filter: Optional[dict[str, Any]] = None,
score_threshold: float = 0.35,
) -> list[dict[str, Any]]:
with suppress_logging(
@@ -55,14 +57,16 @@ class KnowledgeStorage(BaseKnowledgeStorage):
where=filter,
)
results = []
for i in range(len(fetched["ids"][0])): # type: ignore
for i in range(len(fetched["ids"][0])):
result = {
"id": fetched["ids"][0][i], # type: ignore
"metadata": fetched["metadatas"][0][i], # type: ignore
"context": fetched["documents"][0][i], # type: ignore
"score": fetched["distances"][0][i], # type: ignore
"id": fetched["ids"][0][i],
"metadata": fetched["metadatas"][0][i],
"context": fetched["documents"][0][i],
"score": fetched["distances"][0][i],
}
if result["score"] >= score_threshold:
if (
result["score"] <= score_threshold
): # Note: distances are smaller when more similar
results.append(result)
return results
else:
@@ -114,7 +118,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
self,
documents: list[str],
metadata: Optional[dict[str, Any] | list[dict[str, Any]]] = None,
):
) -> None:
if not self.collection:
raise Exception("Collection not initialized")
@@ -169,7 +173,7 @@ class KnowledgeStorage(BaseKnowledgeStorage):
Logger(verbose=True).log("error", f"Failed to upsert documents: {e}", "red")
raise
def _create_default_embedding_function(self):
def _create_default_embedding_function(self) -> Any:
from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction,
)

View File

@@ -5,6 +5,7 @@ import uuid
import warnings
from typing import Any, Optional
from chromadb import EmbeddingFunction
from chromadb.api import ClientAPI
from crewai.rag.embeddings.configurator import EmbeddingConfigurator
@@ -22,6 +23,7 @@ class RAGStorage(BaseRAGStorage):
"""
app: ClientAPI | None = None
embedder_config: EmbeddingFunction | None = None # type: ignore[assignment]
def __init__(
self,
@@ -44,11 +46,11 @@ class RAGStorage(BaseRAGStorage):
self.path = path
self._initialize_app()
def _set_embedder_config(self):
def _set_embedder_config(self) -> None:
configurator = EmbeddingConfigurator()
self.embedder_config = configurator.configure_embedder(self.embedder_config)
def _initialize_app(self):
def _initialize_app(self) -> None:
from chromadb.config import Settings
# Suppress deprecation warnings from chromadb, which are not relevant to us
@@ -103,7 +105,7 @@ class RAGStorage(BaseRAGStorage):
self,
query: str,
limit: int = 3,
filter: Optional[dict] = None,
filter: Optional[dict[str, Any]] = None,
score_threshold: float = 0.35,
) -> list[Any]:
if not hasattr(self, "app"):
@@ -116,15 +118,24 @@ class RAGStorage(BaseRAGStorage):
response = self.collection.query(query_texts=query, n_results=limit)
results = []
for i in range(len(response["ids"][0])):
result = {
"id": response["ids"][0][i],
"metadata": response["metadatas"][0][i],
"context": response["documents"][0][i],
"score": response["distances"][0][i],
}
if result["score"] >= score_threshold:
results.append(result)
if response and "ids" in response and response["ids"]:
for i in range(len(response["ids"][0])):
result = {
"id": response["ids"][0][i],
"metadata": response["metadatas"][0][i]
if response.get("metadatas")
else {},
"context": response["documents"][0][i]
if response.get("documents")
else "",
"score": response["distances"][0][i]
if response.get("distances")
else 1.0,
}
if (
result["score"] <= score_threshold
): # Note: distances are smaller when more similar
results.append(result)
return results
except Exception as e:

View File

@@ -33,7 +33,7 @@ logger = logging.getLogger(__name__)
@contextmanager
def suppress_warnings():
def suppress_warnings() -> Any:
with warnings.catch_warnings():
warnings.filterwarnings("ignore")
yield
@@ -45,7 +45,7 @@ if TYPE_CHECKING:
class SafeOTLPSpanExporter(OTLPSpanExporter):
def export(self, spans) -> SpanExportResult:
def export(self, spans: Any) -> SpanExportResult:
try:
return super().export(spans)
except Exception as e:
@@ -69,7 +69,7 @@ class Telemetry:
_instance = None
_lock = threading.Lock()
def __new__(cls):
def __new__(cls) -> Telemetry:
if cls._instance is None:
with cls._lock:
if cls._instance is None:
@@ -144,10 +144,10 @@ class Telemetry:
except Exception:
pass
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None) -> None:
"""Records the creation of a crew."""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Created")
self._add_attribute(
@@ -352,7 +352,7 @@ class Telemetry:
def task_started(self, crew: Crew, task: Task) -> Span | None:
"""Records task started in a crew."""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
created_span = tracer.start_span("Task Created")
@@ -439,7 +439,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
return None
def task_ended(self, span: Span, task: Task, crew: Crew):
def task_ended(self, span: Span, task: Task, crew: Crew) -> None:
"""Records the completion of a task execution in a crew.
Args:
@@ -451,7 +451,7 @@ class Telemetry:
If share_crew is enabled, this will also record the task output
"""
def operation():
def operation() -> Any:
# Ensure fingerprint data is present on completion span
if hasattr(task, "fingerprint") and task.fingerprint:
self._add_attribute(span, "task_fingerprint", task.fingerprint.uuid_str)
@@ -468,7 +468,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int) -> None:
"""Records when a tool is used repeatedly, which might indicate an issue.
Args:
@@ -477,7 +477,7 @@ class Telemetry:
attempts (int): Number of attempts made with this tool
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Repeated Usage")
self._add_attribute(
@@ -494,7 +494,9 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def tool_usage(self, llm: Any, tool_name: str, attempts: int, agent: Any = None):
def tool_usage(
self, llm: Any, tool_name: str, attempts: int, agent: Any = None
) -> None:
"""Records the usage of a tool by an agent.
Args:
@@ -504,7 +506,7 @@ class Telemetry:
agent (Any, optional): The agent using the tool
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage")
self._add_attribute(
@@ -541,7 +543,7 @@ class Telemetry:
tool_name (str, optional): Name of the tool that caused the error
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Tool Usage Error")
self._add_attribute(
@@ -580,7 +582,7 @@ class Telemetry:
model_name (str): Name of the model used
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Individual Test Result")
@@ -615,7 +617,7 @@ class Telemetry:
model_name (str): Name of the model used in testing
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Test Execution")
@@ -639,10 +641,10 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def deploy_signup_error_span(self):
def deploy_signup_error_span(self) -> None:
"""Records when an error occurs during the deployment signup process."""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Deploy Signup Error")
span.set_status(Status(StatusCode.OK))
@@ -650,14 +652,14 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def start_deployment_span(self, uuid: Optional[str] = None):
def start_deployment_span(self, uuid: Optional[str] = None) -> None:
"""Records the start of a deployment process.
Args:
uuid (Optional[str]): Unique identifier for the deployment
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Start Deployment")
if uuid:
@@ -667,10 +669,10 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def create_crew_deployment_span(self):
def create_crew_deployment_span(self) -> None:
"""Records the creation of a new crew deployment."""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Create Crew Deployment")
span.set_status(Status(StatusCode.OK))
@@ -678,7 +680,9 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def get_crew_logs_span(self, uuid: Optional[str], log_type: str = "deployment"):
def get_crew_logs_span(
self, uuid: Optional[str], log_type: str = "deployment"
) -> None:
"""Records the retrieval of crew logs.
Args:
@@ -686,7 +690,7 @@ class Telemetry:
log_type (str, optional): Type of logs being retrieved. Defaults to "deployment".
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Get Crew Logs")
self._add_attribute(span, "log_type", log_type)
@@ -697,14 +701,14 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def remove_crew_span(self, uuid: Optional[str] = None):
def remove_crew_span(self, uuid: Optional[str] = None) -> None:
"""Records the removal of a crew.
Args:
uuid (Optional[str]): Unique identifier for the crew being removed
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Remove Crew")
if uuid:
@@ -714,13 +718,13 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None) -> None:
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
self.crew_creation(crew, inputs)
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Crew Execution")
self._add_attribute(
@@ -792,7 +796,7 @@ class Telemetry:
return None
def end_crew(self, crew: Any, final_string_output: str) -> None:
def operation():
def operation() -> Any:
self._add_attribute(
crew._execution_span,
"crewai_version",
@@ -821,22 +825,22 @@ class Telemetry:
if crew.share_crew:
self._safe_telemetry_operation(operation)
def _add_attribute(self, span, key, value):
def _add_attribute(self, span: Any, key: str, value: Any) -> None:
"""Add an attribute to a span."""
def operation():
def operation() -> Any:
return span.set_attribute(key, value)
self._safe_telemetry_operation(operation)
def flow_creation_span(self, flow_name: str):
def flow_creation_span(self, flow_name: str) -> None:
"""Records the creation of a new flow.
Args:
flow_name (str): Name of the flow being created
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Creation")
self._add_attribute(span, "flow_name", flow_name)
@@ -845,7 +849,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
def flow_plotting_span(self, flow_name: str, node_names: list[str]) -> None:
"""Records flow visualization/plotting activity.
Args:
@@ -853,7 +857,7 @@ class Telemetry:
node_names (list[str]): List of node names in the flow
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Plotting")
self._add_attribute(span, "flow_name", flow_name)
@@ -863,7 +867,7 @@ class Telemetry:
self._safe_telemetry_operation(operation)
def flow_execution_span(self, flow_name: str, node_names: list[str]):
def flow_execution_span(self, flow_name: str, node_names: list[str]) -> None:
"""Records the execution of a flow.
Args:
@@ -871,7 +875,7 @@ class Telemetry:
node_names (list[str]): List of nodes being executed in the flow
"""
def operation():
def operation() -> Any:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")
self._add_attribute(span, "flow_name", flow_name)