mirror of
https://github.com/crewAIInc/crewAI.git
synced 2026-01-25 08:08:14 +00:00
Compare commits
1 Commits
1.8.1
...
devin/1766
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
029eedfddb |
@@ -957,8 +957,8 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
pending_tasks.append((task, async_task, task_index))
|
pending_tasks.append((task, async_task, task_index))
|
||||||
else:
|
else:
|
||||||
if pending_tasks:
|
if pending_tasks:
|
||||||
task_outputs = await self._aprocess_async_tasks(
|
task_outputs.extend(
|
||||||
pending_tasks, was_replayed
|
await self._aprocess_async_tasks(pending_tasks, was_replayed)
|
||||||
)
|
)
|
||||||
pending_tasks.clear()
|
pending_tasks.clear()
|
||||||
|
|
||||||
@@ -973,7 +973,9 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
self._store_execution_log(task, task_output, task_index, was_replayed)
|
self._store_execution_log(task, task_output, task_index, was_replayed)
|
||||||
|
|
||||||
if pending_tasks:
|
if pending_tasks:
|
||||||
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
|
task_outputs.extend(
|
||||||
|
await self._aprocess_async_tasks(pending_tasks, was_replayed)
|
||||||
|
)
|
||||||
|
|
||||||
return self._create_crew_output(task_outputs)
|
return self._create_crew_output(task_outputs)
|
||||||
|
|
||||||
@@ -987,7 +989,9 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
) -> TaskOutput | None:
|
) -> TaskOutput | None:
|
||||||
"""Handle conditional task evaluation using native async."""
|
"""Handle conditional task evaluation using native async."""
|
||||||
if pending_tasks:
|
if pending_tasks:
|
||||||
task_outputs = await self._aprocess_async_tasks(pending_tasks, was_replayed)
|
task_outputs.extend(
|
||||||
|
await self._aprocess_async_tasks(pending_tasks, was_replayed)
|
||||||
|
)
|
||||||
pending_tasks.clear()
|
pending_tasks.clear()
|
||||||
|
|
||||||
return check_conditional_skip(
|
return check_conditional_skip(
|
||||||
@@ -1152,7 +1156,7 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
futures.append((task, future, task_index))
|
futures.append((task, future, task_index))
|
||||||
else:
|
else:
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
context = self._get_context(task, task_outputs)
|
context = self._get_context(task, task_outputs)
|
||||||
@@ -1166,7 +1170,7 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
self._store_execution_log(task, task_output, task_index, was_replayed)
|
self._store_execution_log(task, task_output, task_index, was_replayed)
|
||||||
|
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
|
||||||
|
|
||||||
return self._create_crew_output(task_outputs)
|
return self._create_crew_output(task_outputs)
|
||||||
|
|
||||||
@@ -1179,7 +1183,7 @@ class Crew(FlowTrackable, BaseModel):
|
|||||||
was_replayed: bool,
|
was_replayed: bool,
|
||||||
) -> TaskOutput | None:
|
) -> TaskOutput | None:
|
||||||
if futures:
|
if futures:
|
||||||
task_outputs = self._process_async_tasks(futures, was_replayed)
|
task_outputs.extend(self._process_async_tasks(futures, was_replayed))
|
||||||
futures.clear()
|
futures.clear()
|
||||||
|
|
||||||
return check_conditional_skip(
|
return check_conditional_skip(
|
||||||
|
|||||||
@@ -382,3 +382,170 @@ class TestAsyncProcessAsyncTasks:
|
|||||||
"""Test processing empty list of async tasks."""
|
"""Test processing empty list of async tasks."""
|
||||||
result = await test_crew._aprocess_async_tasks([])
|
result = await test_crew._aprocess_async_tasks([])
|
||||||
assert result == []
|
assert result == []
|
||||||
|
|
||||||
|
|
||||||
|
class TestMixedSyncAsyncTaskOutputs:
|
||||||
|
"""Tests for issue #4137: Task outputs lost when mixing sync and async tasks.
|
||||||
|
|
||||||
|
These tests verify that when a Crew executes a mix of synchronous and
|
||||||
|
asynchronous tasks, all task outputs are preserved correctly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
|
||||||
|
async def test_async_sync_task_before_async_task_outputs_preserved(
|
||||||
|
self, mock_execute: AsyncMock, test_agent: Agent
|
||||||
|
) -> None:
|
||||||
|
"""Test that sync task outputs before async tasks are preserved.
|
||||||
|
|
||||||
|
Scenario: sync -> async -> sync
|
||||||
|
Expected: All 3 task outputs should be in the result.
|
||||||
|
"""
|
||||||
|
task1 = Task(
|
||||||
|
description="Sync task 1",
|
||||||
|
expected_output="Output 1",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Async task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=True,
|
||||||
|
)
|
||||||
|
task3 = Task(
|
||||||
|
description="Sync task 3",
|
||||||
|
expected_output="Output 3",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
crew = Crew(
|
||||||
|
agents=[test_agent],
|
||||||
|
tasks=[task1, task2, task3],
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_output1 = TaskOutput(
|
||||||
|
description="Sync task 1",
|
||||||
|
raw="Result 1",
|
||||||
|
agent="Test Agent",
|
||||||
|
)
|
||||||
|
mock_output2 = TaskOutput(
|
||||||
|
description="Async task 2",
|
||||||
|
raw="Result 2",
|
||||||
|
agent="Test Agent",
|
||||||
|
)
|
||||||
|
mock_output3 = TaskOutput(
|
||||||
|
description="Sync task 3",
|
||||||
|
raw="Result 3",
|
||||||
|
agent="Test Agent",
|
||||||
|
)
|
||||||
|
mock_execute.side_effect = [mock_output1, mock_output2, mock_output3]
|
||||||
|
|
||||||
|
result = await crew._aexecute_tasks(crew.tasks)
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result.tasks_output) == 3
|
||||||
|
assert result.tasks_output[0].raw == "Result 1"
|
||||||
|
assert result.tasks_output[1].raw == "Result 2"
|
||||||
|
assert result.tasks_output[2].raw == "Result 3"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
|
||||||
|
async def test_async_crew_ending_with_async_task_preserves_outputs(
|
||||||
|
self, mock_execute: AsyncMock, test_agent: Agent
|
||||||
|
) -> None:
|
||||||
|
"""Test that outputs are preserved when crew ends with async task.
|
||||||
|
|
||||||
|
Scenario: sync -> async (final)
|
||||||
|
Expected: Both task outputs should be in the result.
|
||||||
|
"""
|
||||||
|
task1 = Task(
|
||||||
|
description="Sync task 1",
|
||||||
|
expected_output="Output 1",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Async task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=True,
|
||||||
|
)
|
||||||
|
crew = Crew(
|
||||||
|
agents=[test_agent],
|
||||||
|
tasks=[task1, task2],
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_output1 = TaskOutput(
|
||||||
|
description="Sync task 1",
|
||||||
|
raw="Result 1",
|
||||||
|
agent="Test Agent",
|
||||||
|
)
|
||||||
|
mock_output2 = TaskOutput(
|
||||||
|
description="Async task 2",
|
||||||
|
raw="Result 2",
|
||||||
|
agent="Test Agent",
|
||||||
|
)
|
||||||
|
mock_execute.side_effect = [mock_output1, mock_output2]
|
||||||
|
|
||||||
|
result = await crew._aexecute_tasks(crew.tasks)
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result.tasks_output) == 2
|
||||||
|
assert result.tasks_output[0].raw == "Result 1"
|
||||||
|
assert result.tasks_output[1].raw == "Result 2"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("crewai.task.Task.aexecute_sync", new_callable=AsyncMock)
|
||||||
|
async def test_async_multiple_sync_before_async_all_preserved(
|
||||||
|
self, mock_execute: AsyncMock, test_agent: Agent
|
||||||
|
) -> None:
|
||||||
|
"""Test that multiple sync task outputs before async are preserved.
|
||||||
|
|
||||||
|
Scenario: sync -> sync -> async -> sync
|
||||||
|
Expected: All 4 task outputs should be in the result.
|
||||||
|
"""
|
||||||
|
task1 = Task(
|
||||||
|
description="Sync task 1",
|
||||||
|
expected_output="Output 1",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Sync task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task3 = Task(
|
||||||
|
description="Async task 3",
|
||||||
|
expected_output="Output 3",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=True,
|
||||||
|
)
|
||||||
|
task4 = Task(
|
||||||
|
description="Sync task 4",
|
||||||
|
expected_output="Output 4",
|
||||||
|
agent=test_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
crew = Crew(
|
||||||
|
agents=[test_agent],
|
||||||
|
tasks=[task1, task2, task3, task4],
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_outputs = [
|
||||||
|
TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Test Agent")
|
||||||
|
for i in range(1, 5)
|
||||||
|
]
|
||||||
|
mock_execute.side_effect = mock_outputs
|
||||||
|
|
||||||
|
result = await crew._aexecute_tasks(crew.tasks)
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result.tasks_output) == 4
|
||||||
|
for i in range(4):
|
||||||
|
assert result.tasks_output[i].raw == f"Result {i + 1}"
|
||||||
|
|||||||
@@ -1251,6 +1251,200 @@ async def test_async_task_execution_call_count(researcher, writer):
|
|||||||
assert mock_execute_sync.call_count == 1
|
assert mock_execute_sync.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_sync_task_outputs_preserved_when_mixing_sync_async_tasks():
|
||||||
|
"""Test for issue #4137: Task outputs lost when mixing sync and async tasks.
|
||||||
|
|
||||||
|
Scenario: sync -> async -> sync
|
||||||
|
Expected: All 3 task outputs should be in the result.
|
||||||
|
"""
|
||||||
|
researcher_agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research topics",
|
||||||
|
backstory="Expert researcher",
|
||||||
|
allow_delegation=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
task1 = Task(
|
||||||
|
description="Sync task 1",
|
||||||
|
expected_output="Output 1",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Async task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=True,
|
||||||
|
)
|
||||||
|
task3 = Task(
|
||||||
|
description="Sync task 3",
|
||||||
|
expected_output="Output 3",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher_agent],
|
||||||
|
tasks=[task1, task2, task3],
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_output1 = TaskOutput(
|
||||||
|
description="Sync task 1",
|
||||||
|
raw="Result 1",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
mock_output2 = TaskOutput(
|
||||||
|
description="Async task 2",
|
||||||
|
raw="Result 2",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
mock_output3 = TaskOutput(
|
||||||
|
description="Sync task 3",
|
||||||
|
raw="Result 3",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_future = MagicMock(spec=Future)
|
||||||
|
mock_future.result.return_value = mock_output2
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(Task, "execute_sync", side_effect=[mock_output1, mock_output3]),
|
||||||
|
patch.object(Task, "execute_async", return_value=mock_future),
|
||||||
|
):
|
||||||
|
result = crew.kickoff()
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result.tasks_output) == 3
|
||||||
|
assert result.tasks_output[0].raw == "Result 1"
|
||||||
|
assert result.tasks_output[1].raw == "Result 2"
|
||||||
|
assert result.tasks_output[2].raw == "Result 3"
|
||||||
|
|
||||||
|
|
||||||
|
def test_sync_task_outputs_preserved_when_crew_ends_with_async_task():
|
||||||
|
"""Test for issue #4137: Task outputs preserved when crew ends with async task.
|
||||||
|
|
||||||
|
Scenario: sync -> async (final)
|
||||||
|
Expected: Both task outputs should be in the result.
|
||||||
|
"""
|
||||||
|
researcher_agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research topics",
|
||||||
|
backstory="Expert researcher",
|
||||||
|
allow_delegation=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
task1 = Task(
|
||||||
|
description="Sync task 1",
|
||||||
|
expected_output="Output 1",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Async task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher_agent],
|
||||||
|
tasks=[task1, task2],
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_output1 = TaskOutput(
|
||||||
|
description="Sync task 1",
|
||||||
|
raw="Result 1",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
mock_output2 = TaskOutput(
|
||||||
|
description="Async task 2",
|
||||||
|
raw="Result 2",
|
||||||
|
agent="Researcher",
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_future = MagicMock(spec=Future)
|
||||||
|
mock_future.result.return_value = mock_output2
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(Task, "execute_sync", return_value=mock_output1),
|
||||||
|
patch.object(Task, "execute_async", return_value=mock_future),
|
||||||
|
):
|
||||||
|
result = crew.kickoff()
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result.tasks_output) == 2
|
||||||
|
assert result.tasks_output[0].raw == "Result 1"
|
||||||
|
assert result.tasks_output[1].raw == "Result 2"
|
||||||
|
|
||||||
|
|
||||||
|
def test_sync_multiple_sync_tasks_before_async_all_preserved():
|
||||||
|
"""Test for issue #4137: Multiple sync task outputs before async are preserved.
|
||||||
|
|
||||||
|
Scenario: sync -> sync -> async -> sync
|
||||||
|
Expected: All 4 task outputs should be in the result.
|
||||||
|
"""
|
||||||
|
researcher_agent = Agent(
|
||||||
|
role="Researcher",
|
||||||
|
goal="Research topics",
|
||||||
|
backstory="Expert researcher",
|
||||||
|
allow_delegation=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
task1 = Task(
|
||||||
|
description="Sync task 1",
|
||||||
|
expected_output="Output 1",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task2 = Task(
|
||||||
|
description="Sync task 2",
|
||||||
|
expected_output="Output 2",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
task3 = Task(
|
||||||
|
description="Async task 3",
|
||||||
|
expected_output="Output 3",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=True,
|
||||||
|
)
|
||||||
|
task4 = Task(
|
||||||
|
description="Sync task 4",
|
||||||
|
expected_output="Output 4",
|
||||||
|
agent=researcher_agent,
|
||||||
|
async_execution=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
crew = Crew(
|
||||||
|
agents=[researcher_agent],
|
||||||
|
tasks=[task1, task2, task3, task4],
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
mock_outputs = [
|
||||||
|
TaskOutput(description=f"Task {i}", raw=f"Result {i}", agent="Researcher")
|
||||||
|
for i in range(1, 5)
|
||||||
|
]
|
||||||
|
|
||||||
|
mock_future = MagicMock(spec=Future)
|
||||||
|
mock_future.result.return_value = mock_outputs[2]
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(
|
||||||
|
Task, "execute_sync", side_effect=[mock_outputs[0], mock_outputs[1], mock_outputs[3]]
|
||||||
|
),
|
||||||
|
patch.object(Task, "execute_async", return_value=mock_future),
|
||||||
|
):
|
||||||
|
result = crew.kickoff()
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result.tasks_output) == 4
|
||||||
|
for i in range(4):
|
||||||
|
assert result.tasks_output[i].raw == f"Result {i + 1}"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.vcr()
|
@pytest.mark.vcr()
|
||||||
def test_kickoff_for_each_single_input():
|
def test_kickoff_for_each_single_input():
|
||||||
"""Tests if kickoff_for_each works with a single input."""
|
"""Tests if kickoff_for_each works with a single input."""
|
||||||
|
|||||||
Reference in New Issue
Block a user