mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-04-16 16:02:36 +00:00
Compare commits
1 Commits
gl/chore/r
...
devin/1771
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6bb56146e |
@@ -1677,6 +1677,7 @@ class Crew(FlowTrackable, BaseModel):
|
||||
"knowledge",
|
||||
"manager_agent",
|
||||
"manager_llm",
|
||||
"parent_flow",
|
||||
}
|
||||
|
||||
cloned_agents = [agent.copy() for agent in self.agents]
|
||||
|
||||
@@ -32,7 +32,7 @@ from crewai.events.types.memory_events import (
|
||||
MemorySaveFailedEvent,
|
||||
MemorySaveStartedEvent,
|
||||
)
|
||||
from crewai.flow import Flow, start
|
||||
from crewai.flow import Flow, listen, start
|
||||
from crewai.knowledge.knowledge import Knowledge
|
||||
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
|
||||
from crewai.llm import LLM
|
||||
@@ -4793,3 +4793,366 @@ def test_memory_remember_receives_task_content():
|
||||
assert "Researcher" in raw
|
||||
assert "Expected result:" in raw
|
||||
assert "Result:" in raw
|
||||
|
||||
|
||||
def test_kickoff_for_each_in_flow_listener_with_typed_state():
|
||||
"""Tests that kickoff_for_each works inside a @listen method of a Flow[TypedState].
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
|
||||
"""
|
||||
|
||||
class ResearchState(BaseModel):
|
||||
chapter: str = "Basics of Distributed Systems"
|
||||
topics: dict[str, str] = {}
|
||||
|
||||
captured_results = None
|
||||
|
||||
class ResearchFlow(Flow[ResearchState]):
|
||||
@start()
|
||||
def generate_topics(self):
|
||||
return ["Topic A", "Topic B"]
|
||||
|
||||
@listen(generate_topics)
|
||||
def generate_sections(self, topics):
|
||||
nonlocal captured_results
|
||||
agent = Agent(role="Writer", goal="Write sections", backstory="Expert writer")
|
||||
task = Task(
|
||||
description="Write about {topic}",
|
||||
agent=agent,
|
||||
expected_output="A section",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [{"topic": t} for t in topics]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="section content",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(Crew, "kickoff", return_value=mock_output):
|
||||
captured_results = crew.kickoff_for_each(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = ResearchFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 2
|
||||
|
||||
|
||||
def test_kickoff_for_each_with_crewbase_inside_flow_listener():
|
||||
"""Tests that kickoff_for_each works with @CrewBase pattern inside a flow listener.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
|
||||
"""
|
||||
|
||||
@CrewBase
|
||||
class WriterCrew:
|
||||
agents_config = None
|
||||
tasks_config = None
|
||||
|
||||
agents: list[BaseAgent]
|
||||
tasks: list[Task]
|
||||
|
||||
@agent
|
||||
def writer_agent(self) -> Agent:
|
||||
return Agent(
|
||||
role="Writer",
|
||||
goal="Write content",
|
||||
backstory="Expert writer",
|
||||
)
|
||||
|
||||
@task
|
||||
def write_task(self) -> Task:
|
||||
return Task(
|
||||
description="Write about {topic}",
|
||||
expected_output="Written content",
|
||||
agent=self.writer_agent(),
|
||||
)
|
||||
|
||||
@crew
|
||||
def crew(self) -> Crew:
|
||||
return Crew(
|
||||
agents=self.agents,
|
||||
tasks=self.tasks,
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
class MyState(BaseModel):
|
||||
topics: list[str] = []
|
||||
|
||||
captured_results = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
def research(self):
|
||||
return ["Topic 1", "Topic 2", "Topic 3"]
|
||||
|
||||
@listen(research)
|
||||
def write_sections(self, topics):
|
||||
nonlocal captured_results
|
||||
writer_crew = WriterCrew().crew()
|
||||
|
||||
inputs = [{"topic": t} for t in topics]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="content",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(Crew, "kickoff", return_value=mock_output):
|
||||
captured_results = writer_crew.kickoff_for_each(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 3
|
||||
|
||||
|
||||
def test_crew_copy_preserves_flow_context_in_listener():
|
||||
"""Tests that crew.copy() inside a @listen method preserves flow context.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
data: str = ""
|
||||
|
||||
captured_original = None
|
||||
captured_copy = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
def step_one(self):
|
||||
return "data"
|
||||
|
||||
@listen(step_one)
|
||||
def step_two(self, result):
|
||||
nonlocal captured_original, captured_copy
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="Work on {input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
original = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
captured_original = original
|
||||
captured_copy = original.copy()
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert captured_original is not None
|
||||
assert captured_copy is not None
|
||||
assert len(captured_copy.agents) == 1
|
||||
assert len(captured_copy.tasks) == 1
|
||||
assert captured_copy._flow_id == flow.flow_id
|
||||
assert captured_copy._request_id == flow.flow_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_kickoff_for_each_async_in_flow_listener():
|
||||
"""Tests that kickoff_for_each_async works inside a @listen method of a typed Flow.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
question: str = ""
|
||||
|
||||
captured_results = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
async def step_one(self):
|
||||
return ["input1", "input2"]
|
||||
|
||||
@listen(step_one)
|
||||
async def step_two(self, items):
|
||||
nonlocal captured_results
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="{task_input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [{"task_input": item} for item in items]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="result",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(Crew, "kickoff_async", return_value=mock_output):
|
||||
captured_results = await crew.kickoff_for_each_async(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
await flow.kickoff_async()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_akickoff_for_each_in_flow_listener():
|
||||
"""Tests that akickoff_for_each works inside a @listen method of a typed Flow.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
|
||||
"""
|
||||
|
||||
class MyState(BaseModel):
|
||||
question: str = ""
|
||||
|
||||
captured_results = None
|
||||
|
||||
class MyFlow(Flow[MyState]):
|
||||
@start()
|
||||
async def step_one(self):
|
||||
return ["input1", "input2"]
|
||||
|
||||
@listen(step_one)
|
||||
async def step_two(self, items):
|
||||
nonlocal captured_results
|
||||
agent = Agent(role="Worker", goal="Do work", backstory="A worker")
|
||||
task = Task(
|
||||
description="{task_input}",
|
||||
agent=agent,
|
||||
expected_output="output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [{"task_input": item} for item in items]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="result",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
|
||||
async def mock_akickoff(**kwargs):
|
||||
return mock_output
|
||||
|
||||
with patch.object(Crew, "akickoff", side_effect=mock_akickoff):
|
||||
captured_results = await crew.akickoff_for_each(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = MyFlow()
|
||||
await flow.kickoff_async()
|
||||
|
||||
assert captured_results is not None
|
||||
assert len(captured_results) == 2
|
||||
|
||||
|
||||
def test_multiple_kickoff_for_each_in_chained_listeners():
|
||||
"""Tests that kickoff_for_each works across multiple chained @listen methods.
|
||||
|
||||
Regression test for https://github.com/crewAIInc/crewAI/issues/4555
|
||||
"""
|
||||
|
||||
class ChainState(BaseModel):
|
||||
chapter: str = "Test Chapter"
|
||||
topics: list[str] = []
|
||||
sections: list[str] = []
|
||||
|
||||
step1_results = None
|
||||
step2_results = None
|
||||
|
||||
class ChainedFlow(Flow[ChainState]):
|
||||
@start()
|
||||
def generate_topics(self):
|
||||
return ["Topic A", "Topic B"]
|
||||
|
||||
@listen(generate_topics)
|
||||
def research_topics(self, topics):
|
||||
nonlocal step1_results
|
||||
agent = Agent(role="Researcher", goal="Research topics", backstory="Expert")
|
||||
task = Task(
|
||||
description="Research {topic}",
|
||||
agent=agent,
|
||||
expected_output="Research output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [{"topic": t} for t in topics]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="research result",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(Crew, "kickoff", return_value=mock_output):
|
||||
step1_results = crew.kickoff_for_each(inputs=inputs)
|
||||
return topics
|
||||
|
||||
@listen(research_topics)
|
||||
def write_sections(self, topics):
|
||||
nonlocal step2_results
|
||||
agent = Agent(role="Writer", goal="Write sections", backstory="Expert")
|
||||
task = Task(
|
||||
description="Write about {topic}",
|
||||
agent=agent,
|
||||
expected_output="Section output",
|
||||
)
|
||||
crew = Crew(
|
||||
agents=[agent],
|
||||
tasks=[task],
|
||||
process=Process.sequential,
|
||||
)
|
||||
|
||||
inputs = [{"topic": t} for t in topics]
|
||||
|
||||
mock_output = CrewOutput(
|
||||
raw="section content",
|
||||
tasks_output=[],
|
||||
token_usage=UsageMetrics(),
|
||||
json_dict=None,
|
||||
pydantic=None,
|
||||
)
|
||||
with patch.object(Crew, "kickoff", return_value=mock_output):
|
||||
step2_results = crew.kickoff_for_each(inputs=inputs)
|
||||
return "done"
|
||||
|
||||
flow = ChainedFlow()
|
||||
flow.kickoff()
|
||||
|
||||
assert step1_results is not None
|
||||
assert len(step1_results) == 2
|
||||
assert step2_results is not None
|
||||
assert len(step2_results) == 2
|
||||
|
||||
Reference in New Issue
Block a user