Merge branch 'main' into feature/procedure_v2

This commit is contained in:
Brandon Hancock
2024-07-17 13:37:27 -04:00
24 changed files with 32275 additions and 727 deletions

View File

@@ -205,7 +205,7 @@ class Agent(BaseAgent):
self._times_executed += 1
if self._times_executed > self.max_retry_limit:
raise e
return self.execute_task(task, context, tools)
result = self.execute_task(task, context, tools)
if self.max_rpm:
self._rpm_controller.stop_rpm_counter()

View File

@@ -242,6 +242,8 @@ class CrewAgentExecutor(AgentExecutor, CrewAgentExecutorMixin):
else:
if tool_calling.tool_name.casefold().strip() in [
name.casefold().strip() for name in name_to_tool_map
] or tool_calling.tool_name.casefold().replace("_", " ") in [
name.casefold().strip() for name in name_to_tool_map
]:
observation = tool_usage.use(tool_calling, agent_action.log)
else:

View File

@@ -28,6 +28,7 @@ from crewai.memory.long_term.long_term_memory import LongTermMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.process import Process
from crewai.task import Task
from crewai.tasks.conditional_task import ConditionalTask
from crewai.tasks.task_output import TaskOutput
from crewai.telemetry import Telemetry
from crewai.tools.agent_tools import AgentTools
@@ -294,6 +295,29 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def validate_first_task(self) -> "Crew":
"""Ensure the first task is not a ConditionalTask."""
if self.tasks and isinstance(self.tasks[0], ConditionalTask):
raise PydanticCustomError(
"invalid_first_task",
"The first task cannot be a ConditionalTask.",
{},
)
return self
@model_validator(mode="after")
def validate_async_tasks_not_async(self) -> "Crew":
"""Ensure that ConditionalTask is not async."""
for task in self.tasks:
if task.async_execution and isinstance(task, ConditionalTask):
raise PydanticCustomError(
"invalid_async_conditional_task",
f"Conditional Task: {task.description} , cannot be executed asynchronously.", # type: ignore # Argument of type "str" cannot be assigned to parameter "message_template" of type "LiteralString"
{},
)
return self
@model_validator(mode="after")
def validate_async_task_cannot_include_sequential_async_tasks_in_context(self):
"""
@@ -611,21 +635,27 @@ class Crew(BaseModel):
last_sync_output = task.output
continue
self._prepare_task(task, manager)
if self.process == Process.hierarchical:
agent_to_use = manager
else:
agent_to_use = task.agent
agent_to_use = self._get_agent_to_use(task, manager)
if agent_to_use is None:
raise ValueError(
f"No agent available for task: {task.description}. Ensure that either the task has an assigned agent or a manager agent is provided."
)
self._log_task_start(task, agent_to_use)
self._prepare_agent_tools(task, manager)
self._log_task_start(task, agent_to_use.role)
if isinstance(task, ConditionalTask):
skipped_task_output = self._handle_conditional_task(
task, task_outputs, futures, task_index, was_replayed
)
if skipped_task_output:
continue
if task.async_execution:
context = self._get_context(
task, [last_sync_output] if last_sync_output else []
)
self._log_task_start(task, agent_to_use.role)
future = task.execute_async(
agent=agent_to_use,
context=context,
@@ -634,12 +664,11 @@ class Crew(BaseModel):
futures.append((task, future, task_index))
else:
if futures:
task_outputs.extend(
self._process_async_tasks(futures, was_replayed)
)
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
context = self._get_context(task, task_outputs)
self._log_task_start(task, agent_to_use.role)
task_output = task.execute_sync(
agent=agent_to_use,
context=context,
@@ -654,12 +683,48 @@ class Crew(BaseModel):
return self._create_crew_output(task_outputs)
def _prepare_task(self, task: Task, manager: Optional[BaseAgent]):
def _handle_conditional_task(
self,
task: ConditionalTask,
task_outputs: List[TaskOutput],
futures: List[Tuple[Task, Future[TaskOutput], int]],
task_index: int,
was_replayed: bool,
) -> Optional[TaskOutput]:
if futures:
task_outputs = self._process_async_tasks(futures, was_replayed)
futures.clear()
previous_output = task_outputs[task_index - 1] if task_outputs else None
if previous_output is not None and not task.should_execute(previous_output):
self._logger.log(
"debug",
f"Skipping conditional task: {task.description}",
color="yellow",
)
skipped_task_output = task.get_skipped_task_output()
if not was_replayed:
self._store_execution_log(task, skipped_task_output, task_index)
return skipped_task_output
return None
def _prepare_agent_tools(self, task: Task, manager: Optional[BaseAgent]):
if self.process == Process.hierarchical:
self._update_manager_tools(task, manager)
if manager:
self._update_manager_tools(task, manager)
else:
raise ValueError("Manager agent is required for hierarchical process.")
elif task.agent and task.agent.allow_delegation:
self._add_delegation_tools(task)
def _get_agent_to_use(
self, task: Task, manager: Optional[BaseAgent]
) -> Optional[BaseAgent]:
if self.process == Process.hierarchical:
return manager
return task.agent
def _add_delegation_tools(self, task: Task):
agents_for_delegation = [agent for agent in self.agents if agent != task.agent]
if len(self.agents) > 1 and len(agents_for_delegation) > 0 and task.agent:
@@ -686,18 +751,17 @@ class Crew(BaseModel):
# Add the new tool
task.tools.append(new_tool)
def _log_task_start(self, task: Task, agent: Optional[BaseAgent]):
def _log_task_start(self, task: Task, role: str = "None"):
color = self._logging_color
role = agent.role if agent else "None"
self._logger.log("debug", f"== Working Agent: {role}", color=color)
self._logger.log("info", f"== Starting Task: {task.description}", color=color)
if self.output_log_file:
self._file_handler.log(agent=role, task=task.description, status="started")
def _update_manager_tools(self, task: Task, manager: Optional[BaseAgent]):
if task.agent and manager:
def _update_manager_tools(self, task: Task, manager: BaseAgent):
if task.agent:
manager.tools = task.agent.get_delegation_tools([task.agent])
if manager:
else:
manager.tools = manager.get_delegation_tools(self.agents)
def _get_context(self, task: Task, task_outputs: List[TaskOutput]):
@@ -737,7 +801,7 @@ class Crew(BaseModel):
futures: List[Tuple[Task, Future[TaskOutput], int]],
was_replayed: bool = False,
) -> List[TaskOutput]:
task_outputs = []
task_outputs: List[TaskOutput] = []
for future_task, future, task_index in futures:
task_output = future.result()
task_outputs.append(task_output)

View File

@@ -8,7 +8,6 @@ from copy import copy
from hashlib import md5
from typing import Any, Dict, List, Optional, Tuple, Type, Union
from langchain_openai import ChatOpenAI
from opentelemetry.trace import Span
from pydantic import UUID4, BaseModel, Field, field_validator, model_validator
@@ -255,9 +254,7 @@ class Task(BaseModel):
content = (
json_output
if json_output
else pydantic_output.model_dump_json()
if pydantic_output
else result
else pydantic_output.model_dump_json() if pydantic_output else result
)
self._save_file(content)

View File

@@ -0,0 +1,47 @@
from typing import Any, Callable
from pydantic import Field
from crewai.task import Task
from crewai.tasks.output_format import OutputFormat
from crewai.tasks.task_output import TaskOutput
class ConditionalTask(Task):
"""
A task that can be conditionally executed based on the output of another task.
Note: This cannot be the only task you have in your crew and cannot be the first since its needs context from the previous task.
"""
condition: Callable[[TaskOutput], bool] = Field(
default=None,
description="Maximum number of retries for an agent to execute a task when an error occurs.",
)
def __init__(
self,
condition: Callable[[Any], bool],
**kwargs,
):
super().__init__(**kwargs)
self.condition = condition
def should_execute(self, context: TaskOutput) -> bool:
"""
Determines whether the conditional task should be executed based on the provided context.
Args:
context (Any): The context or output from the previous task that will be evaluated by the condition.
Returns:
bool: True if the task should be executed, False otherwise.
"""
return self.condition(context)
def get_skipped_task_output(self):
return TaskOutput(
description=self.description,
raw="",
agent=self.agent.role if self.agent else "",
output_format=OutputFormat.RAW,
)

View File

@@ -38,10 +38,10 @@ class Converter(OutputConverter):
return self._create_instructor().to_json()
else:
return json.dumps(self._create_chain().invoke({}).model_dump())
except Exception:
except Exception as e:
if current_attempt < self.max_attempts:
return self.to_json(current_attempt + 1)
return ConverterError("Failed to convert text into JSON.")
return ConverterError(f"Failed to convert text into JSON, error: {e}.")
def _create_instructor(self):
"""Create an instructor."""

View File

@@ -17,6 +17,16 @@ class CrewPydanticOutputParser(PydanticOutputParser):
def parse_result(self, result: List[Generation], *, partial: bool = False) -> Any:
result[0].text = self._transform_in_valid_json(result[0].text)
# Treating edge case of function calling llm returning the name instead of tool_name
json_object = json.loads(result[0].text)
json_object["tool_name"] = (
json_object["name"]
if "tool_name" not in json_object
else json_object["tool_name"]
)
result[0].text = json.dumps(json_object)
json_object = super().parse_result(result)
try:
return self.pydantic_object.parse_obj(json_object)

View File

@@ -10,6 +10,8 @@ class Printer:
self._print_bold_purple(content)
elif color == "bold_blue":
self._print_bold_blue(content)
elif color == "yellow":
self._print_yellow(content)
else:
print(content)
@@ -27,3 +29,6 @@ class Printer:
def _print_bold_blue(self, content):
print("\033[1m\033[94m {}\033[00m".format(content))
def _print_yellow(self, content):
print("\033[93m {}\033[00m".format(content))