Hierarchical process (#206)

* Hierarchical process +  Docs
Co-authored-by: João Moura <joaomdmoura@gmail.com>
This commit is contained in:
Gui Vieira
2024-02-02 13:56:35 -03:00
committed by GitHub
parent a3af73b593
commit 7efecd10ea
12 changed files with 1358 additions and 74 deletions

View File

@@ -130,7 +130,7 @@ print("######################")
print(result)
```
Currently the only supported process is `Process.sequential`, where one task is executed after the other and the outcome of one is passed as extra content into this next.
In addition to the sequential process, you can use the hierarchical process, which automatically assigns a manager to the defined crew to properly coordinate the planning and execution of tasks through delegation and validation of results. See more about the processes [here](./docs/core-concepts/Managing-Processes.md)
## Key Features
@@ -138,7 +138,7 @@ Currently the only supported process is `Process.sequential`, where one task is
- **Role-Based Agent Design**: Customize agents with specific roles, goals, and tools.
- **Autonomous Inter-Agent Delegation**: Agents can autonomously delegate tasks and inquire amongst themselves, enhancing problem-solving efficiency.
- **Flexible Task Management**: Define tasks with customizable tools and assign them to agents dynamically.
- **Processes Driven**: Currently only supports `sequential` task execution but more complex processes like consensual and hierarchical being worked on.
- **Processes Driven**: Currently only supports `sequential` task execution and `hierarchical` processes, but more complex processes like consensual and autonomous are being worked on.
- **Works with Open Source Models**: Run your crew using Open AI or open source models refer to the [Connect crewAI to LLMs](./docs/llm-connections.md) page for details on configuring you agents' connections to models, even ones running locally!
![CrewAI Mind Map](./docs/crewAI-mindmap.png "CrewAI Mind Map")

View File

@@ -8,42 +8,37 @@ A process in CrewAI can be thought of as the game plan for how your AI agents wi
## Process Implementations
- **Sequential (Supported)**: This is the only process currently implemented in CrewAI. It ensures tasks are handled one at a time, in a given order, much like a relay race where one runner passes the baton to the next.
- **Sequential (Supported)**: This process ensures tasks are handled one at a time, in a given order, much like a relay race where one runner passes the baton to the next.
- **Hierarchical**: This process introduces a chain of command to task execution. You define the crew and the system assigns a manager to properly coordinate the planning and execution of tasks through delegation and validation of results, akin to a traditional corporate hierarchy.
- **Consensual (WIP)**: Envisioned for a future update, the consensual process will enable agents to make joint decisions on task execution, similar to a team consensus in a meeting before proceeding.
- **Hierarchical (WIP)**: Also in the pipeline, this process will introduce a chain of command to task execution, where some agents may have the authority to prioritize tasks or delegate them, akin to a traditional corporate hierarchy.
These additional processes, once implemented, will offer more nuanced and sophisticated ways for agents to interact and complete tasks, much like teams in complex organizational structures.
## Defining a Sequential Process
Creating a sequential process in CrewAI is straightforward and reflects the simplicity of coordinating a team's efforts step by step. In this process the outcome of the previous task is sent into the next one as context that I should use to accomplish it's task
```python
from crewai import Process
# Define a sequential process
sequential_process = Process.sequential
```
# The Magic of Sequential Processes
The sequential process is where much of CrewAI's magic happens. It ensures that tasks are approached with the same thoughtful progression that a human team would use, fostering a natural and logical flow of work while passing on task outcome into the next.
## Assigning Processes to a Crew
To assign a process to a crew, simply set it during the crew's creation. The process will dictate the crew's approach to task execution.
```python
from crewai import Crew
# Create a crew with a sequential process
crew = Crew(agents=my_agents, tasks=my_tasks, process=sequential_process)
```
## The Role of Processes in Teamwork
The process you choose for your crew is critical. It's what transforms a group of individual agents into a cohesive unit that can tackle complex projects with the precision and harmony you'd find in a team of skilled humans.
## Assigning Processes to a Crew
To assign a process to a crew, simply set it during the crew's creation. The process will dictate the crew's approach to task execution. Different from the sequential process, you don't need to assign an agent to a task.
```python
from crewai import Crew
from crewai.process import Process
# Create a crew with a sequential process
crew = Crew(agents=my_agents, tasks=my_tasks, process=Process.sequential)
# OR create a crew with a hierarchical process
crew = Crew(agents=my_agents, tasks=my_tasks, process=Process.hierarchical)
```
## Sequential Process
The sequential process is where much of CrewAI's magic happens. It ensures that tasks are approached with the same thoughtful progression that a human team would use, fostering a natural and logical flow of work while passing on task outcome into the next.
## Hierarchical Process
The hierarchical process is tiny bit more magic, as the chain of command and task assignment is hidden from the user. The system will automatically assign a manager the execute the tasks, but the agent will never execute the job by itself. Instead, the manager will plan the steps to execute the task and delegate the work to the agents. The agents will then execute the task and report back to the manager, who will validate the results and pass the task outcome to the next task.
## Conclusion
Processes bring structure and order to the CrewAI ecosystem, allowing agents to collaborate effectively and accomplish goals systematically. As CrewAI evolves, additional process types will be introduced to enhance the framework's versatility, much like a team that grows and adapts over time.
Processes bring structure and order to the CrewAI ecosystem, allowing agents to collaborate effectively and accomplish goals systematically. As CrewAI evolves, additional process types will be introduced to enhance the framework's versatility, much like a team that grows and adapts over time.

View File

@@ -1,13 +1,12 @@
import uuid
from typing import Any, List, Optional
from langchain.agents.format_scratchpad import format_log_to_str
from langchain.agents.agent import RunnableAgent
from langchain.agents.format_scratchpad import format_log_to_str
from langchain.memory import ConversationSummaryMemory
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from langchain_openai import ChatOpenAI
from langchain_core.language_models import BaseLanguageModel
from pydantic import (
UUID4,
BaseModel,
@@ -171,7 +170,7 @@ class Agent(BaseModel):
"""
self.cache_handler = cache_handler
self.tools_handler = ToolsHandler(cache=self.cache_handler)
self.__create_agent_executor()
self._create_agent_executor()
def set_rpm_controller(self, rpm_controller: RPMController) -> None:
"""Set the rpm controller for the agent.
@@ -181,9 +180,9 @@ class Agent(BaseModel):
"""
if not self._rpm_controller:
self._rpm_controller = rpm_controller
self.__create_agent_executor()
self._create_agent_executor()
def __create_agent_executor(self) -> None:
def _create_agent_executor(self) -> None:
"""Create an agent executor for the agent.
Returns:

View File

@@ -142,18 +142,26 @@ class Crew(BaseModel):
agent.i18n = I18N(language=self.language)
if self.process == Process.sequential:
return self._sequential_loop()
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
return self._run_sequential_process()
if self.process == Process.hierarchical:
return self._run_hierarchical_process()
def _sequential_loop(self) -> str:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
)
def _run_sequential_process(self) -> str:
"""Executes tasks sequentially and returns the final output."""
task_output = ""
for task in self.tasks:
self._prepare_and_execute_task(task)
task_output = task.execute(task_output)
if task.agent is not None and task.agent.allow_delegation:
task.tools += AgentTools(agents=self.agents).tools()
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"Working Agent: {role}")
self._logger.log("info", f"Starting Task: {task.description}")
task_output = task.execute(context=task_output)
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"[{role}] Task output: {task_output}\n\n")
@@ -163,15 +171,29 @@ class Crew(BaseModel):
return task_output
def _prepare_and_execute_task(self, task: Task) -> None:
"""Prepares and logs information about the task being executed.
def _run_hierarchical_process(self) -> str:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
Args:
task: The task to be executed.
"""
if task.agent is not None and task.agent.allow_delegation:
task.tools += AgentTools(agents=self.agents).tools()
i18n = I18N(language=self.language)
manager = Agent(
role=i18n.retrieve("hierarchical_manager_agent", "role"),
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
tools=AgentTools(agents=self.agents).tools(),
verbose=True,
)
role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"Working Agent: {role}")
self._logger.log("info", f"Starting Task: {task.description}")
task_output = ""
for task in self.tasks:
self._logger.log("info", f"Starting Task: {task.description}")
task_output = task.execute(agent=manager, context=task_output)
self._logger.log(
"debug", f"[{manager.role}] Task output: {task_output}\n\n"
)
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()
return task_output

View File

@@ -7,5 +7,5 @@ class Process(str, Enum):
"""
sequential = "sequential"
hierarchical = "hierarchical"
# TODO: consensual = 'consensual'
# TODO: hierarchical = 'hierarchical'

View File

@@ -19,7 +19,7 @@ class Task(BaseModel):
description="Callback to be executed after the task is completed.", default=None
)
agent: Optional[Agent] = Field(
description="Agent responsible for executiong the task.", default=None
description="Agent responsible for execution the task.", default=None
)
expected_output: Optional[str] = Field(
description="Clear definition of expected output for the task.",
@@ -53,18 +53,20 @@ class Task(BaseModel):
self.tools.extend(self.agent.tools)
return self
def execute(self, context: Optional[str] = None) -> str:
def execute(self, agent: Agent | None = None, context: Optional[str] = None) -> str:
"""Execute the task.
Returns:
Output of the task.
"""
if not self.agent:
agent = agent or self.agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, either consensual or hierarchical."
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
result = self.agent.execute_task(
result = agent.execute_task(
task=self._prompt(), context=context, tools=self.tools
)

View File

@@ -33,13 +33,13 @@ class AgentTools(BaseModel):
def delegate_work(self, command):
"""Useful to delegate a specific task to a coworker."""
return self.__execute(command)
return self._execute(command)
def ask_question(self, command):
"""Useful to ask a question, opinion or take from a coworker."""
return self.__execute(command)
return self._execute(command)
def __execute(self, command):
def _execute(self, command):
"""Execute the command."""
try:
agent, task, context = command.split("|")

View File

@@ -1,4 +1,9 @@
{
"hierarchical_manager_agent": {
"role": "Διευθυντής Ομάδας",
"goal": "Διαχειρίσου την ομάδα σου για να ολοκληρώσει την εργασία με τον καλύτερο δυνατό τρόπο.",
"backstory": "Είσαι ένας έμπειρος διευθυντής με την ικανότητα να βγάζεις το καλύτερο από την ομάδα σου.\nΕίσαι επίσης γνωστός για την ικανότητά σου να αναθέτεις εργασίες στους σωστούς ανθρώπους και να κάνεις τις σωστές ερωτήσεις για να πάρεις το καλύτερο από την ομάδα σου.\nΑκόμα κι αν δεν εκτελείς εργασίες μόνος σου, έχεις πολλή εμπειρία στον τομέα, που σου επιτρέπει να αξιολογείς σωστά τη δουλειά των μελών της ομάδας σου."
},
"slices": {
"observation": "\nΠαρατήρηση",
"task": "Αρχή! Αυτό είναι ΠΟΛΥ σημαντικό για εσάς, η δουλειά σας εξαρτάται από αυτό!\n\nΤρέχουσα εργασία: {input}",

View File

@@ -1,4 +1,9 @@
{
"hierarchical_manager_agent": {
"role": "Crew Manager",
"goal": "Manage the team to complete the task in the best way possible.",
"backstory": "You are a seasoned manager with a knack for getting the best out of your team.\nYou are also known for your ability to delegate work to the right people, and to ask the right questions to get the best out of your team.\nEven though you don't perform tasks by yourself, you have a lot of experience in the field, which allows you to properly evaluate the work of your team members."
},
"slices": {
"observation": "\nObservation",
"task": "Begin! This is VERY important to you, your job depends on it!\n\nCurrent Task: {input}",
@@ -18,4 +23,4 @@
"delegate_work": "Useful to delegate a specific task to one of the following co-workers: {coworkers}.\nThe input to this tool should be a pipe (|) separated text of length 3 (three), representing the co-worker you want to ask it to (one of the options), the task and all actual context you have for the task.\nFor example, `coworker|task|context`.",
"ask_question": "Useful to ask a question, opinion or take from on of the following co-workers: {coworkers}.\nThe input to this tool should be a pipe (|) separated text of length 3 (three), representing the co-worker you want to ask it to (one of the options), the question and all actual context you have for the question.\n For example, `coworker|question|context`."
}
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -142,6 +142,39 @@ def test_crew_creation():
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_process():
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
)
crew = Crew(
agents=[researcher, writer],
process=Process.hierarchical,
tasks=[task],
)
assert (
crew.kickoff()
== """Here are the 5 interesting ideas with a highlight paragraph for each:
1. "The Future of AI in Healthcare: Predicting Diseases Before They Happen"
- "Imagine a future where AI empowers us to detect diseases before they arise, transforming healthcare from reactive to proactive. Machine learning algorithms, trained on vast amounts of patient data, could potentially predict heart diseases, strokes, or cancers before they manifest, allowing for early interventions and significantly improving patient outcomes. This article will delve into the rapid advancements in AI within the healthcare sector and how these technologies are ushering us into a new era of predictive medicine."
2. "How AI is Changing the Way We Cook: An Insight into Smart Kitchens"
- "From the humble home kitchen to grand culinary stages, AI is revolutionizing the way we cook. Smart appliances, equipped with advanced sensors and predictive algorithms, are turning kitchens into creative playgrounds, offering personalized recipes, precise cooking instructions, and even automated meal preparation. This article explores the fascinating intersection of AI and gastronomy, revealing how technology is transforming our culinary experiences."
3. "Redefining Fitness with AI: Personalized Workout Plans and Nutritional Advice"
- "Fitness reimagined that's the promise of AI in the wellness industry. Picture a personal trainer who knows your strengths, weaknesses, and nutritional needs intimately. An AI-powered fitness app can provide this personalized experience, adapting your workout plans and dietary recommendations in real-time based on your progress and feedback. Join us as we unpack how AI is revolutionizing the fitness landscape, offering personalized, data-driven approaches to health and well-being."
4. "AI and the Art World: How Technology is Shaping Creativity"
- "Art and AI may seem like unlikely partners, but their synergy is sparking a creative revolution. AI algorithms are now creating mesmerizing artworks, challenging our perceptions of creativity and originality. From AI-assisted painting to generative music composition, this article will take you on a journey through the fascinating world of AI in art, exploring how technology is reshaping the boundaries of human creativity."
5. "AI in Space Exploration: The Next Frontier"
- "The vast expanse of space, once the sole domain of astronauts and rovers, is the next frontier for AI. AI technology is playing an increasingly vital role in space exploration, from predicting space weather to assisting in interstellar navigation. This article will delve into the exciting intersection of AI and space exploration, exploring how these advanced technologies are helping us uncover the mysteries of the cosmos.\""""
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_crew_with_delegating_agents():
tasks = [

View File

@@ -1,5 +1,7 @@
"""Test Agent creation and execution basic functionality."""
from unittest.mock import MagicMock, patch
from crewai.agent import Agent
from crewai.task import Task
@@ -27,7 +29,7 @@ def test_task_tool_reflect_agent_tools():
assert task.tools == [fake_tool]
def test_task_tool_takes_precedence_ove_agent_tools():
def test_task_tool_takes_precedence_over_agent_tools():
from langchain.tools import tool
@tool
@@ -47,7 +49,7 @@ def test_task_tool_takes_precedence_ove_agent_tools():
)
task = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
agent=researcher,
tools=[fake_task_tool],
)
@@ -69,8 +71,6 @@ def test_task_prompt_includes_expected_output():
agent=researcher,
)
from unittest.mock import patch
with patch.object(Agent, "execute_task") as execute:
execute.return_value = "ok"
task.execute()
@@ -78,8 +78,6 @@ def test_task_prompt_includes_expected_output():
def test_task_callback():
from unittest.mock import MagicMock
researcher = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
@@ -94,12 +92,27 @@ def test_task_callback():
expected_output="Bullet point list of 5 interesting ideas.",
agent=researcher,
callback=task_completed,
allow_delegation=False,
)
from unittest.mock import patch
with patch.object(Agent, "execute_task") as execute:
execute.return_value = "ok"
task.execute()
task_completed.assert_called_once_with(task.output)
def test_execute_with_agent():
researcher = Agent(
role="Researcher",
goal="Make the best research and analysis on content about AI and AI agents",
backstory="You're an expert researcher, specialized in technology, software engineering, AI and startups. You work as a freelancer and is now working on doing research and analysis for a new customer.",
allow_delegation=False,
)
task = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 interesting ideas.",
)
with patch.object(Agent, "execute_task", return_value="ok") as execute:
task.execute(agent=researcher)
execute.assert_called_once_with(task=task._prompt(), context=None, tools=[])