Clean up task execution to use separate paths for async and sync execution. Update all kickoff functions to return CrewOutput. WIP: waiting on Joao's feedback about async task execution with task_output.

This commit is contained in:
Brandon Hancock
2024-06-20 12:11:27 -04:00
parent 377f919d42
commit ea5a784877
7 changed files with 10351 additions and 220 deletions
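For orientation, a minimal sketch of how the reworked API is intended to be called after this change (the Agent/Task constructor arguments below are illustrative placeholders, not part of this commit):

    from crewai import Agent, Crew, Task

    researcher = Agent(role="Researcher", goal="...", backstory="...")
    report = Task(description="Summarize findings", expected_output="A short summary", agent=researcher)
    crew = Crew(agents=[researcher], tasks=[report])

    result = crew.kickoff()     # now returns a CrewOutput instead of str | dict
    print(result.final_output)  # output of the final task
    print(result.tasks_output)  # list of TaskOutput, one per task
    print(str(result))          # __str__ falls back to final_output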

src/crewai/crew.py

@@ -1,7 +1,8 @@
 import asyncio
 import json
 import uuid
-from typing import Any, Dict, List, Optional, Union
+from concurrent.futures import Future
+from typing import Any, Dict, List, Optional, Tuple, Union

 from langchain_core.callbacks import BaseCallbackHandler
 from pydantic import (
@@ -19,6 +20,7 @@ from pydantic_core import PydanticCustomError
 from crewai.agent import Agent
 from crewai.agents.cache import CacheHandler
+from crewai.crews.crew_output import CrewOutput
 from crewai.memory.entity.entity_memory import EntityMemory
 from crewai.memory.long_term.long_term_memory import LongTermMemory
 from crewai.memory.short_term.short_term_memory import ShortTermMemory
@@ -245,7 +247,7 @@ class Crew(BaseModel):
     def kickoff(
         self,
         inputs: Optional[Dict[str, Any]] = {},
-    ) -> Union[str, Dict[str, Any]]:
+    ) -> CrewOutput:
         """Starts the crew to work on its assigned tasks."""
         self._execution_span = self._telemetry.crew_execution_span(self)
         # type: ignore # Argument 1 to "_interpolate_inputs" of "Crew" has incompatible type "dict[str, Any] | None"; expected "dict[str, Any]"
@@ -288,9 +290,9 @@ class Crew(BaseModel):
         return result

-    def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List:
+    def kickoff_for_each(self, inputs: List[Dict[str, Any]]) -> List[CrewOutput]:
         """Executes the Crew's workflow for each input in the list and aggregates results."""
-        results = []
+        results: List[CrewOutput] = []

         for input_data in inputs:
             crew = self.copy()
@@ -306,12 +308,12 @@ class Crew(BaseModel):
         return results

     async def kickoff_async(
-        self, inputs: Optional[Dict[str, Any]] = {}
+        self, inputs: Optional[CrewOutput] = {}
     ) -> Union[str, Dict]:
         """Asynchronous kickoff method to start the crew execution."""
         return await asyncio.to_thread(self.kickoff, inputs)

-    async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[Any]:
+    async def kickoff_for_each_async(self, inputs: List[Dict]) -> List[CrewOutput]:
         async def run_crew(input_data):
             crew = self.copy()
@@ -332,10 +334,20 @@ class Crew(BaseModel):
         # TODO: Implement training
         pass

-    def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
+    def _run_sequential_process(self) -> CrewOutput:
         """Executes tasks sequentially and returns the final output."""
+        # TODO: Check to see if we need to be clearing task output after each task
         task_output = ""
-        for task in self.tasks:
+        futures: List[Tuple[int, Task, Future]] = []
+        task_results = {}
+
+        def _process_task_result(task, output):
+            role = task.agent.role if task.agent is not None else "None"
+            self._logger.log("debug", f"== [{role}] Task output: {output}\n\n")
+            if self.output_log_file:
+                self._file_handler.log(agent=role, task=output, status="completed")
+
+        for index, task in enumerate(self.tasks):
             if task.agent.allow_delegation:  # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation"
                 agents_for_delegation = [
                     agent for agent in self.agents if agent != task.agent
@@ -354,24 +366,43 @@ class Crew(BaseModel):
                     agent=role, task=task.description, status="started"
                 )

-            output = task.execute(context=task_output)
-            if not task.async_execution:
-                task_output = output
-
-            role = task.agent.role if task.agent is not None else "None"
-            self._logger.log("debug", f"== [{role}] Task output: {task_output}\n\n")
-            if self.output_log_file:
-                self._file_handler.log(agent=role, task=task_output, status="completed")
+            if task.async_execution:
+                future = task.execute_async(
+                    agent=task.agent, context=task_output, tools=task.tools
+                )
+                futures.append((index, task, future))
+            else:
+                # Before executing a synchronous task, wait for all async tasks to complete
+                if futures:
+                    for future_index, future_task, future in futures:
+                        output = future.result()
+                        task_results[future_index] = output
+                        _process_task_result(future_task, output)
+                    # Clear the futures list after processing all async results
+                    futures.clear()
+
+                output = task.execute_sync(
+                    agent=task.agent, context=task_output, tools=task.tools
+                )
+                task_results[index] = output
+                task_output = output
+                _process_task_result(task, output)
+
+        # TODO: Check with Joao to see if we want to add or ignore outputs from async tasks
+        # Process any remaining async results
+        for future_index, future_task, future in futures:
+            output = future.result()
+            task_results[future_index] = output
+            _process_task_result(future_task, output)
+
         self._finish_execution(task_output)
         token_usage = task.agent._token_process.get_summary()  # type: ignore # Item "None" of "Agent | None" has no attribute "_token_process"
         return self._format_output(task_output, token_usage)  # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
-    def _run_hierarchical_process(self) -> Union[str, Dict[str, Any]]:
+    # TODO: Update this to mimic the async and sync execution of tasks in the sequential process
+    def _run_hierarchical_process(self) -> Tuple[CrewOutput, Dict[str, Any]]:
         """Creates and assigns a manager agent to make sure the crew completes the tasks."""

         i18n = I18N(prompt_file=self.prompt_file)
@@ -392,7 +423,10 @@ class Crew(BaseModel):
         )

         task_output = ""
-        for task in self.tasks:
+        futures: List[Tuple[int, Task, Future]] = []
+        task_results = {}
+
+        for index, task in enumerate(self.tasks):
             self._logger.log("debug", f"Working Agent: {manager.role}")
             self._logger.log("info", f"Starting Task: {task.description}")
@@ -401,23 +435,47 @@ class Crew(BaseModel):
                     agent=manager.role, task=task.description, status="started"
                 )

-            task_output = task.execute(
-                agent=manager, context=task_output, tools=manager.tools
-            )
-
-            self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
-            if self.output_log_file:
-                self._file_handler.log(
-                    agent=manager.role, task=task_output, status="completed"
-                )
+            if task.async_execution:
+                future = task.execute_async(
+                    agent=manager, context=task_output, tools=manager.tools
+                )
+                futures.append((index, task, future))
+            else:
+                output = task.execute_sync(
+                    agent=manager, context=task_output, tools=manager.tools
+                )
+                task_results[index] = output
+
+                self._logger.log("debug", f"[{manager.role}] Task output: {output}")
+                if self.output_log_file:
+                    self._file_handler.log(
+                        agent=manager.role, task=output, status="completed"
+                    )
+
+        # Process async results in order
+        for index, task, future in sorted(futures):
+            output = future.result()
+            task_results[index] = output
+            role = manager.role
+            self._logger.log("debug", f"== [{role}] Task output: {output}\n\n")
+            if self.output_log_file:
+                self._file_handler.log(agent=role, task=output, status="completed")
+
+        # Get the final task_output from the last task result
+        final_index = len(self.tasks) - 1
+        if final_index in task_results:
+            task_output = task_results[final_index]

         self._finish_execution(task_output)
         manager_token_usage = manager._token_process.get_summary()
-        return self._format_output(
-            task_output, manager_token_usage
-        ), manager_token_usage  # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
+        return (
+            self._format_output(task_output, manager_token_usage),
+            manager_token_usage,
+        )
def copy(self):
"""Create a deep copy of the Crew."""
@@ -469,21 +527,21 @@ class Crew(BaseModel):
     def _format_output(
         self, output: str, token_usage: Optional[Dict[str, Any]]
-    ) -> Union[str, Dict[str, Any]]:
+    ) -> CrewOutput:
         """
         Formats the output of the crew execution.
-
-        If full_output is True, then returned data type will be a dictionary else returned outputs are string
         """
-        if self.full_output:
-            return {  # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str")
-                "final_output": output,
-                "tasks_outputs": [task.output for task in self.tasks if task],
-                "usage_metrics": token_usage,
-            }
-        else:
-            return output
+        print("Crew Output: ", output)
+        print("Crew output type: ", type(output))
+        print("SELF TASKS: ", self.tasks)
+        print("Tasks Output: ", [task.output for task in self.tasks if task])
+        return CrewOutput(
+            final_output=output,
+            tasks_output=[task.output for task in self.tasks if task],
+            token_output=token_usage,
+        )

-    def _finish_execution(self, output) -> None:
+    def _finish_execution(self, output: str) -> None:
         if self.max_rpm:
             self._rpm_controller.stop_rpm_counter()
         self._telemetry.end_crew(self, output)
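The scheduling rule implemented in _run_sequential_process above — queue async tasks as Futures, then drain them all before any synchronous task runs — can be shown in isolation. A toy sketch of the same barrier pattern (illustrative names, not crew code):

    import threading
    from concurrent.futures import Future

    def run_async(fn):
        # Same shape as Task.execute_async: fulfill a bare Future from a thread.
        future = Future()
        threading.Thread(target=lambda: future.set_result(fn())).start()
        return future

    tasks = [("a", True), ("b", True), ("c", False)]  # (name, async_execution)
    futures, results = [], {}
    for index, (name, is_async) in enumerate(tasks):
        if is_async:
            futures.append((index, run_async(lambda n=name: f"{n} done")))
        else:
            # A sync task acts as a barrier: collect every pending async result first.
            for f_index, f in futures:
                results[f_index] = f.result()
            futures.clear()
            results[index] = f"{name} done"
    print(results)  # {0: 'a done', 1: 'b done', 2: 'c done'}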

src/crewai/crews/__init__.py

@@ -0,0 +1 @@
+from .crew_output import CrewOutput

src/crewai/crews/crew_output.py

@@ -0,0 +1,18 @@
+from typing import Any, Dict, Union
+
+from pydantic import BaseModel, Field
+
+from crewai.tasks.task_output import TaskOutput
+
+
+class CrewOutput(BaseModel):
+    final_output: str = Field(description="Final output of the crew")
+    tasks_output: list[TaskOutput] = Field(
+        description="Output of each task", default=[]
+    )
+    token_output: Dict[str, Any] = Field(
+        description="Processed token summary", default={}
+    )
+
+    def __str__(self):
+        return self.final_output
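Since final_output is the only required field, the new model can be exercised on its own; a quick sketch (placeholder string, assuming the defaults above):

    out = CrewOutput(final_output="Paris is the capital of France.")
    print(out)               # __str__ returns "Paris is the capital of France."
    print(out.tasks_output)  # [] (default)
    print(out.token_output)  # {} (default)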

src/crewai/task.py

@@ -1,8 +1,9 @@
-from copy import deepcopy
 import os
 import re
 import threading
 import uuid
+from concurrent.futures import Future
+from copy import deepcopy
 from typing import Any, Dict, List, Optional, Type

 from langchain_openai import ChatOpenAI
@@ -145,18 +146,55 @@ class Task(BaseModel):
         )
         return self

-    def execute(  # type: ignore # Missing return statement
+    def execute_sync(
         self,
-        agent: Agent | None = None,
+        agent: Optional[Agent] = None,
         context: Optional[str] = None,
         tools: Optional[List[Any]] = None,
     ) -> str:
-        """Execute the task.
-
-        Returns:
-            Output of the task.
-        """
+        """Execute the task synchronously."""
+        return self._execute_task_sync(agent, context, tools)
+
+    def execute_async(
+        self,
+        agent: Optional[Agent] = None,
+        context: Optional[str] = None,
+        tools: Optional[List[Any]] = None,
+    ) -> Future:
+        """Execute the task asynchronously."""
+        future = Future()
+        threading.Thread(
+            target=self._execute_task_async, args=(agent, context, tools, future)
+        ).start()
+        return future
+
+    def _execute_task_sync(
+        self,
+        agent: Optional[Agent],
+        context: Optional[str],
+        tools: Optional[List[Any]],
+    ) -> str:
+        """Execute the task synchronously with context handling."""
+        return self._execute_core(agent, context, tools)
+
+    def _execute_task_async(
+        self,
+        agent: Optional[Agent],
+        context: Optional[str],
+        tools: Optional[List[Any]],
+        future: Future,
+    ) -> None:
+        """Execute the task asynchronously with context handling."""
+        result = self._execute_core(agent, context, tools)
+        future.set_result(result)
+
+    def _execute_core(
+        self,
+        agent: Optional[Agent],
+        context: Optional[str],
+        tools: Optional[List[Any]],
+    ) -> str:
+        """Run the core execution logic of the task."""
         agent = agent or self.agent
         if not agent:
             raise Exception(
@@ -164,37 +202,19 @@ class Task(BaseModel):
             )

         if self.context:
-            context = []  # type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None")
+            context_list = []
             for task in self.context:
-                if task.async_execution:
-                    task.thread.join()  # type: ignore # Item "None" of "Thread | None" has no attribute "join"
+                if task.async_execution and task.thread:
+                    task.thread.join()
                 if task and task.output:
-                    context.append(task.output.raw_output)  # type: ignore # Item "str" of "str | None" has no attribute "append"
-            context = "\n".join(context)  # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]"
+                    context_list.append(task.output.raw_output)
+            context = "\n".join(context_list)

         self.prompt_context = context
         tools = tools or self.tools

-        if self.async_execution:
-            self.thread = threading.Thread(
-                target=self._execute, args=(agent, self, context, tools)
-            )
-            self.thread.start()
-        else:
-            result = self._execute(
-                task=self,
-                agent=agent,
-                context=context,
-                tools=tools,
-            )
-            return result
-
-    def _execute(self, agent, task, context, tools):
         result = agent.execute_task(
-            task=task,
+            task=self,
             context=context,
             tools=tools,
         )
@@ -328,7 +348,9 @@ class Task(BaseModel):
         if self.output_file:
             content = (
-                exported_result if not self.output_pydantic else exported_result.json()  # type: ignore # "str" has no attribute "json"
+                exported_result
+                if not self.output_pydantic
+                else exported_result.json()
             )
             self._save_file(content)
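The execute_async implementation above hands back a bare concurrent.futures.Future that a manually started thread completes via set_result, rather than going through an Executor. A minimal standalone sketch of that handoff (illustrative, outside the Task class):

    import threading
    import time
    from concurrent.futures import Future

    def execute_async(work):
        # Create the Future ourselves and fulfill it from a worker thread,
        # mirroring Task.execute_async / Task._execute_task_async.
        future = Future()
        threading.Thread(target=lambda: future.set_result(work())).start()
        return future

    f = execute_async(lambda: (time.sleep(0.1), "task result")[1])
    print(f.result())  # blocks until set_result fires -> "task result"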