From f98c2c00d8dfbfd9bfeda6b63434a3e6b2c722d0 Mon Sep 17 00:00:00 2001 From: "Brandon Hancock (bhancock_ai)" <109994880+bhancockio@users.noreply.github.com> Date: Fri, 4 Oct 2024 12:21:55 -0400 Subject: [PATCH] Brandon/cre 288 add telemetry to flows (#1391) * Telemetry for flows * store node names --- src/crewai/cli/create_flow.py | 6 +++ src/crewai/flow/flow.py | 17 ++++++-- src/crewai/telemetry/telemetry.py | 69 ++++++++++++++++++++++++------- 3 files changed, 75 insertions(+), 17 deletions(-) diff --git a/src/crewai/cli/create_flow.py b/src/crewai/cli/create_flow.py index a1c10bf46..ec68611b5 100644 --- a/src/crewai/cli/create_flow.py +++ b/src/crewai/cli/create_flow.py @@ -2,6 +2,8 @@ from pathlib import Path import click +from crewai.telemetry import Telemetry + def create_flow(name): """Create a new flow.""" @@ -15,6 +17,10 @@ def create_flow(name): click.secho(f"Error: Folder {folder_name} already exists.", fg="red") return + # Initialize telemetry + telemetry = Telemetry() + telemetry.flow_creation_span(class_name) + # Create directory structure (project_root / "src" / folder_name).mkdir(parents=True) (project_root / "src" / folder_name / "crews").mkdir(parents=True) diff --git a/src/crewai/flow/flow.py b/src/crewai/flow/flow.py index f60e5c1e3..6ae3941a4 100644 --- a/src/crewai/flow/flow.py +++ b/src/crewai/flow/flow.py @@ -1,7 +1,5 @@ # flow.py -# flow.py - import asyncio import inspect from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union @@ -9,6 +7,7 @@ from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union from pydantic import BaseModel from crewai.flow.flow_visualizer import plot_flow +from crewai.telemetry import Telemetry T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]]) @@ -142,6 +141,8 @@ class FlowMeta(type): class Flow(Generic[T], metaclass=FlowMeta): + _telemetry = Telemetry() + _start_methods: List[str] = [] _listeners: Dict[str, tuple[str, List[str]]] = {} _routers: Dict[str, str] = 
{} @@ -161,6 +162,8 @@ class Flow(Generic[T], metaclass=FlowMeta): self._pending_and_listeners: Dict[str, Set[str]] = {} self._method_outputs: List[Any] = [] # List to store all method outputs + self._telemetry.flow_creation_span(self.__class__.__name__) + for method_name in dir(self): if callable(getattr(self, method_name)) and not method_name.startswith( "__" @@ -190,6 +193,10 @@ class Flow(Generic[T], metaclass=FlowMeta): if not self._start_methods: raise ValueError("No start method defined") + self._telemetry.flow_execution_span( + self.__class__.__name__, list(self._methods.keys()) + ) + # Create tasks for all start methods tasks = [ self._execute_start_method(start_method) @@ -270,5 +277,9 @@ class Flow(Generic[T], metaclass=FlowMeta): traceback.print_exc() - def plot(self, filename: str = "crewai_flow_graph"): + def plot(self, filename: str = "crewai_flow"): + self._telemetry.flow_plotting_span( + self.__class__.__name__, list(self._methods.keys()) + ) + plot_flow(self, filename) diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index a3928ecd0..30e710549 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -5,8 +5,8 @@ import json import os import platform import warnings -from typing import TYPE_CHECKING, Any, Optional from contextlib import contextmanager +from typing import TYPE_CHECKING, Any, Optional @contextmanager @@ -21,7 +21,9 @@ with suppress_warnings(): from opentelemetry import trace # noqa: E402 -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: E402 +from opentelemetry.exporter.otlp.proto.http.trace_exporter import ( + OTLPSpanExporter, # noqa: E402 +) from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402 from opentelemetry.sdk.trace import TracerProvider # noqa: E402 from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402 @@ -117,9 +119,11 @@ class Telemetry: "max_iter": agent.max_iter, 
"max_rpm": agent.max_rpm, "i18n": agent.i18n.prompt_file, - "function_calling_llm": agent.function_calling_llm.model - if agent.function_calling_llm - else "", + "function_calling_llm": ( + agent.function_calling_llm.model + if agent.function_calling_llm + else "" + ), "llm": agent.llm.model, "delegation_enabled?": agent.allow_delegation, "allow_code_execution?": agent.allow_code_execution, @@ -145,9 +149,9 @@ class Telemetry: "expected_output": task.expected_output, "async_execution?": task.async_execution, "human_input?": task.human_input, - "agent_role": task.agent.role - if task.agent - else "None", + "agent_role": ( + task.agent.role if task.agent else "None" + ), "agent_key": task.agent.key if task.agent else None, "context": ( [task.description for task in task.context] @@ -184,9 +188,11 @@ class Telemetry: "verbose?": agent.verbose, "max_iter": agent.max_iter, "max_rpm": agent.max_rpm, - "function_calling_llm": agent.function_calling_llm.model - if agent.function_calling_llm - else "", + "function_calling_llm": ( + agent.function_calling_llm.model + if agent.function_calling_llm + else "" + ), "llm": agent.llm.model, "delegation_enabled?": agent.allow_delegation, "allow_code_execution?": agent.allow_code_execution, @@ -210,9 +216,9 @@ class Telemetry: "id": str(task.id), "async_execution?": task.async_execution, "human_input?": task.human_input, - "agent_role": task.agent.role - if task.agent - else "None", + "agent_role": ( + task.agent.role if task.agent else "None" + ), "agent_key": task.agent.key if task.agent else None, "tools_names": [ tool.name.casefold() @@ -568,3 +574,38 @@ class Telemetry: return span.set_attribute(key, value) except Exception: pass + + def flow_creation_span(self, flow_name: str): + if self.ready: + try: + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Flow Creation") + self._add_attribute(span, "flow_name", flow_name) + span.set_status(Status(StatusCode.OK)) + span.end() + except Exception: + pass + + 
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
    """Emit a "Flow Plotting" telemetry span for a flow.

    The span carries two attributes: ``flow_name`` and ``node_names``
    (the latter JSON-encoded). Nothing is emitted when ``self.ready`` is
    falsy.

    Args:
        flow_name: Name of the flow being plotted.
        node_names: Names of the flow's nodes; must be JSON-serializable.

    Returns:
        None. Any exception raised while recording is swallowed.
    """
    if not self.ready:
        return
    try:
        # Serialize before opening the span so that a serialization
        # failure cannot leave a started span that is never ended.
        encoded_nodes = json.dumps(node_names)
        tracer = trace.get_tracer("crewai.telemetry")
        span = tracer.start_span("Flow Plotting")
        try:
            self._add_attribute(span, "flow_name", flow_name)
            self._add_attribute(span, "node_names", encoded_nodes)
            span.set_status(Status(StatusCode.OK))
        finally:
            # Always close the span once it has been started.
            span.end()
    except Exception:
        # Telemetry is best-effort: a failure here must never interrupt
        # the caller's plotting work.
        pass


def flow_execution_span(self, flow_name: str, node_names: list[str]):
    """Emit a "Flow Execution" telemetry span for a flow.

    The span carries two attributes: ``flow_name`` and ``node_names``
    (the latter JSON-encoded). Nothing is emitted when ``self.ready`` is
    falsy.

    Args:
        flow_name: Name of the flow being executed.
        node_names: Names of the flow's nodes; must be JSON-serializable.

    Returns:
        None. Any exception raised while recording is swallowed.
    """
    if not self.ready:
        return
    try:
        # Serialize before opening the span so that a serialization
        # failure cannot leave a started span that is never ended.
        encoded_nodes = json.dumps(node_names)
        tracer = trace.get_tracer("crewai.telemetry")
        span = tracer.start_span("Flow Execution")
        try:
            self._add_attribute(span, "flow_name", flow_name)
            self._add_attribute(span, "node_names", encoded_nodes)
            span.set_status(Status(StatusCode.OK))
        finally:
            # Always close the span once it has been started.
            span.end()
    except Exception:
        # Telemetry is best-effort: a failure here must never interrupt
        # the caller's flow execution.
        pass