mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-07-02 13:48:09 +00:00
Add regression tests for kickoff_for_each inside flows (#4385)
Add tests covering crew.copy(), kickoff_for_each, kickoff_for_each_async,
and akickoff_for_each when called inside a Flow[MyState] method.
These tests ensure the fix from commit 9d7f4537 (replacing parent_flow
field with context variables) prevents the Pydantic validation error
reported in issue #4385.
Co-Authored-By: João <joao@crewai.com>
This commit is contained in:
@@ -4856,3 +4856,200 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
|
||||
assert "Researcher" in messages[0]["content"]
|
||||
assert messages[1]["role"] == "user"
|
||||
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
|
||||
|
||||
|
||||
def test_crew_copy_inside_typed_flow():
|
||||
"""Tests that crew.copy() works inside a Flow[MyState] without Pydantic validation errors.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
question: str = ""
|
||||
|
||||
captured_copy = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
def run_batch(self):
|
||||
nonlocal captured_copy
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="{task_input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
captured_copy = crew.copy()
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_copy is not None
|
||||
assert len(captured_copy.agents) == 1
|
||||
assert len(captured_copy.tasks) == 1
|
||||
|
||||
|
||||
def test_kickoff_for_each_inside_flow():
|
||||
"""Tests that kickoff_for_each works when called inside a Flow method.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
question: str = ""
|
||||
|
||||
captured_results = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
def run_batch(self):
|
||||
nonlocal captured_results
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="{task_input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [
|
||||
{"task_input": "Task 1"},
|
||||
{"task_input": "Task 2"},
|
||||
]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="result",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(Crew, "kickoff", return_value=mock_output):
|
||||
captured_results = crew.kickoff_for_each(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_kickoff_for_each_async_inside_flow():
|
||||
"""Tests that kickoff_for_each_async works when called inside a Flow method.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
question: str = ""
|
||||
|
||||
captured_results = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
async def run_batch(self):
|
||||
nonlocal captured_results
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="{task_input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [
|
||||
{"task_input": "Task 1"},
|
||||
{"task_input": "Task 2"},
|
||||
]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="result",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(
|
||||
Crew, "kickoff_async", return_value=mock_output
|
||||
):
|
||||
captured_results = await crew.kickoff_for_each_async(
|
||||
inputs=inputs
|
||||
)
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
await flow.kickoff_async()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_akickoff_for_each_inside_flow():
|
||||
"""Tests that akickoff_for_each works when called inside a Flow method.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
question: str = ""
|
||||
|
||||
captured_results = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
async def run_batch(self):
|
||||
nonlocal captured_results
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="{task_input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [
|
||||
{"task_input": "Task 1"},
|
||||
{"task_input": "Task 2"},
|
||||
]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="result",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
|
||||
async def mock_akickoff(**kwargs):
|
||||
return mock_output
|
||||
|
||||
with patch.object(Crew, "akickoff", side_effect=mock_akickoff):
|
||||
captured_results = await crew.akickoff_for_each(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
await flow.kickoff_async()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 2
|
||||
|
||||
Reference in New Issue
Block a user