diff --git a/src/crewai/crew.py b/src/crewai/crew.py index e15199c01..5d29c6db3 100644 --- a/src/crewai/crew.py +++ b/src/crewai/crew.py @@ -421,17 +421,17 @@ class Crew(BaseModel): return results - def _run_sequential_process(self) -> str: + def _run_sequential_process(self) -> Union[str, Dict[str, Any]]: """Executes tasks sequentially and returns the final output.""" - task_output = "" + task_output = None for task in self.tasks: - if task.agent.allow_delegation: # type: ignore # Item "None" of "Agent | None" has no attribute "allow_delegation" + if task.agent and task.agent.allow_delegation: agents_for_delegation = [ agent for agent in self.agents if agent != task.agent ] if len(self.agents) > 1 and len(agents_for_delegation) > 0: - task.tools += task.agent.get_delegation_tools(agents_for_delegation) # type: ignore # Item "None" of "BaseAgent | None" has no attribute "get_delegation_tools" + task.tools += task.agent.get_delegation_tools(agents_for_delegation) role = task.agent.role if task.agent is not None else "None" self._logger.log("debug", f"== Working Agent: {role}", color="bold_purple") @@ -458,7 +458,7 @@ class Crew(BaseModel): token_usage = self.calculate_usage_metrics() - return self._format_output(task_output, token_usage) # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") + return self._format_output(task_output if task_output else "", token_usage) def _run_hierarchical_process( self, @@ -483,7 +483,7 @@ class Crew(BaseModel): ) self.manager_agent = manager - task_output = "" + task_output = None for task in self.tasks: self._logger.log("debug", f"Working Agent: {manager.role}") @@ -510,10 +510,11 @@ class Crew(BaseModel): self._finish_execution(task_output) - # type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str") token_usage = self.calculate_usage_metrics() - return self._format_output(task_output, token_usage), token_usage + return self._format_output( + task_output if task_output else "", token_usage + ), token_usage def copy(self): """Create a deep copy of the Crew.""" @@ -574,7 +575,7 @@ class Crew(BaseModel): """ if self.full_output: - return { # type: ignore # Incompatible return value type (got "dict[str, Sequence[str | TaskOutput | None]]", expected "str") + return { "final_output": output, "tasks_outputs": [task.output for task in self.tasks if task], "usage_metrics": token_usage, diff --git a/src/crewai/task.py b/src/crewai/task.py index 168d5db87..292f178ff 100644 --- a/src/crewai/task.py +++ b/src/crewai/task.py @@ -1,7 +1,7 @@ import os import re -import threading import uuid +from concurrent.futures import Future, ThreadPoolExecutor from copy import copy from typing import Any, Dict, List, Optional, Type, Union @@ -102,7 +102,7 @@ class Task(BaseModel): _execution_span: Span | None = None _original_description: str | None = None _original_expected_output: str | None = None - _thread: threading.Thread | None = None + _future: Future | None = None def __init__(__pydantic_self__, **data): config = data.pop("config", {}) @@ -161,20 +161,20 @@ class Task(BaseModel): """Wait for asynchronous task completion and return the output.""" assert self.async_execution, "Task is not set to be executed asynchronously." - if self._thread: - self._thread.join() - self._thread = None + if self._future: + self._future.result() # Wait for the future to complete + self._future = None assert self.output, "Task output is not set." return self.output.exported_output - def execute( # type: ignore # Missing return statement + def execute( self, agent: BaseAgent | None = None, context: Optional[str] = None, tools: Optional[List[Any]] = None, - ) -> str: + ) -> str | None: """Execute the task. Returns: @@ -186,26 +186,28 @@ class Task(BaseModel): agent = agent or self.agent if not agent: raise Exception( - f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical." + f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly " + "and should be executed in a Crew using a specific process that support that, like hierarchical." ) if self.context: - context = [] # type: ignore # Incompatible types in assignment (expression has type "list[Never]", variable has type "str | None") + internal_context = [] for task in self.context: if task.async_execution: task.wait_for_completion() if task.output: - context.append(task.output.raw_output) # type: ignore # Item "str" of "str | None" has no attribute "append" - context = "\n".join(context) # type: ignore # Argument 1 to "join" of "str" has incompatible type "str | None"; expected "Iterable[str]" + internal_context.append(task.output.raw_output) + context = "\n".join(internal_context) self.prompt_context = context tools = tools or self.tools if self.async_execution: - self._thread = threading.Thread( - target=self._execute, args=(agent, self, context, tools) - ) - self._thread.start() + with ThreadPoolExecutor() as executor: + self._future = executor.submit( + self._execute, agent, self, context, tools + ) + return None else: result = self._execute( task=self, @@ -215,7 +217,7 @@ class Task(BaseModel): ) return result - def _execute(self, agent: "BaseAgent", task, context, tools): + def _execute(self, agent: "BaseAgent", task, context, tools) -> str | None: result = agent.execute_task( task=task, context=context, @@ -223,7 +225,7 @@ class Task(BaseModel): ) exported_output = self._export_output(result) - self.output = TaskOutput( # type: ignore # the responses are usually str but need to figure out a more elegant solution here + self.output = TaskOutput( description=self.description, exported_output=exported_output, raw_output=result, diff --git a/tests/crew_test.py b/tests/crew_test.py index 31ac55444..fe2c698a4 100644 --- a/tests/crew_test.py +++ b/tests/crew_test.py @@ -501,13 +501,13 @@ def test_agents_rpm_is_never_set_if_crew_max_RPM_is_not_set(): def test_async_task_execution(): - import threading - from unittest.mock import patch + from concurrent.futures import ThreadPoolExecutor + from unittest.mock import MagicMock, patch from crewai.tasks.task_output import TaskOutput list_ideas = Task( - description="Give me a list of 5 interesting ideas to explore for na article, what makes them unique and interesting.", + description="Give me a list of 5 interesting ideas to explore for an article, what makes them unique and interesting.", expected_output="Bullet point list of 5 important events.", agent=researcher, async_execution=True, @@ -533,23 +533,24 @@ def test_async_task_execution(): with patch.object(Agent, "execute_task") as execute: execute.return_value = "ok" - with patch.object(threading.Thread, "start") as start: - thread = threading.Thread(target=lambda: None, args=()).start() - start.return_value = thread - with patch.object(threading.Thread, "join", wraps=thread.join()) as join: - list_ideas.output = TaskOutput( - description="A 4 paragraph article about AI.", - raw_output="ok", - agent="writer", - ) - list_important_history.output = TaskOutput( - description="A 4 paragraph article about AI.", - raw_output="ok", - agent="writer", - ) - crew.kickoff() - start.assert_called() - join.assert_called() + with patch.object(ThreadPoolExecutor, "submit") as submit: + future = MagicMock() + future.result.return_value = "ok" + submit.return_value = future + + list_ideas.output = TaskOutput( + description="A 4 paragraph article about AI.", + raw_output="ok", + agent="writer", + ) + list_important_history.output = TaskOutput( + description="A 4 paragraph article about AI.", + raw_output="ok", + agent="writer", + ) + crew.kickoff() + submit.assert_called() + future.result.assert_called() @pytest.mark.vcr(filter_headers=["authorization"])