Add repository agents to flow definitions (#6437)
Some checks are pending
CodeQL Advanced / Analyze (actions) (push) Waiting to run
CodeQL Advanced / Analyze (python) (push) Waiting to run
Vulnerability Scan / pip-audit (push) Waiting to run

Inline agent and crew actions can now use repository-backed agents
without duplicating role, goal, and backstory in each definition.

Examples:
* `agent.with.from_repository: support_specialist`
* `crew.with.agents.researcher.from_repository: researcher`

`PlusAPI.get_agent` now uses the shared synchronous request path so
project loaders can fetch repository agents without nested event loops.
This commit is contained in:
Vinicius Brasil
2026-07-02 14:28:01 -07:00
committed by GitHub
parent 24901cd4f6
commit 2b90117e88
10 changed files with 436 additions and 92 deletions

View File

@@ -1,8 +1,6 @@
import os
import unittest
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from unittest.mock import ANY, MagicMock, patch
from crewai_cli.plus_api import PlusAPI
@@ -343,28 +341,23 @@ class TestPlusAPI(unittest.TestCase):
)
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
async def test_get_agent(mock_async_client_class):
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_get_agent(mock_make_request):
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
mock_make_request.return_value = mock_response
response = await api.get_agent("test_agent_handle")
response = api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/agents/test_agent_handle"
)
assert response == mock_response
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
@patch("crewai_core.plus_api.PlusAPI._make_request")
@patch("crewai_core.plus_api.Settings")
async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_class):
def test_get_agent_with_org_uuid(mock_settings_class, mock_make_request):
org_uuid = "test-org-uuid"
mock_settings = MagicMock()
mock_settings.org_uuid = org_uuid
@@ -374,15 +367,12 @@ async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_cl
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
mock_make_request.return_value = mock_response
response = await api.get_agent("test_agent_handle")
response = api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/agents/test_agent_handle"
)
assert "X-Crewai-Organization-Id" in api.headers
assert api.headers["X-Crewai-Organization-Id"] == org_uuid

View File

@@ -232,10 +232,8 @@ class PlusAPI:
def get_tool(self, handle: str) -> httpx.Response:
return self._make_request("GET", f"{self.TOOLS_RESOURCE}/{handle}")
async def get_agent(self, handle: str) -> httpx.Response:
url = urljoin(self.base_url, f"{self.AGENTS_RESOURCE}/{handle}")
async with httpx.AsyncClient() as client:
return await client.get(url, headers=cast(dict[str, str], self.headers))
def get_agent(self, handle: str) -> httpx.Response:
return self._make_request("GET", f"{self.AGENTS_RESOURCE}/{handle}")
def publish_tool(
self,

View File

@@ -138,11 +138,12 @@ class CrewAction:
local_context = _pop_local_context(kwargs)
if self.definition.from_declaration is not None:
crew, default_inputs = load_crew(
crew, default_inputs = await asyncio.to_thread(
load_crew,
_resolve_crew_declaration(
self.definition.from_declaration,
base_dir=self.flow._definition.source_dir,
)
),
)
input_template = {**default_inputs, **(self.definition.inputs or {})}
else:
@@ -155,7 +156,9 @@ class CrewAction:
**crew_definition.inputs,
**(self.definition.inputs or {}),
}
crew, _ = load_crew_from_definition(crew_definition, source="crew action")
crew, _ = await asyncio.to_thread(
load_crew_from_definition, crew_definition, source="crew action"
)
inputs = Expression.from_flow(
cast(ExpressionData, input_template),
@@ -184,7 +187,8 @@ class AgentAction:
if not isinstance(rendered_input, str):
raise ValueError("agent input must render to a string")
agent, response_format = load_agent_from_definition(
agent, response_format = await asyncio.to_thread(
load_agent_from_definition,
self.definition.with_,
source="agent action",
)

View File

@@ -66,21 +66,24 @@ class CrewAgentDefinition(BaseModel):
model_config = ConfigDict(extra="allow")
role: str = Field(
role: str | None = Field(
default=None,
description=(
"Crew agent role. Crew inputs are interpolated with `{name}` "
"placeholders such as `{topic}`; this is not CEL."
),
examples=["Research analyst"],
)
goal: str = Field(
goal: str | None = Field(
default=None,
description=(
"Crew agent goal. Crew inputs are interpolated with `{name}` "
"placeholders such as `{topic}`; this is not CEL."
),
examples=["Research {topic}"],
)
backstory: str = Field(
backstory: str | None = Field(
default=None,
description=(
"Crew agent backstory. Crew inputs are interpolated with `{name}` "
"placeholders such as `{topic}`; this is not CEL."
@@ -92,6 +95,15 @@ class CrewAgentDefinition(BaseModel):
description="Optional built-in type or Python reference used to load the agent.",
examples=["agent", {"python": "my_project.agents.ResearchAgent"}],
)
from_repository: str | None = Field(
default=None,
description=(
"Agent repository name to load. Repository values supply missing "
"agent configuration; explicitly provided local fields override the "
"repository values."
),
examples=["researcher"],
)
settings: dict[str, Any] = Field(
default_factory=dict,
description="Additional agent settings passed to the loader.",
@@ -183,15 +195,18 @@ class CrewAgentDefinition(BaseModel):
class AgentDefinition(CrewAgentDefinition):
"""Inline individual agent definition used outside of a crew."""
role: str = Field(
role: str | None = Field(
default=None,
description="Individual agent role used by a Flow agent action outside of a crew.",
examples=["Support specialist"],
)
goal: str = Field(
goal: str | None = Field(
default=None,
description="Individual agent goal for the Flow agent action outside of a crew.",
examples=["Draft a concise customer reply"],
)
backstory: str = Field(
backstory: str | None = Field(
default=None,
description=(
"Individual agent backstory used to shape behavior outside of a crew."
),

View File

@@ -978,9 +978,10 @@ def _agent_kwargs_from_definition(
extra_allowed,
skip_unknown=skip_unknown,
)
for required in ("role", "goal", "backstory"):
if required not in defn:
errors.append(f"{path}: missing required field '{required}'")
if not defn.get("from_repository"):
for required in ("role", "goal", "backstory"):
if defn.get(required) is None:
errors.append(f"{path}: missing required field '{required}'")
settings = defn.get("settings", {})
if settings is None:

View File

@@ -1125,7 +1125,7 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
client = PlusAPI(api_key=get_auth_token())
_print_current_organization()
response = asyncio.run(client.get_agent(from_repository))
response = client.get_agent(from_repository)
if response.status_code == 404:
raise AgentRepositoryError(
f"Agent {from_repository} does not exist, make sure the name is correct or the agent is available on your organization."
@@ -1158,6 +1158,8 @@ def load_agent_from_repository(from_repository: str) -> dict[str, Any]:
raise AgentRepositoryError(
f"Tool {tool['name']} could not be loaded: {e}"
) from e
elif key == "skills" and value == []:
continue
else:
attributes[key] = value
return attributes

View File

@@ -2243,6 +2243,27 @@ def test_agent_from_repository_override_attributes(mock_get_agent, mock_get_auth
assert isinstance(agent.tools[0], SerperDevTool)
@patch("crewai.plus_api.PlusAPI.get_agent")
def test_agent_from_repository_ignores_empty_skills(
mock_get_agent, mock_get_auth_token
):
mock_get_response = MagicMock()
mock_get_response.status_code = 200
mock_get_response.json.return_value = {
"role": "test role",
"goal": "test goal",
"backstory": "test backstory",
"tools": [],
"skills": [],
}
mock_get_agent.return_value = mock_get_response
agent = Agent(from_repository="test_agent")
assert agent.role == "test role"
assert agent.skills is None
@patch("crewai.plus_api.PlusAPI.get_agent")
def test_agent_from_repository_with_invalid_tools(mock_get_agent, mock_get_auth_token):
mock_get_response = MagicMock()

View File

@@ -1,8 +1,6 @@
import os
import unittest
from unittest.mock import ANY, AsyncMock, MagicMock, patch
import pytest
from unittest.mock import ANY, MagicMock, patch
from crewai.plus_api import PlusAPI
@@ -396,28 +394,23 @@ class TestPlusAPI(unittest.TestCase):
)
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
async def test_get_agent(mock_async_client_class):
@patch("crewai_core.plus_api.PlusAPI._make_request")
def test_get_agent(mock_make_request):
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
mock_make_request.return_value = mock_response
response = await api.get_agent("test_agent_handle")
response = api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/agents/test_agent_handle"
)
assert response == mock_response
@pytest.mark.asyncio
@patch("httpx.AsyncClient")
@patch("crewai_core.plus_api.PlusAPI._make_request")
@patch("crewai_core.plus_api.Settings")
async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_class):
def test_get_agent_with_org_uuid(mock_settings_class, mock_make_request):
org_uuid = "test-org-uuid"
mock_settings = MagicMock()
mock_settings.org_uuid = org_uuid
@@ -427,15 +420,12 @@ async def test_get_agent_with_org_uuid(mock_settings_class, mock_async_client_cl
api = PlusAPI("test_api_key")
mock_response = MagicMock()
mock_client_instance = AsyncMock()
mock_client_instance.get.return_value = mock_response
mock_async_client_class.return_value.__aenter__.return_value = mock_client_instance
mock_make_request.return_value = mock_response
response = await api.get_agent("test_agent_handle")
response = api.get_agent("test_agent_handle")
mock_client_instance.get.assert_called_once_with(
f"{api.base_url}/crewai_plus/api/v1/agents/test_agent_handle",
headers=api.headers,
mock_make_request.assert_called_once_with(
"GET", "/crewai_plus/api/v1/agents/test_agent_handle"
)
assert "X-Crewai-Organization-Id" in api.headers
assert api.headers["X-Crewai-Organization-Id"] == org_uuid

View File

@@ -355,6 +355,24 @@ class TestLoadAgent:
with pytest.raises(Exception):
load_agent(agent_file)
@pytest.mark.parametrize("field", ["role", "goal", "backstory"])
def test_load_agent_rejects_null_required_fields(
self, tmp_path: Path, field: str
):
agent_def = {
"role": "Researcher",
"goal": "Find information",
"backstory": "Expert researcher.",
}
agent_def[field] = None
agent_file = tmp_path / "agent.json"
agent_file.write_text(json.dumps(agent_def))
with pytest.raises(
JSONProjectValidationError, match=f"missing required field '{field}'"
):
load_agent(agent_file)
def test_load_agent_file_not_found(self):
with pytest.raises(FileNotFoundError):
load_agent(Path("/nonexistent/agent.json"))

View File

@@ -1163,6 +1163,139 @@ methods:
}
def test_agent_action_runs_repository_yaml_definition(
monkeypatch: pytest.MonkeyPatch,
):
from crewai import Agent
from crewai.plus_api import PlusAPI
fetched_agents: list[str] = []
class FakeResponse:
status_code = 200
text = ""
def json(self) -> dict[str, Any]:
return {
"role": "Repository specialist",
"goal": "Answer support questions",
"backstory": "Loaded from the agent repository.",
"max_iter": 3,
"tools": [],
}
def fake_get_agent(self: PlusAPI, handle: str) -> FakeResponse:
fetched_agents.append(handle)
return FakeResponse()
async def fake_kickoff_async(
self: Agent, messages: str, **_kwargs: Any
) -> dict[str, Any]:
return {"agent": self.role, "input": messages, "max_iter": self.max_iter}
monkeypatch.setattr("crewai.auth.token.get_auth_token", lambda: "test-token")
monkeypatch.setattr(PlusAPI, "get_agent", fake_get_agent)
monkeypatch.setattr(Agent, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: AgentFlow
methods:
answer:
do:
call: agent
with:
from_repository: support_specialist
input: "${state.question}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"question": "What is CrewAI?"}) == {
"agent": "Repository specialist",
"input": "What is CrewAI?",
"max_iter": 3,
}
assert fetched_agents == ["support_specialist"]
def test_agent_action_repository_fetch_does_not_block_event_loop(
monkeypatch: pytest.MonkeyPatch,
):
from crewai import Agent
from crewai.plus_api import PlusAPI
loop_marker_ran = threading.Event()
fetch_started = threading.Event()
release_fetch = threading.Event()
fetch_saw_loop_marker = False
class FakeResponse:
status_code = 200
text = ""
def json(self) -> dict[str, Any]:
return {
"role": "Repository specialist",
"goal": "Answer support questions",
"backstory": "Loaded from the agent repository.",
"tools": [],
}
def fake_get_agent(self: PlusAPI, handle: str) -> FakeResponse:
nonlocal fetch_saw_loop_marker
fetch_started.set()
release_fetch.wait(timeout=1)
fetch_saw_loop_marker = loop_marker_ran.is_set()
return FakeResponse()
async def fake_kickoff_async(
self: Agent, messages: str, **_kwargs: Any
) -> str:
return f"{self.role}:{messages}"
monkeypatch.setattr("crewai.auth.token.get_auth_token", lambda: "test-token")
monkeypatch.setattr(PlusAPI, "get_agent", fake_get_agent)
monkeypatch.setattr(Agent, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: AgentFlow
methods:
answer:
do:
call: agent
with:
from_repository: support_specialist
input: "${state.question}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
async def run_flow() -> str:
async def mark_loop_progress() -> None:
while not fetch_started.is_set():
await asyncio.sleep(0)
loop_marker_ran.set()
release_fetch.set()
marker_task = asyncio.create_task(mark_loop_progress())
kickoff_task = asyncio.create_task(
flow.kickoff_async(inputs={"question": "What is CrewAI?"})
)
try:
result = await asyncio.wait_for(kickoff_task, timeout=2)
await asyncio.wait_for(marker_task, timeout=2)
return result
finally:
release_fetch.set()
assert asyncio.run(run_flow()) == "Repository specialist:What is CrewAI?"
assert fetch_saw_loop_marker
def test_agent_action_renders_text_custom_expression_input(
monkeypatch: pytest.MonkeyPatch,
):
@@ -1281,6 +1414,7 @@ def test_agent_action_json_schema_describes_inline_agent_definitions():
"role",
"goal",
"backstory",
"from_repository",
"settings",
"llm",
"input",
@@ -1385,6 +1519,167 @@ methods:
}
def test_crew_action_runs_repository_agent_yaml_definition(
monkeypatch: pytest.MonkeyPatch,
):
from crewai import Crew
from crewai.plus_api import PlusAPI
fetched_agents: list[str] = []
class FakeResponse:
status_code = 200
text = ""
def json(self) -> dict[str, Any]:
return {
"role": "Repository researcher",
"goal": "Research {topic}",
"backstory": "Loaded from the agent repository.",
"max_iter": 5,
"tools": [],
}
def fake_get_agent(self: PlusAPI, handle: str) -> FakeResponse:
fetched_agents.append(handle)
return FakeResponse()
async def fake_kickoff_async(
self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any
) -> dict[str, Any]:
return {
"crew": self.name,
"agents": [
{"role": agent.role, "max_iter": agent.max_iter}
for agent in self.agents
],
"tasks": [task.description for task in self.tasks],
"inputs": inputs,
}
monkeypatch.setattr("crewai.auth.token.get_auth_token", lambda: "test-token")
monkeypatch.setattr(PlusAPI, "get_agent", fake_get_agent)
monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: CrewFlow
methods:
research:
do:
call: crew
with:
name: inline_research
agents:
researcher:
from_repository: researcher
tasks:
- name: research_task
description: Research {topic}
expected_output: Findings about {topic}
agent: researcher
inputs:
topic: "${state.topic}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
assert flow.kickoff(inputs={"topic": "AI"}) == {
"crew": "inline_research",
"agents": [{"role": "Repository researcher", "max_iter": 5}],
"tasks": ["Research {topic}"],
"inputs": {"topic": "AI"},
}
assert fetched_agents == ["researcher"]
def test_crew_action_repository_fetch_does_not_block_event_loop(
monkeypatch: pytest.MonkeyPatch,
):
from crewai import Crew
from crewai.plus_api import PlusAPI
loop_marker_ran = threading.Event()
fetch_started = threading.Event()
release_fetch = threading.Event()
fetch_saw_loop_marker = False
class FakeResponse:
status_code = 200
text = ""
def json(self) -> dict[str, Any]:
return {
"role": "Repository researcher",
"goal": "Research {topic}",
"backstory": "Loaded from the agent repository.",
"tools": [],
}
def fake_get_agent(self: PlusAPI, handle: str) -> FakeResponse:
nonlocal fetch_saw_loop_marker
fetch_started.set()
release_fetch.wait(timeout=1)
fetch_saw_loop_marker = loop_marker_ran.is_set()
return FakeResponse()
async def fake_kickoff_async(
self: Crew, inputs: dict[str, Any] | None = None, **_kwargs: Any
) -> dict[str, Any]:
return {"agents": [agent.role for agent in self.agents], "inputs": inputs}
monkeypatch.setattr("crewai.auth.token.get_auth_token", lambda: "test-token")
monkeypatch.setattr(PlusAPI, "get_agent", fake_get_agent)
monkeypatch.setattr(Crew, "kickoff_async", fake_kickoff_async)
yaml_str = """
schema: crewai.flow/v1
name: CrewFlow
methods:
research:
do:
call: crew
with:
agents:
researcher:
from_repository: researcher
tasks:
- description: Research {topic}
expected_output: Findings about {topic}
agent: researcher
inputs:
topic: "${state.topic}"
start: true
"""
flow = Flow.from_declaration(contents=yaml_str)
async def run_flow() -> dict[str, Any]:
async def mark_loop_progress() -> None:
while not fetch_started.is_set():
await asyncio.sleep(0)
loop_marker_ran.set()
release_fetch.set()
marker_task = asyncio.create_task(mark_loop_progress())
kickoff_task = asyncio.create_task(
flow.kickoff_async(inputs={"topic": "AI"})
)
try:
result = await asyncio.wait_for(kickoff_task, timeout=2)
await asyncio.wait_for(marker_task, timeout=2)
return result
finally:
release_fetch.set()
assert asyncio.run(run_flow()) == {
"agents": ["Repository researcher"],
"inputs": {"topic": "AI"},
}
assert fetch_saw_loop_marker
def test_crew_action_interpolates_runtime_strings_and_lists(
monkeypatch: pytest.MonkeyPatch,
):
@@ -1709,6 +2004,7 @@ def test_crew_action_json_schema_describes_inline_crew_definitions():
"role",
"goal",
"backstory",
"from_repository",
"settings",
"llm",
"tools",
@@ -1728,36 +2024,45 @@ def test_crew_action_json_schema_describes_inline_crew_definitions():
def test_crew_action_rejects_incomplete_inline_agent_definition():
with pytest.raises(ValidationError, match="goal"):
FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"with": {
"agents": {
"researcher": {
"role": "Researcher",
"backstory": "Knows things.",
}
},
"tasks": [
{
"description": "Research",
"expected_output": "Findings",
"agent": "researcher",
}
],
from crewai.project.crew_loader import load_crew_from_definition
from crewai.project.json_loader import JSONProjectValidationError
definition = FlowDefinition.from_declaration(contents=
{
"schema": "crewai.flow/v1",
"name": "CrewFlow",
"methods": {
"research": {
"start": True,
"do": {
"call": "crew",
"with": {
"agents": {
"researcher": {
"role": "Researcher",
"backstory": "Knows things.",
}
},
"tasks": [
{
"description": "Research",
"expected_output": "Findings",
"agent": "researcher",
}
],
},
}
},
}
)
},
}
},
}
)
crew_definition = definition.methods["research"].do.with_
assert crew_definition.agents["researcher"].goal is None
with pytest.raises(
JSONProjectValidationError, match="missing required field 'goal'"
):
load_crew_from_definition(crew_definition, source="crew action")
def test_crew_action_rejects_python_ref_field():