Introduce structure keys (#902)

* Introduce structure keys

* Add agent key to tasks

* Rebasing is hard

* Rename task output telemetry

* Feedback
This commit is contained in:
Gui Vieira
2024-07-15 19:37:07 -03:00
committed by GitHub
parent 161c4a6856
commit dd8a199e99
7 changed files with 133 additions and 14 deletions

View File

@@ -1,6 +1,7 @@
import uuid import uuid
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from copy import copy as shallow_copy from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Dict, List, Optional, TypeVar from typing import Any, Dict, List, Optional, TypeVar
from pydantic import ( from pydantic import (
@@ -162,6 +163,11 @@ class BaseAgent(ABC, BaseModel):
self._token_process = TokenProcess() self._token_process = TokenProcess()
return self return self
@property
def key(self) -> str:
    """Return a deterministic fingerprint of this agent's definition.

    The key is the hexadecimal MD5 digest of ``role``, ``goal`` and
    ``backstory`` joined with ``"|"``, so the same three values always
    yield the same key.

    Returns:
        The digest as a 32-character hexadecimal string.
    """
    source = [self.role, self.goal, self.backstory]
    # The digest is only an identifier. Declaring it non-security keeps the
    # call working on FIPS-restricted builds, where a plain md5() raises;
    # the resulting digest is unchanged.
    return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
@abstractmethod @abstractmethod
def execute_task( def execute_task(
self, self,

View File

@@ -2,6 +2,7 @@ import asyncio
import json import json
import uuid import uuid
from concurrent.futures import Future from concurrent.futures import Future
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from langchain_core.callbacks import BaseCallbackHandler from langchain_core.callbacks import BaseCallbackHandler
@@ -330,6 +331,13 @@ class Crew(BaseModel):
) )
return self return self
@property
def key(self) -> str:
    """Return a deterministic fingerprint of this crew's composition.

    The key is the hexadecimal MD5 digest of every agent key followed by
    every task key, in list order, joined with ``"|"``.

    Returns:
        The digest as a 32-character hexadecimal string.
    """
    agent_keys = [agent.key for agent in self.agents]
    task_keys = [task.key for task in self.tasks]
    source = agent_keys + task_keys
    # The digest is only an identifier. Declaring it non-security keeps the
    # call working on FIPS-restricted builds, where a plain md5() raises;
    # the resulting digest is unchanged.
    return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
def _setup_from_config(self): def _setup_from_config(self):
assert self.config is not None, "Config should not be None." assert self.config is not None, "Config should not be None."

View File

@@ -5,8 +5,10 @@ import threading
import uuid import uuid
from concurrent.futures import Future from concurrent.futures import Future
from copy import copy from copy import copy
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Type, Union from typing import Any, Dict, List, Optional, Tuple, Type, Union
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
@@ -173,6 +175,14 @@ class Task(BaseModel):
"""Execute the task synchronously.""" """Execute the task synchronously."""
return self._execute_core(agent, context, tools) return self._execute_core(agent, context, tools)
@property
def key(self) -> str:
    """Return a deterministic fingerprint of this task's definition.

    The key is the hexadecimal MD5 digest of the description and the
    expected output joined with ``"|"``. For each of the two, the stored
    ``_original_*`` value is used when it is truthy; otherwise the current
    attribute value is used.

    Returns:
        The digest as a 32-character hexadecimal string.
    """
    description = self._original_description or self.description
    expected_output = self._original_expected_output or self.expected_output
    source = [description, expected_output]
    # The digest is only an identifier. Declaring it non-security keeps the
    # call working on FIPS-restricted builds, where a plain md5() raises;
    # the resulting digest is unchanged.
    return md5("|".join(source).encode(), usedforsecurity=False).hexdigest()
def execute_async( def execute_async(
self, self,
agent: BaseAgent | None = None, agent: BaseAgent | None = None,
@@ -238,7 +248,7 @@ class Task(BaseModel):
self.callback(self.output) self.callback(self.output)
if self._execution_span: if self._execution_span:
self._telemetry.task_ended(self._execution_span, self) self._telemetry.task_ended(self._execution_span, self, agent.crew)
self._execution_span = None self._execution_span = None
if self.output_file: if self.output_file:

View File

@@ -92,13 +92,8 @@ class Telemetry:
pkg_resources.get_distribution("crewai").version, pkg_resources.get_distribution("crewai").version,
) )
self._add_attribute(span, "python_version", platform.python_version()) self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute(span, "crew_id", str(crew.id))
if crew.share_crew:
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
self._add_attribute(span, "crew_process", crew.process) self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(span, "crew_memory", crew.memory) self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks)) self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
@@ -109,6 +104,7 @@ class Telemetry:
json.dumps( json.dumps(
[ [
{ {
"key": agent.key,
"id": str(agent.id), "id": str(agent.id),
"role": agent.role, "role": agent.role,
"goal": agent.goal, "goal": agent.goal,
@@ -133,12 +129,14 @@ class Telemetry:
json.dumps( json.dumps(
[ [
{ {
"key": task.key,
"id": str(task.id), "id": str(task.id),
"description": task.description, "description": task.description,
"expected_output": task.expected_output, "expected_output": task.expected_output,
"async_execution?": task.async_execution, "async_execution?": task.async_execution,
"human_input?": task.human_input, "human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None", "agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": ( "context": (
[task.description for task in task.context] [task.description for task in task.context]
if task.context if task.context
@@ -157,6 +155,12 @@ class Telemetry:
self._add_attribute(span, "platform_system", platform.system()) self._add_attribute(span, "platform_system", platform.system())
self._add_attribute(span, "platform_version", platform.version()) self._add_attribute(span, "platform_version", platform.version())
self._add_attribute(span, "cpus", os.cpu_count()) self._add_attribute(span, "cpus", os.cpu_count())
if crew.share_crew:
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
span.set_status(Status(StatusCode.OK)) span.set_status(Status(StatusCode.OK))
span.end() span.end()
except Exception: except Exception:
@@ -170,7 +174,9 @@ class Telemetry:
created_span = tracer.start_span("Task Created") created_span = tracer.start_span("Task Created")
self._add_attribute(created_span, "crew_key", crew.key)
self._add_attribute(created_span, "crew_id", str(crew.id)) self._add_attribute(created_span, "crew_id", str(crew.id))
self._add_attribute(created_span, "task_key", task.key)
self._add_attribute(created_span, "task_id", str(task.id)) self._add_attribute(created_span, "task_id", str(task.id))
if crew.share_crew: if crew.share_crew:
@@ -186,7 +192,9 @@ class Telemetry:
span = tracer.start_span("Task Execution") span = tracer.start_span("Task Execution")
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "task_key", task.key)
self._add_attribute(span, "task_id", str(task.id)) self._add_attribute(span, "task_id", str(task.id))
if crew.share_crew: if crew.share_crew:
@@ -201,12 +209,15 @@ class Telemetry:
return None return None
def task_ended(self, span: Span, task: Task): def task_ended(self, span: Span, task: Task, crew: Crew):
"""Records task execution in a crew.""" """Records task execution in a crew."""
if self.ready: if self.ready:
try: try:
if crew.share_crew:
self._add_attribute( self._add_attribute(
span, "output", task.output.raw_output if task.output else "" span,
"task_output",
task.output.raw if task.output else "",
) )
span.set_status(Status(StatusCode.OK)) span.set_status(Status(StatusCode.OK))
@@ -293,6 +304,7 @@ class Telemetry:
"crewai_version", "crewai_version",
pkg_resources.get_distribution("crewai").version, pkg_resources.get_distribution("crewai").version,
) )
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id)) self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute( self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None span, "crew_inputs", json.dumps(inputs) if inputs else None
@@ -303,6 +315,7 @@ class Telemetry:
json.dumps( json.dumps(
[ [
{ {
"key": agent.key,
"id": str(agent.id), "id": str(agent.id),
"role": agent.role, "role": agent.role,
"goal": agent.goal, "goal": agent.goal,
@@ -333,6 +346,7 @@ class Telemetry:
"async_execution?": task.async_execution, "async_execution?": task.async_execution,
"human_input?": task.human_input, "human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None", "agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": ( "context": (
[task.description for task in task.context] [task.description for task in task.context]
if task.context if task.context

View File

@@ -0,0 +1,36 @@
import hashlib
from typing import Any, List, Optional
from crewai.agents.agent_builder.base_agent import BaseAgent
from pydantic import BaseModel
class TestAgent(BaseAgent):
    """Concrete ``BaseAgent`` subclass whose methods are inert stubs.

    It exists so the tests in this module can instantiate an agent.
    """

    def execute_task(
        self,
        task: Any,
        context: Optional[str] = None,
        tools: Optional[List[Any]] = None,
    ) -> str:
        """Ignore all arguments and return an empty string."""
        return ""

    def create_agent_executor(self, tools=None) -> None:
        """Do nothing; no executor is created."""
        return None

    def _parse_tools(self, tools: List[Any]) -> List[Any]:
        """Ignore the given tools and return an empty list."""
        return list()

    def get_delegation_tools(self, agents: List["BaseAgent"]):
        """Do nothing and return ``None``."""
        return None

    def get_output_converter(
        self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
    ):
        """Do nothing and return ``None``."""
        return None
def test_key():
    """Check that an agent's key is the MD5 of "role|goal|backstory"."""
    expected_key = hashlib.md5(
        "test role|test goal|test backstory".encode()
    ).hexdigest()

    agent_fields = {
        "role": "test role",
        "goal": "test goal",
        "backstory": "test backstory",
    }
    agent = TestAgent(**agent_fields)

    assert agent.key == expected_key

View File

@@ -1,5 +1,6 @@
"""Test Agent creation and execution basic functionality.""" """Test Agent creation and execution basic functionality."""
import hashlib
import json import json
from concurrent.futures import Future from concurrent.futures import Future
from unittest import mock from unittest import mock
@@ -2234,3 +2235,28 @@ def test_replay_from_task_setup_context():
assert crew.tasks[0].output.output_format == OutputFormat.RAW assert crew.tasks[0].output.output_format == OutputFormat.RAW
assert crew.tasks[1].prompt_context == "context raw output" assert crew.tasks[1].prompt_context == "context raw output"
def test_key():
    """Check that a crew's key is the MD5 of its agent keys then task keys."""
    ideas_task = Task(
        description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
        expected_output="Bullet point list of 5 important events.",
        agent=researcher,
    )
    article_task = Task(
        description="Write a 1 amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="A 4 paragraph article about AI.",
        agent=writer,
    )
    tasks = [ideas_task, article_task]

    crew = Crew(
        agents=[researcher, writer],
        process=Process.sequential,
        tasks=tasks,
    )

    # Agent keys come first, then task keys, each in list order.
    key_parts = [researcher.key, writer.key, tasks[0].key, tasks[1].key]
    expected_key = hashlib.md5("|".join(key_parts).encode()).hexdigest()

    assert crew.key == expected_key

View File

@@ -1,15 +1,15 @@
"""Test Agent creation and execution basic functionality.""" """Test Agent creation and execution basic functionality."""
import hashlib
import json import json
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import pytest import pytest
from pydantic import BaseModel
from pydantic_core import ValidationError
from crewai import Agent, Crew, Process, Task from crewai import Agent, Crew, Process, Task
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
from crewai.utilities.converter import Converter from crewai.utilities.converter import Converter
from pydantic import BaseModel
from pydantic_core import ValidationError
def test_task_tool_reflect_agent_tools(): def test_task_tool_reflect_agent_tools():
@@ -791,3 +791,22 @@ def test_task_output_str_with_none():
) )
assert str(task_output) == "" assert str(task_output) == ""
def test_key():
    """Check that a task's key is unchanged by input interpolation."""
    description_template = "Give me a list of 5 interesting ideas about {topic} to explore for an article, what makes them unique and interesting."
    expected_output_template = "Bullet point list of 5 interesting ideas about {topic}."
    expected_key = hashlib.md5(
        "|".join([description_template, expected_output_template]).encode()
    ).hexdigest()

    task = Task(
        description=description_template,
        expected_output=expected_output_template,
    )
    assert task.key == expected_key, "The key should be the hash of the description."

    # The key must still match after the placeholders have been filled in.
    task.interpolate_inputs(inputs={"topic": "AI"})
    assert (
        task.key == expected_key
    ), "The key should be the hash of the non-interpolated description."