Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
c835152f29 Add regression tests for kickoff_for_each inside flows (#4385)
Add tests covering crew.copy(), kickoff_for_each, kickoff_for_each_async,
and akickoff_for_each when called inside a Flow[MyState] method.

These tests ensure the fix from commit 9d7f4537 (replacing parent_flow
field with context variables) prevents the Pydantic validation error
reported in issue #4385.

Co-Authored-By: João <joao@crewai.com>
2026-02-05 22:42:31 +00:00

View File

@@ -4856,3 +4856,200 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory():
assert "Researcher" in messages[0]["content"]
assert messages[1]["role"] == "user"
assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"]
def test_crew_copy_inside_typed_flow():
"""Tests that crew.copy() works inside a Flow[MyState] without Pydantic validation errors.
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""
class MyState(BaseModel):
question: str = ""
captured_copy = None
class MyFlow(Flow[MyState]):
@start()
def run_batch(self):
nonlocal captured_copy
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
captured_copy = crew.copy()
return "done"
flow = MyFlow()
flow.kickoff()
assert captured_copy is not None
assert len(captured_copy.agents) == 1
assert len(captured_copy.tasks) == 1
def test_kickoff_for_each_inside_flow():
"""Tests that kickoff_for_each works when called inside a Flow method.
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""
class MyState(BaseModel):
question: str = ""
captured_results = None
class MyFlow(Flow[MyState]):
@start()
def run_batch(self):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [
{"task_input": "Task 1"},
{"task_input": "Task 2"},
]
mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff", return_value=mock_output):
captured_results = crew.kickoff_for_each(inputs=inputs)
return "done"
flow = MyFlow()
flow.kickoff()
assert captured_results is not None
assert len(captured_results) == 2
@pytest.mark.asyncio
async def test_kickoff_for_each_async_inside_flow():
"""Tests that kickoff_for_each_async works when called inside a Flow method.
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""
class MyState(BaseModel):
question: str = ""
captured_results = None
class MyFlow(Flow[MyState]):
@start()
async def run_batch(self):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [
{"task_input": "Task 1"},
{"task_input": "Task 2"},
]
mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(
Crew, "kickoff_async", return_value=mock_output
):
captured_results = await crew.kickoff_for_each_async(
inputs=inputs
)
return "done"
flow = MyFlow()
await flow.kickoff_async()
assert captured_results is not None
assert len(captured_results) == 2
@pytest.mark.asyncio
async def test_akickoff_for_each_inside_flow():
"""Tests that akickoff_for_each works when called inside a Flow method.
Regression test for https://github.com/crewAIInc/crewAI/issues/4385
"""
class MyState(BaseModel):
question: str = ""
captured_results = None
class MyFlow(Flow[MyState]):
@start()
async def run_batch(self):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [
{"task_input": "Task 1"},
{"task_input": "Task 2"},
]
mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
async def mock_akickoff(**kwargs):
return mock_output
with patch.object(Crew, "akickoff", side_effect=mock_akickoff):
captured_results = await crew.akickoff_for_each(inputs=inputs)
return "done"
flow = MyFlow()
await flow.kickoff_async()
assert captured_results is not None
assert len(captured_results) == 2