mirror of
https://github.com/crewAIInc/crewAI.git
synced 2025-12-16 12:28:30 +00:00
Compare commits
2 Commits
devin/1764
...
brandon/cr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1693be3e53 | ||
|
|
2418069081 |
@@ -2,6 +2,8 @@ from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
from crewai.telemetry import Telemetry
|
||||
|
||||
|
||||
def create_flow(name):
|
||||
"""Create a new flow."""
|
||||
@@ -15,6 +17,10 @@ def create_flow(name):
|
||||
click.secho(f"Error: Folder {folder_name} already exists.", fg="red")
|
||||
return
|
||||
|
||||
# Initialize telemetry
|
||||
telemetry = Telemetry()
|
||||
telemetry.flow_creation_span(class_name)
|
||||
|
||||
# Create directory structure
|
||||
(project_root / "src" / folder_name).mkdir(parents=True)
|
||||
(project_root / "src" / folder_name / "crews").mkdir(parents=True)
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
# flow.py
|
||||
|
||||
# flow.py
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
|
||||
@@ -9,6 +7,7 @@ from typing import Any, Callable, Dict, Generic, List, Set, Type, TypeVar, Union
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.flow.flow_visualizer import plot_flow
|
||||
from crewai.telemetry import Telemetry
|
||||
|
||||
T = TypeVar("T", bound=Union[BaseModel, Dict[str, Any]])
|
||||
|
||||
@@ -142,6 +141,8 @@ class FlowMeta(type):
|
||||
|
||||
|
||||
class Flow(Generic[T], metaclass=FlowMeta):
|
||||
_telemetry = Telemetry()
|
||||
|
||||
_start_methods: List[str] = []
|
||||
_listeners: Dict[str, tuple[str, List[str]]] = {}
|
||||
_routers: Dict[str, str] = {}
|
||||
@@ -161,6 +162,8 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
self._pending_and_listeners: Dict[str, Set[str]] = {}
|
||||
self._method_outputs: List[Any] = [] # List to store all method outputs
|
||||
|
||||
self._telemetry.flow_creation_span(self.__class__.__name__)
|
||||
|
||||
for method_name in dir(self):
|
||||
if callable(getattr(self, method_name)) and not method_name.startswith(
|
||||
"__"
|
||||
@@ -190,6 +193,10 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
if not self._start_methods:
|
||||
raise ValueError("No start method defined")
|
||||
|
||||
self._telemetry.flow_execution_span(
|
||||
self.__class__.__name__, list(self._methods.keys())
|
||||
)
|
||||
|
||||
# Create tasks for all start methods
|
||||
tasks = [
|
||||
self._execute_start_method(start_method)
|
||||
@@ -270,5 +277,9 @@ class Flow(Generic[T], metaclass=FlowMeta):
|
||||
|
||||
traceback.print_exc()
|
||||
|
||||
def plot(self, filename: str = "crewai_flow_graph"):
|
||||
def plot(self, filename: str = "crewai_flow"):
|
||||
self._telemetry.flow_plotting_span(
|
||||
self.__class__.__name__, list(self._methods.keys())
|
||||
)
|
||||
|
||||
plot_flow(self, filename)
|
||||
|
||||
@@ -5,8 +5,8 @@ import json
|
||||
import os
|
||||
import platform
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
from contextlib import contextmanager
|
||||
from typing import TYPE_CHECKING, Any, Optional
|
||||
|
||||
|
||||
@contextmanager
|
||||
@@ -21,7 +21,9 @@ with suppress_warnings():
|
||||
|
||||
|
||||
from opentelemetry import trace # noqa: E402
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter # noqa: E402
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
OTLPSpanExporter, # noqa: E402
|
||||
)
|
||||
from opentelemetry.sdk.resources import SERVICE_NAME, Resource # noqa: E402
|
||||
from opentelemetry.sdk.trace import TracerProvider # noqa: E402
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor # noqa: E402
|
||||
@@ -117,9 +119,11 @@ class Telemetry:
|
||||
"max_iter": agent.max_iter,
|
||||
"max_rpm": agent.max_rpm,
|
||||
"i18n": agent.i18n.prompt_file,
|
||||
"function_calling_llm": agent.function_calling_llm.model
|
||||
if agent.function_calling_llm
|
||||
else "",
|
||||
"function_calling_llm": (
|
||||
agent.function_calling_llm.model
|
||||
if agent.function_calling_llm
|
||||
else ""
|
||||
),
|
||||
"llm": agent.llm.model,
|
||||
"delegation_enabled?": agent.allow_delegation,
|
||||
"allow_code_execution?": agent.allow_code_execution,
|
||||
@@ -145,9 +149,9 @@ class Telemetry:
|
||||
"expected_output": task.expected_output,
|
||||
"async_execution?": task.async_execution,
|
||||
"human_input?": task.human_input,
|
||||
"agent_role": task.agent.role
|
||||
if task.agent
|
||||
else "None",
|
||||
"agent_role": (
|
||||
task.agent.role if task.agent else "None"
|
||||
),
|
||||
"agent_key": task.agent.key if task.agent else None,
|
||||
"context": (
|
||||
[task.description for task in task.context]
|
||||
@@ -184,9 +188,11 @@ class Telemetry:
|
||||
"verbose?": agent.verbose,
|
||||
"max_iter": agent.max_iter,
|
||||
"max_rpm": agent.max_rpm,
|
||||
"function_calling_llm": agent.function_calling_llm.model
|
||||
if agent.function_calling_llm
|
||||
else "",
|
||||
"function_calling_llm": (
|
||||
agent.function_calling_llm.model
|
||||
if agent.function_calling_llm
|
||||
else ""
|
||||
),
|
||||
"llm": agent.llm.model,
|
||||
"delegation_enabled?": agent.allow_delegation,
|
||||
"allow_code_execution?": agent.allow_code_execution,
|
||||
@@ -210,9 +216,9 @@ class Telemetry:
|
||||
"id": str(task.id),
|
||||
"async_execution?": task.async_execution,
|
||||
"human_input?": task.human_input,
|
||||
"agent_role": task.agent.role
|
||||
if task.agent
|
||||
else "None",
|
||||
"agent_role": (
|
||||
task.agent.role if task.agent else "None"
|
||||
),
|
||||
"agent_key": task.agent.key if task.agent else None,
|
||||
"tools_names": [
|
||||
tool.name.casefold()
|
||||
@@ -568,3 +574,38 @@ class Telemetry:
|
||||
return span.set_attribute(key, value)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def flow_creation_span(self, flow_name: str):
|
||||
if self.ready:
|
||||
try:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Flow Creation")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def flow_plotting_span(self, flow_name: str, node_names: list[str]):
|
||||
if self.ready:
|
||||
try:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Flow Plotting")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
self._add_attribute(span, "node_names", json.dumps(node_names))
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def flow_execution_span(self, flow_name: str, node_names: list[str]):
|
||||
if self.ready:
|
||||
try:
|
||||
tracer = trace.get_tracer("crewai.telemetry")
|
||||
span = tracer.start_span("Flow Execution")
|
||||
self._add_attribute(span, "flow_name", flow_name)
|
||||
self._add_attribute(span, "node_names", json.dumps(node_names))
|
||||
span.set_status(Status(StatusCode.OK))
|
||||
span.end()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user