Compare commits

...

1 Commits

Author SHA1 Message Date
Devin AI
b6bb56146e Fix #4555: Flows do not work with kickoff_for_each
- Add 'parent_flow' to the exclude set in Crew.copy() for forward
  compatibility with older versions that had parent_flow as a Pydantic
  field on FlowTrackable
- Add comprehensive regression tests covering:
  - kickoff_for_each inside a @listen method of a Flow[TypedState]
  - kickoff_for_each with @CrewBase pattern inside flow listeners
  - crew.copy() preserving flow context in listeners
  - kickoff_for_each_async in flow listeners
  - akickoff_for_each in flow listeners
  - Multiple kickoff_for_each across chained @listen methods

Co-Authored-By: João <joao@crewai.com>
2026-02-21 12:30:56 +00:00
2 changed files with 365 additions and 1 deletions

View File

@@ -1677,6 +1677,7 @@ class Crew(FlowTrackable, BaseModel):
"knowledge",
"manager_agent",
"manager_llm",
"parent_flow",
}
cloned_agents = [agent.copy() for agent in self.agents]

View File

@@ -32,7 +32,7 @@ from crewai.events.types.memory_events import (
MemorySaveFailedEvent,
MemorySaveStartedEvent,
)
from crewai.flow import Flow, start
from crewai.flow import Flow, listen, start
from crewai.knowledge.knowledge import Knowledge
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.llm import LLM
@@ -4793,3 +4793,366 @@ def test_memory_remember_receives_task_content():
assert "Researcher" in raw
assert "Expected result:" in raw
assert "Result:" in raw
def test_kickoff_for_each_in_flow_listener_with_typed_state():
"""Tests that kickoff_for_each works inside a @listen method of a Flow[TypedState].
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
"""
class ResearchState(BaseModel):
chapter: str = "Basics of Distributed Systems"
topics: dict[str, str] = {}
captured_results = None
class ResearchFlow(Flow[ResearchState]):
@start()
def generate_topics(self):
return ["Topic A", "Topic B"]
@listen(generate_topics)
def generate_sections(self, topics):
nonlocal captured_results
agent = Agent(role="Writer", goal="Write sections", backstory="Expert writer")
task = Task(
description="Write about {topic}",
agent=agent,
expected_output="A section",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [{"topic": t} for t in topics]
mock_output = CrewOutput(
raw="section content",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff", return_value=mock_output):
captured_results = crew.kickoff_for_each(inputs=inputs)
return "done"
flow = ResearchFlow()
flow.kickoff()
assert captured_results is not None
assert len(captured_results) == 2
def test_kickoff_for_each_with_crewbase_inside_flow_listener():
"""Tests that kickoff_for_each works with @CrewBase pattern inside a flow listener.
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
"""
@CrewBase
class WriterCrew:
agents_config = None
tasks_config = None
agents: list[BaseAgent]
tasks: list[Task]
@agent
def writer_agent(self) -> Agent:
return Agent(
role="Writer",
goal="Write content",
backstory="Expert writer",
)
@task
def write_task(self) -> Task:
return Task(
description="Write about {topic}",
expected_output="Written content",
agent=self.writer_agent(),
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents,
tasks=self.tasks,
process=Process.sequential,
)
class MyState(BaseModel):
topics: list[str] = []
captured_results = None
class MyFlow(Flow[MyState]):
@start()
def research(self):
return ["Topic 1", "Topic 2", "Topic 3"]
@listen(research)
def write_sections(self, topics):
nonlocal captured_results
writer_crew = WriterCrew().crew()
inputs = [{"topic": t} for t in topics]
mock_output = CrewOutput(
raw="content",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff", return_value=mock_output):
captured_results = writer_crew.kickoff_for_each(inputs=inputs)
return "done"
flow = MyFlow()
flow.kickoff()
assert captured_results is not None
assert len(captured_results) == 3
def test_crew_copy_preserves_flow_context_in_listener():
"""Tests that crew.copy() inside a @listen method preserves flow context.
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
"""
class MyState(BaseModel):
data: str = ""
captured_original = None
captured_copy = None
class MyFlow(Flow[MyState]):
@start()
def step_one(self):
return "data"
@listen(step_one)
def step_two(self, result):
nonlocal captured_original, captured_copy
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="Work on {input}",
agent=agent,
expected_output="output",
)
original = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
captured_original = original
captured_copy = original.copy()
return "done"
flow = MyFlow()
flow.kickoff()
assert captured_original is not None
assert captured_copy is not None
assert len(captured_copy.agents) == 1
assert len(captured_copy.tasks) == 1
assert captured_copy._flow_id == flow.flow_id
assert captured_copy._request_id == flow.flow_id
@pytest.mark.asyncio
async def test_kickoff_for_each_async_in_flow_listener():
"""Tests that kickoff_for_each_async works inside a @listen method of a typed Flow.
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
"""
class MyState(BaseModel):
question: str = ""
captured_results = None
class MyFlow(Flow[MyState]):
@start()
async def step_one(self):
return ["input1", "input2"]
@listen(step_one)
async def step_two(self, items):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [{"task_input": item} for item in items]
mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff_async", return_value=mock_output):
captured_results = await crew.kickoff_for_each_async(inputs=inputs)
return "done"
flow = MyFlow()
await flow.kickoff_async()
assert captured_results is not None
assert len(captured_results) == 2
@pytest.mark.asyncio
async def test_akickoff_for_each_in_flow_listener():
"""Tests that akickoff_for_each works inside a @listen method of a typed Flow.
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
"""
class MyState(BaseModel):
question: str = ""
captured_results = None
class MyFlow(Flow[MyState]):
@start()
async def step_one(self):
return ["input1", "input2"]
@listen(step_one)
async def step_two(self, items):
nonlocal captured_results
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
task = Task(
description="{task_input}",
agent=agent,
expected_output="output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [{"task_input": item} for item in items]
mock_output = CrewOutput(
raw="result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
async def mock_akickoff(**kwargs):
return mock_output
with patch.object(Crew, "akickoff", side_effect=mock_akickoff):
captured_results = await crew.akickoff_for_each(inputs=inputs)
return "done"
flow = MyFlow()
await flow.kickoff_async()
assert captured_results is not None
assert len(captured_results) == 2
def test_multiple_kickoff_for_each_in_chained_listeners():
"""Tests that kickoff_for_each works across multiple chained @listen methods.
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
"""
class ChainState(BaseModel):
chapter: str = "Test Chapter"
topics: list[str] = []
sections: list[str] = []
step1_results = None
step2_results = None
class ChainedFlow(Flow[ChainState]):
@start()
def generate_topics(self):
return ["Topic A", "Topic B"]
@listen(generate_topics)
def research_topics(self, topics):
nonlocal step1_results
agent = Agent(role="Researcher", goal="Research topics", backstory="Expert")
task = Task(
description="Research {topic}",
agent=agent,
expected_output="Research output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [{"topic": t} for t in topics]
mock_output = CrewOutput(
raw="research result",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff", return_value=mock_output):
step1_results = crew.kickoff_for_each(inputs=inputs)
return topics
@listen(research_topics)
def write_sections(self, topics):
nonlocal step2_results
agent = Agent(role="Writer", goal="Write sections", backstory="Expert")
task = Task(
description="Write about {topic}",
agent=agent,
expected_output="Section output",
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
)
inputs = [{"topic": t} for t in topics]
mock_output = CrewOutput(
raw="section content",
tasks_output=[],
token_usage=UsageMetrics(),
json_dict=None,
pydantic=None,
)
with patch.object(Crew, "kickoff", return_value=mock_output):
step2_results = crew.kickoff_for_each(inputs=inputs)
return "done"
flow = ChainedFlow()
flow.kickoff()
assert step1_results is not None
assert len(step1_results) == 2
assert step2_results is not None
assert len(step2_results) == 2