mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
feat: Add planning feature to crew (#919)
* feat: add planning feature to crew * feat: add test to planning handler and change to execute_async method * docs: add planning parameter to the Core documentation * docs: add planning docs * fix: fix type checking issue * fix: test and logic
This commit is contained in:
committed by
GitHub
parent
5b442e4350
commit
61a196394b
64
src/crewai/utilities/planning_handler.py
Normal file
64
src/crewai/utilities/planning_handler.py
Normal file
@@ -0,0 +1,64 @@
|
||||
from typing import List
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from crewai.agent import Agent
|
||||
from crewai.task import Task
|
||||
|
||||
|
||||
class PlannerTaskPydanticOutput(BaseModel):
|
||||
list_of_plans_per_task: List[str]
|
||||
|
||||
|
||||
class CrewPlanner:
|
||||
def __init__(self, tasks: List[Task]):
|
||||
self.tasks = tasks
|
||||
|
||||
def _handle_crew_planning(self):
|
||||
"""Handles the Crew planning by creating detailed step-by-step plans for each task."""
|
||||
planning_agent = self._create_planning_agent()
|
||||
tasks_summary = self._create_tasks_summary()
|
||||
|
||||
planner_task = self._create_planner_task(planning_agent, tasks_summary)
|
||||
|
||||
return planner_task.execute_sync()
|
||||
|
||||
def _create_planning_agent(self) -> Agent:
|
||||
"""Creates the planning agent for the crew planning."""
|
||||
return Agent(
|
||||
role="Task Execution Planner",
|
||||
goal=(
|
||||
"Your goal is to create an extremely detailed, step-by-step plan based on the tasks and tools "
|
||||
"available to each agent so that they can perform the tasks in an exemplary manner"
|
||||
),
|
||||
backstory="Planner agent for crew planning",
|
||||
)
|
||||
|
||||
def _create_planner_task(self, planning_agent: Agent, tasks_summary: str) -> Task:
|
||||
"""Creates the planner task using the given agent and tasks summary."""
|
||||
return Task(
|
||||
description=(
|
||||
f"Based on these tasks summary: {tasks_summary} \n Create the most descriptive plan based on the tasks "
|
||||
"descriptions, tools available, and agents' goals for them to execute their goals with perfection."
|
||||
),
|
||||
expected_output="Step by step plan on how the agents can execute their tasks using the available tools with mastery",
|
||||
agent=planning_agent,
|
||||
output_pydantic=PlannerTaskPydanticOutput,
|
||||
)
|
||||
|
||||
def _create_tasks_summary(self) -> str:
|
||||
"""Creates a summary of all tasks."""
|
||||
tasks_summary = []
|
||||
for idx, task in enumerate(self.tasks):
|
||||
tasks_summary.append(
|
||||
f"""
|
||||
Task Number {idx + 1} - {task.description}
|
||||
"task_description": {task.description}
|
||||
"task_expected_output": {task.expected_output}
|
||||
"agent": {task.agent.role if task.agent else "None"}
|
||||
"agent_goal": {task.agent.goal if task.agent else "None"}
|
||||
"task_tools": {task.tools}
|
||||
"agent_tools": {task.agent.tools if task.agent else "None"}
|
||||
"""
|
||||
)
|
||||
return " ".join(tasks_summary)
|
||||
Reference in New Issue
Block a user