mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-09 08:08:32 +00:00
Feature/kickoff consistent output (#847)
* Cleaned up task execution to now have separate paths for async and sync execution. Updating all kickoff functions to return CrewOutput. WIP. Waiting for Joao feedback on async task execution with task_output * Consistently storing async and sync output for context * outline tests I need to create going forward * Major rehaul of TaskOutput and CrewOutput. Updated all tests to work with new change. Need to add in a few final tricky async tests and add a few more to verify output types on TaskOutput and CrewOutput. * Encountering issues with callback. Need to test on main. WIP * working on tests. WIP * WIP. Figuring out disconnect issue. * Cleaned up logs now that I've isolated the issue to the LLM * more wip. * WIP. It looks like usage metrics has always been broken for async * Update parent crew who is managing for_each loop * Merge in main to bugfix/kickoff-for-each-usage-metrics * Clean up code for review * Add new tests * Final cleanup. Ready for review. * Moving copy functionality from Agent to BaseAgent * Fix renaming issue * Fix linting errors * use BaseAgent instead of Agent where applicable * Fixing missing function. Working on tests. * WIP. Needing team to review change * Fixing issues brought about by merge * WIP * Implement major fixes from yesterdays group conversation. Now working on tests. * The majority of tasks are working now. Need to fix converter class * Fix final failing test * Fix linting and type-checker issues * Add more tests to fully test CrewOutput and TaskOutput changes * Add in validation for async cannot depend on other async tasks. * Update validators and tests
This commit is contained in:
committed by
GitHub
parent
691b094a40
commit
7b53457ef3
@@ -1,9 +1,11 @@
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
import uuid
|
||||
from concurrent.futures import Future, ThreadPoolExecutor
|
||||
from concurrent.futures import Future
|
||||
from copy import copy
|
||||
from typing import Any, Dict, List, Optional, Type, Union
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type, Union
|
||||
|
||||
from langchain_openai import ChatOpenAI
|
||||
from opentelemetry.trace import Span
|
||||
@@ -11,6 +13,7 @@ from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
|
||||
from pydantic_core import PydanticCustomError
|
||||
|
||||
from crewai.agents.agent_builder.base_agent import BaseAgent
|
||||
from crewai.tasks.output_format import OutputFormat
|
||||
from crewai.tasks.task_output import TaskOutput
|
||||
from crewai.telemetry.telemetry import Telemetry
|
||||
from crewai.utilities.converter import Converter, ConverterError
|
||||
@@ -106,7 +109,7 @@ class Task(BaseModel):
|
||||
_execution_span: Span | None = None
|
||||
_original_description: str | None = None
|
||||
_original_expected_output: str | None = None
|
||||
_future: Future | None = None
|
||||
_thread: threading.Thread | None = None
|
||||
|
||||
def __init__(__pydantic_self__, **data):
|
||||
config = data.pop("config", {})
|
||||
@@ -161,80 +164,74 @@ class Task(BaseModel):
|
||||
)
|
||||
return self
|
||||
|
||||
def wait_for_completion(self) -> str | BaseModel:
|
||||
"""Wait for asynchronous task completion and return the output."""
|
||||
assert self.async_execution, "Task is not set to be executed asynchronously."
|
||||
def execute_sync(
|
||||
self,
|
||||
agent: Optional[BaseAgent] = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[Any]] = None,
|
||||
) -> TaskOutput:
|
||||
"""Execute the task synchronously."""
|
||||
return self._execute_core(agent, context, tools)
|
||||
|
||||
if self._future:
|
||||
self._future.result() # Wait for the future to complete
|
||||
self._future = None
|
||||
|
||||
assert self.output, "Task output is not set."
|
||||
|
||||
return self.output.exported_output
|
||||
|
||||
def execute(
|
||||
def execute_async(
|
||||
self,
|
||||
agent: BaseAgent | None = None,
|
||||
context: Optional[str] = None,
|
||||
tools: Optional[List[Any]] = None,
|
||||
) -> str | None:
|
||||
"""Execute the task.
|
||||
) -> Future[TaskOutput]:
|
||||
"""Execute the task asynchronously."""
|
||||
future: Future[TaskOutput] = Future()
|
||||
threading.Thread(
|
||||
target=self._execute_task_async, args=(agent, context, tools, future)
|
||||
).start()
|
||||
return future
|
||||
|
||||
Returns:
|
||||
Output of the task.
|
||||
"""
|
||||
def _execute_task_async(
|
||||
self,
|
||||
agent: Optional[BaseAgent],
|
||||
context: Optional[str],
|
||||
tools: Optional[List[Any]],
|
||||
future: Future[TaskOutput],
|
||||
) -> None:
|
||||
"""Execute the task asynchronously with context handling."""
|
||||
result = self._execute_core(agent, context, tools)
|
||||
future.set_result(result)
|
||||
|
||||
def _execute_core(
|
||||
self,
|
||||
agent: Optional[BaseAgent],
|
||||
context: Optional[str],
|
||||
tools: Optional[List[Any]],
|
||||
) -> TaskOutput:
|
||||
"""Run the core execution logic of the task."""
|
||||
agent = agent or self.agent
|
||||
if not agent:
|
||||
raise Exception(
|
||||
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly "
|
||||
"and should be executed in a Crew using a specific process that support that, like hierarchical."
|
||||
f"The task '{self.description}' has no agent assigned, therefore it can't be executed directly and should be executed in a Crew using a specific process that support that, like hierarchical."
|
||||
)
|
||||
|
||||
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
|
||||
|
||||
if self.context:
|
||||
internal_context = []
|
||||
for task in self.context:
|
||||
if task.async_execution:
|
||||
task.wait_for_completion()
|
||||
if task.output:
|
||||
internal_context.append(task.output.raw_output)
|
||||
context = "\n".join(internal_context)
|
||||
|
||||
self.prompt_context = context
|
||||
tools = tools or self.tools
|
||||
tools = tools or self.tools or []
|
||||
|
||||
if self.async_execution:
|
||||
with ThreadPoolExecutor() as executor:
|
||||
self._future = executor.submit(
|
||||
self._execute, agent, self, context, tools
|
||||
)
|
||||
return None
|
||||
else:
|
||||
result = self._execute(
|
||||
task=self,
|
||||
agent=agent,
|
||||
context=context,
|
||||
tools=tools,
|
||||
)
|
||||
return result
|
||||
|
||||
def _execute(self, agent: "BaseAgent", task, context, tools) -> str | None:
|
||||
result = agent.execute_task(
|
||||
task=task,
|
||||
task=self,
|
||||
context=context,
|
||||
tools=tools,
|
||||
)
|
||||
exported_output = self._export_output(result)
|
||||
|
||||
self.output = TaskOutput(
|
||||
pydantic_output, json_output = self._export_output(result)
|
||||
|
||||
task_output = TaskOutput(
|
||||
description=self.description,
|
||||
exported_output=exported_output,
|
||||
raw_output=result,
|
||||
raw=result,
|
||||
pydantic=pydantic_output,
|
||||
json_dict=json_output,
|
||||
agent=agent.role,
|
||||
output_format=self._get_output_format(),
|
||||
)
|
||||
self.output = task_output
|
||||
|
||||
if self.callback:
|
||||
self.callback(self.output)
|
||||
@@ -243,7 +240,15 @@ class Task(BaseModel):
|
||||
self._telemetry.task_ended(self._execution_span, self)
|
||||
self._execution_span = None
|
||||
|
||||
return exported_output
|
||||
if self.output_file:
|
||||
content = (
|
||||
json_output
|
||||
if json_output
|
||||
else pydantic_output.model_dump_json() if pydantic_output else result
|
||||
)
|
||||
self._save_file(content)
|
||||
|
||||
return task_output
|
||||
|
||||
def prompt(self) -> str:
|
||||
"""Prompt the task.
|
||||
@@ -278,7 +283,7 @@ class Task(BaseModel):
|
||||
"""Increment the delegations counter."""
|
||||
self.delegations += 1
|
||||
|
||||
def copy(self, agents: Optional[List["BaseAgent"]] = None) -> "Task": # type: ignore # Signature of "copy" incompatible with supertype "BaseModel"
|
||||
def copy(self, agents: List["BaseAgent"]) -> "Task":
|
||||
"""Create a deep copy of the Task."""
|
||||
exclude = {
|
||||
"id",
|
||||
@@ -295,7 +300,7 @@ class Task(BaseModel):
|
||||
)
|
||||
|
||||
def get_agent_by_role(role: str) -> Union["BaseAgent", None]:
|
||||
return next((agent for agent in agents if agent.role == role), None) # type: ignore # Item "None" of "list[BaseAgent] | None" has no attribute "__iter__" (not iterable)
|
||||
return next((agent for agent in agents if agent.role == role), None)
|
||||
|
||||
cloned_agent = get_agent_by_role(self.agent.role) if self.agent else None
|
||||
cloned_tools = copy(self.tools) if self.tools else []
|
||||
@@ -309,72 +314,113 @@ class Task(BaseModel):
|
||||
|
||||
return copied_task
|
||||
|
||||
def _create_converter(self, *args, **kwargs) -> Converter: # type: ignore
|
||||
converter = self.agent.get_output_converter( # type: ignore # Item "None" of "BaseAgent | None" has no attribute "get_output_converter"
|
||||
*args, **kwargs
|
||||
)
|
||||
def _create_converter(self, *args, **kwargs) -> Converter:
|
||||
"""Create a converter instance."""
|
||||
converter = self.agent.get_output_converter(*args, **kwargs)
|
||||
if self.converter_cls:
|
||||
converter = self.converter_cls( # type: ignore # Item "None" of "BaseAgent | None" has no attribute "get_output_converter"
|
||||
*args, **kwargs
|
||||
)
|
||||
converter = self.converter_cls(*args, **kwargs)
|
||||
return converter
|
||||
|
||||
def _export_output(self, result: str) -> Any:
|
||||
exported_result = result
|
||||
instructions = "I'm gonna convert this raw text into valid JSON."
|
||||
def _export_output(
|
||||
self, result: str
|
||||
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
|
||||
pydantic_output: Optional[BaseModel] = None
|
||||
json_output: Optional[Dict[str, Any]] = None
|
||||
|
||||
if self.output_pydantic or self.output_json:
|
||||
model = self.output_pydantic or self.output_json
|
||||
model_output = self._convert_to_model(result)
|
||||
pydantic_output = (
|
||||
model_output if isinstance(model_output, BaseModel) else None
|
||||
)
|
||||
if isinstance(model_output, str):
|
||||
try:
|
||||
json_output = json.loads(model_output)
|
||||
except json.JSONDecodeError:
|
||||
json_output = None
|
||||
else:
|
||||
json_output = model_output if isinstance(model_output, dict) else None
|
||||
|
||||
# try to convert task_output directly to pydantic/json
|
||||
return pydantic_output, json_output
|
||||
|
||||
def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]:
|
||||
model = self.output_pydantic or self.output_json
|
||||
if model is None:
|
||||
return result
|
||||
|
||||
try:
|
||||
return self._validate_model(result, model)
|
||||
except Exception:
|
||||
return self._handle_partial_json(result, model)
|
||||
|
||||
def _validate_model(
|
||||
self, result: str, model: Type[BaseModel]
|
||||
) -> Union[dict, BaseModel]:
|
||||
exported_result = model.model_validate_json(result)
|
||||
if self.output_json:
|
||||
return exported_result.model_dump()
|
||||
return exported_result
|
||||
|
||||
def _handle_partial_json(
|
||||
self, result: str, model: Type[BaseModel]
|
||||
) -> Union[dict, BaseModel, str]:
|
||||
match = re.search(r"({.*})", result, re.DOTALL)
|
||||
if match:
|
||||
try:
|
||||
exported_result = model.model_validate_json(result) # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
|
||||
exported_result = model.model_validate_json(match.group(0))
|
||||
if self.output_json:
|
||||
return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump"
|
||||
return exported_result.model_dump()
|
||||
return exported_result
|
||||
except Exception:
|
||||
# sometimes the response contains valid JSON in the middle of text
|
||||
match = re.search(r"({.*})", result, re.DOTALL)
|
||||
if match:
|
||||
try:
|
||||
exported_result = model.model_validate_json(match.group(0)) # type: ignore # Item "None" of "type[BaseModel] | None" has no attribute "model_validate_json"
|
||||
if self.output_json:
|
||||
return exported_result.model_dump() # type: ignore # "str" has no attribute "model_dump"
|
||||
return exported_result
|
||||
except Exception:
|
||||
pass
|
||||
pass
|
||||
|
||||
llm = getattr(self.agent, "function_calling_llm", None) or self.agent.llm # type: ignore # Item "None" of "BaseAgent | None" has no attribute "function_calling_llm"
|
||||
if not self._is_gpt(llm):
|
||||
model_schema = PydanticSchemaParser(model=model).get_schema() # type: ignore # Argument "model" to "PydanticSchemaParser" has incompatible type "type[BaseModel] | None"; expected "type[BaseModel]"
|
||||
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
|
||||
return self._convert_with_instructions(result, model)
|
||||
|
||||
converter = self._create_converter( # type: ignore # Item "None" of "BaseAgent | None" has no attribute "get_output_converter"
|
||||
llm=llm, text=result, model=model, instructions=instructions
|
||||
def _convert_with_instructions(
|
||||
self, result: str, model: Type[BaseModel]
|
||||
) -> Union[dict, BaseModel, str]:
|
||||
llm = self.agent.function_calling_llm or self.agent.llm
|
||||
instructions = self._get_conversion_instructions(model, llm)
|
||||
|
||||
converter = self._create_converter(
|
||||
llm=llm, text=result, model=model, instructions=instructions
|
||||
)
|
||||
exported_result = (
|
||||
converter.to_pydantic() if self.output_pydantic else converter.to_json()
|
||||
)
|
||||
|
||||
if isinstance(exported_result, ConverterError):
|
||||
Printer().print(
|
||||
content=f"{exported_result.message} Using raw output instead.",
|
||||
color="red",
|
||||
)
|
||||
|
||||
if self.output_pydantic:
|
||||
exported_result = converter.to_pydantic()
|
||||
elif self.output_json:
|
||||
exported_result = converter.to_json()
|
||||
|
||||
if isinstance(exported_result, ConverterError):
|
||||
Printer().print(
|
||||
content=f"{exported_result.message} Using raw output instead.",
|
||||
color="red",
|
||||
)
|
||||
exported_result = result
|
||||
|
||||
if self.output_file:
|
||||
content = (
|
||||
exported_result
|
||||
if not self.output_pydantic
|
||||
else exported_result.model_dump_json() # type: ignore # "str" has no attribute "json"
|
||||
)
|
||||
self._save_file(content)
|
||||
return result
|
||||
|
||||
return exported_result
|
||||
|
||||
def _get_output_format(self) -> OutputFormat:
|
||||
if self.output_json:
|
||||
return OutputFormat.JSON
|
||||
if self.output_pydantic:
|
||||
return OutputFormat.PYDANTIC
|
||||
return OutputFormat.RAW
|
||||
|
||||
def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str:
|
||||
instructions = "I'm gonna convert this raw text into valid JSON."
|
||||
if not self._is_gpt(llm):
|
||||
model_schema = PydanticSchemaParser(model=model).get_schema()
|
||||
instructions = f"{instructions}\n\nThe json should have the following structure, with the following keys:\n{model_schema}"
|
||||
return instructions
|
||||
|
||||
def _save_output(self, content: str) -> None:
|
||||
if not self.output_file:
|
||||
raise Exception("Output file path is not set.")
|
||||
|
||||
directory = os.path.dirname(self.output_file)
|
||||
if directory and not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
with open(self.output_file, "w", encoding="utf-8") as file:
|
||||
file.write(content)
|
||||
|
||||
def _is_gpt(self, llm) -> bool:
|
||||
return isinstance(llm, ChatOpenAI) and llm.openai_api_base is None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user