From c835152f29dfe1a325b29629510f7af376e4865c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 22:42:31 +0000 Subject: [PATCH] Add regression tests for kickoff_for_each inside flows (#4385) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add tests covering crew.copy(), kickoff_for_each, kickoff_for_each_async, and akickoff_for_each when called inside a Flow[MyState] method. These tests ensure the fix from commit 9d7f4537 (replacing parent_flow field with context variables) prevents the Pydantic validation error reported in issue #4385. Co-Authored-By: João --- lib/crewai/tests/test_crew.py | 197 ++++++++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index d2eeb531d..39d8b1fa3 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4856,3 +4856,200 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory(): assert "Researcher" in messages[0]["content"] assert messages[1]["role"] == "user" assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"] + + +def test_crew_copy_inside_typed_flow(): + """Tests that crew.copy() works inside a Flow[MyState] without Pydantic validation errors. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_copy = None + + class MyFlow(Flow[MyState]): + @start() + def run_batch(self): + nonlocal captured_copy + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + captured_copy = crew.copy() + return "done" + + flow = MyFlow() + flow.kickoff() + + assert captured_copy is not None + assert len(captured_copy.agents) == 1 + assert len(captured_copy.tasks) == 1 + + +def test_kickoff_for_each_inside_flow(): + """Tests that kickoff_for_each works when called inside a Flow method. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + def run_batch(self): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [ + {"task_input": "Task 1"}, + {"task_input": "Task 2"}, + ] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff", return_value=mock_output): + captured_results = crew.kickoff_for_each(inputs=inputs) + return "done" + + flow = MyFlow() + flow.kickoff() + + assert captured_results is not None + assert len(captured_results) == 2 + + +@pytest.mark.asyncio +async def test_kickoff_for_each_async_inside_flow(): + """Tests that kickoff_for_each_async works when called inside a Flow method. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + async def run_batch(self): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [ + {"task_input": "Task 1"}, + {"task_input": "Task 2"}, + ] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object( + Crew, "kickoff_async", return_value=mock_output + ): + captured_results = await crew.kickoff_for_each_async( + inputs=inputs + ) + return "done" + + flow = MyFlow() + await flow.kickoff_async() + + assert captured_results is not None + assert len(captured_results) == 2 + + +@pytest.mark.asyncio +async def test_akickoff_for_each_inside_flow(): + """Tests that akickoff_for_each works when called inside a Flow method. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + async def run_batch(self): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [ + {"task_input": "Task 1"}, + {"task_input": "Task 2"}, + ] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + + async def mock_akickoff(**kwargs): + return mock_output + + with patch.object(Crew, "akickoff", side_effect=mock_akickoff): + captured_results = await crew.akickoff_for_each(inputs=inputs) + return "done" + + flow = MyFlow() + await flow.kickoff_async() + + assert captured_results is not None + assert len(captured_results) == 2