Improve telemetry (#818)

* Improve telemetry

* Minor adjustments

* Try to fix typing error

* Try to fix typing error [2]
This commit is contained in:
Gui Vieira
2024-06-28 20:05:47 -03:00
committed by GitHub
parent 0807798777
commit 647bd794ed
3 changed files with 94 additions and 19 deletions

View File

@@ -281,7 +281,7 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = {}, inputs: Optional[Dict[str, Any]] = {},
) -> Union[str, Dict[str, Any]]: ) -> Union[str, Dict[str, Any]]:
"""Starts the crew to work on its assigned tasks.""" """Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self) self._execution_span = self._telemetry.crew_execution_span(self, inputs)
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]" # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs) self._interpolate_inputs(inputs)
self._set_tasks_callbacks() self._set_tasks_callbacks()

View File

@@ -1,18 +1,20 @@
from copy import deepcopy
import os import os
import re import re
import threading import threading
import uuid import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type from typing import Any, Dict, List, Optional, Type
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import I18N, ConverterError, Printer from crewai.utilities import I18N, ConverterError, Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
from crewai.agents.agent_builder.base_agent import BaseAgent
class Task(BaseModel): class Task(BaseModel):
@@ -42,7 +44,6 @@ class Task(BaseModel):
tools_errors: int = 0 tools_errors: int = 0
delegations: int = 0 delegations: int = 0
i18n: I18N = I18N() i18n: I18N = I18N()
thread: Optional[threading.Thread] = None
prompt_context: Optional[str] = None prompt_context: Optional[str] = None
description: str = Field(description="Description of the actual task.") description: str = Field(description="Description of the actual task.")
expected_output: str = Field( expected_output: str = Field(
@@ -95,8 +96,11 @@ class Task(BaseModel):
default=False, default=False,
) )
_telemetry: Telemetry
_execution_span: Span | None = None
_original_description: str | None = None _original_description: str | None = None
_original_expected_output: str | None = None _original_expected_output: str | None = None
_thread: threading.Thread | None = None
def __init__(__pydantic_self__, **data): def __init__(__pydantic_self__, **data):
config = data.pop("config", {}) config = data.pop("config", {})
@@ -118,6 +122,12 @@ class Task(BaseModel):
return value[1:] return value[1:]
return value return value
@model_validator(mode="after")
def set_private_attrs(self) -> "Task":
"""Set private attributes."""
self._telemetry = Telemetry()
return self
@model_validator(mode="after") @model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Task": def set_attributes_based_on_config(self) -> "Task":
"""Set attributes based on the agent configuration.""" """Set attributes based on the agent configuration."""
@@ -145,6 +155,18 @@ class Task(BaseModel):
) )
return self return self
def wait_for_completion(self) -> str | BaseModel:
"""Wait for asynchronous task completion and return the output."""
assert self.async_execution, "Task is not set to be executed asynchronously."
if self._thread:
self._thread.join()
self._thread = None
assert self.output, "Task output is not set."
return self.output.exported_output
def execute( # type: ignore # Missing return statement def execute( # type: ignore # Missing return statement
self, self,
agent: BaseAgent | None = None, agent: BaseAgent | None = None,
@@ -157,6 +179,8 @@ class Task(BaseModel):
Output of the task. Output of the task.
""" """
self._execution_span = self._telemetry.task_started(self)
agent = agent or self.agent agent = agent or self.agent
if not agent: if not agent:
raise Exception( raise Exception(
@@ -168,8 +192,8 @@ class Task(BaseModel):
context = [] context = []
for task in self.context: for task in self.context:
if task.async_execution: if task.async_execution:
task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join" task.wait_for_completion()
if task and task.output: if task.output:
# type: ignore # Item "str" of "str | None" has no attribute "append" # type: ignore # Item "str" of "str | None" has no attribute "append"
context.append(task.output.raw_output) context.append(task.output.raw_output)
# type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]" # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
@@ -179,10 +203,10 @@ class Task(BaseModel):
tools = tools or self.tools tools = tools or self.tools
if self.async_execution: if self.async_execution:
self.thread = threading.Thread( self._thread = threading.Thread(
target=self._execute, args=(agent, self, context, tools) target=self._execute, args=(agent, self, context, tools)
) )
self.thread.start() self._thread.start()
else: else:
result = self._execute( result = self._execute(
task=self, task=self,
@@ -200,7 +224,7 @@ class Task(BaseModel):
) )
exported_output = self._export_output(result) exported_output = self._export_output(result)
# type: the responses are usually str but need to figuire out a more elegant solution here # type: ignore # the responses are usually str but need to figure out a more elegant solution here
self.output = TaskOutput( self.output = TaskOutput(
description=self.description, description=self.description,
exported_output=exported_output, exported_output=exported_output,
@@ -211,6 +235,10 @@ class Task(BaseModel):
if self.callback: if self.callback:
self.callback(self.output) self.callback(self.output)
if self._execution_span:
self._telemetry.task_ended(self._execution_span, self)
self._execution_span = None
return exported_output return exported_output
def prompt(self) -> str: def prompt(self) -> str:

View File

@@ -1,8 +1,10 @@
from __future__ import annotations
import asyncio import asyncio
import json import json
import os import os
import platform import platform
from typing import Any from typing import TYPE_CHECKING, Any
import pkg_resources import pkg_resources
from opentelemetry import trace from opentelemetry import trace
@@ -10,7 +12,11 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExport
from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode from opentelemetry.trace import Span, Status, StatusCode
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
class Telemetry: class Telemetry:
@@ -88,9 +94,6 @@ class Telemetry:
self._add_attribute(span, "python_version", platform.python_version()) self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_process", crew.process) self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(
span, "crew_language", crew.prompt_file if crew.i18n else "None"
)
self._add_attribute(span, "crew_memory", crew.memory) self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents)) self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
@@ -102,6 +105,8 @@ class Telemetry:
{ {
"id": str(agent.id), "id": str(agent.id),
"role": agent.role, "role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"verbose?": agent.verbose, "verbose?": agent.verbose,
"max_iter": agent.max_iter, "max_iter": agent.max_iter,
"max_rpm": agent.max_rpm, "max_rpm": agent.max_rpm,
@@ -123,8 +128,16 @@ class Telemetry:
[ [
{ {
"id": str(task.id), "id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution, "async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None", "agent_role": task.agent.role if task.agent else "None",
"context": (
[task.description for task in task.context]
if task.context
else None
),
"tools_names": [ "tools_names": [
tool.name.casefold() for tool in task.tools tool.name.casefold() for tool in task.tools
], ],
@@ -143,6 +156,38 @@ class Telemetry:
except Exception: except Exception:
pass pass
def task_started(self, task: Task) -> Span | None:
"""Records task started in a crew."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Task Execution")
self._add_attribute(span, "task_id", str(task.id))
self._add_attribute(span, "formatted_description", task.description)
self._add_attribute(
span, "formatted_expected_output", task.expected_output
)
return span
except Exception:
pass
return None
def task_ended(self, span: Span, task: Task):
"""Records task execution in a crew."""
if self.ready:
try:
self._add_attribute(
span, "output", task.output.raw_output if task.output else ""
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int): def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the repeated usage 'error' of a tool by an agent.""" """Records the repeated usage 'error' of a tool by an agent."""
if self.ready: if self.ready:
@@ -207,7 +252,7 @@ class Telemetry:
except Exception: except Exception:
pass pass
def crew_execution_span(self, crew): def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
"""Records the complete execution of a crew. """Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew. This is only collected if the user has opted-in to share the crew.
""" """
@@ -221,6 +266,7 @@ class Telemetry:
pkg_resources.get_distribution("crewai").version, pkg_resources.get_distribution("crewai").version,
) )
self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "inputs", json.dumps(inputs))
self._add_attribute( self._add_attribute(
span, span,
"crew_agents", "crew_agents",
@@ -238,7 +284,7 @@ class Telemetry:
"llm": json.dumps(self._safe_llm_attributes(agent.llm)), "llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation, "delegation_enabled?": agent.allow_delegation,
"tools_names": [ "tools_names": [
tool.name.casefold() for tool in agent.tools tool.name.casefold() for tool in agent.tools or []
], ],
} }
for agent in crew.agents for agent in crew.agents
@@ -253,16 +299,17 @@ class Telemetry:
{ {
"id": str(task.id), "id": str(task.id),
"description": task.description, "description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution, "async_execution?": task.async_execution,
"output": task.expected_output, "human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None", "agent_role": task.agent.role if task.agent else "None",
"context": ( "context": (
[task.description for task in task.context] [task.description for task in task.context]
if task.context if task.context
else "None" else None
), ),
"tools_names": [ "tools_names": [
tool.name.casefold() for tool in task.tools tool.name.casefold() for tool in task.tools or []
], ],
} }
for task in crew.tasks for task in crew.tasks