Merge remote-tracking branch 'upstream/main'

# Conflicts:
#	pyproject.toml
#	src/crewai/agent.py
#	src/crewai/crew.py
#	src/crewai/task.py
#	src/crewai/tools/tool_usage.py
#	src/crewai/utilities/evaluators/task_evaluator.py
This commit is contained in:
Braelyn Boynton
2024-07-23 17:55:15 -04:00
180 changed files with 925552 additions and 16679 deletions

View File

@@ -2,3 +2,5 @@ from crewai.agent import Agent
from crewai.crew import Crew
from crewai.process import Process
from crewai.task import Task
__all__ = ["Agent", "Crew", "Process", "Task"]

View File

@@ -1,33 +1,27 @@
import os
import uuid
from typing import Any, Dict, List, Optional, Tuple
from inspect import signature
from typing import Any, List, Optional, Tuple
from langchain.agents.agent import RunnableAgent
from langchain.agents.tools import BaseTool
from langchain.agents.tools import tool as LangChainTool
from langchain.tools.render import render_text_description
from langchain_core.agents import AgentAction
from langchain_core.callbacks import BaseCallbackHandler
from langchain_openai import ChatOpenAI
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from pydantic import Field, InstanceOf, PrivateAttr, model_validator
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser, ToolsHandler
from crewai.agents import CacheHandler, CrewAgentExecutor, CrewAgentParser
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.memory.contextual.contextual_memory import ContextualMemory
from crewai.utilities import I18N, Logger, Prompts, RPMController
from crewai.utilities.token_counter_callback import TokenCalcHandler, TokenProcess
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import Converter, Prompts
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.token_counter_callback import TokenCalcHandler
from crewai.utilities.training_handler import CrewTrainingHandler
agentops = None
try:
import agentops
import agentops # type: ignore # Name "agentops" already defined on line 21
from agentops import track_agent
except ImportError:
@@ -39,7 +33,7 @@ except ImportError:
@track_agent()
class Agent(BaseModel):
class Agent(BaseAgent):
"""Represents an agent in a system.
Each agent has a role, a goal, a backstory, and an optional language model (llm).
@@ -61,61 +55,17 @@ class Agent(BaseModel):
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
callbacks: A list of callback functions from the langchain library that are triggered during the agent's execution process
allow_code_execution: Enable code execution for the agent.
max_retry_limit: Maximum number of retries for an agent to execute a task when an error occurs.
"""
__hash__ = object.__hash__ # type: ignore
_logger: Logger = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
_token_process: TokenProcess = TokenProcess()
agent_ops_agent_name: str = None
agent_ops_agent_id: str = None
formatting_errors: int = 0
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(
default_factory=uuid.uuid4,
frozen=True,
description="Unique identifier for the object, not set by user.",
)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
cache: bool = Field(
default=True,
description="Whether the agent should use a cache for tool usage.",
)
config: Optional[Dict[str, Any]] = Field(
description="Configuration for the agent",
default=None,
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the agent execution to be respected.",
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents disposal"
)
max_iter: Optional[int] = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
_times_executed: int = PrivateAttr(default=0)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
)
agent_executor: InstanceOf[CrewAgentExecutor] = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
agent_ops_agent_name: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
agent_ops_agent_id: str = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
@@ -123,7 +73,6 @@ class Agent(BaseModel):
default=None,
description="Callback to be executed after each step of the agent execution.",
)
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
llm: Any = Field(
default_factory=lambda: ChatOpenAI(
model=os.environ.get("OPENAI_MODEL_NAME", "gpt-4o")
@@ -145,45 +94,25 @@ class Agent(BaseModel):
response_template: Optional[str] = Field(
default=None, description="Response format for the agent."
)
_original_role: str | None = None
_original_goal: str | None = None
_original_backstory: str | None = None
tools_results: Optional[List[Any]] = Field(
default=[], description="Results of the tools used by the agent."
)
allow_code_execution: Optional[bool] = Field(
default=False, description="Enable code execution for the agent."
)
max_retry_limit: int = Field(
default=2,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
)
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
super().__init__(**config, **data)
__pydantic_self__.agent_ops_agent_name = __pydantic_self__.role
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Agent":
"""Set attributes based on the agent configuration."""
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@model_validator(mode="after")
def set_private_attrs(self):
"""Set private attributes."""
self._logger = Logger(self.verbose)
if self.max_rpm and not self._rpm_controller:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
return self
@model_validator(mode="after")
def set_agent_executor(self) -> "Agent":
"""set agent executor is set."""
"""Ensure agent executor and token process are set."""
if hasattr(self.llm, "model_name"):
token_handler = TokenCalcHandler(self.llm.model_name, self._token_process)
@@ -198,7 +127,8 @@ class Agent(BaseModel):
self.llm.callbacks.append(token_handler)
if agentops and not any(
isinstance(handler, agentops.LangchainCallbackHandler) for handler in self.llm.callbacks
isinstance(handler, agentops.LangchainCallbackHandler)
for handler in self.llm.callbacks
):
agentops.stop_instrumenting()
self.llm.callbacks.append(agentops.LangchainCallbackHandler())
@@ -245,50 +175,60 @@ class Agent(BaseModel):
if memory.strip() != "":
task_prompt += self.i18n.slice("memory").format(memory=memory)
tools = tools or self.tools
parsed_tools = self._parse_tools(tools) # type: ignore # Argument 1 to "_parse_tools" of "Agent" has incompatible type "list[Any] | None"; expected "list[Any]"
tools = tools or self.tools or []
parsed_tools = self._parse_tools(tools)
self.create_agent_executor(tools=tools)
self.agent_executor.tools = parsed_tools
self.agent_executor.task = task
self.agent_executor.tools_description = render_text_description(parsed_tools)
self.agent_executor.tools_description = self._render_text_description_and_args(
parsed_tools
)
self.agent_executor.tools_names = self.__tools_names(parsed_tools)
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
}
)["output"]
if self.crew and self.crew._train:
task_prompt = self._training_handler(task_prompt=task_prompt)
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
try:
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
}
)["output"]
except Exception as e:
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
raise e
result = self.execute_task(task, context, tools)
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
# If there was any tool in self.tools_results that had result_as_answer
# set to True, return the results of the last tool that had
# result_as_answer set to True
for tool_result in self.tools_results: # type: ignore # Item "None" of "list[Any] | None" has no attribute "__iter__" (not iterable)
if tool_result.get("result_as_answer", False):
result = tool_result["result"]
return result
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.
Args:
cache_handler: An instance of the CacheHandler class.
"""
self.tools_handler = ToolsHandler()
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()
def format_log_to_str(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
observation_prefix: str = "Observation: ",
llm_prefix: str = "",
) -> str:
"""Construct the scratchpad that lets the agent continue its thought process."""
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
return thoughts
def create_agent_executor(self, tools=None) -> None:
"""Create an agent executor for the agent.
@@ -296,7 +236,7 @@ class Agent(BaseModel):
Returns:
An instance of the CrewAgentExecutor class.
"""
tools = tools or self.tools
tools = tools or self.tools or []
agent_args = {
"input": lambda x: x["input"],
@@ -344,53 +284,42 @@ class Agent(BaseModel):
)
stop_words = [self.i18n.slice("observation")]
if self.response_template:
stop_words.append(
self.response_template.split("{{ .Response }}")[1].strip()
)
bind = self.llm.bind(stop=stop_words)
inner_agent = agent_args | execution_prompt | bind | CrewAgentParser(agent=self)
self.agent_executor = CrewAgentExecutor(
agent=RunnableAgent(runnable=inner_agent), **executor_args
)
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
if self._original_goal is None:
self._original_goal = self.goal
if self._original_backstory is None:
self._original_backstory = self.backstory
def get_delegation_tools(self, agents: List[BaseAgent]):
agent_tools = AgentTools(agents=agents)
tools = agent_tools.tools()
return tools
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
def get_code_execution_tools(self):
try:
from crewai_tools import CodeInterpreterTool
def increment_formatting_errors(self) -> None:
"""Count the formatting errors of the agent."""
self.formatting_errors += 1
return [CodeInterpreterTool()]
except ModuleNotFoundError:
self._logger.log(
"info", "Coding tools not available. Install crewai_tools. "
)
def format_log_to_str(
self,
intermediate_steps: List[Tuple[AgentAction, str]],
observation_prefix: str = "Observation: ",
llm_prefix: str = "",
) -> str:
"""Construct the scratchpad that lets the agent continue its thought process."""
thoughts = ""
for action, observation in intermediate_steps:
thoughts += action.log
thoughts += f"\n{observation_prefix}{observation}\n{llm_prefix}"
return thoughts
def get_output_converter(self, llm, text, model, instructions):
return Converter(llm=llm, text=text, model=model, instructions=instructions)
def _parse_tools(self, tools: List[Any]) -> List[LangChainTool]: # type: ignore # Function "langchain_core.tools.tool" is not valid as a type
"""Parse tools to be used for the task."""
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
tools_list = []
try:
# tentatively try to import from crewai_tools import BaseTool as CrewAITool
from crewai_tools import BaseTool as CrewAITool
for tool in tools:
@@ -399,10 +328,82 @@ class Agent(BaseModel):
else:
tools_list.append(tool)
except ModuleNotFoundError:
tools_list = []
for tool in tools:
tools_list.append(tool)
return tools_list
def _training_handler(self, task_prompt: str) -> str:
"""Handle training data for the agent task prompt to improve output on Training."""
if data := CrewTrainingHandler(TRAINING_DATA_FILE).load():
agent_id = str(self.id)
if data.get(agent_id):
human_feedbacks = [
i["human_feedback"] for i in data.get(agent_id, {}).values()
]
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
human_feedbacks
)
return task_prompt
def _use_trained_data(self, task_prompt: str) -> str:
"""Use trained data for the agent task prompt to improve output."""
if data := CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).load():
if trained_data_output := data.get(self.role):
task_prompt += "You MUST follow these feedbacks: \n " + "\n - ".join(
trained_data_output["suggestions"]
)
return task_prompt
def _render_text_description(self, tools: List[BaseTool]) -> str:
"""Render the tool name and description in plain text.
Output will be in the format of:
.. code-block:: markdown
search: This tool is used for search
calculator: This tool is used for math
"""
description = "\n".join(
[
f"Tool name: {tool.name}\nTool description:\n{tool.description}"
for tool in tools
]
)
return description
def _render_text_description_and_args(self, tools: List[BaseTool]) -> str:
"""Render the tool name, description, and args in plain text.
Output will be in the format of:
.. code-block:: markdown
search: This tool is used for search, args: {"query": {"type": "string"}}
calculator: This tool is used for math, \
args: {"expression": {"type": "string"}}
"""
tool_strings = []
for tool in tools:
args_schema = str(tool.args)
if hasattr(tool, "func") and tool.func:
sig = signature(tool.func)
description = (
f"Tool Name: {tool.name}{sig}\nTool Description: {tool.description}"
)
else:
description = (
f"Tool Name: {tool.name}\nTool Description: {tool.description}"
)
tool_strings.append(f"{description}\nTool Arguments: {args_schema}")
return "\n".join(tool_strings)
@staticmethod
def __tools_names(tools) -> str:
return ", ".join([t.name for t in tools])

View File

@@ -0,0 +1,262 @@
import uuid
from abc import ABC, abstractmethod
from copy import copy as shallow_copy
from hashlib import md5
from typing import Any, Dict, List, Optional, TypeVar
from pydantic import (
UUID4,
BaseModel,
ConfigDict,
Field,
InstanceOf,
PrivateAttr,
field_validator,
model_validator,
)
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
from crewai.agents.cache.cache_handler import CacheHandler
from crewai.agents.tools_handler import ToolsHandler
from crewai.utilities import I18N, Logger, RPMController
T = TypeVar("T", bound="BaseAgent")
class BaseAgent(ABC, BaseModel):
"""Abstract Base Class for all third party agents compatible with CrewAI.
Attributes:
id (UUID4): Unique identifier for the agent.
role (str): Role of the agent.
goal (str): Objective of the agent.
backstory (str): Backstory of the agent.
cache (bool): Whether the agent should use a cache for tool usage.
config (Optional[Dict[str, Any]]): Configuration for the agent.
verbose (bool): Verbose mode for the Agent Execution.
max_rpm (Optional[int]): Maximum number of requests per minute for the agent execution.
allow_delegation (bool): Allow delegation of tasks to agents.
tools (Optional[List[Any]]): Tools at the agent's disposal.
max_iter (Optional[int]): Maximum iterations for an agent to execute a task.
agent_executor (InstanceOf): An instance of the CrewAgentExecutor class.
llm (Any): Language model that will run the agent.
crew (Any): Crew to which the agent belongs.
i18n (I18N): Internationalization settings.
cache_handler (InstanceOf[CacheHandler]): An instance of the CacheHandler class.
tools_handler (InstanceOf[ToolsHandler]): An instance of the ToolsHandler class.
Methods:
execute_task(task: Any, context: Optional[str] = None, tools: Optional[List[Any]] = None) -> str:
Abstract method to execute a task.
create_agent_executor(tools=None) -> None:
Abstract method to create an agent executor.
_parse_tools(tools: List[Any]) -> List[Any]:
Abstract method to parse tools.
get_delegation_tools(agents: List["BaseAgent"]):
Abstract method to set the agents task tools for handling delegation and question asking to other agents in crew.
get_output_converter(llm, model, instructions):
Abstract method to get the converter class for the agent to create json/pydantic outputs.
interpolate_inputs(inputs: Dict[str, Any]) -> None:
Interpolate inputs into the agent description and backstory.
set_cache_handler(cache_handler: CacheHandler) -> None:
Set the cache handler for the agent.
increment_formatting_errors() -> None:
Increment formatting errors.
copy() -> "BaseAgent":
Create a copy of the agent.
set_rpm_controller(rpm_controller: RPMController) -> None:
Set the rpm controller for the agent.
set_private_attrs() -> "BaseAgent":
Set private attributes.
"""
__hash__ = object.__hash__ # type: ignore
_logger: Logger = PrivateAttr()
_rpm_controller: RPMController = PrivateAttr(default=None)
_request_within_rpm_limit: Any = PrivateAttr(default=None)
formatting_errors: int = 0
model_config = ConfigDict(arbitrary_types_allowed=True)
id: UUID4 = Field(default_factory=uuid.uuid4, frozen=True)
role: str = Field(description="Role of the agent")
goal: str = Field(description="Objective of the agent")
backstory: str = Field(description="Backstory of the agent")
cache: bool = Field(
default=True, description="Whether the agent should use a cache for tool usage."
)
config: Optional[Dict[str, Any]] = Field(
description="Configuration for the agent", default=None
)
verbose: bool = Field(
default=False, description="Verbose mode for the Agent Execution"
)
max_rpm: Optional[int] = Field(
default=None,
description="Maximum number of requests per minute for the agent execution to be respected.",
)
allow_delegation: bool = Field(
default=True, description="Allow delegation of tasks to agents"
)
tools: Optional[List[Any]] = Field(
default_factory=list, description="Tools at agents' disposal"
)
max_iter: Optional[int] = Field(
default=25, description="Maximum iterations for an agent to execute a task"
)
agent_executor: InstanceOf = Field(
default=None, description="An instance of the CrewAgentExecutor class."
)
llm: Any = Field(
default=None, description="Language model that will run the agent."
)
crew: Any = Field(default=None, description="Crew to which the agent belongs.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
cache_handler: InstanceOf[CacheHandler] = Field(
default=None, description="An instance of the CacheHandler class."
)
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
_original_role: str | None = None
_original_goal: str | None = None
_original_backstory: str | None = None
_token_process: TokenProcess = TokenProcess()
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
super().__init__(**config, **data)
@model_validator(mode="after")
def set_config_attributes(self):
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@field_validator("id", mode="before")
@classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
if v:
raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {}
)
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "BaseAgent":
"""Set attributes based on the agent configuration."""
if self.config:
for key, value in self.config.items():
setattr(self, key, value)
return self
@model_validator(mode="after")
def set_private_attrs(self):
"""Set private attributes."""
self._logger = Logger(self.verbose)
if self.max_rpm and not self._rpm_controller:
self._rpm_controller = RPMController(
max_rpm=self.max_rpm, logger=self._logger
)
if not self._token_process:
self._token_process = TokenProcess()
return self
@property
def key(self):
source = [self.role, self.goal, self.backstory]
return md5("|".join(source).encode()).hexdigest()
@abstractmethod
def execute_task(
self,
task: Any,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
pass
@abstractmethod
def create_agent_executor(self, tools=None) -> None:
pass
@abstractmethod
def _parse_tools(self, tools: List[Any]) -> List[Any]:
pass
@abstractmethod
def get_delegation_tools(self, agents: List["BaseAgent"]) -> List[Any]:
"""Set the task tools that init BaseAgenTools class."""
pass
@abstractmethod
def get_output_converter(
self, llm: Any, text: str, model: type[BaseModel] | None, instructions: str
):
"""Get the converter class for the agent to create json/pydantic outputs."""
pass
def copy(self: T) -> T: # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
"""Create a deep copy of the Agent."""
exclude = {
"id",
"_logger",
"_rpm_controller",
"_request_within_rpm_limit",
"_token_process",
"agent_executor",
"tools",
"tools_handler",
"cache_handler",
"llm",
}
# Copy llm and clear callbacks
existing_llm = shallow_copy(self.llm)
existing_llm.callbacks = []
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_agent = type(self)(**copied_data, llm=existing_llm, tools=self.tools)
return copied_agent
def interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolate inputs into the agent description and backstory."""
if self._original_role is None:
self._original_role = self.role
if self._original_goal is None:
self._original_goal = self.goal
if self._original_backstory is None:
self._original_backstory = self.backstory
if inputs:
self.role = self._original_role.format(**inputs)
self.goal = self._original_goal.format(**inputs)
self.backstory = self._original_backstory.format(**inputs)
def set_cache_handler(self, cache_handler: CacheHandler) -> None:
"""Set the cache handler for the agent.
Args:
cache_handler: An instance of the CacheHandler class.
"""
self.tools_handler = ToolsHandler()
if self.cache:
self.cache_handler = cache_handler
self.tools_handler.cache = cache_handler
self.create_agent_executor()
def increment_formatting_errors(self) -> None:
self.formatting_errors += 1
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
Args:
rpm_controller: An instance of the RPMController class.
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.create_agent_executor()

View File

@@ -0,0 +1,109 @@
import time
from typing import TYPE_CHECKING, Optional
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities import I18N
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
from crewai.agents.agent_builder.base_agent import BaseAgent
class CrewAgentExecutorMixin:
crew: Optional["Crew"]
crew_agent: Optional["BaseAgent"]
task: Optional["Task"]
iterations: int
force_answer_max_iterations: int
have_forced_answer: bool
_i18n: I18N
def _should_force_answer(self) -> bool:
"""Determine if a forced answer is required based on iteration count."""
return (
self.iterations == self.force_answer_max_iterations
) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
"""Create and save a short-term memory item if conditions are met."""
if (
self.crew
and self.crew_agent
and self.task
and "Action: Delegate work to coworker" not in output.log
):
try:
memory = ShortTermMemoryItem(
data=output.log,
agent=self.crew_agent.role,
metadata={
"observation": self.task.description,
},
)
if (
hasattr(self.crew, "_short_term_memory")
and self.crew._short_term_memory
):
self.crew._short_term_memory.save(memory)
except Exception as e:
print(f"Failed to add to short term memory: {e}")
pass
def _create_long_term_memory(self, output) -> None:
"""Create and save long-term and entity memory items based on evaluation."""
if (
self.crew
and self.crew.memory
and self.crew._long_term_memory
and self.crew._entity_memory
and self.task
and self.crew_agent
):
try:
ltm_agent = TaskEvaluator(self.crew_agent)
evaluation = ltm_agent.evaluate(self.task, output.log)
if isinstance(evaluation, ConverterError):
return
long_term_memory = LongTermMemoryItem(
task=self.task.description,
agent=self.crew_agent.role,
quality=evaluation.quality,
datetime=str(time.time()),
expected_output=self.task.expected_output,
metadata={
"suggestions": evaluation.suggestions,
"quality": evaluation.quality,
},
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
relationships="\n".join(
[f"- {r}" for r in entity.relationships]
),
)
self.crew._entity_memory.save(entity_memory)
except AttributeError as e:
print(f"Missing attributes for long term memory: {e}")
pass
except Exception as e:
print(f"Failed to add to long term memory: {e}")
pass
def _ask_human_input(self, final_answer: dict) -> str:
"""Prompt human input for final decision making."""
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)

View File

@@ -0,0 +1,86 @@
from abc import ABC, abstractmethod
from typing import List, Optional, Union
from pydantic import BaseModel, Field
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.task import Task
from crewai.utilities import I18N
class BaseAgentTools(BaseModel, ABC):
"""Default tools around agent delegation"""
agents: List[BaseAgent] = Field(description="List of agents in this crew.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
@abstractmethod
def tools(self):
pass
def _get_coworker(self, coworker: Optional[str], **kwargs) -> Optional[str]:
coworker = coworker or kwargs.get("co_worker") or kwargs.get("coworker")
if coworker:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return coworker
def delegate_work(
self, task: str, context: str, coworker: Optional[str] = None, **kwargs
):
"""Useful to delegate a specific task to a coworker passing all necessary context and names."""
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, task, context)
def ask_question(
self, question: str, context: str, coworker: Optional[str] = None, **kwargs
):
"""Useful to ask a question, opinion or take from a coworker passing all necessary context and names."""
coworker = self._get_coworker(coworker, **kwargs)
return self._execute(coworker, question, context)
def _execute(
self, agent_name: Union[str, None], task: str, context: Union[str, None]
):
"""Execute the command."""
try:
if agent_name is None:
agent_name = ""
# It is important to remove the quotes from the agent name.
# The reason we have to do this is because less-powerful LLM's
# have difficulty producing valid JSON.
# As a result, we end up with invalid JSON that is truncated like this:
# {"task": "....", "coworker": "....
# when it should look like this:
# {"task": "....", "coworker": "...."}
agent_name = agent_name.casefold().replace('"', "").replace("\n", "")
agent = [ # type: ignore # Incompatible types in assignment (expression has type "list[BaseAgent]", variable has type "str | None")
available_agent
for available_agent in self.agents
if available_agent.role.casefold().replace("\n", "") == agent_name
]
except Exception as _:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
if not agent:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
agent = agent[0]
task_with_assigned_agent = Task( # type: ignore # Incompatible types in assignment (expression has type "Task", variable has type "str")
description=task,
agent=agent,
expected_output="Your best answer to your coworker asking you this, accounting for the context shared.",
)
return agent.execute_task(task_with_assigned_agent, context)

View File

@@ -0,0 +1,47 @@
from abc import ABC, abstractmethod
from typing import Any, Optional
from pydantic import BaseModel, Field
class OutputConverter(BaseModel, ABC):
"""
Abstract base class for converting task results into structured formats.
This class provides a framework for converting unstructured text into
either Pydantic models or JSON, tailored for specific agent requirements.
It uses a language model to interpret and structure the input text based
on given instructions.
Attributes:
text (str): The input text to be converted.
llm (Any): The language model used for conversion.
model (Any): The target model for structuring the output.
instructions (str): Specific instructions for the conversion process.
max_attempts (int): Maximum number of conversion attempts (default: 3).
"""
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
instructions: str = Field(description="Conversion instructions to the LLM.")
max_attempts: Optional[int] = Field(
description="Max number of attempts to try to get the output formatted.",
default=3,
)
@abstractmethod
def to_pydantic(self, current_attempt=1):
"""Convert text to pydantic."""
pass
@abstractmethod
def to_json(self, current_attempt=1):
"""Convert text to json."""
pass
@property
@abstractmethod
def is_gpt(self) -> bool:
"""Return if llm provided is of gpt from openai."""
pass

View File

@@ -0,0 +1,27 @@
from typing import Any, Dict
class TokenProcess:
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
successful_requests: int = 0
def sum_prompt_tokens(self, tokens: int):
self.prompt_tokens = self.prompt_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_completion_tokens(self, tokens: int):
self.completion_tokens = self.completion_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_successful_requests(self, requests: int):
self.successful_requests = self.successful_requests + requests
def get_summary(self) -> Dict[str, Any]:
return {
"total_tokens": self.total_tokens,
"prompt_tokens": self.prompt_tokens,
"completion_tokens": self.completion_tokens,
"successful_requests": self.successful_requests,
}

View File

@@ -7,22 +7,19 @@ from langchain.agents.agent import ExceptionTool
from langchain.callbacks.manager import CallbackManagerForChainRun
from langchain_core.agents import AgentAction, AgentFinish, AgentStep
from langchain_core.exceptions import OutputParserException
from langchain_core.pydantic_v1 import root_validator
from langchain_core.tools import BaseTool
from langchain_core.utils.input import get_color_mapping
from pydantic import InstanceOf
from crewai.agents.agent_builder.base_agent_executor_mixin import CrewAgentExecutorMixin
from crewai.agents.tools_handler import ToolsHandler
from crewai.memory.entity.entity_memory_item import EntityMemoryItem
from crewai.memory.long_term.long_term_memory_item import LongTermMemoryItem
from crewai.memory.short_term.short_term_memory_item import ShortTermMemoryItem
from crewai.tools.tool_usage import ToolUsage, ToolUsageErrorException
from crewai.utilities import I18N
from crewai.utilities.converter import ConverterError
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.constants import TRAINING_DATA_FILE
from crewai.utilities.training_handler import CrewTrainingHandler
class CrewAgentExecutor(AgentExecutor):
class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
_i18n: I18N = I18N()
should_ask_for_human_input: bool = False
llm: Any = None
@@ -38,67 +35,12 @@ class CrewAgentExecutor(AgentExecutor):
tools_handler: Optional[InstanceOf[ToolsHandler]] = None
max_iterations: Optional[int] = 15
have_forced_answer: bool = False
force_answer_max_iterations: Optional[int] = None
force_answer_max_iterations: Optional[int] = None # type: ignore # Incompatible types in assignment (expression has type "int | None", base class "CrewAgentExecutorMixin" defined the type as "int")
step_callback: Optional[Any] = None
system_template: Optional[str] = None
prompt_template: Optional[str] = None
response_template: Optional[str] = None
@root_validator()
def set_force_answer_max_iterations(cls, values: Dict) -> Dict:
values["force_answer_max_iterations"] = values["max_iterations"] - 2
return values
def _should_force_answer(self) -> bool:
return (
self.iterations == self.force_answer_max_iterations
) and not self.have_forced_answer
def _create_short_term_memory(self, output) -> None:
if (
self.crew
and self.crew.memory
and "Action: Delegate work to co-worker" not in output.log
):
memory = ShortTermMemoryItem(
data=output.log,
agent=self.crew_agent.role,
metadata={
"observation": self.task.description,
},
)
self.crew._short_term_memory.save(memory)
def _create_long_term_memory(self, output) -> None:
if self.crew and self.crew.memory:
ltm_agent = TaskEvaluator(self.crew_agent)
evaluation = ltm_agent.evaluate(self.task, output.log)
if isinstance(evaluation, ConverterError):
return
long_term_memory = LongTermMemoryItem(
task=self.task.description,
agent=self.crew_agent.role,
quality=evaluation.quality,
datetime=str(time.time()),
expected_output=self.task.expected_output,
metadata={
"suggestions": evaluation.suggestions,
"quality": evaluation.quality,
},
)
self.crew._long_term_memory.save(long_term_memory)
for entity in evaluation.entities:
entity_memory = EntityMemoryItem(
name=entity.name,
type=entity.type,
description=entity.description,
relationships="\n".join([f"- {r}" for r in entity.relationships]),
)
self.crew._entity_memory.save(entity_memory)
def _call(
self,
inputs: Dict[str, str],
@@ -246,12 +188,17 @@ class CrewAgentExecutor(AgentExecutor):
# If the tool chosen is the finishing tool, then we end and return.
if isinstance(output, AgentFinish):
if self.should_ask_for_human_input:
human_feedback = self._ask_human_input(output.return_values["output"])
if self.crew and self.crew._train:
self._handle_crew_training_output(output, human_feedback)
# Making sure we only ask for it once, so disabling for the next thought loop
self.should_ask_for_human_input = False
human_feedback = self._ask_human_input(output.return_values["output"])
action = AgentAction(
tool="Human Input", tool_input=human_feedback, log=output.log
)
yield AgentStep(
action=action,
observation=self._i18n.slice("human_feedback").format(
@@ -261,6 +208,9 @@ class CrewAgentExecutor(AgentExecutor):
return
else:
if self.crew and self.crew._train:
self._handle_crew_training_output(output)
yield output
return
@@ -282,6 +232,7 @@ class CrewAgentExecutor(AgentExecutor):
tools_names=self.tools_names,
function_calling_llm=self.function_calling_llm,
task=self.task,
agent=self.crew_agent,
action=agent_action,
)
tool_calling = tool_usage.parse(agent_action.log)
@@ -291,6 +242,8 @@ class CrewAgentExecutor(AgentExecutor):
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in name_to_tool_map
]:
observation = tool_usage.use(tool_calling, agent_action.log)
else:
@@ -300,8 +253,30 @@ class CrewAgentExecutor(AgentExecutor):
)
yield AgentStep(action=agent_action, observation=observation)
def _ask_human_input(self, final_answer: dict) -> str:
"""Get human input."""
return input(
self._i18n.slice("getting_input").format(final_answer=final_answer)
)
def _handle_crew_training_output(
self, output: AgentFinish, human_feedback: str | None = None
) -> None:
"""Function to handle the process of the training data."""
agent_id = str(self.crew_agent.id)
if (
CrewTrainingHandler(TRAINING_DATA_FILE).load()
and not self.should_ask_for_human_input
):
training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
if training_data.get(agent_id):
training_data[agent_id][self.crew._train_iteration][
"improved_output"
] = output.return_values["output"]
CrewTrainingHandler(TRAINING_DATA_FILE).save(training_data)
if self.should_ask_for_human_input and human_feedback is not None:
training_data = {
"initial_output": output.return_values["output"],
"human_feedback": human_feedback,
"agent": agent_id,
"agent_role": self.crew_agent.role,
}
CrewTrainingHandler(TRAINING_DATA_FILE).append(
self.crew._train_iteration, agent_id, training_data
)

View File

@@ -1,6 +1,7 @@
import re
from typing import Any, Union
from json_repair import repair_json
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.exceptions import OutputParserException
@@ -48,11 +49,15 @@ class CrewAgentParser(ReActSingleInputOutputParser):
raise OutputParserException(
f"{FINAL_ANSWER_AND_PARSABLE_ACTION_ERROR_MESSAGE}: {text}"
)
action = action_match.group(1).strip()
action_input = action_match.group(2)
tool_input = action_input.strip(" ")
tool_input = tool_input.strip('"')
return AgentAction(action, tool_input, text)
action = action_match.group(1)
clean_action = self._clean_action(action)
action_input = action_match.group(2).strip()
tool_input = action_input.strip(" ").strip('"')
safe_tool_input = self._safe_repair_json(tool_input)
return AgentAction(clean_action, safe_tool_input, text)
elif includes_answer:
return AgentFinish(
@@ -87,3 +92,30 @@ class CrewAgentParser(ReActSingleInputOutputParser):
llm_output=text,
send_to_llm=True,
)
def _clean_action(self, text: str) -> str:
"""Clean action string by removing non-essential formatting characters."""
return re.sub(r"^\s*\*+\s*|\s*\*+\s*$", "", text).strip()
def _safe_repair_json(self, tool_input: str) -> str:
UNABLE_TO_REPAIR_JSON_RESULTS = ['""', "{}"]
# Skip repair if the input starts and ends with square brackets
# Explanation: The JSON parser has issues handling inputs that are enclosed in square brackets ('[]').
# These are typically valid JSON arrays or strings that do not require repair. Attempting to repair such inputs
# might lead to unintended alterations, such as wrapping the entire input in additional layers or modifying
# the structure in a way that changes its meaning. By skipping the repair for inputs that start and end with
# square brackets, we preserve the integrity of these valid JSON structures and avoid unnecessary modifications.
if tool_input.startswith("[") and tool_input.endswith("]"):
return tool_input
# Before repair, handle common LLM issues:
# 1. Replace """ with " to avoid JSON parser errors
tool_input = tool_input.replace('"""', '"')
result = repair_json(tool_input)
if result in UNABLE_TO_REPAIR_JSON_RESULTS:
return tool_input
return str(result)

View File

@@ -1,7 +1,14 @@
import click
import pkg_resources
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from .create_crew import create_crew
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
from .test_crew import test_crew
from .train_crew import train_crew
@@ -48,5 +55,97 @@ def train(n_iterations: int):
train_crew(n_iterations)
@crewai.command()
@click.option(
"-t",
"--task_id",
type=str,
help="Replay the crew from this task ID, including all subsequent tasks.",
)
def replay(task_id: str) -> None:
"""
Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
"""
try:
click.echo(f"Replaying the crew from task {task_id}")
replay_task_command(task_id)
except Exception as e:
click.echo(f"An error occurred while replaying: {e}", err=True)
@crewai.command()
def log_tasks_outputs() -> None:
"""
Retrieve your latest crew.kickoff() task outputs.
"""
try:
storage = KickoffTaskOutputsSQLiteStorage()
tasks = storage.load()
if not tasks:
click.echo(
"No task outputs found. Only crew kickoff task outputs are logged."
)
return
for index, task in enumerate(tasks, 1):
click.echo(f"Task {index}: {task['task_id']}")
click.echo(f"Description: {task['expected_output']}")
click.echo("------")
except Exception as e:
click.echo(f"An error occurred while logging task outputs: {e}", err=True)
@crewai.command()
@click.option("-l", "--long", is_flag=True, help="Reset LONG TERM memory")
@click.option("-s", "--short", is_flag=True, help="Reset SHORT TERM memory")
@click.option("-e", "--entities", is_flag=True, help="Reset ENTITIES memory")
@click.option(
"-k",
"--kickoff-outputs",
is_flag=True,
help="Reset LATEST KICKOFF TASK OUTPUTS",
)
@click.option("-a", "--all", is_flag=True, help="Reset ALL memories")
def reset_memories(long, short, entities, kickoff_outputs, all):
"""
Reset the crew memories (long, short, entity, latest_crew_kickoff_ouputs). This will delete all the data saved.
"""
try:
if not all and not (long or short or entities or kickoff_outputs):
click.echo(
"Please specify at least one memory type to reset using the appropriate flags."
)
return
reset_memories_command(long, short, entities, kickoff_outputs, all)
except Exception as e:
click.echo(f"An error occurred while resetting memories: {e}", err=True)
@crewai.command()
@click.option(
"-n",
"--n_iterations",
type=int,
default=3,
help="Number of iterations to Test the crew",
)
@click.option(
"-m",
"--model",
type=str,
default="gpt-4o-mini",
help="LLM Model to run the tests on the Crew. For now only accepting only OpenAI models.",
)
def test(n_iterations: int, model: str):
"""Test the crew and evaluate the results."""
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
test_crew(n_iterations, model)
if __name__ == "__main__":
crewai()

View File

@@ -0,0 +1,24 @@
import subprocess
import click
def replay_task_command(task_id: str) -> None:
"""
Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
"""
command = ["poetry", "run", "replay", task_id]
try:
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while replaying the task: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -0,0 +1,45 @@
import subprocess
import click
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None:
"""
Replay the crew execution from a specific task.
Args:
task_id (str): The ID of the task to replay from.
"""
try:
if all:
ShortTermMemory().reset()
EntityMemory().reset()
LongTermMemory().reset()
TaskOutputStorageHandler().reset()
click.echo("All memories have been reset.")
else:
if long:
LongTermMemory().reset()
click.echo("Long term memory has been reset.")
if short:
ShortTermMemory().reset()
click.echo("Short term memory has been reset.")
if entity:
EntityMemory().reset()
click.echo("Entity memory has been reset.")
if kickoff_outputs:
TaskOutputStorageHandler().reset()
click.echo("Latest Kickoff outputs stored has been reset.")
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while resetting the memories: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -5,6 +5,7 @@ research_task:
the current year is 2024.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
reporting_task:
description: >
@@ -12,4 +13,5 @@ reporting_task:
Make sure the report is detailed and contains any and all relevant information.
expected_output: >
A fully fledge reports with the mains topics, each with a full section of information.
Formated as markdown with out '```'
Formatted as markdown without '```'
agent: reporting_analyst

View File

@@ -32,14 +32,12 @@ class {{crew_name}}Crew():
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
agent=self.researcher()
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
agent=self.reporting_analyst(),
output_file='report.md'
)

View File

@@ -2,9 +2,15 @@
import sys
from {{folder_name}}.crew import {{crew_name}}Crew
# This main file is intended to be a way for your to run your
# crew locally, so refrain from adding necessary logic into this file.
# Replace with inputs you want to test with, it will automatically
# interpolate any tasks and agents information
def run():
# Replace with your inputs, it will automatically interpolate any tasks and agents information
"""
Run the crew.
"""
inputs = {
'topic': 'AI LLMs'
}
@@ -15,8 +21,34 @@ def train():
"""
Train the crew for a given number of iterations.
"""
inputs = {
"topic": "AI LLMs"
}
try:
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]))
{{crew_name}}Crew().crew().train(n_iterations=int(sys.argv[1]), inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while training the crew: {e}")
def replay():
"""
Replay the crew execution from a specific task.
"""
try:
{{crew_name}}Crew().crew().replay(task_id=sys.argv[1])
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")
def test():
"""
Test the crew execution and returns the results.
"""
inputs = {
"topic": "AI LLMs"
}
try:
{{crew_name}}Crew().crew().test(n_iterations=int(sys.argv[1]), model=sys.argv[2], inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")

View File

@@ -6,11 +6,13 @@ authors = ["Your Name <you@example.com>"]
[tool.poetry.dependencies]
python = ">=3.10,<=3.13"
crewai = { extras = ["tools"], version = "^0.30.11" }
crewai = { extras = ["tools"], version = "^0.41.1" }
[tool.poetry.scripts]
{{folder_name}} = "{{folder_name}}.main:run"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay"
test = "{{folder_name}}.main:test"
[build-system]
requires = ["poetry-core"]

View File

@@ -0,0 +1,30 @@
import subprocess
import click
def test_crew(n_iterations: int, model: str) -> None:
"""
Test the crew by running a command in the Poetry environment.
Args:
n_iterations (int): The number of iterations to test the crew.
model (str): The model to test the crew with.
"""
command = ["poetry", "run", "test", str(n_iterations), model]
try:
if n_iterations <= 0:
raise ValueError("The number of iterations must be a positive integer.")
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while testing the crew: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -1,6 +1,9 @@
import asyncio
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from concurrent.futures import Future
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Union
from langchain_core.callbacks import BaseCallbackHandler
from pydantic import (
@@ -17,15 +20,31 @@ from pydantic import (
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.agents.cache import CacheHandler
from crewai.crews.crew_output import CrewOutput
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import (
TRAINED_AGENTS_DATA_FILE,
TRAINING_DATA_FILE,
)
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
)
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
try:
import agentops
@@ -52,10 +71,10 @@ class Crew(BaseModel):
max_rpm: Maximum number of requests per minute for the crew execution to be respected.
prompt_file: Path to the prompt json file to be used for the crew.
id: A unique identifier for the crew instance.
full_output: Whether the crew should return the full output with all tasks outputs or just the final output.
task_callback: Callback to be executed after each task for every agents execution.
step_callback: Callback to be executed after each step for every agents execution.
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
planning: Plan the crew execution and add the plan to the crew.
"""
__hash__ = object.__hash__ # type: ignore
@@ -67,11 +86,20 @@ class Crew(BaseModel):
_short_term_memory: Optional[InstanceOf[ShortTermMemory]] = PrivateAttr()
_long_term_memory: Optional[InstanceOf[LongTermMemory]] = PrivateAttr()
_entity_memory: Optional[InstanceOf[EntityMemory]] = PrivateAttr()
_train: Optional[bool] = PrivateAttr(default=False)
_train_iteration: Optional[int] = PrivateAttr()
_inputs: Optional[Dict[str, Any]] = PrivateAttr(default=None)
_logging_color: str = PrivateAttr(
default="bold_purple",
)
_task_output_handler: TaskOutputStorageHandler = PrivateAttr(
default_factory=TaskOutputStorageHandler
)
cache: bool = Field(default=True)
model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(default_factory=list)
agents: List[Agent] = Field(default_factory=list)
agents: List[BaseAgent] = Field(default_factory=list)
process: Process = Field(default=Process.sequential)
verbose: Union[int, bool] = Field(default=0)
memory: bool = Field(
@@ -86,14 +114,10 @@ class Crew(BaseModel):
default=None,
description="Metrics for the LLM usage during all tasks execution.",
)
full_output: Optional[bool] = Field(
default=False,
description="Whether the crew should return the full output with all tasks outputs or just the final output.",
)
manager_llm: Optional[Any] = Field(
description="Language model that will run the agent.", default=None
)
manager_agent: Optional[Any] = Field(
manager_agent: Optional[BaseAgent] = Field(
description="Custom agent that will be used as manager.", default=None
)
manager_callbacks: Optional[List[InstanceOf[BaseCallbackHandler]]] = Field(
@@ -126,6 +150,18 @@ class Crew(BaseModel):
default=False,
description="output_log_file",
)
planning: Optional[bool] = Field(
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
)
execution_logs: List[Dict[str, Any]] = Field(
default=[],
description="List of execution logs for tasks",
)
@field_validator("id", mode="before")
@classmethod
@@ -161,7 +197,6 @@ class Crew(BaseModel):
self._rpm_controller = RPMController(max_rpm=self.max_rpm, logger=self._logger)
self._telemetry = Telemetry()
self._telemetry.set_tracer()
self._telemetry.crew_creation(self)
return self
@model_validator(mode="after")
@@ -218,6 +253,120 @@ class Crew(BaseModel):
agent.set_rpm_controller(self._rpm_controller)
return self
@model_validator(mode="after")
def validate_tasks(self):
if self.process == Process.sequential:
for task in self.tasks:
if task.agent is None:
raise PydanticCustomError(
"missing_agent_in_task",
f"Sequential process error: Agent is missing in the task with the following description: {task.description}", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
{},
)
return self
@model_validator(mode="after")
def check_tasks_in_hierarchical_process_not_async(self):
"""Validates that the tasks in hierarchical process are not flagged with async_execution."""
if self.process == Process.hierarchical:
for task in self.tasks:
if task.async_execution:
raise PydanticCustomError(
"async_execution_in_hierarchical_process",
"Hierarchical process error: Tasks cannot be flagged with async_execution.",
{},
)
return self
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
"""Validates that the crew ends with at most one asynchronous task."""
final_async_task_count = 0
# Traverse tasks backward
for task in reversed(self.tasks):
if task.async_execution:
final_async_task_count += 1
else:
break # Stop traversing as soon as a non-async task is encountered
if final_async_task_count > 1:
raise PydanticCustomError(
"async_task_count",
"The crew must end with at most one asynchronous task.",
{},
)
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
if self.tasks and isinstance(self.tasks[0], ConditionalTask):
raise PydanticCustomError(
"invalid_first_task",
"The first task cannot be a ConditionalTask.",
{},
)
return self
@model_validator(mode="after")
def validate_async_tasks_not_async(self) -> "Crew":
"""Ensure that ConditionalTask is not async."""
for task in self.tasks:
if task.async_execution and isinstance(task, ConditionalTask):
raise PydanticCustomError(
"invalid_async_conditional_task",
f"Conditional Task: {task.description} , cannot be executed asynchronously.", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
{},
)
return self
@model_validator(mode="after")
def validate_async_task_cannot_include_sequential_async_tasks_in_context(self):
"""
Validates that if a task is set to be executed asynchronously,
it cannot include other asynchronous tasks in its context unless
separated by a synchronous task.
"""
for i, task in enumerate(self.tasks):
if task.async_execution and task.context:
for context_task in task.context:
if context_task.async_execution:
for j in range(i - 1, -1, -1):
if self.tasks[j] == context_task:
raise ValueError(
f"Task '{task.description}' is asynchronous and cannot include other sequential asynchronous tasks in its context."
)
if not self.tasks[j].async_execution:
break
return self
@model_validator(mode="after")
def validate_context_no_future_tasks(self):
"""Validates that a task's context does not include future tasks."""
task_indices = {id(task): i for i, task in enumerate(self.tasks)}
for task in self.tasks:
if task.context:
for context_task in task.context:
if id(context_task) not in task_indices:
continue # Skip context tasks not in the main tasks list
if task_indices[id(context_task)] > task_indices[id(task)]:
raise ValueError(
f"Task '{task.description}' has a context dependency on a future task '{context_task.description}', which is not allowed."
)
return self
@property
def key(self) -> str:
source = [agent.key for agent in self.agents] + [
task.key for task in self.tasks
]
return md5("|".join(source).encode()).hexdigest()
def _setup_from_config(self):
assert self.config is not None, "Config should not be None."
@@ -246,102 +395,226 @@ class Crew(BaseModel):
del task_config["agent"]
return Task(**task_config, agent=task_agent)
def kickoff(self, inputs: Optional[Dict[str, Any]] = {}) -> str:
def _setup_for_training(self) -> None:
"""Sets up the crew for training."""
self._train = True
raise DeprecationWarning('🚨 This fork has been deprecated. Please use the official Crew package instead. https://github.com/joaomdmoura/crewAI')
for task in self.tasks:
task.human_input = True
for agent in self.agents:
agent.allow_delegation = False
CrewTrainingHandler(TRAINING_DATA_FILE).initialize_file()
CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).initialize_file()
def train(self, n_iterations: int, inputs: Optional[Dict[str, Any]] = {}) -> None:
"""Trains the crew for a given number of iterations."""
self._setup_for_training()
for n_iteration in range(n_iterations):
self._train_iteration = n_iteration
self.kickoff(inputs=inputs)
training_data = CrewTrainingHandler(TRAINING_DATA_FILE).load()
for agent in self.agents:
result = TaskEvaluator(agent).evaluate_training_data(
training_data=training_data, agent_id=str(agent.id)
)
CrewTrainingHandler(TRAINED_AGENTS_DATA_FILE).save_trained_data(
agent_id=str(agent.role), trained_data=result.model_dump()
)
def kickoff(
self,
inputs: Optional[Dict[str, Any]] = None,
) -> CrewOutput:
"""Starts the crew to work on its assigned tasks."""
self._execution_span = self._telemetry.crew_execution_span(self)
self._interpolate_inputs(inputs) # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
self._execution_span = self._telemetry.crew_execution_span(self, inputs)
self._task_output_handler.reset()
self._logging_color = "bold_purple"
if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
i18n = I18N(prompt_file=self.prompt_file)
if agentops:
agentops.set_parent_key("daebe730-f54d-4af5-98df-e6946fb76d13")
agentops.add_tags(["crewai"])
for agent in self.agents:
agent.i18n = i18n
agent.crew = self
# type: ignore[attr-defined] # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
agent.crew = self # type: ignore[attr-defined]
# TODO: Create an AgentFunctionCalling protocol for future refactoring
if not agent.function_calling_llm: # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
agent.function_calling_llm = self.function_calling_llm # type: ignore # "BaseAgent" has no attribute "function_calling_llm"
if not agent.function_calling_llm:
agent.function_calling_llm = self.function_calling_llm
if not agent.step_callback:
agent.step_callback = self.step_callback
if agent.allow_code_execution: # type: ignore # BaseAgent" has no attribute "allow_code_execution"
agent.tools += agent.get_code_execution_tools() # type: ignore # "BaseAgent" has no attribute "get_code_execution_tools"; maybe "get_delegation_tools"?
if not agent.step_callback: # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.step_callback = self.step_callback # type: ignore # "BaseAgent" has no attribute "step_callback"
agent.create_agent_executor()
if self.planning:
self._handle_crew_planning()
metrics = []
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result, manager_metrics = self._run_hierarchical_process() # type: ignore # Unpacking a string is disallowed
metrics.append(manager_metrics) # type: ignore # Cannot determine type of "manager_metrics"
result = self._run_hierarchical_process()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
metrics += [agent._token_process.get_summary() for agent in self.agents]
metrics = metrics + [
agent._token_process.get_summary() for agent in self.agents
]
self.usage_metrics = {
key: sum([m[key] for m in metrics if m is not None]) for key in metrics[0]
}
return result
def train(self, n_iterations: int) -> None:
# TODO: Implement training
pass
def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
"""Executes the Crew's workflow for each input in the list and aggregates results."""
results: List[CrewOutput] = []
def _run_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
for task in self.tasks:
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
agents_for_delegation = [
agent for agent in self.agents if agent != task.agent
]
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
task.tools += AgentTools(agents=agents_for_delegation).tools()
# Initialize the parent crew's usage metrics
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
for input_data in inputs:
crew = self.copy()
output = crew.kickoff(inputs=input_data)
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
results.append(output)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput:
"""Asynchronous kickoff method to start the crew execution."""
return await asyncio.to_thread(self.kickoff, inputs)
async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]:
crew_copies = [self.copy() for _ in inputs]
async def run_crew(crew, input_data):
return await crew.kickoff_async(inputs=input_data)
tasks = [
asyncio.create_task(run_crew(crew_copies[i], inputs[i]))
for i in range(len(inputs))
]
tasks = [
asyncio.create_task(run_crew(crew_copies[i], inputs[i]))
for i in range(len(inputs))
]
results = await asyncio.gather(*tasks)
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for crew in crew_copies:
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
self.usage_metrics = total_usage_metrics
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for crew in crew_copies:
if crew.usage_metrics:
for key in total_usage_metrics:
total_usage_metrics[key] += crew.usage_metrics.get(key, 0)
self.usage_metrics = total_usage_metrics
self._task_output_handler.reset()
return results
def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
result = CrewPlanner(self.tasks)._handle_crew_planning()
if result is not None and hasattr(result, "list_of_plans_per_task"):
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
else:
self._logger.log(
"info", f"== Starting Task: {task.description}", color="bold_purple"
"info", "Something went wrong with the planning process of the Crew"
)
if self.output_log_file:
self._file_handler.log(
agent=role, task=task.description, status="started"
)
def _store_execution_log(
self,
task: Task,
output: TaskOutput,
task_index: int,
was_replayed: bool = False,
):
if self._inputs:
inputs = self._inputs
else:
inputs = {}
output = task.execute(context=task_output)
if not task.async_execution:
task_output = output
log = {
"task": task,
"output": {
"description": output.description,
"summary": output.summary,
"raw": output.raw,
"pydantic": output.pydantic,
"json_dict": output.json_dict,
"output_format": output.output_format,
"agent": output.agent,
},
"task_index": task_index,
"inputs": inputs,
"was_replayed": was_replayed,
}
self._task_output_handler.update(task_index, log)
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
def _run_sequential_process(self) -> CrewOutput:
"""Executes tasks sequentially and returns the final output."""
return self._execute_tasks(self.tasks)
if self.output_log_file:
self._file_handler.log(agent=role, task=task_output, status="completed")
self._finish_execution(task_output)
return self._format_output(task_output)
def _run_hierarchical_process(self) -> str:
def _run_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
self._create_manager_agent()
return self._execute_tasks(self.tasks, self.manager_agent)
def _create_manager_agent(self):
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
self.manager_agent.allow_delegation = True
manager = self.manager_agent
if len(manager.tools) > 0:
if manager.tools is not None and len(manager.tools) > 0:
raise Exception("Manager agent should not have tools")
manager.tools = AgentTools(agents=self.agents).tools()
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
else:
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
@@ -349,32 +622,298 @@ class Crew(BaseModel):
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
llm=self.manager_llm,
verbose=True,
verbose=self.verbose,
)
self.manager_agent = manager
task_output = ""
for task in self.tasks:
self._logger.log("debug", f"Working Agent: {manager.role}")
self._logger.log("info", f"Starting Task: {task.description}")
def _execute_tasks(
self,
tasks: List[Task],
manager: Optional[BaseAgent] = None,
start_index: Optional[int] = 0,
was_replayed: bool = False,
) -> CrewOutput:
"""Executes tasks sequentially and returns the final output.
if self.output_log_file:
self._file_handler.log(
agent=manager.role, task=task.description, status="started"
Args:
tasks (List[Task]): List of tasks to execute
manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None.
Returns:
CrewOutput: Final output of the crew
"""
task_outputs: List[TaskOutput] = []
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
last_sync_output: Optional[TaskOutput] = None
for task_index, task in enumerate(tasks):
if start_index is not None and task_index < start_index:
if task.output:
if task.async_execution:
task_outputs.append(task.output)
else:
task_outputs = [task.output]
last_sync_output = task.output
continue
agent_to_use = self._get_agent_to_use(task, manager)
if agent_to_use is None:
raise ValueError(
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
task_output = task.execute(
agent=manager, context=task_output, tools=manager.tools
)
self._prepare_agent_tools(task, manager)
self._log_task_start(task, agent_to_use.role)
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
if self.output_log_file:
self._file_handler.log(
agent=manager.role, task=task_output, status="completed"
if isinstance(task, ConditionalTask):
skipped_task_output = self._handle_conditional_task(
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
continue
self._finish_execution(task_output)
return self._format_output(task_output), manager._token_process.get_summary() # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
if task.async_execution:
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
future = task.execute_async(
agent=agent_to_use,
context=context,
tools=agent_to_use.tools,
)
futures.append((task, future, task_index))
else:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
context = self._get_context(task, task_outputs)
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
tools=agent_to_use.tools,
)
task_outputs = [task_output]
self._process_task_result(task, task_output)
self._store_execution_log(task, task_output, task_index, was_replayed)
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
return self._create_crew_output(task_outputs)
def _handle_conditional_task(
self,
task: ConditionalTask,
task_outputs: List[TaskOutput],
futures: List[Tuple[Task, Future[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> Optional[TaskOutput]:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
f"Skipping conditional task: {task.description}",
color="yellow",
)
skipped_task_output = task.get_skipped_task_output()
if not was_replayed:
self._store_execution_log(task, skipped_task_output, task_index)
return skipped_task_output
return None
def _prepare_agent_tools(self, task: Task, manager: Optional[BaseAgent]):
if self.process == Process.hierarchical:
if manager:
self._update_manager_tools(task, manager)
else:
raise ValueError("Manager agent is required for hierarchical process.")
elif task.agent and task.agent.allow_delegation:
self._add_delegation_tools(task)
def _get_agent_to_use(
self, task: Task, manager: Optional[BaseAgent]
) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
return manager
return task.agent
def _add_delegation_tools(self, task: Task):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
delegation_tools = task.agent.get_delegation_tools(agents_for_delegation)
# Add tools if they are not already in task.tools
for new_tool in delegation_tools:
# Find the index of the tool with the same name
existing_tool_index = next(
(
index
for index, tool in enumerate(task.tools or [])
if tool.name == new_tool.name
),
None,
)
if not task.tools:
task.tools = []
if existing_tool_index is not None:
# Replace the existing tool
task.tools[existing_tool_index] = new_tool
else:
# Add the new tool
task.tools.append(new_tool)
def _log_task_start(self, task: Task, role: str = "None"):
color = self._logging_color
self._logger.log("debug", f"== Working Agent: {role}", color=color)
self._logger.log("info", f"== Starting Task: {task.description}", color=color)
if self.output_log_file:
self._file_handler.log(agent=role, task=task.description, status="started")
def _update_manager_tools(self, task: Task, manager: BaseAgent):
if task.agent:
manager.tools = task.agent.get_delegation_tools([task.agent])
else:
manager.tools = manager.get_delegation_tools(self.agents)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
aggregate_raw_outputs_from_tasks(task.context)
if task.context
else aggregate_raw_outputs_from_task_outputs(task_outputs)
)
return context
def _process_task_result(self, task: Task, output: TaskOutput) -> None:
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== [{role}] Task output: {output}\n\n")
if self.output_log_file:
self._file_handler.log(agent=role, task=output, status="completed")
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
if len(task_outputs) != 1:
raise ValueError(
"Something went wrong. Kickoff should return only one task output."
)
final_task_output = task_outputs[0]
final_string_output = final_task_output.raw
self._finish_execution(final_string_output)
token_usage = self.calculate_usage_metrics()
return CrewOutput(
raw=final_task_output.raw,
pydantic=final_task_output.pydantic,
json_dict=final_task_output.json_dict,
tasks_output=[task.output for task in self.tasks if task.output],
token_usage=token_usage,
)
def _process_async_tasks(
self,
futures: List[Tuple[Task, Future[TaskOutput], int]],
was_replayed: bool = False,
) -> List[TaskOutput]:
task_outputs: List[TaskOutput] = []
for future_task, future, task_index in futures:
task_output = future.result()
task_outputs.append(task_output)
self._process_task_result(future_task, task_output)
self._store_execution_log(
future_task, task_output, task_index, was_replayed
)
return task_outputs
def _find_task_index(
self, task_id: str, stored_outputs: List[Any]
) -> Optional[int]:
return next(
(
index
for (index, d) in enumerate(stored_outputs)
if d["task_id"] == str(task_id)
),
None,
)
def replay(
self, task_id: str, inputs: Optional[Dict[str, Any]] = None
) -> CrewOutput:
stored_outputs = self._task_output_handler.load()
if not stored_outputs:
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
start_index = self._find_task_index(task_id, stored_outputs)
if start_index is None:
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
replay_inputs = (
inputs if inputs is not None else stored_outputs[start_index]["inputs"]
)
self._inputs = replay_inputs
if replay_inputs:
self._interpolate_inputs(replay_inputs)
if self.process == Process.hierarchical:
self._create_manager_agent()
for i in range(start_index):
stored_output = stored_outputs[i][
"output"
] # for adding context to the task
task_output = TaskOutput(
description=stored_output["description"],
agent=stored_output["agent"],
raw=stored_output["raw"],
pydantic=stored_output["pydantic"],
json_dict=stored_output["json_dict"],
output_format=stored_output["output_format"],
)
self.tasks[i].output = task_output
self._logging_color = "bold_blue"
result = self._execute_tasks(self.tasks, self.manager_agent, start_index, True)
return result
def copy(self):
"""Create a deep copy of the Crew."""
exclude = {
"id",
"_rpm_controller",
"_logger",
"_execution_span",
"_file_handler",
"_cache_handler",
"_short_term_memory",
"_long_term_memory",
"_entity_memory",
"_telemetry",
"agents",
"tasks",
}
cloned_agents = [agent.copy() for agent in self.agents]
cloned_tasks = [task.copy(cloned_agents) for task in self.tasks]
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
copied_data.pop("agents", None)
copied_data.pop("tasks", None)
copied_crew = Crew(**copied_data, agents=cloned_agents, tasks=cloned_tasks)
return copied_crew
def _set_tasks_callbacks(self) -> None:
"""Sets callback for every task suing task_callback"""
@@ -384,27 +923,54 @@ class Crew(BaseModel):
def _interpolate_inputs(self, inputs: Dict[str, Any]) -> None:
"""Interpolates the inputs in the tasks and agents."""
[task.interpolate_inputs(inputs) for task in self.tasks] # type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
[agent.interpolate_inputs(inputs) for agent in self.agents] # type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
[
task.interpolate_inputs(
# type: ignore # "interpolate_inputs" of "Task" does not return a value (it only ever returns None)
inputs
)
for task in self.tasks
]
# type: ignore # "interpolate_inputs" of "Agent" does not return a value (it only ever returns None)
for agent in self.agents:
agent.interpolate_inputs(inputs)
def _format_output(self, output: str) -> str:
"""Formats the output of the crew execution."""
if self.full_output:
return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str")
"final_output": output,
"tasks_outputs": [task.output for task in self.tasks if task],
}
else:
return output
def _finish_execution(self, output) -> None:
def _finish_execution(self, final_string_output: str) -> None:
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
if agentops:
agentops.end_session(
end_state="Success", end_state_reason="Finished Execution", is_auto_end=True
end_state="Success",
end_state_reason="Finished Execution",
)
self._telemetry.end_crew(self, output)
self._telemetry.end_crew(self, final_string_output)
def calculate_usage_metrics(self) -> Dict[str, int]:
"""Calculates and returns the usage metrics."""
total_usage_metrics = {
"total_tokens": 0,
"prompt_tokens": 0,
"completion_tokens": 0,
"successful_requests": 0,
}
for agent in self.agents:
if hasattr(agent, "_token_process"):
token_sum = agent._token_process.get_summary()
for key in total_usage_metrics:
total_usage_metrics[key] += token_sum.get(key, 0)
if self.manager_agent and hasattr(self.manager_agent, "_token_process"):
token_sum = self.manager_agent._token_process.get_summary()
for key in total_usage_metrics:
total_usage_metrics[key] += token_sum.get(key, 0)
return total_usage_metrics
def test(
self, n_iterations: int, model: str, inputs: Optional[Dict[str, Any]] = None
) -> None:
"""Test the crew with the given inputs."""
pass
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"

View File

@@ -0,0 +1 @@
from .crew_output import CrewOutput

View File

@@ -0,0 +1,50 @@
import json
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
class CrewOutput(BaseModel):
"""Class that represents the result of a crew."""
raw: str = Field(description="Raw output of crew", default="")
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of Crew", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
description="JSON dict output of Crew", default=None
)
tasks_output: list[TaskOutput] = Field(
description="Output of each task", default=[]
)
token_usage: Dict[str, Any] = Field(
description="Processed token summary", default={}
)
@property
def json(self) -> Optional[str]:
if self.tasks_output[-1].output_format != OutputFormat.JSON:
raise ValueError(
"No JSON output found in the final task. Please make sure to set the output_json property in the final task in your crew."
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_dict:
output_dict.update(self.json_dict)
elif self.pydantic:
output_dict.update(self.pydantic.model_dump())
return output_dict
def __str__(self):
if self.pydantic:
return str(self.pydantic)
if self.json_dict:
return str(self.json_dict)
return self.raw

View File

@@ -23,3 +23,9 @@ class EntityMemory(Memory):
"""Saves an entity item into the SQLite storage."""
data = f"{item.name}({item.type}): {item.description}"
super().save(data, item.metadata)
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(f"An error occurred while resetting the entity memory: {e}")

View File

@@ -30,3 +30,6 @@ class LongTermMemory(Memory):
def search(self, task: str, latest_n: int = 3) -> Dict[str, Any]:
return self.storage.load(task, latest_n) # type: ignore # BUG?: "Storage" has no attribute "load"
def reset(self) -> None:
self.storage.reset()

View File

@@ -18,8 +18,16 @@ class ShortTermMemory(Memory):
)
super().__init__(storage)
def save(self, item: ShortTermMemoryItem) -> None: # type: ignore # BUG?: Signature of "save" incompatible with supertype "Memory"
def save(self, item: ShortTermMemoryItem) -> None:
super().save(item.data, item.metadata, item.agent)
def search(self, query: str, score_threshold: float = 0.35):
return self.storage.search(query=query, score_threshold=score_threshold) # type: ignore # BUG? The reference is to the parent class, but the parent class does not have this parameters
def reset(self) -> None:
try:
self.storage.reset()
except Exception as e:
raise Exception(
f"An error occurred while resetting the short-term memory: {e}"
)

View File

@@ -9,3 +9,6 @@ class Storage:
def search(self, key: str) -> Dict[str, Any]: # type: ignore
pass
def reset(self) -> None:
pass

View File

@@ -0,0 +1,166 @@
import json
import sqlite3
from typing import Any, Dict, List, Optional
from crewai.task import Task
from crewai.utilities import Printer
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
from crewai.utilities.paths import db_storage_path
class KickoffTaskOutputsSQLiteStorage:
"""
An updated SQLite storage class for kickoff task outputs storage.
"""
def __init__(
self, db_path: str = f"{db_storage_path()}/latest_kickoff_task_outputs.db"
) -> None:
self.db_path = db_path
self._printer: Printer = Printer()
self._initialize_db()
def _initialize_db(self):
"""
Initializes the SQLite database and creates LTM table
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS latest_kickoff_task_outputs (
task_id TEXT PRIMARY KEY,
expected_output TEXT,
output JSON,
task_index INTEGER,
inputs JSON,
was_replayed BOOLEAN,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"SAVING KICKOFF TASK OUTPUTS ERROR: An error occurred during database initialization: {e}",
color="red",
)
def add(
self,
task: Task,
output: Dict[str, Any],
task_index: int,
was_replayed: bool = False,
inputs: Dict[str, Any] = {},
):
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"""
INSERT OR REPLACE INTO latest_kickoff_task_outputs
(task_id, expected_output, output, task_index, inputs, was_replayed)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
str(task.id),
task.expected_output,
json.dumps(output, cls=CrewJSONEncoder),
task_index,
json.dumps(inputs),
was_replayed,
),
)
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"SAVING KICKOFF TASK OUTPUTS ERROR: An error occurred during database initialization: {e}",
color="red",
)
def update(
self,
task_index: int,
**kwargs,
):
"""
Updates an existing row in the latest_kickoff_task_outputs table based on task_index.
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
fields = []
values = []
for key, value in kwargs.items():
fields.append(f"{key} = ?")
values.append(
json.dumps(value, cls=CrewJSONEncoder)
if isinstance(value, dict)
else value
)
query = f"UPDATE latest_kickoff_task_outputs SET {', '.join(fields)} WHERE task_index = ?"
values.append(task_index)
cursor.execute(query, tuple(values))
conn.commit()
if cursor.rowcount == 0:
self._printer.print(
f"No row found with task_index {task_index}. No update performed.",
color="red",
)
except sqlite3.Error as e:
self._printer.print(f"UPDATE KICKOFF TASK OUTPUTS ERROR: {e}", color="red")
def load(self) -> Optional[List[Dict[str, Any]]]:
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT *
FROM latest_kickoff_task_outputs
ORDER BY task_index
""")
rows = cursor.fetchall()
results = []
for row in rows:
result = {
"task_id": row[0],
"expected_output": row[1],
"output": json.loads(row[2]),
"task_index": row[3],
"inputs": json.loads(row[4]),
"was_replayed": row[5],
"timestamp": row[6],
}
results.append(result)
return results
except sqlite3.Error as e:
self._printer.print(
content=f"LOADING KICKOFF TASK OUTPUTS ERROR: An error occurred while querying kickoff task outputs: {e}",
color="red",
)
return None
def delete_all(self):
"""
Deletes all rows from the latest_kickoff_task_outputs table.
"""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM latest_kickoff_task_outputs")
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"ERROR: Failed to delete all kickoff task outputs: {e}",
color="red",
)

View File

@@ -103,3 +103,20 @@ class LTMSQLiteStorage:
color="red",
)
return None
def reset(
self,
) -> None:
"""Resets the LTM table with error handling."""
try:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM long_term_memories")
conn.commit()
except sqlite3.Error as e:
self._printer.print(
content=f"MEMORY ERROR: An error occurred while deleting all rows in LTM: {e}",
color="red",
)
return None

View File

@@ -2,6 +2,7 @@ import contextlib
import io
import logging
import os
import shutil
from typing import Any, Dict, List, Optional
from embedchain import App
@@ -71,13 +72,13 @@ class RAGStorage(Storage):
if embedder_config:
config["embedder"] = embedder_config
self.type = type
self.app = App.from_config(config=config)
self.app.llm = FakeLLM()
if allow_reset:
self.app.reset()
def save(self, value: Any, metadata: Dict[str, Any]) -> None: # type: ignore # BUG?: Should be save(key, value, metadata) Signature of "save" incompatible with supertype "Storage"
def save(self, value: Any, metadata: Dict[str, Any]) -> None:
self._generate_embedding(value, metadata)
def search( # type: ignore # BUG?: Signature of "search" incompatible with supertype "Storage"
@@ -102,3 +103,11 @@ class RAGStorage(Storage):
def _generate_embedding(self, text: str, metadata: Dict[str, Any]) -> Any:
with suppress_logging():
self.app.add(text, data_type="text", metadata=metadata)
def reset(self) -> None:
try:
shutil.rmtree(f"{db_storage_path()}/{self.type}")
except Exception as e:
raise Exception(
f"An error occurred while resetting the {self.type} memory: {e}"
)

View File

@@ -1,2 +1,25 @@
from .annotations import agent, crew, task
from .annotations import (
agent,
crew,
task,
output_json,
output_pydantic,
tool,
callback,
llm,
cache_handler,
)
from .crew_base import CrewBase
__all__ = [
"agent",
"crew",
"task",
"output_json",
"output_pydantic",
"tool",
"callback",
"CrewBase",
"llm",
"cache_handler",
]

View File

@@ -1,6 +1,3 @@
tasks_order = []
def memoize(func):
cache = {}
@@ -10,14 +7,21 @@ def memoize(func):
cache[key] = func(*args, **kwargs)
return cache[key]
memoized_func.__dict__.update(func.__dict__)
return memoized_func
def task(func):
if not hasattr(task, "registration_order"):
task.registration_order = []
func.is_task = True
tasks_order.append(func.__name__)
func = memoize(func)
return func
wrapped_func = memoize(func)
# Append the function name to the registration order list
task.registration_order.append(func.__name__)
return wrapped_func
def agent(func):
@@ -26,32 +30,80 @@ def agent(func):
return func
def llm(func):
func.is_llm = True
func = memoize(func)
return func
def output_json(cls):
cls.is_output_json = True
return cls
def output_pydantic(cls):
cls.is_output_pydantic = True
return cls
def tool(func):
func.is_tool = True
return memoize(func)
def callback(func):
func.is_callback = True
return memoize(func)
def cache_handler(func):
func.is_cache_handler = True
return memoize(func)
def crew(func):
def wrapper(self, *args, **kwargs):
instantiated_tasks = []
instantiated_agents = []
agent_roles = set()
# Iterate over tasks_order to maintain the defined order
for task_name in tasks_order:
possible_task = getattr(self, task_name)
if callable(possible_task):
task_instance = possible_task()
instantiated_tasks.append(task_instance)
if hasattr(task_instance, "agent"):
agent_instance = task_instance.agent
if agent_instance.role not in agent_roles:
instantiated_agents.append(agent_instance)
agent_roles.add(agent_instance.role)
all_functions = {
name: getattr(self, name)
for name in dir(self)
if callable(getattr(self, name))
}
tasks = {
name: func
for name, func in all_functions.items()
if hasattr(func, "is_task")
}
agents = {
name: func
for name, func in all_functions.items()
if hasattr(func, "is_agent")
}
# Sort tasks by their registration order
sorted_task_names = sorted(
tasks, key=lambda name: task.registration_order.index(name)
)
# Instantiate tasks in the order they were defined
for task_name in sorted_task_names:
task_instance = tasks[task_name]()
instantiated_tasks.append(task_instance)
if hasattr(task_instance, "agent"):
agent_instance = task_instance.agent
if agent_instance.role not in agent_roles:
instantiated_agents.append(agent_instance)
agent_roles.add(agent_instance.role)
# Instantiate any additional agents not already included by tasks
for attr_name in dir(self):
possible_agent = getattr(self, attr_name)
if callable(possible_agent) and hasattr(possible_agent, "is_agent"):
temp_agent_instance = possible_agent()
if temp_agent_instance.role not in agent_roles:
instantiated_agents.append(temp_agent_instance)
agent_roles.add(temp_agent_instance.role)
for agent_name in agents:
temp_agent_instance = agents[agent_name]()
if temp_agent_instance.role not in agent_roles:
instantiated_agents.append(temp_agent_instance)
agent_roles.add(temp_agent_instance.role)
self.agents = instantiated_agents
self.tasks = instantiated_tasks

View File

@@ -1,6 +1,7 @@
import inspect
import os
from pathlib import Path
from typing import Any, Callable, Dict
import yaml
from dotenv import load_dotenv
@@ -20,11 +21,6 @@ def CrewBase(cls):
base_directory = Path(frame_info.filename).parent.resolve()
break
if base_directory is None:
raise Exception(
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
)
original_agents_config_path = getattr(
cls, "agents_config", "config/agents.yaml"
)
@@ -32,12 +28,20 @@ def CrewBase(cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.base_directory is None:
raise Exception(
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
)
self.agents_config = self.load_yaml(
os.path.join(self.base_directory, self.original_agents_config_path)
)
self.tasks_config = self.load_yaml(
os.path.join(self.base_directory, self.original_tasks_config_path)
)
self.map_all_agent_variables()
self.map_all_task_variables()
@staticmethod
def load_yaml(config_path: str):
@@ -45,4 +49,138 @@ def CrewBase(cls):
# parsedContent = YamlParser.parse(file) # type: ignore # Argument 1 to "parse" has incompatible type "TextIOWrapper"; expected "YamlParser"
return yaml.safe_load(file)
def _get_all_functions(self):
return {
name: getattr(self, name)
for name in dir(self)
if callable(getattr(self, name))
}
def _filter_functions(
self, functions: Dict[str, Callable], attribute: str
) -> Dict[str, Callable]:
return {
name: func
for name, func in functions.items()
if hasattr(func, attribute)
}
def map_all_agent_variables(self) -> None:
all_functions = self._get_all_functions()
llms = self._filter_functions(all_functions, "is_llm")
tool_functions = self._filter_functions(all_functions, "is_tool")
cache_handler_functions = self._filter_functions(
all_functions, "is_cache_handler"
)
callbacks = self._filter_functions(all_functions, "is_callback")
agents = self._filter_functions(all_functions, "is_agent")
for agent_name, agent_info in self.agents_config.items():
self._map_agent_variables(
agent_name,
agent_info,
agents,
llms,
tool_functions,
cache_handler_functions,
callbacks,
)
def _map_agent_variables(
self,
agent_name: str,
agent_info: Dict[str, Any],
agents: Dict[str, Callable],
llms: Dict[str, Callable],
tool_functions: Dict[str, Callable],
cache_handler_functions: Dict[str, Callable],
callbacks: Dict[str, Callable],
) -> None:
if llm := agent_info.get("llm"):
self.agents_config[agent_name]["llm"] = llms[llm]()
if tools := agent_info.get("tools"):
self.agents_config[agent_name]["tools"] = [
tool_functions[tool]() for tool in tools
]
if function_calling_llm := agent_info.get("function_calling_llm"):
self.agents_config[agent_name]["function_calling_llm"] = agents[
function_calling_llm
]()
if step_callback := agent_info.get("step_callback"):
self.agents_config[agent_name]["step_callback"] = callbacks[
step_callback
]()
if cache_handler := agent_info.get("cache_handler"):
self.agents_config[agent_name]["cache_handler"] = (
cache_handler_functions[cache_handler]()
)
def map_all_task_variables(self) -> None:
all_functions = self._get_all_functions()
agents = self._filter_functions(all_functions, "is_agent")
tasks = self._filter_functions(all_functions, "is_task")
output_json_functions = self._filter_functions(
all_functions, "is_output_json"
)
tool_functions = self._filter_functions(all_functions, "is_tool")
callback_functions = self._filter_functions(all_functions, "is_callback")
output_pydantic_functions = self._filter_functions(
all_functions, "is_output_pydantic"
)
for task_name, task_info in self.tasks_config.items():
self._map_task_variables(
task_name,
task_info,
agents,
tasks,
output_json_functions,
tool_functions,
callback_functions,
output_pydantic_functions,
)
def _map_task_variables(
self,
task_name: str,
task_info: Dict[str, Any],
agents: Dict[str, Callable],
tasks: Dict[str, Callable],
output_json_functions: Dict[str, Callable],
tool_functions: Dict[str, Callable],
callback_functions: Dict[str, Callable],
output_pydantic_functions: Dict[str, Callable],
) -> None:
if context_list := task_info.get("context"):
self.tasks_config[task_name]["context"] = [
tasks[context_task_name]() for context_task_name in context_list
]
if tools := task_info.get("tools"):
self.tasks_config[task_name]["tools"] = [
tool_functions[tool]() for tool in tools
]
if agent_name := task_info.get("agent"):
self.tasks_config[task_name]["agent"] = agents[agent_name]()
if output_json := task_info.get("output_json"):
self.tasks_config[task_name]["output_json"] = output_json_functions[
output_json
]
if output_pydantic := task_info.get("output_pydantic"):
self.tasks_config[task_name]["output_pydantic"] = (
output_pydantic_functions[output_pydantic]
)
if callbacks := task_info.get("callbacks"):
self.tasks_config[task_name]["callbacks"] = [
callback_functions[callback]() for callback in callbacks
]
return WrappedClass

View File

@@ -1,16 +1,25 @@
import json
import os
import re
import threading
import uuid
from typing import Any, Dict, List, Optional, Type
from concurrent.futures import Future
from copy import copy
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agent import Agent
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.utilities import I18N, Converter, ConverterError, Printer
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -41,7 +50,6 @@ class Task(BaseModel):
tools_errors: int = 0
delegations: int = 0
i18n: I18N = I18N()
thread: Optional[threading.Thread] = None
prompt_context: Optional[str] = None
description: str = Field(description="Description of the actual task.")
expected_output: str = Field(
@@ -54,7 +62,7 @@ class Task(BaseModel):
callback: Optional[Any] = Field(
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[Agent] = Field(
agent: Optional[BaseAgent] = Field(
description="Agent responsible for execution the task.", default=None
)
context: Optional[List["Task"]] = Field(
@@ -93,9 +101,16 @@ class Task(BaseModel):
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
)
_telemetry: Telemetry
_execution_span: Span | None = None
_original_description: str | None = None
_original_expected_output: str | None = None
_thread: threading.Thread | None = None
def __init__(__pydantic_self__, **data):
config = data.pop("config", {})
@@ -117,6 +132,12 @@ class Task(BaseModel):
return value[1:]
return value
@model_validator(mode="after")
def set_private_attrs(self) -> "Task":
"""Set private attributes."""
self._telemetry = Telemetry()
return self
@model_validator(mode="after")
def set_attributes_based_on_config(self) -> "Task":
"""Set attributes based on the agent configuration."""
@@ -144,73 +165,102 @@ class Task(BaseModel):
)
return self
def execute( # type: ignore # Missing return statement
def execute_sync(
self,
agent: Agent | None = None,
agent: Optional[BaseAgent] = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> str:
"""Execute the task.
) -> TaskOutput:
"""Execute the task synchronously."""
return self._execute_core(agent, context, tools)
Returns:
Output of the task.
"""
@property
def key(self) -> str:
description = self._original_description or self.description
expected_output = self._original_expected_output or self.expected_output
source = [description, expected_output]
return md5("|".join(source).encode()).hexdigest()
def execute_async(
self,
agent: BaseAgent | None = None,
context: Optional[str] = None,
tools: Optional[List[Any]] = None,
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future: Future[TaskOutput] = Future()
threading.Thread(
target=self._execute_task_async, args=(agent, context, tools, future)
).start()
return future
def _execute_task_async(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
future: Future[TaskOutput],
) -> None:
"""Execute the task asynchronously with context handling."""
result = self._execute_core(agent, context, tools)
future.set_result(result)
def _execute_core(
self,
agent: Optional[BaseAgent],
context: Optional[str],
tools: Optional[List[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
agent = agent or self.agent
self.agent = agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
if self.context:
# type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None")
context = []
for task in self.context:
if task.async_execution:
task.thread.join() # type: ignore # Item "None" of "Thread | None" has no attribute "join"
if task and task.output:
# type: ignore # Item "str" of "str | None" has no attribute "append"
context.append(task.output.raw_output)
# type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
context = "\n".join(context)
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
self.prompt_context = context
tools = tools or self.tools
tools = tools or self.tools or []
if self.async_execution:
self.thread = threading.Thread(
target=self._execute, args=(agent, self, context, tools)
)
self.thread.start()
else:
result = self._execute(
task=self,
agent=agent,
context=context,
tools=tools,
)
return result
def _execute(self, agent, task, context, tools):
result = agent.execute_task(
task=task,
task=self,
context=context,
tools=tools,
)
exported_output = self._export_output(result)
pydantic_output, json_output = self._export_output(result)
self.output = TaskOutput(
task_output = TaskOutput(
description=self.description,
exported_output=exported_output,
raw_output=result,
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
self.output = task_output
if self.callback:
self.callback(self.output)
return exported_output
if self._execution_span:
self._telemetry.task_ended(self._execution_span, self, agent.crew)
self._execution_span = None
if self.output_file:
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
return task_output
def prompt(self) -> str:
"""Prompt the task.
@@ -245,77 +295,159 @@ class Task(BaseModel):
"""Increment the delegations counter."""
self.delegations += 1
def _export_output(self, result: str) -> Any:
exported_result = result
instructions = "I'm gonna convert this raw text into valid JSON."
def copy(self, agents: List["BaseAgent"]) -> "Task":
"""Create a deep copy of the Task."""
exclude = {
"id",
"agent",
"context",
"tools",
}
copied_data = self.model_dump(exclude=exclude)
copied_data = {k: v for k, v in copied_data.items() if v is not None}
cloned_context = (
[task.copy(agents) for task in self.context] if self.context else None
)
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
return next((agent for agent in agents if agent.role == role), None)
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
cloned_tools = copy(self.tools) if self.tools else []
copied_task = Task(
**copied_data,
context=cloned_context,
agent=cloned_agent,
tools=cloned_tools,
)
return copied_task
def _create_converter(self, *args, **kwargs) -> Converter:
"""Create a converter instance."""
if self.agent and not self.converter_cls:
converter = self.agent.get_output_converter(*args, **kwargs)
elif self.converter_cls:
converter = self.converter_cls(*args, **kwargs)
if not converter:
raise Exception("No output converter found or set.")
return converter
def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None
if self.output_pydantic or self.output_json:
model = self.output_pydantic or self.output_json
model_output = self._convert_to_model(result)
pydantic_output = (
model_output if isinstance(model_output, BaseModel) else None
)
if isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
else:
json_output = model_output if isinstance(model_output, dict) else None
# try to convert task_output directly to pydantic/json
return pydantic_output, json_output
def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]:
model = self.output_pydantic or self.output_json
if model is None:
return result
try:
return self._validate_model(result, model)
except Exception:
return self._handle_partial_json(result, model)
def _validate_model(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if self.output_json:
return exported_result.model_dump()
return exported_result
def _handle_partial_json(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
exported_result = model.model_validate_json(result)
exported_result = model.model_validate_json(match.group(0))
if self.output_json:
return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump"
return exported_result.model_dump()
return exported_result
except Exception:
# sometimes the response contains valid JSON in the middle of text
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
exported_result = model.model_validate_json(match.group(0))
if self.output_json:
return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump"
return exported_result
except Exception:
pass
pass
# type: ignore # Item "None" of "Agent | None" has no attribute "function_calling_llm"
llm = self.agent.function_calling_llm or self.agent.llm
return self._convert_with_instructions(result, model)
if not self._is_gpt(llm):
# type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
def _convert_with_instructions(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
llm = self.agent.function_calling_llm or self.agent.llm # type: ignore # Item "None" of "BaseAgent | None" has no attribute "function_calling_llm"
instructions = self._get_conversion_instructions(model, llm)
converter = Converter(
llm=llm, text=result, model=model, instructions=instructions
converter = self._create_converter(
llm=llm, text=result, model=model, instructions=instructions
)
exported_result = (
converter.to_pydantic() if self.output_pydantic else converter.to_json()
)
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
if self.output_pydantic:
exported_result = converter.to_pydantic()
elif self.output_json:
exported_result = converter.to_json()
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
exported_result = result
if self.output_file:
content = (
exported_result if not self.output_pydantic else exported_result.json() # type: ignore # "str" has no attribute "json"
)
self._save_file(content)
return result
return exported_result
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
if self.output_pydantic:
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
return instructions
def _save_output(self, content: str) -> None:
if not self.output_file:
raise Exception("Output file path is not set.")
directory = os.path.dirname(self.output_file)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(self.output_file, "w", encoding="utf-8") as file:
file.write(content)
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _save_file(self, result: Any) -> None:
# type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
directory = os.path.dirname(self.output_file)
directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
if directory and not os.path.exists(directory):
os.makedirs(directory)
# type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
with open(self.output_file, "w", encoding='utf-8') as file:
with open(self.output_file, "w", encoding="utf-8") as file: # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
file.write(result)
return None

View File

@@ -0,0 +1,4 @@
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
__all__ = ["OutputFormat", "TaskOutput"]

View File

@@ -0,0 +1,47 @@
from typing import Any, Callable
from pydantic import Field
from crewai.task import Task
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
class ConditionalTask(Task):
"""
A task that can be conditionally executed based on the output of another task.
Note: This cannot be the only task you have in your crew and cannot be the first since its needs context from the previous task.
"""
condition: Callable[[TaskOutput], bool] = Field(
default=None,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
)
def __init__(
self,
condition: Callable[[Any], bool],
**kwargs,
):
super().__init__(**kwargs)
self.condition = condition
def should_execute(self, context: TaskOutput) -> bool:
"""
Determines whether the conditional task should be executed based on the provided context.
Args:
context (Any): The context or output from the previous task that will be evaluated by the condition.
Returns:
bool: True if the task should be executed, False otherwise.
"""
return self.condition(context)
def get_skipped_task_output(self):
return TaskOutput(
description=self.description,
raw="",
agent=self.agent.role if self.agent else "",
output_format=OutputFormat.RAW,
)

View File

@@ -0,0 +1,9 @@
from enum import Enum
class OutputFormat(str, Enum):
"""Enum that represents the output format of a task."""
JSON = "json"
PYDANTIC = "pydantic"
RAW = "raw"

View File

@@ -1,24 +1,60 @@
from typing import Optional, Union
import json
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, model_validator
from crewai.tasks.output_format import OutputFormat
class TaskOutput(BaseModel):
"""Class that represents the result of a task."""
description: str = Field(description="Description of the task")
summary: Optional[str] = Field(description="Summary of the task", default=None)
exported_output: Union[str, BaseModel] = Field(
description="Output of the task", default=None
raw: str = Field(description="Raw output of the task", default="")
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of task", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
description="JSON dictionary of task", default=None
)
agent: str = Field(description="Agent that executed the task")
raw_output: str = Field(description="Result of the task")
output_format: OutputFormat = Field(
description="Output format of the task", default=OutputFormat.RAW
)
@model_validator(mode="after")
def set_summary(self):
"""Set the summary field based on the description."""
excerpt = " ".join(self.description.split(" ")[:10])
self.summary = f"{excerpt}..."
return self
def result(self):
return self.exported_output
@property
def json(self) -> Optional[str]:
if self.output_format != OutputFormat.JSON:
raise ValueError(
"""
Invalid output format requested.
If you would like to access the JSON output,
please make sure to set the output_json property for the task
"""
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_dict:
output_dict.update(self.json_dict)
elif self.pydantic:
output_dict.update(self.pydantic.model_dump())
return output_dict
def __str__(self) -> str:
if self.pydantic:
return str(self.pydantic)
if self.json_dict:
return str(self.json_dict)
return self.raw

View File

@@ -1,8 +1,10 @@
from __future__ import annotations
import asyncio
import json
import os
import platform
from typing import Any
from typing import TYPE_CHECKING, Any
import pkg_resources
from opentelemetry import trace
@@ -10,7 +12,11 @@ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExport
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode
from opentelemetry.trace import Span, Status, StatusCode
if TYPE_CHECKING:
from crewai.crew import Crew
from crewai.task import Task
class Telemetry:
@@ -74,7 +80,7 @@ class Telemetry:
self.ready = False
self.trace_set = False
def crew_creation(self, crew):
def crew_creation(self, crew: Crew, inputs: dict[str, Any] | None):
"""Records the creation of a crew."""
if self.ready:
try:
@@ -86,11 +92,9 @@ class Telemetry:
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "python_version", platform.python_version())
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "crew_process", crew.process)
self._add_attribute(
span, "crew_language", crew.prompt_file if crew.i18n else "None"
)
self._add_attribute(span, "crew_memory", crew.memory)
self._add_attribute(span, "crew_number_of_tasks", len(crew.tasks))
self._add_attribute(span, "crew_number_of_agents", len(crew.agents))
@@ -100,8 +104,11 @@ class Telemetry:
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
"backstory": agent.backstory,
"verbose?": agent.verbose,
"max_iter": agent.max_iter,
"max_rpm": agent.max_rpm,
@@ -109,7 +116,7 @@ class Telemetry:
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools
tool.name.casefold() for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -122,11 +129,21 @@ class Telemetry:
json.dumps(
[
{
"key": task.key,
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
tool.name.casefold() for tool in task.tools or []
],
}
for task in crew.tasks
@@ -138,6 +155,71 @@ class Telemetry:
self._add_attribute(span, "platform_system", platform.system())
self._add_attribute(span, "platform_version", platform.version())
self._add_attribute(span, "cpus", os.cpu_count())
if crew.share_crew:
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
pass
def task_started(self, crew: Crew, task: Task) -> Span | None:
"""Records task started in a crew."""
if self.ready:
try:
tracer = trace.get_tracer("crewai.telemetry")
created_span = tracer.start_span("Task Created")
self._add_attribute(created_span, "crew_key", crew.key)
self._add_attribute(created_span, "crew_id", str(crew.id))
self._add_attribute(created_span, "task_key", task.key)
self._add_attribute(created_span, "task_id", str(task.id))
if crew.share_crew:
self._add_attribute(
created_span, "formatted_description", task.description
)
self._add_attribute(
created_span, "formatted_expected_output", task.expected_output
)
created_span.set_status(Status(StatusCode.OK))
created_span.end()
span = tracer.start_span("Task Execution")
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(span, "task_key", task.key)
self._add_attribute(span, "task_id", str(task.id))
if crew.share_crew:
self._add_attribute(span, "formatted_description", task.description)
self._add_attribute(
span, "formatted_expected_output", task.expected_output
)
return span
except Exception:
pass
return None
def task_ended(self, span: Span, task: Task, crew: Crew):
"""Records task execution in a crew."""
if self.ready:
try:
if crew.share_crew:
self._add_attribute(
span,
"task_output",
task.output.raw if task.output else "",
)
span.set_status(Status(StatusCode.OK))
span.end()
except Exception:
@@ -207,10 +289,12 @@ class Telemetry:
except Exception:
pass
def crew_execution_span(self, crew):
def crew_execution_span(self, crew: Crew, inputs: dict[str, Any] | None):
"""Records the complete execution of a crew.
This is only collected if the user has opted-in to share the crew.
"""
self.crew_creation(crew, inputs)
if (self.ready) and (crew.share_crew):
try:
tracer = trace.get_tracer("crewai.telemetry")
@@ -220,13 +304,18 @@ class Telemetry:
"crewai_version",
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(span, "crew_key", crew.key)
self._add_attribute(span, "crew_id", str(crew.id))
self._add_attribute(
span, "crew_inputs", json.dumps(inputs) if inputs else None
)
self._add_attribute(
span,
"crew_agents",
json.dumps(
[
{
"key": agent.key,
"id": str(agent.id),
"role": agent.role,
"goal": agent.goal,
@@ -238,7 +327,7 @@ class Telemetry:
"llm": json.dumps(self._safe_llm_attributes(agent.llm)),
"delegation_enabled?": agent.allow_delegation,
"tools_names": [
tool.name.casefold() for tool in agent.tools
tool.name.casefold() for tool in agent.tools or []
],
}
for agent in crew.agents
@@ -253,16 +342,18 @@ class Telemetry:
{
"id": str(task.id),
"description": task.description,
"expected_output": task.expected_output,
"async_execution?": task.async_execution,
"output": task.expected_output,
"human_input?": task.human_input,
"agent_role": task.agent.role if task.agent else "None",
"agent_key": task.agent.key if task.agent else None,
"context": (
[task.description for task in task.context]
if task.context
else "None"
else None
),
"tools_names": [
tool.name.casefold() for tool in task.tools
tool.name.casefold() for tool in task.tools or []
],
}
for task in crew.tasks
@@ -273,7 +364,7 @@ class Telemetry:
except Exception:
pass
def end_crew(self, crew, output):
def end_crew(self, crew, final_string_output):
if (self.ready) and (crew.share_crew):
try:
self._add_attribute(
@@ -281,7 +372,9 @@ class Telemetry:
"crewai_version",
pkg_resources.get_distribution("crewai").version,
)
self._add_attribute(crew._execution_span, "crew_output", output)
self._add_attribute(
crew._execution_span, "crew_output", final_string_output
)
self._add_attribute(
crew._execution_span,
"crew_tasks_output",

View File

@@ -1,86 +1,25 @@
from typing import List, Union
from langchain.tools import StructuredTool
from pydantic import BaseModel, Field
from crewai.agent import Agent
from crewai.task import Task
from crewai.utilities import I18N
from crewai.agents.agent_builder.utilities.base_agent_tool import BaseAgentTools
class AgentTools(BaseModel):
class AgentTools(BaseAgentTools):
"""Default tools around agent delegation"""
agents: List[Agent] = Field(description="List of agents in this crew.")
i18n: I18N = Field(default=I18N(), description="Internationalization settings.")
def tools(self):
coworkers = ", ".join([f"{agent.role}" for agent in self.agents])
tools = [
StructuredTool.from_function(
func=self.delegate_work,
name="Delegate work to co-worker",
name="Delegate work to coworker",
description=self.i18n.tools("delegate_work").format(
coworkers=f"[{', '.join([f'{agent.role}' for agent in self.agents])}]"
coworkers=coworkers
),
),
StructuredTool.from_function(
func=self.ask_question,
name="Ask question to co-worker",
description=self.i18n.tools("ask_question").format(
coworkers=f"[{', '.join([f'{agent.role}' for agent in self.agents])}]"
),
name="Ask question to coworker",
description=self.i18n.tools("ask_question").format(coworkers=coworkers),
),
]
return tools
def delegate_work(
self, task: str, context: str, coworker: Union[str, None] = None, **kwargs
):
"""Useful to delegate a specific task to a co-worker passing all necessary context and names."""
coworker = coworker or kwargs.get("co_worker") or kwargs.get("co-worker")
if coworker is not None:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return self._execute(coworker, task, context)
def ask_question(
self, question: str, context: str, coworker: Union[str, None] = None, **kwargs
):
"""Useful to ask a question, opinion or take from a co-worker passing all necessary context and names."""
coworker = coworker or kwargs.get("co_worker") or kwargs.get("co-worker")
if coworker is not None:
is_list = coworker.startswith("[") and coworker.endswith("]")
if is_list:
coworker = coworker[1:-1].split(",")[0]
return self._execute(coworker, question, context)
def _execute(self, agent, task, context):
"""Execute the command."""
try:
agent = [
available_agent
for available_agent in self.agents
if available_agent.role.casefold().strip() == agent.casefold().strip()
]
except Exception as _:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
if not agent:
return self.i18n.errors("agent_tool_unexsiting_coworker").format(
coworkers="\n".join(
[f"- {agent.role.casefold()}" for agent in self.agents]
)
)
agent = agent[0]
task = Task(
description=task,
agent=agent,
expected_output="Your best answer to your co-worker asking you this, accounting for the context shared.",
)
return agent.execute_task(task, context)

View File

@@ -8,7 +8,7 @@ from pydantic.v1 import BaseModel, Field
class ToolCalling(BaseModel):
tool_name: str = Field(..., description="The name of the tool to be called.")
arguments: Optional[Dict[str, Any]] = Field(
..., description="A dictinary of arguments to be passed to the tool."
..., description="A dictionary of arguments to be passed to the tool."
)
@@ -17,5 +17,5 @@ class InstructorToolCalling(PydanticBaseModel):
..., description="The name of the tool to be called."
)
arguments: Optional[Dict[str, Any]] = PydanticField(
..., description="A dictinary of arguments to be passed to the tool."
..., description="A dictionary of arguments to be passed to the tool."
)

View File

@@ -11,11 +11,10 @@ from crewai.telemetry import Telemetry
from crewai.tools.tool_calling import InstructorToolCalling, ToolCalling
from crewai.utilities import I18N, Converter, ConverterError, Printer
agentops = None
try:
import agentops
except ImportError:
pass
agentops = None
OPENAI_BIGGER_MODELS = ["gpt-4"]
@@ -51,6 +50,7 @@ class ToolUsage:
tools_names: str,
task: Any,
function_calling_llm: Any,
agent: Any,
action: Any,
) -> None:
self._i18n: I18N = I18N()
@@ -59,6 +59,7 @@ class ToolUsage:
self._run_attempts: int = 1
self._max_parsing_attempts: int = 3
self._remember_format_after_usages: int = 3
self.agent = agent
self.tools_description = tools_description
self.tools_names = tools_names
self.tools_handler = tools_handler
@@ -97,18 +98,16 @@ class ToolUsage:
self.task.increment_tools_errors()
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
# type: ignore # BUG?: "_use" of "ToolUsage" does not return a value (it only ever returns None)
return f"{self._use(tool_string=tool_string, tool=tool, calling=calling)}"
return f"{self._use(tool_string=tool_string, tool=tool, calling=calling)}" # type: ignore # BUG?: "_use" of "ToolUsage" does not return a value (it only ever returns None)
def _use(
self,
tool_string: str,
tool: BaseTool,
calling: Union[ToolCalling, InstructorToolCalling],
) -> None: # TODO: Fix this return type
) -> str: # TODO: Fix this return type
tool_event = agentops.ToolEvent(name=calling.tool_name) if agentops else None
# type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
if self._check_tool_repeated_usage(calling=calling):
if self._check_tool_repeated_usage(calling=calling): # type: ignore # _check_tool_repeated_usage of "ToolUsage" does not return a value (it only ever returns None)
try:
result = self._i18n.errors("task_repeated_usage").format(
tool_names=self.tools_names
@@ -119,49 +118,45 @@ class ToolUsage:
tool_name=tool.name,
attempts=self._run_attempts,
)
# type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
result = self._format_result(result=result)
return result # type: ignore # Fix the reutrn type of this function
result = self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
return result # type: ignore # Fix the return type of this function
except Exception:
self.task.increment_tools_errors()
# type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
result = None
result = None # type: ignore # Incompatible types in assignment (expression has type "None", variable has type "str")
if self.tools_handler.cache:
result = self.tools_handler.cache.read( # type: ignore # Incompatible types in assignment (expression has type "str | None", variable has type "str")
tool=calling.tool_name, input=calling.arguments
)
if not result:
original_tool = next(
(ot for ot in self.original_tools if ot.name == tool.name), None
)
if result is None: #! finecwg: if not result --> if result is None
try:
if calling.tool_name in [
"Delegate work to co-worker",
"Ask question to co-worker",
"Delegate work to coworker",
"Ask question to coworker",
]:
self.task.increment_delegations()
if calling.arguments:
try:
# type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "schema"
acceptable_args = tool.args_schema.schema()["properties"].keys()
acceptable_args = tool.args_schema.schema()["properties"].keys() # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "schema"
arguments = {
k: v
for k, v in calling.arguments.items()
if k in acceptable_args
}
result = tool._run(**arguments)
result = tool.invoke(input=arguments)
except Exception:
if tool.args_schema:
arguments = calling.arguments
result = tool._run(**arguments)
else:
# type: ignore # Incompatible types in assignment (expression has type "dict_values[str, Any]", variable has type "dict[str, Any]")
arguments = calling.arguments.values()
result = tool._run(*arguments)
arguments = calling.arguments
result = tool.invoke(input=arguments)
else:
result = tool._run()
result = tool.invoke(input={})
except Exception as e:
self._run_attempts += 1
if self._run_attempts > self._max_parsing_attempts:
@@ -185,9 +180,6 @@ class ToolUsage:
if self.tools_handler:
should_cache = True
original_tool = next(
(ot for ot in self.original_tools if ot.name == tool.name), None
)
if (
hasattr(original_tool, "cache_function")
and original_tool.cache_function # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
@@ -208,15 +200,28 @@ class ToolUsage:
tool_name=tool.name,
attempts=self._run_attempts,
)
# type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
result = self._format_result(result=result)
result = self._format_result(result=result) # type: ignore # "_format_result" of "ToolUsage" does not return a value (it only ever returns None)
data = {
"result": result,
"tool_name": tool.name,
"tool_args": calling.arguments,
}
if (
hasattr(original_tool, "result_as_answer")
and original_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "cache_function"
):
result_as_answer = original_tool.result_as_answer # type: ignore # Item "None" of "Any | None" has no attribute "result_as_answer"
data["result_as_answer"] = result_as_answer
self.agent.tools_results.append(data)
return result # type: ignore # No return value expected
def _format_result(self, result: Any) -> None:
self.task.used_tools += 1
if self._should_remember_format(): # type: ignore # "_should_remember_format" of "ToolUsage" does not return a value (it only ever returns None)
# type: ignore # "_remember_format" of "ToolUsage" does not return a value (it only ever returns None)
result = self._remember_format(result=result)
result = self._remember_format(result=result) # type: ignore # "_remember_format" of "ToolUsage" does not return a value (it only ever returns None)
return result
def _should_remember_format(self) -> None:
@@ -311,7 +316,7 @@ class ToolUsage:
Example:
{"tool_name": "tool name", "arguments": {"arg_name1": "value", "arg_name2": 2}}""",
),
max_attemps=1,
max_attempts=1,
)
calling = converter.to_pydantic()

View File

@@ -16,12 +16,12 @@
"format_without_tools": "\nSorry, I didn't use the right format. I MUST either use a tool (among the available ones), OR give my best final answer.\nI just remembered the expected format I must follow:\n\nQuestion: the input question you must answer\nThought: you should always think about what to do\nAction: the action to take, should be one of [{tool_names}]\nAction Input: the input to the action\nObservation: the result of the action\n... (this Thought/Action/Action Input/Observation can repeat N times)\nThought: I now can give a great answer\nFinal Answer: my best complete final answer to the task\nYour final answer must be the great and the most complete as possible, it must be outcome described\n\n",
"task_with_context": "{task}\n\nThis is the context you're working with:\n{context}",
"expected_output": "\nThis is the expect criteria for your final answer: {expected_output} \n you MUST return the actual complete content as the final answer, not a summary.",
"human_feedback": "You got human feedback on your work, re-avaluate it and give a new Final Answer when ready.\n {human_feedback}",
"getting_input": "This is the agent final answer: {final_answer}\nPlease provide a feedback: "
"human_feedback": "You got human feedback on your work, re-evaluate it and give a new Final Answer when ready.\n {human_feedback}",
"getting_input": "This is the agent's final answer: {final_answer}\nPlease provide feedback: "
},
"errors": {
"force_final_answer": "Tool won't be use because it's time to give your final answer. Don't use tools and just your absolute BEST Final answer.",
"agent_tool_unexsiting_coworker": "\nError executing tool. Co-worker mentioned not found, it must to be one of the following options:\n{coworkers}\n",
"agent_tool_unexsiting_coworker": "\nError executing tool. coworker mentioned not found, it must be one of the following options:\n{coworkers}\n",
"task_repeated_usage": "I tried reusing the same input, I must stop using this action input. I'll try something else instead.\n\n",
"tool_usage_error": "I encountered an error: {error}",
"tool_arguments_error": "Error: the Action Input is not a valid key, value dictionary.",
@@ -29,7 +29,7 @@
"tool_usage_exception": "I encountered an error while trying to use the tool. This was the error: {error}.\n Tool {tool} accepts these inputs: {tool_inputs}"
},
"tools": {
"delegate_work": "Delegate a specific task to one of the following co-workers: {coworkers}\nThe input to this tool should be the co-worker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following co-workers: {coworkers}\nThe input to this tool should be the co-worker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolute everything you know, don't reference things but instead explain them."
"delegate_work": "Delegate a specific task to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the task you want them to do, and ALL necessary context to execute the task, they know nothing about the task, so share absolute everything you know, don't reference things but instead explain them.",
"ask_question": "Ask a specific question to one of the following coworkers: {coworkers}\nThe input to this tool should be the coworker, the question you have for them, and ALL necessary context to ask the question properly, they know nothing about the question, so share absolute everything you know, don't reference things but instead explain them."
}
}

View File

@@ -1,9 +1,22 @@
from .converter import Converter, ConverterError
from .file_handler import FileHandler
from .i18n import I18N
from .instructor import Instructor
from .logger import Logger
from .parser import YamlParser
from .printer import Printer
from .prompts import Prompts
from .rpm_controller import RPMController
from .fileHandler import FileHandler
from .parser import YamlParser
__all__ = [
"Converter",
"ConverterError",
"FileHandler",
"I18N",
"Instructor",
"Logger",
"Printer",
"Prompts",
"RPMController",
"YamlParser",
]

View File

@@ -0,0 +1,2 @@
TRAINING_DATA_FILE = "training_data.pkl"
TRAINED_AGENTS_DATA_FILE = "trained_agents_data.pkl"

View File

@@ -1,9 +1,9 @@
import json
from typing import Any, Optional
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field, PrivateAttr, model_validator
from crewai.agents.agent_builder.utilities.base_output_converter import OutputConverter
class ConverterError(Exception):
@@ -14,33 +14,18 @@ class ConverterError(Exception):
self.message = message
class Converter(BaseModel):
class Converter(OutputConverter):
"""Class that converts text into either pydantic or json."""
_is_gpt: bool = PrivateAttr(default=True)
text: str = Field(description="Text to be converted.")
llm: Any = Field(description="The language model to be used to convert the text.")
model: Any = Field(description="The model to be used to convert the text.")
instructions: str = Field(description="Conversion instructions to the LLM.")
max_attemps: Optional[int] = Field(
description="Max number of attemps to try to get the output formated.",
default=3,
)
@model_validator(mode="after")
def check_llm_provider(self):
if not self._is_gpt(self.llm):
self._is_gpt = False
def to_pydantic(self, current_attempt=1):
"""Convert text to pydantic."""
try:
if self._is_gpt:
if self.is_gpt:
return self._create_instructor().to_pydantic()
else:
return self._create_chain().invoke({})
except Exception as e:
if current_attempt < self.max_attemps:
if current_attempt < self.max_attempts:
return self.to_pydantic(current_attempt + 1)
return ConverterError(
f"Failed to convert text into a pydantic model due to the following error: {e}"
@@ -49,14 +34,14 @@ class Converter(BaseModel):
def to_json(self, current_attempt=1):
"""Convert text to json."""
try:
if self._is_gpt:
if self.is_gpt:
return self._create_instructor().to_json()
else:
return json.dumps(self._create_chain().invoke({}).model_dump())
except Exception:
if current_attempt < self.max_attemps:
except Exception as e:
if current_attempt < self.max_attempts:
return self.to_json(current_attempt + 1)
return ConverterError("Failed to convert text into JSON.")
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
def _create_instructor(self):
"""Create an instructor."""
@@ -64,7 +49,7 @@ class Converter(BaseModel):
inst = Instructor(
llm=self.llm,
max_attemps=self.max_attemps,
max_attempts=self.max_attempts,
model=self.model,
content=self.text,
instructions=self.instructions,
@@ -83,5 +68,7 @@ class Converter(BaseModel):
)
return new_prompt | self.llm | parser
def _is_gpt(self, llm) -> bool: # type: ignore # BUG? Name "_is_gpt" defined on line 20 hides name from outer scope
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
@property
def is_gpt(self) -> bool:
"""Return if llm provided is of gpt from openai."""
return isinstance(self.llm, ChatOpenAI) and self.llm.openai_api_base is None

View File

@@ -0,0 +1,31 @@
from datetime import datetime
import json
from uuid import UUID
from pydantic import BaseModel
class CrewJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, BaseModel):
return self._handle_pydantic_model(obj)
elif isinstance(obj, UUID):
return str(obj)
elif isinstance(obj, datetime):
return obj.isoformat()
return super().default(obj)
def _handle_pydantic_model(self, obj):
try:
data = obj.model_dump()
# Remove circular references
for key, value in data.items():
if isinstance(value, BaseModel):
data[key] = str(
value
) # Convert nested models to string representation
return data
except RecursionError:
return str(
obj
) # Fall back to string representation if circular reference is detected

View File

@@ -17,6 +17,16 @@ class CrewPydanticOutputParser(PydanticOutputParser):
def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
result[0].text = self._transform_in_valid_json(result[0].text)
# Treating edge case of function calling llm returning the name instead of tool_name
json_object = json.loads(result[0].text)
json_object["tool_name"] = (
json_object["name"]
if "tool_name" not in json_object
else json_object["tool_name"]
)
result[0].text = json.dumps(json_object)
json_object = super().parse_result(result)
try:
return self.pydantic_object.parse_obj(json_object)

View File

@@ -5,9 +5,9 @@ from pydantic import BaseModel, Field
from crewai.utilities import Converter
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
agentops = None
try:
import agentops
from agentops import track_agent
except ImportError:
@@ -37,6 +37,18 @@ class TaskEvaluation(BaseModel):
)
class TrainingTaskEvaluation(BaseModel):
suggestions: List[str] = Field(
description="Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks."
)
quality: float = Field(
description="A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback."
)
final_summary: str = Field(
description="A step by step action items to improve the next Agent based on the human-feedback and improved output."
)
@track_agent(name="Task Evaluator")
class TaskEvaluator:
def __init__(self, original_agent):
@@ -71,3 +83,49 @@ class TaskEvaluator:
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def evaluate_training_data(
self, training_data: dict, agent_id: str
) -> TrainingTaskEvaluation:
"""
Evaluate the training data based on the llm output, human feedback, and improved output.
Parameters:
- training_data (dict): The training data to be evaluated.
- agent_id (str): The ID of the agent.
"""
output_training_data = training_data[agent_id]
final_aggregated_data = ""
for _, data in output_training_data.items():
final_aggregated_data += (
f"Initial Output:\n{data['initial_output']}\n\n"
f"Human Feedback:\n{data['human_feedback']}\n\n"
f"Improved Output:\n{data['improved_output']}\n\n"
)
evaluation_query = (
"Assess the quality of the training data based on the llm output, human feedback , and llm output improved result.\n\n"
f"{final_aggregated_data}"
"Please provide:\n"
"- Based on the Human Feedbacks and the comparison between Initial Outputs and Improved outputs provide action items based on human_feedback for future tasks\n"
"- A score from 0 to 10 evaluating on completion, quality, and overall performance from the improved output to the initial output based on the human feedback\n"
)
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(self.llm):
model_schema = PydanticSchemaParser(
model=TrainingTaskEvaluation
).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=self.llm,
text=evaluation_query,
model=TrainingTaskEvaluation,
instructions=instructions,
)
pydantic_result = converter.to_pydantic()
return pydantic_result

View File

@@ -1,20 +0,0 @@
import os
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding = 'utf-8') as file:
file.write(message + "\n")

View File

@@ -0,0 +1,70 @@
import os
import pickle
from datetime import datetime
class FileHandler:
"""take care of file operations, currently it only logs messages to a file"""
def __init__(self, file_path):
if isinstance(file_path, bool):
self._path = os.path.join(os.curdir, "logs.txt")
elif isinstance(file_path, str):
self._path = file_path
else:
raise ValueError("file_path must be either a boolean or a string.")
def log(self, **kwargs):
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"{now}: ".join([f"{key}={value}" for key, value in kwargs.items()])
with open(self._path, "a", encoding="utf-8") as file:
file.write(message + "\n")
class PickleHandler:
def __init__(self, file_name: str) -> None:
"""
Initialize the PickleHandler with the name of the file where data will be stored.
The file will be saved in the current directory.
Parameters:
- file_name (str): The name of the file for saving and loading data.
"""
self.file_path = os.path.join(os.getcwd(), file_name)
def initialize_file(self) -> None:
"""
Initialize the file with an empty dictionary if it does not exist or is empty.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
self.save({}) # Save an empty dictionary to initialize the file
def save(self, data) -> None:
"""
Save the data to the specified file using pickle.
Parameters:
- data (object): The data to be saved.
"""
with open(self.file_path, "wb") as file:
pickle.dump(data, file)
def load(self) -> dict:
"""
Load the data from the specified file using pickle.
Returns:
- dict: The data loaded from the file.
"""
if not os.path.exists(self.file_path) or os.path.getsize(self.file_path) == 0:
return {} # Return an empty dictionary if the file does not exist or is empty
with open(self.file_path, "rb") as file:
try:
return pickle.load(file)
except EOFError:
return {} # Return an empty dictionary if the file is empty or corrupted
except Exception:
raise # Raise any other exceptions that occur during loading

View File

@@ -0,0 +1,20 @@
from typing import List
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> str:
"""Generate string context from the task outputs."""
dividers = "\n\n----------\n\n"
# Join task outputs with dividers
context = dividers.join(output.raw for output in task_outputs)
return context
def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str:
"""Generate string context from the tasks."""
task_outputs = [task.output for task in tasks if task.output is not None]
return aggregate_raw_outputs_from_task_outputs(task_outputs)

View File

@@ -1,3 +1,5 @@
from datetime import datetime
from crewai.utilities.printer import Printer
from datetime import datetime
@@ -13,6 +15,7 @@ class Logger:
def log(self, level, message, color="bold_green"):
level_map = {"debug": 1, "info": 2}
if self.verbose_level and level_map.get(level, 0) <= self.verbose_level:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
self._printer.print(f"[{timestamp}][{level.upper()}]: {message}", color=color)
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self._printer.print(
f"[{timestamp}][{level.upper()}]: {message}", color=color
)

View File

@@ -0,0 +1,64 @@
from typing import List, Optional
from pydantic import BaseModel
from crewai.agent import Agent
from crewai.task import Task
class PlannerTaskPydanticOutput(BaseModel):
list_of_plans_per_task: List[str]
class CrewPlanner:
def __init__(self, tasks: List[Task]):
self.tasks = tasks
def _handle_crew_planning(self) -> Optional[BaseModel]:
"""Handles the Crew planning by creating detailed step-by-step plans for each task."""
planning_agent = self._create_planning_agent()
tasks_summary = self._create_tasks_summary()
planner_task = self._create_planner_task(planning_agent, tasks_summary)
return planner_task.execute_sync().pydantic
def _create_planning_agent(self) -> Agent:
"""Creates the planning agent for the crew planning."""
return Agent(
role="Task Execution Planner",
goal=(
"Your goal is to create an extremely detailed, step-by-step plan based on the tasks and tools "
"available to each agent so that they can perform the tasks in an exemplary manner"
),
backstory="Planner agent for crew planning",
)
def _create_planner_task(self, planning_agent: Agent, tasks_summary: str) -> Task:
"""Creates the planner task using the given agent and tasks summary."""
return Task(
description=(
f"Based on these tasks summary: {tasks_summary} \n Create the most descriptive plan based on the tasks "
"descriptions, tools available, and agents' goals for them to execute their goals with perfection."
),
expected_output="Step by step plan on how the agents can execute their tasks using the available tools with mastery",
agent=planning_agent,
output_pydantic=PlannerTaskPydanticOutput,
)
def _create_tasks_summary(self) -> str:
"""Creates a summary of all tasks."""
tasks_summary = []
for idx, task in enumerate(self.tasks):
tasks_summary.append(
f"""
Task Number {idx + 1} - {task.description}
"task_description": {task.description}
"task_expected_output": {task.expected_output}
"agent": {task.agent.role if task.agent else "None"}
"agent_goal": {task.agent.goal if task.agent else "None"}
"task_tools": {task.tools}
"agent_tools": {task.agent.tools if task.agent else "None"}
"""
)
return " ".join(tasks_summary)

View File

@@ -8,6 +8,10 @@ class Printer:
self._print_bold_green(content)
elif color == "bold_purple":
self._print_bold_purple(content)
elif color == "bold_blue":
self._print_bold_blue(content)
elif color == "yellow":
self._print_yellow(content)
else:
print(content)
@@ -22,3 +26,9 @@ class Printer:
def _print_red(self, content):
print("\033[91m {}\033[00m".format(content))
def _print_bold_blue(self, content):
print("\033[1m\033[94m {}\033[00m".format(content))
def _print_yellow(self, content):
print("\033[93m {}\033[00m".format(content))

View File

@@ -0,0 +1,61 @@
from pydantic import BaseModel, Field
from datetime import datetime
from typing import Dict, Any, Optional, List
from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from crewai.task import Task
class ExecutionLog(BaseModel):
task_id: str
expected_output: Optional[str] = None
output: Dict[str, Any]
timestamp: datetime = Field(default_factory=datetime.now)
task_index: int
inputs: Dict[str, Any] = Field(default_factory=dict)
was_replayed: bool = False
def __getitem__(self, key: str) -> Any:
return getattr(self, key)
class TaskOutputStorageHandler:
def __init__(self) -> None:
self.storage = KickoffTaskOutputsSQLiteStorage()
def update(self, task_index: int, log: Dict[str, Any]):
saved_outputs = self.load()
if saved_outputs is None:
raise ValueError("Logs cannot be None")
if log.get("was_replayed", False):
replayed = {
"task_id": str(log["task"].id),
"expected_output": log["task"].expected_output,
"output": log["output"],
"was_replayed": log["was_replayed"],
"inputs": log["inputs"],
}
self.storage.update(
task_index,
**replayed,
)
else:
self.storage.add(**log)
def add(
self,
task: Task,
output: Dict[str, Any],
task_index: int,
inputs: Dict[str, Any] = {},
was_replayed: bool = False,
):
self.storage.add(task, output, task_index, was_replayed, inputs)
def reset(self):
self.storage.delete_all()
def load(self) -> Optional[List[Dict[str, Any]]]:
return self.storage.load()

View File

@@ -4,46 +4,22 @@ import tiktoken
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
class TokenProcess:
total_tokens: int = 0
prompt_tokens: int = 0
completion_tokens: int = 0
successful_requests: int = 0
def sum_prompt_tokens(self, tokens: int):
self.prompt_tokens = self.prompt_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_completion_tokens(self, tokens: int):
self.completion_tokens = self.completion_tokens + tokens
self.total_tokens = self.total_tokens + tokens
def sum_successful_requests(self, requests: int):
self.successful_requests = self.successful_requests + requests
def get_summary(self) -> Dict[str, Any]:
return {
"total_tokens": self.total_tokens,
"prompt_tokens": self.prompt_tokens,
"completion_tokens": self.completion_tokens,
"successful_requests": self.successful_requests,
}
from crewai.agents.agent_builder.utilities.base_token_process import TokenProcess
class TokenCalcHandler(BaseCallbackHandler):
model: str = ""
model_name: str = ""
token_cost_process: TokenProcess
def __init__(self, model, token_cost_process):
self.model = model
def __init__(self, model_name, token_cost_process):
self.model_name = model_name
self.token_cost_process = token_cost_process
def on_llm_start(
self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
) -> None:
try:
encoding = tiktoken.encoding_for_model(self.model)
encoding = tiktoken.encoding_for_model(self.model_name)
except KeyError:
encoding = tiktoken.get_encoding("cl100k_base")

View File

@@ -0,0 +1,31 @@
from crewai.utilities.file_handler import PickleHandler
class CrewTrainingHandler(PickleHandler):
def save_trained_data(self, agent_id: str, trained_data: dict) -> None:
"""
Save the trained data for a specific agent.
Parameters:
- agent_id (str): The ID of the agent.
- trained_data (dict): The trained data to be saved.
"""
data = self.load()
data[agent_id] = trained_data
self.save(data)
def append(self, train_iteration: int, agent_id: str, new_data) -> None:
"""
Append new data to the existing pickle file.
Parameters:
- new_data (object): The new data to be appended.
"""
data = self.load()
if agent_id in data:
data[agent_id][train_iteration] = new_data
else:
data[agent_id] = {train_iteration: new_data}
self.save(data)