mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 16:18:30 +00:00
hierarchical process unblocked for async tasks (#995)
* WIP: hierarchical unblock for async tasks * added better test * update name change * added more test and crew manager cleanup * remove prints * code cleanup, no need to pass manager
This commit is contained in:
@@ -271,20 +271,6 @@ class Crew(BaseModel):
|
||||
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
def check_tasks_in_hierarchical_process_not_async(self):
|
||||
"""Validates that the tasks in hierarchical process are not flagged with async_execution."""
|
||||
if self.process == Process.hierarchical:
|
||||
for task in self.tasks:
|
||||
if task.async_execution:
|
||||
raise PydanticCustomError(
|
||||
"async_execution_in_hierarchical_process",
|
||||
"Hierarchical process error: Tasks cannot be flagged with async_execution.",
|
||||
{},
|
||||
)
|
||||
|
||||
return self
|
||||
|
||||
@model_validator(mode="after")
|
||||
def validate_end_with_at_most_one_async_task(self):
|
||||
"""Validates that the crew ends with at most one asynchronous task."""
|
||||
@@ -607,7 +593,7 @@ class Crew(BaseModel):
|
||||
def _run_hierarchical_process(self) -> CrewOutput:
|
||||
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
|
||||
self._create_manager_agent()
|
||||
return self._execute_tasks(self.tasks, self.manager_agent)
|
||||
return self._execute_tasks(self.tasks)
|
||||
|
||||
def _create_manager_agent(self):
|
||||
i18n = I18N(prompt_file=self.prompt_file)
|
||||
@@ -631,7 +617,6 @@ class Crew(BaseModel):
|
||||
def _execute_tasks(
|
||||
self,
|
||||
tasks: List[Task],
|
||||
manager: Optional[BaseAgent] = None,
|
||||
start_index: Optional[int] = 0,
|
||||
was_replayed: bool = False,
|
||||
) -> CrewOutput:
|
||||
@@ -659,13 +644,13 @@ class Crew(BaseModel):
|
||||
last_sync_output = task.output
|
||||
continue
|
||||
|
||||
agent_to_use = self._get_agent_to_use(task, manager)
|
||||
agent_to_use = self._get_agent_to_use(task)
|
||||
if agent_to_use is None:
|
||||
raise ValueError(
|
||||
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
|
||||
)
|
||||
|
||||
self._prepare_agent_tools(task, manager)
|
||||
self._prepare_agent_tools(task)
|
||||
self._log_task_start(task, agent_to_use.role)
|
||||
|
||||
if isinstance(task, ConditionalTask):
|
||||
@@ -731,20 +716,18 @@ class Crew(BaseModel):
|
||||
return skipped_task_output
|
||||
return None
|
||||
|
||||
def _prepare_agent_tools(self, task: Task, manager: Optional[BaseAgent]):
|
||||
def _prepare_agent_tools(self, task: Task):
|
||||
if self.process == Process.hierarchical:
|
||||
if manager:
|
||||
self._update_manager_tools(task, manager)
|
||||
if self.manager_agent:
|
||||
self._update_manager_tools(task)
|
||||
else:
|
||||
raise ValueError("Manager agent is required for hierarchical process.")
|
||||
elif task.agent and task.agent.allow_delegation:
|
||||
self._add_delegation_tools(task)
|
||||
|
||||
def _get_agent_to_use(
|
||||
self, task: Task, manager: Optional[BaseAgent]
|
||||
) -> Optional[BaseAgent]:
|
||||
def _get_agent_to_use(self, task: Task) -> Optional[BaseAgent]:
|
||||
if self.process == Process.hierarchical:
|
||||
return manager
|
||||
return self.manager_agent
|
||||
return task.agent
|
||||
|
||||
def _add_delegation_tools(self, task: Task):
|
||||
@@ -780,11 +763,14 @@ class Crew(BaseModel):
|
||||
if self.output_log_file:
|
||||
self._file_handler.log(agent=role, task=task.description, status="started")
|
||||
|
||||
def _update_manager_tools(self, task: Task, manager: BaseAgent):
|
||||
if task.agent:
|
||||
manager.tools = task.agent.get_delegation_tools([task.agent])
|
||||
else:
|
||||
manager.tools = manager.get_delegation_tools(self.agents)
|
||||
def _update_manager_tools(self, task: Task):
|
||||
if self.manager_agent:
|
||||
if task.agent:
|
||||
self.manager_agent.tools = task.agent.get_delegation_tools([task.agent])
|
||||
else:
|
||||
self.manager_agent.tools = self.manager_agent.get_delegation_tools(
|
||||
self.agents
|
||||
)
|
||||
|
||||
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
|
||||
context = (
|
||||
@@ -883,7 +869,7 @@ class Crew(BaseModel):
|
||||
self.tasks[i].output = task_output
|
||||
|
||||
self._logging_color = "bold_blue"
|
||||
result = self._execute_tasks(self.tasks, self.manager_agent, start_index, True)
|
||||
result = self._execute_tasks(self.tasks, start_index, True)
|
||||
return result
|
||||
|
||||
def copy(self):
|
||||
|
||||
Reference in New Issue
Block a user