diff --git a/lib/crewai/src/crewai/crew.py b/lib/crewai/src/crewai/crew.py index 980830af5..49487c580 100644 --- a/lib/crewai/src/crewai/crew.py +++ b/lib/crewai/src/crewai/crew.py @@ -1677,6 +1677,7 @@ class Crew(FlowTrackable, BaseModel): "knowledge", "manager_agent", "manager_llm", + "parent_flow", } cloned_agents = [agent.copy() for agent in self.agents] diff --git a/lib/crewai/tests/test_crew.py b/lib/crewai/tests/test_crew.py index 64d122a7c..d09bbd7fc 100644 --- a/lib/crewai/tests/test_crew.py +++ b/lib/crewai/tests/test_crew.py @@ -32,7 +32,7 @@ from crewai.events.types.memory_events import ( MemorySaveFailedEvent, MemorySaveStartedEvent, ) -from crewai.flow import Flow, start +from crewai.flow import Flow, listen, start from crewai.knowledge.knowledge import Knowledge from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource from crewai.llm import LLM @@ -4793,3 +4793,366 @@ def test_memory_remember_receives_task_content(): assert "Researcher" in raw assert "Expected result:" in raw assert "Result:" in raw + + +def test_kickoff_for_each_in_flow_listener_with_typed_state(): + """Tests that kickoff_for_each works inside a @listen method of a Flow[TypedState]. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4555 + """ + + class ResearchState(BaseModel): + chapter: str = "Basics of Distributed Systems" + topics: dict[str, str] = {} + + captured_results = None + + class ResearchFlow(Flow[ResearchState]): + @start() + def generate_topics(self): + return ["Topic A", "Topic B"] + + @listen(generate_topics) + def generate_sections(self, topics): + nonlocal captured_results + agent = Agent(role="Writer", goal="Write sections", backstory="Expert writer") + task = Task( + description="Write about {topic}", + agent=agent, + expected_output="A section", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [{"topic": t} for t in topics] + + mock_output = CrewOutput( + raw="section content", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff", return_value=mock_output): + captured_results = crew.kickoff_for_each(inputs=inputs) + return "done" + + flow = ResearchFlow() + flow.kickoff() + + assert captured_results is not None + assert len(captured_results) == 2 + + +def test_kickoff_for_each_with_crewbase_inside_flow_listener(): + """Tests that kickoff_for_each works with @CrewBase pattern inside a flow listener. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4555 + """ + + @CrewBase + class WriterCrew: + agents_config = None + tasks_config = None + + agents: list[BaseAgent] + tasks: list[Task] + + @agent + def writer_agent(self) -> Agent: + return Agent( + role="Writer", + goal="Write content", + backstory="Expert writer", + ) + + @task + def write_task(self) -> Task: + return Task( + description="Write about {topic}", + expected_output="Written content", + agent=self.writer_agent(), + ) + + @crew + def crew(self) -> Crew: + return Crew( + agents=self.agents, + tasks=self.tasks, + process=Process.sequential, + ) + + class MyState(BaseModel): + topics: list[str] = [] + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + def research(self): + return ["Topic 1", "Topic 2", "Topic 3"] + + @listen(research) + def write_sections(self, topics): + nonlocal captured_results + writer_crew = WriterCrew().crew() + + inputs = [{"topic": t} for t in topics] + + mock_output = CrewOutput( + raw="content", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff", return_value=mock_output): + captured_results = writer_crew.kickoff_for_each(inputs=inputs) + return "done" + + flow = MyFlow() + flow.kickoff() + + assert captured_results is not None + assert len(captured_results) == 3 + + +def test_crew_copy_preserves_flow_context_in_listener(): + """Tests that crew.copy() inside a @listen method preserves flow context. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4555 + """ + + class MyState(BaseModel): + data: str = "" + + captured_original = None + captured_copy = None + + class MyFlow(Flow[MyState]): + @start() + def step_one(self): + return "data" + + @listen(step_one) + def step_two(self, result): + nonlocal captured_original, captured_copy + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="Work on {input}", + agent=agent, + expected_output="output", + ) + original = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + captured_original = original + captured_copy = original.copy() + return "done" + + flow = MyFlow() + flow.kickoff() + + assert captured_original is not None + assert captured_copy is not None + assert len(captured_copy.agents) == 1 + assert len(captured_copy.tasks) == 1 + assert captured_copy._flow_id == flow.flow_id + assert captured_copy._request_id == flow.flow_id + + +@pytest.mark.asyncio +async def test_kickoff_for_each_async_in_flow_listener(): + """Tests that kickoff_for_each_async works inside a @listen method of a typed Flow. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4555 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + async def step_one(self): + return ["input1", "input2"] + + @listen(step_one) + async def step_two(self, items): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [{"task_input": item} for item in items] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff_async", return_value=mock_output): + captured_results = await crew.kickoff_for_each_async(inputs=inputs) + return "done" + + flow = MyFlow() + await flow.kickoff_async() + + assert captured_results is not None + assert len(captured_results) == 2 + + +@pytest.mark.asyncio +async def test_akickoff_for_each_in_flow_listener(): + """Tests that akickoff_for_each works inside a @listen method of a typed Flow. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4555 + """ + + class MyState(BaseModel): + question: str = "" + + captured_results = None + + class MyFlow(Flow[MyState]): + @start() + async def step_one(self): + return ["input1", "input2"] + + @listen(step_one) + async def step_two(self, items): + nonlocal captured_results + agent = Agent(role="Worker", goal="Do work", backstory="A worker") + task = Task( + description="{task_input}", + agent=agent, + expected_output="output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [{"task_input": item} for item in items] + + mock_output = CrewOutput( + raw="result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + + async def mock_akickoff(**kwargs): + return mock_output + + with patch.object(Crew, "akickoff", side_effect=mock_akickoff): + captured_results = await crew.akickoff_for_each(inputs=inputs) + return "done" + + flow = MyFlow() + await flow.kickoff_async() + + assert captured_results is not None + assert len(captured_results) == 2 + + +def test_multiple_kickoff_for_each_in_chained_listeners(): + """Tests that kickoff_for_each works across multiple chained @listen methods. + + Regression test for https://github.com/crewAIInc/crewAI/issues/4555 + """ + + class ChainState(BaseModel): + chapter: str = "Test Chapter" + topics: list[str] = [] + sections: list[str] = [] + + step1_results = None + step2_results = None + + class ChainedFlow(Flow[ChainState]): + @start() + def generate_topics(self): + return ["Topic A", "Topic B"] + + @listen(generate_topics) + def research_topics(self, topics): + nonlocal step1_results + agent = Agent(role="Researcher", goal="Research topics", backstory="Expert") + task = Task( + description="Research {topic}", + agent=agent, + expected_output="Research output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [{"topic": t} for t in topics] + + mock_output = CrewOutput( + raw="research result", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff", return_value=mock_output): + step1_results = crew.kickoff_for_each(inputs=inputs) + return topics + + @listen(research_topics) + def write_sections(self, topics): + nonlocal step2_results + agent = Agent(role="Writer", goal="Write sections", backstory="Expert") + task = Task( + description="Write about {topic}", + agent=agent, + expected_output="Section output", + ) + crew = Crew( + agents=[agent], + tasks=[task], + process=Process.sequential, + ) + + inputs = [{"topic": t} for t in topics] + + mock_output = CrewOutput( + raw="section content", + tasks_output=[], + token_usage=UsageMetrics(), + json_dict=None, + pydantic=None, + ) + with patch.object(Crew, "kickoff", return_value=mock_output): + step2_results = crew.kickoff_for_each(inputs=inputs) + return "done" + + flow = ChainedFlow() + flow.kickoff() + + assert step1_results is not None + assert len(step1_results) == 2 + assert step2_results is not None + assert len(step2_results) == 2