From 647bd794ed933ab32f02849d678b9693cbefa918 Mon Sep 17 00:00:00 2001 From: Gui Vieira Date: Fri, 28 Jun 2024 20:05:47 -0300 Subject: [PATCH] Improve telemetry (#818) * Improve telemetry * Minor adjustments * Try to fix typing error * Try to fix typing error [2] --- src/crewai/crew.py | 2 +- src/crewai/task.py | 44 ++++++++++++++++---- src/crewai/telemetry/telemetry.py | 67 ++++++++++++++++++++++++++----- 3 files changed, 94 insertions(+), 19 deletions(-) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 9164f4a3f..0a10e9620 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -281,7 +281,7 @@ class Crew(BaseModel): inputs: Optional[Dict[str, Any]] = {}, ) -> Union[str, Dict[str, Any]]: """Starts the crew to work on its assigned tasks.""" - self._execution_span = self._telemetry.crew_execution_span(self) + self._execution_span = self._telemetry.crew_execution_span(self, inputs) # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" self._interpolate_inputs(inputs) self._set_tasks_callbacks() diff --git a/src/crewai/task.py b/src/crewai/task.py index 0e8bf434f..4d8d8f00e 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -1,18 +1,20 @@ -from copy import deepcopy import os import re import threading import uuid +from copy import deepcopy from typing import Any, Dict, List, Optional, Type from langchain_openai import ChatOpenAI +from opentelemetry.trace import Span from pydantic import UUID4, BaseModel, Field, field_validator, model_validator from pydantic_core import PydanticCustomError +from crewai.agents.agent_builder.base_agent import BaseAgent from crewai.tasks.task_output import TaskOutput +from crewai.telemetry.telemetry import Telemetry from crewai.utilities import I18N, ConverterError, Printer from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser -from crewai.agents.agent_builder.base_agent import BaseAgent class Task(BaseModel): @@ -42,7 +44,6 @@ class Task(BaseModel): tools_errors: int = 0 delegations: int = 0 i18n: I18N = I18N() - thread: Optional[threading.Thread] = None prompt_context: Optional[str] = None description: str = Field(description="Description of the actual task.") expected_output: str = Field( @@ -95,8 +96,11 @@ class Task(BaseModel): default=False, ) + _telemetry: Telemetry + _execution_span: Span | None = None _original_description: str | None = None _original_expected_output: str | None = None + _thread: threading.Thread | None = None def __init__(__pydantic_self__, **data): config = data.pop("config", {}) @@ -118,6 +122,12 @@ class Task(BaseModel): return value[1:] return value + @model_validator(mode="after") + def set_private_attrs(self) -> "Task": + """Set private attributes.""" + self._telemetry = Telemetry() + return self + @model_validator(mode="after") def set_attributes_based_on_config(self) -> "Task": """Set attributes based on the agent configuration.""" @@ -145,6 +155,18 @@ class Task(BaseModel): ) return self + def wait_for_completion(self) -> str | BaseModel: + """Wait for asynchronous task completion and return the output.""" + assert self.async_execution, "Task is not set to be executed asynchronously." + + if self._thread: + self._thread.join() + self._thread = None + + assert self.output, "Task output is not set." + + return self.output.exported_output + def execute( # type: ignore # Missing return statement self, agent: BaseAgent | None = None, @@ -157,6 +179,8 @@ class Task(BaseModel): Output of the task. """ + self._execution_span = self._telemetry.task_started(self) + agent = agent or self.agent if not agent: raise Exception( @@ -168,8 +192,8 @@ class Task(BaseModel): context = [] for task in self.context: if task.async_execution: - task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join" - if task and task.output: + task.wait_for_completion() + if task.output: # type: ignore # Item "str" of "str | None" has no attribute "append" context.append(task.output.raw_output) # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]" @@ -179,10 +203,10 @@ class Task(BaseModel): tools = tools or self.tools if self.async_execution: - self.thread = threading.Thread( + self._thread = threading.Thread( target=self._execute, args=(agent, self, context, tools) ) - self.thread.start() + self._thread.start() else: result = self._execute( task=self, @@ -200,7 +224,7 @@ class Task(BaseModel): ) exported_output = self._export_output(result) - # type: the responses are usually str but need to figuire out a more elegant solution here + # type: ignore # the responses are usually str but need to figure out a more elegant solution here self.output = TaskOutput( description=self.description, exported_output=exported_output, @@ -211,6 +235,10 @@ class Task(BaseModel): if self.callback: self.callback(self.output) + if self._execution_span: + self._telemetry.task_ended(self._execution_span, self) + self._execution_span = None + return exported_output def prompt(self) -> str: diff --git a/src/crewai/telemetry/telemetry.py b/src/crewai/telemetry/telemetry.py index 067d1bb22..f9bd6bfdb 100644 --- a/src/crewai/telemetry/telemetry.py +++ b/src/crewai/telemetry/telemetry.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import asyncio import json import os import platform -from typing import Any +from typing import TYPE_CHECKING, Any import pkg_resources from opentelemetry import trace @@ -10,7 +12,11 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExport from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.trace import Status, StatusCode +from opentelemetry.trace import Span, Status, StatusCode + +if TYPE_CHECKING: + from crewai.crew import Crew + from crewai.task import Task class Telemetry: @@ -88,9 +94,6 @@ class Telemetry: self._add_attribute(span, "python_version", platform.python_version()) self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute(span, "crew_process", crew.process) - self._add_attribute( - span, "crew_language", crew.prompt_file if crew.i18n else "None" - ) self._add_attribute(span, "crew_memory", crew.memory) self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) @@ -102,6 +105,8 @@ class Telemetry: { "id": str(agent.id), "role": agent.role, + "goal": agent.goal, + "backstory": agent.backstory, "verbose?": agent.verbose, "max_iter": agent.max_iter, "max_rpm": agent.max_rpm, @@ -123,8 +128,16 @@ class Telemetry: [ { "id": str(task.id), + "description": task.description, + "expected_output": task.expected_output, "async_execution?": task.async_execution, + "human_input?": task.human_input, "agent_role": task.agent.role if task.agent else "None", + "context": ( + [task.description for task in task.context] + if task.context + else None + ), "tools_names": [ tool.name.casefold() for tool in task.tools ], @@ -143,6 +156,38 @@ class Telemetry: except Exception: pass + def task_started(self, task: Task) -> Span | None: + """Records task started in a crew.""" + if self.ready: + try: + tracer = trace.get_tracer("crewai.telemetry") + span = tracer.start_span("Task Execution") + + self._add_attribute(span, "task_id", str(task.id)) + self._add_attribute(span, "formatted_description", task.description) + self._add_attribute( + span, "formatted_expected_output", task.expected_output + ) + + return span + except Exception: + pass + + return None + + def task_ended(self, span: Span, task: Task): + """Records task execution in a crew.""" + if self.ready: + try: + self._add_attribute( + span, "output", task.output.raw_output if task.output else "" + ) + + span.set_status(Status(StatusCode.OK)) + span.end() + except Exception: + pass + def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int): """Records the repeated usage 'error' of a tool by an agent.""" if self.ready: @@ -207,7 +252,7 @@ class Telemetry: except Exception: pass - def crew_execution_span(self, crew): + def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None): """Records the complete execution of a crew. This is only collected if the user has opted-in to share the crew. """ @@ -221,6 +266,7 @@ class Telemetry: pkg_resources.get_distribution("crewai").version, ) self._add_attribute(span, "crew_id", str(crew.id)) + self._add_attribute(span, "inputs", json.dumps(inputs)) self._add_attribute( span, "crew_agents", @@ -238,7 +284,7 @@ class Telemetry: "llm": json.dumps(self._safe_llm_attributes(agent.llm)), "delegation_enabled?": agent.allow_delegation, "tools_names": [ - tool.name.casefold() for tool in agent.tools + tool.name.casefold() for tool in agent.tools or [] ], } for agent in crew.agents @@ -253,16 +299,17 @@ class Telemetry: { "id": str(task.id), "description": task.description, + "expected_output": task.expected_output, "async_execution?": task.async_execution, - "output": task.expected_output, + "human_input?": task.human_input, "agent_role": task.agent.role if task.agent else "None", "context": ( [task.description for task in task.context] if task.context - else "None" + else None ), "tools_names": [ - tool.name.casefold() for tool in task.tools + tool.name.casefold() for tool in task.tools or [] ], } for task in crew.tasks