Brandon/cre 288 add telemetry to flows (#1391)

* Telemetry for flows

* store node names
Author: Brandon Hancock (bhancock_ai)
Date: 2024-10-04 12:21:55 -04:00
Committed by: GitHub
Parent: ed48efb9aa
Commit: e8a49e7687
3 changed files with 75 additions and 17 deletions

View File

@@ -2,6 +2,8 @@ from pathlib import Path
 
 import click
 
+from crewai.telemetry import Telemetry
+
 
 def create_flow(name):
     """Create a new flow."""
@@ -15,6 +17,10 @@ def create_flow(name):
         click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
         return
 
+    # Initialize telemetry
+    telemetry = Telemetry()
+    telemetry.flow_creation_span(class_name)
+
     # Create directory structure
     (project_root / "src" / folder_name).mkdir(parents=True)
     (project_root / "src" / folder_name / "crews").mkdir(parents=True)

View File

@@ -1,7 +1,5 @@
 # flow.py
-# flow.py
-
 import asyncio
 import inspect
 from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
@@ -9,6 +7,7 @@ from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
 from pydantic import BaseModel
 
 from crewai.flow.flow_visualizer import plot_flow
+from crewai.telemetry import Telemetry
 
 T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
@@ -142,6 +141,8 @@ class FlowMeta(type):
 
 class Flow(Generic[T], metaclass=FlowMeta):
+    _telemetry = Telemetry()
+
     _start_methods: List[str] = []
     _listeners: Dict[str, tuple[str, List[str]]] = {}
     _routers: Dict[str, str] = {}
@@ -161,6 +162,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
         self._pending_and_listeners: Dict[str, Set[str]] = {}
         self._method_outputs: List[Any] = []  # List to store all method outputs
 
+        self._telemetry.flow_creation_span(self.__class__.__name__)
+
         for method_name in dir(self):
             if callable(getattr(self, method_name)) and not method_name.startswith(
                 "__"
@@ -190,6 +193,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
         if not self._start_methods:
             raise ValueError("No start method defined")
 
+        self._telemetry.flow_execution_span(
+            self.__class__.__name__, list(self._methods.keys())
+        )
+
         # Create tasks for all start methods
         tasks = [
             self._execute_start_method(start_method)
@@ -270,5 +277,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
             traceback.print_exc()
 
-    def plot(self, filename: str = "crewai_flow_graph"):
+    def plot(self, filename: str = "crewai_flow"):
+        self._telemetry.flow_plotting_span(
+            self.__class__.__name__, list(self._methods.keys())
+        )
+
         plot_flow(self, filename)
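
Taken together, the flow.py changes emit one span per lifecycle stage: instantiation, kickoff, and plotting. Below is a minimal sketch, assuming the @start/@listen decorators and the kickoff()/plot() entry points exposed by crewai.flow.flow; the flow and method names are illustrative, and the kickoff call is hedged because its sync/async signature has varied across crewai versions.

import asyncio

from crewai.flow.flow import Flow, listen, start


class GreetingFlow(Flow):
    @start()
    def say_hello(self):
        return "hello"

    @listen(say_hello)
    def shout(self, message):
        return message.upper()


flow = GreetingFlow()                 # __init__ emits flow_creation_span("GreetingFlow")
result = asyncio.run(flow.kickoff())  # kickoff emits flow_execution_span(...) with the node names;
                                      # drop asyncio.run(...) on versions where kickoff() is synchronous
flow.plot("greeting_flow")            # plot emits flow_plotting_span(...) before calling plot_flow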

View File

@@ -5,8 +5,8 @@ import json
 import os
 import platform
 import warnings
-from typing import TYPE_CHECKING, Any, Optional
 from contextlib import contextmanager
+from typing import TYPE_CHECKING, Any, Optional
 
 
 @contextmanager
@@ -21,7 +21,9 @@ with suppress_warnings():
     from opentelemetry import trace  # noqa: E402
-    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter  # noqa: E402
+    from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
+        OTLPSpanExporter,  # noqa: E402
+    )
     from opentelemetry.sdk.resources import SERVICE_NAME, Resource  # noqa: E402
     from opentelemetry.sdk.trace import TracerProvider  # noqa: E402
     from opentelemetry.sdk.trace.export import BatchSpanProcessor  # noqa: E402
@@ -117,9 +119,11 @@ class Telemetry:
                 "max_iter": agent.max_iter,
                 "max_rpm": agent.max_rpm,
                 "i18n": agent.i18n.prompt_file,
-                "function_calling_llm": agent.function_calling_llm.model
-                if agent.function_calling_llm
-                else "",
+                "function_calling_llm": (
+                    agent.function_calling_llm.model
+                    if agent.function_calling_llm
+                    else ""
+                ),
                 "llm": agent.llm.model,
                 "delegation_enabled?": agent.allow_delegation,
                 "allow_code_execution?": agent.allow_code_execution,
@@ -145,9 +149,9 @@ class Telemetry:
                 "expected_output": task.expected_output,
                 "async_execution?": task.async_execution,
                 "human_input?": task.human_input,
-                "agent_role": task.agent.role
-                if task.agent
-                else "None",
+                "agent_role": (
+                    task.agent.role if task.agent else "None"
+                ),
                 "agent_key": task.agent.key if task.agent else None,
                 "context": (
                     [task.description for task in task.context]
@@ -184,9 +188,11 @@ class Telemetry:
                 "verbose?": agent.verbose,
                 "max_iter": agent.max_iter,
                 "max_rpm": agent.max_rpm,
-                "function_calling_llm": agent.function_calling_llm.model
-                if agent.function_calling_llm
-                else "",
+                "function_calling_llm": (
+                    agent.function_calling_llm.model
+                    if agent.function_calling_llm
+                    else ""
+                ),
                 "llm": agent.llm.model,
                 "delegation_enabled?": agent.allow_delegation,
                 "allow_code_execution?": agent.allow_code_execution,
@@ -210,9 +216,9 @@ class Telemetry:
                 "id": str(task.id),
                 "async_execution?": task.async_execution,
                 "human_input?": task.human_input,
-                "agent_role": task.agent.role
-                if task.agent
-                else "None",
+                "agent_role": (
+                    task.agent.role if task.agent else "None"
+                ),
                 "agent_key": task.agent.key if task.agent else None,
                 "tools_names": [
                     tool.name.casefold()
@@ -568,3 +574,38 @@ class Telemetry:
             return span.set_attribute(key, value)
         except Exception:
             pass
+
+    def flow_creation_span(self, flow_name: str):
+        if self.ready:
+            try:
+                tracer = trace.get_tracer("crewai.telemetry")
+                span = tracer.start_span("Flow Creation")
+                self._add_attribute(span, "flow_name", flow_name)
+                span.set_status(Status(StatusCode.OK))
+                span.end()
+            except Exception:
+                pass
+
+    def flow_plotting_span(self, flow_name: str, node_names: list[str]):
+        if self.ready:
+            try:
+                tracer = trace.get_tracer("crewai.telemetry")
+                span = tracer.start_span("Flow Plotting")
+                self._add_attribute(span, "flow_name", flow_name)
+                self._add_attribute(span, "node_names", json.dumps(node_names))
+                span.set_status(Status(StatusCode.OK))
+                span.end()
+            except Exception:
+                pass
+
+    def flow_execution_span(self, flow_name: str, node_names: list[str]):
+        if self.ready:
+            try:
+                tracer = trace.get_tracer("crewai.telemetry")
+                span = tracer.start_span("Flow Execution")
+                self._add_attribute(span, "flow_name", flow_name)
+                self._add_attribute(span, "node_names", json.dumps(node_names))
+                span.set_status(Status(StatusCode.OK))
+                span.end()
+            except Exception:
+                pass
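
The three new span methods share one pattern: get a tracer, start a named span, attach attributes through _add_attribute, mark it OK, and end it, all guarded by self.ready and a broad except. The snippet below reproduces that pattern with the stock OpenTelemetry SDK and an in-memory exporter so the resulting span can be inspected locally; it is a standalone sketch with illustrative values, not crewai's actual exporter wiring (Telemetry configures its own OTLP exporter).

import json

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import Status, StatusCode

# Collect spans in memory instead of exporting them anywhere.
provider = TracerProvider()
exporter = InMemorySpanExporter()
provider.add_span_processor(SimpleSpanProcessor(exporter))
trace.set_tracer_provider(provider)

# Same sequence flow_execution_span() performs, with illustrative values.
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Flow Execution")
span.set_attribute("flow_name", "ExampleFlow")
span.set_attribute("node_names", json.dumps(["fetch_data", "summarize"]))
span.set_status(Status(StatusCode.OK))
span.end()

for finished in exporter.get_finished_spans():
    print(finished.name, dict(finished.attributes))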