added validator for async_execution true in tasks whenever in hierarchical run

This commit is contained in:
Lorenze Jay
2024-07-03 12:27:21 -07:00
parent 9efd03e2df
commit ae2e9de893
2 changed files with 43 additions and 5 deletions

View File

@@ -226,11 +226,8 @@ class Crew(BaseModel):
@model_validator(mode="after")
def validate_tasks(self):
process = self.process
tasks = self.tasks
if process == Process.sequential:
for task in tasks:
if self.process == Process.sequential:
for task in self.tasks:
if task.agent is None:
raise PydanticCustomError(
"missing_agent_in_task",
@@ -240,6 +237,20 @@ class Crew(BaseModel):
return self
@model_validator(mode="after")
def check_tasks_in_hierarchical_process_not_async(self):
"""Validates that the tasks in hierarchical process are not flagged with async_execution."""
if self.process == Process.hierarchical:
for task in self.tasks:
if task.async_execution:
raise PydanticCustomError(
"async_execution_in_hierarchical_process",
"Hierarchical process error: Tasks cannot be flagged with async_execution.",
{},
)
return self
def _setup_from_config(self):
assert self.config is not None, "Config should not be None."

View File

@@ -1109,6 +1109,33 @@ def test_hierarchical_crew_creation_tasks_with_agents():
)
@pytest.mark.vcr(filter_headers=["authorization"])
def test_hierarchical_crew_creation_tasks_without_async_execution():
from langchain_openai import ChatOpenAI
task = Task(
description="Come up with a list of 5 interesting ideas to explore for an article, then write one amazing paragraph highlight for each idea that showcases how good an article about this topic could be. Return the list of ideas with their paragraph and your notes.",
expected_output="5 bullet points with a paragraph for each idea.",
async_execution=True, # should throw an error
)
with pytest.raises(pydantic_core._pydantic_core.ValidationError) as exec_info:
Crew(
tasks=[task],
agents=[researcher],
process=Process.hierarchical,
manager_llm=ChatOpenAI(model="gpt-4o"),
)
assert (
exec_info.value.errors()[0]["type"] == "async_execution_in_hierarchical_process"
)
assert (
"Hierarchical process error: Tasks cannot be flagged with async_execution."
in exec_info.value.errors()[0]["msg"]
)
def test_crew_inputs_interpolate_both_agents_and_tasks():
agent = Agent(
role="{topic} Researcher",