Compare commits

..

1 Commits

Author SHA1 Message Date
theCyberTech
d0fc50eab5 Added new CLI functionality: docs generator. Updated cli.py and added doc_generator.py 2024-07-24 22:56:57 +08:00
19 changed files with 302 additions and 26242 deletions

View File

@@ -33,7 +33,6 @@ A crew in crewAI represents a collaborative group of agents working together to
| **Manager Callbacks** _(optional)_ | `manager_callbacks` | `manager_callbacks` takes a list of callback handlers to be executed by the manager agent when a hierarchical process is used. |
| **Prompt File** _(optional)_ | `prompt_file` | Path to the prompt JSON file to be used for the crew. |
| **Planning** *(optional)* | `planning` | Adds planning ability to the Crew. When activated, before each Crew iteration all Crew data is sent to an AgentPlanner that plans the tasks, and this plan is added to each task description. |
| **Planning LLM** *(optional)* | `planning_llm` | The language model used by the AgentPlanner in a planning process. |
!!! note "Crew Max RPM"
The `max_rpm` attribute sets the maximum number of requests per minute the crew can perform to avoid rate limits and will override individual agents' `max_rpm` settings if you set it.

View File

@@ -23,25 +23,6 @@ my_crew = Crew(
From this point on, your crew will have planning enabled, and the tasks will be planned before each iteration.
#### Planning LLM
Now you can define the LLM that will be used to plan the tasks. You can use any ChatOpenAI LLM model available.
```python
from crewai import Crew, Agent, Task, Process
from langchain_openai import ChatOpenAI
# Assemble your crew with planning capabilities and custom LLM
my_crew = Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
planning=True,
planning_llm=ChatOpenAI(model="gpt-4o")
)
```
### Example
When running the base case example, you will see something like the following output, which represents the output of the AgentPlanner responsible for creating the step-by-step logic to add to the Agents' tasks.

View File

@@ -1,41 +0,0 @@
---
title: crewAI Testing
description: Learn how to test your crewAI Crew and evaluate their performance.
---
## Introduction
Testing is a crucial part of the development process, and it is essential to ensure that your crew is performing as expected. With crewAI, you can easily test your crew and evaluate its performance using the built-in testing capabilities.
### Using the Testing Feature
We added the CLI command `crewai test` to make it easy to test your crew. This command will run your crew for a specified number of iterations and provide detailed performance metrics.
The parameters are `n_iterations` and `model`, which are optional and default to 2 and `gpt-4o-mini`, respectively. For now, the only provider available is OpenAI.
```bash
crewai test
```
If you want to run more iterations or use a different model, you can specify the parameters like this:
```bash
crewai test --n_iterations 5 --model gpt-4o
```
What happens when you run the `crewai test` command is that the crew will be executed for the specified number of iterations, and the performance metrics will be displayed at the end of the run.
A table of scores at the end will show the performance of the crew in terms of the following metrics:
```
Task Scores
(1-10 Higher is better)
┏━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┓
┃ Tasks/Crew ┃ Run 1 ┃ Run 2 ┃ Avg. Total ┃
┡━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━┩
│ Task 1 │ 10.0 │ 9.0 │ 9.5 │
│ Task 2 │ 9.0 │ 9.0 │ 9.0 │
│ Crew │ 9.5 │ 9.0 │ 9.2 │
└────────────┴───────┴───────┴────────────┘
```
The example above shows the test results for two runs of the crew with two tasks, with the average total score for each task and the crew as a whole.

View File

@@ -129,7 +129,6 @@ nav:
- Training: 'core-concepts/Training-Crew.md'
- Memory: 'core-concepts/Memory.md'
- Planning: 'core-concepts/Planning.md'
- Testing: 'core-concepts/Testing.md'
- Using LangChain Tools: 'core-concepts/Using-LangChain-Tools.md'
- Using LlamaIndex Tools: 'core-concepts/Using-LlamaIndex-Tools.md'
- How to Guides:

View File

@@ -10,6 +10,7 @@ from .replay_from_task import replay_task_command
from .reset_memories_command import reset_memories_command
from .test_crew import test_crew
from .train_crew import train_crew
from .doc_generator import generate_documentation
@click.group()
@@ -146,6 +147,18 @@ def test(n_iterations: int, model: str):
click.echo(f"Testing the crew for {n_iterations} iterations with model {model}")
test_crew(n_iterations, model)
@crewai.command()
@click.option('--output', '-o', default='crew_documentation.md', help='Output file for the documentation')
@click.option('--format', '-f', default='markdown', help='Output format')
def generate_docs(output, format):
    """Generate documentation for the current project setup."""
    # NOTE: the docstring above is what click shows as the command help text,
    # so it is deliberately kept to a single line.
    #
    # output: path of the file the documentation is written to.
    # format: name of the output format, forwarded to generate_documentation.
    click.echo(f"Generating documentation in {format} format...")
    try:
        # Only the generator call can raise the ValueError handled below.
        generate_documentation(output, format)
    except ValueError as e:
        click.echo(f"Error: {str(e)}", err=True)
        click.echo("Please ensure you are in the root directory of your CrewAI project.")
        # Report the failure to the calling shell instead of exiting with 0.
        raise SystemExit(1) from e
    click.echo(f"Documentation generated and saved to {output}")
if __name__ == "__main__":
crewai()

View File

@@ -0,0 +1,204 @@
import os
import yaml
import logging
def is_project_root():
    """
    Tell whether the current working directory looks like a CrewAI project root.

    The check is purely structural: every marker path listed below must exist
    relative to the current working directory.

    Returns:
        bool: True when all markers are present, False as soon as one is missing.
    """
    for marker in ("pyproject.toml", "poetry.lock", "src"):
        if not os.path.exists(marker):
            return False
    return True
def generate_documentation(output_file, format):
    """
    Generate documentation for the current CrewAI project setup.

    Args:
        output_file (str): The path and filename where the generated documentation
            will be saved.
        format (str): The desired output format for the documentation.
            Supported values currently 'markdown'.

    Returns:
        None: The function writes the generated documentation to the specified
            output file and doesn't return any value.

    Raises:
        ValueError: If an unsupported output format is specified, if the current
            directory is not the project root, or if the crew configuration
            could not be loaded.
    """
    # Validate the argument first so a bad format fails before any filesystem work.
    if format != "markdown":
        raise ValueError(f"Unsupported output format: {format}")

    if not is_project_root():
        raise ValueError(
            "Not in the root of a CrewAI project."
        )

    # Load the current project configuration
    config = load_crew_configuration()
    if config is None:
        logging.error("Failed to load crew configuration. Exiting.")
        # Raise rather than return: a silent return lets the caller report
        # success even though no file was written.
        raise ValueError("Failed to load crew configuration.")

    content = generate_markdown(config)

    # Explicit encoding keeps the output independent of the platform default.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(content)
    logging.info("Documentation generated and saved to %s", output_file)
def find_config_dir():
    """
    Find the configuration directory based on the project structure.

    The config directory is expected at 'src/<package>/config' relative to the
    current working directory. Candidate package names are tried in this order:

    1. The name of the current directory, unchanged.
    2. That name normalised to a package-style name (hyphens and spaces
       replaced by underscores, lower-cased), so a project checked out as
       'my-crew' still resolves to 'src/my_crew/config'.
    3. Every other entry found directly under 'src', in sorted order.

    Returns:
        str or None: The path to the first candidate config directory that
            exists, or None when none of the candidates exists.

    Logging:
        - Logs the starting directory, each checked path, and the result of
          the search at debug level.
    """
    current_dir = os.getcwd()
    logging.debug("Starting search from: %s", current_dir)

    src_dir = os.path.join(current_dir, "src")
    project_name = os.path.basename(current_dir)

    # The exact directory name always goes first so the conventional layout
    # resolves to the same path as a plain src/<dirname>/config lookup.
    candidates = [project_name]
    normalised = project_name.replace("-", "_").replace(" ", "_").lower()
    if normalised not in candidates:
        candidates.append(normalised)

    # Fallback: any other package under src/ that ships a config directory.
    try:
        src_entries = sorted(os.listdir(src_dir))
    except OSError:
        src_entries = []
    candidates.extend(entry for entry in src_entries if entry not in candidates)

    for package_name in candidates:
        expected_config_path = os.path.join(src_dir, package_name, "config")
        logging.debug("Checking for config directory: %s", expected_config_path)
        if os.path.isdir(expected_config_path):
            logging.debug("Found config directory: %s", expected_config_path)
            return expected_config_path

    logging.debug("Config directory not found in the expected location")
    return None
def load_crew_configuration():
    """
    Load the crew configuration from YAML files.

    The configuration directory is located with find_config_dir(); the files
    'agents.yaml' and 'tasks.yaml' inside it are then parsed with
    yaml.safe_load.

    Returns:
        dict or None: A dictionary with the keys 'agents' and 'tasks', each
            holding the parsed mapping of the corresponding file (an empty
            dict for an empty file). None if the configuration directory or
            either file is missing, if a file cannot be read or parsed, or if
            a file's top-level value is not a mapping.

    Logging:
        - Logs an error if the configuration directory is not found.
        - Logs an error if either agents.yaml or tasks.yaml is not found.
        - Logs an error if a file cannot be read, parsed, or is not a mapping.
    """
    config_dir = find_config_dir()
    if not config_dir:
        logging.error(
            "Configuration directory not found. Make sure you're in the root of your CrewAI project."
        )
        return None

    paths = {
        section: os.path.join(config_dir, f"{section}.yaml")
        for section in ("agents", "tasks")
    }
    if not all(os.path.exists(path) for path in paths.values()):
        logging.error(f"agents.yaml or tasks.yaml not found in {config_dir}")
        return None

    config = {}
    for section, path in paths.items():
        try:
            with open(path, "r", encoding="utf-8") as f:
                loaded = yaml.safe_load(f)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
            # Honour the "None on error" contract instead of propagating.
            logging.error("Failed to read %s: %s", path, exc)
            return None

        # safe_load returns None for an empty document; treat it as "no entries"
        # so consumers can iterate the section without a None check.
        if loaded is None:
            loaded = {}
        if not isinstance(loaded, dict):
            logging.error("%s must contain a mapping at the top level", path)
            return None
        config[section] = loaded

    return config
def generate_markdown(config):
    """
    Generate Markdown documentation for the CrewAI project configuration.

    Args:
        config (dict): A dictionary containing the parsed configuration
            with 'agents' and 'tasks' keys, each mapping a name to a
            dictionary of attributes. A missing or empty section is rendered
            as an empty section.

    Returns:
        str: A formatted Markdown string containing the project documentation.
            If the input config is None, it returns an error message.

    The generated Markdown includes:
        1. A title for the project documentation.
        2. An "Agents" section with one heading per agent, followed by its
           role, goal, and backstory.
        3. A "Tasks" section with one heading per task, followed by its
           description, expected output, and assigned agent.

    Each attribute value is wrapped in a fenced code block. Missing attributes
    are rendered as 'Not specified' ('Not assigned' for a task's agent).
    """
    if config is None:
        return "# Error: No crew configuration available"

    # A section may be absent or None (e.g. an empty YAML file).
    agents = config.get("agents") or {}
    tasks = config.get("tasks") or {}

    parts = ["# CrewAI Project Documentation\n\n", "## Agents\n\n"]
    for agent_name, agent_data in agents.items():
        agent_data = agent_data or {}  # entry declared with no attributes
        # Name goes in the heading itself, mirroring the task headings below.
        parts.append(f"### {agent_name}\n")
        parts.append(_fenced_field("Role", agent_data.get("role", "Not specified")))
        parts.append(_fenced_field("Goal", agent_data.get("goal", "Not specified")))
        parts.append(
            _fenced_field("Backstory", agent_data.get("backstory", "Not specified"))
        )

    parts.append("## Tasks\n\n")
    for task_name, task_data in tasks.items():
        task_data = task_data or {}
        parts.append(f"### {task_name}\n")
        parts.append(
            _fenced_field("Description", task_data.get("description", "Not specified"))
        )
        parts.append(
            _fenced_field(
                "Expected Output", task_data.get("expected_output", "Not specified")
            )
        )
        parts.append(
            _fenced_field("Assigned Agent", task_data.get("agent", "Not assigned"))
        )

    return "".join(parts)


def _fenced_field(label, value):
    """Render one labelled value as a fenced Markdown code block."""
    # Folded/literal YAML scalars end with a newline; strip it so the fence
    # does not contain a stray blank line.
    return f"{label}: \n```\n{str(value).strip()}\n```\n"

View File

@@ -48,7 +48,7 @@ def test():
"topic": "AI LLMs"
}
try:
{{crew_name}}Crew().crew().test(n_iterations=int(sys.argv[1]), openai_model_name=sys.argv[2], inputs=inputs)
{{crew_name}}Crew().crew().test(n_iterations=int(sys.argv[1]), model=sys.argv[2], inputs=inputs)
except Exception as e:
raise Exception(f"An error occurred while replaying the crew: {e}")

View File

@@ -37,7 +37,6 @@ from crewai.utilities.constants import (
TRAINED_AGENTS_DATA_FILE,
TRAINING_DATA_FILE,
)
from crewai.utilities.evaluators.crew_evaluator_handler import CrewEvaluator
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
@@ -155,10 +154,6 @@ class Crew(BaseModel):
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
planning_llm: Optional[Any] = Field(
default=None,
description="Language model that will run the AgentPlanner if planning is True.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
@@ -271,6 +266,20 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def check_tasks_in_hierarchical_process_not_async(self):
"""Validates that the tasks in hierarchical process are not flagged with async_execution."""
if self.process == Process.hierarchical:
for task in self.tasks:
if task.async_execution:
raise PydanticCustomError(
"async_execution_in_hierarchical_process",
"Hierarchical process error: Tasks cannot be flagged with async_execution.",
{},
)
return self
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
"""Validates that the crew ends with at most one asynchronous task."""
@@ -550,12 +559,15 @@ class Crew(BaseModel):
def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
result = CrewPlanner(
tasks=self.tasks, planning_agent_llm=self.planning_llm
)._handle_crew_planning()
result = CrewPlanner(self.tasks)._handle_crew_planning()
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
if result is not None and hasattr(result, "list_of_plans_per_task"):
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
else:
self._logger.log(
"info", "Something went wrong with the planning process of the Crew"
)
def _store_execution_log(
self,
@@ -593,7 +605,7 @@ class Crew(BaseModel):
def _run_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
self._create_manager_agent()
return self._execute_tasks(self.tasks)
return self._execute_tasks(self.tasks, self.manager_agent)
def _create_manager_agent(self):
i18n = I18N(prompt_file=self.prompt_file)
@@ -617,6 +629,7 @@ class Crew(BaseModel):
def _execute_tasks(
self,
tasks: List[Task],
manager: Optional[BaseAgent] = None,
start_index: Optional[int] = 0,
was_replayed: bool = False,
) -> CrewOutput:
@@ -644,13 +657,13 @@ class Crew(BaseModel):
last_sync_output = task.output
continue
agent_to_use = self._get_agent_to_use(task)
agent_to_use = self._get_agent_to_use(task, manager)
if agent_to_use is None:
raise ValueError(
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
self._prepare_agent_tools(task)
self._prepare_agent_tools(task, manager)
self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
@@ -716,18 +729,20 @@ class Crew(BaseModel):
return skipped_task_output
return None
def _prepare_agent_tools(self, task: Task):
def _prepare_agent_tools(self, task: Task, manager: Optional[BaseAgent]):
if self.process == Process.hierarchical:
if self.manager_agent:
self._update_manager_tools(task)
if manager:
self._update_manager_tools(task, manager)
else:
raise ValueError("Manager agent is required for hierarchical process.")
elif task.agent and task.agent.allow_delegation:
self._add_delegation_tools(task)
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
def _get_agent_to_use(
self, task: Task, manager: Optional[BaseAgent]
) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
return self.manager_agent
return manager
return task.agent
def _add_delegation_tools(self, task: Task):
@@ -763,14 +778,11 @@ class Crew(BaseModel):
if self.output_log_file:
self._file_handler.log(agent=role, task=task.description, status="started")
def _update_manager_tools(self, task: Task):
if self.manager_agent:
if task.agent:
self.manager_agent.tools = task.agent.get_delegation_tools([task.agent])
else:
self.manager_agent.tools = self.manager_agent.get_delegation_tools(
self.agents
)
def _update_manager_tools(self, task: Task, manager: BaseAgent):
if task.agent:
manager.tools = task.agent.get_delegation_tools([task.agent])
else:
manager.tools = manager.get_delegation_tools(self.agents)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
@@ -869,7 +881,7 @@ class Crew(BaseModel):
self.tasks[i].output = task_output
self._logging_color = "bold_blue"
result = self._execute_tasks(self.tasks, start_index, True)
result = self._execute_tasks(self.tasks, self.manager_agent, start_index, True)
return result
def copy(self):
@@ -955,19 +967,10 @@ class Crew(BaseModel):
return total_usage_metrics
def test(
self,
n_iterations: int,
openai_model_name: str,
inputs: Optional[Dict[str, Any]] = None,
self, n_iterations: int, model: str, inputs: Optional[Dict[str, Any]] = None
) -> None:
"""Test and evaluate the Crew with the given inputs for n iterations."""
evaluator = CrewEvaluator(self, openai_model_name)
for i in range(1, n_iterations + 1):
evaluator.set_iteration(i)
self.kickoff(inputs=inputs)
evaluator.print_crew_evaluation_result()
"""Test the crew with the given inputs."""
pass
def __repr__(self):
return f"Crew(id={self.id}, process={self.process}, number_of_agents={len(self.agents)}, number_of_tasks={len(self.tasks)})"

View File

@@ -1,5 +1,5 @@
import json
from typing import Any, List, Type
from typing import Any, List, Type, Union
import regex
from langchain.output_parsers import PydanticOutputParser
@@ -7,24 +7,29 @@ from langchain_core.exceptions import OutputParserException
from langchain_core.outputs import Generation
from langchain_core.pydantic_v1 import ValidationError
from pydantic import BaseModel
from pydantic.v1 import BaseModel as V1BaseModel
class CrewPydanticOutputParser(PydanticOutputParser):
"""Parses the text into pydantic models"""
pydantic_object: Type[BaseModel]
pydantic_object: Union[Type[BaseModel], Type[V1BaseModel]]
def parse_result(self, result: List[Generation]) -> Any:
def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
result[0].text = self._transform_in_valid_json(result[0].text)
# Treating edge case of function calling llm returning the name instead of tool_name
json_object = json.loads(result[0].text)
if "tool_name" not in json_object:
json_object["tool_name"] = json_object.get("name", "")
json_object["tool_name"] = (
json_object["name"]
if "tool_name" not in json_object
else json_object["tool_name"]
)
result[0].text = json.dumps(json_object)
json_object = super().parse_result(result)
try:
return self.pydantic_object.model_validate(json_object)
return self.pydantic_object.parse_obj(json_object)
except ValidationError as e:
name = self.pydantic_object.__name__
msg = f"Failed to parse {name} from completion {json_object}. Got: {e}"

View File

@@ -1,149 +0,0 @@
from collections import defaultdict
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field
from rich.console import Console
from rich.table import Table
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
class TaskEvaluationPydanticOutput(BaseModel):
    # Pydantic model with a single float field, `quality`.
    # Documented with comments rather than a class docstring so the model's
    # generated schema is left exactly as it was.
    quality: float = Field(
        description=(
            "A score from 1 to 10 evaluating on completion, quality, and overall "
            "performance from the task_description and task_expected_output to the "
            "actual Task Output."
        ),
    )
class CrewEvaluator:
    """
    A class to evaluate the performance of the agents in the crew based on the tasks they have performed.

    On construction, every task of the crew gets `evaluate` installed as its
    callback; each call appends one score to the list for the current iteration.

    Attributes:
        crew (Crew): The crew of agents to evaluate.
        openai_model_name (str): The model to use for evaluating the performance of the agents (for now ONLY OpenAI accepted).
        tasks_scores (defaultdict): Maps an iteration number to the list of scores recorded during that iteration.
        iteration (int): The current iteration of the evaluation.
    """

    # Annotation only: the mapping itself is created per instance in __init__.
    # A class-level defaultdict would be shared by every evaluator, so scores
    # from one evaluation would leak into the next.
    tasks_scores: defaultdict
    iteration: int = 0

    def __init__(self, crew, openai_model_name: str):
        self.crew = crew
        self.openai_model_name = openai_model_name
        self.tasks_scores = defaultdict(list)
        self._setup_for_evaluating()

    def _setup_for_evaluating(self) -> None:
        """Sets up the crew for evaluating."""
        for task in self.crew.tasks:
            task.callback = self.evaluate

    def set_iteration(self, iteration: int) -> None:
        """Select the iteration under which subsequent scores are recorded."""
        self.iteration = iteration

    def _evaluator_agent(self):
        """Build the agent that scores task outputs, backed by ChatOpenAI with the configured model."""
        return Agent(
            role="Task Execution Evaluator",
            goal=(
                "Your goal is to evaluate the performance of the agents in the crew based on the tasks they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
            ),
            backstory="Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed",
            verbose=False,
            llm=ChatOpenAI(model=self.openai_model_name),
        )

    def _evaluation_task(
        self, evaluator_agent: Agent, task_to_evaluate: Task, task_output: str
    ) -> Task:
        """Build the task asking `evaluator_agent` to score `task_output` against `task_to_evaluate`."""
        return Task(
            description=(
                "Based on the task description and the expected output, compare and evaluate the performance of the agents in the crew based on the Task Output they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
                f"task_description: {task_to_evaluate.description} "
                f"task_expected_output: {task_to_evaluate.expected_output} "
                f"agent: {task_to_evaluate.agent.role if task_to_evaluate.agent else None} "
                f"agent_goal: {task_to_evaluate.agent.goal if task_to_evaluate.agent else None} "
                f"Task Output: {task_output}"
            ),
            expected_output="Evaluation Score from 1 to 10 based on the performance of the agents on the tasks",
            agent=evaluator_agent,
            output_pydantic=TaskEvaluationPydanticOutput,
        )

    def print_crew_evaluation_result(self) -> None:
        """
        Prints the evaluation result of the crew in a table.
        A Crew with 2 tasks using the command crewai test -n 2
        will output the following table:

                        Task Scores
                    (1-10 Higher is better)
            ┏━━━━━━━━━━━━┳━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━┓
            ┃ Tasks/Crew ┃ Run 1 ┃ Run 2 ┃ Avg. Total ┃
            ┡━━━━━━━━━━━━╇━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━┩
            │ Task 1     │ 10.0  │ 9.0   │    9.5     │
            │ Task 2     │ 9.0   │ 9.0   │    9.0     │
            │ Crew       │ 9.5   │ 9.0   │    9.2     │
            └────────────┴───────┴───────┴────────────┘

        When no score has been recorded, only the "Crew" row is printed and
        its average is shown as "N/A".
        """
        # Use the iterations that actually hold scores rather than assuming the
        # keys are exactly 1..N; indexing a missing key would insert an empty
        # list into the defaultdict and then fail.
        runs = sorted(run for run, scores in self.tasks_scores.items() if scores)
        scores_by_run = [self.tasks_scores[run] for run in runs]

        # zip(*) regroups the per-run lists into one tuple per task position.
        task_averages = [
            sum(scores) / len(scores) for scores in zip(*scores_by_run)
        ]
        if task_averages:
            crew_average_text = f"{sum(task_averages) / len(task_averages):.1f}"
        else:
            crew_average_text = "N/A"

        # Create a table
        table = Table(title="Tasks Scores \n (1-10 Higher is better)")

        # Add columns for the table
        table.add_column("Tasks/Crew")
        for run in runs:
            table.add_column(f"Run {run}")
        table.add_column("Avg. Total")

        # Add rows for each task
        for task_index, avg_score in enumerate(task_averages):
            task_scores = [scores[task_index] for scores in scores_by_run]
            table.add_row(
                f"Task {task_index + 1}", *map(str, task_scores), f"{avg_score:.1f}"
            )

        # Add a row for the crew average
        crew_scores = [sum(scores) / len(scores) for scores in scores_by_run]
        table.add_row("Crew", *map(str, crew_scores), crew_average_text)

        # Display the table in the terminal
        console = Console()
        console.print(table)

    def evaluate(self, task_output: TaskOutput):
        """
        Evaluates the performance of the agents in the crew based on the tasks they have performed.

        The evaluated task is the first crew task whose description equals
        `task_output.description`. The resulting score is appended to the
        list for the current iteration.

        Raises:
            ValueError: If `task_output` is missing, if no crew task matches
                it, or if the evaluation result is not a
                TaskEvaluationPydanticOutput.
        """
        # Check the output before reading its attributes.
        if not task_output:
            raise ValueError(
                "Task to evaluate and task output are required for evaluation"
            )

        current_task = next(
            (
                task
                for task in self.crew.tasks
                if task.description == task_output.description
            ),
            None,
        )
        if not current_task:
            raise ValueError(
                "Task to evaluate and task output are required for evaluation"
            )

        evaluator_agent = self._evaluator_agent()
        evaluation_task = self._evaluation_task(
            evaluator_agent, current_task, task_output.raw
        )

        evaluation_result = evaluation_task.execute_sync()

        if isinstance(evaluation_result.pydantic, TaskEvaluationPydanticOutput):
            self.tasks_scores[self.iteration].append(evaluation_result.pydantic.quality)
        else:
            raise ValueError("Evaluation result is not in the expected format")

View File

@@ -66,11 +66,11 @@ class TaskEvaluator:
"- Entities extracted from the task output, if any, their type, description, and relationships"
)
instructions = "Convert all responses into valid JSON output."
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(self.llm):
model_schema = PydanticSchemaParser(model=TaskEvaluation).get_schema()
instructions = f"{instructions}\n\nReturn only valid JSON with the following schema:\n```json\n{model_schema}\n```"
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
converter = Converter(
llm=self.llm,

View File

@@ -1,6 +1,5 @@
from typing import Any, List, Optional
from typing import List, Optional
from langchain_openai import ChatOpenAI
from pydantic import BaseModel
from crewai.agent import Agent
@@ -12,27 +11,17 @@ class PlannerTaskPydanticOutput(BaseModel):
class CrewPlanner:
def __init__(self, tasks: List[Task], planning_agent_llm: Optional[Any] = None):
def __init__(self, tasks: List[Task]):
self.tasks = tasks
if planning_agent_llm is None:
self.planning_agent_llm = ChatOpenAI(model="gpt-4o-mini")
else:
self.planning_agent_llm = planning_agent_llm
def _handle_crew_planning(self) -> PlannerTaskPydanticOutput:
def _handle_crew_planning(self) -> Optional[BaseModel]:
"""Handles the Crew planning by creating detailed step-by-step plans for each task."""
planning_agent = self._create_planning_agent()
tasks_summary = self._create_tasks_summary()
planner_task = self._create_planner_task(planning_agent, tasks_summary)
result = planner_task.execute_sync()
if isinstance(result.pydantic, PlannerTaskPydanticOutput):
return result.pydantic
raise ValueError("Failed to get the Planning output")
return planner_task.execute_sync().pydantic
def _create_planning_agent(self) -> Agent:
"""Creates the planning agent for the crew planning."""
@@ -43,7 +32,6 @@ class CrewPlanner:
"available to each agent so that they can perform the tasks in an exemplary manner"
),
backstory="Planner agent for crew planning",
llm=self.planning_agent_llm,
)
def _create_planner_task(self, planning_agent: Agent, tasks_summary: str) -> Task:

View File

@@ -16,13 +16,11 @@ class PydanticSchemaParser(BaseModel):
return self._get_model_schema(self.model)
def _get_model_schema(self, model, depth=0) -> str:
indent = " " * depth
lines = [f"{indent}{{"]
lines = []
for field_name, field in model.model_fields.items():
field_type_str = self._get_field_type(field, depth + 1)
lines.append(f"{indent} {field_name}: {field_type_str},")
lines[-1] = lines[-1].rstrip(",") # Remove trailing comma from last item
lines.append(f"{indent}}}")
lines.append(f"{' ' * 4 * depth}- {field_name}: {field_type_str}")
return "\n".join(lines)
def _get_field_type(self, field, depth) -> str:
@@ -37,6 +35,6 @@ class PydanticSchemaParser(BaseModel):
else:
return f"List[{list_item_type.__name__}]"
elif issubclass(field_type, BaseModel):
return self._get_model_schema(field_type, depth)
return f"\n{self._get_model_schema(field_type, depth)}"
else:
return field_type.__name__

View File

@@ -8,7 +8,6 @@ from unittest.mock import MagicMock, patch
import pydantic_core
import pytest
from crewai.agent import Agent
from crewai.agents.cache import CacheHandler
from crewai.crew import Crew
@@ -1356,66 +1355,28 @@ def test_hierarchical_crew_creation_tasks_with_agents():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_async_execution():
"""
Agents are not required for tasks in a hierarchical process but sometimes they are still added
This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task
"""
from langchain_openai import ChatOpenAI
task = Task(
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
agent=writer,
async_execution=True,
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
async_execution=True, # should throw an error
)
crew = Crew(
tasks=[task],
agents=[writer, researcher, ceo],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
with pytest.raises(pydantic_core._pydantic_core.ValidationError) as exec_info:
Crew(
tasks=[task],
agents=[researcher],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
assert (
exec_info.value.errors()[0]["type"] == "async_execution_in_hierarchical_process"
)
crew.kickoff()
assert crew.manager_agent is not None
assert crew.manager_agent.tools is not None
assert crew.manager_agent.tools[0].description.startswith(
"Delegate a specific task to one of the following coworkers: Senior Writer\n"
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_sync_last():
"""
Agents are not required for tasks in a hierarchical process but sometimes they are still added
This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task
"""
from langchain_openai import ChatOpenAI
task = Task(
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
agent=writer,
async_execution=True,
)
task2 = Task(
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
async_execution=False,
)
crew = Crew(
tasks=[task, task2],
agents=[writer, researcher, ceo],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
crew.kickoff()
assert crew.manager_agent is not None
assert crew.manager_agent.tools is not None
assert crew.manager_agent.tools[0].description.startswith(
"Delegate a specific task to one of the following coworkers: Senior Writer, Researcher, CEO\n"
assert (
"Hierarchical process error: Tasks cannot be flagged with async_execution."
in exec_info.value.errors()[0]["msg"]
)
@@ -2538,34 +2499,3 @@ def test_conditional_should_execute():
assert condition_mock.call_count == 1
assert condition_mock() is True
assert mock_execute_sync.call_count == 2
@mock.patch("crewai.crew.CrewEvaluator")
@mock.patch("crewai.crew.Crew.kickoff")
def test_crew_testing_function(mock_kickoff, crew_evaluator):
    # Crew.test() should build one evaluator, then for every iteration set the
    # iteration number and kick the crew off with the given inputs, and finally
    # print the evaluation result. kickoff and CrewEvaluator are both patched.
    iterations = 2
    idea_task = Task(
        description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
        expected_output="5 bullet points with a paragraph for each idea.",
        agent=researcher,
    )
    crew = Crew(agents=[researcher], tasks=[idea_task])

    crew.test(iterations, openai_model_name="gpt-4o-mini", inputs={"topic": "AI"})

    # One kickoff per iteration, each with the same inputs.
    assert len(mock_kickoff.mock_calls) == iterations
    expected_kickoffs = [mock.call(inputs={"topic": "AI"}) for _ in range(iterations)]
    mock_kickoff.assert_has_calls(expected_kickoffs)

    # Evaluator lifecycle: construct, number each run, then print the result.
    expected_evaluator_calls = [mock.call(crew, "gpt-4o-mini")]
    expected_evaluator_calls += [
        mock.call().set_iteration(run) for run in range(1, iterations + 1)
    ]
    expected_evaluator_calls.append(mock.call().print_crew_evaluation_result())
    crew_evaluator.assert_has_calls(expected_evaluator_calls)

View File

@@ -1,113 +0,0 @@
from unittest import mock
import pytest
from crewai.agent import Agent
from crewai.crew import Crew
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.evaluators.crew_evaluator_handler import (
CrewEvaluator,
TaskEvaluationPydanticOutput,
)
class TestCrewEvaluator:
    """Unit tests for ``CrewEvaluator`` built around a one-agent, one-task crew."""

    @pytest.fixture
    def crew_planner(self):
        """Return a ``CrewEvaluator`` wrapping a single-agent, single-task crew.

        NOTE: despite its name this fixture yields a ``CrewEvaluator``; the
        name is kept because every test in this class requests it by name.
        """
        sole_agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
        sole_task = Task(
            description="Task 1",
            expected_output="Output 1",
            agent=sole_agent,
        )
        return CrewEvaluator(
            Crew(agents=[sole_agent], tasks=[sole_task]),
            openai_model_name="gpt-4o-mini",
        )

    def test_setup_for_evaluating(self, crew_planner):
        """After setup, the first task's callback is the evaluator's ``evaluate``."""
        crew_planner._setup_for_evaluating()
        first_task = crew_planner.crew.tasks[0]
        assert first_task.callback == crew_planner.evaluate

    def test_set_iteration(self, crew_planner):
        """``set_iteration`` stores the given value on ``iteration``."""
        crew_planner.set_iteration(1)
        assert crew_planner.iteration == 1

    def test_evaluator_agent(self, crew_planner):
        """The generated evaluator agent has the expected role, goal, backstory and LLM."""
        expected_goal = "Your goal is to evaluate the performance of the agents in the crew based on the tasks they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
        expected_backstory = "Evaluator agent for crew evaluation with precise capabilities to evaluate the performance of the agents in the crew based on the tasks they have performed"

        evaluator = crew_planner._evaluator_agent()

        assert evaluator.role == "Task Execution Evaluator"
        assert evaluator.goal == expected_goal
        assert evaluator.backstory == expected_backstory
        assert evaluator.verbose is False
        assert evaluator.llm.model_name == "gpt-4o-mini"

    def test_evaluation_task(self, crew_planner):
        """The evaluation task embeds the task, agent and output details in its description."""
        # Fixed instruction text that opens the evaluation task description.
        instructions = "Based on the task description and the expected output, compare and evaluate the performance of the agents in the crew based on the Task Output they have performed using score from 1 to 10 evaluating on completion, quality, and overall performance."
        # Details the description is expected to append right after the
        # instructions (no separator between the two parts).
        details = (
            "task_description: Task 1 task_expected_output: Output 1 "
            "agent: Agent 1 agent_goal: Goal 1 Task Output: Task Output 1"
        )

        evaluator_agent = Agent(
            role="Evaluator Agent",
            goal="Evaluate the performance of the agents in the crew",
            backstory="Master in Evaluation",
        )
        evaluated_agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
        task_to_evaluate = Task(
            description="Task 1",
            expected_output="Output 1",
            agent=evaluated_agent,
        )

        task = crew_planner._evaluation_task(
            evaluator_agent, task_to_evaluate, "Task Output 1"
        )

        assert task.description.startswith(instructions)
        assert task.agent == evaluator_agent
        assert task.description == instructions + details

    @mock.patch("crewai.utilities.evaluators.crew_evaluator_handler.Console")
    @mock.patch("crewai.utilities.evaluators.crew_evaluator_handler.Table")
    def test_print_crew_evaluation_result(self, table, console, crew_planner):
        """Printing builds the expected score table and prints it via the console.

        ``Table`` and ``Console`` are patched in the handler module; the
        decorator closest to the function supplies ``table``.
        """
        crew_planner.tasks_scores = {1: [10, 9, 8], 2: [9, 8, 7]}

        crew_planner.print_crew_evaluation_result()

        expected_table_calls = [
            mock.call(title="Tasks Scores \n (1-10 Higher is better)"),
            mock.call().add_column("Tasks/Crew"),
            mock.call().add_column("Run 1"),
            mock.call().add_column("Run 2"),
            mock.call().add_column("Avg. Total"),
            mock.call().add_row("Task 1", "10", "9", "9.5"),
            mock.call().add_row("Task 2", "9", "8", "8.5"),
            mock.call().add_row("Task 3", "8", "7", "7.5"),
            mock.call().add_row("Crew", "9.0", "8.0", "8.5"),
        ]
        table.assert_has_calls(expected_table_calls)
        # ``table()`` is the mocked Table instance the handler created.
        console.assert_has_calls([mock.call(), mock.call().print(table())])

    def test_evaluate(self, crew_planner):
        """``evaluate`` records the stubbed quality score under key ``0`` of ``tasks_scores``."""
        evaluated_agent = str(crew_planner.crew.agents[0])
        task_output = TaskOutput(description="Task 1", agent=evaluated_agent)

        with mock.patch.object(Task, "execute_sync") as execute:
            # Any call to the patched ``execute_sync`` returns an object whose
            # ``pydantic`` attribute carries a 9.5 quality score.
            execute().pydantic = TaskEvaluationPydanticOutput(quality=9.5)
            crew_planner.evaluate(task_output)
            assert crew_planner.tasks_scores[0] == [9.5]

View File

@@ -56,7 +56,8 @@ def test_evaluate_training_data(converter_mock):
"based on the human feedback\n",
model=TrainingTaskEvaluation,
instructions="I'm gonna convert this raw text into valid JSON.\n\nThe json should have the "
"following structure, with the following keys:\n{\n suggestions: List[str],\n quality: float,\n final_summary: str\n}",
"following structure, with the following keys:\n- suggestions: List[str]\n- "
"quality: float\n- final_summary: str",
),
mock.call().to_pydantic(),
]

View File

@@ -1,11 +1,10 @@
from unittest.mock import patch
from crewai.tasks.task_output import TaskOutput
import pytest
from langchain_openai import ChatOpenAI
from crewai.agent import Agent
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.planning_handler import CrewPlanner, PlannerTaskPydanticOutput
@@ -29,19 +28,7 @@ class TestCrewPlanner:
agent=Agent(role="Agent 3", goal="Goal 3", backstory="Backstory 3"),
),
]
return CrewPlanner(tasks, None)
@pytest.fixture
def crew_planner_different_llm(self):
    """Return a ``CrewPlanner`` for one task, built with an explicit gpt-3.5-turbo LLM."""
    sole_agent = Agent(role="Agent 1", goal="Goal 1", backstory="Backstory 1")
    sole_task = Task(
        description="Task 1",
        expected_output="Output 1",
        agent=sole_agent,
    )
    # The LLM is passed as the second positional argument to CrewPlanner.
    return CrewPlanner([sole_task], ChatOpenAI(model="gpt-3.5-turbo"))
return CrewPlanner(tasks)
def test_handle_crew_planning(self, crew_planner):
with patch.object(Task, "execute_sync") as execute:
@@ -53,7 +40,7 @@ class TestCrewPlanner:
),
)
result = crew_planner._handle_crew_planning()
assert crew_planner.planning_agent_llm.model_name == "gpt-4o-mini"
assert isinstance(result, PlannerTaskPydanticOutput)
assert len(result.list_of_plans_per_task) == len(crew_planner.tasks)
execute.assert_called_once()
@@ -85,22 +72,3 @@ class TestCrewPlanner:
assert isinstance(tasks_summary, str)
assert tasks_summary.startswith("\n Task Number 1 - Task 1")
assert tasks_summary.endswith('"agent_tools": []\n ')
def test_handle_crew_planning_different_llm(self, crew_planner_different_llm):
    """Planning with a custom LLM keeps that LLM and returns one plan per task.

    ``Task.execute_sync`` is patched, so the planner receives a canned
    ``TaskOutput`` instead of running a real task.
    """
    planner = crew_planner_different_llm
    canned_output = TaskOutput(
        description="Description",
        agent="agent",
        pydantic=PlannerTaskPydanticOutput(list_of_plans_per_task=["Plan 1"]),
    )

    with patch.object(Task, "execute_sync") as execute_sync_mock:
        execute_sync_mock.return_value = canned_output

        result = planner._handle_crew_planning()

        # The planner must still report the model it was constructed with.
        assert planner.planning_agent_llm.model_name == "gpt-3.5-turbo"
        assert isinstance(result, PlannerTaskPydanticOutput)
        assert len(result.list_of_plans_per_task) == len(planner.tasks)
        execute_sync_mock.assert_called_once()