hierarchical process unblocked for async tasks (#995)

* WIP: hierarchical unblock for async tasks

* added better test

* update name change

* added more test and crew manager cleanup

* remove prints

* code cleanup, no need to pass manager
This commit is contained in:
Lorenze Jay
2024-07-26 10:55:51 -07:00
committed by GitHub
parent b415e2273a
commit 0c6514024e
4 changed files with 25797 additions and 47 deletions

View File

@@ -271,20 +271,6 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def check_tasks_in_hierarchical_process_not_async(self):
"""Validates that the tasks in hierarchical process are not flagged with async_execution."""
if self.process == Process.hierarchical:
for task in self.tasks:
if task.async_execution:
raise PydanticCustomError(
"async_execution_in_hierarchical_process",
"Hierarchical process error: Tasks cannot be flagged with async_execution.",
{},
)
return self
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
"""Validates that the crew ends with at most one asynchronous task."""
@@ -607,7 +593,7 @@ class Crew(BaseModel):
def _run_hierarchical_process(self) -> CrewOutput:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
self._create_manager_agent()
return self._execute_tasks(self.tasks, self.manager_agent)
return self._execute_tasks(self.tasks)
def _create_manager_agent(self):
i18n = I18N(prompt_file=self.prompt_file)
@@ -631,7 +617,6 @@ class Crew(BaseModel):
def _execute_tasks(
self,
tasks: List[Task],
manager: Optional[BaseAgent] = None,
start_index: Optional[int] = 0,
was_replayed: bool = False,
) -> CrewOutput:
@@ -659,13 +644,13 @@ class Crew(BaseModel):
last_sync_output = task.output
continue
agent_to_use = self._get_agent_to_use(task, manager)
agent_to_use = self._get_agent_to_use(task)
if agent_to_use is None:
raise ValueError(
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
self._prepare_agent_tools(task, manager)
self._prepare_agent_tools(task)
self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
@@ -731,20 +716,18 @@ class Crew(BaseModel):
return skipped_task_output
return None
def _prepare_agent_tools(self, task: Task, manager: Optional[BaseAgent]):
def _prepare_agent_tools(self, task: Task):
if self.process == Process.hierarchical:
if manager:
self._update_manager_tools(task, manager)
if self.manager_agent:
self._update_manager_tools(task)
else:
raise ValueError("Manager agent is required for hierarchical process.")
elif task.agent and task.agent.allow_delegation:
self._add_delegation_tools(task)
def _get_agent_to_use(
self, task: Task, manager: Optional[BaseAgent]
) -> Optional[BaseAgent]:
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
return manager
return self.manager_agent
return task.agent
def _add_delegation_tools(self, task: Task):
@@ -780,11 +763,14 @@ class Crew(BaseModel):
if self.output_log_file:
self._file_handler.log(agent=role, task=task.description, status="started")
def _update_manager_tools(self, task: Task, manager: BaseAgent):
if task.agent:
manager.tools = task.agent.get_delegation_tools([task.agent])
else:
manager.tools = manager.get_delegation_tools(self.agents)
def _update_manager_tools(self, task: Task):
if self.manager_agent:
if task.agent:
self.manager_agent.tools = task.agent.get_delegation_tools([task.agent])
else:
self.manager_agent.tools = self.manager_agent.get_delegation_tools(
self.agents
)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
context = (
@@ -883,7 +869,7 @@ class Crew(BaseModel):
self.tasks[i].output = task_output
self._logging_color = "bold_blue"
result = self._execute_tasks(self.tasks, self.manager_agent, start_index, True)
result = self._execute_tasks(self.tasks, start_index, True)
return result
def copy(self):

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1356,28 +1356,66 @@ def test_hierarchical_crew_creation_tasks_with_agents():
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_async_execution():
"""
Agents are not required for tasks in a hierarchical process but sometimes they are still added
This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task
"""
from langchain_openai import ChatOpenAI
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
async_execution=True, # should throw an error
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
agent=writer,
async_execution=True,
)
with pytest.raises(pydantic_core._pydantic_core.ValidationError) as exec_info:
Crew(
tasks=[task],
agents=[researcher],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
assert (
exec_info.value.errors()[0]["type"] == "async_execution_in_hierarchical_process"
crew = Crew(
tasks=[task],
agents=[writer, researcher, ceo],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
assert (
"Hierarchical process error: Tasks cannot be flagged with async_execution."
in exec_info.value.errors()[0]["msg"]
crew.kickoff()
assert crew.manager_agent is not None
assert crew.manager_agent.tools is not None
assert crew.manager_agent.tools[0].description.startswith(
"Delegate a specific task to one of the following coworkers: Senior Writer\n"
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_sync_last():
"""
Agents are not required for tasks in a hierarchical process but sometimes they are still added
This test makes sure that the manager still delegates the task to the agent even if the agent is passed in the task
"""
from langchain_openai import ChatOpenAI
task = Task(
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
agent=writer,
async_execution=True,
)
task2 = Task(
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
async_execution=False,
)
crew = Crew(
tasks=[task, task2],
agents=[writer, researcher, ceo],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
crew.kickoff()
assert crew.manager_agent is not None
assert crew.manager_agent.tools is not None
assert crew.manager_agent.tools[0].description.startswith(
"Delegate a specific task to one of the following coworkers: Senior Writer, Researcher, CEO\n"
)