diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index d2eeb531d..39d8b1fa3 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -4856,3 +4856,200 @@ def test_ensure_exchanged_messages_are_propagated_to_external_memory(): assert "Researcher" in messages[0]["content"] assert messages[1]["role"] == "user" assert "Research a topic to teach a kid aged 6 about math" in messages[1]["content"] + + +def test_crew_copy_inside_typed_flow(): + """Tests that crew.copy() works inside a Flow[MyState] without Pydantic validation errors. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_copy = None + + class MyFlow(Flow[MyState]): + @start() + def run_batch(self): + nonlocal captured_copy + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + captured_copy = crew.copy() + return "done" + + flow = MyFlow() + flow.kickoff() + + assert captured_copy is not None + assert len(captured_copy.agents) == 1 + assert len(captured_copy.tasks) == 1 + + +def test_kickoff_for_each_inside_flow(): + """Tests that kickoff_for_each works when called inside a Flow method. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + def run_batch(self): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [ + {"task_input": "Task 1"}, + {"task_input": "Task 2"}, + ] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff", return_value=mock_output): + captured_results = crew.kickoff_for_each(inputs=inputs) + return "done" + + flow = MyFlow() + flow.kickoff() + + assert captured_results is not None + assert len(captured_results) == 2 + + +@pytest.mark.asyncio +async def test_kickoff_for_each_async_inside_flow(): + """Tests that kickoff_for_each_async works when called inside a Flow method. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + async def run_batch(self): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [ + {"task_input": "Task 1"}, + {"task_input": "Task 2"}, + ] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object( + Crew, "kickoff_async", return_value=mock_output + ): + captured_results = await crew.kickoff_for_each_async( + inputs=inputs + ) + return "done" + + flow = MyFlow() + await flow.kickoff_async() + + assert captured_results is not None + assert len(captured_results) == 2 + + +@pytest.mark.asyncio +async def test_akickoff_for_each_inside_flow(): + """Tests that akickoff_for_each works when called inside a Flow method. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4385 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + async def run_batch(self): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [ + {"task_input": "Task 1"}, + {"task_input": "Task 2"}, + ] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + + async def mock_akickoff(**kwargs): + return mock_output + + with patch.object(Crew, "akickoff", side_effect=mock_akickoff): + captured_results = await crew.akickoff_for_each(inputs=inputs) + return "done" + + flow = MyFlow() + await flow.kickoff_async() + + assert captured_results is not None + assert len(captured_results) == 2