mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-10 00:28:31 +00:00
refactoring for cleaner code
This commit is contained in:
@@ -539,7 +539,7 @@ class Crew(BaseModel):
|
|||||||
"inputs": inputs,
|
"inputs": inputs,
|
||||||
"was_replayed": was_replayed,
|
"was_replayed": was_replayed,
|
||||||
}
|
}
|
||||||
# Update the existing log or append if it's a new entry
|
|
||||||
if task_index < len(self.execution_logs):
|
if task_index < len(self.execution_logs):
|
||||||
self.execution_logs[task_index] = log
|
self.execution_logs[task_index] = log
|
||||||
else:
|
else:
|
||||||
@@ -548,92 +548,113 @@ class Crew(BaseModel):
|
|||||||
|
|
||||||
def _run_sequential_process(self) -> CrewOutput:
|
def _run_sequential_process(self) -> CrewOutput:
|
||||||
"""Executes tasks sequentially and returns the final output."""
|
"""Executes tasks sequentially and returns the final output."""
|
||||||
self.execution_logs = []
|
|
||||||
return self._execute_tasks(self.tasks)
|
return self._execute_tasks(self.tasks)
|
||||||
|
|
||||||
|
def _run_hierarchical_process(self) -> CrewOutput:
|
||||||
|
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
|
||||||
|
self._create_manager_agent()
|
||||||
|
return self._execute_tasks(self.tasks, self.manager_agent)
|
||||||
|
|
||||||
|
def _create_manager_agent(self):
|
||||||
|
i18n = I18N(prompt_file=self.prompt_file)
|
||||||
|
if self.manager_agent is not None:
|
||||||
|
self.manager_agent.allow_delegation = True
|
||||||
|
manager = self.manager_agent
|
||||||
|
if manager.tools is not None and len(manager.tools) > 0:
|
||||||
|
raise Exception("Manager agent should not have tools")
|
||||||
|
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
|
||||||
|
else:
|
||||||
|
manager = Agent(
|
||||||
|
role=i18n.retrieve("hierarchical_manager_agent", "role"),
|
||||||
|
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
|
||||||
|
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
|
||||||
|
tools=AgentTools(agents=self.agents).tools(),
|
||||||
|
llm=self.manager_llm,
|
||||||
|
verbose=self.verbose,
|
||||||
|
)
|
||||||
|
self.manager_agent = manager
|
||||||
|
|
||||||
def _execute_tasks(
|
def _execute_tasks(
|
||||||
self,
|
self,
|
||||||
tasks: List[Task],
|
tasks: List[Task],
|
||||||
manager: Optional[BaseAgent] = None,
|
manager: Optional[BaseAgent] = None,
|
||||||
) -> CrewOutput:
|
) -> CrewOutput:
|
||||||
|
"""Executes tasks sequentially and returns the final output.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tasks (List[Task]): List of tasks to execute
|
||||||
|
manager (Optional[BaseAgent], optional): Manager agent to use for delegation. Defaults to None.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
CrewOutput: Final output of the crew
|
||||||
|
"""
|
||||||
task_outputs: List[TaskOutput] = []
|
task_outputs: List[TaskOutput] = []
|
||||||
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
||||||
|
self.execution_logs = []
|
||||||
for task_index, task in enumerate(tasks):
|
for task_index, task in enumerate(tasks):
|
||||||
if task.agent and task.agent.allow_delegation:
|
self._prepare_task(task, manager)
|
||||||
agents_for_delegation = [
|
|
||||||
agent for agent in self.agents if agent != task.agent
|
|
||||||
]
|
|
||||||
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
|
|
||||||
task.tools += task.agent.get_delegation_tools(agents_for_delegation)
|
|
||||||
|
|
||||||
if self.process == Process.hierarchical:
|
|
||||||
if task.agent and manager:
|
|
||||||
manager.tools = task.agent.get_delegation_tools([task.agent])
|
|
||||||
if manager:
|
|
||||||
manager.tools = manager.get_delegation_tools(self.agents)
|
|
||||||
|
|
||||||
agent_to_use = task.agent if task.agent else manager
|
agent_to_use = task.agent if task.agent else manager
|
||||||
role = agent_to_use.role if agent_to_use is not None else "None"
|
if agent_to_use is None:
|
||||||
|
raise ValueError(
|
||||||
self._logger.log("debug", f"Working Agent: {role}", color="bold_purple")
|
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
|
||||||
self._logger.log(
|
|
||||||
"info",
|
|
||||||
f"Starting Task: {task.description}",
|
|
||||||
color="bold_purple",
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.output_log_file:
|
|
||||||
self._file_handler.log(
|
|
||||||
agent=role, task=task.description, status="started"
|
|
||||||
)
|
)
|
||||||
|
self._log_task_start(task, agent_to_use)
|
||||||
|
|
||||||
if task.async_execution:
|
if task.async_execution:
|
||||||
context = self._set_context(task, task_outputs)
|
context = self._set_context(task, task_outputs)
|
||||||
if agent_to_use:
|
future = task.execute_async(
|
||||||
future = task.execute_async(
|
agent=agent_to_use,
|
||||||
agent=agent_to_use,
|
context=context,
|
||||||
context=context,
|
tools=agent_to_use.tools,
|
||||||
tools=agent_to_use.tools,
|
)
|
||||||
)
|
futures.append((task, future, task_index))
|
||||||
futures.append((task, future, task_index))
|
else:
|
||||||
else: # sequential async
|
|
||||||
self._logger.log(
|
|
||||||
"warning", f"No agent available for task: {task.description}"
|
|
||||||
)
|
|
||||||
|
|
||||||
else: # sync execution
|
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures)
|
task_outputs = self._process_async_tasks(futures)
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
context = self._set_context(task, task_outputs)
|
context = self._set_context(task, task_outputs)
|
||||||
if agent_to_use:
|
task_output = task.execute_sync(
|
||||||
task_output = task.execute_sync(
|
agent=agent_to_use,
|
||||||
agent=agent_to_use,
|
context=context,
|
||||||
context=context,
|
tools=agent_to_use.tools,
|
||||||
tools=agent_to_use.tools,
|
)
|
||||||
)
|
task_outputs = [task_output]
|
||||||
task_outputs = [task_output]
|
self._process_task_result(task, task_output)
|
||||||
self._process_task_result(task, task_output)
|
self._store_execution_log(task, task_output, task_index)
|
||||||
self._store_execution_log(task, task_output, task_index)
|
|
||||||
|
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures)
|
task_outputs = self._process_async_tasks(futures)
|
||||||
|
|
||||||
final_task_output = task_outputs[0]
|
return self._create_crew_output(task_outputs)
|
||||||
|
|
||||||
final_string_output = final_task_output.raw
|
def _prepare_task(self, task: Task, manager: Optional[BaseAgent]):
|
||||||
self._finish_execution(final_string_output)
|
if task.agent and task.agent.allow_delegation:
|
||||||
|
self._add_delegation_tools(task)
|
||||||
|
if self.process == Process.hierarchical:
|
||||||
|
self._update_manager_tools(task, manager)
|
||||||
|
|
||||||
token_usage = self.calculate_usage_metrics()
|
def _add_delegation_tools(self, task: Task):
|
||||||
|
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
|
||||||
|
if len(self.agents) > 1 and agents_for_delegation:
|
||||||
|
task.tools += task.agent.get_delegation_tools(
|
||||||
|
agents_for_delegation
|
||||||
|
) # TODO: FIX TYPE ERROR HERE
|
||||||
|
|
||||||
return CrewOutput(
|
def _log_task_start(
|
||||||
raw=final_task_output.raw,
|
self, task: Task, agent: Optional[BaseAgent], color: str = "bold_purple"
|
||||||
pydantic=final_task_output.pydantic,
|
):
|
||||||
json_dict=final_task_output.json_dict,
|
role = agent.role if agent else "None"
|
||||||
tasks_output=[task.output for task in self.tasks if task.output],
|
self._logger.log("debug", f"Working Agent: {role}", color=color)
|
||||||
token_usage=token_usage,
|
self._logger.log("info", f"Starting Task: {task.description}", color=color)
|
||||||
)
|
if self.output_log_file:
|
||||||
# return task_outputs
|
self._file_handler.log(agent=role, task=task.description, status="started")
|
||||||
|
|
||||||
|
def _update_manager_tools(self, task: Task, manager: Optional[BaseAgent]):
|
||||||
|
if task.agent and manager:
|
||||||
|
manager.tools = task.agent.get_delegation_tools([task.agent])
|
||||||
|
if manager:
|
||||||
|
manager.tools = manager.get_delegation_tools(self.agents)
|
||||||
|
|
||||||
def _set_context(self, task: Task, task_outputs: List[TaskOutput]):
|
def _set_context(self, task: Task, task_outputs: List[TaskOutput]):
|
||||||
context = (
|
context = (
|
||||||
@@ -649,6 +670,24 @@ class Crew(BaseModel):
|
|||||||
if self.output_log_file:
|
if self.output_log_file:
|
||||||
self._file_handler.log(agent=role, task=output, status="completed")
|
self._file_handler.log(agent=role, task=output, status="completed")
|
||||||
|
|
||||||
|
def _create_crew_output(self, task_outputs: List[TaskOutput]) -> CrewOutput:
|
||||||
|
if len(task_outputs) != 1:
|
||||||
|
raise ValueError(
|
||||||
|
"Something went wrong. Kickoff should return only one task output."
|
||||||
|
)
|
||||||
|
final_task_output = task_outputs[0]
|
||||||
|
final_string_output = final_task_output.raw
|
||||||
|
self._finish_execution(final_string_output)
|
||||||
|
token_usage = self.calculate_usage_metrics()
|
||||||
|
|
||||||
|
return CrewOutput(
|
||||||
|
raw=final_task_output.raw,
|
||||||
|
pydantic=final_task_output.pydantic,
|
||||||
|
json_dict=final_task_output.json_dict,
|
||||||
|
tasks_output=[task.output for task in self.tasks if task.output],
|
||||||
|
token_usage=token_usage,
|
||||||
|
)
|
||||||
|
|
||||||
def _process_async_tasks(
|
def _process_async_tasks(
|
||||||
self,
|
self,
|
||||||
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
futures: List[Tuple[Task, Future[TaskOutput], int]],
|
||||||
@@ -679,16 +718,13 @@ class Crew(BaseModel):
|
|||||||
def replay_from_task(
|
def replay_from_task(
|
||||||
self, task_id: str, inputs: Dict[str, Any] | None = None
|
self, task_id: str, inputs: Dict[str, Any] | None = None
|
||||||
) -> CrewOutput:
|
) -> CrewOutput:
|
||||||
# stored_outputs = self._load_stored_outputs()
|
|
||||||
stored_outputs = TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).load()
|
stored_outputs = TaskOutputJsonHandler(CREW_TASKS_OUTPUT_FILE).load()
|
||||||
start_index = self._find_task_index(task_id, stored_outputs)
|
start_index = self._find_task_index(task_id, stored_outputs)
|
||||||
|
|
||||||
if start_index is None:
|
if start_index is None:
|
||||||
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
|
raise ValueError(f"Task with id {task_id} not found in the crew's tasks.")
|
||||||
|
|
||||||
task_outputs: List[
|
task_outputs: List[TaskOutput] = []
|
||||||
TaskOutput
|
|
||||||
] = [] # will propogate the old outputs first to add context then fill the content with the new task outputs relative to the replay start
|
|
||||||
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
futures: List[Tuple[Task, Future[TaskOutput], int]] = []
|
||||||
|
|
||||||
# inputs can be overrided with new passed inputs
|
# inputs can be overrided with new passed inputs
|
||||||
@@ -717,41 +753,14 @@ class Crew(BaseModel):
|
|||||||
self.tasks[task_index].output = task_output
|
self.tasks[task_index].output = task_output
|
||||||
task_outputs = [task_output]
|
task_outputs = [task_output]
|
||||||
else:
|
else:
|
||||||
if task.agent and task.agent.allow_delegation:
|
self._prepare_task(task, self.manager_agent)
|
||||||
agents_for_delegation = [
|
|
||||||
agent for agent in self.agents if agent != task.agent
|
|
||||||
]
|
|
||||||
if len(self.agents) > 1 and len(agents_for_delegation) > 0:
|
|
||||||
task.tools += task.agent.get_delegation_tools(
|
|
||||||
agents_for_delegation
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.process == Process.hierarchical:
|
|
||||||
if task.agent and self.manager_agent:
|
|
||||||
self.manager_agent.tools = task.agent.get_delegation_tools(
|
|
||||||
[task.agent]
|
|
||||||
)
|
|
||||||
if self.manager_agent:
|
|
||||||
self.manager_agent.tools = (
|
|
||||||
self.manager_agent.get_delegation_tools(self.agents)
|
|
||||||
)
|
|
||||||
agent_to_use = task.agent if task.agent else self.manager_agent
|
agent_to_use = task.agent if task.agent else self.manager_agent
|
||||||
role = agent_to_use.role if agent_to_use is not None else "None"
|
if agent_to_use is None:
|
||||||
log_color = "bold_blue"
|
raise ValueError(
|
||||||
self._logger.log(
|
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
|
||||||
"debug", f"Replaying Working Agent: {role}", color=log_color
|
|
||||||
)
|
|
||||||
self._logger.log(
|
|
||||||
"info",
|
|
||||||
f"Replaying Task: {task.description}",
|
|
||||||
color=log_color,
|
|
||||||
)
|
|
||||||
|
|
||||||
if self.output_log_file:
|
|
||||||
self._file_handler.log(
|
|
||||||
agent=role, task=task.description, status="started"
|
|
||||||
)
|
)
|
||||||
# Execute task for replay and subsequent tasks
|
self._log_task_start(task, agent_to_use, "bold_blue")
|
||||||
|
|
||||||
if task.async_execution:
|
if task.async_execution:
|
||||||
context = self._set_context(task, task_outputs)
|
context = self._set_context(task, task_outputs)
|
||||||
future = task.execute_async(
|
future = task.execute_async(
|
||||||
@@ -778,65 +787,7 @@ class Crew(BaseModel):
|
|||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, True)
|
task_outputs = self._process_async_tasks(futures, True)
|
||||||
|
|
||||||
if len(task_outputs) != 1:
|
return self._create_crew_output(task_outputs)
|
||||||
raise ValueError(
|
|
||||||
"Something went wrong. Kickoff should return only one task output."
|
|
||||||
)
|
|
||||||
final_task_output = task_outputs[0]
|
|
||||||
final_string_output = final_task_output.raw
|
|
||||||
self._finish_execution(final_string_output)
|
|
||||||
|
|
||||||
token_usage = self.calculate_usage_metrics()
|
|
||||||
|
|
||||||
return CrewOutput(
|
|
||||||
raw=final_task_output.raw,
|
|
||||||
pydantic=final_task_output.pydantic,
|
|
||||||
json_dict=final_task_output.json_dict,
|
|
||||||
tasks_output=[task.output for task in self.tasks if task.output],
|
|
||||||
token_usage=token_usage,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _create_manager_agent(self):
|
|
||||||
i18n = I18N(prompt_file=self.prompt_file)
|
|
||||||
if self.manager_agent is not None:
|
|
||||||
self.manager_agent.allow_delegation = True
|
|
||||||
manager = self.manager_agent
|
|
||||||
if manager.tools is not None and len(manager.tools) > 0:
|
|
||||||
raise Exception("Manager agent should not have tools")
|
|
||||||
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
|
|
||||||
else:
|
|
||||||
manager = Agent(
|
|
||||||
role=i18n.retrieve("hierarchical_manager_agent", "role"),
|
|
||||||
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
|
|
||||||
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
|
|
||||||
tools=AgentTools(agents=self.agents).tools(),
|
|
||||||
llm=self.manager_llm,
|
|
||||||
verbose=self.verbose,
|
|
||||||
)
|
|
||||||
self.manager_agent = manager
|
|
||||||
|
|
||||||
def _run_hierarchical_process(self) -> CrewOutput:
|
|
||||||
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
|
|
||||||
self.execution_logs = []
|
|
||||||
i18n = I18N(prompt_file=self.prompt_file)
|
|
||||||
if self.manager_agent is not None:
|
|
||||||
self.manager_agent.allow_delegation = True
|
|
||||||
manager = self.manager_agent
|
|
||||||
if manager.tools is not None and len(manager.tools) > 0:
|
|
||||||
raise Exception("Manager agent should not have tools")
|
|
||||||
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
|
|
||||||
else:
|
|
||||||
manager = Agent(
|
|
||||||
role=i18n.retrieve("hierarchical_manager_agent", "role"),
|
|
||||||
goal=i18n.retrieve("hierarchical_manager_agent", "goal"),
|
|
||||||
backstory=i18n.retrieve("hierarchical_manager_agent", "backstory"),
|
|
||||||
tools=AgentTools(agents=self.agents).tools(),
|
|
||||||
llm=self.manager_llm,
|
|
||||||
verbose=self.verbose,
|
|
||||||
)
|
|
||||||
self.manager_agent = manager
|
|
||||||
|
|
||||||
return self._execute_tasks(self.tasks, manager)
|
|
||||||
|
|
||||||
def copy(self):
|
def copy(self):
|
||||||
"""Create a deep copy of the Crew."""
|
"""Create a deep copy of the Crew."""
|
||||||
|
|||||||
Reference in New Issue
Block a user