Fix/async tasks (#877)

* fix: async tasks calls

* fix: some issue along with some type check errors

* fix: some issue along with some type check errors

* fix: async test
This commit is contained in:
Eduardo Chiarotti
2024-07-06 01:30:07 -03:00
committed by GitHub
parent bb64c80964
commit a41bd18599
3 changed files with 50 additions and 46 deletions

View File

@@ -421,17 +421,17 @@ class Crew(BaseModel):
return results return results
def _run_sequential_process(self) -> str: def _run_sequential_process(self) -> Union[str, Dict[str, Any]]:
"""Executes tasks sequentially and returns the final output.""" """Executes tasks sequentially and returns the final output."""
task_output = "" task_output = None
for task in self.tasks: for task in self.tasks:
if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation" if task.agent and task.agent.allow_delegation:
agents_for_delegation = [ agents_for_delegation = [
agent for agent in self.agents if agent != task.agent agent for agent in self.agents if agent != task.agent
] ]
if len(self.agents) > 1 and len(agents_for_delegation) > 0: if len(self.agents) > 1 and len(agents_for_delegation) > 0:
task.tools += task.agent.get_delegation_tools(agents_for_delegation) # type: ignore # Item "None" of "BaseAgent | None" has no attribute "get_delegation_tools" task.tools += task.agent.get_delegation_tools(agents_for_delegation)
role = task.agent.role if task.agent is not None else "None" role = task.agent.role if task.agent is not None else "None"
self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple") self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple")
@@ -458,7 +458,7 @@ class Crew(BaseModel):
token_usage = self.calculate_usage_metrics() token_usage = self.calculate_usage_metrics()
return self._format_output(task_output, token_usage) # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") return self._format_output(task_output if task_output else "", token_usage)
def _run_hierarchical_process( def _run_hierarchical_process(
self, self,
@@ -483,7 +483,7 @@ class Crew(BaseModel):
) )
self.manager_agent = manager self.manager_agent = manager
task_output = "" task_output = None
for task in self.tasks: for task in self.tasks:
self._logger.log("debug", f"Working Agent: {manager.role}") self._logger.log("debug", f"Working Agent: {manager.role}")
@@ -510,10 +510,11 @@ class Crew(BaseModel):
self._finish_execution(task_output) self._finish_execution(task_output)
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
token_usage = self.calculate_usage_metrics() token_usage = self.calculate_usage_metrics()
return self._format_output(task_output, token_usage), token_usage return self._format_output(
task_output if task_output else "", token_usage
), token_usage
def copy(self): def copy(self):
"""Create a deep copy of the Crew.""" """Create a deep copy of the Crew."""
@@ -574,7 +575,7 @@ class Crew(BaseModel):
""" """
if self.full_output: if self.full_output:
return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str") return {
"final_output": output, "final_output": output,
"tasks_outputs": [task.output for task in self.tasks if task], "tasks_outputs": [task.output for task in self.tasks if task],
"usage_metrics": token_usage, "usage_metrics": token_usage,

View File

@@ -1,7 +1,7 @@
import os import os
import re import re
import threading
import uuid import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from copy import copy from copy import copy
from typing import Any, Dict, List, Optional, Type, Union from typing import Any, Dict, List, Optional, Type, Union
@@ -102,7 +102,7 @@ class Task(BaseModel):
_execution_span: Span | None = None _execution_span: Span | None = None
_original_description: str | None = None _original_description: str | None = None
_original_expected_output: str | None = None _original_expected_output: str | None = None
_thread: threading.Thread | None = None _future: Future | None = None
def __init__(__pydantic_self__, **data): def __init__(__pydantic_self__, **data):
config = data.pop("config", {}) config = data.pop("config", {})
@@ -161,20 +161,20 @@ class Task(BaseModel):
"""Wait for asynchronous task completion and return the output.""" """Wait for asynchronous task completion and return the output."""
assert self.async_execution, "Task is not set to be executed asynchronously." assert self.async_execution, "Task is not set to be executed asynchronously."
if self._thread: if self._future:
self._thread.join() self._future.result() # Wait for the future to complete
self._thread = None self._future = None
assert self.output, "Task output is not set." assert self.output, "Task output is not set."
return self.output.exported_output return self.output.exported_output
def execute( # type: ignore # Missing return statement def execute(
self, self,
agent: BaseAgent | None = None, agent: BaseAgent | None = None,
context: Optional[str] = None, context: Optional[str] = None,
tools: Optional[List[Any]] = None, tools: Optional[List[Any]] = None,
) -> str: ) -> str | None:
"""Execute the task. """Execute the task.
Returns: Returns:
@@ -186,26 +186,28 @@ class Task(BaseModel):
agent = agent or self.agent agent = agent or self.agent
if not agent: if not agent:
raise Exception( raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly "
"and should be executed in a Crew using a specific process that support that, like hierarchical."
) )
if self.context: if self.context:
context = [] # type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None") internal_context = []
for task in self.context: for task in self.context:
if task.async_execution: if task.async_execution:
task.wait_for_completion() task.wait_for_completion()
if task.output: if task.output:
context.append(task.output.raw_output) # type: ignore # Item "str" of "str | None" has no attribute "append" internal_context.append(task.output.raw_output)
context = "\n".join(context) # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]" context = "\n".join(internal_context)
self.prompt_context = context self.prompt_context = context
tools = tools or self.tools tools = tools or self.tools
if self.async_execution: if self.async_execution:
self._thread = threading.Thread( with ThreadPoolExecutor() as executor:
target=self._execute, args=(agent, self, context, tools) self._future = executor.submit(
) self._execute, agent, self, context, tools
self._thread.start() )
return None
else: else:
result = self._execute( result = self._execute(
task=self, task=self,
@@ -215,7 +217,7 @@ class Task(BaseModel):
) )
return result return result
def _execute(self, agent: "BaseAgent", task, context, tools): def _execute(self, agent: "BaseAgent", task, context, tools) -> str | None:
result = agent.execute_task( result = agent.execute_task(
task=task, task=task,
context=context, context=context,
@@ -223,7 +225,7 @@ class Task(BaseModel):
) )
exported_output = self._export_output(result) exported_output = self._export_output(result)
self.output = TaskOutput( # type: ignore # the responses are usually str but need to figure out a more elegant solution here self.output = TaskOutput(
description=self.description, description=self.description,
exported_output=exported_output, exported_output=exported_output,
raw_output=result, raw_output=result,

View File

@@ -501,13 +501,13 @@ def test_agents_rpm_is_never_set_if_crew_max_RPM_is_not_set():
def test_async_task_execution(): def test_async_task_execution():
import threading from concurrent.futures import ThreadPoolExecutor
from unittest.mock import patch from unittest.mock import MagicMock, patch
from crewai.tasks.task_output import TaskOutput from crewai.tasks.task_output import TaskOutput
list_ideas = Task( list_ideas = Task(
description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.", description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.",
expected_output="Bullet point list of 5 important events.", expected_output="Bullet point list of 5 important events.",
agent=researcher, agent=researcher,
async_execution=True, async_execution=True,
@@ -533,23 +533,24 @@ def test_async_task_execution():
with patch.object(Agent, "execute_task") as execute: with patch.object(Agent, "execute_task") as execute:
execute.return_value = "ok" execute.return_value = "ok"
with patch.object(threading.Thread, "start") as start: with patch.object(ThreadPoolExecutor, "submit") as submit:
thread = threading.Thread(target=lambda: None, args=()).start() future = MagicMock()
start.return_value = thread future.result.return_value = "ok"
with patch.object(threading.Thread, "join", wraps=thread.join()) as join: submit.return_value = future
list_ideas.output = TaskOutput(
description="A 4 paragraph article about AI.", list_ideas.output = TaskOutput(
raw_output="ok", description="A 4 paragraph article about AI.",
agent="writer", raw_output="ok",
) agent="writer",
list_important_history.output = TaskOutput( )
description="A 4 paragraph article about AI.", list_important_history.output = TaskOutput(
raw_output="ok", description="A 4 paragraph article about AI.",
agent="writer", raw_output="ok",
) agent="writer",
crew.kickoff() )
start.assert_called() crew.kickoff()
join.assert_called() submit.assert_called()
future.result.assert_called()
@pytest.mark.vcr(filter_headers=["authorization"]) @pytest.mark.vcr(filter_headers=["authorization"])