improved readability (#90)

This commit is contained in:
yanzz
2024-01-10 03:29:50 +01:00
committed by GitHub
parent bc54d310f2
commit 8eba7aab89
2 changed files with 83 additions and 101 deletions

View File

@@ -3,14 +3,7 @@ import uuid
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
from pydantic import ( from pydantic import (
UUID4, BaseModel, ConfigDict, Field, InstanceOf, Json, UUID4
BaseModel,
ConfigDict,
Field,
InstanceOf,
Json,
field_validator,
model_validator,
) )
from pydantic_core import PydanticCustomError from pydantic_core import PydanticCustomError
@@ -20,118 +13,110 @@ from crewai.process import Process
from crewai.task import Task from crewai.task import Task
from crewai.tools.agent_tools import AgentTools from crewai.tools.agent_tools import AgentTools
class Crew(BaseModel): class Crew(BaseModel):
"""Class that represents a group of agents, how they should work together and their tasks.""" """
Represents a group of agents, defining how they should collaborate and the tasks they should perform.
Attributes:
tasks: List of tasks assigned to the crew.
agents: List of agents part of this crew.
process: The process flow that the crew will follow (e.g., sequential).
verbose: Indicates the verbosity level for logging during execution.
config: Configuration settings for the crew.
cache_handler: Handles caching for the crew's operations.
id: A unique identifier for the crew instance.
"""
__hash__ = object.__hash__ __hash__ = object.__hash__
model_config = ConfigDict(arbitrary_types_allowed=True) model_config = ConfigDict(arbitrary_types_allowed=True)
tasks: List[Task] = Field(description="List of tasks", default_factory=list) tasks: List[Task] = Field(default_factory=list)
agents: List[Agent] = Field( agents: List[Agent] = Field(default_factory=list)
description="List of agents in this crew.", default_factory=list process: Process = Field(default=Process.sequential)
) verbose: Union[int, bool] = Field(default=0)
process: Process = Field( config: Optional[Union[Json, Dict[str, Any]]] = Field(default=None)
description="Process that the crew will follow.", default=Process.sequential
)
verbose: Union[int, bool] = Field(
description="Verbose mode for the Agent Execution", default=0
)
config: Optional[Union[Json, Dict[str, Any]]] = Field(
description="Configuration of the crew.", default=None
)
cache_handler: Optional[InstanceOf[CacheHandler]] = Field( cache_handler: Optional[InstanceOf[CacheHandler]] = Field(
default=CacheHandler(), description="An instance of the CacheHandler class." default=CacheHandler()
) )
id: UUID4 = Field( id: UUID4 = Field(
default_factory=uuid.uuid4, default_factory=uuid.uuid4, frozen=True
frozen=True,
description="Unique identifier for the object, not set by user.",
) )
@field_validator("id", mode="before") @field_validator("id", mode="before")
@classmethod @classmethod
def _deny_user_set_id(cls, v: Optional[UUID4]) -> None: def _deny_user_set_id(cls, v: Optional[UUID4]) -> None:
"""Prevent manual setting of the 'id' field by users."""
if v: if v:
raise PydanticCustomError( raise PydanticCustomError(
"may_not_set_field", "This field is not to be set by the user.", {} "may_not_set_field", "The 'id' field cannot be set by the user.", {}
) )
@classmethod @classmethod
@field_validator("config", mode="before") @field_validator("config", mode="before")
def check_config_type(cls, v: Union[Json, Dict[str, Any]]): def check_config_type(cls, v: Union[Json, Dict[str, Any]]):
"""Ensures the 'config' field is a valid JSON or dictionary."""
if isinstance(v, Json): if isinstance(v, Json):
return json.loads(v) return json.loads(v)
return v return v
@model_validator(mode="after") @model_validator(mode="after")
def check_config(self): def check_config(self):
"""Validates that the crew is properly configured with agents and tasks."""
if not self.config and not self.tasks and not self.agents: if not self.config and not self.tasks and not self.agents:
raise PydanticCustomError( raise PydanticCustomError(
"missing_keys", "Either agents and task need to be set or config.", {} "missing_keys", "Either 'agents' and 'tasks' need to be set or 'config'.", {}
) )
if self.config: if self.config:
if not self.config.get("agents") or not self.config.get("tasks"): self._setup_from_config()
raise PydanticCustomError(
"missing_keys_in_config", "Config should have agents and tasks", {}
)
self.agents = [Agent(**agent) for agent in self.config["agents"]]
tasks = []
for task in self.config["tasks"]:
task_agent = [agt for agt in self.agents if agt.role == task["agent"]][
0
]
del task["agent"]
tasks.append(Task(**task, agent=task_agent))
self.tasks = tasks
if self.agents: if self.agents:
for agent in self.agents: for agent in self.agents:
agent.set_cache_handler(self.cache_handler) agent.set_cache_handler(self.cache_handler)
return self return self
def kickoff(self) -> str: def _setup_from_config(self):
"""Kickoff the crew to work on its tasks. """Initializes agents and tasks from the provided config."""
if not self.config.get("agents") or not self.config.get("tasks"):
raise PydanticCustomError(
"missing_keys_in_config", "Config should have 'agents' and 'tasks'.", {}
)
Returns: self.agents = [Agent(**agent) for agent in self.config["agents"]]
Output of the crew for each task. self.tasks = [self._create_task(task) for task in self.config["tasks"]]
"""
def _create_task(self, task_config):
"""Creates a task instance from its configuration."""
task_agent = next(agt for agt in self.agents if agt.role == task_config["agent"])
del task_config["agent"]
return Task(**task_config, agent=task_agent)
def kickoff(self) -> str:
"""Starts the crew to work on its assigned tasks."""
for agent in self.agents: for agent in self.agents:
agent.cache_handler = self.cache_handler agent.cache_handler = self.cache_handler
if self.process == Process.sequential: if self.process == Process.sequential:
return self.__sequential_loop() return self._sequential_loop()
def __sequential_loop(self) -> str: def _sequential_loop(self) -> str:
"""Loop that executes the sequential process. """Executes tasks sequentially and returns the final output."""
Returns:
Output of the crew.
"""
task_output = None task_output = None
for task in self.tasks: for task in self.tasks:
# Add delegation tools to the task if the agent allows it self._prepare_and_execute_task(task)
if task.agent.allow_delegation:
agent_tools = AgentTools(agents=self.agents).tools()
task.tools += agent_tools
self.__log("debug", f"Working Agent: {task.agent.role}")
self.__log("info", f"Starting Task: {task.description}")
task_output = task.execute(task_output) task_output = task.execute(task_output)
self.__log(
"debug", f"\n\n[{task.agent.role}] Task output: {task_output}\n\n"
)
return task_output return task_output
def __log(self, level, message): def _prepare_and_execute_task(self, task):
"""Log a message""" """Prepares and logs information about the task being executed."""
if task.agent.allow_delegation:
task.tools += AgentTools(agents=self.agents).tools()
self._log("debug", f"Working Agent: {task.agent.role}")
self._log("info", f"Starting Task: {task.description}")
def _log(self, level, message):
"""Logs a message at the specified verbosity level."""
level_map = {"debug": 1, "info": 2} level_map = {"debug": 1, "info": 2}
verbose_level = ( verbose_level = 2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
2 if isinstance(self.verbose, bool) and self.verbose else self.verbose
)
if verbose_level and level_map[level] <= verbose_level: if verbose_level and level_map[level] <= verbose_level:
print(message) print(message)

View File

@@ -1,53 +1,50 @@
"""Prompts for generic agent."""
import json import json
import os import os
from typing import ClassVar, Dict, Optional from typing import ClassVar, Dict, Optional
from langchain.prompts import PromptTemplate from langchain.prompts import PromptTemplate
from pydantic import BaseModel, Field, PrivateAttr, model_validator from pydantic import BaseModel, Field, PrivateAttr, model_validator, ValidationError
class Prompts(BaseModel): class Prompts(BaseModel):
"""Prompts for generic agent.""" """Manages and generates prompts for a generic agent with support for different languages."""
_prompts: Optional[Dict[str, str]] = PrivateAttr() _prompts: Optional[Dict[str, str]] = PrivateAttr()
language: Optional[str] = Field( language: Optional[str] = Field(
default="en", default="en",
description="Language of crewai prompts.", description="Language of the prompts.",
) )
@model_validator(mode="after") @model_validator(mode="after")
def load_prompts(self) -> "Prompts": def load_prompts(self) -> "Prompts":
"""Load prompts from file.""" """Load prompts from a JSON file based on the specified language."""
dir_path = os.path.dirname(os.path.realpath(__file__)) try:
prompts_path = os.path.join(dir_path, f"prompts/{self.language}.json") dir_path = os.path.dirname(os.path.realpath(__file__))
prompts_path = os.path.join(dir_path, f"prompts/{self.language}.json")
with open(prompts_path, "r") as f: with open(prompts_path, "r") as f:
self._prompts = json.load(f)["slices"] self._prompts = json.load(f)["slices"]
except FileNotFoundError:
raise ValidationError(f"Prompt file for language '{self.language}' not found.")
except json.JSONDecodeError:
raise ValidationError(f"Error decoding JSON from the prompts file.")
return self return self
SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}" SCRATCHPAD_SLICE: ClassVar[str] = "\n{agent_scratchpad}"
def task_execution_with_memory(self) -> str: def task_execution_with_memory(self) -> str:
return PromptTemplate.from_template( """Generate a prompt for task execution with memory components."""
self._prompts["role_playing"] return self._build_prompt(["role_playing", "tools", "memory", "task"])
+ self._prompts["tools"]
+ self._prompts["memory"]
+ self._prompts["task"]
+ self.SCRATCHPAD_SLICE
)
def task_execution_without_tools(self) -> str: def task_execution_without_tools(self) -> str:
return PromptTemplate.from_template( """Generate a prompt for task execution without tools components."""
self._prompts["role_playing"] return self._build_prompt(["role_playing", "task"])
+ self._prompts["task"]
+ self.SCRATCHPAD_SLICE
)
def task_execution(self) -> str: def task_execution(self) -> str:
return PromptTemplate.from_template( """Generate a standard prompt for task execution."""
self._prompts["role_playing"] return self._build_prompt(["role_playing", "tools", "task"])
+ self._prompts["tools"]
+ self._prompts["task"] def _build_prompt(self, components: [str]) -> str:
+ self.SCRATCHPAD_SLICE """Constructs a prompt string from specified components."""
) prompt_parts = [self._prompts[component] for component in components if component in self._prompts]
prompt_parts.append(self.SCRATCHPAD_SLICE)
return PromptTemplate.from_template("".join(prompt_parts))