fixed bug for manager overriding task agent and added pydantic validators to the sequential process when no agent is added to a task

Lorenze Jay
2024-07-01 09:32:43 -07:00
parent 5b66e87621
commit 9392788ed0
4 changed files with 3724 additions and 13 deletions
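For context, here is a minimal sketch of the new sequential-process validation (illustrative only; it assumes crewAI at this revision is installed and OPENAI_API_KEY is set, since Agent falls back to an OpenAI-backed LLM by default). Constructing a sequential crew whose task has no agent now fails at validation time, before kickoff() is ever called:

from crewai import Agent, Crew, Process, Task
from pydantic import ValidationError

researcher = Agent(
    role="Researcher",
    goal="Come up with article ideas",
    backstory="An experienced researcher.",  # illustrative values only
)

# A task that deliberately has no agent assigned.
orphan_task = Task(
    description="Draft five article ideas.",
    expected_output="5 bullet points.",
)

try:
    # The new model_validator rejects agent-less tasks for Process.sequential.
    Crew(agents=[researcher], tasks=[orphan_task], process=Process.sequential)
except ValidationError as err:
    print(err.errors()[0]["type"])  # expected: "missing_agent_in_task"

A hierarchical crew, by contrast, still accepts tasks without agents; the other half of this commit makes the manager delegate to the task's own agent when one is provided, which is what test_hierarchical_crew_creation_tasks_with_agents below exercises.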


@@ -1,7 +1,7 @@
import asyncio
import json
import uuid
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Union, Tuple
from langchain_core.callbacks import BaseCallbackHandler
from pydantic import (
@@ -219,6 +219,22 @@ class Crew(BaseModel):
agent.set_rpm_controller(self._rpm_controller)
return self
@model_validator(mode="after")
def validate_tasks(self):
process = self.process
tasks = self.tasks
if process == Process.sequential:
for task in tasks:
if task.agent is None:
raise PydanticCustomError(
"missing_agent_in_task",
"Agent is missing in the task with the following description: {task.description}",
{},
)
return self
def _setup_from_config(self):
assert self.config is not None, "Config should not be None."
@@ -309,9 +325,7 @@ class Crew(BaseModel):
if self.process == Process.sequential:
result = self._run_sequential_process()
elif self.process == Process.hierarchical:
# type: ignore # Unpacking a string is disallowed
result, manager_metrics = self._run_hierarchical_process()
# type: ignore # Cannot determine type of "manager_metrics"
metrics.append(manager_metrics)
else:
raise NotImplementedError(
@@ -409,14 +423,16 @@ class Crew(BaseModel):
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
return self._format_output(task_output, token_usage_formatted)
def _run_hierarchical_process(self) -> Union[str, Dict[str, Any]]:
def _run_hierarchical_process(
self,
) -> Tuple[Union[str, Dict[str, Any]], Dict[str, Any]]:
"""Creates and assigns a manager agent to make sure the crew completes the tasks."""
i18n = I18N(prompt_file=self.prompt_file)
if self.manager_agent is not None:
self.manager_agent.allow_delegation = True
manager = self.manager_agent
if len(manager.tools) > 0:
if manager.tools is not None and len(manager.tools) > 0:
raise Exception("Manager agent should not have tools")
manager.tools = self.manager_agent.get_delegation_tools(self.agents)
else:
@@ -428,7 +444,7 @@ class Crew(BaseModel):
llm=self.manager_llm,
verbose=True,
)
self.manager_agent = manager
task_output = ""
token_usage = []
for task in self.tasks:
@@ -439,15 +455,18 @@ class Crew(BaseModel):
self._file_handler.log(
agent=manager.role, task=task.description, status="started"
)
if task.agent is not None:
manager.tools = task.agent.get_delegation_tools([task.agent])
else:
manager.tools = manager.get_delegation_tools(self.agents)
task_output = task.execute(
agent=manager, context=task_output, tools=manager.tools
)
if hasattr(manager, "_token_process"):
token_summ = manager._token_process.get_summary()
token_usage.append(token_summ)
self._logger.log("debug", f"[{manager.role}] Task output: {task_output}")
if hasattr(task, "agent._token_process"):
token_summ = task.agent._token_process.get_summary()
token_usage.append(token_summ)
if self.output_log_file:
self._file_handler.log(
agent=manager.role, task=task_output, status="completed"
@@ -455,13 +474,13 @@ class Crew(BaseModel):
self._finish_execution(task_output)
# type: ignore # Incompatible return value type (got "tuple[str, Any]", expected "str")
manager_token_usage = manager._token_process.get_summary()
token_usage.append(manager_token_usage)
token_usage_formatted = self.aggregate_token_usage(token_usage)
return self._format_output(
task_output, token_usage_formatted
task_output,
token_usage_formatted,
), manager_token_usage
def copy(self):

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -667,6 +667,29 @@ def test_agent_usage_metrics_are_captured_for_sequential_process():
}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_sequential_crew_creation_tasks_without_agents():
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
# agent=researcher,  # this should throw an error
)
# Expected Output: The sequential crew should fail to create because the task is missing an agent
with pytest.raises(pydantic_core._pydantic_core.ValidationError) as exec_info:
Crew(
tasks=[task],
agents=[researcher],
process=Process.sequential,
)
assert exec_info.value.errors()[0]["type"] == "missing_agent_in_task"
assert (
"Agent is missing in the task with the following description"
in exec_info.value.errors()[0]["msg"]
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_agent_usage_metrics_are_captured_for_hierarchical_process():
from langchain_openai import ChatOpenAI
@@ -698,6 +721,38 @@ def test_agent_usage_metrics_are_captured_for_hierarchical_process():
}
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_with_agents():
"""
Agents are not required for tasks in a hierarchical process, but sometimes they are still added.
This test makes sure that the manager still delegates the task to the agent even if an agent is assigned to the task.
"""
from langchain_openai import ChatOpenAI
task = Task(
description="Write one amazing paragraph about AI.",
expected_output="A single paragraph with 4 sentences.",
agent=writer,
)
crew = Crew(
tasks=[task],
agents=[writer],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
assert crew.process == Process.hierarchical
assert crew.manager_llm is not None
assert crew.tasks[0].agent == writer
result = crew.kickoff()
assert (
result
== "Artificial Intelligence (AI) is revolutionizing the way we live and work, driving advancements across numerous industries from healthcare to finance and beyond. By harnessing the power of complex algorithms and vast datasets, AI systems can perform tasks with unprecedented speed, accuracy, and efficiency, often surpassing human capabilities. From predictive analytics and personalized recommendations to autonomous vehicles and intelligent virtual assistants, AI's applications are both diverse and transformative. As we continue to innovate and integrate AI into our daily lives, its potential to shape a smarter, more efficient, and interconnected future is boundless."
)
def test_crew_inputs_interpolate_both_agents_and_tasks():
agent = Agent(
role="{topic} Researcher",
@@ -708,9 +763,10 @@ def test_crew_inputs_interpolate_both_agents_and_tasks():
task = Task(
description="Give me an analysis around {topic}.",
expected_output="{points} bullet points about {topic}.",
agent=agent,
)
crew = Crew(agents=[agent], tasks=[task], inputs={"topic": "AI", "points": 5})
crew = Crew(agents=[agent], tasks=[task])
inputs = {"topic": "AI", "points": 5}
crew._interpolate_inputs(inputs=inputs) # Manual call for now