Improve telemetry (#818)

* Improve telemetry

* Minor adjustments

* Try to fix typing error

* Try to fix typing error [2]
This commit is contained in:
Gui Vieira
2024-06-28 20:05:47 -03:00
committed by GitHub
parent 851dd0f84f
commit 5b66e87621
3 changed files with 94 additions and 19 deletions

View File

@@ -281,7 +281,7 @@ class Crew(BaseModel):
inputs: Optional[Dict[str, Any]] = {},
) -> Union[str, Dict[str, Any]]:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
# type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()

View File

@@ -1,18 +1,20 @@
from copy import deepcopy
import os
import re
import threading
import uuid
from copy import deepcopy
from typing import Any, Dict, List, Optional, Type
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities import I18N, ConverterError, Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
from crewai.agents.agent_builder.base_agent import BaseAgent
class Task(BaseModel):
@@ -42,7 +44,6 @@ class Task(BaseModel):
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
thread: Optional[threading.Thread] = None
prompt_context: Optional[str] = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
@@ -95,8 +96,11 @@ class Task(BaseModel):
default=False,
)
_telemetry: Telemetry
_execution_span: Span | None = None
_original_description: str | None = None
_original_expected_output: str | None = None
_thread: threading.Thread | None = None
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
@@ -118,6 +122,12 @@ class Task(BaseModel):
return value[1:]
return value
@model_validator(mode="after")
def set_private_attrs(self) -> "Task":
"""Set private attributes."""
self._telemetry = Telemetry()
return self
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Task":
"""Set attributes based on the agent configuration."""
@@ -145,6 +155,18 @@ class Task(BaseModel):
)
return self
def wait_for_completion(self) -> str | BaseModel:
"""Wait for asynchronous task completion and return the output."""
assert self.async_execution, "Task is not set to be executed asynchronously."
if self._thread:
self._thread.join()
self._thread = None
assert self.output, "Task output is not set."
return self.output.exported_output
def execute( # type: ignore # Missing return statement
self,
agent: BaseAgent | None = None,
@@ -157,6 +179,8 @@ class Task(BaseModel):
Output of the task.
"""
self._execution_span = self._telemetry.task_started(self)
agent = agent or self.agent
if not agent:
raise Exception(
@@ -168,8 +192,8 @@ class Task(BaseModel):
context = []
for task in self.context:
if task.async_execution:
task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join"
if task and task.output:
task.wait_for_completion()
if task.output:
# type: ignore # Item "str" of "str | None" has no attribute "append"
context.append(task.output.raw_output)
# type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
@@ -179,10 +203,10 @@ class Task(BaseModel):
tools = tools or self.tools
if self.async_execution:
self.thread = threading.Thread(
self._thread = threading.Thread(
target=self._execute, args=(agent, self, context, tools)
)
self.thread.start()
self._thread.start()
else:
result = self._execute(
task=self,
@@ -200,7 +224,7 @@ class Task(BaseModel):
)
exported_output = self._export_output(result)
# type: the responses are usually str but need to figuire out a more elegant solution here
# type: ignore # the responses are usually str but need to figure out a more elegant solution here
self.output = TaskOutput(
description=self.description,
exported_output=exported_output,
@@ -211,6 +235,10 @@ class Task(BaseModel):
if self.callback:
self.callback(self.output)
if self._execution_span:
self._telemetry.task_ended(self._execution_span, self)
self._execution_span = None
return exported_output
def prompt(self) -> str:

View File

@@ -1,8 +1,10 @@
from __future__ import annotations
import asyncio
import json
import os
import platform
from typing import Any
from typing import TYPE_CHECKING, Any
import pkg_resources
from opentelemetry import trace
@@ -10,7 +12,11 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExport
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
class Telemetry:
@@ -88,9 +94,6 @@ class Telemetry:
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(
span, "crew_language", crew.prompt_file if crew.i18n else "None"
)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
@@ -102,6 +105,8 @@ class Telemetry:
{
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
@@ -123,8 +128,16 @@ class Telemetry:
[
{
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"context": (
[task.description for task in task.context]
if task.context
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
],
@@ -143,6 +156,38 @@ class Telemetry:
except Exception:
pass
def task_started(self, task: Task) -> Span | None:
"""Records task started in a crew."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
span = tracer.start_span("Task Execution")
self._add_attribute(span, "task_id", str(task.id))
self._add_attribute(span, "formatted_description", task.description)
self._add_attribute(
span, "formatted_expected_output", task.expected_output
)
return span
except Exception:
pass
return None
def task_ended(self, span: Span, task: Task):
"""Records task execution in a crew."""
if self.ready:
try:
self._add_attribute(
span, "output", task.output.raw_output if task.output else ""
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def tool_repeated_usage(self, llm: Any, tool_name: str, attempts: int):
"""Records the repeated usage 'error' of a tool by an agent."""
if self.ready:
@@ -207,7 +252,7 @@ class Telemetry:
except Exception:
pass
def crew_execution_span(self, crew):
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
@@ -221,6 +266,7 @@ class Telemetry:
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "inputs", json.dumps(inputs))
self._add_attribute(
span,
"crew_agents",
@@ -238,7 +284,7 @@ class Telemetry:
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools
tool.name.casefold() for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -253,16 +299,17 @@ class Telemetry:
{
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"output": task.expected_output,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"context": (
[task.description for task in task.context]
if task.context
else "None"
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
tool.name.casefold() for tool in task.tools or []
],
}
for task in crew.tasks