From 2c2193cda3e34d7199fd8f19ae4a6da55e31b0cd Mon Sep 17 00:00:00 2001 From: Eduardo Chiarotti Date: Wed, 10 Jul 2024 16:51:37 -0300 Subject: [PATCH] feat: add planning feature to Crew --- src/crewai/crew.py | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/crewai/crew.py b/src/crewai/crew.py index 5d29c6db3..f704b40cd 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -133,6 +133,10 @@ class Crew(BaseModel): default=False, description="output_log_file", ) + plan: Optional[bool] = Field( + default=False, + description="Plan the crew execution and add the plan to the crew.", + ) @field_validator("id", mode="before") @classmethod @@ -340,6 +344,10 @@ class Crew(BaseModel): agent.create_agent_executor() + if self.plan: + self._logger.log("info", "Planning the crew execution") + self._handle_crew_planning() + metrics = [] if self.process == Process.sequential: @@ -421,6 +429,44 @@ class Crew(BaseModel): return results + def _handle_crew_planning(self): + """Handles the crew planning.""" + planning_agent = Agent( + role="Task Execution Planner", + goal="Your goal is to create an extremely detailed, step-by-step plan based on the tasks and tools " + "available to each agent so that they can perform the tasks in an exemplary manner", + backstory="Planner agent for crew planning", + ) + + tasks_summary = [] + for idx, task in enumerate(self.tasks): + tasks_summary.append( + f""" + Task Number {idx + 1} - {task.description} + "task_description": {task.description} + "task_expected_output": {task.expected_output} + "agent": {task.agent.role if task.agent else "None"} + "agent_goal": {task.agent.goal if task.agent else "None"} + "task_tools": {task.tools} + "agent_tools": {task.agent.tools if task.agent else "None"} + """ + ) + + class PlannerTaskPydanticOutput(BaseModel): + list_of_plans_per_task: List[str] + + planner_task = Task( + description=f"Based on this tasks summary: {' '.join(tasks_summary)} \n Create the most descriptive plan based on the tasks descriptions, tools availables and agents goals for them to execute with perfection their goals", + expected_output="Step by step plan on how the agents can execute their tasks using the available tools with mastery", + agent=planning_agent, + output_pydantic=PlannerTaskPydanticOutput, + ) + + result = planner_task.execute() + + for task, step_plan in zip(self.tasks, result.list_of_plans_per_task): + task.description += step_plan + def _run_sequential_process(self) -> Union[str, Dict[str, Any]]: """Executes tasks sequentially and returns the final output.""" task_output = None