Merge branch 'main' into feature/procedure_v2

This commit is contained in:
Brandon Hancock
2024-07-29 12:48:53 -04:00
48 changed files with 437254 additions and 7599 deletions

View File

@@ -55,11 +55,6 @@ class Agent(BaseAgent):
tools: Tools at agents disposal
step_callback: Callback to be executed after each step of the agent execution.
callbacks: A list of callback functions from the langchain library that are triggered during the agent's execution process
allow_code_execution: Enable code execution for the agent.
max_retry_limit: Maximum number of retries for an agent to execute a task when an error occurs.
"""
_times_executed: int = PrivateAttr(default=0)
max_execution_time: Optional[int] = Field(
default=None,
description="Maximum execution time for an agent to execute a task",
@@ -191,20 +186,6 @@ class Agent(BaseAgent):
else:
task_prompt = self._use_trained_data(task_prompt=task_prompt)
try:
result = self.agent_executor.invoke(
{
"input": task_prompt,
"tool_names": self.agent_executor.tools_names,
"tools": self.agent_executor.tools_description,
}
)["output"]
except Exception as e:
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
raise e
result = self.execute_task(task, context, tools)
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
@@ -262,6 +243,7 @@ class Agent(BaseAgent):
"tools_handler": self.tools_handler,
"function_calling_llm": self.function_calling_llm,
"callbacks": self.callbacks,
"max_tokens": self.max_tokens,
}
if self._rpm_controller:

View File

@@ -45,6 +45,7 @@ class BaseAgent(ABC, BaseModel):
i18n (I18N): Internationalization settings.
cache_handler (InstanceOf[CacheHandler]): An instance of the CacheHandler class.
tools_handler (InstanceOf[ToolsHandler]): An instance of the ToolsHandler class.
max_tokens: Maximum number of tokens for the agent to generate in a response.
Methods:
@@ -118,6 +119,9 @@ class BaseAgent(ABC, BaseModel):
tools_handler: InstanceOf[ToolsHandler] = Field(
default=None, description="An instance of the ToolsHandler class."
)
max_tokens: Optional[int] = Field(
default=None, description="Maximum number of tokens for the agent's execution."
)
_original_role: str | None = None
_original_goal: str | None = None

View File

@@ -5,11 +5,11 @@ from crewai.memory.storage.kickoff_task_outputs_storage import (
KickoffTaskOutputsSQLiteStorage,
)
from .create_crew import create_crew
from .train_crew import train_crew
from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
from .test_crew import test_crew
from .train_crew import train_crew
@click.group()
@@ -126,5 +126,26 @@ def reset_memories(long, short, entities, kickoff_outputs, all):
click.echo(f"An error occurred while resetting memories: {e}", err=True)
@crewai.command()
@click.option(
"-n",
"--n_iterations",
type=int,
default=3,
help="Number of iterations to Test the crew",
)
@click.option(
"-m",
"--model",
type=str,
default="gpt-4o-mini",
help="LLM Model to run the tests on the Crew. For now only accepting only OpenAI models.",
)
def test(n_iterations: int, model: str):
"""Test the crew and evaluate the results."""
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
test_crew(n_iterations, model)
if __name__ == "__main__":
crewai()

View File

@@ -9,10 +9,14 @@ from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandle
def reset_memories_command(long, short, entity, kickoff_outputs, all) -> None:
"""
Replay the crew execution from a specific task.
Reset the crew memories.
Args:
task_id (str): The ID of the task to replay from.
long (bool): Whether to reset the long-term memory.
short (bool): Whether to reset the short-term memory.
entity (bool): Whether to reset the entity memory.
kickoff_outputs (bool): Whether to reset the latest kickoff task outputs.
all (bool): Whether to reset all memories.
"""
try:

View File

@@ -5,6 +5,7 @@ research_task:
the current year is 2024.
expected_output: >
A list with 10 bullet points of the most relevant information about {topic}
agent: researcher
reporting_task:
description: >
@@ -13,3 +14,4 @@ reporting_task:
expected_output: >
A fully fledge reports with the mains topics, each with a full section of information.
Formatted as markdown without '```'
agent: reporting_analyst

View File

@@ -32,14 +32,12 @@ class {{crew_name}}Crew():
def research_task(self) -> Task:
return Task(
config=self.tasks_config['research_task'],
agent=self.researcher()
)
@task
def reporting_task(self) -> Task:
return Task(
config=self.tasks_config['reporting_task'],
agent=self.reporting_analyst(),
output_file='report.md'
)

View File

@@ -39,3 +39,16 @@ def replay():
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")
def test():
"""
Test the crew execution and returns the results.
"""
inputs = {
"topic": "AI LLMs"
}
try:
{{crew_name}}Crew().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")

View File

@@ -12,6 +12,7 @@ crewai = { extras = ["tools"], version = "^0.41.1" }
{{folder_name}} = "{{folder_name}}.main:run"
train = "{{folder_name}}.main:train"
replay = "{{folder_name}}.main:replay"
test = "{{folder_name}}.main:test"
[build-system]
requires = ["poetry-core"]

View File

@@ -0,0 +1,32 @@
import subprocess
import click
import pytest
pytest.skip(allow_module_level=True)
def test_crew(n_iterations: int, model: str) -> None:
"""
Test the crew by running a command in the Poetry environment.
Args:
n_iterations (int): The number of iterations to test the crew.
model (str): The model to test the crew with.
"""
command = ["poetry", "run", "test", str(n_iterations), model]
try:
if n_iterations <= 0:
raise ValueError("The number of iterations must be a positive integer.")
result = subprocess.run(command, capture_output=False, text=True, check=True)
if result.stderr:
click.echo(result.stderr, err=True)
except subprocess.CalledProcessError as e:
click.echo(f"An error occurred while testing the crew: {e}", err=True)
click.echo(e.output, err=True)
except Exception as e:
click.echo(f"An unexpected error occurred: {e}", err=True)

View File

@@ -34,6 +34,7 @@ from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
from crewai.utilities import I18N, FileHandler, Logger, RPMController
from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_FILE
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
@@ -155,6 +156,10 @@ class Crew(BaseModel):
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
planning_llm: Optional[Any] = Field(
default=None,
description="Language model that will run the AgentPlanner if planning is True.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
@@ -267,20 +272,6 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def check_tasks_in_hierarchical_process_not_async(self):
"""Validates that the tasks in hierarchical process are not flagged with async_execution."""
if self.process == Process.hierarchical:
for task in self.tasks:
if task.async_execution:
raise PydanticCustomError(
"async_execution_in_hierarchical_process",
"Hierarchical process error: Tasks cannot be flagged with async_execution.",
{},
)
return self
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
"""Validates that the crew ends with at most one asynchronous task."""
@@ -560,15 +551,12 @@ class Crew(BaseModel):
def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
result = CrewPlanner(self.tasks)._handle_crew_planning()
result = CrewPlanner(
tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning()
if result is not None and hasattr(result, "list_of_plans_per_task"):
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
else:
self._logger.log(
"info", "Something went wrong with the planning process of the Crew"
)
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
def _store_execution_log(
self,
@@ -606,7 +594,7 @@ class Crew(BaseModel):
def _run_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
self._create_manager_agent()
return self._execute_tasks(self.tasks, self.manager_agent)
return self._execute_tasks(self.tasks)
def _create_manager_agent(self):
i18n = I18N(prompt_file=self.prompt_file)
@@ -630,7 +618,6 @@ class Crew(BaseModel):
def _execute_tasks(
self,
tasks: List[Task],
manager: Optional[BaseAgent] = None,
start_index: Optional[int] = 0,
was_replayed: bool = False,
) -> CrewOutput:
@@ -658,13 +645,13 @@ class Crew(BaseModel):
last_sync_output = task.output
continue
agent_to_use = self._get_agent_to_use(task, manager)
agent_to_use = self._get_agent_to_use(task)
if agent_to_use is None:
raise ValueError(
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
self._prepare_agent_tools(task, manager)
self._prepare_agent_tools(task)
self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
@@ -730,20 +717,18 @@ class Crew(BaseModel):
return skipped_task_output
return None
def _prepare_agent_tools(self, task: Task, manager: Optional[BaseAgent]):
def _prepare_agent_tools(self, task: Task):
if self.process == Process.hierarchical:
if manager:
self._update_manager_tools(task, manager)
if self.manager_agent:
self._update_manager_tools(task)
else:
raise ValueError("Manager agent is required for hierarchical process.")
elif task.agent and task.agent.allow_delegation:
self._add_delegation_tools(task)
def _get_agent_to_use(
self, task: Task, manager: Optional[BaseAgent]
) -> Optional[BaseAgent]:
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
return manager
return self.manager_agent
return task.agent
def _add_delegation_tools(self, task: Task):
@@ -779,11 +764,14 @@ class Crew(BaseModel):
if self.output_log_file:
self._file_handler.log(agent=role, task=task.description, status="started")
def _update_manager_tools(self, task: Task, manager: BaseAgent):
if task.agent:
manager.tools = task.agent.get_delegation_tools([task.agent])
else:
manager.tools = manager.get_delegation_tools(self.agents)
def _update_manager_tools(self, task: Task):
if self.manager_agent:
if task.agent:
self.manager_agent.tools = task.agent.get_delegation_tools([task.agent])
else:
self.manager_agent.tools = self.manager_agent.get_delegation_tools(
self.agents
)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
@@ -882,7 +870,7 @@ class Crew(BaseModel):
self.tasks[i].output = task_output
self._logging_color = "bold_blue"
result = self._execute_tasks(self.tasks, self.manager_agent, start_index, True)
result = self._execute_tasks(self.tasks, start_index, True)
return result
def copy(self):
@@ -967,6 +955,21 @@ class Crew(BaseModel):
return total_usage_metrics
def test(
self,
n_iterations: int,
openai_model_name: str,
inputs: Optional[Dict[str, Any]] = None,
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations."""
evaluator = CrewEvaluator(self, openai_model_name)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
self.kickoff(inputs=inputs)
evaluator.print_crew_evaluation_result()
def __rshift__(self, other: "Crew") -> "Pipeline":
"""
Implements the >> operator to add another Crew to an existing Pipeline.

View File

@@ -1,2 +1,25 @@
from .annotations import agent, crew, task
from .annotations import (
agent,
crew,
task,
output_json,
output_pydantic,
tool,
callback,
llm,
cache_handler,
)
from .crew_base import CrewBase
__all__ = [
"agent",
"crew",
"task",
"output_json",
"output_pydantic",
"tool",
"callback",
"CrewBase",
"llm",
"cache_handler",
]

View File

@@ -30,6 +30,37 @@ def agent(func):
return func
def llm(func):
func.is_llm = True
func = memoize(func)
return func
def output_json(cls):
cls.is_output_json = True
return cls
def output_pydantic(cls):
cls.is_output_pydantic = True
return cls
def tool(func):
func.is_tool = True
return memoize(func)
def callback(func):
func.is_callback = True
return memoize(func)
def cache_handler(func):
func.is_cache_handler = True
return memoize(func)
def crew(func):
def wrapper(self, *args, **kwargs):
instantiated_tasks = []

View File

@@ -1,6 +1,7 @@
import inspect
import os
from pathlib import Path
from typing import Any, Callable, Dict
import yaml
from dotenv import load_dotenv
@@ -20,11 +21,6 @@ def CrewBase(cls):
base_directory = Path(frame_info.filename).parent.resolve()
break
if base_directory is None:
raise Exception(
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
)
original_agents_config_path = getattr(
cls, "agents_config", "config/agents.yaml"
)
@@ -32,12 +28,20 @@ def CrewBase(cls):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self.base_directory is None:
raise Exception(
"Unable to dynamically determine the project's base directory, you must run it from the project's root directory."
)
self.agents_config = self.load_yaml(
os.path.join(self.base_directory, self.original_agents_config_path)
)
self.tasks_config = self.load_yaml(
os.path.join(self.base_directory, self.original_tasks_config_path)
)
self.map_all_agent_variables()
self.map_all_task_variables()
@staticmethod
def load_yaml(config_path: str):
@@ -45,4 +49,138 @@ def CrewBase(cls):
# parsedContent = YamlParser.parse(file) # type: ignore # Argument 1 to "parse" has incompatible type "TextIOWrapper"; expected "YamlParser"
return yaml.safe_load(file)
def _get_all_functions(self):
return {
name: getattr(self, name)
for name in dir(self)
if callable(getattr(self, name))
}
def _filter_functions(
self, functions: Dict[str, Callable], attribute: str
) -> Dict[str, Callable]:
return {
name: func
for name, func in functions.items()
if hasattr(func, attribute)
}
def map_all_agent_variables(self) -> None:
all_functions = self._get_all_functions()
llms = self._filter_functions(all_functions, "is_llm")
tool_functions = self._filter_functions(all_functions, "is_tool")
cache_handler_functions = self._filter_functions(
all_functions, "is_cache_handler"
)
callbacks = self._filter_functions(all_functions, "is_callback")
agents = self._filter_functions(all_functions, "is_agent")
for agent_name, agent_info in self.agents_config.items():
self._map_agent_variables(
agent_name,
agent_info,
agents,
llms,
tool_functions,
cache_handler_functions,
callbacks,
)
def _map_agent_variables(
self,
agent_name: str,
agent_info: Dict[str, Any],
agents: Dict[str, Callable],
llms: Dict[str, Callable],
tool_functions: Dict[str, Callable],
cache_handler_functions: Dict[str, Callable],
callbacks: Dict[str, Callable],
) -> None:
if llm := agent_info.get("llm"):
self.agents_config[agent_name]["llm"] = llms[llm]()
if tools := agent_info.get("tools"):
self.agents_config[agent_name]["tools"] = [
tool_functions[tool]() for tool in tools
]
if function_calling_llm := agent_info.get("function_calling_llm"):
self.agents_config[agent_name]["function_calling_llm"] = agents[
function_calling_llm
]()
if step_callback := agent_info.get("step_callback"):
self.agents_config[agent_name]["step_callback"] = callbacks[
step_callback
]()
if cache_handler := agent_info.get("cache_handler"):
self.agents_config[agent_name]["cache_handler"] = (
cache_handler_functions[cache_handler]()
)
def map_all_task_variables(self) -> None:
all_functions = self._get_all_functions()
agents = self._filter_functions(all_functions, "is_agent")
tasks = self._filter_functions(all_functions, "is_task")
output_json_functions = self._filter_functions(
all_functions, "is_output_json"
)
tool_functions = self._filter_functions(all_functions, "is_tool")
callback_functions = self._filter_functions(all_functions, "is_callback")
output_pydantic_functions = self._filter_functions(
all_functions, "is_output_pydantic"
)
for task_name, task_info in self.tasks_config.items():
self._map_task_variables(
task_name,
task_info,
agents,
tasks,
output_json_functions,
tool_functions,
callback_functions,
output_pydantic_functions,
)
def _map_task_variables(
self,
task_name: str,
task_info: Dict[str, Any],
agents: Dict[str, Callable],
tasks: Dict[str, Callable],
output_json_functions: Dict[str, Callable],
tool_functions: Dict[str, Callable],
callback_functions: Dict[str, Callable],
output_pydantic_functions: Dict[str, Callable],
) -> None:
if context_list := task_info.get("context"):
self.tasks_config[task_name]["context"] = [
tasks[context_task_name]() for context_task_name in context_list
]
if tools := task_info.get("tools"):
self.tasks_config[task_name]["tools"] = [
tool_functions[tool]() for tool in tools
]
if agent_name := task_info.get("agent"):
self.tasks_config[task_name]["agent"] = agents[agent_name]()
if output_json := task_info.get("output_json"):
self.tasks_config[task_name]["output_json"] = output_json_functions[
output_json
]
if output_pydantic := task_info.get("output_pydantic"):
self.tasks_config[task_name]["output_pydantic"] = (
output_pydantic_functions[output_pydantic]
)
if callbacks := task_info.get("callbacks"):
self.tasks_config[task_name]["callbacks"] = [
callback_functions[callback]() for callback in callbacks
]
return WrappedClass

View File

@@ -1,6 +1,5 @@
import json
import os
import re
import threading
import uuid
from concurrent.futures import Future
@@ -8,7 +7,6 @@ from copy import copy
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
@@ -17,10 +15,8 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.converter import Converter, convert_to_model
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
class Task(BaseModel):
@@ -127,7 +123,7 @@ class Task(BaseModel):
@field_validator("output_file")
@classmethod
def output_file_validattion(cls, value: str) -> str:
def output_file_validation(cls, value: str) -> str:
"""Validate the output file path by removing the / from the beginning of the path."""
if value.startswith("/"):
return value[1:]
@@ -325,18 +321,6 @@ class Task(BaseModel):
return copied_task
def _create_converter(self, *args, **kwargs) -> Converter:
"""Create a converter instance."""
if self.agent and not self.converter_cls:
converter = self.agent.get_output_converter(*args, **kwargs)
elif self.converter_cls:
converter = self.converter_cls(*args, **kwargs)
if not converter:
raise Exception("No output converter found or set.")
return converter
def _export_output(
self, result: str
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
@@ -344,75 +328,26 @@ class Task(BaseModel):
json_output: Optional[Dict[str, Any]] = None
if self.output_pydantic or self.output_json:
model_output = self._convert_to_model(result)
pydantic_output = (
model_output if isinstance(model_output, BaseModel) else None
model_output = convert_to_model(
result,
self.output_pydantic,
self.output_json,
self.agent,
self.converter_cls,
)
if isinstance(model_output, str):
if isinstance(model_output, BaseModel):
pydantic_output = model_output
elif isinstance(model_output, dict):
json_output = model_output
elif isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
else:
json_output = model_output if isinstance(model_output, dict) else None
return pydantic_output, json_output
def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]:
model = self.output_pydantic or self.output_json
if model is None:
return result
try:
return self._validate_model(result, model)
except Exception:
return self._handle_partial_json(result, model)
def _validate_model(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if self.output_json:
return exported_result.model_dump()
return exported_result
def _handle_partial_json(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
exported_result = model.model_validate_json(match.group(0))
if self.output_json:
return exported_result.model_dump()
return exported_result
except Exception:
pass
return self._convert_with_instructions(result, model)
def _convert_with_instructions(
self, result: str, model: Type[BaseModel]
) -> Union[dict, BaseModel, str]:
llm = self.agent.function_calling_llm or self.agent.llm # type: ignore # Item "None" of "BaseAgent | None" has no attribute "function_calling_llm"
instructions = self._get_conversion_instructions(model, llm)
converter = self._create_converter(
llm=llm, text=result, model=model, instructions=instructions
)
exported_result = (
converter.to_pydantic() if self.output_pydantic else converter.to_json()
)
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
return result
return exported_result
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
@@ -420,34 +355,18 @@ class Task(BaseModel):
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
return instructions
def _save_output(self, content: str) -> None:
if not self.output_file:
raise Exception("Output file path is not set.")
directory = os.path.dirname(self.output_file)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(self.output_file, "w", encoding="utf-8") as file:
file.write(content)
def _is_gpt(self, llm) -> bool:
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _save_file(self, result: Any) -> None:
directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(self.output_file, "w", encoding="utf-8") as file: # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
file.write(result)
with open(self.output_file, "w", encoding="utf-8") as file:
if isinstance(result, dict):
import json
json.dump(result, file, ensure_ascii=False, indent=2)
else:
file.write(str(result))
return None
def __repr__(self):

View File

@@ -86,7 +86,8 @@ class ToolUsage:
) -> str:
if isinstance(calling, ToolUsageErrorException):
error = calling.message
self._printer.print(content=f"\n\n{error}\n", color="red")
if self.agent.verbose:
self._printer.print(content=f"\n\n{error}\n", color="red")
self.task.increment_tools_errors()
return error
@@ -96,7 +97,8 @@ class ToolUsage:
except Exception as e:
error = getattr(e, "message", str(e))
self.task.increment_tools_errors()
self._printer.print(content=f"\n\n{error}\n", color="red")
if self.agent.verbose:
self._printer.print(content=f"\n\n{error}\n", color="red")
return error
return f"{self._use(tool_string=tool_string, tool=tool, calling=calling)}" # type: ignore # BUG?: "_use" of "ToolUsage" does not return a value (it only ever returns None)
@@ -112,7 +114,8 @@ class ToolUsage:
result = self._i18n.errors("task_repeated_usage").format(
tool_names=self.tools_names
)
self._printer.print(content=f"\n\n{result}\n", color="purple")
if self.agent.verbose:
self._printer.print(content=f"\n\n{result}\n", color="purple")
self._telemetry.tool_repeated_usage(
llm=self.function_calling_llm,
tool_name=tool.name,
@@ -168,7 +171,10 @@ class ToolUsage:
f'\n{error_message}.\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
).message
self.task.increment_tools_errors()
self._printer.print(content=f"\n\n{error_message}\n", color="red")
if self.agent.verbose:
self._printer.print(
content=f"\n\n{error_message}\n", color="red"
)
return error # type: ignore # No return value expected
self.task.increment_tools_errors()
@@ -192,7 +198,8 @@ class ToolUsage:
calling=calling, output=result, should_cache=should_cache
)
self._printer.print(content=f"\n\n{result}\n", color="purple")
if self.agent.verbose:
self._printer.print(content=f"\n\n{result}\n", color="purple")
if agentops:
agentops.record(tool_event)
self._telemetry.tool_usage(
@@ -346,7 +353,8 @@ class ToolUsage:
if self._run_attempts > self._max_parsing_attempts:
self._telemetry.tool_usage_error(llm=self.function_calling_llm)
self.task.increment_tools_errors()
self._printer.print(content=f"\n\n{e}\n", color="red")
if self.agent.verbose:
self._printer.print(content=f"\n\n{e}\n", color="red")
return ToolUsageErrorException( # type: ignore # Incompatible return value type (got "ToolUsageErrorException", expected "ToolCalling | InstructorToolCalling")
f'{self._i18n.errors("tool_usage_error").format(error=e)}\nMoving on then. {self._i18n.slice("format").format(tool_names=self.tools_names)}'
)

View File

@@ -1,9 +1,14 @@
import json
import re
from typing import Any, Optional, Type, Union
from langchain.schema import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, ValidationError
from crewai.agents.agent_builder.utilities.base_output_converter import OutputConverter
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
class ConverterError(Exception):
@@ -72,3 +77,153 @@ class Converter(OutputConverter):
def is_gpt(self) -> bool:
"""Return if llm provided is of gpt from openai."""
return isinstance(self.llm, ChatOpenAI) and self.llm.openai_api_base is None
def convert_to_model(
result: str,
output_pydantic: Optional[Type[BaseModel]],
output_json: Optional[Type[BaseModel]],
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
model = output_pydantic or output_json
if model is None:
return result
try:
escaped_result = json.dumps(json.loads(result, strict=False))
return validate_model(escaped_result, model, bool(output_json))
except json.JSONDecodeError as e:
Printer().print(
content=f"Error parsing JSON: {e}. Attempting to handle partial JSON.",
color="yellow",
)
return handle_partial_json(
result, model, bool(output_json), agent, converter_cls
)
except ValidationError as e:
Printer().print(
content=f"Pydantic validation error: {e}. Attempting to handle partial JSON.",
color="yellow",
)
return handle_partial_json(
result, model, bool(output_json), agent, converter_cls
)
except Exception as e:
Printer().print(
content=f"Unexpected error during model conversion: {type(e).__name__}: {e}. Returning original result.",
color="red",
)
return result
def validate_model(
result: str, model: Type[BaseModel], is_json_output: bool
) -> Union[dict, BaseModel]:
exported_result = model.model_validate_json(result)
if is_json_output:
return exported_result.model_dump()
return exported_result
def handle_partial_json(
result: str,
model: Type[BaseModel],
is_json_output: bool,
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
match = re.search(r"({.*})", result, re.DOTALL)
if match:
try:
exported_result = model.model_validate_json(match.group(0))
if is_json_output:
return exported_result.model_dump()
return exported_result
except json.JSONDecodeError as e:
Printer().print(
content=f"Error parsing JSON: {e}. The extracted JSON-like string is not valid JSON. Attempting alternative conversion method.",
color="yellow",
)
except ValidationError as e:
Printer().print(
content=f"Pydantic validation error: {e}. The JSON structure doesn't match the expected model. Attempting alternative conversion method.",
color="yellow",
)
except Exception as e:
Printer().print(
content=f"Unexpected error during partial JSON handling: {type(e).__name__}: {e}. Attempting alternative conversion method.",
color="red",
)
return convert_with_instructions(
result, model, is_json_output, agent, converter_cls
)
def convert_with_instructions(
result: str,
model: Type[BaseModel],
is_json_output: bool,
agent: Any,
converter_cls: Optional[Type[Converter]] = None,
) -> Union[dict, BaseModel, str]:
llm = agent.function_calling_llm or agent.llm
instructions = get_conversion_instructions(model, llm)
converter = create_converter(
agent=agent,
converter_cls=converter_cls,
llm=llm,
text=result,
model=model,
instructions=instructions,
)
exported_result = (
converter.to_pydantic() if not is_json_output else converter.to_json()
)
if isinstance(exported_result, ConverterError):
Printer().print(
content=f"{exported_result.message} Using raw output instead.",
color="red",
)
return result
return exported_result
def get_conversion_instructions(model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not is_gpt(llm):
model_schema = PydanticSchemaParser(model=model).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
return instructions
def is_gpt(llm: Any) -> bool:
from langchain_openai import ChatOpenAI
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def create_converter(
agent: Optional[Any] = None,
converter_cls: Optional[Type[Converter]] = None,
*args,
**kwargs,
) -> Converter:
if agent and not converter_cls:
if hasattr(agent, "get_output_converter"):
converter = agent.get_output_converter(*args, **kwargs)
else:
raise AttributeError("Agent does not have a 'get_output_converter' method")
elif converter_cls:
converter = converter_cls(*args, **kwargs)
else:
raise ValueError("Either agent or converter_cls must be provided")
if not converter:
raise Exception("No output converter found or set.")
return converter

View File

@@ -1,5 +1,5 @@
import json
from typing import Any, List, Type, Union
from typing import Any, List, Type
import regex
from langchain.output_parsers import PydanticOutputParser
@@ -7,29 +7,24 @@ from langchain_core.exceptions import OutputParserException
from langchain_core.outputs import Generation
from langchain_core.pydantic_v1 import ValidationError
from pydantic import BaseModel
from pydantic.v1 import BaseModel as V1BaseModel
class CrewPydanticOutputParser(PydanticOutputParser):
"""Parses the text into pydantic models"""
pydantic_object: Union[Type[BaseModel], Type[V1BaseModel]]
pydantic_object: Type[BaseModel]
def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
def parse_result(self, result: List[Generation]) -> Any:
result[0].text = self._transform_in_valid_json(result[0].text)
# Treating edge case of function calling llm returning the name instead of tool_name
json_object = json.loads(result[0].text)
json_object["tool_name"] = (
json_object["name"]
if "tool_name" not in json_object
else json_object["tool_name"]
)
if "tool_name" not in json_object:
json_object["tool_name"] = json_object.get("name", "")
result[0].text = json.dumps(json_object)
json_object = super().parse_result(result)
try:
return self.pydantic_object.parse_obj(json_object)
return self.pydantic_object.model_validate(json_object)
except ValidationError as e:
name = self.pydantic_object.__name__
msg = f"Failed to parse {name} from completion {json_object}. Got: {e}"

View File

@@ -0,0 +1,149 @@
from collections import defaultdict
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from rich.console import Console
from rich.table import Table
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
class TaskEvaluationPydanticOutput(BaseModel):
quality: float = Field(
description="A score from 1 to 10 evaluating on completion, quality, and overall performance from the task_description and task_expected_output to the actual Task Output."
)
class CrewEvaluator:
"""
A class to evaluate the performance of the agents in the crew based on the tasks they have performed.
Attributes:
crew (Crew): The crew of agents to evaluate.
openai_model_name (str): The model to use for evaluating the performance of the agents (for now ONLY OpenAI accepted).
tasks_scores (defaultdict): A dictionary to store the scores of the agents for each task.
iteration (int): The current iteration of the evaluation.
"""
tasks_scores: defaultdict = defaultdict(list)
iteration: int = 0
def __init__(self, crew, openai_model_name: str):
self.crew = crew
self.openai_model_name = openai_model_name
self._setup_for_evaluating()
def _setup_for_evaluating(self) -> None:
"""Sets up the crew for evaluating."""
for task in self.crew.tasks:
task.callback = self.evaluate
def set_iteration(self, iteration: int) -> None:
self.iteration = iteration
def _evaluator_agent(self):
return Agent(
role="Task Execution Evaluator",
goal=(
"Your goal is to evaluate the performance of the agents in the crew based on the tasks they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
),
backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
verbose=False,
llm=ChatOpenAI(model=self.openai_model_name),
)
def _evaluation_task(
self, evaluator_agent: Agent, task_to_evaluate: Task, task_output: str
) -> Task:
return Task(
description=(
"Based on the task description and the expected output, compare and evaluate the performance of the agents in the crew based on the Task Output they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
f"task_description: {task_to_evaluate.description} "
f"task_expected_output: {task_to_evaluate.expected_output} "
f"agent: {task_to_evaluate.agent.role if task_to_evaluate.agent else None} "
f"agent_goal: {task_to_evaluate.agent.goal if task_to_evaluate.agent else None} "
f"Task Output: {task_output}"
),
expected_output="Evaluation Score from 1 to 10 based on the performance of the agents on the tasks",
agent=evaluator_agent,
output_pydantic=TaskEvaluationPydanticOutput,
)
def print_crew_evaluation_result(self) -> None:
"""
Prints the evaluation result of the crew in a table.
A Crew with 2 tasks using the command crewai test -n 2
will output the following table:
Task Scores
(1-10 Higher is better)
┏━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┓
┃ Tasks/Crew ┃ Run 1 ┃ Run 2 ┃ Avg. Total ┃
┡━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━┩
│ Task 1 │ 10.0 │ 9.0 │ 9.5 │
│ Task 2 │ 9.0 │ 9.0 │ 9.0 │
│ Crew │ 9.5 │ 9.0 │ 9.2 │
└────────────┴───────┴───────┴────────────┘
"""
task_averages = [
sum(scores) / len(scores) for scores in zip(*self.tasks_scores.values())
]
crew_average = sum(task_averages) / len(task_averages)
# Create a table
table = Table(title="Tasks Scores \n (1-10 Higher is better)")
# Add columns for the table
table.add_column("Tasks/Crew")
for run in range(1, len(self.tasks_scores) + 1):
table.add_column(f"Run {run}")
table.add_column("Avg. Total")
# Add rows for each task
for task_index in range(len(task_averages)):
task_scores = [
self.tasks_scores[run][task_index]
for run in range(1, len(self.tasks_scores) + 1)
]
avg_score = task_averages[task_index]
table.add_row(
f"Task {task_index + 1}", *map(str, task_scores), f"{avg_score:.1f}"
)
# Add a row for the crew average
crew_scores = [
sum(self.tasks_scores[run]) / len(self.tasks_scores[run])
for run in range(1, len(self.tasks_scores) + 1)
]
table.add_row("Crew", *map(str, crew_scores), f"{crew_average:.1f}")
# Display the table in the terminal
console = Console()
console.print(table)
def evaluate(self, task_output: TaskOutput):
"""Evaluates the performance of the agents in the crew based on the tasks they have performed."""
current_task = None
for task in self.crew.tasks:
if task.description == task_output.description:
current_task = task
break
if not current_task or not task_output:
raise ValueError(
"Task to evaluate and task output are required for evaluation"
)
evaluator_agent = self._evaluator_agent()
evaluation_task = self._evaluation_task(
evaluator_agent, current_task, task_output.raw
)
evaluation_result = evaluation_task.execute_sync()
if isinstance(evaluation_result.pydantic, TaskEvaluationPydanticOutput):
self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
else:
raise ValueError("Evaluation result is not in the expected format")

View File

@@ -54,23 +54,23 @@ class TaskEvaluator:
def __init__(self, original_agent):
self.llm = original_agent.llm
def evaluate(self, task, ouput) -> TaskEvaluation:
def evaluate(self, task, output) -> TaskEvaluation:
evaluation_query = (
f"Assess the quality of the task completed based on the description, expected output, and actual results.\n\n"
f"Task Description:\n{task.description}\n\n"
f"Expected Output:\n{task.expected_output}\n\n"
f"Actual Output:\n{ouput}\n\n"
f"Actual Output:\n{output}\n\n"
"Please provide:\n"
"- Bullet points suggestions to improve future similar tasks\n"
"- A score from 0 to 10 evaluating on completion, quality, and overall performance"
"- Entities extracted from the task output, if any, their type, description, and relationships"
)
instructions = "I'm gonna convert this raw text into valid JSON."
instructions = "Convert all responses into valid JSON output."
if not self._is_gpt(self.llm):
model_schema = PydanticSchemaParser(model=TaskEvaluation).get_schema()
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
instructions = f"{instructions}\n\nReturn only valid JSON with the following schema:\n```json\n{model_schema}\n```"
converter = Converter(
llm=self.llm,

View File

@@ -1,17 +1,28 @@
import re
class YamlParser:
@staticmethod
def parse(file):
"""
Parses a YAML file, modifies specific patterns, and checks for unsupported 'context' usage.
Args:
file (file object): The YAML file to parse.
Returns:
str: The modified content of the YAML file.
Raises:
ValueError: If 'context:' is used incorrectly.
"""
content = file.read()
# Replace single { and } with doubled ones, while leaving already doubled ones intact and the other special characters {# and {%
modified_content = re.sub(r"(?<!\{){(?!\{)(?!\#)(?!\%)", "{{", content)
modified_content = re.sub(
r"(?<!\})(?<!\%)(?<!\#)\}(?!})", "}}", modified_content
)
modified_content = re.sub(r"(?<!\})(?<!\%)(?<!\#)\}(?!})", "}}", modified_content)
# Check for 'context:' not followed by '[' and raise an error
if re.search(r"context:(?!\s*\[)", modified_content):
raise ValueError(
"Context is currently only supported in code when creating a task. Please use the 'context' key in the task configuration."
"Context is currently only supported in code when creating a task. "
"Please use the 'context' key in the task configuration."
)
return modified_content

View File

@@ -1,5 +1,6 @@
from typing import List, Optional
from typing import Any, List, Optional
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from crewai.agent import Agent
@@ -11,17 +12,27 @@ class PlannerTaskPydanticOutput(BaseModel):
class CrewPlanner:
def __init__(self, tasks: List[Task]):
def __init__(self, tasks: List[Task], planning_agent_llm: Optional[Any] = None):
self.tasks = tasks
def _handle_crew_planning(self) -> Optional[BaseModel]:
if planning_agent_llm is None:
self.planning_agent_llm = ChatOpenAI(model="gpt-4o-mini")
else:
self.planning_agent_llm = planning_agent_llm
def _handle_crew_planning(self) -> PlannerTaskPydanticOutput:
"""Handles the Crew planning by creating detailed step-by-step plans for each task."""
planning_agent = self._create_planning_agent()
tasks_summary = self._create_tasks_summary()
planner_task = self._create_planner_task(planning_agent, tasks_summary)
return planner_task.execute_sync().pydantic
result = planner_task.execute_sync()
if isinstance(result.pydantic, PlannerTaskPydanticOutput):
return result.pydantic
raise ValueError("Failed to get the Planning output")
def _create_planning_agent(self) -> Agent:
"""Creates the planning agent for the crew planning."""
@@ -32,6 +43,7 @@ class CrewPlanner:
"available to each agent so that they can perform the tasks in an exemplary manner"
),
backstory="Planner agent for crew planning",
llm=self.planning_agent_llm,
)
def _create_planner_task(self, planning_agent: Agent, tasks_summary: str) -> Task:

View File

@@ -16,11 +16,13 @@ class PydanticSchemaParser(BaseModel):
return self._get_model_schema(self.model)
def _get_model_schema(self, model, depth=0) -> str:
lines = []
indent = " " * depth
lines = [f"{indent}{{"]
for field_name, field in model.model_fields.items():
field_type_str = self._get_field_type(field, depth + 1)
lines.append(f"{' ' * 4 * depth}- {field_name}: {field_type_str}")
lines.append(f"{indent} {field_name}: {field_type_str},")
lines[-1] = lines[-1].rstrip(",") # Remove trailing comma from last item
lines.append(f"{indent}}}")
return "\n".join(lines)
def _get_field_type(self, field, depth) -> str:
@@ -35,6 +37,6 @@ class PydanticSchemaParser(BaseModel):
else:
return f"List[{list_item_type.__name__}]"
elif issubclass(field_type, BaseModel):
return f"\n{self._get_model_schema(field_type, depth)}"
return self._get_model_schema(field_type, depth)
else:
return field_type.__name__