Merge branch 'bugfix/kickoff-for-each-usage-metrics' into feature/kickoff-consistent-output

This commit is contained in:
Brandon Hancock
2024-07-01 18:27:05 -04:00
57 changed files with 414522 additions and 6456 deletions

View File

@@ -3,16 +3,20 @@ import re
import threading
import uuid
from concurrent.futures import Future
from copy import deepcopy
from copy import copy, deepcopy
from typing import Any, Dict, List, Optional, Type, Union
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -43,7 +47,6 @@ class Task(BaseModel):
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
thread: Optional[threading.Thread] = None
prompt_context: Optional[str] = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
@@ -56,7 +59,7 @@ class Task(BaseModel):
callback: Optional[Any] = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[Agent] = Field(
agent: Optional[BaseAgent] = Field(
description="Agent responsible for execution the task.", default=None
)
context: Optional[List["Task"]] = Field(
@@ -96,8 +99,11 @@ class Task(BaseModel):
default=False,
)
_telemetry: Telemetry
_execution_span: Span | None = None
_original_description: str | None = None
_original_expected_output: str | None = None
_thread: threading.Thread | None = None
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
@@ -119,6 +125,12 @@ class Task(BaseModel):
return value[1:]
return value
@model_validator(mode="after")
def set_private_attrs(self) -> "Task":
"""Set private attributes."""
self._telemetry = Telemetry()
return self
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Task":
"""Set attributes based on the agent configuration."""
@@ -148,7 +160,7 @@ class Task(BaseModel):
def execute_sync(
self,
agent: Optional[Agent] = None,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> TaskOutput:
@@ -157,7 +169,7 @@ class Task(BaseModel):
def execute_async(
self,
agent: Optional[Agent] = None,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> Future[TaskOutput]:
@@ -170,7 +182,7 @@ class Task(BaseModel):
def _execute_task_async(
self,
agent: Optional[Agent],
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
future: Future[TaskOutput],
@@ -181,11 +193,13 @@ class Task(BaseModel):
def _execute_core(
self,
agent: Optional[Agent],
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
self._execution_span = self._telemetry.task_started(self)
agent = agent or self.agent
if not agent:
raise Exception(
@@ -209,7 +223,6 @@ class Task(BaseModel):
context=context,
tools=tools,
)
exported_output = self._export_output(result)
task_output = TaskOutput(
@@ -224,6 +237,10 @@ class Task(BaseModel):
if self.callback:
self.callback(self.output)
if self._execution_span:
self._telemetry.task_ended(self._execution_span, self)
self._execution_span = None
return task_output
def prompt(self) -> str:
@@ -259,7 +276,7 @@ class Task(BaseModel):
"""Increment the delegations counter."""
self.delegations += 1
def copy(self):
def copy(self, agents: Optional[List["BaseAgent"]] = None) -> "Task":
"""Create a deep copy of the Task."""
exclude = {
"id",
@@ -274,8 +291,12 @@ class Task(BaseModel):
cloned_context = (
[task.copy() for task in self.context] if self.context else None
)
cloned_agent = self.agent.copy() if self.agent else None
cloned_tools = deepcopy(self.tools) if self.tools else None
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
return next((agent for agent in agents if agent.role == role), None)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = Task(
**copied_data,
@@ -283,6 +304,7 @@ class Task(BaseModel):
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
def _export_output(