feat: Add planning feature to crew (#919)

* feat: add planning feature to crew

* feat: add test to planning handler and change to execute_async method

* docs: add planning parameter to the Core documentation

* docs: add planning docs

* fix: fix type checking issue

* fix: test and logic
This commit is contained in:
Eduardo Chiarotti
2024-07-18 13:15:08 -03:00
committed by GitHub
parent 4f99ea547f
commit 3f2f832e8d
9 changed files with 290 additions and 7 deletions

View File

@@ -42,6 +42,7 @@ from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
aggregate_raw_outputs_from_tasks,
)
from crewai.utilities.planning_handler import CrewPlanner
from crewai.utilities.task_output_storage_handler import TaskOutputStorageHandler
from crewai.utilities.training_handler import CrewTrainingHandler
@@ -73,6 +74,7 @@ class Crew(BaseModel):
task_callback: Callback to be executed after each task for every agents execution.
step_callback: Callback to be executed after each step for every agents execution.
share_crew: Whether you want to share the complete crew information and execution with crewAI to make the library better, and allow us to train models.
planning: Plan the crew execution and add the plan to the crew.
"""
__hash__ = object.__hash__ # type: ignore
@@ -148,6 +150,10 @@ class Crew(BaseModel):
default=False,
description="output_log_file",
)
planning: Optional[bool] = Field(
default=False,
description="Plan the crew execution and add the plan to the crew.",
)
task_execution_output_json_files: Optional[List[str]] = Field(
default=None,
description="List of file paths for task execution JSON files.",
@@ -453,6 +459,9 @@ class Crew(BaseModel):
agent.create_agent_executor()
if self.planning:
self._handle_crew_planning()
metrics = []
if self.process == Process.sequential:
@@ -547,6 +556,14 @@ class Crew(BaseModel):
self._task_output_handler.reset()
return results
def _handle_crew_planning(self):
"""Handles the Crew planning."""
self._logger.log("info", "Planning the crew execution")
result = CrewPlanner(self.tasks)._handle_crew_planning()
for task, step_plan in zip(self.tasks, result.list_of_plans_per_task):
task.description += step_plan
def _store_execution_log(
self,
task: Task,

View File

@@ -254,7 +254,9 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json() if pydantic_output else result
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
@@ -326,9 +328,14 @@ class Task(BaseModel):
def _create_converter(self, *args, **kwargs) -> Converter:
"""Create a converter instance."""
converter = self.agent.get_output_converter(*args, **kwargs)
if self.converter_cls:
if self.agent and not self.converter_cls:
converter = self.agent.get_output_converter(*args, **kwargs)
elif self.converter_cls:
converter = self.converter_cls(*args, **kwargs)
if not converter:
raise Exception("No output converter found or set.")
return converter
def _export_output(

View File

@@ -0,0 +1,64 @@
from typing import List
from pydantic import BaseModel
from crewai.agent import Agent
from crewai.task import Task
class PlannerTaskPydanticOutput(BaseModel):
list_of_plans_per_task: List[str]
class CrewPlanner:
def __init__(self, tasks: List[Task]):
self.tasks = tasks
def _handle_crew_planning(self):
"""Handles the Crew planning by creating detailed step-by-step plans for each task."""
planning_agent = self._create_planning_agent()
tasks_summary = self._create_tasks_summary()
planner_task = self._create_planner_task(planning_agent, tasks_summary)
return planner_task.execute_sync()
def _create_planning_agent(self) -> Agent:
"""Creates the planning agent for the crew planning."""
return Agent(
role="Task Execution Planner",
goal=(
"Your goal is to create an extremely detailed, step-by-step plan based on the tasks and tools "
"available to each agent so that they can perform the tasks in an exemplary manner"
),
backstory="Planner agent for crew planning",
)
def _create_planner_task(self, planning_agent: Agent, tasks_summary: str) -> Task:
"""Creates the planner task using the given agent and tasks summary."""
return Task(
description=(
f"Based on these tasks summary: {tasks_summary} \n Create the most descriptive plan based on the tasks "
"descriptions, tools available, and agents' goals for them to execute their goals with perfection."
),
expected_output="Step by step plan on how the agents can execute their tasks using the available tools with mastery",
agent=planning_agent,
output_pydantic=PlannerTaskPydanticOutput,
)
def _create_tasks_summary(self) -> str:
"""Creates a summary of all tasks."""
tasks_summary = []
for idx, task in enumerate(self.tasks):
tasks_summary.append(
f"""
Task Number {idx + 1} - {task.description}
"task_description": {task.description}
"task_expected_output": {task.expected_output}
"agent": {task.agent.role if task.agent else "None"}
"agent_goal": {task.agent.goal if task.agent else "None"}
"task_tools": {task.tools}
"agent_tools": {task.agent.tools if task.agent else "None"}
"""
)
return " ".join(tasks_summary)