mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 16:48:30 +00:00
better logic for seq and hier
This commit is contained in:
@@ -480,20 +480,14 @@ class Crew(BaseModel):
|
||||
self.execution_logs.append(log)
|
||||
self._task_output_handler.append(log)
|
||||
|
||||
def _run_sequential_process(self) -> CrewOutput:
|
||||
def _run_sequential_process(self):
|
||||
"""Executes tasks sequentially and returns the final output."""
|
||||
self.execution_logs = []
|
||||
task_outputs = self._execute_tasks(self.tasks)
|
||||
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
self._finish_execution(final_string_output)
|
||||
self.save_execution_logs()
|
||||
token_usage = self.calculate_usage_metrics()
|
||||
|
||||
return self._format_output(task_outputs, token_usage)
|
||||
return self._execute_tasks(self.tasks)
|
||||
|
||||
def _execute_tasks(
|
||||
self,
|
||||
tasks,
|
||||
tasks: List[Task],
|
||||
manager: Optional[BaseAgent] = None,
|
||||
):
|
||||
task_outputs: List[TaskOutput] = []
|
||||
@@ -506,13 +500,16 @@ class Crew(BaseModel):
|
||||
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
|
||||
task.tools += task.agent.get_delegation_tools(agents_for_delegation)
|
||||
|
||||
role = task.agent.role if task.agent is not None else "None"
|
||||
if manager:
|
||||
self._logger.log(
|
||||
"debug", f"Manager Agent: {manager.role}", color="bold_purple"
|
||||
)
|
||||
else:
|
||||
self._logger.log("debug", f"Working Agent: {role}", color="bold_purple")
|
||||
if self.process == Process.hierarchical:
|
||||
if task.agent and manager:
|
||||
manager.tools = task.agent.get_delegation_tools([task.agent])
|
||||
if manager:
|
||||
manager.tools = manager.get_delegation_tools(self.agents)
|
||||
|
||||
agent_to_use = task.agent if task.agent else manager
|
||||
role = agent_to_use.role if agent_to_use is not None else "None"
|
||||
|
||||
self._logger.log("debug", f"Working Agent: {role}", color="bold_purple")
|
||||
self._logger.log(
|
||||
"info",
|
||||
f"Starting Task: {task.description}",
|
||||
@@ -525,56 +522,32 @@ class Crew(BaseModel):
|
||||
)
|
||||
if task.async_execution:
|
||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
if self.process == Process.hierarchical:
|
||||
if task.agent:
|
||||
task.tools += task.agent.get_delegation_tools(self.agents)
|
||||
future = task.execute_async(
|
||||
agent=task.agent, context=context, tools=task.tools
|
||||
)
|
||||
futures.append((task, future, task_index))
|
||||
else:
|
||||
if manager:
|
||||
manager.tools = manager.get_delegation_tools(self.agents)
|
||||
future = task.execute_async(
|
||||
agent=manager, context=context, tools=manager.tools
|
||||
)
|
||||
futures.append((task, future, task_index))
|
||||
else:
|
||||
if agent_to_use:
|
||||
future = task.execute_async(
|
||||
agent=task.agent, context=context, tools=task.tools
|
||||
agent=agent_to_use,
|
||||
context=context,
|
||||
tools=agent_to_use.tools,
|
||||
)
|
||||
futures.append((task, future, task_index))
|
||||
else:
|
||||
else: # sequential async
|
||||
self._logger.log(
|
||||
"warning", f"No agent available for task: {task.description}"
|
||||
)
|
||||
|
||||
else: # sync execution
|
||||
if futures:
|
||||
task_outputs = self._process_async_tasks(futures)
|
||||
futures.clear()
|
||||
|
||||
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
|
||||
if self.process == Process.hierarchical:
|
||||
if task.agent:
|
||||
task.tools += task.agent.get_delegation_tools(self.agents)
|
||||
task_output = task.execute_sync(
|
||||
agent=task.agent, context=context, tools=task.tools
|
||||
)
|
||||
task_outputs = [task_output]
|
||||
self._process_task_result(task, task_output)
|
||||
self._store_execution_log(task, task_output, task_index)
|
||||
else:
|
||||
if manager:
|
||||
manager.tools = manager.get_delegation_tools(self.agents)
|
||||
task_output = task.execute_sync(
|
||||
agent=manager, context=context, tools=manager.tools
|
||||
)
|
||||
task_outputs = [task_output]
|
||||
self._process_task_result(task, task_output)
|
||||
self._store_execution_log(task, task_output, task_index)
|
||||
else:
|
||||
if agent_to_use:
|
||||
task_output = task.execute_sync(
|
||||
agent=task.agent, context=context, tools=task.tools
|
||||
agent=agent_to_use,
|
||||
context=context,
|
||||
tools=agent_to_use.tools,
|
||||
)
|
||||
task_outputs = [task_output]
|
||||
|
||||
self._process_task_result(task, task_output)
|
||||
self._store_execution_log(task, task_output, task_index)
|
||||
|
||||
@@ -665,12 +638,6 @@ class Crew(BaseModel):
|
||||
raise NotImplementedError(
|
||||
f"The process '{self.process}' is not implemented yet."
|
||||
)
|
||||
# all_tasks = self.tasks.copy()
|
||||
# stored_outputs = self._load_stored_outputs()
|
||||
# start_index = next(
|
||||
# (index for (index, d) in enumerate(stored_outputs) if d["task_id"] == str(task_id)),
|
||||
# None,
|
||||
# )
|
||||
|
||||
# this works sequentially
|
||||
def replay_from_task(self, task_id: str, inputs: Dict[str, Any] | None = None):
|
||||
@@ -854,64 +821,17 @@ class Crew(BaseModel):
|
||||
)
|
||||
self.manager_agent = manager
|
||||
|
||||
task_outputs = self._execute_tasks(self.tasks, manager)
|
||||
return self._execute_tasks(self.tasks, manager)
|
||||
|
||||
# task_outputs: List[TaskOutput] = []
|
||||
# futures: List[Tuple[Task, Future[TaskOutput]]] = []
|
||||
# final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
# self._finish_execution(final_string_output)
|
||||
|
||||
# for task in self.tasks:
|
||||
# self._logger.log("debug", f"Working Agent: {manager.role}")
|
||||
# self._logger.log("info", f"Starting Task: {task.description}")
|
||||
# token_usage = self.calculate_usage_metrics()
|
||||
|
||||
# if self.output_log_file:
|
||||
# self._file_handler.log(
|
||||
# agent=manager.role, task=task.description, status="started"
|
||||
# )
|
||||
|
||||
# if task.async_execution:
|
||||
# context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
# future = task.execute_async(
|
||||
# agent=manager, context=context, tools=manager.tools
|
||||
# )
|
||||
# futures.append((task, future))
|
||||
# else:
|
||||
# # Before executing a synchronous task, wait for all async tasks to complete
|
||||
# if futures:
|
||||
# # Clear task_outputs before processing async tasks
|
||||
# task_outputs = []
|
||||
# for future_task, future in futures:
|
||||
# task_output = future.result()
|
||||
# task_outputs.append(task_output)
|
||||
# self._process_task_result(future_task, task_output)
|
||||
|
||||
# # Clear the futures list after processing all async results
|
||||
# futures.clear()
|
||||
|
||||
# context = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
# task_output = task.execute_sync(
|
||||
# agent=manager, context=context, tools=manager.tools
|
||||
# )
|
||||
# task_outputs = [task_output]
|
||||
# self._process_task_result(task, task_output)
|
||||
|
||||
# # Process any remaining async results
|
||||
# if futures:
|
||||
# # Clear task_outputs before processing async tasks
|
||||
# task_outputs = []
|
||||
# for future_task, future in futures:
|
||||
# task_output = future.result()
|
||||
# task_outputs.append(task_output)
|
||||
# self._process_task_result(future_task, task_output)
|
||||
|
||||
final_string_output = aggregate_raw_outputs_from_task_outputs(task_outputs)
|
||||
self._finish_execution(final_string_output)
|
||||
|
||||
token_usage = self.calculate_usage_metrics()
|
||||
|
||||
return (
|
||||
self._format_output(task_outputs, token_usage),
|
||||
token_usage,
|
||||
)
|
||||
# return (
|
||||
# self._format_output(task_outputs, token_usage),
|
||||
# token_usage,
|
||||
# )
|
||||
|
||||
def copy(self):
|
||||
"""Create a deep copy of the Crew."""
|
||||
|
||||
@@ -211,6 +211,7 @@ class Task(BaseModel):
|
||||
tools: Optional[List[Any]],
|
||||
) -> TaskOutput:
|
||||
"""Run the core execution logic of the task."""
|
||||
self.agent = agent
|
||||
agent = agent or self.agent
|
||||
if not agent:
|
||||
raise Exception(
|
||||
|
||||
Reference in New Issue
Block a user