Merge branch 'feature/kickoff-consistent-output' of https://github.com/bhancockio/crewAI into temp-feature/replay_from_task

This commit is contained in:
Lorenze Jay
2024-07-10 12:33:32 -07:00
19 changed files with 9051 additions and 281 deletions

View File

@@ -36,7 +36,9 @@ from crewai.utilities.constants import TRAINED_AGENTS_DATA_FILE, TRAINING_DATA_F
from crewai.utilities.crew_json_encoder import CrewJSONEncoder
from crewai.utilities.evaluators.task_evaluator import TaskEvaluator
from crewai.utilities.file_handler import TaskOutputJsonHandler
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
from crewai.utilities.formatter import (
aggregate_raw_outputs_from_task_outputs,
)
from crewai.utilities.training_handler import CrewTrainingHandler
try:
@@ -267,6 +269,63 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_end_with_at_most_one_async_task(self):
    """Ensure the crew's task list finishes with no more than one async task.

    Counts the run of consecutive asynchronous tasks at the tail of the
    task list; a run longer than one is rejected, because only the final
    task may execute asynchronously.
    """
    tail_async = 0
    idx = len(self.tasks) - 1
    # Walk backwards while we keep seeing asynchronous tasks.
    while idx >= 0 and self.tasks[idx].async_execution:
        tail_async += 1
        idx -= 1

    if tail_async > 1:
        raise PydanticCustomError(
            "async_task_count",
            "The crew must end with at most one asynchronous task.",
            {},
        )

    return self
@model_validator(mode="after")
def validate_async_task_cannot_include_sequential_async_tasks_in_context(self):
    """
    Validates that if a task is set to be executed asynchronously,
    it cannot include other asynchronous tasks in its context unless
    separated by a synchronous task.
    """
    for position, current in enumerate(self.tasks):
        if not (current.async_execution and current.context):
            continue
        for dependency in current.context:
            if not dependency.async_execution:
                continue
            # Scan earlier tasks from nearest to farthest; a sync task
            # terminates the scan because it "separates" the async runs.
            for prior in reversed(self.tasks[:position]):
                if prior == dependency:
                    raise ValueError(
                        f"Task '{current.description}' is asynchronous and cannot include other sequential asynchronous tasks in its context."
                    )
                if not prior.async_execution:
                    break
    return self
@model_validator(mode="after")
def validate_context_no_future_tasks(self):
    """Validates that a task's context does not include future tasks."""
    # Identity-keyed positions: tasks may not be hashable/comparable, so
    # id() mirrors list membership exactly.
    position_by_id = {id(t): idx for idx, t in enumerate(self.tasks)}
    for current in self.tasks:
        if not current.context:
            continue
        for dependency in current.context:
            dep_pos = position_by_id.get(id(dependency))
            if dep_pos is None:
                # Context entries outside the main task list are ignored.
                continue
            if dep_pos > position_by_id[id(current)]:
                raise ValueError(
                    f"Task '{current.description}' has a context dependency on a future task '{dependency.description}', which is not allowed."
                )
    return self
def _setup_from_config(self):
assert self.config is not None, "Config should not be None."
@@ -337,8 +396,7 @@ class Crew(BaseModel):
self._task_output_handler.reset()
if inputs is not None:
self._inputs = inputs
self._interpolate_inputs(inputs)
# self._interpolate_inputs(inputs)
# self._interpolate_inputs(inputs)
self._set_tasks_callbacks()
i18n = I18N(prompt_file=self.prompt_file)
@@ -364,8 +422,7 @@ class Crew(BaseModel):
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
result, manager_metrics = self._run_hierarchical_process() # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str")
metrics.append(manager_metrics)
result = self._run_hierarchical_process() # type: ignore # Incompatible types in assignment (expression has type "str | dict[str, Any]", variable has type "str")
else:
raise NotImplementedError(
f"The process '{self.process}' is not implemented yet."
@@ -404,9 +461,7 @@ class Crew(BaseModel):
self.usage_metrics = total_usage_metrics
return results
async def kickoff_async(
self, inputs: Optional[CrewOutput] = {}
) -> Union[str, Dict]:
async def kickoff_async(self, inputs: Optional[Dict[str, Any]] = {}) -> CrewOutput:
    """Asynchronous kickoff method to start the crew execution.

    Delegates to the synchronous ``kickoff`` on a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked while the
    crew runs.
    """
    # NOTE(review): mutable default argument `{}` — harmless only as long
    # as `kickoff` never mutates it; `None` would be the safer default.
    # TODO confirm with maintainers before changing (a `{}` default is
    # passed through and is not None, which affects kickoff's None check).
    return await asyncio.to_thread(self.kickoff, inputs)

View File

@@ -1,13 +1,22 @@
from typing import Any, Dict, List
import json
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
class CrewOutput(BaseModel):
output: List[TaskOutput] = Field(description="Result of the final task")
"""Class that represents the result of a crew."""
raw: str = Field(description="Raw output of crew", default="")
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of Crew", default=None
)
json_dict: Optional[Dict[str, Any]] = Field(
description="JSON dict output of Crew", default=None
)
tasks_output: list[TaskOutput] = Field(
description="Output of each task", default=[]
)
@@ -15,30 +24,37 @@ class CrewOutput(BaseModel):
description="Processed token summary", default={}
)
# TODO: Ask @joao what is the desired behavior here
def result(
self,
) -> List[str | BaseModel | Dict[str, Any]]:
"""Return the result of the task based on the available output."""
results = [output.result() for output in self.output]
return results
# TODO: Joao - Adding this safety check breaks when people want to see
# the full output of a CrewOutput.
# @property
# def pydantic(self) -> Optional[BaseModel]:
# # Check if the final task output included a pydantic model
# if self.tasks_output[-1].output_format != OutputFormat.PYDANTIC:
# raise ValueError(
# "No pydantic model found in the final task. Please make sure to set the output_pydantic property in the final task in your crew."
# )
def raw_output(self) -> str:
"""Return the raw output of the task."""
return aggregate_raw_outputs_from_task_outputs(self.output)
# return self._pydantic
def to_output_dict(self) -> List[Dict[str, Any]]:
output_dict = [output.to_output_dict() for output in self.output]
return output_dict
@property
def json(self) -> Optional[str]:
if self.tasks_output[-1].output_format != OutputFormat.JSON:
raise ValueError(
"No JSON output found in the final task. Please make sure to set the output_json property in the final task in your crew."
)
def __getitem__(self, key: str) -> Any:
if len(self.output) == 0:
return None
elif len(self.output) == 1:
return self.output[0][key]
else:
return [output[key] for output in self.output]
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
if self.json_dict:
return self.json_dict
if self.pydantic:
return self.pydantic.model_dump()
raise ValueError("No output to convert to dictionary")
# TODO: Confirm with Joao that we want to print the raw output and not the object
def __str__(self):
return str(self.raw_output())
if self.pydantic:
return str(self.pydantic)
if self.json_dict:
return str(self.json_dict)
return self.raw

View File

@@ -1,10 +1,11 @@
import json
import os
import re
import threading
import uuid
from concurrent.futures import Future
from copy import copy
from typing import Any, Dict, List, Optional, Type, Union
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
@@ -12,10 +13,10 @@ from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
from pydantic_core import PydanticCustomError
from crewai.agents.agent_builder.base_agent import BaseAgent
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry.telemetry import Telemetry
from crewai.utilities.converter import Converter, ConverterError
from crewai.utilities.formatter import aggregate_raw_outputs_from_task_outputs
from crewai.utilities.i18n import I18N
from crewai.utilities.printer import Printer
from crewai.utilities.pydantic_schema_parser import PydanticSchemaParser
@@ -99,6 +100,10 @@ class Task(BaseModel):
description="Whether the task should have a human review the final answer of the agent",
default=False,
)
converter_cls: Optional[Type[Converter]] = Field(
description="A converter class used to export structured output",
default=None,
)
_telemetry: Telemetry
_execution_span: Span | None = None
@@ -159,18 +164,6 @@ class Task(BaseModel):
)
return self
def wait_for_completion(self) -> str | BaseModel:
"""Wait for asynchronous task completion and return the output."""
assert self.async_execution, "Task is not set to be executed asynchronously."
if self._future:
self._future.result() # Wait for the future to complete
self._future = None
assert self.output, "Task output is not set."
return self.output.exported_output
def execute_sync(
self,
agent: Optional[BaseAgent] = None,
@@ -187,7 +180,7 @@ class Task(BaseModel):
tools: Optional[List[Any]] = None,
) -> Future[TaskOutput]:
"""Execute the task asynchronously."""
future = Future()
future: Future[TaskOutput] = Future()
threading.Thread(
target=self._execute_task_async, args=(agent, context, tools, future)
).start()
@@ -220,31 +213,24 @@ class Task(BaseModel):
self._execution_span = self._telemetry.task_started(crew=agent.crew, task=self)
if self.context:
task_outputs: List[TaskOutput] = []
for task in self.context:
# if task.async_execution:
# task.wait_for_completion()
if task.output:
task_outputs.append(task.output)
context = aggregate_raw_outputs_from_task_outputs(task_outputs)
self.prompt_context = context
tools = tools or self.tools
tools = tools or self.tools or []
result = agent.execute_task(
task=self,
context=context,
tools=tools,
)
exported_output = self._export_output(result)
pydantic_output, json_output = self._export_output(result)
task_output = TaskOutput(
description=self.description,
raw_output=result,
pydantic_output=exported_output["pydantic"],
json_output=exported_output["json"],
raw=result,
pydantic=pydantic_output,
json_dict=json_output,
agent=agent.role,
output_format=self._get_output_format(),
)
self.output = task_output
@@ -255,6 +241,16 @@ class Task(BaseModel):
self._telemetry.task_ended(self._execution_span, self)
self._execution_span = None
if self.output_file:
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
)
self._save_file(content)
return task_output
def prompt(self) -> str:
@@ -290,7 +286,7 @@ class Task(BaseModel):
"""Increment the delegations counter."""
self.delegations += 1
def copy(self, agents: Optional[List["BaseAgent"]] = None) -> "Task":
def copy(self, agents: List["BaseAgent"]) -> "Task":
"""Create a deep copy of the Task."""
exclude = {
"id",
@@ -323,28 +319,39 @@ class Task(BaseModel):
return copied_task
def _create_converter(self, *args, **kwargs) -> Converter:
"""Create a converter instance."""
converter = self.agent.get_output_converter(*args, **kwargs)
if self.converter_cls:
converter = self.converter_cls(*args, **kwargs)
return converter
def _export_output(
self, result: str
) -> Dict[str, Union[BaseModel, Dict[str, Any]]]:
output = {
"pydantic": None,
"json": None,
}
) -> Tuple[Optional[BaseModel], Optional[Dict[str, Any]]]:
pydantic_output: Optional[BaseModel] = None
json_output: Optional[Dict[str, Any]] = None
if self.output_pydantic or self.output_json:
model_output = self._convert_to_model(result)
output["pydantic"] = (
pydantic_output = (
model_output if isinstance(model_output, BaseModel) else None
)
output["json"] = model_output if isinstance(model_output, dict) else None
if isinstance(model_output, str):
try:
json_output = json.loads(model_output)
except json.JSONDecodeError:
json_output = None
else:
json_output = model_output if isinstance(model_output, dict) else None
if self.output_file:
self._save_output(output["raw"])
return output
return pydantic_output, json_output
def _convert_to_model(self, result: str) -> Union[dict, BaseModel, str]:
model = self.output_pydantic or self.output_json
if model is None:
return result
try:
return self._validate_model(result, model)
except Exception:
@@ -379,7 +386,7 @@ class Task(BaseModel):
llm = self.agent.function_calling_llm or self.agent.llm
instructions = self._get_conversion_instructions(model, llm)
converter = Converter(
converter = self._create_converter(
llm=llm, text=result, model=model, instructions=instructions
)
exported_result = (
@@ -395,6 +402,13 @@ class Task(BaseModel):
return exported_result
def _get_output_format(self) -> OutputFormat:
if self.output_json:
return OutputFormat.JSON
if self.output_pydantic:
return OutputFormat.PYDANTIC
return OutputFormat.RAW
def _get_conversion_instructions(self, model: Type[BaseModel], llm: Any) -> str:
instructions = "I'm gonna convert this raw text into valid JSON."
if not self._is_gpt(llm):
@@ -403,6 +417,9 @@ class Task(BaseModel):
return instructions
def _save_output(self, content: str) -> None:
if not self.output_file:
raise Exception("Output file path is not set.")
directory = os.path.dirname(self.output_file)
if directory and not os.path.exists(directory):
os.makedirs(directory)

View File

@@ -0,0 +1,4 @@
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
__all__ = ["OutputFormat", "TaskOutput"]

View File

@@ -0,0 +1,9 @@
from enum import Enum
class OutputFormat(str, Enum):
    """Enum that represents the output format of a task."""
    # Structured output rendered as a JSON dictionary.
    JSON = "json"
    # Structured output backed by a Pydantic model instance.
    PYDANTIC = "pydantic"
    # Unprocessed text exactly as produced by the agent.
    RAW = "raw"

View File

@@ -1,22 +1,29 @@
from typing import Any, Dict, Optional, Union
import json
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field, model_validator
from crewai.tasks.output_format import OutputFormat
# TODO: This is a breaking change. Confirm with @joao
class TaskOutput(BaseModel):
"""Class that represents the result of a task."""
description: str = Field(description="Description of the task")
summary: Optional[str] = Field(description="Summary of the task", default=None)
raw_output: str = Field(description="Result of the task")
pydantic_output: Optional[BaseModel] = Field(
description="Pydantic model output", default=None
raw: str = Field(
description="Raw output of the task", default=""
) # TODO: @joao: breaking change, by renaming raw_output to raw, but now consistent with CrewOutput
pydantic: Optional[BaseModel] = Field(
description="Pydantic output of task", default=None
)
json_output: Optional[Dict[str, Any]] = Field(
description="JSON output", default=None
json_dict: Optional[Dict[str, Any]] = Field(
description="JSON dictionary of task", default=None
)
agent: str = Field(description="Agent that executed the task")
output_format: OutputFormat = Field(
description="Output format of the task", default=OutputFormat.RAW
)
@model_validator(mode="after")
def set_summary(self):
@@ -25,32 +32,47 @@ class TaskOutput(BaseModel):
self.summary = f"{excerpt}..."
return self
# TODO: Ask @joao what is the desired behavior here
def result(self) -> Union[str, BaseModel, Dict[str, Any]]:
"""Return the result of the task based on the available output."""
if self.pydantic_output:
return self.pydantic_output
elif self.json_output:
return self.json_output
else:
return self.raw_output
# TODO: Joao - Adding this safety check breaks when people want to see
# the full output of a TaskOutput or CrewOutput.
# @property
# def pydantic(self) -> Optional[BaseModel]:
# # Check if the final task output included a pydantic model
# if self.output_format != OutputFormat.PYDANTIC:
# raise ValueError(
# """
# Invalid output format requested.
# If you would like to access the pydantic model,
# please make sure to set the output_pydantic property for the task.
# """
# )
def __getitem__(self, key: str) -> Any:
"""Retrieve a value from the pydantic_output or json_output based on the key."""
if self.pydantic_output and hasattr(self.pydantic_output, key):
return getattr(self.pydantic_output, key)
if self.json_output and key in self.json_output:
return self.json_output[key]
raise KeyError(f"Key '{key}' not found in pydantic_output or json_output")
# return self._pydantic
def to_output_dict(self) -> Dict[str, Any]:
@property
def json(self) -> Optional[str]:
if self.output_format != OutputFormat.JSON:
raise ValueError(
"""
Invalid output format requested.
If you would like to access the JSON output,
please make sure to set the output_json property for the task
"""
)
return json.dumps(self.json_dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert json_output and pydantic_output to a dictionary."""
output_dict = {}
if self.json_output:
output_dict.update(self.json_output)
if self.pydantic_output:
output_dict.update(self.pydantic_output.model_dump())
if self.json_dict:
output_dict.update(self.json_dict)
if self.pydantic:
output_dict.update(self.pydantic.model_dump())
return output_dict
def __str__(self) -> str:
return self.raw_output
if self.pydantic:
return str(self.pydantic)
if self.json_dict:
return str(self.json_dict)
return self.raw

View File

@@ -1,5 +1,6 @@
from typing import List
from crewai.task import Task
from crewai.tasks.task_output import TaskOutput
@@ -8,5 +9,12 @@ def aggregate_raw_outputs_from_task_outputs(task_outputs: List[TaskOutput]) -> s
dividers = "\n\n----------\n\n"
# Join task outputs with dividers
context = dividers.join(output.raw_output for output in task_outputs)
context = dividers.join(output.raw for output in task_outputs)
return context
def aggregate_raw_outputs_from_tasks(tasks: List[Task]) -> str:
    """Generate string context from the tasks.

    Collects the outputs of tasks that have completed (output set) and
    delegates the string aggregation to
    ``aggregate_raw_outputs_from_task_outputs``.
    """
    completed_outputs = [
        candidate.output for candidate in tasks if candidate.output is not None
    ]
    return aggregate_raw_outputs_from_task_outputs(completed_outputs)