Merge branch 'main' into feature/kickoff-consistent-output

This commit is contained in:
Brandon Hancock
2024-07-08 08:59:38 -04:00
54 changed files with 216613 additions and 17211 deletions

View File

@@ -15,6 +15,7 @@ from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -158,6 +159,18 @@ class Task(BaseModel):
)
return self
def wait_for_completion(self) -> str | BaseModel:
"""Wait for asynchronous task completion and return the output."""
assert self.async_execution, "Task is not set to be executed asynchronously."
if self._future:
self._future.result() # Wait for the future to complete
self._future = None
assert self.output, "Task output is not set."
return self.output.exported_output
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
@@ -198,22 +211,22 @@ class Task(BaseModel):
tools: Optional[List[Any]],
) -> TaskOutput:
"""Run the core execution logic of the task."""
self._execution_span = self._telemetry.task_started(self)
agent = agent or self.agent
if not agent:
raise Exception(
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
)
self._execution_span = self._telemetry.task_started(self)
if self.context:
context_list = []
task_outputs: List[TaskOutput] = []
for task in self.context:
if task.async_execution and task._thread:
task._thread.join()
if task and task.output:
context_list.append(task.output.raw_output)
context = "\n".join(context_list)
if task.async_execution:
task.wait_for_completion()
if task.output:
task_outputs.append(task.output)
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
self.prompt_context = context
tools = tools or self.tools
@@ -289,7 +302,7 @@ class Task(BaseModel):
copied_data = {k: v for k, v in copied_data.items() if v is not None}
cloned_context = (
[task.copy() for task in self.context] if self.context else None
[task.copy(agents) for task in self.context] if self.context else None
)
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
@@ -397,14 +410,12 @@ class Task(BaseModel):
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
def _save_file(self, result: Any) -> None:
# type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
directory = os.path.dirname(self.output_file)
directory = os.path.dirname(self.output_file) # type: ignore # Value of type variable "AnyOrLiteralStr" of "dirname" cannot be "str | None"
if directory and not os.path.exists(directory):
os.makedirs(directory)
# type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
with open(self.output_file, "w", encoding="utf-8") as file:
with open(self.output_file, "w", encoding="utf-8") as file: # type: ignore # Argument 1 to "open" has incompatible type "str | None"; expected "int | str | bytes | PathLike[str] | PathLike[bytes]"
file.write(result)
return None